diff --git a/go.etcd.io/etcd/raft/README.md b/go.etcd.io/etcd/raft/README.md index 0ddf3f48f8..a78e5f720b 100644 --- a/go.etcd.io/etcd/raft/README.md +++ b/go.etcd.io/etcd/raft/README.md @@ -41,6 +41,7 @@ This raft implementation also includes a few optional enhancements: - Writing to leader's disk in parallel - Internal proposal redirection from followers to leader - Automatic stepping down when the leader loses quorum +- Protection against unbounded log growth when quorum is lost ## Notable Users diff --git a/go.etcd.io/etcd/raft/doc.go b/go.etcd.io/etcd/raft/doc.go index 2c10c0f5dc..c30d88445f 100644 --- a/go.etcd.io/etcd/raft/doc.go +++ b/go.etcd.io/etcd/raft/doc.go @@ -87,7 +87,7 @@ large). Note: Marshalling messages is not thread-safe; it is important that you make sure that no new entries are persisted while marshalling. -The easiest way to achieve this is to serialise the messages directly inside +The easiest way to achieve this is to serialize the messages directly inside your main raft loop. 3. Apply Snapshot (if any) and CommittedEntries to the state machine. @@ -153,7 +153,7 @@ If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; you may have to re-propose after a timeout. -To add or remove node in a cluster, build ConfChange struct 'cc' and call: +To add or remove a node in a cluster, build ConfChange struct 'cc' and call: n.ProposeConfChange(ctx, cc) @@ -260,7 +260,7 @@ stale log entries: 'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election protocol. When Config.PreVote is true, a pre-election is carried out first (using the same rules as a regular election), and no node increases its term - number unless the pre-election indicates that the campaigining node would win. + number unless the pre-election indicates that the campaigning node would win. This minimizes disruption when a partitioned node rejoins the cluster. 'MsgSnap' requests to install a snapshot message. When a node has just diff --git a/go.etcd.io/etcd/raft/node.go b/go.etcd.io/etcd/raft/node.go index f9053da092..f67628fd36 100644 --- a/go.etcd.io/etcd/raft/node.go +++ b/go.etcd.io/etcd/raft/node.go @@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool { len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0 } +// appliedCursor extracts from the Ready the highest index the client has +// applied (once the Ready is confirmed via Advance). If no information is +// contained in the Ready, returns zero. +func (rd Ready) appliedCursor() uint64 { + if n := len(rd.CommittedEntries); n > 0 { + return rd.CommittedEntries[n-1].Index + } + if index := rd.Snapshot.Metadata.Index; index > 0 { + return index + } + return 0 +} + // Node represents a node in a raft cluster. type Node interface { // Tick increments the internal logical clock for the Node by a single tick. Election @@ -282,6 +295,7 @@ func (n *node) run(r *raft) { var prevLastUnstablei, prevLastUnstablet uint64 var havePrevLastUnstablei bool var prevSnapi uint64 + var applyingToI uint64 var rd Ready lead := None @@ -381,13 +395,18 @@ func (n *node) run(r *raft) { if !IsEmptySnap(rd.Snapshot) { prevSnapi = rd.Snapshot.Metadata.Index } + if index := rd.appliedCursor(); index != 0 { + applyingToI = index + } r.msgs = nil r.readStates = nil + r.reduceUncommittedSize(rd.CommittedEntries) advancec = n.advancec case <-advancec: - if prevHardSt.Commit != 0 { - r.raftLog.appliedTo(prevHardSt.Commit) + if applyingToI != 0 { + r.raftLog.appliedTo(applyingToI) + applyingToI = 0 } if havePrevLastUnstablei { r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet) @@ -559,15 +578,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready { } if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) { rd.HardState = hardSt - // If we hit a size limit when loadaing CommittedEntries, clamp - // our HardState.Commit to what we're actually returning. This is - // also used as our cursor to resume for the next Ready batch. - if len(rd.CommittedEntries) > 0 { - lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1] - if rd.HardState.Commit > lastCommit.Index { - rd.HardState.Commit = lastCommit.Index - } - } } if r.raftLog.unstable.snapshot != nil { rd.Snapshot = *r.raftLog.unstable.snapshot diff --git a/go.etcd.io/etcd/raft/raft.go b/go.etcd.io/etcd/raft/raft.go index 81bad3bec7..bf0a8983c4 100644 --- a/go.etcd.io/etcd/raft/raft.go +++ b/go.etcd.io/etcd/raft/raft.go @@ -148,12 +148,17 @@ type Config struct { // applied entries. This is a very application dependent configuration. Applied uint64 - // MaxSizePerMsg limits the max size of each append message. Smaller value - // lowers the raft recovery cost(initial probing and message lost during normal - // operation). On the other side, it might affect the throughput during normal - // replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per - // message. + // MaxSizePerMsg limits the max byte size of each append message. Smaller + // value lowers the raft recovery cost(initial probing and message lost + // during normal operation). On the other side, it might affect the + // throughput during normal replication. Note: math.MaxUint64 for unlimited, + // 0 for at most one entry per message. MaxSizePerMsg uint64 + // MaxUncommittedEntriesSize limits the aggregate byte size of the + // uncommitted entries that may be appended to a leader's log. Once this + // limit is exceeded, proposals will begin to return ErrProposalDropped + // errors. Note: 0 for no limit. + MaxUncommittedEntriesSize uint64 // MaxInflightMsgs limits the max number of in-flight append messages during // optimistic replication phase. The application transportation layer usually // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid @@ -215,6 +220,10 @@ func (c *Config) validate() error { return errors.New("storage cannot be nil") } + if c.MaxUncommittedEntriesSize == 0 { + c.MaxUncommittedEntriesSize = noLimit + } + if c.MaxInflightMsgs <= 0 { return errors.New("max inflight messages must be greater than 0") } @@ -241,11 +250,12 @@ type raft struct { // the log raftLog *raftLog - maxInflight int - maxMsgSize uint64 - prs map[uint64]*Progress - learnerPrs map[uint64]*Progress - matchBuf uint64Slice + maxMsgSize uint64 + maxUncommittedSize uint64 + maxInflight int + prs map[uint64]*Progress + learnerPrs map[uint64]*Progress + matchBuf uint64Slice state StateType @@ -268,6 +278,10 @@ type raft struct { // be proposed if the leader's applied index is greater than this // value. pendingConfIndex uint64 + // an estimate of the size of the uncommitted tail of the Raft log. Used to + // prevent unbounded log growth. Only maintained by the leader. Reset on + // term changes. + uncommittedSize uint64 readOnly *readOnly @@ -326,6 +340,7 @@ func newRaft(c *Config) *raft { raftLog: raftlog, maxMsgSize: c.MaxSizePerMsg, maxInflight: c.MaxInflightMsgs, + maxUncommittedSize: c.MaxUncommittedEntriesSize, prs: make(map[uint64]*Progress), learnerPrs: make(map[uint64]*Progress), electionTimeout: c.ElectionTick, @@ -514,7 +529,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool { return true } -// sendHeartbeat sends an empty MsgApp +// sendHeartbeat sends a heartbeat RPC to the given peer. func (r *raft) sendHeartbeat(to uint64, ctx []byte) { // Attach the commit as min(to.matched, r.committed). // When the leader sends out heartbeat message, @@ -616,6 +631,7 @@ func (r *raft) reset(term uint64) { }) r.pendingConfIndex = 0 + r.uncommittedSize = 0 r.readOnly = newReadOnly(r.readOnly.option) } @@ -954,6 +970,10 @@ func stepLeader(r *raft, m pb.Message) error { r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee) return ErrProposalDropped } + if !r.increaseUncommittedSize(m.Entries) { + r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id) + return ErrProposalDropped + } for i, e := range m.Entries { if e.Type == pb.EntryConfChange { @@ -1462,6 +1482,49 @@ func (r *raft) abortLeaderTransfer() { r.leadTransferee = None } +// increaseUncommittedSize computes the size of the proposed entries and +// determines whether they would push leader over its maxUncommittedSize limit. +// If the new entries would exceed the limit, the method returns false. If not, +// the increase in uncommitted entry size is recorded and the method returns +// true. +func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool { + var s uint64 + for _, e := range ents { + s += uint64(e.Size()) + } + + if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize { + // If the uncommitted tail of the Raft log is empty, allow any size + // proposal. Otherwise, limit the size of the uncommitted tail of the + // log and drop any proposal that would push the size over the limit. + return false + } + r.uncommittedSize += s + return true +} + +// reduceUncommittedSize accounts for the newly committed entries by decreasing +// the uncommitted entry size limit. +func (r *raft) reduceUncommittedSize(ents []pb.Entry) { + if r.uncommittedSize == 0 { + // Fast-path for followers, who do not track or enforce the limit. + return + } + + var s uint64 + for _, e := range ents { + s += uint64(e.Size()) + } + if s > r.uncommittedSize { + // uncommittedSize may underestimate the size of the uncommitted Raft + // log tail but will never overestimate it. Saturate at 0 instead of + // allowing overflow. + r.uncommittedSize = 0 + } else { + r.uncommittedSize -= s + } +} + func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { diff --git a/go.etcd.io/etcd/raft/rawnode.go b/go.etcd.io/etcd/raft/rawnode.go index a4cecfc8f7..4a4ec2e946 100644 --- a/go.etcd.io/etcd/raft/rawnode.go +++ b/go.etcd.io/etcd/raft/rawnode.go @@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) { if !IsEmptyHardState(rd.HardState) { rn.prevHardSt = rd.HardState } - if rn.prevHardSt.Commit != 0 { - // In most cases, prevHardSt and rd.HardState will be the same - // because when there are new entries to apply we just sent a - // HardState with an updated Commit value. However, on initial - // startup the two are different because we don't send a HardState - // until something changes, but we do send any un-applied but - // committed entries (and previously-committed entries may be - // incorporated into the snapshot, even if rd.CommittedEntries is - // empty). Therefore we mark all committed entries as applied - // whether they were included in rd.HardState or not. - rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit) + + // If entries were applied (or a snapshot), update our cursor for + // the next Ready. Note that if the current HardState contains a + // new Commit index, this does not mean that we're also applying + // all of the new entries due to commit pagination by size. + if index := rd.appliedCursor(); index > 0 { + rn.raft.raftLog.appliedTo(index) } + if len(rd.Entries) > 0 { e := rd.Entries[len(rd.Entries)-1] rn.raft.raftLog.stableTo(e.Index, e.Term) @@ -201,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error { func (rn *RawNode) Ready() Ready { rd := rn.newReady() rn.raft.msgs = nil + rn.raft.reduceUncommittedSize(rd.CommittedEntries) return rd } diff --git a/go.etcd.io/etcd/raft/util.go b/go.etcd.io/etcd/raft/util.go index d744927c4a..1a7a1e9ac3 100644 --- a/go.etcd.io/etcd/raft/util.go +++ b/go.etcd.io/etcd/raft/util.go @@ -113,6 +113,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string { return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted) } +// DescribeEntries calls DescribeEntry for each Entry, adding a newline to +// each. +func DescribeEntries(ents []pb.Entry, f EntryFormatter) string { + var buf bytes.Buffer + for _, e := range ents { + _, _ = buf.WriteString(DescribeEntry(e, f) + "\n") + } + return buf.String() +} + func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry { if len(ents) == 0 { return ents