diff --git a/raft/log.go b/raft/log.go index c3036d3c90dd..20bfc803eee4 100644 --- a/raft/log.go +++ b/raft/log.go @@ -38,17 +38,20 @@ type raftLog struct { applied uint64 logger Logger + + maxMsgSize uint64 } // newLog returns log using the given storage. It recovers the log to the state // that it just commits and applies the latest snapshot. -func newLog(storage Storage, logger Logger) *raftLog { +func newLog(storage Storage, logger Logger, maxMsgSize uint64) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ - storage: storage, - logger: logger, + storage: storage, + logger: logger, + maxMsgSize: maxMsgSize, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -139,7 +142,7 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, noLimit) + ents, err := l.slice(off, l.committed+1, l.maxMsgSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } diff --git a/raft/log_test.go b/raft/log_test.go index 8fa60db84fd5..b6ce9cb86f1f 100644 --- a/raft/log_test.go +++ b/raft/log_test.go @@ -45,7 +45,7 @@ func TestFindConflict(t *testing.T) { } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit) raftLog.append(previousEnts...) gconflict := raftLog.findConflict(tt.ents) @@ -57,7 +57,7 @@ func TestFindConflict(t *testing.T) { func TestIsUpToDate(t *testing.T) { previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}} - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit) raftLog.append(previousEnts...) tests := []struct { lastIndex uint64 @@ -125,7 +125,7 @@ func TestAppend(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.Append(previousEnts) - raftLog := newLog(storage, raftLogger) + raftLog := newLog(storage, raftLogger, noLimit) index := raftLog.append(tt.ents...) if index != tt.windex { @@ -236,7 +236,7 @@ func TestLogMaybeAppend(t *testing.T) { } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit) raftLog.append(previousEnts...) raftLog.committed = commit func() { @@ -284,7 +284,7 @@ func TestCompactionSideEffects(t *testing.T) { for i = 1; i <= unstableIndex; i++ { storage.Append([]pb.Entry{{Term: i, Index: i}}) } - raftLog := newLog(storage, raftLogger) + raftLog := newLog(storage, raftLogger, noLimit) for i = unstableIndex; i < lastIndex; i++ { raftLog.append(pb.Entry{Term: i + 1, Index: i + 1}) } @@ -358,7 +358,7 @@ func TestHasNextEnts(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.ApplySnapshot(snap) - raftLog := newLog(storage, raftLogger) + raftLog := newLog(storage, raftLogger, noLimit) raftLog.append(ents...) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied) @@ -391,7 +391,7 @@ func TestNextEnts(t *testing.T) { for i, tt := range tests { storage := NewMemoryStorage() storage.ApplySnapshot(snap) - raftLog := newLog(storage, raftLogger) + raftLog := newLog(storage, raftLogger, noLimit) raftLog.append(ents...) raftLog.maybeCommit(5, 1) raftLog.appliedTo(tt.applied) @@ -421,7 +421,7 @@ func TestUnstableEnts(t *testing.T) { storage.Append(previousEnts[:tt.unstable-1]) // append unstable entries to raftlog - raftLog := newLog(storage, raftLogger) + raftLog := newLog(storage, raftLogger, noLimit) raftLog.append(previousEnts[tt.unstable-1:]...) ents := raftLog.unstableEntries() @@ -459,7 +459,7 @@ func TestCommitTo(t *testing.T) { } } }() - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit) raftLog.append(previousEnts...) raftLog.committed = commit raftLog.commitTo(tt.commit) @@ -482,7 +482,7 @@ func TestStableTo(t *testing.T) { {3, 1, 1}, // bad index } for i, tt := range tests { - raftLog := newLog(NewMemoryStorage(), raftLogger) + raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit) raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...) raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { @@ -519,7 +519,7 @@ func TestStableToWithSnap(t *testing.T) { for i, tt := range tests { s := NewMemoryStorage() s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}}) - raftLog := newLog(s, raftLogger) + raftLog := newLog(s, raftLogger, noLimit) raftLog.append(tt.newEnts...) raftLog.stableTo(tt.stablei, tt.stablet) if raftLog.unstable.offset != tt.wunstable { @@ -557,7 +557,7 @@ func TestCompaction(t *testing.T) { for i := uint64(1); i <= tt.lastIndex; i++ { storage.Append([]pb.Entry{{Index: i}}) } - raftLog := newLog(storage, raftLogger) + raftLog := newLog(storage, raftLogger, noLimit) raftLog.maybeCommit(tt.lastIndex, 0) raftLog.appliedTo(raftLog.committed) @@ -583,7 +583,7 @@ func TestLogRestore(t *testing.T) { snap := pb.SnapshotMetadata{Index: index, Term: term} storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: snap}) - raftLog := newLog(storage, raftLogger) + raftLog := newLog(storage, raftLogger, noLimit) if len(raftLog.allEntries()) != 0 { t.Errorf("len = %d, want 0", len(raftLog.allEntries())) @@ -607,7 +607,7 @@ func TestIsOutOfBounds(t *testing.T) { num := uint64(100) storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}}) - l := newLog(storage, raftLogger) + l := newLog(storage, raftLogger, noLimit) for i := uint64(1); i <= num; i++ { l.append(pb.Entry{Index: i + offset}) } @@ -690,7 +690,7 @@ func TestTerm(t *testing.T) { storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}}) - l := newLog(storage, raftLogger) + l := newLog(storage, raftLogger, noLimit) for i = 1; i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: i}) } @@ -720,7 +720,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) { storage := NewMemoryStorage() storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}}) - l := newLog(storage, raftLogger) + l := newLog(storage, raftLogger, noLimit) l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}}) tests := []struct { @@ -757,7 +757,7 @@ func TestSlice(t *testing.T) { for i = 1; i < num/2; i++ { storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}}) } - l := newLog(storage, raftLogger) + l := newLog(storage, raftLogger, noLimit) for i = num / 2; i < num; i++ { l.append(pb.Entry{Index: offset + i, Term: offset + i}) } diff --git a/raft/node.go b/raft/node.go index b24ba609f388..57a974fcfe6a 100644 --- a/raft/node.go +++ b/raft/node.go @@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt + // If we hit a size limit when loadaing CommittedEntries, clamp + // our HardState.Commit to what we're actually returning. This is + // also used as our cursor to resume for the next Ready batch. + if len(rd.CommittedEntries) > 0 { + lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1] + if rd.HardState.Commit > lastCommit.Index { + rd.HardState.Commit = lastCommit.Index + } + } } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot diff --git a/raft/node_test.go b/raft/node_test.go index 4820dd7a5ca0..e2002208f8de 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -26,6 +26,19 @@ import ( "github.com/coreos/etcd/raft/raftpb" ) +// readyWithTimeout selects from n.Ready() with a 1-second timeout. It +// panics on timeout, which is better than the indefinite wait that +// would occur if this channel were read without being wrapped in a +// select. +func readyWithTimeout(n Node) Ready { + select { + case rd := <-n.Ready(): + return rd + case <-time.After(time.Second): + panic("timed out waiting for ready") + } +} + // TestNodeStep ensures that node.Step sends msgProp to propc chan // and other kinds of messages to recvc chan. func TestNodeStep(t *testing.T) { @@ -875,3 +888,41 @@ func TestAppendPagination(t *testing.T) { t.Error("didn't see any messages more than half the max size; something is wrong with this test") } } + +func TestCommitPagination(t *testing.T) { + s := NewMemoryStorage() + cfg := newTestConfig(1, []uint64{1}, 10, 1, s) + cfg.MaxSizePerMsg = 2048 + r := newRaft(cfg) + n := newNode() + go n.run(r) + n.Campaign(context.TODO()) + + rd := readyWithTimeout(&n) + if len(rd.CommittedEntries) != 1 { + t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + + blob := []byte(strings.Repeat("a", 1000)) + for i := 0; i < 3; i++ { + if err := n.Propose(context.TODO(), blob); err != nil { + t.Fatal(err) + } + } + + // The 3 proposals will commit in two batches. + rd = readyWithTimeout(&n) + if len(rd.CommittedEntries) != 2 { + t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() + rd = readyWithTimeout(&n) + if len(rd.CommittedEntries) != 1 { + t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries)) + } + s.Append(rd.Entries) + n.Advance() +} diff --git a/raft/raft.go b/raft/raft.go index 0c8c96c3f802..e3da45ce6eda 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -301,7 +301,7 @@ func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLog(c.Storage, c.Logger) + raftlog := newLog(c.Storage, c.Logger, c.MaxSizePerMsg) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) diff --git a/raft/raft_test.go b/raft/raft_test.go index 0c95340e7c62..eadd3c752d10 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -853,7 +853,7 @@ func TestDuelingCandidates(t *testing.T) { }{ {a, StateFollower, 2, wlog}, {b, StateFollower, 2, wlog}, - {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)}, + {c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger, noLimit)}, } for i, tt := range tests { @@ -923,7 +923,7 @@ func TestDuelingPreCandidates(t *testing.T) { }{ {a, StateLeader, 1, wlog}, {b, StateFollower, 1, wlog}, - {c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)}, + {c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger, noLimit)}, } for i, tt := range tests { @@ -1079,7 +1079,7 @@ func TestProposal(t *testing.T) { send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}}) - wantLog := newLog(NewMemoryStorage(), raftLogger) + wantLog := newLog(NewMemoryStorage(), raftLogger, noLimit) if tt.success { wantLog = &raftLog{ storage: &MemoryStorage{