Skip to content

Commit

Permalink
db: track files' pre-zeroing largest sequence numbers
Browse files Browse the repository at this point in the history
When there exist no keys beneath a compaction's key range, Pebble performs
sequence number zeroing. This is an optimization that allows for easier
detection of new user keys during iteration. This commit introduces a new
FileMetadata field LargestSeqNumAbsolute that provides an upper bound on the
sequence numbers of an sstable's keys before they were zeroed. This is useful as
a stable upper bound on the recency of an sstable's keys.

In this commit we use this new upper bound to provide an alternative solution
to the interaction between delete-only compactions and sequence number zeroing.
Previously any compaction that zeroed sequence numbers and overlapped a
delete-only compaction hint was required to clear the conflicting hints to
ensure a delete-only compaction did not accidentally drop a table containing
keys more recent than any of the hint's constituent tombstones. This
interaction was a bit indirect and subtle. Encoding the pre-zeroing sequence
number on the file metadata is more direct, and will allow us to use the
sequence number for recency ordering of sstables' keys more generally,
including in cockroachdb#2112.

When the database is closed and re-opened, the new field LargestSeqNumAbsolute
is initialized to LargestSeqNum for all existing sstables. This means that
LargestSeqNumAbsolute only provides an upper bound on an sstable's keys'
sequence numbers over the lifetime of the database instance in the current
process. This is sufficient for many use cases, including delete-only
compaction accounting.

The reason this is sufficient in the delete-only compaction use case is subtle.
The problem we're seeking to avoid is a range tombstone [start,end)#n deleting
a key k#m such that start ≤ k < end and m ≥ n. Because of the sequence number
invariant, the range tombstone can never fall beneath the key k that it does
not delete within the LSM. However, our in-memory delete-only compaction hints
are not atomically updated with transformations of the LSM. They represent the
state of the LSM at a single instant when the table stats collector observed
range deletion(s) within a particular file. This stale view of the LSM is what
necessitates a mechanism like LargestSeqNumAbsolute to avoid erroneous
applications of a deletion hint. After a database restart, none of the previous
instance's in-memory delete-only compaction hints exist. The table stats
collector must re-populate the hints by scanning range deletions in sstables in
the background. However, because the sequence number invariant prevents
inversion of sequence numbers across process restarts, any hints we construct
from the LSM will be correct with respect to that view of the LSM.
  • Loading branch information
jbowens committed May 2, 2024
1 parent 902b6d0 commit 562ff38
Show file tree
Hide file tree
Showing 14 changed files with 212 additions and 178 deletions.
102 changes: 41 additions & 61 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,18 @@ type compaction struct {
pickerMetrics compactionPickerMetrics
}

// inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute across
// all of the compaction's input sstables. Returns zero if the compaction has
// no input files.
func (c *compaction) inputLargestSeqNumAbsolute() uint64 {
	var largest uint64
	for _, cl := range c.inputs {
		iter := cl.files.Iter()
		for f := iter.First(); f != nil; f = iter.Next() {
			if f.LargestSeqNumAbsolute > largest {
				largest = f.LargestSeqNumAbsolute
			}
		}
	}
	return largest
}

func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
info := CompactionInfo{
JobID: int(jobID),
Expand Down Expand Up @@ -1583,7 +1595,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
d.maybeUpdateDeleteCompactionHints(c)
}

d.clearCompactingState(c, err != nil)
Expand Down Expand Up @@ -1973,8 +1984,16 @@ func (h *deleteCompactionHint) canDelete(
cmp Compare, m *fileMetadata, snapshots compact.Snapshots,
) bool {
// The file can only be deleted if all of its keys are older than the
// earliest tombstone aggregated into the hint.
if m.LargestSeqNum >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
// earliest tombstone aggregated into the hint. Note that we use
// m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
// zeroes sequence numbers. A compaction may zero the sequence number of a
// key with a sequence number > h.tombstoneSmallestSeqNum and set it to
// zero. If we looked at m.LargestSeqNum, the resulting output file would
// appear to not contain any keys more recent than the oldest tombstone. To
// avoid this error, the largest pre-zeroing sequence number is maintained
// in LargestSeqNumAbsolute and used here to make the determination whether
// the file's keys are older than all of the hint's tombstones.
if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
return false
}

Expand Down Expand Up @@ -2012,55 +2031,6 @@ func (h *deleteCompactionHint) canDelete(
return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
}

// maybeUpdateDeleteCompactionHints drops any in-memory delete-only compaction
// hints that the just-completed compaction c may have invalidated by zeroing
// sequence numbers. Hints whose key space is disjoint from c, or whose
// tombstones are strictly newer than every key in c's inputs, are retained.
// NOTE(review): reads and writes d.mu.compact.deletionHints, so presumably
// the caller holds d.mu — confirm.
func (d *DB) maybeUpdateDeleteCompactionHints(c *compaction) {
	// Compactions that zero sequence numbers can interfere with compaction
	// deletion hints. Deletion hints apply to tables containing keys older
	// than a threshold. If a key more recent than the threshold is zeroed in
	// a compaction, a delete-only compaction may mistake it as meeting the
	// threshold and drop a table containing live data.
	//
	// To avoid this scenario, compactions that zero sequence numbers remove
	// any conflicting deletion hints. A deletion hint is conflicting if both
	// of the following conditions apply:
	// * its key space overlaps with the compaction
	// * at least one of its inputs contains a key as recent as one of the
	//   hint's tombstones.
	//
	if !c.allowedZeroSeqNum {
		return
	}

	// Filter in place: reuse the existing slice's backing array, appending
	// only the hints that survive, and reassign the field at the end.
	updatedHints := d.mu.compact.deletionHints[:0]
	for _, h := range d.mu.compact.deletionHints {
		// If the compaction's key space is disjoint from the hint's key
		// space, the zeroing of sequence numbers won't affect the hint. Keep
		// the hint.
		keysDisjoint := d.cmp(h.end, c.smallest.UserKey) < 0 || d.cmp(h.start, c.largest.UserKey) > 0
		if keysDisjoint {
			updatedHints = append(updatedHints, h)
			continue
		}

		// All of the compaction's inputs must be older than the hint's
		// tombstones.
		inputsOlder := true
		for _, in := range c.inputs {
			iter := in.files.Iter()
			for f := iter.First(); f != nil; f = iter.Next() {
				inputsOlder = inputsOlder && f.LargestSeqNum < h.tombstoneSmallestSeqNum
			}
		}
		if inputsOlder {
			updatedHints = append(updatedHints, h)
			continue
		}

		// Drop h, because the compaction c may have zeroed sequence numbers
		// of keys more recent than some of h's tombstones.
	}
	d.mu.compact.deletionHints = updatedHints
}

func checkDeleteCompactionHints(
cmp Compare, v *version, hints []deleteCompactionHint, snapshots compact.Snapshots,
) ([]compactionLevel, []deleteCompactionHint) {
Expand Down Expand Up @@ -2248,7 +2218,6 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
d.maybeUpdateDeleteCompactionHints(c)
}

// NB: clearing compacting state must occur before updating the read state;
Expand Down Expand Up @@ -2316,14 +2285,15 @@ func (d *DB) runCopyCompaction(
// a new FileNum. This has the potential of making the block cache less
// effective, however.
newMeta := &fileMetadata{
Size: inputMeta.Size,
CreationTime: inputMeta.CreationTime,
SmallestSeqNum: inputMeta.SmallestSeqNum,
LargestSeqNum: inputMeta.LargestSeqNum,
Stats: inputMeta.Stats,
Virtual: inputMeta.Virtual,
SyntheticPrefix: inputMeta.SyntheticPrefix,
SyntheticSuffix: inputMeta.SyntheticSuffix,
Size: inputMeta.Size,
CreationTime: inputMeta.CreationTime,
SmallestSeqNum: inputMeta.SmallestSeqNum,
LargestSeqNum: inputMeta.LargestSeqNum,
LargestSeqNumAbsolute: inputMeta.LargestSeqNumAbsolute,
Stats: inputMeta.Stats,
Virtual: inputMeta.Virtual,
SyntheticPrefix: inputMeta.SyntheticPrefix,
SyntheticSuffix: inputMeta.SyntheticSuffix,
}
if inputMeta.HasPointKeys {
newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
Expand Down Expand Up @@ -2648,6 +2618,7 @@ func (d *DB) runCompaction(
// The table is typically written at the maximum allowable format implied by
// the current format major version of the DB.
tableFormat := formatVers.MaxTableFormat()
inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()

// In format major versions with maximum table formats of Pebblev3, value
// blocks were conditional on an experimental setting. In format major
Expand Down Expand Up @@ -2785,6 +2756,15 @@ func (d *DB) runCompaction(
meta.Size = writerMeta.Size
meta.SmallestSeqNum = writerMeta.SmallestSeqNum
meta.LargestSeqNum = writerMeta.LargestSeqNum
if c.flushing == nil {
// Set the file's LargestSeqNumAbsolute to be the maximum value of any
// of the compaction's input sstables.
// TODO(jackson): This could be narrowed to be the maximum of input
// sstables that overlap the output sstable's key range.
meta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
} else {
meta.LargestSeqNumAbsolute = writerMeta.LargestSeqNum
}
meta.InitPhysicalBacking()

// If the file didn't contain any range deletions, we can fill its
Expand Down
14 changes: 10 additions & 4 deletions compaction_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,11 @@ func loadVersion(t *testing.T, d *datadriven.TestData) (*version, *Options, stri
key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", i)), i, InternalKeyKindSet)
}
m := (&fileMetadata{
FileNum: base.FileNum(uint64(level)*100_000 + i),
SmallestSeqNum: key.SeqNum(),
LargestSeqNum: key.SeqNum(),
Size: 1,
FileNum: base.FileNum(uint64(level)*100_000 + i),
SmallestSeqNum: key.SeqNum(),
LargestSeqNum: key.SeqNum(),
LargestSeqNumAbsolute: key.SeqNum(),
Size: 1,
Stats: manifest.TableStats{
RangeDeletionsBytesEstimate: 0,
},
Expand Down Expand Up @@ -391,6 +392,7 @@ func TestCompactionPickerL0(t *testing.T) {
if m.SmallestSeqNum > m.LargestSeqNum {
m.SmallestSeqNum, m.LargestSeqNum = m.LargestSeqNum, m.SmallestSeqNum
}
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m, nil
}
Expand Down Expand Up @@ -616,6 +618,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
}
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.Largest.SeqNum()
return m, nil
}

Expand Down Expand Up @@ -812,6 +815,7 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) {
}
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.Largest.SeqNum()
return m, nil
}

Expand Down Expand Up @@ -973,6 +977,7 @@ func TestPickedCompactionSetupInputs(t *testing.T) {
if m.SmallestSeqNum > m.LargestSeqNum {
m.SmallestSeqNum, m.LargestSeqNum = m.LargestSeqNum, m.SmallestSeqNum
}
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m
}
Expand Down Expand Up @@ -1238,6 +1243,7 @@ func TestCompactionOutputFileSize(t *testing.T) {
}
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
return m, nil
}

Expand Down
3 changes: 3 additions & 0 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1388,6 +1388,7 @@ func TestCompactionFindL0Limit(t *testing.T) {
fileNumCounter++
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum

for _, field := range fields[1:] {
parts := strings.Split(field, "=")
Expand Down Expand Up @@ -2229,6 +2230,7 @@ func TestCompactionErrorOnUserKeyOverlap(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m
}
Expand Down Expand Up @@ -2359,6 +2361,7 @@ func TestCompactionCheckOrdering(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m
}
Expand Down
1 change: 1 addition & 0 deletions get_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ func TestGetIter(t *testing.T) {
meta.LargestSeqNum = ikey.SeqNum()
}
}
meta.LargestSeqNumAbsolute = meta.LargestSeqNum
}

files[tt.level] = append(files[tt.level], meta)
Expand Down
19 changes: 11 additions & 8 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ func setSeqNumInMetadata(m *fileMetadata, seqNum uint64, cmp Compare, format bas
// Properties.GlobalSeqNum when an sstable is loaded.
m.SmallestSeqNum = seqNum
m.LargestSeqNum = seqNum
m.LargestSeqNumAbsolute = seqNum
// Ensure the new bounds are consistent.
if err := m.Validate(cmp, format); err != nil {
return err
Expand Down Expand Up @@ -1951,10 +1952,11 @@ func (d *DB) excise(
FileNum: d.mu.versions.getNextFileNum(),
// Note that these are loose bounds for smallest/largest seqnums, but they're
// sufficient for maintaining correctness.
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
}
if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestPointKey) {
// This file will probably contain point keys.
Expand Down Expand Up @@ -2047,10 +2049,11 @@ func (d *DB) excise(
FileNum: d.mu.versions.getNextFileNum(),
// Note that these are loose bounds for smallest/largest seqnums, but they're
// sufficient for maintaining correctness.
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
}
if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestPointKey) {
// This file will probably contain point keys
Expand Down
17 changes: 9 additions & 8 deletions internal/keyspan/keyspanimpl/level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,14 +286,15 @@ func TestLevelIterEquivalence(t *testing.T) {
for k, file := range level {
fileIters = append(fileIters, keyspan.NewIter(base.DefaultComparer.Compare, file))
meta := &manifest.FileMetadata{
FileNum: base.FileNum(k + 1),
Size: 1024,
SmallestSeqNum: 2,
LargestSeqNum: 2,
SmallestRangeKey: base.MakeInternalKey(file[0].Start, file[0].SmallestKey().SeqNum(), file[0].SmallestKey().Kind()),
LargestRangeKey: base.MakeExclusiveSentinelKey(file[len(file)-1].LargestKey().Kind(), file[len(file)-1].End),
HasPointKeys: false,
HasRangeKeys: true,
FileNum: base.FileNum(k + 1),
Size: 1024,
SmallestSeqNum: 2,
LargestSeqNum: 2,
LargestSeqNumAbsolute: 2,
SmallestRangeKey: base.MakeInternalKey(file[0].Start, file[0].SmallestKey().SeqNum(), file[0].SmallestKey().Kind()),
LargestRangeKey: base.MakeExclusiveSentinelKey(file[len(file)-1].LargestKey().Kind(), file[len(file)-1].End),
HasPointKeys: false,
HasRangeKeys: true,
}
meta.InitPhysicalBacking()
meta.ExtendRangeKeyBounds(base.DefaultComparer.Compare, meta.SmallestRangeKey, meta.LargestRangeKey)
Expand Down
10 changes: 6 additions & 4 deletions internal/manifest/l0_sublevels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func TestL0Sublevels(t *testing.T) {
if m.Largest.IsExclusiveSentinel() {
m.LargestSeqNum = m.SmallestSeqNum
}
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.FileNum = base.FileNum(fileNum)
m.Size = uint64(256)
m.InitPhysicalBacking()
Expand Down Expand Up @@ -531,10 +532,11 @@ func TestAddL0FilesEquivalence(t *testing.T) {
continue
}
meta := (&FileMetadata{
FileNum: base.FileNum(i*10 + j + 1),
Size: rng.Uint64n(1 << 20),
SmallestSeqNum: uint64(2*i + 1),
LargestSeqNum: uint64(2*i + 2),
FileNum: base.FileNum(i*10 + j + 1),
Size: rng.Uint64n(1 << 20),
SmallestSeqNum: uint64(2*i + 1),
LargestSeqNum: uint64(2*i + 2),
LargestSeqNumAbsolute: uint64(2*i + 2),
}).ExtendPointKeyBounds(
base.DefaultComparer.Compare,
base.MakeInternalKey(startKey, uint64(2*i+1), base.InternalKeyKindSet),
Expand Down
1 change: 1 addition & 0 deletions internal/manifest/level_metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func TestLevelIterator(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
files = append(files, m)
}
Expand Down
Loading

0 comments on commit 562ff38

Please sign in to comment.