Skip to content

Commit

Permalink
Use selective receive to allow for vnode to be blocked (#27)
Browse files Browse the repository at this point in the history
* Use selective receive to allow for vnode to be blocked

Block and unblock a vnode when triggering a repair.  Otherwise their is a potential race condition:

- aae_controller is triggered into repair mode, queueing all updates to be re-applied after rebuild complete
- update is applied to leveled backend by vnode
- snapshot is taken for rebuild (including update)
- message is cast to aae_controller with update ... which will be queued and applied twice
- update is queue

* Handle message outside of selective receive

If the selective receive times out - then need to handle the release message

* Add configurable block time
  • Loading branch information
martinsumner authored Apr 9, 2024
1 parent aec41e6 commit 0c060a3
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 11 deletions.
18 changes: 16 additions & 2 deletions priv/riak_kv.schema
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@
{commented, disabled}
]}.

%% @doc Frequency to prompt exchange per vnode
%% @doc Frequency to prompt exchange per vnode (milliseconds)
%% The number of milliseconds which the vnode must wait between self-pokes to
%% maybe prompt the next exchange. Default is 8 minutes - check all partitions
%% when n=3 once every hour (in each direction). A cycle of exchanges will
Expand All @@ -161,7 +161,7 @@
hidden
]}.

%% @doc Frequency to prompt rebuild check per vnode
%% @doc Frequency to prompt rebuild check per vnode (milliseconds)
%% The number of milliseconds which the vnode must wait between self-pokes to
%% maybe prompt the next rebuild. Default is 60 minutes.
%% When a node is being re-introduced to a cluster following a long delay, then
Expand All @@ -173,6 +173,20 @@
hidden
]}.

%% @doc Block vnode for tree rebuild (milliseconds)
%% When rebuilding a vnode's tree cache, the vnode can be blocked while the
%% snapshot is taken - to eliminate race conditions that might otherwise cause
%% a segment to be miscalculated. By default the vnode block is always
%% released within 1000ms i.e. should a process crash or there be a delay in
%% the expected release, the block will still be released after this delay, and
%% the risk of miscalculation will be accepted in preference to having a longer
%% block
{mapping, "tictacaae_rebuild_blocktime", "riak_kv.tictacaae_rebuild_blocktime", [
{datatype, integer},
{default, 1000},
hidden
]}.

%% @doc Max number of leaf IDs per exchange
%% To control the length of time for each exchange, only a subset of the
%% conflicting leaves will be compared on each exchange. If there are issues
Expand Down
60 changes: 51 additions & 9 deletions src/riak_kv_vnode.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@
-define(REAPER_BATCH_SIZE, 1024).
-define(ERASER_BATCH_SIZE, 1024).

-define(INIT_REBUILD_BLOCKTIME, 1000).

%% Erlang's if Bool -> thing; true -> thang end. syntax hurts my
%% brain. It scans as if true -> thing; true -> thang end. So, here is
%% a macro, ?ELSE to use in if statements. You're welcome.
Expand Down Expand Up @@ -439,21 +441,35 @@ queue_tictactreerebuild(AAECntrl, Partition, OnlyIfBroken, State) ->
fun() ->
?LOG_INFO("Starting tree rebuild for partition=~w", [Partition]),
SW = os:timestamp(),
case when_loading_complete(AAECntrl,
Preflists,
fun preflistfun/2,
OnlyIfBroken) of
BlockRequest = self(),
BlockTimeMS =
application:get_env(
riak_kv,
tictacaae_rebuild_blocktime,
?INIT_REBUILD_BLOCKTIME
),
{blocked, VnodePid} =
riak_core_vnode_master:sync_command(
{Partition, node()},
{block_vnode, BlockRequest, BlockTimeMS},
riak_kv_vnode_master,
infinity),
case when_loading_complete(
AAECntrl, Preflists, fun preflistfun/2, OnlyIfBroken) of
{ok, StoreFold, FinishFun} ->
VnodePid ! {release_vnode, BlockRequest},
Output = StoreFold(),
FinishFun(Output),
Duration =
timer:now_diff(os:timestamp(), SW) div (1000 * 1000),
?LOG_INFO("Tree rebuild complete for partition=~w" ++
" in duration=~w seconds",
[Partition, Duration]);
?LOG_INFO(
"Tree rebuild complete for partition=~w"
" in duration=~w seconds",
[Partition, Duration]);
skipped ->
?LOG_INFO("Tree rebuild skipped for partition=~w",
[Partition])
VnodePid ! {release_vnode, BlockRequest},
?LOG_INFO(
"Tree rebuild skipped for partition=~w", [Partition])
end,
ok
end,
Expand Down Expand Up @@ -1485,6 +1501,26 @@ handle_command({reset_hashtree_tokens, MinToken, MaxToken}, _Sender, State) ->
end,
{reply, ok, State};

handle_command({block_vnode, BlockRequest, BlockTimeMS}, Sender, State) ->
riak_core_vnode:reply(Sender, {blocked, self()}),
SW = os:timestamp(),
receive
{release_vnode, BlockRequest} ->
LockedTime = timer:now_diff(os:timestamp(), SW),
?LOG_INFO(
"Vnode block released for ~w request ~w after"
"block_time=~w microseconds",
[Sender, BlockRequest, LockedTime]),
{noreply, State}
after
BlockTimeMS ->
?LOG_WARNING(
"Vnode block request timed out after ~w for ~w request ~w",
[BlockTimeMS, Sender, BlockRequest]
),
{noreply, State}
end;

handle_command(Req, Sender, State) ->
handle_request(riak_kv_requests:request_type(Req), Req, Sender, State).

Expand Down Expand Up @@ -2640,6 +2676,12 @@ handle_info({aae_pong, QueueTime}, State) ->
ok
end,
{ok, State};
handle_info({release_vnode, BlockRequest}, State) ->
?LOG_WARNING(
"Vnode block release request ~w received outside of block",
[BlockRequest]
),
{ok, State};
handle_info({Ref, ok}, State) ->
?LOG_INFO("Ignoring ok returned after timeout for Ref ~p", [Ref]),
{ok, State}.
Expand Down

0 comments on commit 0c060a3

Please sign in to comment.