Skip to content

Commit

Permalink
raft: Introduce CommittedEntries pagination
Browse files Browse the repository at this point in the history
The MaxSizePerMsg setting is now used to limit the size of
Ready.CommittedEntries. This prevents out-of-memory errors if the raft
log has become very large and commits all at once.
  • Loading branch information
bdarnell committed Aug 6, 2018
1 parent bc14dee commit 84c537d
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 25 deletions.
11 changes: 7 additions & 4 deletions raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,20 @@ type raftLog struct {
applied uint64

logger Logger

maxMsgSize uint64
}

// newLog returns log using the given storage. It recovers the log to the state
// that it just commits and applies the latest snapshot.
func newLog(storage Storage, logger Logger) *raftLog {
func newLog(storage Storage, logger Logger, maxMsgSize uint64) *raftLog {
if storage == nil {
log.Panic("storage must not be nil")
}
log := &raftLog{
storage: storage,
logger: logger,
storage: storage,
logger: logger,
maxMsgSize: maxMsgSize,
}
firstIndex, err := storage.FirstIndex()
if err != nil {
Expand Down Expand Up @@ -139,7 +142,7 @@ func (l *raftLog) unstableEntries() []pb.Entry {
func (l *raftLog) nextEnts() (ents []pb.Entry) {
off := max(l.applied+1, l.firstIndex())
if l.committed+1 > off {
ents, err := l.slice(off, l.committed+1, noLimit)
ents, err := l.slice(off, l.committed+1, l.maxMsgSize)
if err != nil {
l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
}
Expand Down
34 changes: 17 additions & 17 deletions raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestFindConflict(t *testing.T) {
}

for i, tt := range tests {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit)
raftLog.append(previousEnts...)

gconflict := raftLog.findConflict(tt.ents)
Expand All @@ -57,7 +57,7 @@ func TestFindConflict(t *testing.T) {

func TestIsUpToDate(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit)
raftLog.append(previousEnts...)
tests := []struct {
lastIndex uint64
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestAppend(t *testing.T) {
for i, tt := range tests {
storage := NewMemoryStorage()
storage.Append(previousEnts)
raftLog := newLog(storage, raftLogger)
raftLog := newLog(storage, raftLogger, noLimit)

index := raftLog.append(tt.ents...)
if index != tt.windex {
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestLogMaybeAppend(t *testing.T) {
}

for i, tt := range tests {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit)
raftLog.append(previousEnts...)
raftLog.committed = commit
func() {
Expand Down Expand Up @@ -284,7 +284,7 @@ func TestCompactionSideEffects(t *testing.T) {
for i = 1; i <= unstableIndex; i++ {
storage.Append([]pb.Entry{{Term: i, Index: i}})
}
raftLog := newLog(storage, raftLogger)
raftLog := newLog(storage, raftLogger, noLimit)
for i = unstableIndex; i < lastIndex; i++ {
raftLog.append(pb.Entry{Term: i + 1, Index: i + 1})
}
Expand Down Expand Up @@ -358,7 +358,7 @@ func TestHasNextEnts(t *testing.T) {
for i, tt := range tests {
storage := NewMemoryStorage()
storage.ApplySnapshot(snap)
raftLog := newLog(storage, raftLogger)
raftLog := newLog(storage, raftLogger, noLimit)
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestNextEnts(t *testing.T) {
for i, tt := range tests {
storage := NewMemoryStorage()
storage.ApplySnapshot(snap)
raftLog := newLog(storage, raftLogger)
raftLog := newLog(storage, raftLogger, noLimit)
raftLog.append(ents...)
raftLog.maybeCommit(5, 1)
raftLog.appliedTo(tt.applied)
Expand Down Expand Up @@ -421,7 +421,7 @@ func TestUnstableEnts(t *testing.T) {
storage.Append(previousEnts[:tt.unstable-1])

// append unstable entries to raftlog
raftLog := newLog(storage, raftLogger)
raftLog := newLog(storage, raftLogger, noLimit)
raftLog.append(previousEnts[tt.unstable-1:]...)

ents := raftLog.unstableEntries()
Expand Down Expand Up @@ -459,7 +459,7 @@ func TestCommitTo(t *testing.T) {
}
}
}()
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit)
raftLog.append(previousEnts...)
raftLog.committed = commit
raftLog.commitTo(tt.commit)
Expand All @@ -482,7 +482,7 @@ func TestStableTo(t *testing.T) {
{3, 1, 1}, // bad index
}
for i, tt := range tests {
raftLog := newLog(NewMemoryStorage(), raftLogger)
raftLog := newLog(NewMemoryStorage(), raftLogger, noLimit)
raftLog.append([]pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}}...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
Expand Down Expand Up @@ -519,7 +519,7 @@ func TestStableToWithSnap(t *testing.T) {
for i, tt := range tests {
s := NewMemoryStorage()
s.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: snapi, Term: snapt}})
raftLog := newLog(s, raftLogger)
raftLog := newLog(s, raftLogger, noLimit)
raftLog.append(tt.newEnts...)
raftLog.stableTo(tt.stablei, tt.stablet)
if raftLog.unstable.offset != tt.wunstable {
Expand Down Expand Up @@ -557,7 +557,7 @@ func TestCompaction(t *testing.T) {
for i := uint64(1); i <= tt.lastIndex; i++ {
storage.Append([]pb.Entry{{Index: i}})
}
raftLog := newLog(storage, raftLogger)
raftLog := newLog(storage, raftLogger, noLimit)
raftLog.maybeCommit(tt.lastIndex, 0)
raftLog.appliedTo(raftLog.committed)

Expand All @@ -583,7 +583,7 @@ func TestLogRestore(t *testing.T) {
snap := pb.SnapshotMetadata{Index: index, Term: term}
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: snap})
raftLog := newLog(storage, raftLogger)
raftLog := newLog(storage, raftLogger, noLimit)

if len(raftLog.allEntries()) != 0 {
t.Errorf("len = %d, want 0", len(raftLog.allEntries()))
Expand All @@ -607,7 +607,7 @@ func TestIsOutOfBounds(t *testing.T) {
num := uint64(100)
storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset}})
l := newLog(storage, raftLogger)
l := newLog(storage, raftLogger, noLimit)
for i := uint64(1); i <= num; i++ {
l.append(pb.Entry{Index: i + offset})
}
Expand Down Expand Up @@ -690,7 +690,7 @@ func TestTerm(t *testing.T) {

storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: offset, Term: 1}})
l := newLog(storage, raftLogger)
l := newLog(storage, raftLogger, noLimit)
for i = 1; i < num; i++ {
l.append(pb.Entry{Index: offset + i, Term: i})
}
Expand Down Expand Up @@ -720,7 +720,7 @@ func TestTermWithUnstableSnapshot(t *testing.T) {

storage := NewMemoryStorage()
storage.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: storagesnapi, Term: 1}})
l := newLog(storage, raftLogger)
l := newLog(storage, raftLogger, noLimit)
l.restore(pb.Snapshot{Metadata: pb.SnapshotMetadata{Index: unstablesnapi, Term: 1}})

tests := []struct {
Expand Down Expand Up @@ -757,7 +757,7 @@ func TestSlice(t *testing.T) {
for i = 1; i < num/2; i++ {
storage.Append([]pb.Entry{{Index: offset + i, Term: offset + i}})
}
l := newLog(storage, raftLogger)
l := newLog(storage, raftLogger, noLimit)
for i = num / 2; i < num; i++ {
l.append(pb.Entry{Index: offset + i, Term: offset + i})
}
Expand Down
9 changes: 9 additions & 0 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
// If we hit a size limit when loadaing CommittedEntries, clamp
// our HardState.Commit to what we're actually returning. This is
// also used as our cursor to resume for the next Ready batch.
if len(rd.CommittedEntries) > 0 {
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
if rd.HardState.Commit > lastCommit.Index {
rd.HardState.Commit = lastCommit.Index
}
}
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
Expand Down
51 changes: 51 additions & 0 deletions raft/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ import (
"github.com/coreos/etcd/raft/raftpb"
)

// readyWithTimeout selects from n.Ready() with a 1-second timeout. It
// panics on timeout, which is better than the indefinite wait that
// would occur if this channel were read without being wrapped in a
// select.
func readyWithTimeout(n Node) Ready {
select {
case rd := <-n.Ready():
return rd
case <-time.After(time.Second):
panic("timed out waiting for ready")
}
}

// TestNodeStep ensures that node.Step sends msgProp to propc chan
// and other kinds of messages to recvc chan.
func TestNodeStep(t *testing.T) {
Expand Down Expand Up @@ -875,3 +888,41 @@ func TestAppendPagination(t *testing.T) {
t.Error("didn't see any messages more than half the max size; something is wrong with this test")
}
}

func TestCommitPagination(t *testing.T) {
s := NewMemoryStorage()
cfg := newTestConfig(1, []uint64{1}, 10, 1, s)
cfg.MaxSizePerMsg = 2048
r := newRaft(cfg)
n := newNode()
go n.run(r)
n.Campaign(context.TODO())

rd := readyWithTimeout(&n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 (empty) entry, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()

blob := []byte(strings.Repeat("a", 1000))
for i := 0; i < 3; i++ {
if err := n.Propose(context.TODO(), blob); err != nil {
t.Fatal(err)
}
}

// The 3 proposals will commit in two batches.
rd = readyWithTimeout(&n)
if len(rd.CommittedEntries) != 2 {
t.Fatalf("expected 2 entries in first batch, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()
rd = readyWithTimeout(&n)
if len(rd.CommittedEntries) != 1 {
t.Fatalf("expected 1 entry in second batch, got %d", len(rd.CommittedEntries))
}
s.Append(rd.Entries)
n.Advance()
}
2 changes: 1 addition & 1 deletion raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func newRaft(c *Config) *raft {
if err := c.validate(); err != nil {
panic(err.Error())
}
raftlog := newLog(c.Storage, c.Logger)
raftlog := newLog(c.Storage, c.Logger, c.MaxSizePerMsg)
hs, cs, err := c.Storage.InitialState()
if err != nil {
panic(err) // TODO(bdarnell)
Expand Down
6 changes: 3 additions & 3 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ func TestDuelingCandidates(t *testing.T) {
}{
{a, StateFollower, 2, wlog},
{b, StateFollower, 2, wlog},
{c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger)},
{c, StateFollower, 2, newLog(NewMemoryStorage(), raftLogger, noLimit)},
}

for i, tt := range tests {
Expand Down Expand Up @@ -923,7 +923,7 @@ func TestDuelingPreCandidates(t *testing.T) {
}{
{a, StateLeader, 1, wlog},
{b, StateFollower, 1, wlog},
{c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger)},
{c, StateFollower, 1, newLog(NewMemoryStorage(), raftLogger, noLimit)},
}

for i, tt := range tests {
Expand Down Expand Up @@ -1079,7 +1079,7 @@ func TestProposal(t *testing.T) {
send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})

wantLog := newLog(NewMemoryStorage(), raftLogger)
wantLog := newLog(NewMemoryStorage(), raftLogger, noLimit)
if tt.success {
wantLog = &raftLog{
storage: &MemoryStorage{
Expand Down

0 comments on commit 84c537d

Please sign in to comment.