Skip to content

Commit

Permalink
khepri_machine: Skip extra query before a query with a fence
Browse files Browse the repository at this point in the history
... if it is unneeded.

[Why]
We do that extra query to ensure that previous async commands were
handled by the local Ra server before we proceed with the query with a
fence. This comes with a performance penalty of course.

We don't need that extra query if the previous command or query made by
the calling process was synchronous.

[How]
We now keep a flag in the calling process dictionary to indicate if the
last command was synchonous or it was a query. The flag is cleared with
an async command.

When we have to perform a query with a fence, we look at this flag to
determine if the extra query is needed.
  • Loading branch information
dumbbell committed Aug 13, 2024
1 parent f67b5c9 commit a705687
Showing 1 changed file with 97 additions and 14 deletions.
111 changes: 97 additions & 14 deletions src/khepri_machine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,12 @@ do_process_sync_command(StoreId, Command, Options) ->
CommandOptions = #{timeout => Timeout, reply_from => ReplyFrom},
T0 = khepri_utils:start_timeout_window(Timeout),
Dest = case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined -> LeaderId;
undefined -> RaServer
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
LeaderId;
undefined ->
sending_sync_command_locally(StoreId),
RaServer
end,
case ra:process_command(Dest, Command, CommandOptions) of
{ok, Ret, _LeaderId} ->
Expand Down Expand Up @@ -903,15 +907,18 @@ process_async_command(
StoreId, Command, ?DEFAULT_RA_COMMAND_CORRELATION = Correlation, Priority) ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority);
process_async_command(
StoreId, Command, Correlation, Priority) ->
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
sending_command_remotely(StoreId),
ra:pipeline_command(LeaderId, Command, Correlation, Priority);
undefined ->
ThisNode = node(),
RaServer = khepri_cluster:node_to_member(StoreId, ThisNode),
sending_async_command_locally(StoreId),
ra:pipeline_command(RaServer, Command, Correlation, Priority)
end.

Expand Down Expand Up @@ -983,6 +990,7 @@ process_query(StoreId, QueryFun, Options) ->
end.

process_query1(StoreId, QueryFun, Options) ->
sending_query_locally(StoreId),
LocalServerId = {StoreId, node()},
case ra:local_query(LocalServerId, QueryFun, Options) of
{ok, {_RaIdxTerm, Ret}, _NewLeaderId} ->
Expand All @@ -1009,9 +1017,10 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% the order of operations between updates and queries. We have to follow
%% several steps to prepare that condition.
%%
%% We first send an arbitrary query to the local Ra server. This is to
%% make sure that previously submitted pipelined commands were processed
%% by that server.
%% If the last message from the calling process to the local Ra server was
%% an async command or if it never sent a command yet, we first send an
%% arbitrary query to the local Ra server. This is to make sure that
%% previously submitted pipelined commands were processed by that server.
%%
%% For instance, if there was a pipelined command without any correlation
%% ID, it ensures it was forwarded to the leader. Likewise for a
Expand All @@ -1020,23 +1029,32 @@ add_applied_condition1(StoreId, Options, Timeout) ->
%% We can't have this guaranty for pipelined commands with a correlation
%% because the caller is responsible for receiving the rejection from the
%% follower and handle the redirect to the leader.
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
case process_query1(StoreId, QueryFun, Timeout) of
case can_skip_fence_preliminary_query(StoreId) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
add_applied_condition2(StoreId, Options, Timeout);
false ->
T0 = khepri_utils:start_timeout_window(Timeout),
QueryFun = fun erlang:is_tuple/1,
case process_query1(StoreId, QueryFun, Timeout) of
true ->
NewTimeout = khepri_utils:end_timeout_window(Timeout, T0),
add_applied_condition2(StoreId, Options, NewTimeout);
Other when Other =/= false ->
Other
end
end.

add_applied_condition2(StoreId, Options, Timeout) ->
%% After the previous local query, there is a great chance that the leader
%% was cached, though not 100% guarantied.
%% After the previous local query or sync command if there was one, there
%% is a great chance that the leader was cached, though not 100%
%% guarantied.
case ra_leaderboard:lookup_leader(StoreId) of
LeaderId when LeaderId =/= undefined ->
add_applied_condition3(StoreId, Options, LeaderId, Timeout);
undefined ->
%% If the leader is unknown, executing a preliminary query should
%% tell us who the leader is.
ask_fence_preliminary_query(StoreId),
add_applied_condition1(StoreId, Options, Timeout)
end.

Expand Down Expand Up @@ -1092,6 +1110,71 @@ get_timeout(_) -> khepri_app:get_default_timeout().
clear_cache(_StoreId) ->
ok.

-define(CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
{?MODULE, can_skip_fence_preliminary_query, StoreId}).

-spec sending_sync_command_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a synchronous command is about to be sent locally.
%%
%% After that, we know we don't need a fence preliminary query.

sending_sync_command_locally(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
_ = erlang:put(Key, true),
ok.

-spec sending_query_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a query is about to be executed locally.
%%
%% After that, we know we don't need a fence preliminary query.

sending_query_locally(StoreId) ->
%% Same behavior as a local sync command.
sending_sync_command_locally(StoreId).

-spec sending_async_command_locally(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that an asynchronous command is about to be sent locally.

sending_async_command_locally(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
_ = erlang:erase(Key),
ok.

-spec sending_command_remotely(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Records that a command is about to be sent to a remote store.

sending_command_remotely(StoreId) ->
%% Same behavior as a local async command.
sending_async_command_locally(StoreId).

-spec ask_fence_preliminary_query(StoreId) -> ok when
StoreId :: khepri:store_id().
%% @doc Explicitly requests that a call to {@link
%% can_skip_fence_preliminary_query/1} returns `true'.

ask_fence_preliminary_query(StoreId) ->
%% Same behavior as a local async command.
sending_async_command_locally(StoreId).

-spec can_skip_fence_preliminary_query(StoreId) -> LastMsgWasSync when
StoreId :: khepri:store_id(),
LastMsgWasSync :: boolean().
%% @doc Indicates if the calling process sent a synchronous command or a query
%% before this call.
%%
%% @returns `true' if the calling process sent a synchrorous command or a
%% query to the given store before this call, `false' if the calling process
%% never sent anything to the given store, or if the last message was an
%% asynchrorous command, th

can_skip_fence_preliminary_query(StoreId) ->
Key = ?CAN_SKIP_FENCE_PRELIMINARY_QUERY_KEY(StoreId),
erlang:get(Key) =:= true.

%% -------------------------------------------------------------------
%% ra_machine callbacks.
%% -------------------------------------------------------------------
Expand Down

0 comments on commit a705687

Please sign in to comment.