From 47984ccb3ef972f1e8a956de89cc2ec0542a7545 Mon Sep 17 00:00:00 2001 From: Karl Nilsson Date: Fri, 5 Jul 2024 15:33:37 +0100 Subject: [PATCH] Try a different checkpoint thinning approach Instead of setting a fixed upper limit the max_checkpoints ra_log settings instead determins the point at which thinning will beging to be applied. This means there is no upper limit on the number of checkpoints that can exist but the differnce in indexes will begin to increase as the number goes up. --- src/ra.hrl | 5 +- src/ra_lib.erl | 8 +-- src/ra_log.erl | 21 +++++-- src/ra_log_snapshot.erl | 16 ++++-- src/ra_server_proc.erl | 9 ++- src/ra_snapshot.erl | 102 ++++++++++++--------------------- test/coordination_SUITE.erl | 32 +++++++++++ test/ra_log_snapshot_SUITE.erl | 10 ++-- 8 files changed, 116 insertions(+), 87 deletions(-) diff --git a/src/ra.hrl b/src/ra.hrl index 9d46ec21..06a63629 100644 --- a/src/ra.hrl +++ b/src/ra.hrl @@ -261,6 +261,8 @@ "Total number of checkpoints written"}, {checkpoint_bytes_written, ?C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, counter, "Number of checkpoint bytes written"}, + {checkpoints_promoted, ?C_RA_LOG_CHECKPOINTS_PROMOTED, counter, + "Number of checkpoints promoted to snapshots"}, {reserved_1, ?C_RA_LOG_RESERVED, counter, "Reserved counter"} ]). -define(C_RA_LOG_WRITE_OPS, 1). @@ -277,7 +279,8 @@ -define(C_RA_LOG_OPEN_SEGMENTS, 12). -define(C_RA_LOG_CHECKPOINTS_WRITTEN, 13). -define(C_RA_LOG_CHECKPOINT_BYTES_WRITTEN, 14). --define(C_RA_LOG_RESERVED, 15). +-define(C_RA_LOG_CHECKPOINTS_PROMOTED, 15). +-define(C_RA_LOG_RESERVED, 16). -define(C_RA_SRV_AER_RECEIVED_FOLLOWER, ?C_RA_LOG_RESERVED + 1). -define(C_RA_SRV_AER_REPLIES_SUCCESS, ?C_RA_LOG_RESERVED + 2). diff --git a/src/ra_lib.erl b/src/ra_lib.erl index 7133b25e..77edf82c 100644 --- a/src/ra_lib.erl +++ b/src/ra_lib.erl @@ -321,12 +321,12 @@ retry(Func, Attempt, Sleep) -> end. -spec write_file(file:name_all(), iodata()) -> - ok | file_err(). + ok | {error, file_err()}. write_file(Name, IOData) -> write_file(Name, IOData, true). -spec write_file(file:name_all(), iodata(), Sync :: boolean()) -> - ok | file_err(). + ok | {error, file_err()}. write_file(Name, IOData, Sync) -> case file:open(Name, [binary, write, raw]) of {ok, Fd} -> @@ -347,7 +347,7 @@ write_file(Name, IOData, Sync) -> end. -spec sync_file(file:name_all()) -> - ok | file_err(). + ok | {error, file_err()}. sync_file(Name) -> case file:open(Name, [binary, read, write, raw]) of {ok, Fd} -> @@ -357,7 +357,7 @@ sync_file(Name) -> end. -spec sync_and_close_fd(file:fd()) -> - ok | file_err(). + ok | {error, file_err()}. sync_and_close_fd(Fd) -> case ra_file:sync(Fd) of ok -> diff --git a/src/ra_log.erl b/src/ra_log.erl index 34294350..767a7362 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -129,7 +129,8 @@ max_open_segments => non_neg_integer(), snapshot_module => module(), counter => counters:counters_ref(), - initial_access_pattern => sequential | random}. + initial_access_pattern => sequential | random, + max_checkpoints => non_neg_integer()}. -type overview() :: @@ -199,6 +200,7 @@ init(#{uid := UId, undefined -> {-1, -1}; Curr -> Curr end, + AccessPattern = maps:get(initial_access_pattern, Conf, random), Reader0 = ra_log_reader:init(UId, Dir, 0, MaxOpen, AccessPattern, [], Names, Counter), @@ -244,6 +246,13 @@ init(#{uid := UId, LastIdx = State000#?MODULE.last_index, put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_WRITTEN_INDEX, LastIdx), + case ra_snapshot:latest_checkpoint(SnapshotState) of + undefined -> + ok; + {ChIdx, _ChTerm} -> + put_counter(Cfg, ?C_RA_SVR_METRIC_CHECKPOINT_INDEX, ChIdx) + end, + % recover the last term {LastTerm0, State00} = case LastIdx of SnapIdx -> @@ -743,11 +752,11 @@ promote_checkpoint(Idx, #?MODULE{cfg = Cfg, _ -> {WasPromoted, SnapState, Effects} = ra_snapshot:promote_checkpoint(Idx, SnapState0), - if WasPromoted -> - ok = incr_counter(Cfg, ?C_RA_LOG_SNAPSHOTS_WRITTEN, 1); - true -> - ok - end, + if WasPromoted -> + ok = incr_counter(Cfg, ?C_RA_LOG_CHECKPOINTS_PROMOTED, 1); + true -> + ok + end, {State#?MODULE{snapshot_state = SnapState}, Effects} end. diff --git a/src/ra_log_snapshot.erl b/src/ra_log_snapshot.erl index 43bb7bcb..21acee4c 100644 --- a/src/ra_log_snapshot.erl +++ b/src/ra_log_snapshot.erl @@ -44,7 +44,7 @@ prepare(_Index, State) -> State. %% @end -spec write(file:filename(), meta(), term(), Sync :: boolean()) -> - ok | {error, file_err()}. + {ok, non_neg_integer()} | {error, file_err()}. write(Dir, Meta, MacState, Sync) -> %% no compression on meta data to make sure reading it is as fast %% as possible @@ -53,10 +53,16 @@ write(Dir, Meta, MacState, Sync) -> Data = [<<(size(MetaBin)):32/unsigned>>, MetaBin | IOVec], Checksum = erlang:crc32(Data), File = filename(Dir), - ra_lib:write_file(File, [<>, - Data], Sync). + Bytes = 9 + iolist_size(Data), + case ra_lib:write_file(File, [<>, + Data], Sync) of + ok -> + {ok, Bytes}; + Err -> + Err + end. -spec sync(file:filename()) -> ok | {error, file_err()}. diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index d039310c..92517816 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -1698,12 +1698,19 @@ do_state_query(QueryName, #state{server_state = State}) -> ra_server:state_query(QueryName, State). config_defaults(ServerId) -> + Counter = case ra_counters:fetch(ServerId) of + undefined -> + ra_counters:new(ServerId, + {persistent_term, ?FIELDSPEC_KEY}); + C -> + C + end, #{broadcast_time => ?DEFAULT_BROADCAST_TIME, tick_timeout => ?TICK_INTERVAL_MS, install_snap_rpc_timeout => ?INSTALL_SNAP_RPC_TIMEOUT, await_condition_timeout => ?DEFAULT_AWAIT_CONDITION_TIMEOUT, initial_members => [], - counter => ra_counters:new(ServerId, {persistent_term, ?FIELDSPEC_KEY}), + counter => Counter, system_config => ra_system:default_config() }. diff --git a/src/ra_snapshot.erl b/src/ra_snapshot.erl index efc5734b..1897fae1 100644 --- a/src/ra_snapshot.erl +++ b/src/ra_snapshot.erl @@ -582,22 +582,49 @@ take_older_checkpoints(Idx, #?MODULE{checkpoints = Checkpoints0} = State0) -> -spec take_extra_checkpoints(state()) -> {state(), [checkpoint()]}. +take_extra_checkpoints(State0) -> + take_extra_checkpoints(State0, []). + + take_extra_checkpoints(#?MODULE{checkpoints = Checkpoints0, - max_checkpoints = MaxCheckpoints} = State0) -> + max_checkpoints = MaxCheckpoints} = State0, + Checks) -> Len = erlang:length(Checkpoints0), - case Len - MaxCheckpoints of - ToDelete when ToDelete > 0 -> - %% Take `ToDelete' checkpoints from the list randomly without - %% ever taking the first or last checkpoint. - IdxsToTake = random_idxs_to_take(MaxCheckpoints, ToDelete), - {Checkpoints, Extras} = lists_take_idxs(Checkpoints0, IdxsToTake), - {State0#?MODULE{checkpoints = Checkpoints}, Extras}; - _ -> - {State0, []} + case Len > MaxCheckpoints of + true -> + %% when the number of checkpoints grow we increase the difference + %% between checkpoints in order to keep the total count kept on disk + %% down but keep some upper limit (~500k) to avoid huge differences + Mult = min(8, Len div MaxCheckpoints), + case find_checkpoint_to_delete(Mult, lists:reverse(Checkpoints0)) of + undefined -> + {State0, Checks}; + {_, _} = Check -> + Checkpoints = lists:delete(Check, Checkpoints0), + {State0#?MODULE{checkpoints = Checkpoints}, + [Check | Checks]} + end; + false -> + {State0, Checks} end. %% Utility +-define(MAX_DIFF, 65_536). + +find_checkpoint_to_delete(Mult, + [{FstIdx, _}, + {_, _} = Pot, + {ThrdIdx, _} | _] = Checks) -> + case ThrdIdx - FstIdx < (?MAX_DIFF * Mult) of + true -> + Pot; + false -> + find_checkpoint_to_delete(Mult, tl(Checks)) + end; +find_checkpoint_to_delete(_, _) -> + undefined. + make_snapshot_dir(Dir, Index, Term) -> I = ra_lib:zpad_hex(Index), T = ra_lib:zpad_hex(Term), @@ -608,65 +635,10 @@ counters_add(undefined, _, _) -> counters_add(Counter, Ix, Incr) -> counters:add(Counter, Ix, Incr). -random_idxs_to_take(Max, N) -> - %% Always retain the first and last elements. - AllIdxs = lists:seq(2, Max - 1), - %% Take a random subset of those indices of length N. - lists:sublist(ra_lib:lists_shuffle(AllIdxs), N). - -%% Take items from the given list by the given indices without disturbing the -%% order of the list. --spec lists_take_idxs(List, Idxs) -> {List1, Taken} when - List :: list(Elem), - Elem :: any(), - Idxs :: list(pos_integer()), - List1 :: list(Elem), - Taken :: list(Elem). -lists_take_idxs(List, Idxs0) -> - %% Sort the indices so `lists_take_idxs/5' may run linearly on the two lists - Idxs = lists:sort(Idxs0), - %% 1-indexing like the `lists' module. - lists_take_idxs(List, Idxs, 1, [], []). - -lists_take_idxs([Elem | Elems], [Idx | Idxs], Idx, TakeAcc, ElemAcc) -> - lists_take_idxs(Elems, Idxs, Idx + 1, [Elem | TakeAcc], ElemAcc); -lists_take_idxs([Elem | Elems], Idxs, Idx, TakeAcc, ElemAcc) -> - lists_take_idxs(Elems, Idxs, Idx + 1, TakeAcc, [Elem | ElemAcc]); -lists_take_idxs(Elems, _Idxs = [], _Idx, TakeAcc, ElemAcc) -> - {lists:reverse(ElemAcc, Elems), lists:reverse(TakeAcc)}; -lists_take_idxs(_Elems = [], _Idxs, _Idx, TakeAcc, ElemAcc) -> - {lists:reverse(ElemAcc), lists:reverse(TakeAcc)}. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -random_idxs_to_take_test() -> - Idxs = random_idxs_to_take(10, 3), - ?assertEqual(3, length(Idxs)), - [Min, _, Max] = lists:sort(Idxs), - %% The first and last elements are excluded. - ?assert(Min > 1), - ?assert(Max < 10), - ok. - -lists_take_idxs_test() -> - ?assertEqual( - {[1, 3, 5, 7, 8], [2, 4, 6]}, - lists_take_idxs(lists:seq(1, 8), [2, 4, 6])), - - %% Ordering of `Idxs' doesn't matter. - ?assertEqual( - {[1, 3, 5, 7, 8], [2, 4, 6]}, - lists_take_idxs(lists:seq(1, 8), [4, 6, 2])), - ?assertEqual( - {[a, c], [b]}, - lists_take_idxs([a, b, c], [2])), - - %% `List''s order is preserved even when nothing is taken. - ?assertEqual( - {[a, b, c], []}, - lists_take_idxs([a, b, c], [])), - ok. -endif. diff --git a/test/coordination_SUITE.erl b/test/coordination_SUITE.erl index f5182579..a0ab7be8 100644 --- a/test/coordination_SUITE.erl +++ b/test/coordination_SUITE.erl @@ -787,6 +787,26 @@ recover_from_checkpoint(Config) -> LeaderIdx =:= 8 andalso Follower1Idx =:= 8 andalso Follower2Idx =:= 8 end, 20), + CounterKeys = [ + checkpoint_bytes_written, + checkpoint_index, + checkpoints, + checkpoints_written, + checkpoints_promoted + ], + [begin + ?assertMatch( + #{ + checkpoint_bytes_written := B, + checkpoint_index := 8, + checkpoints := 1, + checkpoints_written := 1, + checkpoints_promoted := 0 + } when B > 0, + ct_rpc:call(N, ra_counters, counters, + [ServerId, CounterKeys])) + end || {_, N} = ServerId <- ServerIds], + %% Restart the servers [ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds], @@ -814,6 +834,18 @@ recover_from_checkpoint(Config) -> Follower2Idx =:= 8 end, 20), + [begin + ?assertMatch( + #{ + checkpoint_bytes_written := B, + checkpoint_index := 8, + checkpoints := 1, + checkpoints_written := 1, + checkpoints_promoted := 1 + } when B > 0, + ct_rpc:call(N, ra_counters, counters, + [ServerId, CounterKeys])) + end || {_, N} = ServerId <- ServerIds], %% Restart the servers: the servers should be able to recover from the %% snapshot which was promoted from a checkpoint. [ok = ra:stop_server(?SYS, ServerId) || ServerId <- ServerIds], diff --git a/test/ra_log_snapshot_SUITE.erl b/test/ra_log_snapshot_SUITE.erl index adfd560c..a1210cff 100644 --- a/test/ra_log_snapshot_SUITE.erl +++ b/test/ra_log_snapshot_SUITE.erl @@ -73,7 +73,7 @@ roundtrip(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), Context = #{can_accept_full_file => true}, ?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir, Context)), ok. @@ -82,7 +82,7 @@ roundtrip_compat(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), ?assertEqual({SnapshotMeta, SnapshotRef}, read(Dir)), ok. @@ -107,7 +107,7 @@ test_accept(Config, Name, DataSize, FullFile, ChunkSize) -> ct:pal("test_accept ~w ~b ~w ~b", [Name, DataSize, FullFile, ChunkSize]), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = crypto:strong_rand_bytes(DataSize), - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), Context = #{can_accept_full_file => FullFile}, {ok, Meta, St} = ra_log_snapshot:begin_read(Dir, Context), %% how to ensure @@ -180,7 +180,7 @@ read_meta_data(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotRef = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotRef, true), {ok, SnapshotMeta} = ra_log_snapshot:read_meta(Dir), ok. @@ -188,7 +188,7 @@ recover_same_as_read(Config) -> Dir = ?config(dir, Config), SnapshotMeta = meta(33, 94, [{banana, node@jungle}, {banana, node@savanna}]), SnapshotData = my_state, - ok = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true), + {ok, _} = ra_log_snapshot:write(Dir, SnapshotMeta, SnapshotData, true), {ok, SnapshotMeta, SnapshotData} = ra_log_snapshot:recover(Dir), ok.