diff --git a/github.com/coreos/etcd/raft/log.go b/github.com/coreos/etcd/raft/log.go index c3036d3c90..a3be7d4867 100644 --- a/github.com/coreos/etcd/raft/log.go +++ b/github.com/coreos/etcd/raft/log.go @@ -38,17 +38,27 @@ type raftLog struct { applied uint64 logger Logger + + maxMsgSize uint64 } -// newLog returns log using the given storage. It recovers the log to the state -// that it just commits and applies the latest snapshot. +// newLog returns log using the given storage and default options. It +// recovers the log to the state that it just commits and applies the +// latest snapshot. func newLog(storage Storage, logger Logger) *raftLog { + return newLogWithSize(storage, logger, noLimit) +} + +// newLogWithSize returns a log using the given storage and max +// message size. +func newLogWithSize(storage Storage, logger Logger, maxMsgSize uint64) *raftLog { if storage == nil { log.Panic("storage must not be nil") } log := &raftLog{ - storage: storage, - logger: logger, + storage: storage, + logger: logger, + maxMsgSize: maxMsgSize, } firstIndex, err := storage.FirstIndex() if err != nil { @@ -139,7 +149,7 @@ func (l *raftLog) unstableEntries() []pb.Entry { func (l *raftLog) nextEnts() (ents []pb.Entry) { off := max(l.applied+1, l.firstIndex()) if l.committed+1 > off { - ents, err := l.slice(off, l.committed+1, noLimit) + ents, err := l.slice(off, l.committed+1, l.maxMsgSize) if err != nil { l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err) } diff --git a/github.com/coreos/etcd/raft/node.go b/github.com/coreos/etcd/raft/node.go index b24ba609f3..57a974fcfe 100644 --- a/github.com/coreos/etcd/raft/node.go +++ b/github.com/coreos/etcd/raft/node.go @@ -559,6 +559,15 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt + // If we hit a size limit when loadaing CommittedEntries, clamp + // our HardState.Commit to what we're actually returning. This is + // also used as our cursor to resume for the next Ready batch. + if len(rd.CommittedEntries) > 0 { + lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1] + if rd.HardState.Commit > lastCommit.Index { + rd.HardState.Commit = lastCommit.Index + } + } } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot diff --git a/github.com/coreos/etcd/raft/raft.go b/github.com/coreos/etcd/raft/raft.go index c18100e897..7de19f9489 100644 --- a/github.com/coreos/etcd/raft/raft.go +++ b/github.com/coreos/etcd/raft/raft.go @@ -302,7 +302,7 @@ func newRaft(c *Config) *raft { if err := c.validate(); err != nil { panic(err.Error()) } - raftlog := newLog(c.Storage, c.Logger) + raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxSizePerMsg) hs, cs, err := c.Storage.InitialState() if err != nil { panic(err) // TODO(bdarnell) @@ -441,22 +441,35 @@ func (r *raft) getProgress(id uint64) *Progress { return r.learnerPrs[id] } -// sendAppend sends RPC, with entries to the given peer. +// sendAppend sends an append RPC with new entries (if any) and the +// current commit index to the given peer. func (r *raft) sendAppend(to uint64) { + r.maybeSendAppend(to, true) +} + +// maybeSendAppend sends an append RPC with new entries to the given peer, +// if necessary. Returns true if a message was sent. The sendIfEmpty +// argument controls whether messages with no entries will be sent +// ("empty" messages are useful to convey updated Commit indexes, but +// are undesirable when we're sending multiple messages in a batch). +func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { pr := r.getProgress(to) if pr.IsPaused() { - return + return false } m := pb.Message{} m.To = to term, errt := r.raftLog.term(pr.Next - 1) ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize) + if len(ents) == 0 && !sendIfEmpty { + return false + } if errt != nil || erre != nil { // send snapshot if we failed to get term or entries if !pr.RecentActive { r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to) - return + return false } m.Type = pb.MsgSnap @@ -464,7 +477,7 @@ func (r *raft) sendAppend(to uint64) { if err != nil { if err == ErrSnapshotTemporarilyUnavailable { r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to) - return + return false } panic(err) // TODO(bdarnell) } @@ -498,6 +511,7 @@ func (r *raft) sendAppend(to uint64) { } } r.send(m) + return true } // sendHeartbeat sends an empty MsgApp @@ -1020,10 +1034,18 @@ func stepLeader(r *raft, m pb.Message) error { if r.maybeCommit() { r.bcastAppend() } else if oldPaused { - // update() reset the wait state on this node. If we had delayed sending - // an update before, send it now. + // If we were paused before, this node may be missing the + // latest commit index, so send it. r.sendAppend(m.From) } + // We've updated flow control information above, which may + // allow us to send multiple (size-limited) in-flight messages + // at once (such as when transitioning from probe to + // replicate, or when freeTo() covers multiple messages). If + // we have more entries to send, send as many messages as we + // can (without sending empty messages for the commit index) + for r.maybeSendAppend(m.From, false) { + } // Transfer leadership is in progress. if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() { r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)