Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check KV ready before starting nextgenrepl #23

Merged
merged 2 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions src/riak_kv_replrtq_peer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
-define(AUTO_DISCOVERY_MAXIMUM_SECONDS, 900).
-define(AUTO_DISCOVERY_MINIMUM_SECONDS, 60).


-record(state, {discovery_peers = [] :: list(discovery_peer())}).

%%%============================================================================
Expand Down Expand Up @@ -86,17 +87,8 @@ init([]) ->
DefaultQueue = app_helper:get_env(riak_kv, replrtq_sinkqueue),
SnkQueuePeerInfo =
riak_kv_replrtq_snk:tokenise_peers(DefaultQueue, SinkPeers),

MinDelay =
application:get_env(riak_kv,
replrtq_prompt_min_seconds,
?AUTO_DISCOVERY_MINIMUM_SECONDS),

lists:foreach(
fun({QueueName, _PeerInfo}) ->
_ = schedule_discovery(QueueName, self(), MinDelay)
end,
SnkQueuePeerInfo),
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{ok, #state{discovery_peers = SnkQueuePeerInfo}};
false ->
{ok, #state{}}
Expand Down Expand Up @@ -131,6 +123,33 @@ handle_cast({prompt_discovery, QueueName}, State) ->
_ = do_discovery(QueueName, PeerInfo, regular),
{noreply, State}.

%% Deferred start of peer discovery.  Discovery for each configured queue is
%% only scheduled once riak_kv_util:kv_ready() returns true.  Until then the
%% deferred_start message is re-sent to self after a further
%% riak_kv_util:ngr_initial_timeout() milliseconds.
handle_info(deferred_start, State) ->
    case riak_kv_util:kv_ready() of
        true ->
            %% The minimum delay (in seconds) is only needed when discovery
            %% is actually being scheduled
            MinDelay =
                application:get_env(
                    riak_kv,
                    replrtq_prompt_min_seconds,
                    ?AUTO_DISCOVERY_MINIMUM_SECONDS),
            lists:foreach(
                fun({QueueName, PeerInfo}) ->
                    _ = schedule_discovery(QueueName, self(), MinDelay),
                    ?LOG_INFO(
                        "Initiated real-time repl peer ~p for queue ~p",
                        [PeerInfo, QueueName])
                end,
                State#state.discovery_peers),
            {noreply, State};
        false ->
            %% Report the period that will actually be waited before the
            %% retry - not the discovery delay, which is in seconds and
            %% unrelated to this timer
            RetryMS = riak_kv_util:ngr_initial_timeout(),
            ?LOG_INFO(
                "Real-time repl peer discovery waiting ~w ms "
                "to initialise as riak_kv not ready",
                [RetryMS]
            ),
            _ = erlang:send_after(RetryMS, self(), deferred_start),
            {noreply, State}
    end;
handle_info({scheduled_discovery, QueueName}, State) ->
ok = prompt_discovery(QueueName),
MinDelay =
Expand All @@ -152,6 +171,7 @@ handle_info({Ref, {error, HTTPClientError}}, State) when is_reference(Ref) ->
[HTTPClientError]),
{noreply, State}.


%% No clean-up is performed on termination; the reason and state are ignored
%% and ok is always returned.
terminate(_Reason, _State) ->
ok.

Expand Down
24 changes: 18 additions & 6 deletions src/riak_kv_replrtq_snk.erl
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@
-define(STARTING_DELAYMS, 8).
-define(MAX_SUCCESS_DELAYMS, 1024).
-define(ON_ERROR_DELAYMS, 65536).
-define(INITIAL_TIMEOUT_MS, 60000).
-define(DEFAULT_WORKERCOUNT, 1).

-record(sink_work, {queue_name :: queue_name(),
Expand Down Expand Up @@ -410,9 +409,21 @@ handle_cast({requeue_work, WorkItem}, State) ->
{noreply, State}
end.

handle_info(deferred_start, State) ->
prompt_work(),
erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
%% Deferred start of the sink.  Work is prompted, and the periodic log_stats
%% timer started, only when riak_kv_util:kv_ready() is true.  Otherwise the
%% deferred_start message is re-sent after riak_kv_util:ngr_initial_timeout().
handle_info(deferred_start, State) ->
    KVReady = riak_kv_util:kv_ready(),
    _ =
        case KVReady of
            false ->
                ?LOG_INFO(
                    "Real-time repl sink waiting ~w ms "
                    "to initialise as riak_kv not ready",
                    [riak_kv_util:ngr_initial_timeout()]
                ),
                erlang:send_after(
                    riak_kv_util:ngr_initial_timeout(),
                    self(),
                    deferred_start);
            true ->
                ?LOG_INFO("Initiated real-time repl sink"),
                prompt_work(),
                erlang:send_after(
                    ?LOG_TIMER_SECONDS * 1000,
                    self(),
                    log_stats)
        end,
    {noreply, State};
handle_info(log_stats, State) ->
erlang:send_after(?LOG_TIMER_SECONDS * 1000, self(), log_stats),
Expand Down Expand Up @@ -453,7 +464,8 @@ handle_continue(initialise_work, State) ->
{SnkQueueName, Iteration, SnkW}
end,
Work = lists:map(MapPeerInfoFun, SnkQueuePeerInfo),
erlang:send_after(?INITIAL_TIMEOUT_MS, self(), deferred_start),
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{noreply, State#state{enabled = true, work = Work}}.

terminate(_Reason, State) ->
Expand Down Expand Up @@ -678,7 +690,7 @@ close_pbc_client(PBC) ->
%% @doc
%% For an item of work which has been removed from the work queue, spawn a
%% snk worker (using the repl_fetcher fun) to manage that item of work. The
%% worker must ensure the wortk_item is delivered back on completion.
%% worker must ensure the work_item is delivered back on completion.
-spec do_work(sink_work()) -> sink_work().
do_work({QueueName, Iteration, SinkWork}) ->
WorkQueue = SinkWork#sink_work.work_queue,
Expand Down
38 changes: 27 additions & 11 deletions src/riak_kv_ttaaefs_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,6 @@
-include_lib("kernel/include/logger.hrl").

-define(SECONDS_IN_DAY, 86400).
-define(INITIAL_TIMEOUT, 60000).
% Wait a minute before the first allocation is considered, as a lot may be
% going on at a node immediately at startup
-define(LOOP_TIMEOUT, 15000).
% Always wait at least 15s after completing an action before
% prompting another
Expand Down Expand Up @@ -326,9 +323,9 @@ init([]) ->
queue_name = SrcQueueName,
peer_queue_name = PeerQueueName,
check_window = CheckWindow},

?LOG_INFO("Initiated Tictac AAE Full-Sync Mgr with scope=~w", [Scope]),
{ok, State2, ?INITIAL_TIMEOUT}.
erlang:send_after(
riak_kv_util:ngr_initial_timeout(), self(), deferred_start),
{ok, State2}.

handle_call(pause, _From, State) ->
case State#state.is_paused of
Expand All @@ -351,7 +348,7 @@ handle_call(pause, _From, State) ->
slice_allocations = [],
slice_set_start = undefined,
is_paused = true},
?INITIAL_TIMEOUT}
riak_kv_util:ngr_initial_timeout()}
end;
handle_call(resume, _From, State) ->
case State#state.is_paused of
Expand All @@ -364,16 +361,19 @@ handle_call(resume, _From, State) ->
is_paused = false,
slice_allocations = [],
slice_set_start = undefined},
?INITIAL_TIMEOUT};
riak_kv_util:ngr_initial_timeout()};
false ->
{reply, {error, not_paused}, State, ?INITIAL_TIMEOUT}
{reply,
{error, not_paused},
State,
riak_kv_util:ngr_initial_timeout()}
end;
handle_call({set_sink, Protocol, PeerIP, PeerPort}, _From, State) ->
State0 =
State#state{peer_ip = PeerIP,
peer_port = PeerPort,
peer_protocol = Protocol},
{reply, ok, State0, ?INITIAL_TIMEOUT};
{reply, ok, State0, riak_kv_util:ngr_initial_timeout()};
handle_call({set_queuename, QueueName}, _From, State) ->
{reply, ok, State#state{queue_name = QueueName}};
handle_call({set_allsync, LocalNVal, RemoteNVal}, _From, State) ->
Expand Down Expand Up @@ -633,7 +633,23 @@ handle_cast({auto_check, ReqID, From, Now}, State) ->
end,
{noreply, State}.


%% Deferred start of the full-sync manager.  If riak_kv_util:kv_ready() is
%% false the deferred_start message is re-sent after a further
%% riak_kv_util:ngr_initial_timeout(); once it is true the start is logged
%% and processing is handed to the timeout clause.
handle_info(deferred_start, State) ->
    case riak_kv_util:kv_ready() of
        false ->
            RetryMS = riak_kv_util:ngr_initial_timeout(),
            ?LOG_INFO(
                "Tictac AAE Full-Sync Mgr waiting ~w ms "
                "to initialise as riak_kv not ready",
                [RetryMS]
            ),
            erlang:send_after(RetryMS, self(), deferred_start),
            {noreply, State};
        true ->
            Scope = State#state.scope,
            ?LOG_INFO(
                "Initiated Tictac AAE Full-Sync Mgr with scope=~w",
                [Scope]),
            handle_info(timeout, State)
    end;
handle_info(timeout, State) ->
SlotInfoFun = State#state.slot_info_fun,
SlotInfo = SlotInfoFun(),
Expand Down
14 changes: 13 additions & 1 deletion src/riak_kv_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@
overload_reply/1,
get_backend_config/3,
is_modfun_allowed/2,
shuffle_list/1]).
shuffle_list/1,
kv_ready/0,
ngr_initial_timeout/0
]).
-export([report_hashtree_tokens/0, reset_hashtree_tokens/2]).

-include_lib("kernel/include/logger.hrl").
Expand Down Expand Up @@ -214,7 +217,16 @@ get_write_once(Bucket) ->
Err
end.

%% @doc
%% Returns true if riak_kv is a member of the services reported by
%% riak_core_node_watcher:services/1 for the local node.
-spec kv_ready() -> boolean().
kv_ready() ->
    LocalServices = riak_core_node_watcher:services(node()),
    lists:member(riak_kv, LocalServices).

%% @doc
%% The period replication services wait on startup before commencing, to
%% allow the node to stabilise.  Read from the riak_kv application environment
%% (ngr_initial_timeout), falling back to 60000 (60s) when not set.  Normally
%% only modified in test.
-spec ngr_initial_timeout() -> pos_integer().
ngr_initial_timeout() ->
    case application:get_env(riak_kv, ngr_initial_timeout) of
        {ok, ConfiguredTimeout} ->
            ConfiguredTimeout;
        undefined ->
            60000
    end.

%% ===================================================================
%% Hashtree token management functions
Expand Down
Loading