Skip to content

Commit

Permalink
Merge pull request #10199 from tschottdorf/fix-max-uncommitted-size
Browse files Browse the repository at this point in the history
raft: fix bug in unbounded log growth prevention mechanism
  • Loading branch information
tbg authored Oct 22, 2018
2 parents a27a73e + ad49c8f commit b42b394
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 20 deletions.
2 changes: 1 addition & 1 deletion raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ func TestNodeBoundedLogGrowthWithPartition(t *testing.T) {
const maxEntries = 16
data := []byte("testdata")
testEntry := raftpb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * testEntry.Size())
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))

s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
Expand Down
36 changes: 27 additions & 9 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,17 +635,27 @@ func (r *raft) reset(term uint64) {
r.readOnly = newReadOnly(r.readOnly.option)
}

func (r *raft) appendEntry(es ...pb.Entry) {
func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
li := r.raftLog.lastIndex()
for i := range es {
es[i].Term = r.Term
es[i].Index = li + 1 + uint64(i)
}
// Track the size of this uncommitted proposal.
if !r.increaseUncommittedSize(es) {
r.logger.Debugf(
"%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
r.id,
)
// Drop the proposal.
return false
}
// use latest "last" index after truncate/append
li = r.raftLog.append(es...)
r.getProgress(r.id).maybeUpdate(li)
// Regardless of maybeCommit's return, our caller will call bcastAppend.
r.maybeCommit()
return true
}

// tickElection is run by followers and candidates after r.electionTimeout.
Expand Down Expand Up @@ -739,7 +749,16 @@ func (r *raft) becomeLeader() {
// could be expensive.
r.pendingConfIndex = r.raftLog.lastIndex()

r.appendEntry(pb.Entry{Data: nil})
emptyEnt := pb.Entry{Data: nil}
if !r.appendEntry(emptyEnt) {
// This won't happen because we just called reset() above.
r.logger.Panic("empty entry was dropped")
}
// As a special case, don't count the initial empty entry towards the
// uncommitted log quota. This is because we want to preserve the
// behavior of allowing one entry larger than quota if the current
// usage is zero.
r.reduceUncommittedSize([]pb.Entry{emptyEnt})
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

Expand Down Expand Up @@ -970,10 +989,6 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand All @@ -986,7 +1001,10 @@ func stepLeader(r *raft, m pb.Message) error {
}
}
}
r.appendEntry(m.Entries...)

if !r.appendEntry(m.Entries...) {
return ErrProposalDropped
}
r.bcastAppend()
return nil
case pb.MsgReadIndex:
Expand Down Expand Up @@ -1490,7 +1508,7 @@ func (r *raft) abortLeaderTransfer() {
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
Expand All @@ -1513,7 +1531,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {

var s uint64
for _, e := range ents {
s += uint64(e.Size())
s += uint64(PayloadSize(e))
}
if s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
Expand Down
2 changes: 1 addition & 1 deletion raft/raft_paper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestLeaderBcastBeat(t *testing.T) {
r.becomeCandidate()
r.becomeLeader()
for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Index: uint64(i) + 1})
mustAppendEntry(r, pb.Entry{Index: uint64(i) + 1})
}

for i := 0; i < hi; i++ {
Expand Down
37 changes: 29 additions & 8 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func nextEnts(r *raft, s *MemoryStorage) (ents []pb.Entry) {
return ents
}

func mustAppendEntry(r *raft, ents ...pb.Entry) {
if !r.appendEntry(ents...) {
panic("entry unexpectedly dropped")
}
}

type stateMachine interface {
Step(m pb.Message) error
readMessages() []pb.Message
Expand Down Expand Up @@ -363,15 +369,24 @@ func TestProgressFlowControl(t *testing.T) {
}

func TestUncommittedEntryLimit(t *testing.T) {
const maxEntries = 16
// Use a relatively large number of entries here to prevent regression of a
// bug which computed the size before it was fixed. This test would fail
// with the bug, either because we'd get dropped proposals earlier than we
// expect them, or because the final tally ends up nonzero. (At the time of
// writing, the former).
const maxEntries = 1024
testEntry := pb.Entry{Data: []byte("testdata")}
maxEntrySize := maxEntries * testEntry.Size()
maxEntrySize := maxEntries * PayloadSize(testEntry)

cfg := newTestConfig(1, []uint64{1, 2, 3}, 5, 1, NewMemoryStorage())
cfg.MaxUncommittedEntriesSize = uint64(maxEntrySize)
cfg.MaxInflightMsgs = 2 * 1024 // avoid interference
r := newRaft(cfg)
r.becomeCandidate()
r.becomeLeader()
if n := r.uncommittedSize; n != 0 {
t.Fatalf("expected zero uncommitted size, got %d bytes", n)
}

// Set the two followers to the replicate state. Commit to tail of log.
const numFollowers = 2
Expand Down Expand Up @@ -401,6 +416,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)
if r.uncommittedSize != 0 {
t.Fatalf("committed everything, but still tracking %d", r.uncommittedSize)
}

// Send a single large proposal to r1. Should be accepted even though it
// pushes us above the limit because we were beneath it before the proposal.
Expand All @@ -425,6 +443,9 @@ func TestUncommittedEntryLimit(t *testing.T) {
t.Fatalf("expected %d messages, got %d", e, len(ms))
}
r.reduceUncommittedSize(propEnts)
if n := r.uncommittedSize; n != 0 {
t.Fatalf("expected zero uncommitted size, got %d", n)
}
}

func TestLeaderElection(t *testing.T) {
Expand Down Expand Up @@ -2585,7 +2606,7 @@ func TestBcastBeat(t *testing.T) {
sm.becomeCandidate()
sm.becomeLeader()
for i := 0; i < 10; i++ {
sm.appendEntry(pb.Entry{Index: uint64(i) + 1})
mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
}
// slow follower
sm.prs[2].Match, sm.prs[2].Next = 5, 6
Expand Down Expand Up @@ -2709,7 +2730,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
// we expect that raft will only send out one msgAPP on the first
// loop. After that, the follower is paused until a heartbeat response is
// received.
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msg := r.readMessages()
if len(msg) != 1 {
Expand All @@ -2724,7 +2745,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
t.Errorf("paused = %v, want true", r.prs[2].Paused)
}
for j := 0; j < 10; j++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
if l := len(r.readMessages()); l != 0 {
t.Errorf("len(msg) = %d, want %d", l, 0)
Expand Down Expand Up @@ -2771,7 +2792,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
r.prs[2].becomeReplicate()

for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msgs := r.readMessages()
if len(msgs) != 1 {
Expand All @@ -2788,7 +2809,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
r.prs[2].becomeSnapshot(10)

for i := 0; i < 10; i++ {
r.appendEntry(pb.Entry{Data: []byte("somedata")})
mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
r.sendAppend(2)
msgs := r.readMessages()
if len(msgs) != 0 {
Expand Down Expand Up @@ -3182,7 +3203,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
for i, tt := range tests {
r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
if tt.addEntry {
r.appendEntry(pb.Entry{Type: pb.EntryNormal})
mustAppendEntry(r, pb.Entry{Type: pb.EntryNormal})
}
r.becomeCandidate()
r.becomeLeader()
Expand Down
2 changes: 1 addition & 1 deletion raft/rawnode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,7 @@ func TestRawNodeBoundedLogGrowthWithPartition(t *testing.T) {
const maxEntries = 16
data := []byte("testdata")
testEntry := raftpb.Entry{Data: data}
maxEntrySize := uint64(maxEntries * testEntry.Size())
maxEntrySize := uint64(maxEntries * PayloadSize(testEntry))

s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
Expand Down
6 changes: 6 additions & 0 deletions raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ func DescribeMessage(m pb.Message, f EntryFormatter) string {
return buf.String()
}

// PayloadSize is the size of the payload of this Entry. Notably, it does not
// depend on its Index or Term.
func PayloadSize(e pb.Entry) int {
return len(e.Data)
}

// DescribeEntry returns a concise human-readable description of an
// Entry for debugging.
func DescribeEntry(e pb.Entry, f EntryFormatter) string {
Expand Down

0 comments on commit b42b394

Please sign in to comment.