diff --git a/src/consensus/aft/raft.h b/src/consensus/aft/raft.h index 40e86f58273e..43742c881954 100644 --- a/src/consensus/aft/raft.h +++ b/src/consensus/aft/raft.h @@ -689,6 +689,15 @@ namespace aft break; } + case raft_propose_request_vote: + { + ProposeRequestVote r = + channels->template recv_authenticated( + from, data, size); + recv_propose_request_vote(from, r); + break; + } + default: { RAFT_FAIL_FMT("Unhandled AFT message type: {}", type); @@ -1706,6 +1715,32 @@ namespace aft add_vote_for_me(from); } + void recv_propose_request_vote( + const ccf::NodeId& from, ProposeRequestVote r) + { + std::lock_guard guard(state->lock); + +#ifdef CCF_RAFT_TRACING + nlohmann::json j = {}; + j["function"] = "recv_propose_request_vote"; + j["packet"] = r; + j["state"] = *state; + j["from_node_id"] = from; + j["committable_indices"] = committable_indices; + RAFT_TRACE_JSON_OUT(j); +#endif + if (can_endorse_primary() && ticking && r.term == state->current_view) + { + RAFT_INFO_FMT( + "Becoming candidate early due to propose request vote from {}", from); + become_candidate(); + } + else + { + RAFT_INFO_FMT("Ignoring propose request vote from {}", from); + } + } + void restart_election_timeout() { // Randomise timeout_elapsed to get a random election timeout @@ -1932,6 +1967,53 @@ namespace aft { leader_id.reset(); state->leadership_state = kv::LeadershipState::None; + ProposeRequestVote prv{ + {raft_propose_request_vote}, state->current_view}; + + std::optional successor = std::nullopt; + Index max_match_idx = 0; + kv::ReconfigurationId reconf_id_of_max_match = 0; + + // Pick the node that has the highest match_idx, and break + // ties by looking at the highest reconfiguration id they are + // part of. This can lead to nudging a node that is + // about to retire too, but that node will then nudge + // a successor, and that seems preferable to nudging a node that + // risks not being eligible if reconfiguration id is prioritised. + // Alternatively, we could pick the node with the higest match idx + // in the latest config, provided that match idx at least as high as a + // majority. That would make them both eligible and unlikely to retire + // soon. + for (auto& [node, node_state] : all_other_nodes) + { + if (node_state.match_idx >= max_match_idx) + { + kv::ReconfigurationId latest_reconf_id = 0; + auto conf = configurations.rbegin(); + while (conf != configurations.rend()) + { + if (conf->nodes.find(node) != conf->nodes.end()) + { + latest_reconf_id = conf->idx; + break; + } + conf++; + } + if (!(node_state.match_idx == max_match_idx && + latest_reconf_id < reconf_id_of_max_match)) + { + reconf_id_of_max_match = latest_reconf_id; + successor = node; + max_match_idx = node_state.match_idx; + } + } + } + if (successor.has_value()) + { + RAFT_INFO_FMT("Node retired, nudging {}", successor.value()); + channels->send_authenticated( + successor.value(), ccf::NodeMsgType::consensus_msg, prv); + } } state->membership_state = kv::MembershipState::Retired; diff --git a/src/consensus/aft/raft_types.h b/src/consensus/aft/raft_types.h index 2c4c3e3596d6..d6f68528e843 100644 --- a/src/consensus/aft/raft_types.h +++ b/src/consensus/aft/raft_types.h @@ -98,7 +98,8 @@ namespace aft raft_append_entries_response, raft_append_entries_signed_response, raft_request_vote, - raft_request_vote_response + raft_request_vote_response, + raft_propose_request_vote, }; DECLARE_JSON_ENUM( RaftMsgType, @@ -110,6 +111,7 @@ namespace aft "raft_append_entries_signed_response"}, {RaftMsgType::raft_request_vote, "raft_request_vote"}, {RaftMsgType::raft_request_vote_response, "raft_request_vote_response"}, + {RaftMsgType::raft_propose_request_vote, "raft_propose_request_vote"}, }); #pragma pack(push, 1) @@ -190,5 +192,14 @@ namespace aft DECLARE_JSON_TYPE_WITH_BASE(RequestVoteResponse, RaftHeader); DECLARE_JSON_REQUIRED_FIELDS(RequestVoteResponse, term, vote_granted); + struct ProposeRequestVote : RaftHeader + { + // A node sends this to nudge another node to begin an election, for + // instance because the sender is a retiring primary + Term term; + }; + DECLARE_JSON_TYPE_WITH_BASE(ProposeRequestVote, RaftHeader); + DECLARE_JSON_REQUIRED_FIELDS(ProposeRequestVote, term); + #pragma pack(pop) } diff --git a/src/consensus/aft/test/driver.h b/src/consensus/aft/test/driver.h index 55afec81629f..ce2c4814fe78 100644 --- a/src/consensus/aft/test/driver.h +++ b/src/consensus/aft/test/driver.h @@ -334,6 +334,16 @@ class RaftDriver rlog(node_id, tgt_node_id, s, dropped); } + void log_msg_details( + ccf::NodeId node_id, + ccf::NodeId tgt_node_id, + aft::ProposeRequestVote prv, + bool dropped) + { + const auto s = fmt::format("propose_request_vote for term {}", prv.term); + log(node_id, tgt_node_id, s, dropped); + } + void log_msg_details( ccf::NodeId node_id, ccf::NodeId tgt_node_id, @@ -370,6 +380,12 @@ class RaftDriver log_msg_details(node_id, tgt_node_id, aer, dropped); break; } + case (aft::RaftMsgType::raft_propose_request_vote): + { + auto prv = *(aft::ProposeRequestVote*)data; + log_msg_details(node_id, tgt_node_id, prv, dropped); + break; + } default: { throw std::runtime_error( diff --git a/tests/raft_scenarios/4582.2 b/tests/raft_scenarios/4582.2 index 068aa22f18aa..4cd2edde97cb 100644 --- a/tests/raft_scenarios/4582.2 +++ b/tests/raft_scenarios/4582.2 @@ -60,9 +60,6 @@ assert_is_primary,0 assert_commit_idx,0,2 assert_commit_idx,1,0 -periodic_all,1000 -dispatch_all - assert_is_backup,1 state_all diff --git a/tla/MCccfraft.cfg b/tla/MCccfraft.cfg index 5524c07461f5..f96ec5c01dd0 100644 --- a/tla/MCccfraft.cfg +++ b/tla/MCccfraft.cfg @@ -26,6 +26,7 @@ CONSTANTS AppendEntriesRequest = AppendEntriesRequest AppendEntriesResponse = AppendEntriesResponse NotifyCommitMessage = NotifyCommitMessage + ProposeVoteRequest = ProposeVoteRequest TypeEntry = Entry TypeSignature = Signature diff --git a/tla/MCccfraftWithReconfig.cfg b/tla/MCccfraftWithReconfig.cfg index 235171a9b28e..74d76cbcbd2c 100644 --- a/tla/MCccfraftWithReconfig.cfg +++ b/tla/MCccfraftWithReconfig.cfg @@ -26,6 +26,7 @@ CONSTANTS AppendEntriesRequest = AppendEntriesRequest AppendEntriesResponse = AppendEntriesResponse NotifyCommitMessage = NotifyCommitMessage + ProposeVoteRequest = ProposeVoteRequest TypeEntry = Entry TypeSignature = Signature diff --git a/tla/SIMccfraft.cfg b/tla/SIMccfraft.cfg index 87a7d8651076..a976cdde5d32 100644 --- a/tla/SIMccfraft.cfg +++ b/tla/SIMccfraft.cfg @@ -16,6 +16,7 @@ CONSTANTS AppendEntriesRequest = AppendEntriesRequest AppendEntriesResponse = AppendEntriesResponse NotifyCommitMessage = NotifyCommitMessage + ProposeVoteRequest = ProposeVoteRequest TypeEntry = Entry TypeSignature = Signature diff --git a/tla/Traceccfraft.cfg b/tla/Traceccfraft.cfg index 75410636f39b..67c84e431e13 100644 --- a/tla/Traceccfraft.cfg +++ b/tla/Traceccfraft.cfg @@ -65,6 +65,7 @@ CONSTANTS AppendEntriesRequest = AppendEntriesRequest AppendEntriesResponse = AppendEntriesResponse NotifyCommitMessage = NotifyCommitMessage + ProposeVoteRequest = ProposeVoteRequest TypeEntry = TypeEntry TypeSignature = TypeSignature diff --git a/tla/Traceccfraft.tla b/tla/Traceccfraft.tla index 38bd119f1ac5..e4bfb84cdb40 100644 --- a/tla/Traceccfraft.tla +++ b/tla/Traceccfraft.tla @@ -4,7 +4,8 @@ EXTENDS ccfraft, Json, IOUtils, Sequences, Network \* raft_types.h enum RaftMsgType RaftMsgType == "raft_append_entries" :> AppendEntriesRequest @@ "raft_append_entries_response" :> AppendEntriesResponse @@ - "raft_request_vote" :> RequestVoteRequest @@ "raft_request_vote_response" :> RequestVoteResponse + "raft_request_vote" :> RequestVoteRequest @@ "raft_request_vote_response" :> RequestVoteResponse @@ + "raft_propose_request_vote" :> ProposeVoteRequest LeadershipState == Leader :> "Leader" @@ Follower :> "Follower" @@ Candidate :> "Candidate" @@ Pending :> "Pending" @@ -301,6 +302,19 @@ IsCheckQuorum == /\ CheckQuorum(logline.msg.state.node_id) /\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices) +IsRcvProposeVoteRequest == + /\ IsEvent("recv_propose_request_vote") + /\ state[logline.msg.state.node_id] = Leader + /\ LET i == logline.msg.state.node_id + j == logline.msg.to_node_id + IN /\ \E m \in Messages': + /\ m.type = ProposeVoteRequest + /\ m.type = RaftMsgType[logline.msg.packet.msg] + /\ m.term = logline.msg.packet.term + \* There is now one more message of this type. + /\ OneMoreMessage(m) + /\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices) + TraceNext == \/ IsTimeout \/ IsBecomeLeader @@ -325,6 +339,8 @@ TraceNext == \/ IsRcvRequestVoteResponse \/ IsExecuteAppendEntries + \/ IsRcvProposeVoteRequest + RaftDriverQuirks == \* The "nodes" command in raft scenarios causes N consecutive "add_configuration" log lines to be emitted, \* where N is determined by the "nodes" parameter. At this stage, the nodes are in the "Pending" state. diff --git a/tla/ccfraft.tla b/tla/ccfraft.tla index 559dbe25db1a..ab9297751447 100644 --- a/tla/ccfraft.tla +++ b/tla/ccfraft.tla @@ -64,7 +64,8 @@ CONSTANTS RequestVoteResponse, AppendEntriesRequest, AppendEntriesResponse, - NotifyCommitMessage + NotifyCommitMessage, + ProposeVoteRequest \* CCF: Content types (Normal entry or a signature that signs \* previous entries or a reconfiguration entry) @@ -152,7 +153,7 @@ ReconfigurationVarsTypeInv == \* A set representing requests and responses sent from one server \* to another. With CCF, we have message integrity and can ensure unique messages. \* Messages only records messages that are currently in-flight, actions should -\* removed messages once received. +\* remove messages once received. \* We model messages as a single (unsorted) set and do not assume ordered message delivery between nodes. \* Node-to-node channels use TCP but out-of-order delivery could be observed due to reconnection or a malicious host. VARIABLE messages @@ -200,6 +201,10 @@ NotifyCommitMessageTypeOK(m) == /\ m.type = NotifyCommitMessage /\ m.commitIndex \in Nat +ProposeVoteRequestTypeOK(m) == + /\ m.type = ProposeVoteRequest + /\ m.term \in Nat + MessagesTypeInv == \A m \in Messages : /\ m.source \in Servers @@ -210,6 +215,7 @@ MessagesTypeInv == \/ RequestVoteRequestTypeOK(m) \/ RequestVoteResponseTypeOK(m) \/ NotifyCommitMessageTypeOK(m) + \/ ProposeVoteRequestTypeOK(m) \* CCF: After reconfiguration, a RetiredLeader leader may need to notify servers \* of the current commit level to ensure that no deadlock is reached through @@ -435,6 +441,10 @@ MaxConfigurationIndex(server) == MaxConfiguration(server) == configurations[server][MaxConfigurationIndex(server)] +HighestConfigurationWithNode(server, node) == + \* Highest configuration index, known to server, that includes node + Max({configIndex \in DOMAIN configurations[server] : node \in configurations[server][configIndex]} \union {0}) + NextConfigurationIndex(server) == \* The configuration with the 2nd smallest index is the first of the pending configurations LET dom == DOMAIN configurations[server] @@ -481,6 +491,14 @@ AppendEntriesBatchsize(i, j) == \* This can be redefined to send bigger batches of entries. {nextIndex[i][j]} + +PlausibleSucessorNodes(i) == + \* Find plausible successor nodes for i + LET + activeServers == Servers \ removedFromConfiguration + highestMatchServers == {n \in activeServers : \A m \in activeServers : matchIndex[i][n] >= matchIndex[i][m]} + IN {n \in highestMatchServers : \A m \in highestMatchServers: HighestConfigurationWithNode(i, n) >= HighestConfigurationWithNode(i, m)} + ------------------------------------------------------------------------------ \* Define initial values for all variables @@ -728,8 +746,14 @@ AdvanceCommitIndex(i) == /\ configurations' = [configurations EXCEPT ![i] = new_configurations] \* Retire if i is not in active configuration anymore /\ IF i \notin configurations[i][Min(DOMAIN new_configurations)] - THEN /\ state' = [state EXCEPT ![i] = RetiredLeader] - /\ UNCHANGED << currentTerm, votedFor, reconfigurationCount, removedFromConfiguration >> + THEN \E j \in PlausibleSucessorNodes(i) : + /\ state' = [state EXCEPT ![i] = RetiredLeader] + /\ LET msg == [type |-> ProposeVoteRequest, + term |-> currentTerm[i], + source |-> i, + dest |-> j ] + IN Send(msg) + /\ UNCHANGED << currentTerm, votedFor, reconfigurationCount, removedFromConfiguration >> \* Otherwise, states remain unchanged ELSE UNCHANGED <> \* Otherwise, Configuration and states remain unchanged @@ -1065,6 +1089,14 @@ RcvUpdateCommitIndex(i, j) == /\ UpdateCommitIndex(m.dest, m.source, m) /\ Discard(m) +RcvProposeVoteRequest(i, j) == + \E m \in MessagesTo(i) : + /\ j = m.source + /\ m.type = ProposeVoteRequest + /\ m.term = currentTerm[i] + /\ Timeout(m.dest) + /\ Discard(m) + Receive(i, j) == \/ RcvDropIgnoredMessage(i, j) \/ RcvUpdateTerm(i, j) @@ -1073,6 +1105,7 @@ Receive(i, j) == \/ RcvAppendEntriesRequest(i, j) \/ RcvAppendEntriesResponse(i, j) \/ RcvUpdateCommitIndex(i, j) + \/ RcvProposeVoteRequest(i, j) \* End of message handlers. ------------------------------------------------------------------------------ @@ -1108,6 +1141,7 @@ Spec == /\ \A i, j \in Servers : WF_vars(RcvAppendEntriesRequest(i, j)) /\ \A i, j \in Servers : WF_vars(RcvAppendEntriesResponse(i, j)) /\ \A i, j \in Servers : WF_vars(RcvUpdateCommitIndex(i, j)) + /\ \A i, j \in Servers : WF_vars(RcvProposeVoteRequest(i, j)) \* Node actions /\ \A s, t \in Servers : WF_vars(AppendEntries(s, t)) /\ \A s, t \in Servers : WF_vars(RequestVote(s, t))