Skip to content

raft: clean up raftLog entry slice indexing #130980

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 54 additions & 49 deletions pkg/raft/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (l *raftLog) nextCommittedEnts(allowUnstable bool) (ents []pb.Entry) {
// See comment in hasNextCommittedEnts.
return nil
}
lo, hi := l.applying+1, l.maxAppliableIndex(allowUnstable)+1 // [lo, hi)
lo, hi := l.applying, l.maxAppliableIndex(allowUnstable) // (lo, hi]
if lo >= hi {
// Nothing to apply.
return nil
Expand Down Expand Up @@ -420,44 +420,42 @@ func (l *raftLog) lastEntryID() entryID {
return entryID{term: t, index: index}
}

func (l *raftLog) term(i uint64) (uint64, error) {
// Check the unstable log first, even before computing the valid term range,
// which may need to access stable Storage. If we find the entry's term in
// the unstable log, we know it was in the valid range.
if t, ok := l.unstable.maybeTerm(i); ok {
return t, nil
}

// The valid term range is [firstIndex-1, lastIndex]. Even though the entry at
// firstIndex-1 is compacted away, its term is available for matching purposes
// when doing log appends.
if i+1 < l.firstIndex() {
return 0, ErrCompacted
}
if i > l.lastIndex() {
// term returns the term of the log entry at the given index.
func (l *raftLog) term(index uint64) (uint64, error) {
// Check the unstable log first, even before computing the valid index range,
// which may need to access the storage. If we find the entry's term in the
// unstable log, we know it was in the valid range.
if index > l.unstable.lastIndex() {
return 0, ErrUnavailable
} else if index >= l.unstable.prev.index {
return l.unstable.termAt(index), nil
} else if index+1 < l.firstIndex() {
return 0, ErrCompacted
}

t, err := l.storage.Term(i)
term, err := l.storage.Term(index)
if err == nil {
return t, nil
}
if err == ErrCompacted || err == ErrUnavailable {
return term, nil
} else if err == ErrCompacted || err == ErrUnavailable {
return 0, err
}
panic(err) // TODO(bdarnell)
panic(err) // TODO(pav-kv): return the error and handle it up the stack.
}

func (l *raftLog) entries(i uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
if i > l.lastIndex() {
// entries returns a contiguous slice of log entries at indices > after, with
// the total size not exceeding maxSize. The total size can exceed maxSize if
// the first entry (at index after+1) is larger than maxSize. Returns nil if
// there are no entries at indices > after.
func (l *raftLog) entries(after uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
if after >= l.lastIndex() {
return nil, nil
}
return l.slice(i, l.lastIndex()+1, maxSize)
return l.slice(after, l.lastIndex(), maxSize)
}

// allEntries returns all entries in the log.
// allEntries returns all entries in the log. For testing only.
func (l *raftLog) allEntries() []pb.Entry {
ents, err := l.entries(l.firstIndex(), noLimit)
ents, err := l.entries(l.firstIndex()-1, noLimit)
if err == nil {
return ents
}
Expand Down Expand Up @@ -498,12 +496,12 @@ func (l *raftLog) restore(s snapshot) bool {
return true
}

// scan visits all log entries in the [lo, hi) range, returning them via the
// scan visits all log entries in the (lo, hi] range, returning them via the
// given callback. The callback can be invoked multiple times, with consecutive
// sub-ranges of the requested range. Returns up to pageSize bytes worth of
// entries at a time. May return more if a single entry size exceeds the limit.
//
// The entries in [lo, hi) must exist, otherwise scan() eventually returns an
// The entries in (lo, hi] must exist, otherwise scan() eventually returns an
// error (possibly after passing some entries through the callback).
//
// If the callback returns an error, scan terminates and returns this error
Expand All @@ -524,34 +522,44 @@ func (l *raftLog) scan(lo, hi uint64, pageSize entryEncodingSize, v func([]pb.En
return nil
}

// slice returns a slice of log entries from lo through hi-1, inclusive.
// slice returns a prefix of the log in the (lo, hi] interval, with the total
// entries size up to maxSize. May exceed maxSize if the first entry (lo+1) is
// larger. Returns at least one entry if the interval is non-empty.
//
// The returned slice can be appended to, but the entries in it must not be
// mutated.
func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, error) {
// TODO(pav-kv): simplify a bunch of arithmetics below.
if err := l.mustCheckOutOfBounds(lo, hi); err != nil {
return nil, err
}
if lo == hi {
} else if lo >= hi {
return nil, nil
}
if lo > l.unstable.prev.index {
ents := limitSize(l.unstable.slice(lo, hi), maxSize)

// Fast path: the (lo, hi] interval is fully in the unstable log.
if lo >= l.unstable.prev.index {
ents := limitSize(l.unstable.sub(lo, hi), maxSize)
// NB: use the full slice expression to protect the unstable slice from
// appends to the returned ents slice.
// potential appends to the returned slice.
return ents[:len(ents):len(ents)], nil
}

cut := min(hi, l.unstable.prev.index+1)
ents, err := l.storage.Entries(lo, cut, uint64(maxSize))
// Invariant: lo < cut = min(hi, l.unstable.prev.index).
cut := min(hi, l.unstable.prev.index)
// TODO(pav-kv): make Entries() take (lo, hi] instead of [lo, hi), for
// consistency. All raft log slices are constructed in context of being
// appended after a certain index, so (lo, hi] addressing makes more sense.
ents, err := l.storage.Entries(lo+1, cut+1, uint64(maxSize))
if err == ErrCompacted {
return nil, err
} else if err == ErrUnavailable {
l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, cut)
l.logger.Panicf("entries(%d:%d] is unavailable from storage", lo, cut)
} else if err != nil {
panic(err) // TODO(pavelkalinnikov): handle errors uniformly
panic(err) // TODO(pav-kv): handle errors uniformly
}
if hi <= l.unstable.prev.index+1 {
if hi <= l.unstable.prev.index { // all (lo, hi] entries are in storage
return ents, nil
}
// Invariant below: lo < cut < hi, and cut == l.unstable.prev.index.

// Fast path to check if ents has reached the size limitation. Either the
// returned slice is shorter than requested (which means the next entry would
Expand All @@ -566,7 +574,7 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e
return ents, nil
}

unstable := limitSize(l.unstable.slice(l.unstable.prev.index+1, hi), maxSize-size)
unstable := limitSize(l.unstable.sub(cut, hi), maxSize-size)
// Total size of unstable may exceed maxSize-size only if len(unstable) == 1.
// If this happens, ignore this extra entry.
if len(unstable) == 1 && size+entsSize(unstable) > maxSize {
Expand All @@ -577,19 +585,16 @@ func (l *raftLog) slice(lo, hi uint64, maxSize entryEncodingSize) ([]pb.Entry, e
return extend(ents, unstable), nil
}

// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
// mustCheckOutOfBounds checks that the (lo, hi] interval is within the bounds
// of this raft log: l.firstIndex()-1 <= lo <= hi <= l.lastIndex().
func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
if lo > hi {
l.logger.Panicf("invalid slice %d > %d", lo, hi)
}
fi := l.firstIndex()
if lo < fi {
if fi := l.firstIndex(); lo+1 < fi {
return ErrCompacted
}

length := l.lastIndex() + 1 - fi
if hi > fi+length {
l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
} else if li := l.lastIndex(); hi > li {
l.logger.Panicf("slice(%d,%d] out of bound [%d,%d]", lo, hi, fi, li)
}
return nil
}
Expand Down
94 changes: 31 additions & 63 deletions pkg/raft/log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,46 +690,15 @@ func TestIsOutOfBounds(t *testing.T) {
wpanic bool
wErrCompacted bool
}{
{
first - 2, first + 1,
false,
true,
},
{
first - 1, first + 1,
false,
true,
},
{
first, first,
false,
false,
},
{
first + num/2, first + num/2,
false,
false,
},
{
first + num - 1, first + num - 1,
false,
false,
},
{
first + num, first + num,
false,
false,
},
{
first + num, first + num + 1,
true,
false,
},
{
first + num + 1, first + num + 1,
true,
false,
},
{lo: first - 3, hi: first, wErrCompacted: true},
{lo: first - 2, hi: first, wErrCompacted: true},
{lo: first - 1, hi: first},
{lo: first + num/2, hi: first + num/2},
{lo: first + num - 1, hi: first + num - 1},
{lo: first + num - 2, hi: first + num - 1},
{lo: first + num, hi: first + num, wpanic: true},
{lo: first + num, hi: first + num + 1, wpanic: true},
{lo: first + num + 1, hi: first + num + 1, wpanic: true},
} {
t.Run("", func(t *testing.T) {
defer func() {
Expand Down Expand Up @@ -815,17 +784,16 @@ func TestSlice(t *testing.T) {
half := offset + num/2
halfe := pb.Entry{Index: half, Term: half}

entries := func(from, to uint64) []pb.Entry {
return index(from).termRange(from, to)
entries := func(lo, hi uint64) []pb.Entry { // (lo, hi]
return index(lo+1).termRange(lo+1, hi+1)
}

storage := NewMemoryStorage()
require.NoError(t, storage.ApplySnapshot(pb.Snapshot{
Metadata: pb.SnapshotMetadata{Index: offset}}))
require.NoError(t, storage.Append(entries(offset+1, half)))
require.NoError(t, storage.Append(entries(offset, half)))
l := newLog(storage, discardLogger)
require.True(t, l.append(entryID{term: half - 1, index: half - 1}.
append(intRange(half, last)...)))
require.True(t, l.append(pbEntryID(&halfe).append(intRange(half+1, last+1)...)))

for _, tt := range []struct {
lo uint64
Expand All @@ -836,18 +804,18 @@ func TestSlice(t *testing.T) {
wpanic bool
}{
// ErrCompacted.
{lo: offset - 1, hi: offset + 1, lim: noLimit, w: nil},
{lo: offset, hi: offset + 1, lim: noLimit, w: nil},
{lo: offset - 2, hi: offset, lim: noLimit, w: nil},
{lo: offset - 1, hi: offset, lim: noLimit, w: nil},
// panics
{lo: half, hi: half - 1, lim: noLimit, wpanic: true}, // lo and hi inversion
{lo: last, hi: last + 1, lim: noLimit, wpanic: true}, // hi is out of bounds
{lo: half, hi: half - 1, lim: noLimit, wpanic: true}, // lo and hi inversion
{lo: last - 1, hi: last + 1, lim: noLimit, wpanic: true}, // hi is out of bounds

// No limit.
{lo: offset + 1, hi: offset + 1, lim: noLimit, w: nil},
{lo: offset + 1, hi: half - 1, lim: noLimit, w: entries(offset+1, half-1)},
{lo: offset + 1, hi: half, lim: noLimit, w: entries(offset+1, half)},
{lo: offset + 1, hi: half + 1, lim: noLimit, w: entries(offset+1, half+1)},
{lo: offset + 1, hi: last, lim: noLimit, w: entries(offset+1, last)},
{lo: offset, hi: offset, lim: noLimit, w: nil},
{lo: offset, hi: half - 1, lim: noLimit, w: entries(offset, half-1)},
{lo: offset, hi: half, lim: noLimit, w: entries(offset, half)},
{lo: offset, hi: half + 1, lim: noLimit, w: entries(offset, half+1)},
{lo: offset, hi: last, lim: noLimit, w: entries(offset, last)},
{lo: half - 1, hi: half, lim: noLimit, w: entries(half-1, half)},
{lo: half - 1, hi: half + 1, lim: noLimit, w: entries(half-1, half+1)},
{lo: half - 1, hi: last, lim: noLimit, w: entries(half-1, last)},
Expand All @@ -856,24 +824,24 @@ func TestSlice(t *testing.T) {
{lo: last - 1, hi: last, lim: noLimit, w: entries(last-1, last)},

// At least one entry is always returned.
{lo: offset + 1, hi: last, lim: 0, w: entries(offset+1, offset+2)},
{lo: offset, hi: last, lim: 0, w: entries(offset, offset+1)},
{lo: half - 1, hi: half + 1, lim: 0, w: entries(half-1, half)},
{lo: half, hi: last, lim: 0, w: entries(half, half+1)},
{lo: half + 1, hi: last, lim: 0, w: entries(half+1, half+2)},
// Low limit.
{lo: offset + 1, hi: last, lim: uint64(halfe.Size() - 1), w: entries(offset+1, offset+2)},
{lo: offset, hi: last, lim: uint64(halfe.Size() - 1), w: entries(offset, offset+1)},
{lo: half - 1, hi: half + 1, lim: uint64(halfe.Size() - 1), w: entries(half-1, half)},
{lo: half, hi: last, lim: uint64(halfe.Size() - 1), w: entries(half, half+1)},
// Just enough for one limit.
{lo: offset + 1, hi: last, lim: uint64(halfe.Size()), w: entries(offset+1, offset+2)},
{lo: offset, hi: last, lim: uint64(halfe.Size()), w: entries(offset, offset+1)},
{lo: half - 1, hi: half + 1, lim: uint64(halfe.Size()), w: entries(half-1, half)},
{lo: half, hi: last, lim: uint64(halfe.Size()), w: entries(half, half+1)},
// Not enough for two limit.
{lo: offset + 1, hi: last, lim: uint64(halfe.Size() + 1), w: entries(offset+1, offset+2)},
{lo: offset, hi: last, lim: uint64(halfe.Size() + 1), w: entries(offset, offset+1)},
{lo: half - 1, hi: half + 1, lim: uint64(halfe.Size() + 1), w: entries(half-1, half)},
{lo: half, hi: last, lim: uint64(halfe.Size() + 1), w: entries(half, half+1)},
// Enough for two limit.
{lo: offset + 1, hi: last, lim: uint64(halfe.Size() * 2), w: entries(offset+1, offset+3)},
{lo: offset, hi: last, lim: uint64(halfe.Size() * 2), w: entries(offset, offset+2)},
{lo: half - 2, hi: half + 1, lim: uint64(halfe.Size() * 2), w: entries(half-2, half)},
{lo: half - 1, hi: half + 1, lim: uint64(halfe.Size() * 2), w: entries(half-1, half+1)},
{lo: half, hi: last, lim: uint64(halfe.Size() * 2), w: entries(half, half+2)},
Expand All @@ -889,8 +857,8 @@ func TestSlice(t *testing.T) {
}
}()
g, err := l.slice(tt.lo, tt.hi, entryEncodingSize(tt.lim))
require.False(t, tt.lo <= offset && err != ErrCompacted)
require.False(t, tt.lo > offset && err != nil)
require.False(t, tt.lo < offset && err != ErrCompacted)
require.False(t, tt.lo >= offset && err != nil)
require.Equal(t, tt.w, g)
})
}
Expand All @@ -916,8 +884,8 @@ func TestScan(t *testing.T) {

// Test that scan() returns the same entries as slice(), on all inputs.
for _, pageSize := range []entryEncodingSize{0, 1, 10, 100, entrySize, entrySize + 1} {
for lo := offset + 1; lo < last; lo++ {
for hi := lo; hi <= last; hi++ {
for lo := offset; lo < last; lo++ {
for hi := lo + 1; hi <= last; hi++ {
var got []pb.Entry
require.NoError(t, l.scan(lo, hi, pageSize, func(e []pb.Entry) error {
got = append(got, e...)
Expand Down
39 changes: 0 additions & 39 deletions pkg/raft/log_unstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,14 +131,6 @@ func (u *unstable) maybeFirstIndex() (uint64, bool) {
return 0, false
}

// maybeTerm returns the term of the entry at index i, if there is any.
func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
if i < u.prev.index || i > u.lastIndex() {
return 0, false
}
return u.termAt(i), true
}

// nextEntries returns the unstable entries that are not already in the process
// of being written to storage.
func (u *unstable) nextEntries() []pb.Entry {
Expand Down Expand Up @@ -328,34 +320,3 @@ func (u *unstable) truncateAndAppend(a logSlice) bool {
u.entryInProgress = min(u.entryInProgress, a.prev.index)
return true
}

// slice returns the entries from the unstable log with indexes in the range
// [lo, hi). The entire range must be stored in the unstable log or the method
// will panic. The returned slice can be appended to, but the entries in it must
// not be changed because they are still shared with unstable.
//
// TODO(pavelkalinnikov): this, and similar []pb.Entry slices, may bubble up all
// the way to the application code through Ready struct. Protect other slices
// similarly, and document how the client can use them.
func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
u.mustCheckOutOfBounds(lo, hi)
// NB: use the full slice expression to limit what the caller can do with the
// returned slice. For example, an append will reallocate and copy this slice
// instead of corrupting the neighbouring u.entries.
offset := u.prev.index + 1
return u.entries[lo-offset : hi-offset : hi-offset]
}

// mustCheckOutOfBounds checks that [lo, hi) interval is included in
// (u.prev.index, u.lastIndex()].
// Equivalently, u.prev.index + 1 <= lo <= hi <= u.lastIndex() + 1.
//
// TODO(pav-kv): the callers check this already. Remove.
func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
if lo > hi {
u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
}
if last := u.lastIndex(); lo <= u.prev.index || hi > last+1 {
u.logger.Panicf("unstable.slice[%d,%d) out of bound (%d,%d]", lo, hi, u.prev.index, last)
}
}
Loading
Loading