Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
Bump go.etcd.io/etcd
Browse files Browse the repository at this point in the history
This picks up etcd-io/etcd#10167.
  • Loading branch information
nvanbenschoten committed Oct 14, 2018
1 parent 2f8c86b commit e858131
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 36 deletions.
1 change: 1 addition & 0 deletions go.etcd.io/etcd/raft/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ This raft implementation also includes a few optional enhancements:
- Writing to leader's disk in parallel
- Internal proposal redirection from followers to leader
- Automatic stepping down when the leader loses quorum
- Protection against unbounded log growth when quorum is lost

## Notable Users

Expand Down
6 changes: 3 additions & 3 deletions go.etcd.io/etcd/raft/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ large).
Note: Marshalling messages is not thread-safe; it is important that you
make sure that no new entries are persisted while marshalling.
The easiest way to achieve this is to serialise the messages directly inside
The easiest way to achieve this is to serialize the messages directly inside
your main raft loop.
3. Apply Snapshot (if any) and CommittedEntries to the state machine.
Expand Down Expand Up @@ -153,7 +153,7 @@ If the proposal is committed, data will appear in committed entries with type
raftpb.EntryNormal. There is no guarantee that a proposed command will be
committed; you may have to re-propose after a timeout.
To add or remove node in a cluster, build ConfChange struct 'cc' and call:
To add or remove a node in a cluster, build ConfChange struct 'cc' and call:
n.ProposeConfChange(ctx, cc)
Expand Down Expand Up @@ -260,7 +260,7 @@ stale log entries:
'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election
protocol. When Config.PreVote is true, a pre-election is carried out first
(using the same rules as a regular election), and no node increases its term
number unless the pre-election indicates that the campaigining node would win.
number unless the pre-election indicates that the campaigning node would win.
This minimizes disruption when a partitioned node rejoins the cluster.
'MsgSnap' requests to install a snapshot message. When a node has just
Expand Down
32 changes: 21 additions & 11 deletions go.etcd.io/etcd/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@ func (rd Ready) containsUpdates() bool {
len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}

// appliedCursor extracts from the Ready the highest index the client has
// applied (once the Ready is confirmed via Advance). If no information is
// contained in the Ready, returns zero.
func (rd Ready) appliedCursor() uint64 {
if n := len(rd.CommittedEntries); n > 0 {
return rd.CommittedEntries[n-1].Index
}
if index := rd.Snapshot.Metadata.Index; index > 0 {
return index
}
return 0
}

// Node represents a node in a raft cluster.
type Node interface {
// Tick increments the internal logical clock for the Node by a single tick. Election
Expand Down Expand Up @@ -282,6 +295,7 @@ func (n *node) run(r *raft) {
var prevLastUnstablei, prevLastUnstablet uint64
var havePrevLastUnstablei bool
var prevSnapi uint64
var applyingToI uint64
var rd Ready

lead := None
Expand Down Expand Up @@ -381,13 +395,18 @@ func (n *node) run(r *raft) {
if !IsEmptySnap(rd.Snapshot) {
prevSnapi = rd.Snapshot.Metadata.Index
}
if index := rd.appliedCursor(); index != 0 {
applyingToI = index
}

r.msgs = nil
r.readStates = nil
r.reduceUncommittedSize(rd.CommittedEntries)
advancec = n.advancec
case <-advancec:
if prevHardSt.Commit != 0 {
r.raftLog.appliedTo(prevHardSt.Commit)
if applyingToI != 0 {
r.raftLog.appliedTo(applyingToI)
applyingToI = 0
}
if havePrevLastUnstablei {
r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
Expand Down Expand Up @@ -559,15 +578,6 @@ func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
}
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
// If we hit a size limit when loadaing CommittedEntries, clamp
// our HardState.Commit to what we're actually returning. This is
// also used as our cursor to resume for the next Ready batch.
if len(rd.CommittedEntries) > 0 {
lastCommit := rd.CommittedEntries[len(rd.CommittedEntries)-1]
if rd.HardState.Commit > lastCommit.Index {
rd.HardState.Commit = lastCommit.Index
}
}
}
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
Expand Down
85 changes: 74 additions & 11 deletions go.etcd.io/etcd/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,17 @@ type Config struct {
// applied entries. This is a very application dependent configuration.
Applied uint64

// MaxSizePerMsg limits the max size of each append message. Smaller value
// lowers the raft recovery cost(initial probing and message lost during normal
// operation). On the other side, it might affect the throughput during normal
// replication. Note: math.MaxUint64 for unlimited, 0 for at most one entry per
// message.
// MaxSizePerMsg limits the max byte size of each append message. Smaller
// value lowers the raft recovery cost(initial probing and message lost
// during normal operation). On the other side, it might affect the
// throughput during normal replication. Note: math.MaxUint64 for unlimited,
// 0 for at most one entry per message.
MaxSizePerMsg uint64
// MaxUncommittedEntriesSize limits the aggregate byte size of the
// uncommitted entries that may be appended to a leader's log. Once this
// limit is exceeded, proposals will begin to return ErrProposalDropped
// errors. Note: 0 for no limit.
MaxUncommittedEntriesSize uint64
// MaxInflightMsgs limits the max number of in-flight append messages during
// optimistic replication phase. The application transportation layer usually
// has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
Expand Down Expand Up @@ -215,6 +220,10 @@ func (c *Config) validate() error {
return errors.New("storage cannot be nil")
}

if c.MaxUncommittedEntriesSize == 0 {
c.MaxUncommittedEntriesSize = noLimit
}

if c.MaxInflightMsgs <= 0 {
return errors.New("max inflight messages must be greater than 0")
}
Expand All @@ -241,11 +250,12 @@ type raft struct {
// the log
raftLog *raftLog

maxInflight int
maxMsgSize uint64
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
matchBuf uint64Slice
maxMsgSize uint64
maxUncommittedSize uint64
maxInflight int
prs map[uint64]*Progress
learnerPrs map[uint64]*Progress
matchBuf uint64Slice

state StateType

Expand All @@ -268,6 +278,10 @@ type raft struct {
// be proposed if the leader's applied index is greater than this
// value.
pendingConfIndex uint64
// an estimate of the size of the uncommitted tail of the Raft log. Used to
// prevent unbounded log growth. Only maintained by the leader. Reset on
// term changes.
uncommittedSize uint64

readOnly *readOnly

Expand Down Expand Up @@ -326,6 +340,7 @@ func newRaft(c *Config) *raft {
raftLog: raftlog,
maxMsgSize: c.MaxSizePerMsg,
maxInflight: c.MaxInflightMsgs,
maxUncommittedSize: c.MaxUncommittedEntriesSize,
prs: make(map[uint64]*Progress),
learnerPrs: make(map[uint64]*Progress),
electionTimeout: c.ElectionTick,
Expand Down Expand Up @@ -514,7 +529,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
return true
}

// sendHeartbeat sends an empty MsgApp
// sendHeartbeat sends a heartbeat RPC to the given peer.
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
// Attach the commit as min(to.matched, r.committed).
// When the leader sends out heartbeat message,
Expand Down Expand Up @@ -616,6 +631,7 @@ func (r *raft) reset(term uint64) {
})

r.pendingConfIndex = 0
r.uncommittedSize = 0
r.readOnly = newReadOnly(r.readOnly.option)
}

Expand Down Expand Up @@ -954,6 +970,10 @@ func stepLeader(r *raft, m pb.Message) error {
r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
return ErrProposalDropped
}
if !r.increaseUncommittedSize(m.Entries) {
r.logger.Debugf("%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal", r.id)
return ErrProposalDropped
}

for i, e := range m.Entries {
if e.Type == pb.EntryConfChange {
Expand Down Expand Up @@ -1462,6 +1482,49 @@ func (r *raft) abortLeaderTransfer() {
r.leadTransferee = None
}

// increaseUncommittedSize computes the size of the proposed entries and
// determines whether they would push leader over its maxUncommittedSize limit.
// If the new entries would exceed the limit, the method returns false. If not,
// the increase in uncommitted entry size is recorded and the method returns
// true.
func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
var s uint64
for _, e := range ents {
s += uint64(e.Size())
}

if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
// If the uncommitted tail of the Raft log is empty, allow any size
// proposal. Otherwise, limit the size of the uncommitted tail of the
// log and drop any proposal that would push the size over the limit.
return false
}
r.uncommittedSize += s
return true
}

// reduceUncommittedSize accounts for the newly committed entries by decreasing
// the uncommitted entry size limit.
func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
if r.uncommittedSize == 0 {
// Fast-path for followers, who do not track or enforce the limit.
return
}

var s uint64
for _, e := range ents {
s += uint64(e.Size())
}
if s > r.uncommittedSize {
// uncommittedSize may underestimate the size of the uncommitted Raft
// log tail but will never overestimate it. Saturate at 0 instead of
// allowing overflow.
r.uncommittedSize = 0
} else {
r.uncommittedSize -= s
}
}

func numOfPendingConf(ents []pb.Entry) int {
n := 0
for i := range ents {
Expand Down
20 changes: 9 additions & 11 deletions go.etcd.io/etcd/raft/rawnode.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,15 @@ func (rn *RawNode) commitReady(rd Ready) {
if !IsEmptyHardState(rd.HardState) {
rn.prevHardSt = rd.HardState
}
if rn.prevHardSt.Commit != 0 {
// In most cases, prevHardSt and rd.HardState will be the same
// because when there are new entries to apply we just sent a
// HardState with an updated Commit value. However, on initial
// startup the two are different because we don't send a HardState
// until something changes, but we do send any un-applied but
// committed entries (and previously-committed entries may be
// incorporated into the snapshot, even if rd.CommittedEntries is
// empty). Therefore we mark all committed entries as applied
// whether they were included in rd.HardState or not.
rn.raft.raftLog.appliedTo(rn.prevHardSt.Commit)

// If entries were applied (or a snapshot), update our cursor for
// the next Ready. Note that if the current HardState contains a
// new Commit index, this does not mean that we're also applying
// all of the new entries due to commit pagination by size.
if index := rd.appliedCursor(); index > 0 {
rn.raft.raftLog.appliedTo(index)
}

if len(rd.Entries) > 0 {
e := rd.Entries[len(rd.Entries)-1]
rn.raft.raftLog.stableTo(e.Index, e.Term)
Expand Down Expand Up @@ -201,6 +198,7 @@ func (rn *RawNode) Step(m pb.Message) error {
func (rn *RawNode) Ready() Ready {
rd := rn.newReady()
rn.raft.msgs = nil
rn.raft.reduceUncommittedSize(rd.CommittedEntries)
return rd
}

Expand Down
10 changes: 10 additions & 0 deletions go.etcd.io/etcd/raft/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,16 @@ func DescribeEntry(e pb.Entry, f EntryFormatter) string {
return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
}

// DescribeEntries calls DescribeEntry for each Entry, adding a newline to
// each.
func DescribeEntries(ents []pb.Entry, f EntryFormatter) string {
var buf bytes.Buffer
for _, e := range ents {
_, _ = buf.WriteString(DescribeEntry(e, f) + "\n")
}
return buf.String()
}

func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
if len(ents) == 0 {
return ents
Expand Down

0 comments on commit e858131

Please sign in to comment.