diff --git a/src/syn_gen_scope.erl b/src/syn_gen_scope.erl index 7c194a93..2a9b941e 100644 --- a/src/syn_gen_scope.erl +++ b/src/syn_gen_scope.erl @@ -36,7 +36,8 @@ -export([ broadcast/2, broadcast/3, - send_to_node/3 + send_to_node/3, + send_to_node_ordered/3 ]). %% gen_server callbacks @@ -127,6 +128,10 @@ broadcast(Message, ExcludedNodes, #state{multicast_pid = MulticastPid} = State) send_to_node(RemoteNode, Message, #state{process_name = ProcessName}) -> {ProcessName, RemoteNode} ! Message. +-spec send_to_node_ordered(RemoteNode :: node(), Message :: term(), #state{}) -> any(). +send_to_node_ordered(RemoteNode, Message, #state{multicast_pid = MulticastPid, process_name = ProcessName}) -> + MulticastPid ! {send_single, RemoteNode, Message, ProcessName}. + %% =================================================================== %% Callbacks %% =================================================================== @@ -208,9 +213,9 @@ handle_info({'3.0', discover, RemoteScopePid}, #state{ error_logger:info_msg("SYN[~s|~s<~s>] Received DISCOVER request from node ~s", [node(), HandlerLogName, Scope, RemoteScopeNode] ), - %% send local data to remote + %% send local data to remote (ordered to maintain FIFO with broadcasts) {ok, LocalData} = Handler:get_local_data(State), - send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), + send_to_node_ordered(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), %% is this a new node? case maps:is_key(RemoteScopeNode, NodesMap) of true -> @@ -244,9 +249,9 @@ handle_info({'3.0', ack_sync, RemoteScopePid, Data}, #state{ false -> %% monitor _MRef = monitor(process, RemoteScopePid), - %% send local to remote + %% send local to remote (ordered to maintain FIFO with broadcasts) {ok, LocalData} = Handler:get_local_data(State), - send_to_node(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), + send_to_node_ordered(RemoteScopeNode, {'3.0', ack_sync, self(), LocalData}, State), %% return {noreply, State#state{nodes_map = NodesMap#{RemoteScopeNode => RemoteScopePid}}} end; @@ -339,6 +344,10 @@ multicast_loop() -> end, maps:keys(NodesMap) -- ExcludedNodes), multicast_loop(); + {send_single, RemoteNode, Message, ProcessName} -> + {ProcessName, RemoteNode} ! Message, + multicast_loop(); + terminate -> terminated end. diff --git a/src/syn_pg.erl b/src/syn_pg.erl index 66b5a169..195b87e1 100644 --- a/src/syn_pg.erl +++ b/src/syn_pg.erl @@ -425,7 +425,7 @@ handle_info({'3.0', sync_join, GroupName, Pid, Meta, Time, Reason}, #state{nodes handle_pg_sync(GroupName, Pid, Meta, Time, Reason, State); false -> - %% ignore, race condition + %% ignore, node not yet discovered (ack_sync not yet received) ok end, {noreply, State}; diff --git a/src/syn_registry.erl b/src/syn_registry.erl index 774e750a..7f22713c 100755 --- a/src/syn_registry.erl +++ b/src/syn_registry.erl @@ -320,7 +320,7 @@ handle_info({'3.0', sync_register, Name, Pid, Meta, Time, Reason}, #state{nodes_ handle_registry_sync(Name, Pid, Meta, Time, Reason, State); false -> - %% ignore, race condition + %% ignore, node not yet discovered (ack_sync not yet received) ok end, {noreply, State};