Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CBG-4221 [3.1.11 backport] Don't increment high sequence cached for unused sequences on pendingLogs #7106

Merged
merged 2 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 61 additions & 68 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -126,6 +125,10 @@ func (entry *LogEntry) SetDeleted() {
entry.Flags |= channels.Deleted
}

// IsUnusedRange reports whether the entry describes a range of unused sequences,
// i.e. it carries no document ID and has a non-zero EndSequence.
func (entry *LogEntry) IsUnusedRange() bool {
	// Entries that belong to a document are never unused ranges.
	if entry.DocID != "" {
		return false
	}
	// Only entries with an end sequence set are treated as ranges.
	return entry.EndSequence > 0
}

type LogEntries []*LogEntry

// A priority-queue of LogEntries, kept ordered by increasing sequence #.
Expand Down Expand Up @@ -572,7 +575,6 @@ func (c *changeCache) releaseUnusedSequence(ctx context.Context, sequence uint64
} else {
changedChannels.Add(unusedSeq)
}
c.channelCache.AddUnusedSequence(change)
if c.notifyChange != nil && len(changedChannels) > 0 {
c.notifyChange(ctx, changedChannels)
}
Expand All @@ -595,7 +597,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
}
changedChannels := c.processEntry(ctx, change)
allChangedChannels = allChangedChannels.Update(changedChannels)
c.channelCache.AddUnusedSequence(change)
if c.notifyChange != nil {
c.notifyChange(ctx, allChangedChannels)
}
Expand All @@ -605,9 +606,6 @@ func (c *changeCache) releaseUnusedSequenceRange(ctx context.Context, fromSequen
// push unused range to either pending or skipped lists based on current state of the change cache
allChangedChannels = c.processUnusedRange(ctx, fromSequence, toSequence, allChangedChannels, timeReceived)

// update high seq cached
c.channelCache.AddUnusedSequence(&LogEntry{Sequence: toSequence})

if c.notifyChange != nil {
c.notifyChange(ctx, allChangedChannels)
}
Expand All @@ -633,71 +631,23 @@ func (c *changeCache) processUnusedRange(ctx context.Context, fromSequence, toSe
// isn't possible under normal processing - unused sequence ranges will normally be moved
// from pending to skipped in their entirety, as it's the processing of the pending sequence
// *after* the range that triggers the range to be skipped. A partial range in skipped means
// an duplicate entry with a sequence within the bounds of the range was previously present
// a duplicate entry with a sequence within the bounds of the range was previously present
// in pending.
base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences", fromSequence, toSequence)
base.WarnfCtx(ctx, "unused sequence range of #%d to %d contains duplicate sequences, will be ignored", fromSequence, toSequence)
}
return allChangedChannels
}

// _pushRangeToPending will push a sequence range to pending logs. If pending has entries in it, we will check if
// those entries are in the range and handle it, so we don't push duplicate sequences to pending
// _pushRangeToPending will push a sequence range to pendingLogs
func (c *changeCache) _pushRangeToPending(ctx context.Context, startSeq, endSeq uint64, timeReceived time.Time) {
if c.pendingLogs.Len() == 0 {
// push whole range & return early to avoid duplicate checks
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: endSeq,
}
heap.Push(&c.pendingLogs, entry)
return
}

// check for duplicate sequences between range and pending logs
// loop till we have processed unused sequence range (or until we
// have range of sequences that aren't present in pending list)
for startSeq <= endSeq {
i, found := sort.Find(c.pendingLogs.Len(), func(i int) int {
value := c.pendingLogs[i]
if value.Sequence > endSeq {
// range is less than current pending entry
return -1
}
if startSeq <= value.Sequence && endSeq >= value.Sequence {
// found pending entry that has duplicate entry between itself and unused range
return 0
}
// range is larger than current element
return 1
})
if found {
// grab pending entry at that index and process unused range between startSeq and pending entry.Sequence - 1
pendingEntry := c.pendingLogs[i]
base.DebugfCtx(ctx, base.KeyCache, "Ignoring duplicate of #%d (unusedSequence)", pendingEntry.Sequence)
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: pendingEntry.Sequence - 1,
}
heap.Push(&c.pendingLogs, entry)
// update start seq on range
startSeq = pendingEntry.Sequence + 1
} else {
// if range not found in pending then break from loop early
break
}
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: endSeq,
}
heap.Push(&c.pendingLogs, entry)

// push what's left of seq range
if startSeq <= endSeq {
entry := &LogEntry{
TimeReceived: timeReceived,
Sequence: startSeq,
EndSequence: endSeq,
}
heap.Push(&c.pendingLogs, entry)
}
}

// Process unused sequence notification. Extracts sequence from docID and sends to cache for buffering
Expand Down Expand Up @@ -848,8 +798,9 @@ func (c *changeCache) _addToCache(ctx context.Context, change *LogEntry) []chann
}
delete(c.receivedSeqs, change.Sequence)

// If unused sequence or principal, we're done after updating sequence
// If unused sequence, notify the cache and return
if change.DocID == "" {
c.channelCache.AddUnusedSequence(change)
return nil
}

Expand Down Expand Up @@ -885,16 +836,23 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
isNext = oldestPending.Sequence == c.nextSequence

if isNext {
heap.Pop(&c.pendingLogs)
oldestPending = c._popPendingLog(ctx)
changedChannels = changedChannels.UpdateWithSlice(c._addToCache(ctx, oldestPending))
} else if oldestPending.Sequence < c.nextSequence {
// oldest pending is lower than next sequence, should be ignored
base.InfofCtx(ctx, base.KeyCache, "Oldest entry in pending logs %v (%d, %d) is earlier than cache next sequence (%d), ignoring as sequence has already been cached", base.UD(oldestPending.DocID), oldestPending.Sequence, oldestPending.EndSequence, c.nextSequence)
oldestPending = c._popPendingLog(ctx)

// If the oldestPending was a range that extended past nextSequence, update nextSequence
if oldestPending.IsUnusedRange() && oldestPending.EndSequence >= c.nextSequence {
c.nextSequence = oldestPending.EndSequence + 1
}
} else if len(c.pendingLogs) > c.options.CachePendingSeqMaxNum || time.Since(c.pendingLogs[0].TimeReceived) >= c.options.CachePendingSeqMaxWait {
// Skip all sequences up to the oldest Pending
c.PushSkipped(ctx, c.nextSequence, oldestPending.Sequence-1)
// disallow c.nextSequence decreasing
if c.nextSequence < oldestPending.Sequence {
c.nextSequence = oldestPending.Sequence
}
c.nextSequence = oldestPending.Sequence
} else {
// nextSequence is not in pending logs, and pending logs size/age doesn't trigger skipped sequences
break
}
}
Expand All @@ -905,6 +863,41 @@ func (c *changeCache) _addPendingLogs(ctx context.Context) channels.Set {
return changedChannels
}

// _popPendingLog pops the next pending LogEntry from the c.pendingLogs heap. When the popped entry is an unused range,
// performs a defensive check for duplicates with the next entry in pending:
//   - if the range ends before the next pending entry's sequence, the range is returned unchanged;
//   - if the range starts at the same sequence as the next pending entry, the range is discarded and the
//     next pending entry is popped (and checked) in its place;
//   - otherwise the range is truncated to end immediately before the next pending entry's sequence.
//
// heap.Pop is called unconditionally, so callers are expected to ensure c.pendingLogs is non-empty.
func (c *changeCache) _popPendingLog(ctx context.Context) *LogEntry {
	poppedEntry := heap.Pop(&c.pendingLogs).(*LogEntry)
	// If it's not a range, no additional handling needed
	if !poppedEntry.IsUnusedRange() {
		return poppedEntry
	}
	// If there are no more pending logs, no additional handling needed
	if len(c.pendingLogs) == 0 {
		return poppedEntry
	}

	// Peek at the new head of the heap without removing it
	nextPendingEntry := c.pendingLogs[0]
	// If popped entry range does not overlap with next pending entry, no additional handling needed
	// e.g. popped [15-20], nextPendingEntry is [25]
	if poppedEntry.EndSequence < nextPendingEntry.Sequence {
		return poppedEntry
	}

	// If nextPendingEntry's sequence duplicates the start of the unused range, ignore popped entry and return next entry instead
	// e.g. popped [15-20], nextPendingEntry is [15]
	if poppedEntry.Sequence == nextPendingEntry.Sequence {
		// Doc IDs are user data: wrap in base.UD so log redaction applies, matching other pending-log messages in this file
		base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) has start equal to next pending sequence (%s, %d) - unused range will be ignored", poppedEntry.Sequence, poppedEntry.EndSequence, base.UD(nextPendingEntry.DocID), nextPendingEntry.Sequence)
		// Recurse so the replacement entry gets the same overlap checks if it is itself an unused range
		return c._popPendingLog(ctx)
	}

	// Otherwise, reduce the popped unused range to end before the next pending sequence
	// e.g. popped [15-20], nextPendingEntry is [18]
	base.InfofCtx(ctx, base.KeyCache, "Unused sequence range in pendingLogs (%d, %d) overlaps with next pending sequence (%s, %d) - unused range will be truncated", poppedEntry.Sequence, poppedEntry.EndSequence, base.UD(nextPendingEntry.DocID), nextPendingEntry.Sequence)
	poppedEntry.EndSequence = nextPendingEntry.Sequence - 1
	return poppedEntry
}

func (c *changeCache) GetStableSequence(docID string) SequenceID {
// Stable sequence is independent of docID in changeCache
return SequenceID{Seq: c.LastSequence()}
Expand Down
Loading
Loading