Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding a propose request vote message to speed up some reconfigurations #5697

Merged
merged 28 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
eb42e08
Initial draft of a propose request vote message
achamayou Sep 29, 2023
7905147
Merge branch 'main' into propose_vote_request
achamayou Sep 29, 2023
00f38e6
Fix driver message handling, add message to spec
achamayou Sep 29, 2023
430bf11
Don't need one more round of election
achamayou Sep 29, 2023
08692ba
This is fine
achamayou Sep 29, 2023
fb0acc6
One more
achamayou Sep 29, 2023
792e93a
Update Trace validation
achamayou Sep 29, 2023
cc4d806
Merge branch 'main' into propose_vote_request
achamayou Sep 29, 2023
a957986
Update spec to be more precise
achamayou Oct 2, 2023
286a7fb
Merge branch 'propose_vote_request' of https://github.com/achamayou/C…
achamayou Oct 2, 2023
40d938b
Clearly need coffee
achamayou Oct 2, 2023
e6c2856
comment
achamayou Oct 2, 2023
0d3c967
Ignore ProposeVotes with too high a term
achamayou Oct 2, 2023
0d27c6c
Already checked in RcvUpdateTerm
achamayou Oct 2, 2023
c9756e1
Merge branch 'main' into propose_vote_request
achamayou Oct 3, 2023
848245e
Merge https://github.com/microsoft/CCF into propose_vote_request
achamayou Oct 3, 2023
4e9720d
Pick a plausible successor the same way as the implementation
achamayou Oct 3, 2023
ed88214
Merge branch 'propose_vote_request' of https://github.com/achamayou/C…
achamayou Oct 3, 2023
dcb2553
Merge branch 'main' into propose_vote_request
achamayou Oct 3, 2023
000845f
Actually exclude removed servers
achamayou Oct 4, 2023
cb06796
Merge branch 'propose_vote_request' of https://github.com/achamayou/C…
achamayou Oct 4, 2023
a8acacd
Don't choose
achamayou Oct 4, 2023
322c389
Pick the node known to be in the highest configuration by that server
achamayou Oct 4, 2023
4cc4545
Merge branch 'main' into propose_vote_request
achamayou Oct 4, 2023
5835f6c
0
achamayou Oct 4, 2023
33d537a
Merge branch 'main' into propose_vote_request
achamayou Oct 4, 2023
0fb271c
Merge branch 'main' into propose_vote_request
achamayou Oct 4, 2023
3a8f70a
Merge branch 'main' into propose_vote_request
achamayou Oct 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions src/consensus/aft/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,15 @@ namespace aft
break;
}

case raft_propose_request_vote:
{
ProposeRequestVote r =
channels->template recv_authenticated<ProposeRequestVote>(
from, data, size);
recv_propose_request_vote(from, r);
break;
}

default:
{
RAFT_FAIL_FMT("Unhandled AFT message type: {}", type);
Expand Down Expand Up @@ -1706,6 +1715,32 @@ namespace aft
add_vote_for_me(from);
}

void recv_propose_request_vote(
const ccf::NodeId& from, ProposeRequestVote r)
{
std::lock_guard<ccf::pal::Mutex> guard(state->lock);

#ifdef CCF_RAFT_TRACING
nlohmann::json j = {};
j["function"] = "recv_propose_request_vote";
j["packet"] = r;
j["state"] = *state;
j["from_node_id"] = from;
j["committable_indices"] = committable_indices;
RAFT_TRACE_JSON_OUT(j);
#endif
if (can_endorse_primary() && ticking && r.term == state->current_view)
{
RAFT_INFO_FMT(
"Becoming candidate early due to propose request vote from {}", from);
become_candidate();
}
else
{
RAFT_INFO_FMT("Ignoring propose request vote from {}", from);
}
}

void restart_election_timeout()
{
// Randomise timeout_elapsed to get a random election timeout
Expand Down Expand Up @@ -1932,6 +1967,53 @@ namespace aft
{
leader_id.reset();
state->leadership_state = kv::LeadershipState::None;
ProposeRequestVote prv{
{raft_propose_request_vote}, state->current_view};

std::optional<ccf::NodeId> successor = std::nullopt;
Index max_match_idx = 0;
kv::ReconfigurationId reconf_id_of_max_match = 0;

// Pick the node that has the highest match_idx, and break
// ties by looking at the highest reconfiguration id they are
// part of. This can lead to nudging a node that is
// about to retire too, but that node will then nudge
// a successor, and that seems preferable to nudging a node that
// risks not being eligible if reconfiguration id is prioritised.
// Alternatively, we could pick the node with the higest match idx
// in the latest config, provided that match idx at least as high as a
// majority. That would make them both eligible and unlikely to retire
// soon.
for (auto& [node, node_state] : all_other_nodes)
{
if (node_state.match_idx >= max_match_idx)
{
kv::ReconfigurationId latest_reconf_id = 0;
auto conf = configurations.rbegin();
while (conf != configurations.rend())
{
if (conf->nodes.find(node) != conf->nodes.end())
{
latest_reconf_id = conf->idx;
break;
}
conf++;
}
if (!(node_state.match_idx == max_match_idx &&
latest_reconf_id < reconf_id_of_max_match))
{
reconf_id_of_max_match = latest_reconf_id;
successor = node;
max_match_idx = node_state.match_idx;
}
}
}
if (successor.has_value())
{
RAFT_INFO_FMT("Node retired, nudging {}", successor.value());
channels->send_authenticated(
successor.value(), ccf::NodeMsgType::consensus_msg, prv);
}
}

state->membership_state = kv::MembershipState::Retired;
Expand Down
13 changes: 12 additions & 1 deletion src/consensus/aft/raft_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ namespace aft
raft_append_entries_response,
raft_append_entries_signed_response,
raft_request_vote,
raft_request_vote_response
raft_request_vote_response,
raft_propose_request_vote,
};
DECLARE_JSON_ENUM(
RaftMsgType,
Expand All @@ -110,6 +111,7 @@ namespace aft
"raft_append_entries_signed_response"},
{RaftMsgType::raft_request_vote, "raft_request_vote"},
{RaftMsgType::raft_request_vote_response, "raft_request_vote_response"},
{RaftMsgType::raft_propose_request_vote, "raft_propose_request_vote"},
});

#pragma pack(push, 1)
Expand Down Expand Up @@ -190,5 +192,14 @@ namespace aft
DECLARE_JSON_TYPE_WITH_BASE(RequestVoteResponse, RaftHeader);
DECLARE_JSON_REQUIRED_FIELDS(RequestVoteResponse, term, vote_granted);

struct ProposeRequestVote : RaftHeader
{
// A node sends this to nudge another node to begin an election, for
// instance because the sender is a retiring primary
Term term;
};
DECLARE_JSON_TYPE_WITH_BASE(ProposeRequestVote, RaftHeader);
DECLARE_JSON_REQUIRED_FIELDS(ProposeRequestVote, term);

#pragma pack(pop)
}
16 changes: 16 additions & 0 deletions src/consensus/aft/test/driver.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,16 @@ class RaftDriver
rlog(node_id, tgt_node_id, s, dropped);
}

void log_msg_details(
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
aft::ProposeRequestVote prv,
bool dropped)
{
const auto s = fmt::format("propose_request_vote for term {}", prv.term);
log(node_id, tgt_node_id, s, dropped);
}

void log_msg_details(
ccf::NodeId node_id,
ccf::NodeId tgt_node_id,
Expand Down Expand Up @@ -370,6 +380,12 @@ class RaftDriver
log_msg_details(node_id, tgt_node_id, aer, dropped);
break;
}
case (aft::RaftMsgType::raft_propose_request_vote):
{
auto prv = *(aft::ProposeRequestVote*)data;
log_msg_details(node_id, tgt_node_id, prv, dropped);
break;
}
default:
{
throw std::runtime_error(
Expand Down
3 changes: 0 additions & 3 deletions tests/raft_scenarios/4582.2
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,6 @@ assert_is_primary,0
assert_commit_idx,0,2
assert_commit_idx,1,0

periodic_all,1000
dispatch_all

assert_is_backup,1
state_all

Expand Down
1 change: 1 addition & 0 deletions tla/MCccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = Entry
TypeSignature = Signature
Expand Down
1 change: 1 addition & 0 deletions tla/MCccfraftWithReconfig.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = Entry
TypeSignature = Signature
Expand Down
1 change: 1 addition & 0 deletions tla/SIMccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = Entry
TypeSignature = Signature
Expand Down
1 change: 1 addition & 0 deletions tla/Traceccfraft.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ CONSTANTS
AppendEntriesRequest = AppendEntriesRequest
AppendEntriesResponse = AppendEntriesResponse
NotifyCommitMessage = NotifyCommitMessage
ProposeVoteRequest = ProposeVoteRequest

TypeEntry = TypeEntry
TypeSignature = TypeSignature
Expand Down
18 changes: 17 additions & 1 deletion tla/Traceccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ EXTENDS ccfraft, Json, IOUtils, Sequences, Network
\* raft_types.h enum RaftMsgType
RaftMsgType ==
"raft_append_entries" :> AppendEntriesRequest @@ "raft_append_entries_response" :> AppendEntriesResponse @@
"raft_request_vote" :> RequestVoteRequest @@ "raft_request_vote_response" :> RequestVoteResponse
"raft_request_vote" :> RequestVoteRequest @@ "raft_request_vote_response" :> RequestVoteResponse @@
"raft_propose_request_vote" :> ProposeVoteRequest

LeadershipState ==
Leader :> "Leader" @@ Follower :> "Follower" @@ Candidate :> "Candidate" @@ Pending :> "Pending"
Expand Down Expand Up @@ -301,6 +302,19 @@ IsCheckQuorum ==
/\ CheckQuorum(logline.msg.state.node_id)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)

IsRcvProposeVoteRequest ==
/\ IsEvent("recv_propose_request_vote")
/\ state[logline.msg.state.node_id] = Leader
/\ LET i == logline.msg.state.node_id
j == logline.msg.to_node_id
IN /\ \E m \in Messages':
/\ m.type = ProposeVoteRequest
/\ m.type = RaftMsgType[logline.msg.packet.msg]
/\ m.term = logline.msg.packet.term
\* There is now one more message of this type.
/\ OneMoreMessage(m)
/\ committableIndices[logline.msg.state.node_id] = Range(logline.msg.committable_indices)

TraceNext ==
\/ IsTimeout
\/ IsBecomeLeader
Expand All @@ -325,6 +339,8 @@ TraceNext ==
\/ IsRcvRequestVoteResponse
\/ IsExecuteAppendEntries

\/ IsRcvProposeVoteRequest

RaftDriverQuirks ==
\* The "nodes" command in raft scenarios causes N consecutive "add_configuration" log lines to be emitted,
\* where N is determined by the "nodes" parameter. At this stage, the nodes are in the "Pending" state.
Expand Down
42 changes: 38 additions & 4 deletions tla/ccfraft.tla
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ CONSTANTS
RequestVoteResponse,
AppendEntriesRequest,
AppendEntriesResponse,
NotifyCommitMessage
NotifyCommitMessage,
ProposeVoteRequest

\* CCF: Content types (Normal entry or a signature that signs
\* previous entries or a reconfiguration entry)
Expand Down Expand Up @@ -152,7 +153,7 @@ ReconfigurationVarsTypeInv ==
\* A set representing requests and responses sent from one server
\* to another. With CCF, we have message integrity and can ensure unique messages.
\* Messages only records messages that are currently in-flight, actions should
\* removed messages once received.
\* remove messages once received.
\* We model messages as a single (unsorted) set and do not assume ordered message delivery between nodes.
\* Node-to-node channels use TCP but out-of-order delivery could be observed due to reconnection or a malicious host.
VARIABLE messages
Expand Down Expand Up @@ -200,6 +201,10 @@ NotifyCommitMessageTypeOK(m) ==
/\ m.type = NotifyCommitMessage
/\ m.commitIndex \in Nat

ProposeVoteRequestTypeOK(m) ==
/\ m.type = ProposeVoteRequest
/\ m.term \in Nat

MessagesTypeInv ==
\A m \in Messages :
/\ m.source \in Servers
Expand All @@ -210,6 +215,7 @@ MessagesTypeInv ==
\/ RequestVoteRequestTypeOK(m)
\/ RequestVoteResponseTypeOK(m)
\/ NotifyCommitMessageTypeOK(m)
\/ ProposeVoteRequestTypeOK(m)

\* CCF: After reconfiguration, a RetiredLeader leader may need to notify servers
\* of the current commit level to ensure that no deadlock is reached through
Expand Down Expand Up @@ -435,6 +441,10 @@ MaxConfigurationIndex(server) ==
MaxConfiguration(server) ==
configurations[server][MaxConfigurationIndex(server)]

HighestConfigurationWithNode(server, node) ==
\* Highest configuration index, known to server, that includes node
Max({configIndex \in DOMAIN configurations[server] : node \in configurations[server][configIndex]} \union {0})

NextConfigurationIndex(server) ==
\* The configuration with the 2nd smallest index is the first of the pending configurations
LET dom == DOMAIN configurations[server]
Expand Down Expand Up @@ -481,6 +491,14 @@ AppendEntriesBatchsize(i, j) ==
\* This can be redefined to send bigger batches of entries.
{nextIndex[i][j]}


PlausibleSucessorNodes(i) ==
\* Find plausible successor nodes for i
LET
activeServers == Servers \ removedFromConfiguration
highestMatchServers == {n \in activeServers : \A m \in activeServers : matchIndex[i][n] >= matchIndex[i][m]}
IN {n \in highestMatchServers : \A m \in highestMatchServers: HighestConfigurationWithNode(i, n) >= HighestConfigurationWithNode(i, m)}

------------------------------------------------------------------------------
\* Define initial values for all variables

Expand Down Expand Up @@ -728,8 +746,14 @@ AdvanceCommitIndex(i) ==
/\ configurations' = [configurations EXCEPT ![i] = new_configurations]
\* Retire if i is not in active configuration anymore
/\ IF i \notin configurations[i][Min(DOMAIN new_configurations)]
THEN /\ state' = [state EXCEPT ![i] = RetiredLeader]
/\ UNCHANGED << currentTerm, votedFor, reconfigurationCount, removedFromConfiguration >>
THEN \E j \in PlausibleSucessorNodes(i) :
/\ state' = [state EXCEPT ![i] = RetiredLeader]
/\ LET msg == [type |-> ProposeVoteRequest,
term |-> currentTerm[i],
source |-> i,
dest |-> j ]
IN Send(msg)
/\ UNCHANGED << currentTerm, votedFor, reconfigurationCount, removedFromConfiguration >>
\* Otherwise, states remain unchanged
ELSE UNCHANGED <<serverVars, reconfigurationCount, removedFromConfiguration>>
\* Otherwise, Configuration and states remain unchanged
Expand Down Expand Up @@ -1065,6 +1089,14 @@ RcvUpdateCommitIndex(i, j) ==
/\ UpdateCommitIndex(m.dest, m.source, m)
/\ Discard(m)

RcvProposeVoteRequest(i, j) ==
\E m \in MessagesTo(i) :
/\ j = m.source
/\ m.type = ProposeVoteRequest
achamayou marked this conversation as resolved.
Show resolved Hide resolved
/\ m.term = currentTerm[i]
/\ Timeout(m.dest)
/\ Discard(m)

Receive(i, j) ==
\/ RcvDropIgnoredMessage(i, j)
\/ RcvUpdateTerm(i, j)
Expand All @@ -1073,6 +1105,7 @@ Receive(i, j) ==
\/ RcvAppendEntriesRequest(i, j)
\/ RcvAppendEntriesResponse(i, j)
\/ RcvUpdateCommitIndex(i, j)
\/ RcvProposeVoteRequest(i, j)

\* End of message handlers.
------------------------------------------------------------------------------
Expand Down Expand Up @@ -1108,6 +1141,7 @@ Spec ==
/\ \A i, j \in Servers : WF_vars(RcvAppendEntriesRequest(i, j))
/\ \A i, j \in Servers : WF_vars(RcvAppendEntriesResponse(i, j))
/\ \A i, j \in Servers : WF_vars(RcvUpdateCommitIndex(i, j))
/\ \A i, j \in Servers : WF_vars(RcvProposeVoteRequest(i, j))
\* Node actions
/\ \A s, t \in Servers : WF_vars(AppendEntries(s, t))
/\ \A s, t \in Servers : WF_vars(RequestVote(s, t))
Expand Down