Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Ra server resilience when log infrastructure encounters faults #428

Merged
merged 5 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@ PLT_APPS += eunit proper syntax_tools erts kernel stdlib common_test inets aten
EDOC_OUTPUT = docs
EDOC_OPTS = {pretty_printer, erl_pp}, {sort_functions, false}

COVER_EXCLUDE_MODS = ra_server_meck_original \
ra_server_proc_meck_original \
ra_log_wal_meck_original \
ra_log_segment_writer_meck_original \
ra_log_meck_original \
ra_snapshot_meck_original \
ra_machine_meck_original \
ra_log_meta_meck_original

all::

escript-zip::
Expand Down
33 changes: 33 additions & 0 deletions src/ra_file.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2017-2023 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries.
%%
%% @hidden
-module(ra_file).

-include("ra.hrl").

%% Evaluates the file operation `Op' and, if it returns `{error, eagain}',
%% evaluates it one more time after a short pause.
%% The expression is wrapped in a fun and handed to retry_on_eagain/1 so that
%% no variables are bound in the calling function's scope (the macro can
%% therefore be used more than once per function) and the retry logic is not
%% duplicated at every expansion site.
-define(HANDLE_EAGAIN(Op), retry_on_eagain(fun() -> Op end)).

-export([
         sync/1
        ]).

%% Flushes `Fd' to disk using file:sync/1.
%% A transient `{error, eagain}' is retried once; any other result, including
%% a second `{error, eagain}', is returned to the caller unchanged.
-spec sync(file:io_device()) ->
    ok | {error, file:posix() | badarg | terminated}.
sync(Fd) ->
    ?HANDLE_EAGAIN(file:sync(Fd)).

%% Runs `Op' and returns its result. When the first attempt returns
%% `{error, eagain}' the process sleeps for 10ms and `Op' is run exactly once
%% more; whatever the second attempt returns is the final result.
-spec retry_on_eagain(fun(() -> Result)) -> Result when Result :: term().
retry_on_eagain(Op) ->
    case Op() of
        {error, eagain} ->
            ?DEBUG("EAGAIN during file operation, retrying once in 10ms...", []),
            timer:sleep(10),
            case Op() of
                {error, eagain} = Err ->
                    ?DEBUG("EAGAIN again during file operation", []),
                    Err;
                RetryResult ->
                    RetryResult
            end;
        FirstResult ->
            FirstResult
    end.
19 changes: 0 additions & 19 deletions src/ra_leaderboard.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,22 +64,3 @@ lookup(ClusterName) ->
error:badarg ->
undefined
end.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").

%% Exercises the leaderboard lookup functions through a sequence of
%% order-dependent assertions: lookups before and after init/0, then after
%% each of two record/3 calls for the same cluster name.
lookup_leader_test() ->
ClusterName = <<"mah-cluster">>,
%% before init/0 has been called a lookup is expected to yield undefined
?assertEqual(undefined, lookup_leader(ClusterName)),
init(),
%% after init/0, but with nothing recorded yet, the result is still undefined
?assertEqual(undefined, lookup_leader(ClusterName)),
Me = {me, node()},
record(ClusterName, Me, [Me]),
%% the recorded leader and member list can now be read back
?assertEqual(Me, lookup_leader(ClusterName)),
?assertEqual([Me], lookup_members(ClusterName)),
You = {you, node()},
%% recording again for the same cluster replaces the leader and the members
record(ClusterName, You, [Me, You]),
?assertEqual(You, lookup_leader(ClusterName)),
?assertEqual([Me, You], lookup_members(ClusterName)),

ok.
-endif.
2 changes: 1 addition & 1 deletion src/ra_lib.erl
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ sync_file(Name) ->
-spec sync_and_close_fd(file:fd()) ->
ok | file_err().
sync_and_close_fd(Fd) ->
case file:sync(Fd) of
case ra_file:sync(Fd) of
ok ->
file:close(Fd);
Err ->
Expand Down
152 changes: 122 additions & 30 deletions src/ra_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
sparse_read/2,
last_index_term/1,
set_last_index/2,
reset_to_last_known_written/1,
handle_event/2,
last_written/1,
fetch/2,
Expand Down Expand Up @@ -47,7 +48,8 @@

% external reader
register_reader/2,
readers/1
readers/1,
tick/2
]).

-include("ra.hrl").
Expand All @@ -56,6 +58,7 @@
-define(MIN_SNAPSHOT_INTERVAL, 4096).
-define(MIN_CHECKPOINT_INTERVAL, 16384).
-define(LOG_APPEND_TIMEOUT, 5000).
-define(WAL_RESEND_TIMEOUT, 5000).

-type ra_meta_key() :: atom().
-type segment_ref() :: {From :: ra_index(), To :: ra_index(),
Expand Down Expand Up @@ -104,7 +107,8 @@
% if this is set a snapshot write is in progress for the
% index specified
cache = ra_log_cache:init() :: ra_log_cache:state(),
last_resend_time :: option(integer()),
last_resend_time :: option({integer(), WalPid :: pid() | undefined}),
last_wal_write :: {pid(), Ms :: integer()},
reader :: ra_log_reader:state(),
readers = [] :: [pid()]
}).
Expand Down Expand Up @@ -233,7 +237,8 @@ init(#{uid := UId,
first_index = max(SnapIdx + 1, FirstIdx),
last_index = max(SnapIdx, LastIdx0),
reader = Reader,
snapshot_state = SnapshotState
snapshot_state = SnapshotState,
last_wal_write = {whereis(Wal), now_ms()}
},
put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx),
LastIdx = State000#?MODULE.last_index,
Expand Down Expand Up @@ -307,9 +312,13 @@ write([{FstIdx, _, _} = First | Rest] = Entries,
% it is the next entry after a snapshot
% we need to tell the wal to truncate as we
% are not going to receive any entries prior to the snapshot
State0 = wal_truncate_write(State00, First),
% write the rest normally
write_entries(Rest, State0);
try wal_truncate_write(State00, First) of
State0 ->
% write the rest normally
write_entries(Rest, State0)
catch error:wal_down ->
{error, wal_down}
end;
_ ->
write_entries(Entries, State00)
end;
Expand Down Expand Up @@ -431,6 +440,26 @@ set_last_index(Idx, #?MODULE{cfg = Cfg,
last_written_index_term = {LWIdx, LWTerm}}}
end.

%% Forces both last_index and last_written_index_term back to the last index
%% known to have been written to the wal.
%% This is only used after the wal has been detected down, to try to avoid
%% ever having to resend data to the wal.
%% The cache's last index and the last index / last written index counters
%% are updated to the same index.
-spec reset_to_last_known_written(state()) -> state().
reset_to_last_known_written(#?MODULE{cfg = Cfg,
                                     cache = OldCache,
                                     last_index = CurrentLastIdx,
                                     last_written_index_term = PrevWritten}
                            = Log0) ->
    %% search downwards from the current last index for an entry that is
    %% known to be in the wal
    {WalIdx, WalTerm, Log} = last_index_term_in_wal(CurrentLastIdx, Log0),
    ?DEBUG("~ts ~s: index: ~b term: ~b: previous ~w",
           [Cfg#cfg.log_id, ?FUNCTION_NAME, WalIdx, WalTerm, PrevWritten]),
    NewCache = ra_log_cache:set_last(WalIdx, OldCache),
    put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, WalIdx),
    put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, WalIdx),
    Log#?MODULE{cache = NewCache,
                last_index = WalIdx,
                last_term = WalTerm,
                last_written_index_term = {WalIdx, WalTerm}}.

-spec handle_event(event_body(), state()) ->
{state(), [effect()]}.
handle_event({written, {FromIdx, _ToIdx, _Term}},
Expand Down Expand Up @@ -480,15 +509,15 @@ handle_event({written, {FromIdx, ToIdx0, Term}},
[State#?MODULE.cfg#cfg.log_id, Term, ToIdx, OtherTerm]),
{State, []}
end;
handle_event({written, {FromIdx, _, _}},
handle_event({written, {FromIdx, _, _Term}},
#?MODULE{cfg = #cfg{log_id = LogId},
last_written_index_term = {LastWrittenIdx, _}} = State0)
last_written_index_term = {LastWrittenIdx, _}} = State)
when FromIdx > LastWrittenIdx + 1 ->
% leaving a gap is not ok - resend from cache
% leaving a gap is not ok - may need to resend from cache
Expected = LastWrittenIdx + 1,
?DEBUG("~ts: ra_log: written gap detected at ~b expected ~b!",
[LogId, FromIdx, Expected]),
{resend_from(Expected, State0), []};
?INFO("~ts: ra_log: written gap detected at ~b expected ~b!",
[LogId, FromIdx, Expected]),
{resend_from(Expected, State), []};
handle_event({truncate_cache, FromIdx, ToIdx}, State) ->
truncate_cache(FromIdx, ToIdx, State, []);
handle_event(flush_cache, State) ->
Expand Down Expand Up @@ -711,6 +740,24 @@ flush_cache(#?MODULE{cache = Cache} = State) ->
needs_cache_flush(#?MODULE{cache = Cache}) ->
ra_log_cache:needs_flush(Cache).

%% Periodic check that resends cached entries to the wal when all of the
%% following hold:
%%  * more than ?WAL_RESEND_TIMEOUT ms have passed since the last wal write
%%  * a process is currently registered under the wal name
%%  * that process is not the one the last write was sent to
%%  * the cache is not empty
%% Otherwise the state is returned unchanged.
-spec tick(Now :: integer(), state()) -> state().
tick(Now, #?MODULE{cfg = #cfg{wal = Wal},
                   cache = Cache,
                   last_written_index_term = {LastWrittenIdx, _},
                   last_wal_write = {PrevWalPid, LastWriteMs}} = State) ->
    CurrentWalPid = whereis(Wal),
    Overdue = Now > LastWriteMs + ?WAL_RESEND_TIMEOUT,
    WalReplaced = CurrentWalPid =/= undefined andalso
                  CurrentWalPid =/= PrevWalPid,
    case Overdue andalso
         WalReplaced andalso
         ra_log_cache:size(Cache) > 0 of
        false ->
            State;
        true ->
            %% the wal has restarted, the timeout has elapsed and there are
            %% cached items: resend everything after the last written index
            resend_from(LastWrittenIdx + 1, State)
    end.

suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State0) ->
ClusterServerIds = maps:map(fun (_, V) ->
maps:with([voter_status], V)
Expand Down Expand Up @@ -805,6 +852,7 @@ overview(#?MODULE{last_index = LastIndex,
last_written_index_term = LWIT,
snapshot_state = SnapshotState,
reader = Reader,
last_wal_write = {_LastPid, LastMs},
cache = Cache}) ->
#{type => ?MODULE,
last_index => LastIndex,
Expand All @@ -821,7 +869,8 @@ overview(#?MODULE{last_index = LastIndex,
undefined -> undefined;
{I, _} -> I
end,
cache_size => ra_log_cache:size(Cache)
cache_size => ra_log_cache:size(Cache),
last_wal_write => LastMs
}.

-spec write_config(ra_server:config(), state()) -> ok.
Expand Down Expand Up @@ -925,7 +974,7 @@ delete_segments(SnapIdx, #?MODULE{cfg = #cfg{log_id = LogId,
UId, Pivot)
end),
Active = ra_log_reader:segment_refs(Reader),
?DEBUG("~ts: ~b obsolete segments at ~b - remaining: ~b, pivot ~w",
?DEBUG("~ts: ~b obsolete segments at ~b - remaining: ~b, pivot ~0p",
[LogId, length(Obsolete), SnapIdx, length(Active), Pivot]),
State = State0#?MODULE{reader = Reader},
{State, log_update_effects(Readers, Pid, State)}
Expand All @@ -938,24 +987,47 @@ wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId,
% this is the next write after a snapshot was taken or received
% we need to indicate to the WAL that this may be a non-contiguous write
% and that prior entries should be considered stale
ok = ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd),
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
ok = put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
cache = ra_log_cache:add(Entry, Cache)}.
case ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd) of
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
last_wal_write = {Pid, now_ms()},
cache = ra_log_cache:add(Entry, Cache)};
{error, wal_down} ->
error(wal_down)
end.

wal_write(#?MODULE{cfg = #cfg{uid = UId,
wal = Wal} = Cfg,
cache = Cache} = State,
{Idx, Term, Cmd} = Entry) ->
case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
State#?MODULE{last_index = Idx, last_term = Term,
last_wal_write = {Pid, now_ms()},
cache = ra_log_cache:add(Entry, Cache)};
{error, wal_down} ->
exit(wal_down)
error(wal_down)
end.

%% Only used by the resend-to-wal functionality. Unlike wal_write/2 it does
%% not add the entry to the cache, as the entry is already there.
%% Raises error:wal_down if the wal reports that it is down.
wal_rewrite(#?MODULE{cfg = #cfg{uid = UId,
                                wal = Wal} = Cfg} = State,
            {Idx, Term, Cmd}) ->
    Writer = {UId, self()},
    case ra_log_wal:write(Writer, Wal, Idx, Term, Cmd) of
        {error, wal_down} ->
            error(wal_down);
        {ok, WalPid} ->
            ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1),
            put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx),
            %% remember which wal process took the write and when
            State#?MODULE{last_wal_write = {WalPid, now_ms()},
                          last_index = Idx,
                          last_term = Term}
    end.

wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,
Expand All @@ -972,14 +1044,15 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId,

[{_, _, LastIdx, LastTerm, _} | _] = WalCommands,
case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of
ok ->
{ok, Pid} ->
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num),
put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx),
State#?MODULE{last_index = LastIdx,
last_term = LastTerm,
last_wal_write = {Pid, now_ms()},
cache = Cache};
{error, wal_down} ->
exit(wal_down)
error(wal_down)
end.

truncate_cache(_FromIdx, ToIdx,
Expand All @@ -1002,7 +1075,7 @@ resend_from(Idx, #?MODULE{cfg = #cfg{uid = UId}} = State0) ->
try resend_from0(Idx, State0) of
State -> State
catch
exit:wal_down ->
error:wal_down ->
?WARN("~ts: ra_log: resending from ~b failed with wal_down",
[UId, Idx]),
State0
Expand All @@ -1017,15 +1090,18 @@ resend_from0(Idx, #?MODULE{cfg = Cfg,
ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_RESENDS, LastIdx - Idx + 1),
lists:foldl(fun (I, Acc) ->
{I, T, C} = ra_log_cache:fetch(I, Cache),
wal_write(Acc, {I, T, C})
wal_rewrite(Acc, {I, T, C})
end,
State#?MODULE{last_resend_time = erlang:system_time(seconds)},
State#?MODULE{last_resend_time = {erlang:system_time(seconds),
whereis(Cfg#cfg.wal)}},
lists:seq(Idx, LastIdx));
resend_from0(Idx, #?MODULE{last_resend_time = LastResend,
resend_from0(Idx, #?MODULE{last_resend_time = {LastResend, WalPid},
cfg = #cfg{resend_window_seconds = ResendWindow}} = State) ->
case erlang:system_time(seconds) > LastResend + ResendWindow of
case erlang:system_time(seconds) > LastResend + ResendWindow orelse
(is_pid(WalPid) andalso not is_process_alive(WalPid)) of
true ->
% it has been more than a minute since last resend
% it has been more than the resend window since last resend
% _or_ the wal has been restarted since then
% ok to try again
resend_from(Idx, State#?MODULE{last_resend_time = undefined});
false ->
Expand All @@ -1051,7 +1127,7 @@ write_entries([{FstIdx, _, _} | Rest] = Entries, State0) ->
try
{ok, wal_write_batch(State0, Entries)}
catch
exit:wal_down ->
error:wal_down ->
{error, wal_down}
end;
Error ->
Expand Down Expand Up @@ -1161,6 +1237,22 @@ maps_with_values(Keys, Map) ->
end
end, [], Keys).

%% Searches downwards from `Idx' for the highest index whose term can be
%% fetched through the log reader, stopping at the last known written index.
%% Returns `{Idx, Term, State}' where State carries the updated reader.
last_index_term_in_wal(Idx, #?MODULE{last_written_index_term = {Idx, Term}} = State) ->
    %% lower limit reached: this is the last known written index
    {Idx, Term, State};
last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State0) ->
    {MaybeTerm, Reader} = ra_log_reader:fetch_term(Idx, Reader0),
    State = State0#?MODULE{reader = Reader},
    case MaybeTerm of
        undefined ->
            %% not readable at this index, try the one below
            last_index_term_in_wal(Idx - 1, State);
        _ ->
            %% if it can be read when bypassing the local cache it is in the
            %% wal
            {Idx, MaybeTerm, State}
    end.

%% Current Erlang system time in milliseconds.
-spec now_ms() -> integer().
now_ms() ->
    erlang:system_time(millisecond).

%%%% TESTS

-ifdef(TEST).
Expand Down
Loading
Loading