diff --git a/src/ra_log.erl b/src/ra_log.erl index 87f7f4a6..8d8c1760 100644 --- a/src/ra_log.erl +++ b/src/ra_log.erl @@ -48,7 +48,8 @@ % external reader register_reader/2, - readers/1 + readers/1, + tick/2 ]). -include("ra.hrl"). @@ -106,6 +107,7 @@ % index specified cache = ra_log_cache:init() :: ra_log_cache:state(), last_resend_time :: option({integer(), WalPid :: pid() | undefined}), + last_wal_write :: {pid(), Ms :: ra_machine:milliseconds()}, reader :: ra_log_reader:state(), readers = [] :: [pid()] }). @@ -234,7 +236,8 @@ init(#{uid := UId, first_index = max(SnapIdx + 1, FirstIdx), last_index = max(SnapIdx, LastIdx0), reader = Reader, - snapshot_state = SnapshotState + snapshot_state = SnapshotState, + last_wal_write = {whereis(Wal), now_ms()} }, put_counter(Cfg, ?C_RA_SVR_METRIC_SNAPSHOT_INDEX, SnapIdx), LastIdx = State000#?MODULE.last_index, @@ -736,6 +739,24 @@ flush_cache(#?MODULE{cache = Cache} = State) -> needs_cache_flush(#?MODULE{cache = Cache}) -> ra_log_cache:needs_flush(Cache). +-spec tick(Now :: integer(), state()) -> state(). +tick(Now, #?MODULE{cfg = #cfg{wal = Wal}, + cache = Cache, + last_written_index_term = {LastWrittenIdx, _}, + last_wal_write = {WalPid, Ms}} = State) -> + CurWalPid = whereis(Wal), + case Now > Ms + 5000 andalso + CurWalPid =/= undefined andalso + CurWalPid =/= WalPid andalso + ra_log_cache:size(Cache) > 0 of + true -> + %% the wal has restarted, it has been at least 5s and there are + %% cached items, resend them + resend_from(LastWrittenIdx + 1, State); + false -> + State + end. + suggest_snapshot0(SnapKind, Idx, Cluster, MacVersion, MacState, State0) -> ClusterServerIds = maps:map(fun (_, V) -> maps:with([voter_status], V) @@ -830,6 +851,7 @@ overview(#?MODULE{last_index = LastIndex, last_written_index_term = LWIT, snapshot_state = SnapshotState, reader = Reader, + last_wal_write = {_LastPid, LastMs}, cache = Cache}) -> #{type => ?MODULE, last_index => LastIndex, @@ -846,7 +868,8 @@ overview(#?MODULE{last_index = LastIndex, undefined -> undefined; {I, _} -> I end, - cache_size => ra_log_cache:size(Cache) + cache_size => ra_log_cache:size(Cache), + last_wal_write => LastMs }. -spec write_config(ra_server:config(), state()) -> ok. @@ -964,10 +987,11 @@ wal_truncate_write(#?MODULE{cfg = #cfg{uid = UId, % we need to indicate to the WAL that this may be a non-contiguous write % and that prior entries should be considered stale case ra_log_wal:truncate_write({UId, self()}, Wal, Idx, Term, Cmd) of - ok -> + {ok, Pid} -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, last_term = Term, + last_wal_write = {Pid, now_ms()}, cache = ra_log_cache:add(Entry, Cache)}; {error, wal_down} -> error(wal_down) @@ -978,10 +1002,11 @@ wal_write(#?MODULE{cfg = #cfg{uid = UId, cache = Cache} = State, {Idx, Term, Cmd} = Entry) -> case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of - ok -> + {ok, Pid} -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, last_term = Term, + last_wal_write = {Pid, now_ms()}, cache = ra_log_cache:add(Entry, Cache)}; {error, wal_down} -> error(wal_down) @@ -993,11 +1018,13 @@ wal_rewrite(#?MODULE{cfg = #cfg{uid = UId, wal = Wal} = Cfg} = State, {Idx, Term, Cmd}) -> case ra_log_wal:write({UId, self()}, Wal, Idx, Term, Cmd) of - ok -> + {ok, Pid} -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, 1), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, Idx), State#?MODULE{last_index = Idx, - last_term = Term}; + last_term = Term, + last_wal_write = {Pid, now_ms()} + }; {error, wal_down} -> error(wal_down) end. @@ -1016,11 +1043,12 @@ wal_write_batch(#?MODULE{cfg = #cfg{uid = UId, [{_, _, LastIdx, LastTerm, _} | _] = WalCommands, case ra_log_wal:write_batch(Wal, lists:reverse(WalCommands)) of - ok -> + {ok, Pid} -> ok = incr_counter(Cfg, ?C_RA_LOG_WRITE_OPS, Num), put_counter(Cfg, ?C_RA_SVR_METRIC_LAST_INDEX, LastIdx), State#?MODULE{last_index = LastIdx, last_term = LastTerm, + last_wal_write = {Pid, now_ms()}, cache = Cache}; {error, wal_down} -> error(wal_down) @@ -1221,6 +1249,9 @@ last_index_term_in_wal(Idx, #?MODULE{reader = Reader0} = State) -> {Idx, Term, State#?MODULE{reader = Reader}} end. +now_ms() -> + erlang:system_time(millisecond). + %%%% TESTS -ifdef(TEST). diff --git a/src/ra_log_wal.erl b/src/ra_log_wal.erl index 85259825..e22acee0 100644 --- a/src/ra_log_wal.erl +++ b/src/ra_log_wal.erl @@ -149,19 +149,20 @@ {call, from(), wal_command()}. -spec write(writer_id(), atom() | pid(), ra_index(), ra_term(), term()) -> - ok | {error, wal_down}. + {ok, pid()} | {error, wal_down}. write(From, Wal, Idx, Term, Cmd) -> named_cast(Wal, {append, From, Idx, Term, Cmd}). -spec truncate_write(writer_id(), atom(), ra_index(), ra_term(), term()) -> - ok | {error, wal_down}. + {ok, pid()} | {error, wal_down}. truncate_write(From, Wal, Idx, Term, Cmd) -> named_cast(Wal, {truncate, From, Idx, Term, Cmd}). -spec write_batch(Wal :: atom() | pid(), [wal_command()]) -> - ok | {error, wal_down}. + {ok, pid()} | {error, wal_down}. write_batch(Wal, WalCommands) when is_pid(Wal) -> - gen_batch_server:cast_batch(Wal, WalCommands); + gen_batch_server:cast_batch(Wal, WalCommands), + {ok, Wal}; write_batch(Wal, WalCommands) when is_atom(Wal) -> case whereis(Wal) of undefined -> @@ -171,7 +172,8 @@ write_batch(Wal, WalCommands) when is_atom(Wal) -> end. named_cast(To, Msg) when is_pid(To) -> - gen_batch_server:cast(To, Msg); + gen_batch_server:cast(To, Msg), + {ok, To}; named_cast(Wal, Msg) -> case whereis(Wal) of undefined -> diff --git a/src/ra_server.erl b/src/ra_server.erl index 1ed47613..2110a49a 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -27,6 +27,7 @@ handle_aux/4, handle_state_enter/2, tick/1, + log_tick/1, overview/1, metrics/1, is_new/1, @@ -1464,6 +1465,14 @@ tick(#{cfg := #cfg{effective_machine_module = MacMod}, Now = erlang:system_time(millisecond), ra_machine:tick(MacMod, Now, MacState). +-spec log_tick(ra_server_state()) -> effects(). +log_tick(#{cfg := #cfg{}, + log := Log0} = State) -> + Now = erlang:system_time(millisecond), + Log = ra_log:tick(Now, Log0), + State#{log => Log}. + + -spec handle_state_enter(ra_state() | eol, ra_server_state()) -> {ra_server_state() | eol, effects()}. handle_state_enter(RaftState, #{cfg := #cfg{effective_machine_module = MacMod}, diff --git a/test/ra_log_2_SUITE.erl b/test/ra_log_2_SUITE.erl index bc5266ba..3a7e5e25 100644 --- a/test/ra_log_2_SUITE.erl +++ b/test/ra_log_2_SUITE.erl @@ -19,6 +19,7 @@ all() -> all_tests() -> [ resend_write, + resend_write_after_tick, handle_overwrite, receive_segment, read_one, @@ -564,7 +565,8 @@ resend_write(Config) -> % simulate lost messages requiring the ra server to resend in flight % writes meck:new(ra_log_wal, [passthrough]), - meck:expect(ra_log_wal, write, fun (_, _, 10, _, _) -> ok; + meck:expect(ra_log_wal, write, fun (_, _, 10, _, _) -> + {ok, self()}; (A, B, C, D, E) -> meck:passthrough([A, B, C, D, E]) end), @@ -598,6 +600,34 @@ resend_write(Config) -> ok. +resend_write_after_tick(Config) -> + meck:new(ra_log_wal, [passthrough]), + WalPid = whereis(ra_log_wal), + timer:sleep(100), + ct:pal("ra_log_init"), + Log0 = ra_log_init(Config), + {0, 0} = ra_log:last_index_term(Log0), + ct:pal("appending"), + meck:expect(ra_log_wal, write, fun (_, _, _, _, _) -> + {ok, WalPid} + end), + Log1 = ra_log:append({1, 2, banana}, Log0), + %% this append should be lost + meck:unload(ra_log_wal), + %% restart wal to get a new wal pid so that the ra_log detects on tick + %% that the walhas changed + ct:pal("restart wal"), + restart_wal(), + + Ms = erlang:system_time(millisecond) + 5001, + Log2 = ra_log:tick(Ms, Log1), + Log = assert_log_events(Log2, fun (L) -> + {1, 2} == ra_log:last_written(L) + end), + ct:pal("overvew ~p", [ra_log:overview(Log)]), + ra_log:close(Log), + ok. + wal_crash_recover(Config) -> Log0 = ra_log_init(Config, #{resend_window => 1}), Log1 = write_n(1, 50, 2, Log0), @@ -660,17 +690,14 @@ detect_lost_written_range(Config) -> end), % WAL rolls over and WAL file is deleted % simulate wal outage - meck:expect(ra_log_wal, write, fun (_, _, _, _, _) -> ok end), + meck:expect(ra_log_wal, write, fun (_, _, _, _, _) -> {ok, self()} end), % append some messages that will be lost Log3 = append_n(10, 15, 2, Log2), % restart WAL to ensure lose the transient state keeping track of % each writer's last written index - [SupPid] = [P || {ra_log_wal_sup, P, _, _} - <- supervisor:which_children(ra_log_sup)], - ok = supervisor:terminate_child(SupPid, ra_log_wal), - {ok, _} = supervisor:restart_child(SupPid, ra_log_wal), + restart_wal(), % WAL recovers meck:unload(ra_log_wal), @@ -1397,3 +1424,10 @@ ra_log_init(Config, Cfg0) -> ra_log_take(From, To, Log0) -> {Acc, Log} = ra_log:fold(From, To, fun (E, Acc) -> [E | Acc] end, [], Log0), {lists:reverse(Acc), Log}. + +restart_wal() -> + [SupPid] = [P || {ra_log_wal_sup, P, _, _} + <- supervisor:which_children(ra_log_sup)], + ok = supervisor:terminate_child(SupPid, ra_log_wal), + {ok, _} = supervisor:restart_child(SupPid, ra_log_wal), + ok. diff --git a/test/ra_log_wal_SUITE.erl b/test/ra_log_wal_SUITE.erl index 0101537d..0713abc7 100644 --- a/test/ra_log_wal_SUITE.erl +++ b/test/ra_log_wal_SUITE.erl @@ -123,9 +123,9 @@ basic_log_writes(Config) -> Conf = ?config(wal_conf, Config), {UId, _} = WriterId = ?config(writer_id, Config), {ok, Pid} = ra_log_wal:start_link(Conf), - ok = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"), {12, 1, "value"} = await_written(WriterId, {12, 12, 1}), - ok = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"), {13, 1, "value2"} = await_written(WriterId, {13, 13, 1}), % previous log value is still there {12, 1, "value"} = mem_tbl_read(UId, 12), @@ -141,9 +141,9 @@ basic_log_writes_compress_mem_table(Config) -> Conf = ?config(wal_conf, Config), {UId, _} = WriterId = ?config(writer_id, Config), {ok, Pid} = ra_log_wal:start_link(Conf#{compress_mem_tables => true}), - ok = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"), {12, 1, "value"} = await_written(WriterId, {12, 12, 1}), - ok = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 13, 1, "value2"), {13, 1, "value2"} = await_written(WriterId, {13, 13, 1}), % previous log value is still there {12, 1, "value"} = mem_tbl_read(UId, 12), @@ -158,12 +158,12 @@ same_uid_different_process(Config) -> Conf = ?config(wal_conf, Config), {UId, _} = WriterId = ?config(writer_id, Config), {ok, Pid} = ra_log_wal:start_link(Conf), - ok = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 12, 1, "value"), {12, 1, "value"} = await_written(WriterId, {12, 12, 1}), Self = self(), _ = spawn(fun() -> Wid = {UId, self()}, - ok = ra_log_wal:write(Wid, ra_log_wal, 13, 1, "value2"), + {ok, _} = ra_log_wal:write(Wid, ra_log_wal, 13, 1, "value2"), {13, 1, "value2"} = await_written(Wid, {13, 13, 1}), Self ! go end), @@ -217,7 +217,7 @@ test_write_many(Name, NumWrites, ComputeChecksums, BatchSize, DataSize, Config) garbage_collect => true, max_batch_size => BatchSize}), Data = crypto:strong_rand_bytes(DataSize), - ok = ra_log_wal:write(WriterId, ra_log_wal, 0, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 0, 1, Data), timer:sleep(5), % start_profile(Config, [ra_log_wal, ra_file_handle, ets, file, lists, os]), Writes = lists:seq(1, NumWrites), @@ -229,7 +229,7 @@ test_write_many(Name, NumWrites, ComputeChecksums, BatchSize, DataSize, Config) {Taken, _} = timer:tc( fun () -> - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, {data, Data}) || Idx <- Writes], receive {ra_log_event, {written, {_, NumWrites, 1}}} -> @@ -271,7 +271,7 @@ write_many_by_many(Config) -> {_UId, _} = WriterId = ?config(writer_id, Config), {ok, WalPid} = ra_log_wal:start_link(Conf#{compute_checksums => false}), Data = crypto:strong_rand_bytes(1024), - ok = ra_log_wal:write(WriterId, ra_log_wal, 0, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 0, 1, Data), timer:sleep(5), % start_profile(Config, [ra_log_wal, ra_file_handle, ets, file, lists, os]), Writes = lists:seq(1, NumWrites), @@ -285,7 +285,7 @@ write_many_by_many(Config) -> [spawn_link(fun () -> WId = {term_to_binary(I), self()}, put(wid, WId), - [ok = ra_log_wal:write(WId, ra_log_wal, Idx, 1, + [{ok, _} = ra_log_wal:write(WId, ra_log_wal, Idx, 1, {data, Data}) || Idx <- Writes], receive {ra_log_event, {written, {_, NumWrites, 1}}} -> @@ -334,12 +334,12 @@ overwrite(Config) -> WriterId = ?config(writer_id, Config), {ok, Pid} = ra_log_wal:start_link(Conf), Data = data, - [ok = ra_log_wal:write(WriterId, ra_log_wal, I, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, I, 1, Data) || I <- lists:seq(1, 3)], await_written(WriterId, {1, 3, 1}), % write next index then immediately overwrite - ok = ra_log_wal:write(WriterId, ra_log_wal, 4, 1, Data), - ok = ra_log_wal:write(WriterId, ra_log_wal, 2, 2, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 4, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 2, 2, Data), % ensure we await the correct range that should not have a wonky start await_written(WriterId, {2, 2, 2}), proc_lib:stop(Pid), @@ -353,12 +353,12 @@ truncate_write(Config) -> {ok, Pid} = ra_log_wal:start_link(Conf), Data = crypto:strong_rand_bytes(1024), % write 1-3 - [ok = ra_log_wal:write(WriterId, ra_log_wal, I, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, I, 1, Data) || I <- lists:seq(1, 3)], await_written(WriterId, {1, 3, 1}), % then write 7 as may happen after snapshot installation - ok = ra_log_wal:truncate_write(WriterId, ra_log_wal, 7, 1, Data), - ok = ra_log_wal:write(WriterId, ra_log_wal, 8, 1, Data), + {ok, _} = ra_log_wal:truncate_write(WriterId, ra_log_wal, 7, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 8, 1, Data), await_written(WriterId, {7, 8, 1}), [{UId, 7, 8, Tid}] = ets:lookup(ra_log_open_mem_tables, UId), [_] = ets:lookup(Tid, 7), @@ -377,11 +377,11 @@ out_of_seq_writes(Config) -> {ok, Pid} = ra_log_wal:start_link(Conf), Data = crypto:strong_rand_bytes(1024), % write 1-3 - [ok = ra_log_wal:write(WriterId, ra_log_wal, I, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, I, 1, Data) || I <- lists:seq(1, 3)], await_written(WriterId, {1, 3, 1}), % then write 5 - ok = ra_log_wal:write(WriterId, ra_log_wal, 5, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 5, 1, Data), % ensure an out of sync notification is received receive {ra_log_event, {resend_write, 4}} -> ok @@ -389,25 +389,25 @@ out_of_seq_writes(Config) -> throw(reset_write_timeout) end, % try writing 6 - ok = ra_log_wal:write(WriterId, ra_log_wal, 6, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 6, 1, Data), % then write 4 and 5 - ok = ra_log_wal:write(WriterId, ra_log_wal, 4, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 4, 1, Data), await_written(WriterId, {4, 4, 1}), - ok = ra_log_wal:write(WriterId, ra_log_wal, 5, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 5, 1, Data), await_written(WriterId, {5, 5, 1}), % perform another out of sync write - ok = ra_log_wal:write(WriterId, ra_log_wal, 7, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 7, 1, Data), receive {ra_log_event, {resend_write, 6}} -> ok after 500 -> throw(written_timeout) end, % force a roll over - ok = ra_log_wal:force_roll_over(ra_log_wal), + ok= ra_log_wal:force_roll_over(ra_log_wal), % try writing another - ok = ra_log_wal:write(WriterId, ra_log_wal, 8, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 8, 1, Data), % ensure a written event is _NOT_ received % when a roll-over happens after out of sync write receive @@ -416,7 +416,7 @@ out_of_seq_writes(Config) -> after 500 -> ok end, % write the missing one - ok = ra_log_wal:write(WriterId, ra_log_wal, 6, 1, Data), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 6, 1, Data), await_written(WriterId, {6, 6, 1}), proc_lib:stop(Pid), ok. @@ -434,7 +434,7 @@ roll_over(Config) -> % write enough entries to trigger roll over Data = crypto:strong_rand_bytes(1024), [begin - ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) end || Idx <- lists:seq(1, NumWrites)], % wait for writes receive {ra_log_event, {written, {_, NumWrites, 1}}} -> ok @@ -474,7 +474,7 @@ roll_over_with_data_larger_than_max_size(Config) -> % write entries each larger than the WAL max size to trigger roll over Data = crypto:strong_rand_bytes(64 * 1024), [begin - ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) end || Idx <- lists:seq(1, NumWrites)], % wait for writes receive {ra_log_event, {written, {_, NumWrites, 1}}} -> ok @@ -514,7 +514,7 @@ roll_over_entry_limit(Config) -> % write enough entries to trigger roll over Data = crypto:strong_rand_bytes(1024), [begin - ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) end || Idx <- lists:seq(1, NumWrites)], % wait for writes receive {ra_log_event, {written, {_, NumWrites, 1}}} -> ok @@ -554,9 +554,9 @@ recover_truncated_write(Config) -> meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Pid} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 3)], - ok = ra_log_wal:truncate_write(WriterId, ra_log_wal, 9, 1, Data), + {ok, _} = ra_log_wal:truncate_write(WriterId, ra_log_wal, 9, 1, Data), empty_mailbox(), proc_lib:stop(ra_log_wal), {ok, Pid} = ra_log_wal:start_link(Conf), @@ -586,7 +586,7 @@ recover_after_roll_over(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 100)], empty_mailbox(), proc_lib:stop(ra_log_wal), @@ -610,11 +610,11 @@ recover(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Wal} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 100)], _ = await_written(WriterId, {1, 100, 1}), ra_log_wal:force_roll_over(ra_log_wal), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 2, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 2, Data) || Idx <- lists:seq(101, 200)], _ = await_written(WriterId, {101, 200, 2}), empty_mailbox(), @@ -657,8 +657,8 @@ recover_overwrite_in_same_batch(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Wal} = ra_log_wal:start_link(Conf), - ok = ra_log_wal:write(WriterId, ra_log_wal, 1, 1, <<"data1">>), - ok = ra_log_wal:write(WriterId, ra_log_wal, 1, 2, <<"data2">>), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 1, 1, <<"data1">>), + {ok, _} = ra_log_wal:write(WriterId, ra_log_wal, 1, 2, <<"data2">>), _ = await_written(WriterId, {1, 1, 2}), ra_log_wal:force_roll_over(ra_log_wal), empty_mailbox(), @@ -694,11 +694,11 @@ recover_with_small_chunks(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Wal} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 100)], _ = await_written(WriterId, {1, 100, 1}), ra_log_wal:force_roll_over(ra_log_wal), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 2, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 2, Data) || Idx <- lists:seq(101, 200)], _ = await_written(WriterId, {101, 200, 2}), proc_lib:stop(ra_log_wal), @@ -755,7 +755,7 @@ recover_with_partial_last_entry(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Wal} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 100)], _ = await_written(WriterId, {1, 100, 1}), empty_mailbox(), @@ -785,7 +785,7 @@ recover_with_last_entry_corruption(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Wal} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 100)], _ = await_written(WriterId, {1, 100, 1}), empty_mailbox(), @@ -815,7 +815,7 @@ recover_with_last_entry_corruption_pre_allocate(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Wal} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 100)], _ = await_written(WriterId, {1, 100, 1}), empty_mailbox(), @@ -847,7 +847,7 @@ checksum_failure_in_middle_of_file_should_fail(Config) -> meck:new(ra_log_segment_writer, [passthrough]), meck:expect(ra_log_segment_writer, await, fun(_) -> ok end), {ok, _Wal} = ra_log_wal:start_link(Conf), - [ok = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) + [{ok, _} = ra_log_wal:write(WriterId, ra_log_wal, Idx, 1, Data) || Idx <- lists:seq(1, 100)], _ = await_written(WriterId, {1, 100, 1}), empty_mailbox(),