diff --git a/src/ra_server.erl b/src/ra_server.erl index 2ea11dd9..9b27c8c6 100644 --- a/src/ra_server.erl +++ b/src/ra_server.erl @@ -1411,6 +1411,20 @@ handle_receive_snapshot(#install_snapshot_rpc{term = Term, State = update_term(Term, State0#{log => Log}), {receive_snapshot, State, [{reply, Reply}]} end; +handle_receive_snapshot(#append_entries_rpc{term = Term} = Msg, + #{current_term := CurTerm, + cfg := #cfg{log_id = LogId}, + log := Log0} = State) + when Term > CurTerm -> + ?INFO("~ts: follower receiving snapshot saw append_entries_rpc from ~w for term ~b " + "abdicates term: ~b!", + [LogId, Msg#append_entries_rpc.leader_id, + Term, CurTerm]), + SnapState0 = ra_log:snapshot_state(Log0), + SnapState = ra_snapshot:abort_accept(SnapState0), + Log = ra_log:set_snapshot_state(SnapState, Log0), + {follower, update_term(Term, clear_leader_id(State#{log => Log})), + [{next_event, Msg}]}; handle_receive_snapshot({ra_log_event, Evt}, State = #{cfg := #cfg{id = _Id, log_id = LogId}, log := Log0}) -> diff --git a/src/ra_server_proc.erl b/src/ra_server_proc.erl index dcd31740..3f89ff0d 100644 --- a/src/ra_server_proc.erl +++ b/src/ra_server_proc.erl @@ -874,10 +874,15 @@ receive_snapshot(EventType, Msg, State0) -> {receive_snapshot, State1, Effects} -> {#state{conf = Conf} = State, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), - #conf{receive_snapshot_timeout = ReceiveSnapshotTimeout} = Conf, - {keep_state, State, - [{state_timeout, ReceiveSnapshotTimeout, - receive_snapshot_timeout} | Actions]}; + TimeoutActions = case Msg of + #install_snapshot_rpc{} -> + %% Reset timeout only on receive snapshot progress. + [{state_timeout, Conf#conf.receive_snapshot_timeout, + receive_snapshot_timeout}]; + _ -> + [] + end, + {keep_state, State, TimeoutActions ++ Actions}; {follower, State1, Effects} -> {State2, Actions} = ?HANDLE_EFFECTS(Effects, EventType, State1), State = follower_leader_change(State0, State2), diff --git a/test/ra_server_SUITE.erl b/test/ra_server_SUITE.erl index 321e45ce..2e414c29 100644 --- a/test/ra_server_SUITE.erl +++ b/test/ra_server_SUITE.erl @@ -57,6 +57,7 @@ all() -> follower_ignores_installs_snapshot_with_higher_machine_version, follower_receives_stale_snapshot, receive_snapshot_timeout, + receive_snapshot_new_leader_aer, snapshotted_follower_received_append_entries, leader_received_append_entries_reply_with_stale_last_index, leader_receives_install_snapshot_result, @@ -2107,6 +2108,38 @@ receive_snapshot_timeout(_Config) -> undefined = ra_snapshot:accepting(SS), ok. +receive_snapshot_new_leader_aer(_Config) -> + N1 = ?N1, N2 = ?N2, N3 = ?N3, + #{N3 := {_, FState0 = #{cluster := Config, + current_term := CurTerm, + commit_index := CommitIdx}, _}} + = init_servers([N1, N2, N3], {module, ra_queue, #{}}), + FState = FState0#{last_applied => 3}, + LastTerm = 1, % snapshot term + Idx = 6, + ISRpc = #install_snapshot_rpc{term = CurTerm, leader_id = N1, + meta = snap_meta(Idx, LastTerm, Config), + chunk_state = {1, last}, + data = []}, + {receive_snapshot, FState1, + [{next_event, ISRpc}, {record_leader_msg, _}]} = + ra_server:handle_follower(ISRpc, FState), + + %% revert back to follower on next-term AER + AER = #append_entries_rpc{term = CurTerm + 1, leader_id = N1, + prev_log_index = CommitIdx, + prev_log_term = CurTerm, + leader_commit = CommitIdx, + entries = []}, + {follower, #{log := Log, current_term := NewTerm}, _} + = ra_server:handle_receive_snapshot(AER, FState1), + %% term should be updated + NewTerm = CurTerm + 1, + %% snapshot should be aborted + SS = ra_log:snapshot_state(Log), + undefined = ra_snapshot:accepting(SS), + ok. + snapshotted_follower_received_append_entries(_Config) -> N1 = ?N1, N2 = ?N2, N3 = ?N3, #{N3 := {_, FState0 = #{cluster := Config}, _}} =