Skip to content

Commit

Permalink
Enable async_dist when processing send_msg effects
Browse files Browse the repository at this point in the history
for remote nodes.

This will allow users to simplify their Ra client implementations
not to have to handle lost messages as could occur
when the distribution buffer fills.
  • Loading branch information
kjnilsson committed Jan 22, 2025
1 parent 7b69b45 commit e957465
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 12 deletions.
27 changes: 20 additions & 7 deletions src/ra_server_proc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,19 @@
-define(HANDLE_EFFECTS(Effects, EvtType, State0),
handle_effects(?FUNCTION_NAME, Effects, EvtType, State0)).

-define(ASYNC_DIST(Node, Send),
case Node == node() of
true ->
Send,
ok;
false ->
%% use async_dist for remote sends
process_flag(async_dist, true),
Send,
process_flag(async_dist, false),
ok
end).

-type query_fun() :: ra:query_fun().
-type query_options() :: #{condition => ra:query_condition()}.

Expand Down Expand Up @@ -1345,8 +1358,8 @@ handle_effect(_, {next_event, _, _} = Next, _, State, Actions) ->
{State, [Next | Actions]};
handle_effect(_, {send_msg, To, Msg}, _, State, Actions) ->
%% default is to send without any wrapping
%% TODO: handle send failure? how?
_ = send(To, Msg, State#state.conf),
ToNode = get_node(To),
?ASYNC_DIST(ToNode, _ = send(To, Msg, State#state.conf)),
{State, Actions};
handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
_, State, Actions) ->
Expand All @@ -1355,13 +1368,13 @@ handle_effect(RaftState, {send_msg, To, _Msg, Options} = Eff,
true ->
case can_execute_locally(RaftState, ToNode, State) of
true ->
send_msg(Eff, State);
?ASYNC_DIST(ToNode, send_msg(Eff, State));
false ->
ok
end;
false when RaftState == leader ->
%% the effect got here so we can execute
send_msg(Eff, State);
?ASYNC_DIST(ToNode, send_msg(Eff, State));
false ->
ok
end,
Expand Down Expand Up @@ -1935,10 +1948,10 @@ handle_tick_metrics(State) ->

can_execute_locally(RaftState, TargetNode,
#state{server_state = ServerState} = State) ->
Membership = ra_server:get_membership(ServerState),
case RaftState of
follower when Membership == voter ->
TargetNode == node();
follower ->
TargetNode == node() andalso
voter == ra_server:get_membership(ServerState);
leader when TargetNode =/= node() ->
%% We need to evaluate whether to send the message.
%% Only send if there isn't a local node for the target pid.
Expand Down
4 changes: 2 additions & 2 deletions src/ra_system.erl
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@
low_priority_commands_flush_size => non_neg_integer(),
low_priority_commands_in_memory_size => non_neg_integer(),
server_recovery_strategy => undefined |
registered |
{module(), atom(), list()}
registered |
{module(), atom(), list()}
}.

-export_type([
Expand Down
7 changes: 4 additions & 3 deletions test/ra_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,16 @@ end_per_testcase(_TestCase, Config) ->
Config.

single_server_processes_command(Config) ->
% ok = logger:set_primary_config(level, all),
Name = ?config(test_name, Config),
N1 = nth_server_name(Config, 1),
{_RaName, _} = N1 = nth_server_name(Config, 1),
ok = ra:start_server(default, Name, N1, add_machine(), []),
ok = ra:trigger_election(N1),
% index is 2 as leaders commit a no-op entry on becoming leaders
monitor(process, element(1, N1)),
{ok, 5, _} = ra:process_command(N1, 5, 2000),
{ok, 10, _} = ra:process_command(N1, 5, 2000),
terminate_cluster([N1]).
terminate_cluster([N1]),
ok.

pipeline_commands(Config) ->
Name = ?config(test_name, Config),
Expand Down

0 comments on commit e957465

Please sign in to comment.