From 7a8ab37bfd04970f0597ae8ef59d6737e5aafc8a Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Tue, 4 Sep 2018 14:52:23 +0200 Subject: [PATCH] raft: fix correctness bug in CommittedEntries pagination In #9982, a mechanism to limit the size of `CommittedEntries` was introduced. The way this mechanism worked was that it would load applicable entries (passing the max size hint) and would emit a `HardState` whose commit index was truncated to match the limitation applied to the entries. Unfortunately, this was subtly incorrect when the user-provided `Entries` implementation didn't exactly match what Raft uses internally. Depending on whether a `Node` or a `RawNode` was used, this would either lead to regressing the HardState's commit index or outright forgetting to apply entries, respectively. Asking implementers to precisely match the Raft size limitation semantics was considered but looks like a bad idea as it puts correctness squarely in the hands of downstream users. Instead, this PR removes the truncation of `HardState` when limiting is active and tracks the applied index separately. This removes the old paradigm (that the previous code tried to work around) that the client will always apply all the way to the commit index, which isn't true when commit entries are paginated. See [1] for more on the discovery of this bug (CockroachDB's implementation of `Entries` returns one more entry than Raft's when the size limit hits). [1]: https://github.com/cockroachdb/cockroach/issues/28918#issuecomment-418174448 --- raft/node.go | 31 +++++++++++------ raft/node_test.go | 71 +++++++++++++++++++++++++++++++++++++++ raft/rawnode.go | 19 +++++------ raft/rawnode_test.go | 79 ++++++++++++++++++++++++++++++++++++++++++++ raft/util.go | 10 ++++++ 5 files changed, 188 insertions(+), 22 deletions(-) diff --git a/raft/node.go b/raft/node.go index 7c5f329e45f..b4e2e535545 100644 --- a/raft/node.go +++ b/raft/node.go @@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool { len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 } +// appliedCursor extracts from the Ready the highest index the client has +// applied (once the Ready is confirmed via Advance). If no information is +// contained in the Ready, returns zero. +func (rd Ready) appliedCursor() uint64 { + if n := len(rd.CommittedEntries); n > 0 { + return rd.CommittedEntries[n-1].Index + } + if index := rd.Snapshot.Metadata.Index; index > 0 { + return index + } + return 0 +} + // Node represents a node in a raft cluster. type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election @@ -282,6 +295,7 @@ func (n *node) run(r *raft) { var prevLastUnstablei, prevLastUnstablet uint64 var havePrevLastUnstablei bool var prevSnapi uint64 + var applyingToI uint64 var rd Ready lead := None @@ -381,13 +395,17 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Metadata.Index } + if index := rd.appliedCursor(); index != 0 { + applyingToI = index + } r.msgs = nil r.readStates = nil advancec = n.advancec case <-advancec: - if prevHardSt.Commit != 0 { - r.raftLog.appliedTo(prevHardSt.Commit) + if applyingToI != 0 { + r.raftLog.appliedTo(applyingToI) + applyingToI = 0 } if havePrevLastUnstablei { r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) @@ -559,15 +577,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt - // If we hit a size limit when loadaing CommittedEntries, clamp - // our HardState.Commit to what we're actually returning. This is - // also used as our cursor to resume for the next Ready batch. - if len(rd.CommittedEntries) > 0 { - lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1] - if rd.HardState.Commit > lastCommit.Index { - rd.HardState.Commit = lastCommit.Index - } - } } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot diff --git a/raft/node_test.go b/raft/node_test.go index 1a6501cb8eb..b067aaad4d1 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -17,6 +17,8 @@ package raft import ( "bytes" "context" + "fmt" + "math" "reflect" "strings" "testing" @@ -926,3 +928,72 @@ func TestCommitPagination(t *testing.T) { s.Append(rd.Entries) n.Advance() } + +type ignoreSizeHintMemStorage struct { + *MemoryStorage +} + +func (s *ignoreSizeHintMemStorage) Entries(lo, hi uint64, maxSize uint64) ([]raftpb.Entry, error) { + return s.MemoryStorage.Entries(lo, hi, math.MaxUint64) +} + +// TestNodeCommitPaginationAfterRestart regression tests a scenario in which the +// Storage's Entries size limitation is slightly more permissive than Raft's +// internal one. The original bug was the following: +// +// - node learns that index 11 (or 100, doesn't matter) is committed +// - nextEnts returns index 1..10 in CommittedEntries due to size limiting. However, +// index 10 already exceeds maxBytes, due to a user-provided impl of Entries. +// - Commit index gets bumped to 10 +// - the node persists the HardState, but crashes before applying the entries +// - upon restart, the storage returns the same entries, but `slice` takes a different code path +// (since it is now called with an upper bound of 10) and removes the last entry. +// - Raft emits a HardState with a regressing commit index. +// +// A simpler version of this test would have the storage return a lot less entries than dictated +// by maxSize (for example, exactly one entry) after the restart, resulting in a larger regression. +// This wouldn't need to exploit anything about Raft-internal code paths to fail. +func TestNodeCommitPaginationAfterRestart(t *testing.T) { + s := &ignoreSizeHintMemStorage{ + MemoryStorage: NewMemoryStorage(), + } + persistedHardState := raftpb.HardState{ + Term: 1, + Vote: 1, + Commit: 10, + } + + s.hardState = persistedHardState + s.ents = make([]raftpb.Entry, 10) + var size uint64 + for i := range s.ents { + ent := raftpb.Entry{ + Term: 1, + Index: uint64(i + 1), + Type: raftpb.EntryNormal, + Data: []byte("a"), + } + + s.ents[i] = ent + size += uint64(ent.Size()) + } + + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should + // not be included in the initial rd.CommittedEntries. However, our storage will ignore + // this and *will* return it (which is how the Commit index ended up being 10 initially). + cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 + + r := newRaft(cfg) + n := newNode() + go n.run(r) + defer n.Stop() + + rd := readyWithTimeout(&n) + if !IsEmptyHardState(rd.HardState) && rd.HardState.Commit < persistedHardState.Commit { + t.Errorf("HardState regressed: Commit %d -> %d\nCommitting:\n%+v", + persistedHardState.Commit, rd.HardState.Commit, + DescribeEntries(rd.CommittedEntries, func(data []byte) string { return fmt.Sprintf("%q", data) }), + ) + } +} diff --git a/raft/rawnode.go b/raft/rawnode.go index a4cecfc8f7f..5f8a116dd63 100644 --- a/raft/rawnode.go +++ b/raft/rawnode.go @@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - if rn.prevHardSt.Commit != 0 { - // In most cases, prevHardSt and rd.HardState will be the same - // because when there are new entries to apply we just sent a - // HardState with an updated Commit value. However, on initial - // startup the two are different because we don't send a HardState - // until something changes, but we do send any un-applied but - // committed entries (and previously-committed entries may be - // incorporated into the snapshot, even if rd.CommittedEntries is - // empty). Therefore we mark all committed entries as applied - // whether they were included in rd.HardState or not. - rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit) + + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + rn.raft.raftLog.appliedTo(index) } + if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] rn.raft.raftLog.stableTo(e.Index, e.Term) diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 8b8ccf5d3cb..7e3841695cc 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -401,3 +401,82 @@ func TestRawNodeStatus(t *testing.T) { t.Errorf("expected status struct, got nil") } } + +// TestRawNodeCommitPaginationAfterRestart is the RawNode version of +// TestNodeCommitPaginationAfterRestart. The anomaly here was even worse as the +// Raft group would forget to apply entries: +// +// - node learns that index 11 is committed +// - nextEnts returns index 1..10 in CommittedEntries (but index 10 already +// exceeds maxBytes), which isn't noticed internally by Raft +// - Commit index gets bumped to 10 +// - the node persists the HardState, but crashes before applying the entries +// - upon restart, the storage returns the same entries, but `slice` takes a +// different code path and removes the last entry. +// - Raft does not emit a HardState, but when the app calls Advance(), it bumps +// its internal applied index cursor to 10 (when it should be 9) +// - the next Ready asks the app to apply index 11 (omitting index 10), losing a +// write. +func TestRawNodeCommitPaginationAfterRestart(t *testing.T) { + s := &ignoreSizeHintMemStorage{ + MemoryStorage: NewMemoryStorage(), + } + persistedHardState := raftpb.HardState{ + Term: 1, + Vote: 1, + Commit: 10, + } + + s.hardState = persistedHardState + s.ents = make([]raftpb.Entry, 10) + var size uint64 + for i := range s.ents { + ent := raftpb.Entry{ + Term: 1, + Index: uint64(i + 1), + Type: raftpb.EntryNormal, + Data: []byte("a"), + } + + s.ents[i] = ent + size += uint64(ent.Size()) + } + + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + // Set a MaxSizePerMsg that would suggest to Raft that the last committed entry should + // not be included in the initial rd.CommittedEntries. However, our storage will ignore + // this and *will* return it (which is how the Commit index ended up being 10 initially). + cfg.MaxSizePerMsg = size - uint64(s.ents[len(s.ents)-1].Size()) - 1 + + s.ents = append(s.ents, raftpb.Entry{ + Term: 1, + Index: uint64(11), + Type: raftpb.EntryNormal, + Data: []byte("boom"), + }) + + rawNode, err := NewRawNode(cfg, []Peer{{ID: 1}}) + if err != nil { + t.Fatal(err) + } + + for highestApplied := uint64(0); highestApplied != 11; { + rd := rawNode.Ready() + n := len(rd.CommittedEntries) + if n == 0 { + t.Fatalf("stopped applying entries at index %d", highestApplied) + } + if next := rd.CommittedEntries[0].Index; highestApplied != 0 && highestApplied+1 != next { + t.Fatalf("attempting to apply index %d after index %d, leaving a gap", next, highestApplied) + } + highestApplied = rd.CommittedEntries[n-1].Index + rawNode.Advance(rd) + rawNode.Step(raftpb.Message{ + Type: raftpb.MsgHeartbeat, + To: 1, + From: 1, // illegal, but we get away with it + Term: 1, + Commit: 11, + }) + } +} diff --git a/raft/util.go b/raft/util.go index d744927c4a7..1a7a1e9ac3a 100644 --- a/raft/util.go +++ b/raft/util.go @@ -113,6 +113,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string { return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) } +// DescribeEntries calls DescribeEntry for each Entry, adding a newline to +// each. +func DescribeEntries(ents []pb.Entry, f EntryFormatter) string { + var buf bytes.Buffer + for _, e := range ents { + _, _ = buf.WriteString(DescribeEntry(e, f) + "\n") + } + return buf.String() +} + func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { if len(ents) == 0 { return ents