Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raft: don't panic when looking for term conflicts #36

Merged
merged 6 commits into from
Mar 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 43 additions & 42 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,24 +107,25 @@ func (l *raftLog) String() string {
// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
// it returns (last index of new entries, true).
func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
if l.matchTerm(index, logTerm) {
lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
l.append(ents[ci-offset:]...)
if !l.matchTerm(index, logTerm) {
return 0, false
}

lastnewi = index + uint64(len(ents))
ci := l.findConflict(ents)
switch {
case ci == 0:
case ci <= l.committed:
l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
default:
offset := index + 1
if ci-offset > uint64(len(ents)) {
l.logger.Panicf("index, %d, is out of range [%d]", ci-offset, len(ents))
}
l.commitTo(min(committed, lastnewi))
return lastnewi, true
l.append(ents[ci-offset:]...)
}
return 0, false
l.commitTo(min(committed, lastnewi))
return lastnewi, true
}

func (l *raftLog) append(ents ...pb.Entry) uint64 {
Expand Down Expand Up @@ -161,34 +162,31 @@ func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
return 0
}

// findConflictByTerm takes an (index, term) pair (indicating a conflicting log
// entry on a leader/follower during an append) and finds the largest index in
// log l with a term <= `term` and an index <= `index`. If no such index exists
// in the log, the log's first index is returned.
// findConflictByTerm returns a best guess on where this log ends matching
// another log, given that the only information known about the other log is the
// (index, term) of its single entry.
//
// Specifically, the first returned value is the max guessIndex <= index, such
// that term(guessIndex) <= term or term(guessIndex) is not known (because this
// index is compacted or not yet stored).
//
// The index provided MUST be equal to or less than l.lastIndex(). Invalid
// inputs log a warning and the input index is returned.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) uint64 {
if li := l.lastIndex(); index > li {
// NB: such calls should not exist, but since there is a straightfoward
// way to recover, do it.
//
// It is tempting to also check something about the first index, but
// there is odd behavior with peers that have no log, in which case
// lastIndex will return zero and firstIndex will return one, which
// leads to calls with an index of zero into this method.
l.logger.Warningf("index(%d) is out of range [0, lastIndex(%d)] in findConflictByTerm",
index, li)
return index
}
for {
logTerm, err := l.term(index)
if logTerm <= term || err != nil {
break
// The second returned value is the term(guessIndex), or 0 if it is unknown.
//
// This function is used by a follower and leader to resolve log conflicts after
// an unsuccessful append to a follower, and ultimately restore the steady flow
// of appends.
func (l *raftLog) findConflictByTerm(index uint64, term uint64) (uint64, uint64) {
for ; index > 0; index-- {
// If there is an error (likely ErrCompacted or ErrUnavailable), we don't
// know whether it's a match or not, so assume a possible match and return
// the index, with 0 term indicating an unknown term.
if ourTerm, err := l.term(index); err != nil {
return index, 0
} else if ourTerm <= term {
return index, ourTerm
}
index--
}
return index
return 0, 0
}

// nextUnstableEnts returns all entries that are available to be written to the
Expand Down Expand Up @@ -447,7 +445,10 @@ func (l *raftLog) matchTerm(i, term uint64) bool {
}

func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
if maxIndex > l.committed && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
// NB: term should never be 0 on a commit because the leader campaigns at
// least at term 1. But if it is 0 for some reason, we don't want to consider
// this a term match in case zeroTermOnOutOfBounds returns 0.
if maxIndex > l.committed && term != 0 && l.zeroTermOnOutOfBounds(l.term(maxIndex)) == term {
l.commitTo(maxIndex)
return true
}
Expand Down
57 changes: 57 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,63 @@ func TestFindConflict(t *testing.T) {
}
}

func TestFindConflictByTerm(t *testing.T) {
ents := func(fromIndex uint64, terms []uint64) []pb.Entry {
e := make([]pb.Entry, 0, len(terms))
for i, term := range terms {
e = append(e, pb.Entry{Term: term, Index: fromIndex + uint64(i)})
}
return e
}
for _, tt := range []struct {
ents []pb.Entry // ents[0] contains the (index, term) of the snapshot
index uint64
term uint64
want uint64
}{
// Log starts from index 1.
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 100, term: 2, want: 100}, // ErrUnavailable
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 6, want: 5},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 5, want: 5},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 4, want: 2},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 2, want: 2},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 5, term: 1, want: 0},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 2, want: 1},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 1, term: 1, want: 0},
{ents: ents(0, []uint64{0, 2, 2, 5, 5, 5}), index: 0, term: 0, want: 0},
// Log with compacted entries.
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 30, term: 3, want: 30}, // ErrUnavailable
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 9, want: 14},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 4, want: 14},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 3, want: 12},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 14, term: 2, want: 9},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 11, term: 5, want: 11},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 5, want: 10},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 3, want: 10},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 10, term: 2, want: 9},
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 9, term: 2, want: 9}, // ErrCompacted
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 4, term: 2, want: 4}, // ErrCompacted
{ents: ents(10, []uint64{3, 3, 3, 4, 4, 4}), index: 0, term: 0, want: 0}, // ErrCompacted
} {
t.Run("", func(t *testing.T) {
st := NewMemoryStorage()
require.NotEmpty(t, tt.ents)
st.ApplySnapshot(pb.Snapshot{Metadata: pb.SnapshotMetadata{
Index: tt.ents[0].Index,
Term: tt.ents[0].Term,
}})
l := newLog(st, raftLogger)
l.append(tt.ents[1:]...)

index, term := l.findConflictByTerm(tt.index, tt.term)
require.Equal(t, tt.want, index)
wantTerm, err := l.term(index)
wantTerm = l.zeroTermOnOutOfBounds(wantTerm, err)
require.Equal(t, wantTerm, term)
})
}
}

func TestIsUpToDate(t *testing.T) {
previousEnts := []pb.Entry{{Index: 1, Term: 1}, {Index: 2, Term: 2}, {Index: 3, Term: 3}}
raftLog := newLog(NewMemoryStorage(), raftLogger)
Expand Down
62 changes: 33 additions & 29 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1394,7 +1394,7 @@ func stepLeader(r *raft, m pb.Message) error {
// 7, the rejection points it at the end of the follower's log
// which is at a higher log term than the actually committed
// log.
nextProbeIdx = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
nextProbeIdx, _ = r.raftLog.findConflictByTerm(m.RejectHint, m.LogTerm)
}
if pr.MaybeDecrTo(m.Index, nextProbeIdx) {
r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
Expand Down Expand Up @@ -1648,37 +1648,39 @@ func (r *raft) handleAppendEntries(m pb.Message) {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
return
}

if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
} else {
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

// Return a hint to the leader about the maximum index and term that the
// two logs could be divergent at. Do this by searching through the
// follower's log for the maximum (index, term) pair with a term <= the
// MsgApp's LogTerm and an index <= the MsgApp's Index. This can help
// skip all indexes in the follower's uncommitted tail with terms
// greater than the MsgApp's LogTerm.
//
// See the other caller for findConflictByTerm (in stepLeader) for a much
// more detailed explanation of this mechanism.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex = r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
hintTerm, err := r.raftLog.term(hintIndex)
if err != nil {
panic(fmt.Sprintf("term(%d) must be valid, but got %v", hintIndex, err))
}
r.send(pb.Message{
To: m.From,
Type: pb.MsgAppResp,
Index: m.Index,
Reject: true,
RejectHint: hintIndex,
LogTerm: hintTerm,
})
return
}
r.logger.Debugf("%x [logterm: %d, index: %d] rejected MsgApp [logterm: %d, index: %d] from %x",
r.id, r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)

// Our log does not match the leader's at index m.Index. Return a hint to the
// leader - a guess on the maximal (index, term) at which the logs match. Do
// this by searching through the follower's log for the maximum (index, term)
// pair with a term <= the MsgApp's LogTerm and an index <= the MsgApp's
// Index. This can help skip all indexes in the follower's uncommitted tail
// with terms greater than the MsgApp's LogTerm.
//
// See the other caller for findConflictByTerm (in stepLeader) for a much more
// detailed explanation of this mechanism.

// NB: m.Index >= raftLog.committed by now (see the early return above), and
// raftLog.lastIndex() >= raftLog.committed by invariant, so min of the two is
// also >= raftLog.committed. Hence, the findConflictByTerm argument is within
// the valid interval, which then will return a valid (index, term) pair with
// a non-zero term (unless the log is empty). However, it is safe to send a zero
// LogTerm in this response in any case, so we don't verify it here.
hintIndex := min(m.Index, r.raftLog.lastIndex())
hintIndex, hintTerm := r.raftLog.findConflictByTerm(hintIndex, m.LogTerm)
r.send(pb.Message{
To: m.From,
Type: pb.MsgAppResp,
Index: m.Index,
Reject: true,
RejectHint: hintIndex,
LogTerm: hintTerm,
})
}

func (r *raft) handleHeartbeat(m pb.Message) {
Expand Down Expand Up @@ -1909,6 +1911,8 @@ func (r *raft) abortLeaderTransfer() {

// committedEntryInCurrentTerm return true if the peer has committed an entry in its term.
func (r *raft) committedEntryInCurrentTerm() bool {
// NB: r.Term is never 0 on a leader, so if zeroTermOnOutOfBounds returns 0,
pav-kv marked this conversation as resolved.
Show resolved Hide resolved
// we won't see it as a match with r.Term.
return r.raftLog.zeroTermOnOutOfBounds(r.raftLog.term(r.raftLog.committed)) == r.Term
}

Expand Down
Loading