Skip to content

Commit

Permalink
Adding a propose request vote message to speed up some reconfiguratio…
Browse files Browse the repository at this point in the history
…ns (#5697)
  • Loading branch information
achamayou authored Oct 5, 2023
1 parent cf8b57b commit 438ff87
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 9 deletions.
82 changes: 82 additions & 0 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,15 @@ namespace aft
break;
}

case raft_propose_request_vote:
{
ProposeRequestVote r =
channels->template recv_authenticated<ProposeRequestVote>(
from, data, size);
recv_propose_request_vote(from, r);
break;
}

default:
{
RAFT_FAIL_FMT("Unhandled AFT message type: {}", type);
Expand Down Expand Up @@ -1706,6 +1715,32 @@ namespace aft
add_vote_for_me(from);
}

void recv_propose_request_vote(
const ccf::NodeId& from, ProposeRequestVote r)
{
std::lock_guard<ccf::pal::Mutex> guard(state->lock);

#ifdef CCF_RAFT_TRACING
nlohmann::json j = {};
j["function"] = "recv_propose_request_vote";
j["packet"] = r;
j["state"] = *state;
j["from_node_id"] = from;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif
if (can_endorse_primary() && ticking && r.term == state->current_view)
{
RAFT_INFO_FMT(
"Becoming candidate early due to propose request vote from {}", from);
become_candidate();
}
else
{
RAFT_INFO_FMT("Ignoring propose request vote from {}", from);
}
}

void restart_election_timeout()
{
// Randomise timeout_elapsed to get a random election timeout
Expand Down Expand Up @@ -1932,6 +1967,53 @@ namespace aft
{
leader_id.reset();
state->leadership_state = kv::LeadershipState::None;
ProposeRequestVote prv{
{raft_propose_request_vote}, state->current_view};

std::optional<ccf::NodeId> successor = std::nullopt;
Index max_match_idx = 0;
kv::ReconfigurationId reconf_id_of_max_match = 0;

// Pick the node that has the highest match_idx, and break
// ties by looking at the highest reconfiguration id they are
// part of. This can lead to nudging a node that is
// about to retire too, but that node will then nudge
// a successor, and that seems preferable to nudging a node that
// risks not being eligible if reconfiguration id is prioritised.
// Alternatively, we could pick the node with the higest match idx
// in the latest config, provided that match idx at least as high as a
// majority. That would make them both eligible and unlikely to retire
// soon.
for (auto& [node, node_state] : all_other_nodes)
{
if (node_state.match_idx >= max_match_idx)
{
kv::ReconfigurationId latest_reconf_id = 0;
auto conf = configurations.rbegin();
while (conf != configurations.rend())
{
if (conf->nodes.find(node) != conf->nodes.end())
{
latest_reconf_id = conf->idx;
break;
}
conf++;
}
if (!(node_state.match_idx == max_match_idx &&
latest_reconf_id < reconf_id_of_max_match))
{
reconf_id_of_max_match = latest_reconf_id;
successor = node;
max_match_idx = node_state.match_idx;
}
}
}
if (successor.has_value())
{
RAFT_INFO_FMT("Node retired, nudging {}", successor.value());
channels->send_authenticated(
successor.value(), ccf::NodeMsgType::consensus_msg, prv);
}
}

state->membership_state = kv::MembershipState::Retired;
Expand Down
13 changes: 12 additions & 1 deletion src/consensus/aft/raft_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ namespace aft
raft_append_entries_response,
raft_append_entries_signed_response,
raft_request_vote,
raft_request_vote_response
raft_request_vote_response,
raft_propose_request_vote,
};
DECLARE_JSON_ENUM(
RaftMsgType,
Expand All @@ -110,6 +111,7 @@ namespace aft
"raft_append_entries_signed_response"},
{RaftMsgType::raft_request_vote, "raft_request_vote"},
{RaftMsgType::raft_request_vote_response, "raft_request_vote_response"},
{RaftMsgType::raft_propose_request_vote, "raft_propose_request_vote"},
});

#pragma pack(push, 1)
Expand Down Expand Up @@ -190,5 +192,14 @@ namespace aft
DECLARE_JSON_TYPE_WITH_BASE(RequestVoteResponse, RaftHeader);
DECLARE_JSON_REQUIRED_FIELDS(RequestVoteResponse, term, vote_granted);

struct ProposeRequestVote : RaftHeader
{
// A node sends this to nudge another node to begin an election, for
// instance because the sender is a retiring primary
Term term;
};
DECLARE_JSON_TYPE_WITH_BASE(ProposeRequestVote, RaftHeader);
DECLARE_JSON_REQUIRED_FIELDS(ProposeRequestVote, term);

#pragma pack(pop)
}
16 changes: 16 additions & 0 deletions src/consensus/aft/test/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ class RaftDriver
rlog(node_id, tgt_node_id, s, dropped);
}

void log_msg_details(
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
aft::ProposeRequestVote prv,
bool dropped)
{
const auto s = fmt::format("propose_request_vote for term {}", prv.term);
log(node_id, tgt_node_id, s, dropped);
}

void log_msg_details(
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
Expand Down Expand Up @@ -370,6 +380,12 @@ class RaftDriver
log_msg_details(node_id, tgt_node_id, aer, dropped);
break;
}
case (aft::RaftMsgType::raft_propose_request_vote):
{
auto prv = *(aft::ProposeRequestVote*)data;
log_msg_details(node_id, tgt_node_id, prv, dropped);
break;
}
default:
{
throw std::runtime_error(
Expand Down
3 changes: 0 additions & 3 deletions tests/raft_scenarios/4582.2
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ assert_is_primary,0
assert_commit_idx,0,2
assert_commit_idx,1,0

periodic_all,1000
dispatch_all

assert_is_backup,1
state_all

Expand Down
1 change: 1 addition & 0 deletions tla/MCccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = Entry
TypeSignature = Signature
Expand Down
1 change: 1 addition & 0 deletions tla/MCccfraftWithReconfig.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = Entry
TypeSignature = Signature
Expand Down
1 change: 1 addition & 0 deletions tla/SIMccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = Entry
TypeSignature = Signature
Expand Down
1 change: 1 addition & 0 deletions tla/Traceccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = TypeEntry
TypeSignature = TypeSignature
Expand Down
18 changes: 17 additions & 1 deletion tla/Traceccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ EXTENDS ccfraft, Json, IOUtils, Sequences, Network
\* raft_types.h enum RaftMsgType
RaftMsgType ==
"raft_append_entries" :> AppendEntriesRequest @@ "raft_append_entries_response" :> AppendEntriesResponse @@
"raft_request_vote" :> RequestVoteRequest @@ "raft_request_vote_response" :> RequestVoteResponse
"raft_request_vote" :> RequestVoteRequest @@ "raft_request_vote_response" :> RequestVoteResponse @@
"raft_propose_request_vote" :> ProposeVoteRequest

LeadershipState ==
Leader :> "Leader" @@ Follower :> "Follower" @@ Candidate :> "Candidate" @@ Pending :> "Pending"
Expand Down Expand Up @@ -301,6 +302,19 @@ IsCheckQuorum ==
/\ CheckQuorum(logline.msg.state.node_id)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)

IsRcvProposeVoteRequest ==
/\ IsEvent("recv_propose_request_vote")
/\ state[logline.msg.state.node_id] = Leader
/\ LET i == logline.msg.state.node_id
j == logline.msg.to_node_id
IN /\ \E m \in Messages':
/\ m.type = ProposeVoteRequest
/\ m.type = RaftMsgType[logline.msg.packet.msg]
/\ m.term = logline.msg.packet.term
\* There is now one more message of this type.
/\ OneMoreMessage(m)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)

TraceNext ==
\/ IsTimeout
\/ IsBecomeLeader
Expand All @@ -325,6 +339,8 @@ TraceNext ==
\/ IsRcvRequestVoteResponse
\/ IsExecuteAppendEntries

\/ IsRcvProposeVoteRequest

RaftDriverQuirks ==
\* The "nodes" command in raft scenarios causes N consecutive "add_configuration" log lines to be emitted,
\* where N is determined by the "nodes" parameter. At this stage, the nodes are in the "Pending" state.
Expand Down
42 changes: 38 additions & 4 deletions tla/ccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ CONSTANTS
RequestVoteResponse,
AppendEntriesRequest,
AppendEntriesResponse,
NotifyCommitMessage
NotifyCommitMessage,
ProposeVoteRequest

\* CCF: Content types (Normal entry or a signature that signs
\* previous entries or a reconfiguration entry)
Expand Down Expand Up @@ -152,7 +153,7 @@ ReconfigurationVarsTypeInv ==
\* A set representing requests and responses sent from one server
\* to another. With CCF, we have message integrity and can ensure unique messages.
\* Messages only records messages that are currently in-flight, actions should
\* removed messages once received.
\* remove messages once received.
\* We model messages as a single (unsorted) set and do not assume ordered message delivery between nodes.
\* Node-to-node channels use TCP but out-of-order delivery could be observed due to reconnection or a malicious host.
VARIABLE messages
Expand Down Expand Up @@ -200,6 +201,10 @@ NotifyCommitMessageTypeOK(m) ==
/\ m.type = NotifyCommitMessage
/\ m.commitIndex \in Nat

ProposeVoteRequestTypeOK(m) ==
/\ m.type = ProposeVoteRequest
/\ m.term \in Nat

MessagesTypeInv ==
\A m \in Messages :
/\ m.source \in Servers
Expand All @@ -210,6 +215,7 @@ MessagesTypeInv ==
\/ RequestVoteRequestTypeOK(m)
\/ RequestVoteResponseTypeOK(m)
\/ NotifyCommitMessageTypeOK(m)
\/ ProposeVoteRequestTypeOK(m)

\* CCF: After reconfiguration, a RetiredLeader leader may need to notify servers
\* of the current commit level to ensure that no deadlock is reached through
Expand Down Expand Up @@ -435,6 +441,10 @@ MaxConfigurationIndex(server) ==
MaxConfiguration(server) ==
configurations[server][MaxConfigurationIndex(server)]

HighestConfigurationWithNode(server, node) ==
\* Highest configuration index, known to server, that includes node
Max({configIndex \in DOMAIN configurations[server] : node \in configurations[server][configIndex]} \union {0})

NextConfigurationIndex(server) ==
\* The configuration with the 2nd smallest index is the first of the pending configurations
LET dom == DOMAIN configurations[server]
Expand Down Expand Up @@ -481,6 +491,14 @@ AppendEntriesBatchsize(i, j) ==
\* This can be redefined to send bigger batches of entries.
{nextIndex[i][j]}


PlausibleSucessorNodes(i) ==
\* Find plausible successor nodes for i
LET
activeServers == Servers \ removedFromConfiguration
highestMatchServers == {n \in activeServers : \A m \in activeServers : matchIndex[i][n] >= matchIndex[i][m]}
IN {n \in highestMatchServers : \A m \in highestMatchServers: HighestConfigurationWithNode(i, n) >= HighestConfigurationWithNode(i, m)}

------------------------------------------------------------------------------
\* Define initial values for all variables

Expand Down Expand Up @@ -728,8 +746,14 @@ AdvanceCommitIndex(i) ==
/\ configurations' = [configurations EXCEPT ![i] = new_configurations]
\* Retire if i is not in active configuration anymore
/\ IF i \notin configurations[i][Min(DOMAIN new_configurations)]
THEN /\ state' = [state EXCEPT ![i] = RetiredLeader]
/\ UNCHANGED << currentTerm, votedFor, reconfigurationCount, removedFromConfiguration >>
THEN \E j \in PlausibleSucessorNodes(i) :
/\ state' = [state EXCEPT ![i] = RetiredLeader]
/\ LET msg == [type |-> ProposeVoteRequest,
term |-> currentTerm[i],
source |-> i,
dest |-> j ]
IN Send(msg)
/\ UNCHANGED << currentTerm, votedFor, reconfigurationCount, removedFromConfiguration >>
\* Otherwise, states remain unchanged
ELSE UNCHANGED <<serverVars, reconfigurationCount, removedFromConfiguration>>
\* Otherwise, Configuration and states remain unchanged
Expand Down Expand Up @@ -1065,6 +1089,14 @@ RcvUpdateCommitIndex(i, j) ==
/\ UpdateCommitIndex(m.dest, m.source, m)
/\ Discard(m)

RcvProposeVoteRequest(i, j) ==
\E m \in MessagesTo(i) :
/\ j = m.source
/\ m.type = ProposeVoteRequest
/\ m.term = currentTerm[i]
/\ Timeout(m.dest)
/\ Discard(m)

Receive(i, j) ==
\/ RcvDropIgnoredMessage(i, j)
\/ RcvUpdateTerm(i, j)
Expand All @@ -1073,6 +1105,7 @@ Receive(i, j) ==
\/ RcvAppendEntriesRequest(i, j)
\/ RcvAppendEntriesResponse(i, j)
\/ RcvUpdateCommitIndex(i, j)
\/ RcvProposeVoteRequest(i, j)

\* End of message handlers.
------------------------------------------------------------------------------
Expand Down Expand Up @@ -1108,6 +1141,7 @@ Spec ==
/\ \A i, j \in Servers : WF_vars(RcvAppendEntriesRequest(i, j))
/\ \A i, j \in Servers : WF_vars(RcvAppendEntriesResponse(i, j))
/\ \A i, j \in Servers : WF_vars(RcvUpdateCommitIndex(i, j))
/\ \A i, j \in Servers : WF_vars(RcvProposeVoteRequest(i, j))
\* Node actions
/\ \A s, t \in Servers : WF_vars(AppendEntries(s, t))
/\ \A s, t \in Servers : WF_vars(RequestVote(s, t))
Expand Down

0 comments on commit 438ff87

Please sign in to comment.