From ec0a3d074f751f41eaec8b2da3fd391927c94038 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 29 Jan 2024 13:10:11 +0000 Subject: [PATCH 1/8] tracker: throttle empty probes Signed-off-by: Pavel Kalinnikov --- tracker/progress.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/tracker/progress.go b/tracker/progress.go index cb4312a9..7a716dc8 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -159,12 +159,7 @@ func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { // consider this message being a probe, so that the flow is paused. pr.MsgAppFlowPaused = pr.Inflights.Full() case StateProbe: - // TODO(pavelkalinnikov): this condition captures the previous behaviour, - // but we should set MsgAppFlowPaused unconditionally for simplicity, because any - // MsgApp in StateProbe is a probe, not only non-empty ones. - if entries > 0 { - pr.MsgAppFlowPaused = true - } + pr.MsgAppFlowPaused = true default: panic(fmt.Sprintf("sending append in unhandled state %s", pr.State)) } From a2d3231612e508225c31df8298393d74e956fbc5 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 29 Jan 2024 13:15:35 +0000 Subject: [PATCH 2/8] tracker: use a simpler SentEntries method name Signed-off-by: Pavel Kalinnikov --- raft.go | 2 +- tracker/progress.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/raft.go b/raft.go index 7f591f26..5030cbb4 100644 --- a/raft.go +++ b/raft.go @@ -640,7 +640,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { Entries: ents, Commit: r.raftLog.committed, }) - pr.UpdateOnEntriesSend(len(ents), uint64(payloadsSize(ents))) + pr.SentEntries(len(ents), uint64(payloadsSize(ents))) return true } diff --git a/tracker/progress.go b/tracker/progress.go index 7a716dc8..1adbee43 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -143,12 +143,12 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { pr.PendingSnapshot = snapshoti } -// 
UpdateOnEntriesSend updates the progress on the given number of consecutive -// entries being sent in a MsgApp, with the given total bytes size, appended at -// log indices >= pr.Next. +// SentEntries updates the progress on the given number of consecutive entries +// being sent in a MsgApp, with the given total bytes size, appended at log +// indices >= pr.Next. // // Must be used with StateProbe or StateReplicate. -func (pr *Progress) UpdateOnEntriesSend(entries int, bytes uint64) { +func (pr *Progress) SentEntries(entries int, bytes uint64) { switch pr.State { case StateReplicate: if entries > 0 { From 06b28b3ac8cd0c64cfb001aafcabefd15eddd71a Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Mon, 29 Jan 2024 13:03:53 +0000 Subject: [PATCH 3/8] tracker: track in-flight commit index This commit adds a Progress.pendingCommit field tracking the highest commit index <= Next-1 which the leader sent to the follower. It is used to distinguish cases when a commit index update needs or doesn't need to be sent to a follower. Signed-off-by: Pavel Kalinnikov --- raft.go | 24 ++++++++++++++--------- testdata/confchange_v2_replace_leader.txt | 4 ---- testdata/probe_and_replicate.txt | 12 ------------ tracker/progress.go | 23 ++++++++++++++++++++++ 4 files changed, 38 insertions(+), 25 deletions(-) diff --git a/raft.go b/raft.go index 5030cbb4..a2e5ad0f 100644 --- a/raft.go +++ b/raft.go @@ -599,6 +599,10 @@ func (r *raft) sendAppend(to uint64) { // argument controls whether messages with no entries will be sent // ("empty" messages are useful to convey updated Commit indexes, but // are undesirable when we're sending multiple messages in a batch). +// +// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress +// struct contains all the state necessary for deciding whether to send a +// message. 
func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.trk.Progress[to] if pr.IsPaused() { @@ -641,6 +645,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { Commit: r.raftLog.committed, }) pr.SentEntries(len(ents), uint64(payloadsSize(ents))) + pr.SentCommit(r.raftLog.committed) return true } @@ -675,21 +680,21 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { // sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { + pr := r.trk.Progress[to] // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, // the receiver(follower) might not be matched with the leader // or it might not have all the committed entries. // The leader MUST NOT forward the follower's commit to // an unmatched index. - commit := min(r.trk.Progress[to].Match, r.raftLog.committed) - m := pb.Message{ + commit := min(pr.Match, r.raftLog.committed) + r.send(pb.Message{ To: to, Type: pb.MsgHeartbeat, Commit: commit, Context: ctx, - } - - r.send(m) + }) + pr.SentCommit(commit) } // bcastAppend sends RPC, with entries to all peers that are not up-to-date @@ -1480,7 +1485,6 @@ func stepLeader(r *raft, m pb.Message) error { r.sendAppend(m.From) } } else { - oldPaused := pr.IsPaused() // We want to update our tracking if the response updates our // matched index or if the response can move a probing peer back // into StateReplicate (see heartbeat_rep_recovers_from_probing.txt @@ -1517,9 +1521,11 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if oldPaused { - // If we were paused before, this node may be missing the - // latest commit index, so send it. + } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { + // This node may be missing the latest commit index, so send it. 
+ // NB: this is not strictly necessary because the periodic heartbeat + // messages deliver commit indices too. However, a message sent now + // may arrive earlier than the next heartbeat fires. r.sendAppend(m.From) } // We've updated flow control information above, which may diff --git a/testdata/confchange_v2_replace_leader.txt b/testdata/confchange_v2_replace_leader.txt index ae43ce21..c31adad1 100644 --- a/testdata/confchange_v2_replace_leader.txt +++ b/testdata/confchange_v2_replace_leader.txt @@ -284,12 +284,10 @@ stabilize CommittedEntries: 2/5 EntryNormal "" Messages: - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 4->2 MsgApp Term:2 Log:2/5 Commit:5 4->3 MsgApp Term:2 Log:2/5 Commit:5 > 1 receiving messages - 4->1 MsgApp Term:2 Log:2/5 Commit:4 4->1 MsgApp Term:2 Log:2/5 Commit:5 > 2 receiving messages 4->2 MsgApp Term:2 Log:2/5 Commit:5 @@ -302,7 +300,6 @@ stabilize 2/5 EntryNormal "" Messages: 1->4 MsgAppResp Term:2 Log:0/5 - 1->4 MsgAppResp Term:2 Log:0/5 > 2 handling Ready Ready MustSync=false: HardState Term:2 Vote:4 Commit:5 @@ -318,7 +315,6 @@ stabilize Messages: 3->4 MsgAppResp Term:2 Log:0/5 > 4 receiving messages - 1->4 MsgAppResp Term:2 Log:0/5 1->4 MsgAppResp Term:2 Log:0/5 2->4 MsgAppResp Term:2 Log:0/5 3->4 MsgAppResp Term:2 Log:0/5 diff --git a/testdata/probe_and_replicate.txt b/testdata/probe_and_replicate.txt index c4100e97..832be27b 100644 --- a/testdata/probe_and_replicate.txt +++ b/testdata/probe_and_replicate.txt @@ -513,18 +513,6 @@ stabilize 1 2 2->1 MsgAppResp Term:8 Log:0/21 > 1 receiving messages 2->1 MsgAppResp Term:8 Log:0/21 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 receiving messages - 1->2 MsgApp Term:8 Log:8/21 Commit:18 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:8 Log:0/21 -> 1 receiving messages - 2->1 MsgAppResp Term:8 Log:0/21 stabilize 1 3 ---- diff --git a/tracker/progress.go b/tracker/progress.go 
index 1adbee43..cca87074 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -37,6 +37,11 @@ type Progress struct { // Invariant: 0 <= Match < Next. Next uint64 + // pendingCommit is the highest commit index in flight to the follower. + // + // Invariant: 0 <= pendingCommit < Next. + pendingCommit uint64 + // State defines how the leader should interact with the follower. // // When in StateProbe, leader sends at most one replication message @@ -127,6 +132,7 @@ func (pr *Progress) BecomeProbe() { } else { pr.ResetState(StateProbe) pr.Next = pr.Match + 1 + pr.pendingCommit = min(pr.pendingCommit, pr.Match) } } @@ -134,6 +140,7 @@ func (pr *Progress) BecomeProbe() { func (pr *Progress) BecomeReplicate() { pr.ResetState(StateReplicate) pr.Next = pr.Match + 1 + pr.pendingCommit = min(pr.pendingCommit, pr.Match) } // BecomeSnapshot moves the Progress to StateSnapshot with the specified pending @@ -165,6 +172,20 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) { } } +// CanBumpCommit returns true if sending the given commit index can potentially +// advance the follower's commit index. +func (pr *Progress) CanBumpCommit(index uint64) bool { + return pr.pendingCommit < min(index, pr.Next-1) +} + +// SentCommit updates the pendingCommit. +func (pr *Progress) SentCommit(commit uint64) { + // Sending the given commit index may bump the follower's commit index up to + // Next-1, or even higher. We track only up to Next-1, and maintain the + // invariant: pendingCommit < Next. + pr.pendingCommit = min(max(pr.pendingCommit, commit), pr.Next-1) +} + // MaybeUpdate is called when an MsgAppResp arrives from the follower, with the // index acked by it. The method returns false if the given n index comes from // an outdated message. Otherwise it updates the progress and returns true. @@ -200,6 +221,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // // TODO(tbg): why not use matchHint if it's larger? 
pr.Next = pr.Match + 1 + pr.pendingCommit = min(pr.pendingCommit, pr.Match) return true } @@ -211,6 +233,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { } pr.Next = max(min(rejected, matchHint+1), pr.Match+1) + pr.pendingCommit = min(pr.pendingCommit, pr.Next-1) pr.MsgAppFlowPaused = false return true } From 626b5c896e768e1953e29d4ac3a91ce29e935cb1 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 23 Feb 2024 11:21:33 +0000 Subject: [PATCH 4/8] tracker: consolidate MsgApp decisions in Progress Signed-off-by: Pavel Kalinnikov --- doc.go | 8 +- raft.go | 130 +++++++------------- raft_test.go | 16 +-- testdata/replicate_pause.txt | 3 + testdata/slow_follower_after_compaction.txt | 2 + tracker/inflights.go | 2 + tracker/progress.go | 74 ++++++++--- tracker/progress_test.go | 11 +- 8 files changed, 128 insertions(+), 118 deletions(-) diff --git a/doc.go b/doc.go index 06253f4e..45138cb1 100644 --- a/doc.go +++ b/doc.go @@ -315,7 +315,7 @@ stale log entries: rafthttp package. 'MsgApp' contains log entries to replicate. A leader calls bcastAppend, - which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp' + which calls maybeSendAppend, which sends soon-to-be-replicated logs in 'MsgApp' type. When 'MsgApp' is passed to candidate's Step method, candidate reverts back to follower, because it indicates that there is a valid leader sending 'MsgApp' messages. Candidate and follower respond to this message in @@ -353,8 +353,8 @@ stale log entries: 'MsgSnap' requests to install a snapshot message. When a node has just become a leader or the leader receives 'MsgProp' message, it calls - 'bcastAppend' method, which then calls 'sendAppend' method to each - follower. In 'sendAppend', if a leader fails to get term or entries, + 'bcastAppend' method, which then calls 'maybeSendAppend' method to each + follower. 
In 'maybeSendAppend', if a leader fails to get term or entries, the leader requests snapshot by sending 'MsgSnap' type message. 'MsgSnapStatus' tells the result of snapshot install message. When a @@ -376,7 +376,7 @@ stale log entries: 'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp' is passed to leader's Step method, the leader knows which follower responded. And only when the leader's last committed index is greater than - follower's Match index, the leader runs 'sendAppend` method. + follower's Match index, the leader runs 'maybeSendAppend' method. 'MsgUnreachable' tells that request(message) wasn't delivered. When 'MsgUnreachable' is passed to leader's Step method, the leader discovers diff --git a/raft.go b/raft.go index a2e5ad0f..f5f95afb 100644 --- a/raft.go +++ b/raft.go @@ -588,24 +588,24 @@ func (r *raft) send(m pb.Message) { } } -// sendAppend sends an append RPC with new entries (if any) and the -// current commit index to the given peer. -func (r *raft) sendAppend(to uint64) { - r.maybeSendAppend(to, true) -} - -// maybeSendAppend sends an append RPC with new entries to the given peer, -// if necessary. Returns true if a message was sent. The sendIfEmpty -// argument controls whether messages with no entries will be sent -// ("empty" messages are useful to convey updated Commit indexes, but -// are undesirable when we're sending multiple messages in a batch). +// maybeSendAppend sends an append RPC with log entries (if any) that are not +// yet known to be replicated in the given peer's log, as well as the current +// commit index. Usually it sends a MsgApp message, but in some cases (e.g. the +// log has been compacted) it can send a MsgSnap. +// +// In some cases, the MsgApp message can have zero entries, and yet still be sent. +// When the follower log is not fully up-to-date, we must send a MsgApp +// periodically so that eventually the flow is either accepted or rejected. 
Not +// doing so can result in replication stall, in cases when a MsgApp is dropped. // -// TODO(pav-kv): make invocation of maybeSendAppend stateless. The Progress -// struct contains all the state necessary for deciding whether to send a -// message. -func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { +// Returns true if a message was sent, or false otherwise. A message is not sent +// if the follower log and commit index are up-to-date, the flow is paused (for +// reasons like in-flight limits), or the message could not be constructed. +func (r *raft) maybeSendAppend(to uint64) bool { pr := r.trk.Progress[to] - if pr.IsPaused() { + + last, commit := r.raftLog.lastIndex(), r.raftLog.committed + if !pr.ShouldSendMsgApp(last, commit) { return false } @@ -617,35 +617,25 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return r.maybeSendSnapshot(to, pr) } - var ents []pb.Entry - // In a throttled StateReplicate only send empty MsgApp, to ensure progress. - // Otherwise, if we had a full Inflights and all inflight messages were in - // fact dropped, replication to that follower would stall. Instead, an empty - // MsgApp will eventually reach the follower (heartbeats responses prompt the - // leader to send an append), allowing it to be acked or rejected, both of - // which will clear out Inflights. - if pr.State != tracker.StateReplicate || !pr.Inflights.Full() { - ents, err = r.raftLog.entries(pr.Next, r.maxMsgSize) - } - if len(ents) == 0 && !sendIfEmpty { - return false - } - // TODO(pav-kv): move this check up to where err is returned. - if err != nil { // send a snapshot if we failed to get the entries - return r.maybeSendSnapshot(to, pr) + var entries []pb.Entry + if pr.CanSendEntries(last) { + if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil { + // Send a snapshot if we failed to get the entries. 
+ return r.maybeSendSnapshot(to, pr) + } } - // Send the actual MsgApp otherwise, and update the progress accordingly. + // Send the MsgApp, and update the progress accordingly. r.send(pb.Message{ To: to, Type: pb.MsgApp, Index: prevIndex, LogTerm: prevTerm, - Entries: ents, - Commit: r.raftLog.committed, + Entries: entries, + Commit: commit, }) - pr.SentEntries(len(ents), uint64(payloadsSize(ents))) - pr.SentCommit(r.raftLog.committed) + pr.SentEntries(len(entries), uint64(payloadsSize(entries))) + pr.SentCommit(commit) return true } @@ -704,7 +694,7 @@ func (r *raft) bcastAppend() { if id == r.id { return } - r.sendAppend(id) + r.maybeSendAppend(id) }) } @@ -1482,7 +1472,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == tracker.StateReplicate { pr.BecomeProbe() } - r.sendAppend(m.From) + r.maybeSendAppend(m.From) } } else { // We want to update our tracking if the response updates our @@ -1521,21 +1511,13 @@ func stepLeader(r *raft, m pb.Message) error { // to respond to pending read index requests releasePendingReadIndexMessages(r) r.bcastAppend() - } else if r.id != m.From && pr.CanBumpCommit(r.raftLog.committed) { - // This node may be missing the latest commit index, so send it. - // NB: this is not strictly necessary because the periodic heartbeat - // messages deliver commit indices too. However, a message sent now - // may arrive earlier than the next heartbeat fires. - r.sendAppend(m.From) } - // We've updated flow control information above, which may - // allow us to send multiple (size-limited) in-flight messages - // at once (such as when transitioning from probe to - // replicate, or when freeTo() covers multiple messages). 
If - // we have more entries to send, send as many messages as we - // can (without sending empty messages for the commit index) + // We've updated flow control information above, which may allow us to + // send multiple (size-limited) in-flight messages at once (such as when + // transitioning from probe to replicate, or when freeTo() covers + // multiple messages). Send as many messages as we can. if r.id != m.From { - for r.maybeSendAppend(m.From, false /* sendIfEmpty */) { + for r.maybeSendAppend(m.From) { } } // Transfer leadership is in progress. @@ -1548,23 +1530,7 @@ func stepLeader(r *raft, m pb.Message) error { case pb.MsgHeartbeatResp: pr.RecentActive = true pr.MsgAppFlowPaused = false - - // NB: if the follower is paused (full Inflights), this will still send an - // empty append, allowing it to recover from situations in which all the - // messages that filled up Inflights in the first place were dropped. Note - // also that the outgoing heartbeat already communicated the commit index. - // - // If the follower is fully caught up but also in StateProbe (as can happen - // if ReportUnreachable was called), we also want to send an append (it will - // be empty) to allow the follower to transition back to StateReplicate once - // it responds. - // - // Note that StateSnapshot typically satisfies pr.Match < lastIndex, but - // `pr.Paused()` is always true for StateSnapshot, so sendAppend is a - // no-op. 
- if pr.Match < r.raftLog.lastIndex() || pr.State == tracker.StateProbe { - r.sendAppend(m.From) - } + r.maybeSendAppend(m.From) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return nil @@ -1634,7 +1600,8 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - r.sendAppend(leadTransferee) + pr.MsgAppFlowPaused = false + r.maybeSendAppend(leadTransferee) } } return nil @@ -1982,21 +1949,14 @@ func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.Co return cs } - if r.maybeCommit() { - // If the configuration change means that more entries are committed now, - // broadcast/append to everyone in the updated config. - r.bcastAppend() - } else { - // Otherwise, still probe the newly added replicas; there's no reason to - // let them wait out a heartbeat interval (or the next incoming - // proposal). - r.trk.Visit(func(id uint64, pr *tracker.Progress) { - if id == r.id { - return - } - r.maybeSendAppend(id, false /* sendIfEmpty */) - }) - } + r.maybeCommit() + // If the configuration change means that more entries are committed now, + // broadcast/append to everyone in the updated config. + // + // Otherwise, still probe the newly added replicas; there's no reason to let + // them wait out a heartbeat interval (or the next incoming proposal). + r.bcastAppend() + // If the leadTransferee was removed or demoted, abort the leadership transfer. 
if _, tOK := r.trk.Config.Voters.IDs()[r.leadTransferee]; !tOK && r.leadTransferee != 0 { r.abortLeaderTransfer() diff --git a/raft_test.go b/raft_test.go index 5a258e56..c6b65896 100644 --- a/raft_test.go +++ b/raft_test.go @@ -141,8 +141,8 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { } r.trk.Progress[2].MsgAppFlowPaused = true r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppFlowPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) } } @@ -2773,7 +2773,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) @@ -2788,7 +2788,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) } @@ -2835,7 +2835,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) { for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) msgs := r.readMessages() if len(msgs) != 1 { t.Errorf("len(msg) = %d, want %d", len(msgs), 1) @@ -2852,7 +2852,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.sendAppend(2) + r.maybeSendAppend(2) msgs := r.readMessages() if len(msgs) != 0 { t.Errorf("len(msg) = %d, want %d", len(msgs), 0) @@ -4677,10 +4677,10 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { // r1 sends 2 MsgApp messages to r2. 
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.sendAppend(2) + r1.maybeSendAppend(2) req1 := expectOneMessage(t, r1) mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.sendAppend(2) + r1.maybeSendAppend(2) req2 := expectOneMessage(t, r1) // r2 receives the second MsgApp first due to reordering. diff --git a/testdata/replicate_pause.txt b/testdata/replicate_pause.txt index d9cee59f..4931480e 100644 --- a/testdata/replicate_pause.txt +++ b/testdata/replicate_pause.txt @@ -76,6 +76,9 @@ deliver-msgs drop=3 dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 # Repeat committing 3 entries. diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index 0d3d48c8..2ce02ada 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -88,6 +88,8 @@ deliver-msgs drop=3 ---- dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15 +dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16 # Truncate the leader's log beyond node 3 log size. compact 1 17 diff --git a/tracker/inflights.go b/tracker/inflights.go index cb091e54..e22bcb8d 100644 --- a/tracker/inflights.go +++ b/tracker/inflights.go @@ -32,6 +32,8 @@ type Inflights struct { count int // number of inflight messages in the buffer bytes uint64 // number of inflight bytes + // TODO(pav-kv): do not store the limits here, pass them to methods. For flow + // control, we need to support dynamic limits. 
size int // the max number of inflight messages maxBytes uint64 // the max total byte size of inflight messages diff --git a/tracker/progress.go b/tracker/progress.go index cca87074..2dcd5fbe 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -27,6 +27,9 @@ import ( // NB(tbg): Progress is basically a state machine whose transitions are mostly // strewn around `*raft.raft`. Additionally, some fields are only used when in a // certain State. All of this isn't ideal. +// +// TODO(pav-kv): consolidate all flow control state changes here. Much of the +// transitions in raft.go logically belong here. type Progress struct { // Match is the index up to which the follower's log is known to match the // leader's. @@ -113,7 +116,7 @@ type Progress struct { // ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) pr.PendingSnapshot = 0 pr.State = state pr.Inflights.reset() @@ -156,20 +159,25 @@ func (pr *Progress) BecomeSnapshot(snapshoti uint64) { // // Must be used with StateProbe or StateReplicate. func (pr *Progress) SentEntries(entries int, bytes uint64) { - switch pr.State { - case StateReplicate: - if entries > 0 { - pr.Next += uint64(entries) - pr.Inflights.Add(pr.Next-1, bytes) - } - // If this message overflows the in-flights tracker, or it was already full, - // consider this message being a probe, so that the flow is paused. - pr.MsgAppFlowPaused = pr.Inflights.Full() - case StateProbe: - pr.MsgAppFlowPaused = true - default: - panic(fmt.Sprintf("sending append in unhandled state %s", pr.State)) + if pr.State == StateReplicate && entries > 0 { + pr.Next += uint64(entries) + pr.Inflights.Add(pr.Next-1, bytes) } + pr.PauseMsgAppProbes(true) +} + +// PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on +// the passed-in bool. 
+func (pr *Progress) PauseMsgAppProbes(pause bool) { + pr.MsgAppFlowPaused = pause +} + +// CanSendEntries returns true if the flow control state allows sending at least +// one log entry to this follower. +// +// Must be used with StateProbe or StateReplicate. +func (pr *Progress) CanSendEntries(lastIndex uint64) bool { + return pr.Next <= lastIndex && (pr.State == StateProbe || !pr.Inflights.Full()) } // CanBumpCommit returns true if sending the given commit index can potentially @@ -195,7 +203,6 @@ func (pr *Progress) MaybeUpdate(n uint64) bool { } pr.Match = n pr.Next = max(pr.Next, n+1) // invariant: Match < Next - pr.MsgAppFlowPaused = false return true } @@ -234,7 +241,7 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { pr.Next = max(min(rejected, matchHint+1), pr.Match+1) pr.pendingCommit = min(pr.pendingCommit, pr.Next-1) - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) return true } @@ -249,7 +256,7 @@ func (pr *Progress) IsPaused() bool { case StateProbe: return pr.MsgAppFlowPaused case StateReplicate: - return pr.MsgAppFlowPaused + return pr.MsgAppFlowPaused && pr.Inflights.Full() case StateSnapshot: return true default: @@ -257,6 +264,39 @@ func (pr *Progress) IsPaused() bool { } } +// ShouldSendMsgApp returns true if the leader should send a MsgApp to the +// follower represented by this Progress. The given last and commit index of the +// leader log help determining if there is outstanding workload, and contribute +// to this decision-making. +// +// In StateProbe, a message is sent periodically. The flow is paused after every +// message, and un-paused on a heartbeat response. This ensures that probes are +// not too frequent, and eventually the MsgApp is either accepted or rejected. +// +// In StateReplicate, generally a message is sent if there are log entries that +// are not yet in-flight, and the in-flight limits are not exceeded. 
Otherwise, +// we don't send a message, or send a "probe" message in a few situations. +// +// A probe message (containing no log entries) is sent if the follower's commit +// index can be updated, or there hasn't been a probe message recently. We must +// send a message periodically even if all log entries are in-flight, in order +// to guarantee that eventually the flow is either accepted or rejected. +// +// In StateSnapshot, we do not send append messages. +func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool { + switch pr.State { + case StateProbe: + return !pr.MsgAppFlowPaused + case StateReplicate: + return pr.CanBumpCommit(commit) || + pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last)) + case StateSnapshot: + return false + default: + panic("unexpected state") + } +} + func (pr *Progress) String() string { var buf strings.Builder fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next) diff --git a/tracker/progress_test.go b/tracker/progress_test.go index 49dedb53..5ceaa59f 100644 --- a/tracker/progress_test.go +++ b/tracker/progress_test.go @@ -47,7 +47,7 @@ func TestProgressIsPaused(t *testing.T) { {StateProbe, false, false}, {StateProbe, true, true}, {StateReplicate, false, false}, - {StateReplicate, true, true}, + {StateReplicate, true, false}, {StateSnapshot, false, true}, {StateSnapshot, true, true}, } @@ -61,8 +61,11 @@ func TestProgressIsPaused(t *testing.T) { } } -// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset -// MsgAppFlowPaused. +// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and +// MaybeUpdate does not. +// +// TODO(pav-kv): there is little sense in testing these micro-behaviours in the +// struct. We should test the visible behaviour instead. 
func TestProgressResume(t *testing.T) { p := &Progress{ Next: 2, @@ -72,7 +75,7 @@ func TestProgressResume(t *testing.T) { assert.False(t, p.MsgAppFlowPaused) p.MsgAppFlowPaused = true p.MaybeUpdate(2) - assert.False(t, p.MsgAppFlowPaused) + assert.True(t, p.MsgAppFlowPaused) } func TestProgressBecomeProbe(t *testing.T) { From c38c1b7bafd2623fa192022d911f36ee4604306d Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 23 Feb 2024 12:31:40 +0000 Subject: [PATCH 5/8] raft: pass Progress to maybeSendAppend Signed-off-by: Pavel Kalinnikov --- raft.go | 16 +++++++--------- raft_test.go | 24 ++++++++++++++---------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/raft.go b/raft.go index f5f95afb..adec68ec 100644 --- a/raft.go +++ b/raft.go @@ -601,9 +601,7 @@ func (r *raft) send(m pb.Message) { // Returns true if a message was sent, or false otherwise. A message is not sent // if the follower log and commit index are up-to-date, the flow is paused (for // reasons like in-flight limits), or the message could not be constructed. -func (r *raft) maybeSendAppend(to uint64) bool { - pr := r.trk.Progress[to] - +func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { last, commit := r.raftLog.lastIndex(), r.raftLog.committed if !pr.ShouldSendMsgApp(last, commit) { return false @@ -690,11 +688,11 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // bcastAppend sends RPC, with entries to all peers that are not up-to-date // according to the progress recorded in r.trk. 
func (r *raft) bcastAppend() { - r.trk.Visit(func(id uint64, _ *tracker.Progress) { + r.trk.Visit(func(id uint64, pr *tracker.Progress) { if id == r.id { return } - r.maybeSendAppend(id) + r.maybeSendAppend(id, pr) }) } @@ -1472,7 +1470,7 @@ func stepLeader(r *raft, m pb.Message) error { if pr.State == tracker.StateReplicate { pr.BecomeProbe() } - r.maybeSendAppend(m.From) + r.maybeSendAppend(m.From, pr) } } else { // We want to update our tracking if the response updates our @@ -1517,7 +1515,7 @@ func stepLeader(r *raft, m pb.Message) error { // transitioning from probe to replicate, or when freeTo() covers // multiple messages). Send as many messages as we can. if r.id != m.From { - for r.maybeSendAppend(m.From) { + for r.maybeSendAppend(m.From, pr) { } } // Transfer leadership is in progress. @@ -1530,7 +1528,7 @@ func stepLeader(r *raft, m pb.Message) error { case pb.MsgHeartbeatResp: pr.RecentActive = true pr.MsgAppFlowPaused = false - r.maybeSendAppend(m.From) + r.maybeSendAppend(m.From, pr) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { return nil @@ -1601,7 +1599,7 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { pr.MsgAppFlowPaused = false - r.maybeSendAppend(leadTransferee) + r.maybeSendAppend(leadTransferee, pr) } } return nil diff --git a/raft_test.go b/raft_test.go index c6b65896..c25bd14d 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2764,7 +2764,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeProbe() + pr2 := r.trk.Progress[2] + pr2.BecomeProbe() // each round is a heartbeat for i := 0; i < 3; i++ { @@ -2773,7 +2774,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { // loop. After that, the follower is paused until a heartbeat response is // received. 
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) msg := r.readMessages() if len(msg) != 1 { t.Errorf("len(msg) = %d, want %d", len(msg), 1) @@ -2788,7 +2789,7 @@ func TestSendAppendForProgressProbe(t *testing.T) { } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) if l := len(r.readMessages()); l != 0 { t.Errorf("len(msg) = %d, want %d", l, 0) } @@ -2831,11 +2832,12 @@ func TestSendAppendForProgressReplicate(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeReplicate() + pr2 := r.trk.Progress[2] + pr2.BecomeReplicate() for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 1 { t.Errorf("len(msg) = %d, want %d", len(msgs), 1) @@ -2848,11 +2850,12 @@ func TestSendAppendForProgressSnapshot(t *testing.T) { r.becomeCandidate() r.becomeLeader() r.readMessages() - r.trk.Progress[2].BecomeSnapshot(10) + pr2 := r.trk.Progress[2] + pr2.BecomeSnapshot(10) for i := 0; i < 10; i++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) - r.maybeSendAppend(2) + r.maybeSendAppend(2, pr2) msgs := r.readMessages() if len(msgs) != 0 { t.Errorf("len(msg) = %d, want %d", len(msgs), 0) @@ -4671,16 +4674,17 @@ func TestLogReplicationWithReorderedMessage(t *testing.T) { r1.becomeCandidate() r1.becomeLeader() r1.readMessages() - r1.trk.Progress[2].BecomeReplicate() + pr2 := r1.trk.Progress[2] + pr2.BecomeReplicate() r2 := newTestRaft(2, 10, 1, newTestMemoryStorage(withPeers(1, 2))) // r1 sends 2 MsgApp messages to r2. 
mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.maybeSendAppend(2) + r1.maybeSendAppend(2, pr2) req1 := expectOneMessage(t, r1) mustAppendEntry(r1, pb.Entry{Data: []byte("somedata")}) - r1.maybeSendAppend(2) + r1.maybeSendAppend(2, pr2) req2 := expectOneMessage(t, r1) // r2 receives the second MsgApp first due to reordering. From 017bdda39435d5c95459de6425c1f33e37e35c6c Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 23 Feb 2024 11:28:08 +0000 Subject: [PATCH 6/8] tracker: rename the paused probes flow field Signed-off-by: Pavel Kalinnikov --- raft.go | 6 +++--- raft_snap_test.go | 8 ++++---- raft_test.go | 28 ++++++++++++++-------------- tracker/progress.go | 31 ++++++++++++++++++------------- tracker/progress_test.go | 34 +++++++++++++++++----------------- 5 files changed, 56 insertions(+), 51 deletions(-) diff --git a/raft.go b/raft.go index adec68ec..17549c63 100644 --- a/raft.go +++ b/raft.go @@ -1527,7 +1527,7 @@ func stepLeader(r *raft, m pb.Message) error { } case pb.MsgHeartbeatResp: pr.RecentActive = true - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) r.maybeSendAppend(m.From, pr) if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 { @@ -1561,7 +1561,7 @@ func stepLeader(r *raft, m pb.Message) error { // If snapshot finish, wait for the MsgAppResp from the remote node before sending // out the next MsgApp. // If snapshot failure, wait for a heartbeat interval before next try - pr.MsgAppFlowPaused = true + pr.PauseMsgAppProbes(true) case pb.MsgUnreachable: // During optimistic replication, if the remote becomes unreachable, // there is huge probability that a MsgApp is lost. 
@@ -1598,7 +1598,7 @@ func stepLeader(r *raft, m pb.Message) error { r.sendTimeoutNow(leadTransferee) r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee) } else { - pr.MsgAppFlowPaused = false + pr.PauseMsgAppProbes(false) r.maybeSendAppend(leadTransferee, pr) } } diff --git a/raft_snap_test.go b/raft_snap_test.go index e6058c68..a69d1993 100644 --- a/raft_snap_test.go +++ b/raft_snap_test.go @@ -83,8 +83,8 @@ func TestSnapshotFailure(t *testing.T) { if sm.trk.Progress[2].Next != 1 { t.Fatalf("Next = %d, want 1", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("msgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } @@ -106,8 +106,8 @@ func TestSnapshotSucceed(t *testing.T) { if sm.trk.Progress[2].Next != 12 { t.Fatalf("Next = %d, want 12", sm.trk.Progress[2].Next) } - if !sm.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("MsgAppFlowPaused = %v, want true", sm.trk.Progress[2].MsgAppFlowPaused) + if !sm.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("MsgAppProbesPaused = %v, want true", sm.trk.Progress[2].MsgAppProbesPaused) } } diff --git a/raft_test.go b/raft_test.go index c25bd14d..c41c484e 100644 --- a/raft_test.go +++ b/raft_test.go @@ -128,21 +128,21 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) { r.becomeCandidate() r.becomeLeader() - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].PauseMsgAppProbes(true) r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } r.trk.Progress[2].BecomeReplicate() - if r.trk.Progress[2].MsgAppFlowPaused { - 
t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppFlowPaused) + if r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want false", r.trk.Progress[2].MsgAppProbesPaused) } - r.trk.Progress[2].MsgAppFlowPaused = true + r.trk.Progress[2].PauseMsgAppProbes(true) r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp}) - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } } @@ -2784,8 +2784,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { } } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } for j := 0; j < 10; j++ { mustAppendEntry(r, pb.Entry{Data: []byte("somedata")}) @@ -2799,8 +2799,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { for j := 0; j < r.heartbeatTimeout; j++ { r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat}) } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } // consume the heartbeat @@ -2822,8 +2822,8 @@ func TestSendAppendForProgressProbe(t *testing.T) { if msg[0].Index != 0 { t.Errorf("index = %d, want %d", msg[0].Index, 0) } - if !r.trk.Progress[2].MsgAppFlowPaused { - t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppFlowPaused) + if !r.trk.Progress[2].MsgAppProbesPaused { + t.Errorf("paused = %v, want true", r.trk.Progress[2].MsgAppProbesPaused) } } diff --git a/tracker/progress.go b/tracker/progress.go index 2dcd5fbe..53157783 100644 --- a/tracker/progress.go +++ b/tracker/progress.go @@ -87,13 +87,13 @@ type Progress struct { // This is 
always true on the leader. RecentActive bool - // MsgAppFlowPaused is used when the MsgApp flow to a node is throttled. This - // happens in StateProbe, or StateReplicate with saturated Inflights. In both - // cases, we need to continue sending MsgApp once in a while to guarantee - // progress, but we only do so when MsgAppFlowPaused is false (it is reset on - // receiving a heartbeat response), to not overflow the receiver. See - // IsPaused(). - MsgAppFlowPaused bool + // MsgAppProbesPaused set to true prevents sending "probe" MsgApp messages to + // this follower. Used in StateProbe, or StateReplicate when all entries are + // in-flight or the in-flight volume exceeds limits. See ShouldSendMsgApp(). + // + // TODO(pav-kv): unexport this field. It is used by a few tests, but should be + // replaced by PauseMsgAppProbes() and ShouldSendMsgApp(). + MsgAppProbesPaused bool // Inflights is a sliding window for the inflight messages. // Each inflight message contains one or more log entries. @@ -113,7 +113,7 @@ type Progress struct { IsLearner bool } -// ResetState moves the Progress into the specified State, resetting MsgAppFlowPaused, +// ResetState moves the Progress into the specified State, resetting MsgAppProbesPaused, // PendingSnapshot, and Inflights. func (pr *Progress) ResetState(state StateType) { pr.PauseMsgAppProbes(false) @@ -169,7 +169,7 @@ func (pr *Progress) SentEntries(entries int, bytes uint64) { // PauseMsgAppProbes pauses or unpauses empty MsgApp messages flow, depending on // the passed-in bool. func (pr *Progress) PauseMsgAppProbes(pause bool) { - pr.MsgAppFlowPaused = pause + pr.MsgAppProbesPaused = pause } // CanSendEntries returns true if the flow control state allows sending at least @@ -251,12 +251,17 @@ func (pr *Progress) MaybeDecrTo(rejected, matchHint uint64) bool { // operation, this is false. 
A throttled node will be contacted less frequently // until it has reached a state in which it's able to accept a steady stream of // log entries again. +// +// TODO(pav-kv): this method is deprecated, remove it. It is still used in tests +// and String(), find a way to avoid this. The problem is that the actual flow +// control state depends on the log size and commit index, which are not part of +// this Progress struct - they are passed-in to methods like ShouldSendMsgApp(). func (pr *Progress) IsPaused() bool { switch pr.State { case StateProbe: - return pr.MsgAppFlowPaused + return pr.MsgAppProbesPaused case StateReplicate: - return pr.MsgAppFlowPaused && pr.Inflights.Full() + return pr.MsgAppProbesPaused && pr.Inflights.Full() case StateSnapshot: return true default: @@ -286,10 +291,10 @@ func (pr *Progress) IsPaused() bool { func (pr *Progress) ShouldSendMsgApp(last, commit uint64) bool { switch pr.State { case StateProbe: - return !pr.MsgAppFlowPaused + return !pr.MsgAppProbesPaused case StateReplicate: return pr.CanBumpCommit(commit) || - pr.Match < last && (!pr.MsgAppFlowPaused || pr.CanSendEntries(last)) + pr.Match < last && (!pr.MsgAppProbesPaused || pr.CanSendEntries(last)) case StateSnapshot: return false default: diff --git a/tracker/progress_test.go b/tracker/progress_test.go index 5ceaa59f..c9485c21 100644 --- a/tracker/progress_test.go +++ b/tracker/progress_test.go @@ -24,14 +24,14 @@ func TestProgressString(t *testing.T) { ins := NewInflights(1, 0) ins.Add(123, 1) pr := &Progress{ - Match: 1, - Next: 2, - State: StateSnapshot, - PendingSnapshot: 123, - RecentActive: false, - MsgAppFlowPaused: true, - IsLearner: true, - Inflights: ins, + Match: 1, + Next: 2, + State: StateSnapshot, + PendingSnapshot: 123, + RecentActive: false, + MsgAppProbesPaused: true, + IsLearner: true, + Inflights: ins, } const exp = `StateSnapshot match=1 next=2 learner paused pendingSnap=123 inactive inflight=1[full]` assert.Equal(t, exp, pr.String()) @@ -53,29 +53,29 @@ 
func TestProgressIsPaused(t *testing.T) { } for i, tt := range tests { p := &Progress{ - State: tt.state, - MsgAppFlowPaused: tt.paused, - Inflights: NewInflights(256, 0), + State: tt.state, + MsgAppProbesPaused: tt.paused, + Inflights: NewInflights(256, 0), } assert.Equal(t, tt.w, p.IsPaused(), i) } } -// TestProgressResume ensures that MaybeDecrTo resets MsgAppFlowPaused, and +// TestProgressResume ensures that MaybeDecrTo resets MsgAppProbesPaused, and // MaybeUpdate does not. // // TODO(pav-kv): there is little sense in testing these micro-behaviours in the // struct. We should test the visible behaviour instead. func TestProgressResume(t *testing.T) { p := &Progress{ - Next: 2, - MsgAppFlowPaused: true, + Next: 2, + MsgAppProbesPaused: true, } p.MaybeDecrTo(1, 1) - assert.False(t, p.MsgAppFlowPaused) - p.MsgAppFlowPaused = true + assert.False(t, p.MsgAppProbesPaused) + p.MsgAppProbesPaused = true p.MaybeUpdate(2) - assert.True(t, p.MsgAppFlowPaused) + assert.True(t, p.MsgAppProbesPaused) } func TestProgressBecomeProbe(t *testing.T) { From 76264df803022ef40d4a4d104706cb34ed4d6e49 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 24 Jan 2024 08:52:28 +0000 Subject: [PATCH 7/8] rawnode: expose per-follower MsgApp message stream Signed-off-by: Pavel Kalinnikov --- raft.go | 65 ++++++++++++++++++++++++++++++++++++++++++++++++------ rawnode.go | 23 +++++++++++++++++++ 2 files changed, 81 insertions(+), 7 deletions(-) diff --git a/raft.go b/raft.go index 17549c63..c5cfa98f 100644 --- a/raft.go +++ b/raft.go @@ -218,6 +218,21 @@ type Config struct { // throughput limit of 10 MB/s for this group. With RTT of 400ms, this drops // to 2.5 MB/s. See Little's law to understand the maths behind. MaxInflightBytes uint64 + // DisableEagerAppends makes raft hold off constructing log append messages in + // response to Step() calls. The messages can be collected via a separate + // MessagesTo method. 
+ // + // This way, the application has better control over when raft may call Storage + // methods and allocate memory for entries and messages. + // + // Setting this to true also improves batching: messages are constructed on + // demand, and tend to contain more entries. The application can control the + // latency/throughput trade-off by collecting messages more or less + // frequently. + // + // With this setting set to false, messages are constructed eagerly in Step() + // calls, and typically will consist of a single / few entries. + DisableEagerAppends bool // CheckQuorum specifies if the leader should check quorum activity. Leader // steps down when quorum is not active for an electionTimeout. @@ -335,6 +350,12 @@ func (c *Config) validate() error { return nil } +type msgBuf []pb.Message + +func (mb *msgBuf) append(m pb.Message) { + *(*[]pb.Message)(mb) = append(*(*[]pb.Message)(mb), m) +} + type raft struct { id uint64 @@ -360,7 +381,7 @@ type raft struct { // other nodes. // // Messages in this list must target other nodes. - msgs []pb.Message + msgs msgBuf // msgsAfterAppend contains the list of messages that should be sent after // the accumulated unstable state (e.g. term, vote, []entry, and snapshot) // has been persisted to durable storage. This includes waiting for any @@ -372,6 +393,10 @@ type raft struct { // Messages in this list have the type MsgAppResp, MsgVoteResp, or // MsgPreVoteResp. See the comment in raft.send for details. msgsAfterAppend []pb.Message + // disableEagerAppends delays append message construction and sending until + // the Ready() call. This improves batching and allows better resource + // allocation control by the application.
+ disableEagerAppends bool // the leader id lead uint64 @@ -447,6 +472,7 @@ func newRaft(c *Config) *raft { maxMsgSize: entryEncodingSize(c.MaxSizePerMsg), maxUncommittedSize: entryPayloadSize(c.MaxUncommittedEntriesSize), trk: tracker.MakeProgressTracker(c.MaxInflightMsgs, c.MaxInflightBytes), + disableEagerAppends: c.DisableEagerAppends, electionTimeout: c.ElectionTick, heartbeatTimeout: c.HeartbeatTick, logger: c.Logger, @@ -502,6 +528,11 @@ func (r *raft) hardState() pb.HardState { // send schedules persisting state to a stable storage and AFTER that // sending the message (as part of next Ready message processing). func (r *raft) send(m pb.Message) { + r.sendTo(&r.msgs, m) +} + +// sendTo prepares the given message, and puts it to the output messages buffer. +func (r *raft) sendTo(buf *msgBuf, m pb.Message) { if m.From == None { m.From = r.id } @@ -584,10 +615,21 @@ func (r *raft) send(m pb.Message) { if m.To == r.id { r.logger.Panicf("message should not be self-addressed when sending %s", m.Type) } - r.msgs = append(r.msgs, m) + buf.append(m) } } +func (r *raft) getMessages(to uint64, fc FlowControl, buffer []pb.Message) []pb.Message { + if to == r.id { + // TODO(pav-kv): async log storage writes should go through this path. + return buffer + } + pr := r.trk.Progress[to] + buf := msgBuf(buffer) + r.maybeSendAppendBuf(to, pr, &buf) + return buf +} + // maybeSendAppend sends an append RPC with log entries (if any) that are not // yet known to be replicated in the given peer's log, as well as the current // commit index. Usually it sends a MsgApp message, but in some cases (e.g. the @@ -602,6 +644,15 @@ func (r *raft) send(m pb.Message) { // if the follower log and commit index are up-to-date, the flow is paused (for // reasons like in-flight limits), or the message could not be constructed. 
func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { + if r.disableEagerAppends { + return false + } + return r.maybeSendAppendBuf(to, pr, &r.msgs) +} + +// maybeSendAppendBuf implements maybeSendAppend, and puts the messages into the +// provided buffer. +func (r *raft) maybeSendAppendBuf(to uint64, pr *tracker.Progress, buf *msgBuf) bool { last, commit := r.raftLog.lastIndex(), r.raftLog.committed if !pr.ShouldSendMsgApp(last, commit) { return false @@ -612,19 +663,19 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { if err != nil { // The log probably got truncated at >= pr.Next, so we can't catch up the // follower log anymore. Send a snapshot instead. - return r.maybeSendSnapshot(to, pr) + return r.maybeSendSnapshot(to, pr, buf) } var entries []pb.Entry if pr.CanSendEntries(last) { if entries, err = r.raftLog.entries(pr.Next, r.maxMsgSize); err != nil { // Send a snapshot if we failed to get the entries. - return r.maybeSendSnapshot(to, pr) + return r.maybeSendSnapshot(to, pr, buf) } } // Send the MsgApp, and update the progress accordingly. - r.send(pb.Message{ + r.sendTo(buf, pb.Message{ To: to, Type: pb.MsgApp, Index: prevIndex, @@ -639,7 +690,7 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { // maybeSendSnapshot fetches a snapshot from Storage, and sends it to the given // node. Returns true iff the snapshot message has been emitted successfully. 
-func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { +func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress, buf *msgBuf) bool { if !pr.RecentActive { r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) return false @@ -662,7 +713,7 @@ func (r *raft) maybeSendSnapshot(to uint64, pr *tracker.Progress) bool { pr.BecomeSnapshot(sindex) r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr) - r.send(pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) + r.sendTo(buf, pb.Message{To: to, Type: pb.MsgSnap, Snapshot: &snapshot}) return true } diff --git a/rawnode.go b/rawnode.go index 428ef519..e7a22f96 100644 --- a/rawnode.go +++ b/rawnode.go @@ -136,6 +136,29 @@ func (rn *RawNode) Ready() Ready { return rd } +// FlowControl tunes the volume and types of messages that a MessagesTo call can +// return to the application. +type FlowControl struct { + // MaxMsgAppBytes limits the number of bytes in append messages. Ignored if + // zero. + MaxMsgAppBytes uint64 + + // TODO(pav-kv): specify limits for local storage append messages. + // TODO(pav-kv): control the snapshots. +} + +// MessagesTo returns outstanding messages to a particular node. It appends the +// messages to the given slice, and returns the resulting slice. +// +// At the moment, MessagesTo only returns MsgApp or MsgSnap messages, and only +// if Config.DisableEagerAppends is true. All other messages are communicated +// via Ready calls. +// +// WARNING: this is an experimental API, use it with caution. +func (rn *RawNode) MessagesTo(id uint64, fc FlowControl, buffer []pb.Message) []pb.Message { + return rn.raft.getMessages(id, fc, buffer) +} + // readyWithoutAccept returns a Ready. This is a read-only operation, i.e. there // is no obligation that the Ready must be handled.
func (rn *RawNode) readyWithoutAccept() Ready { From 419efa1747cdd641f6491b9ba3a5a73744a678d4 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Fri, 9 Feb 2024 12:04:10 +0000 Subject: [PATCH 8/8] demo: the effect of batched MsgApp ready Signed-off-by: Pavel Kalinnikov --- raft.go | 7 +- rafttest/interaction_env_handler_add_nodes.go | 1 + rawnode.go | 21 +++++- testdata/async_storage_writes.txt | 52 ++++--------- .../async_storage_writes_append_aba_race.txt | 24 +++--- testdata/confchange_disable_validation.txt | 4 +- testdata/confchange_v1_add_single.txt | 2 +- testdata/confchange_v1_remove_leader.txt | 36 ++++----- .../confchange_v1_remove_leader_stepdown.txt | 36 ++++----- testdata/confchange_v2_add_double_auto.txt | 72 +++++++++--------- .../confchange_v2_add_double_implicit.txt | 12 ++- testdata/confchange_v2_add_single_auto.txt | 2 +- .../confchange_v2_add_single_explicit.txt | 20 +++-- .../heartbeat_resp_recovers_from_probing.txt | 16 +--- testdata/lagging_commit.txt | 24 +++--- testdata/prevote.txt | 4 +- testdata/replicate_pause.txt | 75 +++++++++++++++---- testdata/slow_follower_after_compaction.txt | 13 ++-- testdata/snapshot_succeed_via_app_resp.txt | 2 +- .../snapshot_succeed_via_app_resp_behind.txt | 2 +- 20 files changed, 227 insertions(+), 198 deletions(-) diff --git a/raft.go b/raft.go index c5cfa98f..70c209f5 100644 --- a/raft.go +++ b/raft.go @@ -626,7 +626,8 @@ func (r *raft) getMessages(to uint64, fc FlowControl, buffer []pb.Message) []pb. 
} pr := r.trk.Progress[to] buf := msgBuf(buffer) - r.maybeSendAppendBuf(to, pr, &buf) + for r.maybeSendAppendBuf(to, pr, &buf) { + } return buf } @@ -650,6 +651,10 @@ func (r *raft) maybeSendAppend(to uint64, pr *tracker.Progress) bool { return r.maybeSendAppendBuf(to, pr, &r.msgs) } +func (r *raft) appendsReady(pr *tracker.Progress) bool { + return pr.ShouldSendMsgApp(r.raftLog.lastIndex(), r.raftLog.committed) +} + // maybeSendAppendBuf implements maybeSendAppend, and puts the messages into the // provided buffer. func (r *raft) maybeSendAppendBuf(to uint64, pr *tracker.Progress, buf *msgBuf) bool { diff --git a/rafttest/interaction_env_handler_add_nodes.go b/rafttest/interaction_env_handler_add_nodes.go index e68a295f..5c699eb5 100644 --- a/rafttest/interaction_env_handler_add_nodes.go +++ b/rafttest/interaction_env_handler_add_nodes.go @@ -144,6 +144,7 @@ func (env *InteractionEnv) AddNodes(n int, cfg raft.Config, snap pb.Snapshot) er } cfg.Logger = env.Output + cfg.DisableEagerAppends = true rn, err := raft.NewRawNode(&cfg) if err != nil { return err diff --git a/rawnode.go b/rawnode.go index e7a22f96..1be65ce1 100644 --- a/rawnode.go +++ b/rawnode.go @@ -16,7 +16,6 @@ package raft import ( "errors" - pb "go.etcd.io/raft/v3/raftpb" "go.etcd.io/raft/v3/tracker" ) @@ -208,6 +207,15 @@ func (rn *RawNode) readyWithoutAccept() Ready { } } + if r.disableEagerAppends && r.state == StateLeader { + r.trk.Visit(func(id uint64, pr *tracker.Progress) { + if id == r.id { + return + } + rd.Messages = r.getMessages(id, FlowControl{}, rd.Messages) + }) + } + return rd } @@ -486,6 +494,17 @@ func (rn *RawNode) HasReady() bool { if len(r.msgs) > 0 || len(r.msgsAfterAppend) > 0 { return true } + if rn.raft.state == StateLeader { + ready := false + rn.raft.trk.Visit(func(id uint64, pr *tracker.Progress) { + if id != rn.raft.id && !ready { + ready = rn.raft.appendsReady(pr) + } + }) + if ready { + return true + } + } if r.raftLog.hasNextUnstableEnts() || 
r.raftLog.hasNextCommittedEnts(rn.applyUnstableEntries()) { return true } diff --git a/testdata/async_storage_writes.txt b/testdata/async_storage_writes.txt index 02e59e5a..40330fb0 100644 --- a/testdata/async_storage_writes.txt +++ b/testdata/async_storage_writes.txt @@ -87,12 +87,12 @@ stabilize Entries: 1/11 EntryNormal "" Messages: - 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] - 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] 1->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[ 1->1 MsgAppResp Term:1 Log:0/11 AppendThread->1 MsgStorageAppendResp Term:1 Log:1/11 ] + 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] + 1->3 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] > 2 receiving messages 1->2 MsgApp Term:1 Log:1/10 Commit:10 Entries:[1/11 EntryNormal ""] > 3 receiving messages @@ -151,12 +151,12 @@ stabilize CommittedEntries: 1/11 EntryNormal "" Messages: - 1->2 MsgApp Term:1 Log:1/11 Commit:11 - 1->3 MsgApp Term:1 Log:1/11 Commit:11 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:11 Vote:1 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] Responses:[ ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/11 EntryNormal ""] ] + 1->2 MsgApp Term:1 Log:1/11 Commit:11 + 1->3 MsgApp Term:1 Log:1/11 Commit:11 > 2 receiving messages 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 3 receiving messages @@ -235,12 +235,12 @@ process-ready 1 2 3 Entries: 1/12 EntryNormal "prop_1" Messages: - 1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] - 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] 1->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[ 1->1 MsgAppResp Term:1 Log:0/12 AppendThread->1 MsgStorageAppendResp Term:1 Log:1/12 ] + 1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] + 1->3 MsgApp Term:1 Log:1/11 
Commit:11 Entries:[1/12 EntryNormal "prop_1"] > 2 handling Ready > 3 handling Ready @@ -285,12 +285,12 @@ process-ready 1 2 3 Entries: 1/13 EntryNormal "prop_2" Messages: - 1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"] - 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"] 1->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[ 1->1 MsgAppResp Term:1 Log:0/13 AppendThread->1 MsgStorageAppendResp Term:1 Log:1/13 ] + 1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"] + 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_2"] > 2 handling Ready > 3 handling Ready @@ -368,10 +368,6 @@ process-ready 1 2 3 CommittedEntries: 1/12 EntryNormal "prop_1" Messages: - 1->2 MsgApp Term:1 Log:1/13 Commit:12 - 1->3 MsgApp Term:1 Log:1/13 Commit:12 - 1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] - 1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[ 1->1 MsgAppResp Term:1 Log:0/14 AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14 @@ -379,6 +375,8 @@ process-ready 1 2 3 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] Responses:[ ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] ] + 1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] + 1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] > 2 handling Ready > 3 handling Ready @@ -386,9 +384,7 @@ process-ready 1 2 3 deliver-msgs 1 2 3 ---- -1->2 MsgApp Term:1 Log:1/13 Commit:12 1->2 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] -1->3 MsgApp Term:1 Log:1/13 Commit:12 1->3 MsgApp Term:1 Log:1/13 Commit:12 Entries:[1/14 EntryNormal "prop_3"] process-ready 1 2 3 @@ -404,7 +400,6 @@ process-ready 1 2 3 1/12 EntryNormal 
"prop_1" Messages: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[ - 2->1 MsgAppResp Term:1 Log:0/13 2->1 MsgAppResp Term:1 Log:0/14 AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14 ] @@ -420,7 +415,6 @@ process-ready 1 2 3 1/12 EntryNormal "prop_1" Messages: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses:[ - 3->1 MsgAppResp Term:1 Log:0/13 3->1 MsgAppResp Term:1 Log:0/14 AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14 ] @@ -472,10 +466,6 @@ process-ready 1 2 3 CommittedEntries: 1/13 EntryNormal "prop_2" Messages: - 1->2 MsgApp Term:1 Log:1/14 Commit:13 - 1->3 MsgApp Term:1 Log:1/14 Commit:13 - 1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] - 1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[ 1->1 MsgAppResp Term:1 Log:0/15 AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 @@ -483,6 +473,8 @@ process-ready 1 2 3 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] Responses:[ ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] ] + 1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] + 1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] > 2 handling Ready > 3 handling Ready @@ -490,9 +482,7 @@ process-ready 1 2 3 deliver-msgs 1 2 3 ---- -1->2 MsgApp Term:1 Log:1/14 Commit:13 1->2 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] -1->3 MsgApp Term:1 Log:1/14 Commit:13 1->3 MsgApp Term:1 Log:1/14 Commit:13 Entries:[1/15 EntryNormal "prop_4"] process-ready 1 2 3 @@ -508,7 +498,6 @@ process-ready 1 2 3 1/13 EntryNormal "prop_2" Messages: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[ 
- 2->1 MsgAppResp Term:1 Log:0/14 2->1 MsgAppResp Term:1 Log:0/15 AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 ] @@ -524,7 +513,6 @@ process-ready 1 2 3 1/13 EntryNormal "prop_2" Messages: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses:[ - 3->1 MsgAppResp Term:1 Log:0/14 3->1 MsgAppResp Term:1 Log:0/15 AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 ] @@ -544,14 +532,12 @@ process-append-thread 1 2 3 Processing: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses: - 2->1 MsgAppResp Term:1 Log:0/13 2->1 MsgAppResp Term:1 Log:0/14 AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14 > 3 processing append thread Processing: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:12 Vote:1 Entries:[1/14 EntryNormal "prop_3"] Responses: - 3->1 MsgAppResp Term:1 Log:0/13 3->1 MsgAppResp Term:1 Log:0/14 AppendThread->3 MsgStorageAppendResp Term:1 Log:1/14 @@ -577,9 +563,7 @@ deliver-msgs 1 2 3 ---- 1->1 MsgAppResp Term:1 Log:0/14 AppendThread->1 MsgStorageAppendResp Term:1 Log:1/14 -2->1 MsgAppResp Term:1 Log:0/13 2->1 MsgAppResp Term:1 Log:0/14 -3->1 MsgAppResp Term:1 Log:0/13 3->1 MsgAppResp Term:1 Log:0/14 ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/12 EntryNormal "prop_1"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/14 @@ -595,14 +579,14 @@ process-ready 1 2 3 CommittedEntries: 1/14 EntryNormal "prop_3" Messages: - 1->2 MsgApp Term:1 Log:1/15 Commit:14 - 1->3 MsgApp Term:1 Log:1/15 Commit:14 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:14 Vote:1 Responses:[ AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 ] 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] Responses:[ ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/14 EntryNormal "prop_3"] ] + 1->2 MsgApp Term:1 Log:1/15 Commit:14 + 1->3 MsgApp Term:1 Log:1/15 Commit:14 > 2 handling Ready > 3 handling 
Ready @@ -656,14 +640,12 @@ process-append-thread 1 2 3 Processing: 2->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses: - 2->1 MsgAppResp Term:1 Log:0/14 2->1 MsgAppResp Term:1 Log:0/15 AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 > 3 processing append thread Processing: 3->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:13 Vote:1 Entries:[1/15 EntryNormal "prop_4"] Responses: - 3->1 MsgAppResp Term:1 Log:0/14 3->1 MsgAppResp Term:1 Log:0/15 AppendThread->3 MsgStorageAppendResp Term:1 Log:1/15 @@ -689,9 +671,7 @@ deliver-msgs 1 2 3 ---- 1->1 MsgAppResp Term:1 Log:0/15 AppendThread->1 MsgStorageAppendResp Term:1 Log:1/15 -2->1 MsgAppResp Term:1 Log:0/14 2->1 MsgAppResp Term:1 Log:0/15 -3->1 MsgAppResp Term:1 Log:0/14 3->1 MsgAppResp Term:1 Log:0/15 ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/13 EntryNormal "prop_2"] AppendThread->2 MsgStorageAppendResp Term:1 Log:1/15 @@ -707,12 +687,12 @@ process-ready 1 2 3 CommittedEntries: 1/15 EntryNormal "prop_4" Messages: - 1->2 MsgApp Term:1 Log:1/15 Commit:15 - 1->3 MsgApp Term:1 Log:1/15 Commit:15 1->AppendThread MsgStorageAppend Term:1 Log:0/0 Commit:15 Vote:1 1->ApplyThread MsgStorageApply Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] Responses:[ ApplyThread->1 MsgStorageApplyResp Term:0 Log:0/0 Entries:[1/15 EntryNormal "prop_4"] ] + 1->2 MsgApp Term:1 Log:1/15 Commit:15 + 1->3 MsgApp Term:1 Log:1/15 Commit:15 > 2 handling Ready > 3 handling Ready diff --git a/testdata/async_storage_writes_append_aba_race.txt b/testdata/async_storage_writes_append_aba_race.txt index 83964fe7..2f5e3cd4 100644 --- a/testdata/async_storage_writes_append_aba_race.txt +++ b/testdata/async_storage_writes_append_aba_race.txt @@ -36,16 +36,16 @@ Ready MustSync=true: Entries: 1/12 EntryNormal "init_prop" Messages: +2->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/12 EntryNormal "init_prop"] Responses:[ + 2->2 MsgAppResp Term:1 Log:0/12 + 
AppendThread->2 MsgStorageAppendResp Term:1 Log:1/12 +] 2->1 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"] 2->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"] 2->4 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"] 2->5 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"] 2->6 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"] 2->7 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "init_prop"] -2->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[1/12 EntryNormal "init_prop"] Responses:[ - 2->2 MsgAppResp Term:1 Log:0/12 - AppendThread->2 MsgStorageAppendResp Term:1 Log:1/12 -] deliver-msgs 1 drop=(3,4,5,6,7) ---- @@ -191,16 +191,16 @@ Lead:3 State:StateLeader Entries: 2/12 EntryNormal "" Messages: +3->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[2/12 EntryNormal ""] Responses:[ + 3->3 MsgAppResp Term:2 Log:0/12 + AppendThread->3 MsgStorageAppendResp Term:2 Log:2/12 +] 3->1 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->2 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->4 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->5 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->6 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] 3->7 MsgApp Term:2 Log:1/11 Commit:11 Entries:[2/12 EntryNormal ""] -3->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[2/12 EntryNormal ""] Responses:[ - 3->3 MsgAppResp Term:2 Log:0/12 - AppendThread->3 MsgStorageAppendResp Term:2 Log:2/12 -] deliver-msgs 1 drop=(2,4,5,6,7) ---- @@ -348,16 +348,16 @@ Lead:4 State:StateLeader Entries: 3/12 EntryNormal "" Messages: +4->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[3/12 EntryNormal ""] Responses:[ + 4->4 MsgAppResp Term:3 Log:0/12 + AppendThread->4 MsgStorageAppendResp Term:3 Log:3/12 +] 4->1 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->2 MsgApp Term:3 
Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->3 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->5 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->6 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] 4->7 MsgApp Term:3 Log:1/11 Commit:11 Entries:[3/12 EntryNormal ""] -4->AppendThread MsgStorageAppend Term:0 Log:0/0 Entries:[3/12 EntryNormal ""] Responses:[ - 4->4 MsgAppResp Term:3 Log:0/12 - AppendThread->4 MsgStorageAppendResp Term:3 Log:3/12 -] # Step 7: before the new entries reach node 1, it hears of the term change # through a heartbeat and persists the new term. Node 1 then receives these diff --git a/testdata/confchange_disable_validation.txt b/testdata/confchange_disable_validation.txt index 1a2bc4fd..b0a131b9 100644 --- a/testdata/confchange_disable_validation.txt +++ b/testdata/confchange_disable_validation.txt @@ -71,6 +71,6 @@ stabilize CommittedEntries: 1/6 EntryConfChangeV2 Messages: - 1->2 MsgApp Term:1 Log:1/5 Commit:5 Entries:[1/6 EntryConfChangeV2] - 1->3 MsgApp Term:1 Log:1/5 Commit:5 Entries:[1/6 EntryConfChangeV2] + 1->2 MsgApp Term:1 Log:1/5 Commit:6 Entries:[1/6 EntryConfChangeV2] + 1->3 MsgApp Term:1 Log:1/5 Commit:6 Entries:[1/6 EntryConfChangeV2] INFO 1 switched to configuration voters=(1) learners=(2 3) diff --git a/testdata/confchange_v1_add_single.txt b/testdata/confchange_v1_add_single.txt index e54a183f..64a2c2b1 100644 --- a/testdata/confchange_v1_add_single.txt +++ b/testdata/confchange_v1_add_single.txt @@ -71,9 +71,9 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] +> 1 handling Ready DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] -> 1 handling Ready Ready 
MustSync=false: Messages: 1->2 MsgSnap Term:1 Log:0/0 diff --git a/testdata/confchange_v1_remove_leader.txt b/testdata/confchange_v1_remove_leader.txt index cc91508a..435b68a1 100644 --- a/testdata/confchange_v1_remove_leader.txt +++ b/testdata/confchange_v1_remove_leader.txt @@ -52,25 +52,30 @@ Entries: 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: -1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] -1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] -1->2 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] -1->3 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] +1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" +] +1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" +] # Send response from n2 (which is enough to commit the entries so far next time # n1 runs). stabilize 2 ---- > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] - 1->2 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] + 1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" + ] > 2 handling Ready Ready MustSync=true: Entries: 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: - 2->1 MsgAppResp Term:1 Log:0/4 2->1 MsgAppResp Term:1 Log:0/5 # Put another entry in n1's log. 
@@ -92,7 +97,6 @@ stabilize 1 1->2 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] 1->3 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] > 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 2->1 MsgAppResp Term:1 Log:0/5 > 1 handling Ready Ready MustSync=false: @@ -101,8 +105,6 @@ stabilize 1 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: - 1->2 MsgApp Term:1 Log:1/6 Commit:4 - 1->3 MsgApp Term:1 Log:1/6 Commit:4 1->2 MsgApp Term:1 Log:1/6 Commit:5 1->3 MsgApp Term:1 Log:1/6 Commit:5 INFO 1 switched to configuration voters=(2 3) @@ -118,7 +120,6 @@ stabilize 2 ---- > 2 receiving messages 1->2 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] - 1->2 MsgApp Term:1 Log:1/6 Commit:4 1->2 MsgApp Term:1 Log:1/6 Commit:5 > 2 handling Ready Ready MustSync=true: @@ -131,7 +132,6 @@ stabilize 2 Messages: 2->1 MsgAppResp Term:1 Log:0/6 2->1 MsgAppResp Term:1 Log:0/6 - 2->1 MsgAppResp Term:1 Log:0/6 INFO 2 switched to configuration voters=(2 3) # ... which thankfully is what we see on the leader. @@ -140,16 +140,16 @@ stabilize 1 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/6 2->1 MsgAppResp Term:1 Log:0/6 - 2->1 MsgAppResp Term:1 Log:0/6 # When n3 responds, quorum is reached and everything falls into place. 
stabilize ---- > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] - 1->3 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] + 1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" + ] 1->3 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] - 1->3 MsgApp Term:1 Log:1/6 Commit:4 1->3 MsgApp Term:1 Log:1/6 Commit:5 > 3 handling Ready Ready MustSync=true: @@ -162,18 +162,14 @@ stabilize 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: - 3->1 MsgAppResp Term:1 Log:0/4 3->1 MsgAppResp Term:1 Log:0/5 3->1 MsgAppResp Term:1 Log:0/6 3->1 MsgAppResp Term:1 Log:0/6 - 3->1 MsgAppResp Term:1 Log:0/6 INFO 3 switched to configuration voters=(2 3) > 1 receiving messages - 3->1 MsgAppResp Term:1 Log:0/4 3->1 MsgAppResp Term:1 Log:0/5 3->1 MsgAppResp Term:1 Log:0/6 3->1 MsgAppResp Term:1 Log:0/6 - 3->1 MsgAppResp Term:1 Log:0/6 > 1 handling Ready Ready MustSync=false: HardState Term:1 Vote:1 Commit:6 diff --git a/testdata/confchange_v1_remove_leader_stepdown.txt b/testdata/confchange_v1_remove_leader_stepdown.txt index fe397650..7c814598 100644 --- a/testdata/confchange_v1_remove_leader_stepdown.txt +++ b/testdata/confchange_v1_remove_leader_stepdown.txt @@ -53,25 +53,30 @@ Entries: 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: -1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] -1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] -1->2 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] -1->3 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] +1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" +] +1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" +] # Send response from n2 (which is enough to commit the entries so far next time # n1 runs). 
stabilize 2 ---- > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] - 1->2 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] + 1->2 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" + ] > 2 handling Ready Ready MustSync=true: Entries: 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: - 2->1 MsgAppResp Term:1 Log:0/4 2->1 MsgAppResp Term:1 Log:0/5 # Put another entry in n1's log. @@ -91,7 +96,6 @@ stabilize 1 1->2 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] 1->3 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] > 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/4 2->1 MsgAppResp Term:1 Log:0/5 > 1 handling Ready Ready MustSync=false: @@ -100,8 +104,6 @@ stabilize 1 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: - 1->2 MsgApp Term:1 Log:1/6 Commit:4 - 1->3 MsgApp Term:1 Log:1/6 Commit:4 1->2 MsgApp Term:1 Log:1/6 Commit:5 1->3 MsgApp Term:1 Log:1/6 Commit:5 INFO 1 switched to configuration voters=(2 3) @@ -121,7 +123,6 @@ stabilize 2 ---- > 2 receiving messages 1->2 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] - 1->2 MsgApp Term:1 Log:1/6 Commit:4 1->2 MsgApp Term:1 Log:1/6 Commit:5 > 2 handling Ready Ready MustSync=true: @@ -134,7 +135,6 @@ stabilize 2 Messages: 2->1 MsgAppResp Term:1 Log:0/6 2->1 MsgAppResp Term:1 Log:0/6 - 2->1 MsgAppResp Term:1 Log:0/6 INFO 2 switched to configuration voters=(2 3) # ...because the old leader n1 ignores the append responses. @@ -143,16 +143,16 @@ stabilize 1 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/6 2->1 MsgAppResp Term:1 Log:0/6 - 2->1 MsgAppResp Term:1 Log:0/6 # When n3 responds, quorum is reached and everything falls into place. 
stabilize ---- > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[1/4 EntryConfChange r1] - 1->3 MsgApp Term:1 Log:1/4 Commit:3 Entries:[1/5 EntryNormal "foo"] + 1->3 MsgApp Term:1 Log:1/3 Commit:3 Entries:[ + 1/4 EntryConfChange r1 + 1/5 EntryNormal "foo" + ] 1->3 MsgApp Term:1 Log:1/5 Commit:3 Entries:[1/6 EntryNormal "bar"] - 1->3 MsgApp Term:1 Log:1/6 Commit:4 1->3 MsgApp Term:1 Log:1/6 Commit:5 > 3 handling Ready Ready MustSync=true: @@ -165,18 +165,14 @@ stabilize 1/4 EntryConfChange r1 1/5 EntryNormal "foo" Messages: - 3->1 MsgAppResp Term:1 Log:0/4 3->1 MsgAppResp Term:1 Log:0/5 3->1 MsgAppResp Term:1 Log:0/6 3->1 MsgAppResp Term:1 Log:0/6 - 3->1 MsgAppResp Term:1 Log:0/6 INFO 3 switched to configuration voters=(2 3) > 1 receiving messages - 3->1 MsgAppResp Term:1 Log:0/4 3->1 MsgAppResp Term:1 Log:0/5 3->1 MsgAppResp Term:1 Log:0/6 3->1 MsgAppResp Term:1 Log:0/6 - 3->1 MsgAppResp Term:1 Log:0/6 # n1 can no longer propose. propose 1 baz diff --git a/testdata/confchange_v2_add_double_auto.txt b/testdata/confchange_v2_add_double_auto.txt index bf1dfcbe..81fa8388 100644 --- a/testdata/confchange_v2_add_double_auto.txt +++ b/testdata/confchange_v2_add_double_auto.txt @@ -69,8 +69,14 @@ stabilize 1 Entries: 1/5 EntryConfChangeV2 Messages: - 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[1/4 EntryConfChangeV2 v2 v3] - 1->3 MsgApp Term:1 Log:1/3 Commit:4 Entries:[1/4 EntryConfChangeV2 v2 v3] + 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[ + 1/4 EntryConfChangeV2 v2 v3 + 1/5 EntryConfChangeV2 + ] + 1->3 MsgApp Term:1 Log:1/3 Commit:4 Entries:[ + 1/4 EntryConfChangeV2 v2 v3 + 1/5 EntryConfChangeV2 + ] # First, play out the whole interaction between n1 and n2. 
We see n1's probe to # n2 get rejected (since n2 needs a snapshot); the snapshot is delivered at which @@ -80,7 +86,10 @@ stabilize 1 stabilize 1 2 ---- > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[1/4 EntryConfChangeV2 v2 v3] + 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[ + 1/4 EntryConfChangeV2 v2 v3 + 1/5 EntryConfChangeV2 + ] INFO 2 [term: 0] received a MsgApp message with higher term from 1 [term: 1] INFO 2 became follower at term 1 DEBUG 2 [logterm: 0, index: 3] rejected MsgApp [logterm: 1, index: 3] from 1 @@ -94,9 +103,9 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] +> 1 handling Ready DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] -> 1 handling Ready Ready MustSync=false: Messages: 1->2 MsgSnap Term:1 Log:0/0 @@ -156,7 +165,10 @@ stabilize 1 2 stabilize 1 3 ---- > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/3 Commit:4 Entries:[1/4 EntryConfChangeV2 v2 v3] + 1->3 MsgApp Term:1 Log:1/3 Commit:4 Entries:[ + 1/4 EntryConfChangeV2 v2 v3 + 1/5 EntryConfChangeV2 + ] INFO 3 [term: 0] received a MsgApp message with higher term from 1 [term: 1] INFO 3 became follower at term 1 DEBUG 3 [logterm: 0, index: 3] rejected MsgApp [logterm: 1, index: 3] from 1 @@ -170,9 +182,9 @@ stabilize 1 3 3->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 3 for index 3 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=1] +> 1 handling Ready DEBUG 1 [firstindex: 3, commit: 5] sent snapshot[index: 5, term: 1] to 3 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=1 paused pendingSnap=5] -> 1 handling Ready Ready 
MustSync=false: Messages: 1->3 MsgSnap Term:1 Log:0/0 @@ -271,10 +283,14 @@ stabilize 1 1/7 EntryNormal "foo" 1/8 EntryNormal "bar" Messages: - 1->2 MsgApp Term:1 Log:1/6 Commit:5 Entries:[1/7 EntryNormal "foo"] - 1->3 MsgApp Term:1 Log:1/6 Commit:5 Entries:[1/7 EntryNormal "foo"] - 1->2 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"] - 1->3 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"] + 1->2 MsgApp Term:1 Log:1/6 Commit:5 Entries:[ + 1/7 EntryNormal "foo" + 1/8 EntryNormal "bar" + ] + 1->3 MsgApp Term:1 Log:1/6 Commit:5 Entries:[ + 1/7 EntryNormal "foo" + 1/8 EntryNormal "bar" + ] > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/6 3->1 MsgAppResp Term:1 Log:0/6 @@ -300,13 +316,17 @@ stabilize 1 stabilize 2 3 ---- > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/6 Commit:5 Entries:[1/7 EntryNormal "foo"] - 1->2 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"] + 1->2 MsgApp Term:1 Log:1/6 Commit:5 Entries:[ + 1/7 EntryNormal "foo" + 1/8 EntryNormal "bar" + ] 1->2 MsgApp Term:1 Log:1/8 Commit:6 1->2 MsgApp Term:1 Log:1/8 Commit:6 Entries:[1/9 EntryConfChangeV2] > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/6 Commit:5 Entries:[1/7 EntryNormal "foo"] - 1->3 MsgApp Term:1 Log:1/7 Commit:5 Entries:[1/8 EntryNormal "bar"] + 1->3 MsgApp Term:1 Log:1/6 Commit:5 Entries:[ + 1/7 EntryNormal "foo" + 1/8 EntryNormal "bar" + ] 1->3 MsgApp Term:1 Log:1/8 Commit:6 1->3 MsgApp Term:1 Log:1/8 Commit:6 Entries:[1/9 EntryConfChangeV2] > 2 handling Ready @@ -319,7 +339,6 @@ stabilize 2 3 CommittedEntries: 1/6 EntryConfChangeV2 r2 r3 Messages: - 2->1 MsgAppResp Term:1 Log:0/7 2->1 MsgAppResp Term:1 Log:0/8 2->1 MsgAppResp Term:1 Log:0/8 2->1 MsgAppResp Term:1 Log:0/9 @@ -334,7 +353,6 @@ stabilize 2 3 CommittedEntries: 1/6 EntryConfChangeV2 r2 r3 Messages: - 3->1 MsgAppResp Term:1 Log:0/7 3->1 MsgAppResp Term:1 Log:0/8 3->1 MsgAppResp Term:1 Log:0/8 3->1 MsgAppResp Term:1 Log:0/9 @@ -346,11 +364,9 @@ stabilize 2 3 stabilize ---- > 1 
receiving messages - 2->1 MsgAppResp Term:1 Log:0/7 2->1 MsgAppResp Term:1 Log:0/8 2->1 MsgAppResp Term:1 Log:0/8 2->1 MsgAppResp Term:1 Log:0/9 - 3->1 MsgAppResp Term:1 Log:0/7 3->1 MsgAppResp Term:1 Log:0/8 3->1 MsgAppResp Term:1 Log:0/8 3->1 MsgAppResp Term:1 Log:0/9 @@ -362,20 +378,12 @@ stabilize 1/8 EntryNormal "bar" 1/9 EntryConfChangeV2 Messages: - 1->2 MsgApp Term:1 Log:1/9 Commit:7 - 1->3 MsgApp Term:1 Log:1/9 Commit:7 - 1->2 MsgApp Term:1 Log:1/9 Commit:8 - 1->3 MsgApp Term:1 Log:1/9 Commit:8 1->2 MsgApp Term:1 Log:1/9 Commit:9 1->3 MsgApp Term:1 Log:1/9 Commit:9 INFO 1 switched to configuration voters=(1) > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/9 Commit:7 - 1->2 MsgApp Term:1 Log:1/9 Commit:8 1->2 MsgApp Term:1 Log:1/9 Commit:9 > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/9 Commit:7 - 1->3 MsgApp Term:1 Log:1/9 Commit:8 1->3 MsgApp Term:1 Log:1/9 Commit:9 > 2 handling Ready Ready MustSync=false: @@ -386,8 +394,6 @@ stabilize 1/9 EntryConfChangeV2 Messages: 2->1 MsgAppResp Term:1 Log:0/9 - 2->1 MsgAppResp Term:1 Log:0/9 - 2->1 MsgAppResp Term:1 Log:0/9 INFO 2 switched to configuration voters=(1) > 3 handling Ready Ready MustSync=false: @@ -398,19 +404,9 @@ stabilize 1/9 EntryConfChangeV2 Messages: 3->1 MsgAppResp Term:1 Log:0/9 - 3->1 MsgAppResp Term:1 Log:0/9 - 3->1 MsgAppResp Term:1 Log:0/9 INFO 3 switched to configuration voters=(1) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/9 raft: cannot step as peer not found - 2->1 MsgAppResp Term:1 Log:0/9 - raft: cannot step as peer not found - 2->1 MsgAppResp Term:1 Log:0/9 - raft: cannot step as peer not found - 3->1 MsgAppResp Term:1 Log:0/9 - raft: cannot step as peer not found - 3->1 MsgAppResp Term:1 Log:0/9 - raft: cannot step as peer not found 3->1 MsgAppResp Term:1 Log:0/9 raft: cannot step as peer not found diff --git a/testdata/confchange_v2_add_double_implicit.txt b/testdata/confchange_v2_add_double_implicit.txt index 536d66b8..e523bc41 100644 --- 
a/testdata/confchange_v2_add_double_implicit.txt +++ b/testdata/confchange_v2_add_double_implicit.txt @@ -61,9 +61,15 @@ stabilize 1 2 Entries: 1/5 EntryConfChangeV2 Messages: - 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[1/4 EntryConfChangeV2 v2] + 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[ + 1/4 EntryConfChangeV2 v2 + 1/5 EntryConfChangeV2 + ] > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[1/4 EntryConfChangeV2 v2] + 1->2 MsgApp Term:1 Log:1/3 Commit:4 Entries:[ + 1/4 EntryConfChangeV2 v2 + 1/5 EntryConfChangeV2 + ] INFO 2 [term: 0] received a MsgApp message with higher term from 1 [term: 1] INFO 2 became follower at term 1 DEBUG 2 [logterm: 0, index: 3] rejected MsgApp [logterm: 1, index: 3] from 1 @@ -77,9 +83,9 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] +> 1 handling Ready DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] -> 1 handling Ready Ready MustSync=false: Messages: 1->2 MsgSnap Term:1 Log:0/0 diff --git a/testdata/confchange_v2_add_single_auto.txt b/testdata/confchange_v2_add_single_auto.txt index 1c487da8..09a674e7 100644 --- a/testdata/confchange_v2_add_single_auto.txt +++ b/testdata/confchange_v2_add_single_auto.txt @@ -72,9 +72,9 @@ stabilize 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] +> 1 handling Ready DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] -> 1 handling Ready Ready MustSync=false: Messages: 1->2 
MsgSnap Term:1 Log:0/0 diff --git a/testdata/confchange_v2_add_single_explicit.txt b/testdata/confchange_v2_add_single_explicit.txt index 123cd17a..65334f92 100644 --- a/testdata/confchange_v2_add_single_explicit.txt +++ b/testdata/confchange_v2_add_single_explicit.txt @@ -72,9 +72,9 @@ stabilize 1 2 2->1 MsgAppResp Term:1 Log:0/3 Rejected (Hint: 0) DEBUG 1 received MsgAppResp(rejected, hint: (index 0, term 0)) from 2 for index 3 DEBUG 1 decreased progress of 2 to [StateProbe match=0 next=1] +> 1 handling Ready DEBUG 1 [firstindex: 3, commit: 4] sent snapshot[index: 4, term: 1] to 2 [StateProbe match=0 next=1] DEBUG 1 paused sending replication messages to 2 [StateSnapshot match=0 next=1 paused pendingSnap=4] -> 1 handling Ready Ready MustSync=false: Messages: 1->2 MsgSnap Term:1 Log:0/0 @@ -130,21 +130,23 @@ stabilize 1/5 EntryNormal "" 1/6 EntryConfChangeV2 Messages: - 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryNormal ""] - 1->2 MsgApp Term:1 Log:1/5 Commit:4 Entries:[1/6 EntryConfChangeV2] + 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[ + 1/5 EntryNormal "" + 1/6 EntryConfChangeV2 + ] > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[1/5 EntryNormal ""] - 1->2 MsgApp Term:1 Log:1/5 Commit:4 Entries:[1/6 EntryConfChangeV2] + 1->2 MsgApp Term:1 Log:1/4 Commit:4 Entries:[ + 1/5 EntryNormal "" + 1/6 EntryConfChangeV2 + ] > 2 handling Ready Ready MustSync=true: Entries: 1/5 EntryNormal "" 1/6 EntryConfChangeV2 Messages: - 2->1 MsgAppResp Term:1 Log:0/5 2->1 MsgAppResp Term:1 Log:0/6 > 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/5 2->1 MsgAppResp Term:1 Log:0/6 > 1 handling Ready Ready MustSync=false: @@ -153,11 +155,9 @@ stabilize 1/5 EntryNormal "" 1/6 EntryConfChangeV2 Messages: - 1->2 MsgApp Term:1 Log:1/6 Commit:5 1->2 MsgApp Term:1 Log:1/6 Commit:6 INFO 1 switched to configuration voters=(1 2) > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/6 Commit:5 1->2 MsgApp Term:1 Log:1/6 Commit:6 > 2 handling Ready Ready 
MustSync=false: @@ -167,11 +167,9 @@ stabilize 1/6 EntryConfChangeV2 Messages: 2->1 MsgAppResp Term:1 Log:0/6 - 2->1 MsgAppResp Term:1 Log:0/6 INFO 2 switched to configuration voters=(1 2) > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/6 - 2->1 MsgAppResp Term:1 Log:0/6 # Check that trying to transition out again won't do anything. propose-conf-change 1 diff --git a/testdata/heartbeat_resp_recovers_from_probing.txt b/testdata/heartbeat_resp_recovers_from_probing.txt index e606a155..76d75a26 100644 --- a/testdata/heartbeat_resp_recovers_from_probing.txt +++ b/testdata/heartbeat_resp_recovers_from_probing.txt @@ -55,33 +55,25 @@ stabilize Messages: 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:11 + 1->2 MsgApp Term:1 Log:1/11 Commit:11 > 3 receiving messages 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 > 2 handling Ready Ready MustSync=false: Messages: 2->1 MsgHeartbeatResp Term:1 Log:0/0 + 2->1 MsgAppResp Term:1 Log:0/11 > 3 handling Ready Ready MustSync=false: Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 - 3->1 MsgHeartbeatResp Term:1 Log:0/0 -> 1 handling Ready - Ready MustSync=false: - Messages: - 1->2 MsgApp Term:1 Log:1/11 Commit:11 -> 2 receiving messages - 1->2 MsgApp Term:1 Log:1/11 Commit:11 -> 2 handling Ready - Ready MustSync=false: - Messages: - 2->1 MsgAppResp Term:1 Log:0/11 -> 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/11 + 3->1 MsgHeartbeatResp Term:1 Log:0/0 status 1 ---- diff --git a/testdata/lagging_commit.txt b/testdata/lagging_commit.txt index 8f8ba336..fe1bf413 100644 --- a/testdata/lagging_commit.txt +++ b/testdata/lagging_commit.txt @@ -39,10 +39,14 @@ ok deliver-msgs 2 3 ---- -1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"] -1->2 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"] -1->3 
MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "data1"] -1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "data2"] +1->2 MsgApp Term:1 Log:1/11 Commit:11 Entries:[ + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" +] +1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[ + 1/12 EntryNormal "data1" + 1/13 EntryNormal "data2" +] process-ready 3 ---- @@ -51,14 +55,12 @@ Entries: 1/12 EntryNormal "data1" 1/13 EntryNormal "data2" Messages: -3->1 MsgAppResp Term:1 Log:0/12 3->1 MsgAppResp Term:1 Log:0/13 # Suppose there is a network blip which prevents the leader learning that the # follower 3 has appended the proposed entries to the log. deliver-msgs drop=(1) ---- -dropped: 3->1 MsgAppResp Term:1 Log:0/12 dropped: 3->1 MsgAppResp Term:1 Log:0/13 # In the meantime, the entries are committed, and the leader sends the commit @@ -71,10 +73,8 @@ stabilize 1 2 1/12 EntryNormal "data1" 1/13 EntryNormal "data2" Messages: - 2->1 MsgAppResp Term:1 Log:0/12 2->1 MsgAppResp Term:1 Log:0/13 > 1 receiving messages - 2->1 MsgAppResp Term:1 Log:0/12 2->1 MsgAppResp Term:1 Log:0/13 > 1 handling Ready Ready MustSync=false: @@ -83,12 +83,9 @@ stabilize 1 2 1/12 EntryNormal "data1" 1/13 EntryNormal "data2" Messages: - 1->2 MsgApp Term:1 Log:1/13 Commit:12 - 1->3 MsgApp Term:1 Log:1/13 Commit:12 1->2 MsgApp Term:1 Log:1/13 Commit:13 1->3 MsgApp Term:1 Log:1/13 Commit:13 > 2 receiving messages - 1->2 MsgApp Term:1 Log:1/13 Commit:12 1->2 MsgApp Term:1 Log:1/13 Commit:13 > 2 handling Ready Ready MustSync=false: @@ -98,16 +95,13 @@ stabilize 1 2 1/13 EntryNormal "data2" Messages: 2->1 MsgAppResp Term:1 Log:0/13 - 2->1 MsgAppResp Term:1 Log:0/13 > 1 receiving messages 2->1 MsgAppResp Term:1 Log:0/13 - 2->1 MsgAppResp Term:1 Log:0/13 # The network blip prevents the follower 3 from learning that the previously # appended entries are now committed. 
deliver-msgs drop=(3) ---- -dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:12 dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:13 # The network blip ends here. @@ -116,7 +110,7 @@ status 1 ---- 1: StateReplicate match=13 next=14 2: StateReplicate match=13 next=14 -3: StateReplicate match=11 next=14 inflight=2 +3: StateReplicate match=11 next=14 inflight=1 # The leader still observes that the entries are in-flight to the follower 3, # since it hasn't heard from it. Nothing triggers updating the follower's diff --git a/testdata/prevote.txt b/testdata/prevote.txt index db763d35..a7a3fecf 100644 --- a/testdata/prevote.txt +++ b/testdata/prevote.txt @@ -90,9 +90,9 @@ stabilize CommittedEntries: 1/12 EntryNormal "prop_1" Messages: + 1->3 MsgPreVoteResp Term:1 Log:0/0 Rejected (Hint: 0) 1->2 MsgApp Term:1 Log:1/12 Commit:12 1->3 MsgApp Term:1 Log:1/12 Commit:12 - 1->3 MsgPreVoteResp Term:1 Log:0/0 Rejected (Hint: 0) > 2 handling Ready Ready MustSync=false: Messages: @@ -102,8 +102,8 @@ stabilize > 3 receiving messages 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1"] INFO 3 became follower at term 1 - 1->3 MsgApp Term:1 Log:1/12 Commit:12 1->3 MsgPreVoteResp Term:1 Log:0/0 Rejected (Hint: 0) + 1->3 MsgApp Term:1 Log:1/12 Commit:12 2->3 MsgPreVoteResp Term:1 Log:0/0 Rejected (Hint: 0) > 2 handling Ready Ready MustSync=false: diff --git a/testdata/replicate_pause.txt b/testdata/replicate_pause.txt index 4931480e..326f5c23 100644 --- a/testdata/replicate_pause.txt +++ b/testdata/replicate_pause.txt @@ -47,8 +47,8 @@ ok status 1 ---- 1: StateReplicate match=14 next=15 -2: StateReplicate match=11 next=15 paused inflight=3[full] -3: StateReplicate match=11 next=15 paused inflight=3[full] +2: StateReplicate match=11 next=15 inflight=1 +3: StateReplicate match=11 next=15 inflight=1 log-level none ---- @@ -68,16 +68,16 @@ status 1 ---- 1: StateReplicate match=14 next=15 2: StateReplicate match=14 next=15 -3: StateReplicate match=11 next=15 paused inflight=3[full] 
+3: StateReplicate match=11 next=15 inflight=1 # Drop append messages to node 3. deliver-msgs drop=3 ---- -dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[1/12 EntryNormal "prop_1_12"] -dropped: 1->3 MsgApp Term:1 Log:1/12 Commit:11 Entries:[1/13 EntryNormal "prop_1_13"] -dropped: 1->3 MsgApp Term:1 Log:1/13 Commit:11 Entries:[1/14 EntryNormal "prop_1_14"] -dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:12 -dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:13 +dropped: 1->3 MsgApp Term:1 Log:1/11 Commit:11 Entries:[ + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" +] dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 @@ -98,8 +98,8 @@ ok status 1 ---- 1: StateReplicate match=14 next=15 -2: StateReplicate match=14 next=18 paused inflight=3[full] -3: StateReplicate match=11 next=15 paused inflight=3[full] +2: StateReplicate match=14 next=15 +3: StateReplicate match=11 next=15 inflight=1 log-level none ---- @@ -119,7 +119,7 @@ status 1 ---- 1: StateReplicate match=17 next=18 2: StateReplicate match=17 next=18 -3: StateReplicate match=11 next=15 paused inflight=3[full] +3: StateReplicate match=11 next=18 inflight=2 # Make a heartbeat roundtrip. 
tick-heartbeat 1 @@ -139,6 +139,14 @@ stabilize 2 3 > 2 receiving messages 1->2 MsgHeartbeat Term:1 Log:0/0 Commit:17 > 3 receiving messages + 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[ + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + ] + DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 + 1->3 MsgApp Term:1 Log:1/17 Commit:17 + DEBUG 3 [logterm: 0, index: 17] rejected MsgApp [logterm: 1, index: 17] from 1 1->3 MsgHeartbeat Term:1 Log:0/0 Commit:11 > 2 handling Ready Ready MustSync=false: @@ -148,6 +156,8 @@ stabilize 2 3 Ready MustSync=false: Messages: 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) + 3->1 MsgAppResp Term:1 Log:1/17 Rejected (Hint: 11) # After handling heartbeat responses, node 1 sends an empty MsgApp to a # throttled node 3 because it hasn't yet replied to a single MsgApp, and the @@ -157,21 +167,54 @@ stabilize 1 > 1 receiving messages 2->1 MsgHeartbeatResp Term:1 Log:0/0 3->1 MsgHeartbeatResp Term:1 Log:0/0 + 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) + DEBUG 1 received MsgAppResp(rejected, hint: (index 11, term 1)) from 3 for index 14 + DEBUG 1 decreased progress of 3 to [StateReplicate match=11 next=12 inflight=2] + 3->1 MsgAppResp Term:1 Log:1/17 Rejected (Hint: 11) + DEBUG 1 received MsgAppResp(rejected, hint: (index 11, term 1)) from 3 for index 17 > 1 handling Ready Ready MustSync=false: Messages: - 1->3 MsgApp Term:1 Log:1/14 Commit:17 + 1->3 MsgApp Term:1 Log:1/11 Commit:17 Entries:[ + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + ] # Node 3 finally receives a MsgApp, but there was a gap, so it rejects it. 
stabilize 3 ---- > 3 receiving messages - 1->3 MsgApp Term:1 Log:1/14 Commit:17 - DEBUG 3 [logterm: 0, index: 14] rejected MsgApp [logterm: 1, index: 14] from 1 + 1->3 MsgApp Term:1 Log:1/11 Commit:17 Entries:[ + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + ] > 3 handling Ready - Ready MustSync=false: + Ready MustSync=true: + HardState Term:1 Vote:1 Commit:17 + Entries: + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + CommittedEntries: + 1/12 EntryNormal "prop_1_12" + 1/13 EntryNormal "prop_1_13" + 1/14 EntryNormal "prop_1_14" + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" Messages: - 3->1 MsgAppResp Term:1 Log:1/14 Rejected (Hint: 11) + 3->1 MsgAppResp Term:1 Log:0/17 log-level none ---- diff --git a/testdata/slow_follower_after_compaction.txt b/testdata/slow_follower_after_compaction.txt index 2ce02ada..a033952d 100644 --- a/testdata/slow_follower_after_compaction.txt +++ b/testdata/slow_follower_after_compaction.txt @@ -81,15 +81,18 @@ status 1 ---- 1: StateReplicate match=18 next=19 2: StateReplicate match=18 next=19 -3: StateReplicate match=14 next=17 paused inflight=2[full] +3: StateReplicate match=14 next=19 inflight=1 # Break the MsgApp flow from the leader to node 3. 
deliver-msgs drop=3 ---- -dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[1/15 EntryNormal "prop_1_15"] -dropped: 1->3 MsgApp Term:1 Log:1/15 Commit:14 Entries:[1/16 EntryNormal "prop_1_16"] -dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:15 -dropped: 1->3 MsgApp Term:1 Log:1/16 Commit:16 +dropped: 1->3 MsgApp Term:1 Log:1/14 Commit:14 Entries:[ + 1/15 EntryNormal "prop_1_15" + 1/16 EntryNormal "prop_1_16" + 1/17 EntryNormal "prop_1_17" + 1/18 EntryNormal "prop_1_18" +] +dropped: 1->3 MsgApp Term:1 Log:1/18 Commit:18 # Truncate the leader's log beyond node 3 log size. compact 1 17 diff --git a/testdata/snapshot_succeed_via_app_resp.txt b/testdata/snapshot_succeed_via_app_resp.txt index 5c4b0c61..ee4c8706 100644 --- a/testdata/snapshot_succeed_via_app_resp.txt +++ b/testdata/snapshot_succeed_via_app_resp.txt @@ -86,9 +86,9 @@ stabilize 1 ---- > 1 receiving messages 3->1 MsgHeartbeatResp Term:1 Log:0/0 +> 1 handling Ready DEBUG 1 [firstindex: 12, commit: 11] sent snapshot[index: 11, term: 1] to 3 [StateProbe match=0 next=11] DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=11 paused pendingSnap=11] -> 1 handling Ready Ready MustSync=false: Messages: 1->3 MsgSnap Term:1 Log:0/0 diff --git a/testdata/snapshot_succeed_via_app_resp_behind.txt b/testdata/snapshot_succeed_via_app_resp_behind.txt index b0f5883b..beec45f3 100644 --- a/testdata/snapshot_succeed_via_app_resp_behind.txt +++ b/testdata/snapshot_succeed_via_app_resp_behind.txt @@ -123,9 +123,9 @@ stabilize 1 3->1 MsgAppResp Term:1 Log:1/10 Rejected (Hint: 5) DEBUG 1 received MsgAppResp(rejected, hint: (index 5, term 1)) from 3 for index 10 DEBUG 1 decreased progress of 3 to [StateProbe match=0 next=6] +> 1 handling Ready DEBUG 1 [firstindex: 11, commit: 12] sent snapshot[index: 12, term: 1] to 3 [StateProbe match=0 next=6] DEBUG 1 paused sending replication messages to 3 [StateSnapshot match=0 next=6 paused pendingSnap=12] -> 1 handling Ready Ready MustSync=false: Messages: 1->3 
MsgSnap Term:1 Log:0/0