db: track files' pre-zeroing largest sequence numbers
When there exist no keys beneath a compaction's key range, Pebble performs
sequence number zeroing. This is an optimization that allows for easier
detection of new user keys during iteration. This commit introduces a new
FileMetadata field, LargestSeqNumAbsolute, that provides an upper bound on the
sequence numbers of an sstable's keys before they were zeroed. This is useful
as a stable upper bound on the recency of an sstable's keys.

In this commit we use this new upper bound to provide an alternative solution
to the interaction between delete-only compactions and sequence number zeroing.
Previously, any compaction that zeroed sequence numbers and overlapped a
delete-only compaction hint was required to clear the conflicting hints to
ensure a delete-only compaction did not accidentally drop a table containing
keys more recent than any of the hint's constituent tombstones. This
interaction was indirect and subtle. Encoding the pre-zeroing sequence number
on the file metadata is more direct, and it will allow us to use the sequence
number for recency ordering of sstables' keys more generally, including in
cockroachdb#2112.
jbowens committed May 2, 2024
1 parent 902b6d0 commit d2010d4
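
The sketch below (not part of the diff) illustrates the idea with simplified,
hypothetical types: a compaction output inherits the maximum pre-zeroing bound
of its inputs, and a delete-only compaction hint consults that bound rather
than the possibly-zeroed LargestSeqNum. The names fileMeta, compactOutput, and
olderThanTombstones are illustrative stand-ins, not Pebble's actual API, and
the check is reduced to the single comparison relevant here.

package main

import "fmt"

// fileMeta is a simplified, hypothetical stand-in for Pebble's
// manifest.FileMetadata; only the fields relevant to this commit are modeled.
type fileMeta struct {
	SmallestSeqNum uint64
	// LargestSeqNum may be zeroed by a compaction when no keys exist beneath
	// the compaction's key range.
	LargestSeqNum uint64
	// LargestSeqNumAbsolute is an upper bound on the sequence numbers the
	// file's keys had before any zeroing; it never rewinds to zero.
	LargestSeqNumAbsolute uint64
}

// compactOutput models how a (non-flush) compaction output inherits its
// pre-zeroing bound: the maximum LargestSeqNumAbsolute across all inputs,
// even when the output's visible sequence numbers are zeroed.
func compactOutput(zeroSeqNums bool, inputs ...fileMeta) fileMeta {
	var out fileMeta
	for _, in := range inputs {
		if in.LargestSeqNumAbsolute > out.LargestSeqNumAbsolute {
			out.LargestSeqNumAbsolute = in.LargestSeqNumAbsolute
		}
		if !zeroSeqNums && in.LargestSeqNum > out.LargestSeqNum {
			out.LargestSeqNum = in.LargestSeqNum
		}
	}
	return out
}

// olderThanTombstones reports whether every key in m is older than a hint's
// oldest tombstone. It must consult the absolute bound, not the possibly
// zeroed LargestSeqNum, or a zeroed file could be mistaken for an old one.
func olderThanTombstones(m fileMeta, tombstoneSmallestSeqNum uint64) bool {
	return m.LargestSeqNumAbsolute < tombstoneSmallestSeqNum
}

func main() {
	// A file whose keys had sequence numbers up to 100 is rewritten by a
	// compaction that zeroes sequence numbers.
	in := fileMeta{SmallestSeqNum: 90, LargestSeqNum: 100, LargestSeqNumAbsolute: 100}
	out := compactOutput(true /* zeroSeqNums */, in)

	// A delete-only compaction hint whose oldest tombstone has seqnum 95 must
	// not drop the output: its keys are newer, even though LargestSeqNum is
	// now 0.
	fmt.Printf("LargestSeqNum=%d LargestSeqNumAbsolute=%d droppable=%t\n",
		out.LargestSeqNum, out.LargestSeqNumAbsolute, olderThanTombstones(out, 95))
	// Output: LargestSeqNum=0 LargestSeqNumAbsolute=100 droppable=false
}

Because the absolute bound never decreases as files are rewritten, it remains a
valid recency bound even after every visible sequence number in a file has been
zeroed, which is what lets the delete-only compaction hint check below rely on
it directly.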
Showing 14 changed files with 211 additions and 178 deletions.
102 changes: 41 additions & 61 deletions compaction.go
@@ -266,6 +266,18 @@ type compaction struct {
pickerMetrics compactionPickerMetrics
}

// inputLargestSeqNumAbsolute returns the maximum LargestSeqNumAbsolute of any
// input sstables.
func (c *compaction) inputLargestSeqNumAbsolute() uint64 {
var seqNum uint64
for _, cl := range c.inputs {
cl.files.Each(func(m *manifest.FileMetadata) {
seqNum = max(seqNum, m.LargestSeqNumAbsolute)
})
}
return seqNum
}

func (c *compaction) makeInfo(jobID JobID) CompactionInfo {
info := CompactionInfo{
JobID: int(jobID),
@@ -1583,7 +1595,6 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
d.maybeUpdateDeleteCompactionHints(c)
}

d.clearCompactingState(c, err != nil)
@@ -1973,8 +1984,16 @@ func (h *deleteCompactionHint) canDelete(
cmp Compare, m *fileMetadata, snapshots compact.Snapshots,
) bool {
// The file can only be deleted if all of its keys are older than the
// earliest tombstone aggregated into the hint.
if m.LargestSeqNum >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
// earliest tombstone aggregated into the hint. Note that we use
// m.LargestSeqNumAbsolute, not m.LargestSeqNum. Consider a compaction that
// zeroes sequence numbers: it may take a key with a sequence number
// > h.tombstoneSmallestSeqNum and set that key's sequence number to zero. If
// we looked at m.LargestSeqNum, the resulting output file would
// appear to not contain any keys more recent than the oldest tombstone. To
// avoid this error, the largest pre-zeroing sequence number is maintained
// in LargestSeqNumAbsolute and used here to make the determination whether
// the file's keys are older than all of the hint's tombstones.
if m.LargestSeqNumAbsolute >= h.tombstoneSmallestSeqNum || m.SmallestSeqNum < h.fileSmallestSeqNum {
return false
}

@@ -2012,55 +2031,6 @@ func (h *deleteCompactionHint) canDelete(
return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
}

func (d *DB) maybeUpdateDeleteCompactionHints(c *compaction) {
// Compactions that zero sequence numbers can interfere with compaction
// deletion hints. Deletion hints apply to tables containing keys older
// than a threshold. If a key more recent than the threshold is zeroed in
// a compaction, a delete-only compaction may mistake it as meeting the
// threshold and drop a table containing live data.
//
// To avoid this scenario, compactions that zero sequence numbers remove
// any conflicting deletion hints. A deletion hint is conflicting if both
// of the following conditions apply:
// * its key space overlaps with the compaction
// * at least one of its inputs contains a key as recent as one of the
// hint's tombstones.
//
if !c.allowedZeroSeqNum {
return
}

updatedHints := d.mu.compact.deletionHints[:0]
for _, h := range d.mu.compact.deletionHints {
// If the compaction's key space is disjoint from the hint's key
// space, the zeroing of sequence numbers won't affect the hint. Keep
// the hint.
keysDisjoint := d.cmp(h.end, c.smallest.UserKey) < 0 || d.cmp(h.start, c.largest.UserKey) > 0
if keysDisjoint {
updatedHints = append(updatedHints, h)
continue
}

// All of the compaction's inputs must be older than the hint's
// tombstones.
inputsOlder := true
for _, in := range c.inputs {
iter := in.files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
inputsOlder = inputsOlder && f.LargestSeqNum < h.tombstoneSmallestSeqNum
}
}
if inputsOlder {
updatedHints = append(updatedHints, h)
continue
}

// Drop h, because the compaction c may have zeroed sequence numbers
// of keys more recent than some of h's tombstones.
}
d.mu.compact.deletionHints = updatedHints
}

func checkDeleteCompactionHints(
cmp Compare, v *version, hints []deleteCompactionHint, snapshots compact.Snapshots,
) ([]compactionLevel, []deleteCompactionHint) {
@@ -2248,7 +2218,6 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize
d.mu.versions.metrics.Keys.MissizedTombstonesCount += stats.countMissizedDels
d.maybeUpdateDeleteCompactionHints(c)
}

// NB: clearing compacting state must occur before updating the read state;
@@ -2316,14 +2285,15 @@ func (d *DB) runCopyCompaction(
// a new FileNum. This has the potential of making the block cache less
// effective, however.
newMeta := &fileMetadata{
Size: inputMeta.Size,
CreationTime: inputMeta.CreationTime,
SmallestSeqNum: inputMeta.SmallestSeqNum,
LargestSeqNum: inputMeta.LargestSeqNum,
Stats: inputMeta.Stats,
Virtual: inputMeta.Virtual,
SyntheticPrefix: inputMeta.SyntheticPrefix,
SyntheticSuffix: inputMeta.SyntheticSuffix,
Size: inputMeta.Size,
CreationTime: inputMeta.CreationTime,
SmallestSeqNum: inputMeta.SmallestSeqNum,
LargestSeqNum: inputMeta.LargestSeqNum,
LargestSeqNumAbsolute: inputMeta.LargestSeqNumAbsolute,
Stats: inputMeta.Stats,
Virtual: inputMeta.Virtual,
SyntheticPrefix: inputMeta.SyntheticPrefix,
SyntheticSuffix: inputMeta.SyntheticSuffix,
}
if inputMeta.HasPointKeys {
newMeta.ExtendPointKeyBounds(c.cmp, inputMeta.SmallestPointKey, inputMeta.LargestPointKey)
@@ -2648,6 +2618,7 @@ func (d *DB) runCompaction(
// The table is typically written at the maximum allowable format implied by
// the current format major version of the DB.
tableFormat := formatVers.MaxTableFormat()
inputLargestSeqNumAbsolute := c.inputLargestSeqNumAbsolute()

// In format major versions with maximum table formats of Pebblev3, value
// blocks were conditional on an experimental setting. In format major
@@ -2785,6 +2756,15 @@ func (d *DB) runCompaction(
meta.Size = writerMeta.Size
meta.SmallestSeqNum = writerMeta.SmallestSeqNum
meta.LargestSeqNum = writerMeta.LargestSeqNum
if c.flushing == nil {
// Set the file's LargestSeqNumAbsolute to be the maximum value of any
// of the compaction's input sstables.
// TODO(jackson): This could be narrowed to be the maximum of input
// sstables that overlap the output sstable's key range.
meta.LargestSeqNumAbsolute = inputLargestSeqNumAbsolute
} else {
meta.LargestSeqNumAbsolute = writerMeta.LargestSeqNum
}
meta.InitPhysicalBacking()

// If the file didn't contain any range deletions, we can fill its
14 changes: 10 additions & 4 deletions compaction_picker_test.go
@@ -85,10 +85,11 @@ func loadVersion(t *testing.T, d *datadriven.TestData) (*version, *Options, stri
key = base.MakeInternalKey([]byte(fmt.Sprintf("%04d", i)), i, InternalKeyKindSet)
}
m := (&fileMetadata{
FileNum: base.FileNum(uint64(level)*100_000 + i),
SmallestSeqNum: key.SeqNum(),
LargestSeqNum: key.SeqNum(),
Size: 1,
FileNum: base.FileNum(uint64(level)*100_000 + i),
SmallestSeqNum: key.SeqNum(),
LargestSeqNum: key.SeqNum(),
LargestSeqNumAbsolute: key.SeqNum(),
Size: 1,
Stats: manifest.TableStats{
RangeDeletionsBytesEstimate: 0,
},
@@ -391,6 +392,7 @@ func TestCompactionPickerL0(t *testing.T) {
if m.SmallestSeqNum > m.LargestSeqNum {
m.SmallestSeqNum, m.LargestSeqNum = m.LargestSeqNum, m.SmallestSeqNum
}
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m, nil
}
@@ -616,6 +618,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
}
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.Largest.SeqNum()
return m, nil
}

@@ -812,6 +815,7 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) {
}
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.Largest.SeqNum()
return m, nil
}

@@ -973,6 +977,7 @@ func TestPickedCompactionSetupInputs(t *testing.T) {
if m.SmallestSeqNum > m.LargestSeqNum {
m.SmallestSeqNum, m.LargestSeqNum = m.LargestSeqNum, m.SmallestSeqNum
}
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m
}
@@ -1238,6 +1243,7 @@ func TestCompactionOutputFileSize(t *testing.T) {
}
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
return m, nil
}

3 changes: 3 additions & 0 deletions compaction_test.go
@@ -1388,6 +1388,7 @@ func TestCompactionFindL0Limit(t *testing.T) {
fileNumCounter++
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum

for _, field := range fields[1:] {
parts := strings.Split(field, "=")
@@ -2229,6 +2230,7 @@ func TestCompactionErrorOnUserKeyOverlap(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m
}
@@ -2359,6 +2361,7 @@ func TestCompactionCheckOrdering(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
return m
}
1 change: 1 addition & 0 deletions get_iter_test.go
@@ -425,6 +425,7 @@ func TestGetIter(t *testing.T) {
meta.LargestSeqNum = ikey.SeqNum()
}
}
meta.LargestSeqNumAbsolute = meta.LargestSeqNum
}

files[tt.level] = append(files[tt.level], meta)
19 changes: 11 additions & 8 deletions ingest.go
@@ -849,6 +849,7 @@ func setSeqNumInMetadata(m *fileMetadata, seqNum uint64, cmp Compare, format bas
// Properties.GlobalSeqNum when an sstable is loaded.
m.SmallestSeqNum = seqNum
m.LargestSeqNum = seqNum
m.LargestSeqNumAbsolute = seqNum
// Ensure the new bounds are consistent.
if err := m.Validate(cmp, format); err != nil {
return err
@@ -1951,10 +1952,11 @@ func (d *DB) excise(
FileNum: d.mu.versions.getNextFileNum(),
// Note that these are loose bounds for smallest/largest seqnums, but they're
// sufficient for maintaining correctness.
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
}
if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.SmallestPointKey) {
// This file will probably contain point keys.
@@ -2047,10 +2049,11 @@ func (d *DB) excise(
FileNum: d.mu.versions.getNextFileNum(),
// Note that these are loose bounds for smallest/largest seqnums, but they're
// sufficient for maintaining correctness.
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
SmallestSeqNum: m.SmallestSeqNum,
LargestSeqNum: m.LargestSeqNum,
LargestSeqNumAbsolute: m.LargestSeqNumAbsolute,
SyntheticPrefix: m.SyntheticPrefix,
SyntheticSuffix: m.SyntheticSuffix,
}
if m.HasPointKeys && !exciseSpan.ContainsInternalKey(d.cmp, m.LargestPointKey) {
// This file will probably contain point keys
17 changes: 9 additions & 8 deletions internal/keyspan/keyspanimpl/level_iter_test.go
@@ -286,14 +286,15 @@ func TestLevelIterEquivalence(t *testing.T) {
for k, file := range level {
fileIters = append(fileIters, keyspan.NewIter(base.DefaultComparer.Compare, file))
meta := &manifest.FileMetadata{
FileNum: base.FileNum(k + 1),
Size: 1024,
SmallestSeqNum: 2,
LargestSeqNum: 2,
SmallestRangeKey: base.MakeInternalKey(file[0].Start, file[0].SmallestKey().SeqNum(), file[0].SmallestKey().Kind()),
LargestRangeKey: base.MakeExclusiveSentinelKey(file[len(file)-1].LargestKey().Kind(), file[len(file)-1].End),
HasPointKeys: false,
HasRangeKeys: true,
FileNum: base.FileNum(k + 1),
Size: 1024,
SmallestSeqNum: 2,
LargestSeqNum: 2,
LargestSeqNumAbsolute: 2,
SmallestRangeKey: base.MakeInternalKey(file[0].Start, file[0].SmallestKey().SeqNum(), file[0].SmallestKey().Kind()),
LargestRangeKey: base.MakeExclusiveSentinelKey(file[len(file)-1].LargestKey().Kind(), file[len(file)-1].End),
HasPointKeys: false,
HasRangeKeys: true,
}
meta.InitPhysicalBacking()
meta.ExtendRangeKeyBounds(base.DefaultComparer.Compare, meta.SmallestRangeKey, meta.LargestRangeKey)
10 changes: 6 additions & 4 deletions internal/manifest/l0_sublevels_test.go
@@ -177,6 +177,7 @@ func TestL0Sublevels(t *testing.T) {
if m.Largest.IsExclusiveSentinel() {
m.LargestSeqNum = m.SmallestSeqNum
}
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.FileNum = base.FileNum(fileNum)
m.Size = uint64(256)
m.InitPhysicalBacking()
@@ -531,10 +532,11 @@ func TestAddL0FilesEquivalence(t *testing.T) {
continue
}
meta := (&FileMetadata{
FileNum: base.FileNum(i*10 + j + 1),
Size: rng.Uint64n(1 << 20),
SmallestSeqNum: uint64(2*i + 1),
LargestSeqNum: uint64(2*i + 2),
FileNum: base.FileNum(i*10 + j + 1),
Size: rng.Uint64n(1 << 20),
SmallestSeqNum: uint64(2*i + 1),
LargestSeqNum: uint64(2*i + 2),
LargestSeqNumAbsolute: uint64(2*i + 2),
}).ExtendPointKeyBounds(
base.DefaultComparer.Compare,
base.MakeInternalKey(startKey, uint64(2*i+1), base.InternalKeyKindSet),
1 change: 1 addition & 0 deletions internal/manifest/level_metadata_test.go
@@ -47,6 +47,7 @@ func TestLevelIterator(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
m.LargestSeqNumAbsolute = m.LargestSeqNum
m.InitPhysicalBacking()
files = append(files, m)
}