From ea5ff8a0ffa85fdd5e46a501a156ba364f23e4fd Mon Sep 17 00:00:00 2001 From: Tobias Schottdorf Date: Fri, 19 Oct 2018 20:09:47 +0200 Subject: [PATCH] raft: fix bug in unbounded log growth prevention mechanism The previous code was using the proto-generated `Size()` method to track the size of an incoming proposal at the leader. This includes the Index and Term, which were mutated after the call to `Size()` when appending to the log. Additionally, it was not taking into account that an ignored configuration change would ignore the original proposal and append an empty entry instead. As a result, a fully committed Raft group could end up with a non- zero tracked uncommitted Raft log counter that would eventually hit the ceiling and drop all future proposals indiscriminately. It would also immediately imply that proposals exceeding the threshold alone would get refused (as the "first uncommitted proposal" gets special treatment and is always allowed in). Track only the size of the payload actually appended to the Raft log instead. For context, see: https://github.com/cockroachdb/cockroach/issues/31618#issuecomment-431374938 --- raft/node_test.go | 2 +- raft/raft.go | 14 ++++++++------ raft/raft_test.go | 14 ++++++++++++-- raft/rawnode_test.go | 2 +- raft/util.go | 6 ++++++ 5 files changed, 28 insertions(+), 10 deletions(-) diff --git a/raft/node_test.go b/raft/node_test.go index a729068bfc21..e977da6d6e1f 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * testEntry.Size()) + maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) diff --git a/raft/raft.go b/raft/raft.go index bf0a8983c462..41d09d7d6c87 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -970,10 +970,6 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } - if !r.increaseUncommittedSize(m.Entries) { - r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) - return ErrProposalDropped - } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -986,6 +982,12 @@ func stepLeader(r *raft, m pb.Message) error { } } } + // Track the size of this uncommitted proposal. Note that the payload + // was potentially mutated above, so ordering matters. + if !r.increaseUncommittedSize(m.Entries) { + r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) + return ErrProposalDropped + } r.appendEntry(m.Entries...) r.bcastAppend() return nil @@ -1490,7 +1492,7 @@ func (r *raft) abortLeaderTransfer() { func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + s += uint64(PayloadSize(e)) } if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { @@ -1513,7 +1515,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { var s uint64 for _, e := range ents { - s += uint64(e.Size()) + s += uint64(PayloadSize(e)) } if s > r.uncommittedSize { // uncommittedSize may underestimate the size of the uncommitted Raft diff --git a/raft/raft_test.go b/raft/raft_test.go index cac4bb6c2caa..6c3abe5f5659 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -363,12 +363,19 @@ func TestProgressFlowControl(t *testing.T) { } func TestUncommittedEntryLimit(t *testing.T) { - const maxEntries = 16 + // Use a relatively large number of entries here to prevent regression of a + // bug in which we'd use the Size() instead of the PayloadSize(). Size() + // changes when the Index and Term are assigned before putting the proposal + // in the log. This test would fail with the bug, either because we'd get + // dropped proposals earlier than we expect them, or because the final tally + // ends up nonzero. (At the time of writing, the former). + const maxEntries = 1024 testEntry := pb.Entry{Data: []byte("testdata")} - maxEntrySize := maxEntries * testEntry.Size() + maxEntrySize := maxEntries * PayloadSize(testEntry) cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage()) cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize) + cfg.MaxInflightMsgs = 2*1024 // avoid interference r := newRaft(cfg) r.becomeCandidate() r.becomeLeader() @@ -425,6 +432,9 @@ func TestUncommittedEntryLimit(t *testing.T) { t.Fatalf("expected %d messages, got %d", e, len(ms)) } r.reduceUncommittedSize(propEnts) + if n := r.uncommittedSize; n != 0 { + t.Fatalf("expected zero uncommitted size, got %d", n) + } } func TestLeaderElection(t *testing.T) { diff --git a/raft/rawnode_test.go b/raft/rawnode_test.go index 3e56733aa425..6348bb7e3248 100644 --- a/raft/rawnode_test.go +++ b/raft/rawnode_test.go @@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) { const maxEntries = 16 data := []byte("testdata") testEntry := raftpb.Entry{Data: data} - maxEntrySize := uint64(maxEntries * testEntry.Size()) + maxEntrySize := uint64(maxEntries * PayloadSize(testEntry)) s := NewMemoryStorage() cfg := newTestConfig(1, []uint64{1}, 10, 1, s) diff --git a/raft/util.go b/raft/util.go index 1a7a1e9ac3a2..79eaa0c626f8 100644 --- a/raft/util.go +++ b/raft/util.go @@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string { return buf.String() } +// PayloadSize is the size of the payload of this Entry. Notably, it does not +// depend on its Index or Term. +func PayloadSize(e pb.Entry) int { + return len(e.Data) +} + // DescribeEntry returns a concise human-readable description of an // Entry for debugging. func DescribeEntry(e pb.Entry, f EntryFormatter) string {