db: refactor compaction splitting to reduce key comparisons
Introduce a new type, `frontiers`, designed to monitor several different user
key frontiers during a compaction. When a user key is encountered that equals
or exceeds a configured frontier, the code that specified the frontier is
notified and given an opportunity to set a new frontier. Internally,
`frontiers` uses a heap (code largely copied from the merging iterator's heap)
to avoid N key comparisons (one per tracked frontier) for every key.
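
For illustration, here is a minimal, self-contained sketch of the heap-based
pattern described above. It is not the implementation added by this commit:
the real `frontiers` type lives in one of the other changed files (not shown
on this page) and uses the compaction's comparator. The `frontierHeap` name,
the `bytes.Compare` ordering, and the demo in `main` are assumptions of this
sketch; only the shape of the `reached` callback, which returns a new frontier
key or nil to stop monitoring, mirrors the `frontier` type visible in the diff
below.

package main

import (
	"bytes"
	"container/heap"
	"fmt"
)

// frontier pairs a monitored boundary key with a callback. When the
// compaction reaches a key >= key, reached fires and may return a new
// boundary to keep watching, or nil to stop.
type frontier struct {
	key     []byte
	reached func(next []byte) []byte
}

// frontierHeap is a min-heap of frontiers ordered by key. Checking the next
// input key costs one comparison against the root, not one per frontier.
type frontierHeap []*frontier

func (h frontierHeap) Len() int           { return len(h) }
func (h frontierHeap) Less(i, j int) bool { return bytes.Compare(h[i].key, h[j].key) < 0 }
func (h frontierHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

func (h *frontierHeap) Push(x interface{}) { *h = append(*h, x.(*frontier)) }
func (h *frontierHeap) Pop() interface{} {
	old := *h
	f := old[len(old)-1]
	*h = old[:len(old)-1]
	return f
}

// Advance notifies every frontier whose key is <= k, letting each one re-arm
// with a later boundary or drop out of the heap.
func (h *frontierHeap) Advance(k []byte) {
	for h.Len() > 0 && bytes.Compare((*h)[0].key, k) <= 0 {
		f := (*h)[0]
		if next := f.reached(k); next != nil {
			f.key = next
			heap.Fix(h, 0) // the root's key changed; restore heap order
		} else {
			heap.Pop(h) // this frontier is done being monitored
		}
	}
}

func main() {
	var h frontierHeap
	watch := func(key string) {
		heap.Push(&h, &frontier{key: []byte(key), reached: func(k []byte) []byte {
			fmt.Printf("frontier %q reached at key %q\n", key, k)
			return nil // stop monitoring after the first notification
		}})
	}
	watch("c")
	watch("e")
	for _, k := range []string{"b", "d", "f"} { // keys in compaction order
		h.Advance([]byte(k))
	}
	// Output:
	// frontier "c" reached at key "d"
	// frontier "e" reached at key "f"
}

Each input key is compared only against the heap root, so a compaction
tracking N frontiers pays one comparison per key in the common case, with
O(log N) heap maintenance only when a frontier actually fires.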

This commit refactors the `limitFuncSplitter` type to make use of `frontiers`.
The `limitFuncSplitter` type is used to split flushes at L0 flush split keys,
and to split both flushes and compactions to avoid excessive overlap with
grandparent files.
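
To make the refactor concrete, here is a hedged before/after sketch of the
splitter change. The `limitSplitter` type below is hypothetical shorthand, not
code from this diff: it shows how `shouldSplitBefore` turns into a read of a
flag that the frontier's reached callback latches, instead of performing a
`cmp` against `lf.limit` on every key.

package main

import (
	"bytes"
	"fmt"
)

// limitSplitter mimics the refactored limitFuncSplitter: the split decision
// is a latched flag set by a frontier callback, not a per-key comparison.
type limitSplitter struct {
	limit []byte // the armed frontier key; nil means unarmed
	split bool   // latched by reached(), cleared by onNewOutput()
}

// reached stands in for the frontier callback, fired once when the
// compaction encounters a key >= limit.
func (s *limitSplitter) reached() { s.split = true }

// shouldSplitBefore involves no key comparison at all.
func (s *limitSplitter) shouldSplitBefore() bool { return s.split }

// onNewOutput re-arms the splitter for the next output table.
func (s *limitSplitter) onNewOutput(limit []byte) {
	s.split = false
	s.limit = limit
}

func main() {
	s := &limitSplitter{}
	s.onNewOutput([]byte("c")) // arm the frontier at "c"
	for _, k := range []string{"a", "b", "c", "d"} {
		// In the real code this comparison happens once, inside the shared
		// frontiers heap, rather than inside each splitter. Note the >=,
		// matching the old splitter's exclusive-end-boundary requirement.
		if s.limit != nil && bytes.Compare([]byte(k), s.limit) >= 0 {
			s.reached()
			s.limit = nil
		}
		fmt.Printf("key %q: split=%v\n", k, s.shouldSplitBefore())
	}
	// Output:
	// key "a": split=false
	// key "b": split=false
	// key "c": split=true
	// key "d": split=true
}

The latch preserves the interface contract quoted in the diff below: once
shouldSplitBefore advises a split for some key, it keeps advising a split for
every later key until onNewOutput re-arms the splitter.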

This change is motivated by cockroachdb#2156, which will introduce an additional
compaction-output splitter that must perform key comparisons against the next
key to decide when to split a compaction. The `frontiers` type may also prove
useful elsewhere, for example in applying key-space-dependent logic during a
compaction (e.g., compaction-time GC, disaggregated storage locality policies,
or keyspan-restricted snapshots, cockroachdb#1810).
jbowens committed Feb 3, 2023
1 parent 115cde7 · commit fa7f5c1
Showing 5 changed files with 374 additions and 63 deletions.
compaction.go: 117 changes (61 additions, 56 deletions)
@@ -96,15 +96,15 @@ func (cl compactionLevel) String() string {
 
 // Return output from compactionOutputSplitters. See comment on
 // compactionOutputSplitter.shouldSplitBefore() on how this value is used.
-type compactionSplitSuggestion int
+type maybeSplit int
 
 const (
-	noSplit compactionSplitSuggestion = iota
+	noSplit maybeSplit = iota
 	splitNow
 )
 
 // String implements the Stringer interface.
-func (c compactionSplitSuggestion) String() string {
+func (c maybeSplit) String() string {
 	if c == noSplit {
 		return "no-split"
 	}
@@ -123,15 +123,15 @@ type compactionOutputSplitter interface {
 	// means no split is advised. If shouldSplitBefore(a) advises a split then
 	// shouldSplitBefore(b) should also advise a split given b >= a, until
 	// onNewOutput is called.
-	shouldSplitBefore(key *InternalKey, tw *sstable.Writer) compactionSplitSuggestion
+	shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit
 	// onNewOutput updates internal splitter state when the compaction switches
 	// to a new sstable, and returns the next limit for the new output which
 	// would get used to truncate range tombstones if the compaction iterator
 	// runs out of keys. The limit returned MUST be > key according to the
 	// compaction's comparator. The specified key is the first key in the new
 	// output, or nil if this sstable will only contain range tombstones already
 	// in the fragmenter.
-	onNewOutput(key *InternalKey) []byte
+	onNewOutput(key []byte) []byte
 }
 
 // fileSizeSplitter is a compactionOutputSplitter that makes a determination
@@ -142,9 +142,7 @@ type fileSizeSplitter struct {
 	maxFileSize uint64
 }
 
-func (f *fileSizeSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
+func (f *fileSizeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
 	// The Kind != RangeDelete part exists because EstimatedSize doesn't grow
 	// rightaway when a range tombstone is added to the fragmenter. It's always
 	// better to make a sequence of range tombstones visible to the fragmenter.
@@ -155,55 +155,43 @@ func (f *fileSizeSplitter) shouldSplitBefore(
 	return noSplit
 }
 
-func (f *fileSizeSplitter) onNewOutput(key *InternalKey) []byte {
+func (f *fileSizeSplitter) onNewOutput(key []byte) []byte {
 	return nil
 }
 
+func newLimitFuncSplitter(f *frontiers, limitFunc func(userKey []byte) []byte) *limitFuncSplitter {
+	s := &limitFuncSplitter{limitFunc: limitFunc}
+	s.frontier.Init(f, nil, s.reached)
+	return s
+}
+
 type limitFuncSplitter struct {
-	c         *compaction
+	frontier  frontier
 	limitFunc func(userKey []byte) []byte
-	limit     []byte
+	split     maybeSplit
 }
 
-func (lf *limitFuncSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
-	// NB: The limit must be applied using >= since lf.limit may be used as the
-	// `splitterSuggestion` ultimately passed to `compactionIter.Tombstones` to
-	// serve as an *exclusive* end boundary truncation point. If we used > then,
-	// we may have already added a key with the user key `lf.limit` to the
-	// previous sstable.
-	if lf.limit != nil && lf.c.cmp(key.UserKey, lf.limit) >= 0 {
-		return splitNow
-	}
-	return noSplit
+func (lf *limitFuncSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
+	return lf.split
 }
 
+func (lf *limitFuncSplitter) reached(nextKey []byte) []byte {
+	lf.split = splitNow
+	return nil
+}
+
-func (lf *limitFuncSplitter) onNewOutput(key *InternalKey) []byte {
-	lf.limit = nil
+func (lf *limitFuncSplitter) onNewOutput(key []byte) []byte {
+	lf.split = noSplit
 	if key != nil {
-		lf.limit = lf.limitFunc(key.UserKey)
-	} else {
-		// Use the start key of the first pending tombstone to find the
-		// next limit. All pending tombstones have the same start key.
-		// We use this as opposed to the end key of the
-		// last written sstable to effectively handle cases like these:
-		//
-		// a.SET.3
-		// (lf.limit at b)
-		// d.RANGEDEL.4:f
-		//
-		// In this case, the partition after b has only range deletions,
-		// so if we were to find the limit after the last written key at
-		// the split point (key a), we'd get the limit b again, and
-		// finishOutput() would not advance any further because the next
-		// range tombstone to write does not start until after the L0
-		// split point.
-		if startKey := lf.c.rangeDelFrag.Start(); startKey != nil {
-			lf.limit = lf.limitFunc(startKey)
-		}
-	}
-	return lf.limit
+		// TODO(jackson): For some users, like L0 flush splits, there's no need
+		// to binary search over all the flush splits every time. The next split
+		// point must be ahead of the previous flush split point.
+		limit := lf.limitFunc(key)
+		lf.frontier.Update(limit)
+		return limit
+	}
+	lf.frontier.Update(nil)
+	return nil
 }
 
 // splitterGroup is a compactionOutputSplitter that splits whenever one of its
@@ -215,7 +201,7 @@ type splitterGroup struct {
 
 func (a *splitterGroup) shouldSplitBefore(
 	key *InternalKey, tw *sstable.Writer,
-) (suggestion compactionSplitSuggestion) {
+) (suggestion maybeSplit) {
 	for _, splitter := range a.splitters {
 		if splitter.shouldSplitBefore(key, tw) == splitNow {
 			return splitNow
@@ -224,7 +210,7 @@ func (a *splitterGroup) shouldSplitBefore(
 	return noSplit
 }
 
-func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
+func (a *splitterGroup) onNewOutput(key []byte) []byte {
 	var earliestLimit []byte
 	for _, splitter := range a.splitters {
 		limit := splitter.onNewOutput(key)
@@ -252,9 +238,7 @@ type userKeyChangeSplitter struct {
 	unsafePrevUserKey func() []byte
 }
 
-func (u *userKeyChangeSplitter) shouldSplitBefore(
-	key *InternalKey, tw *sstable.Writer,
-) compactionSplitSuggestion {
+func (u *userKeyChangeSplitter) shouldSplitBefore(key *InternalKey, tw *sstable.Writer) maybeSplit {
 	if split := u.splitter.shouldSplitBefore(key, tw); split != splitNow {
 		return split
 	}
@@ -264,7 +248,7 @@ func (u *userKeyChangeSplitter) shouldSplitBefore(
 	return noSplit
 }
 
-func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
+func (u *userKeyChangeSplitter) onNewOutput(key []byte) []byte {
 	return u.splitter.onNewOutput(key)
 }
 
@@ -2849,10 +2833,10 @@ func (d *DB) runCompaction(
 				return c.rangeDelFrag.Start()
 			},
 		},
-		&limitFuncSplitter{c: c, limitFunc: c.findGrandparentLimit},
+		newLimitFuncSplitter(&iter.frontiers, c.findGrandparentLimit),
 	}
 	if splitL0Outputs {
-		outputSplitters = append(outputSplitters, &limitFuncSplitter{c: c, limitFunc: c.findL0Limit})
+		outputSplitters = append(outputSplitters, newLimitFuncSplitter(&iter.frontiers, c.findL0Limit))
 	}
 	splitter := &splitterGroup{cmp: c.cmp, splitters: outputSplitters}
 
@@ -2865,7 +2849,28 @@ func (d *DB) runCompaction(
 	// progress guarantees ensure that eventually the input iterator will be
 	// exhausted and the range tombstone fragments will all be flushed.
 	for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty() || !c.rangeKeyFrag.Empty(); {
-		splitterSuggestion := splitter.onNewOutput(key)
+		var firstKey []byte
+		if key != nil {
+			firstKey = key.UserKey
+		} else if startKey := c.rangeDelFrag.Start(); startKey != nil {
+			// Pass the start key of the first pending tombstone to find the
+			// next limit. All pending tombstones have the same start key. We
+			// use this as opposed to the end key of the last written sstable to
+			// effectively handle cases like these:
+			//
+			// a.SET.3
+			// (lf.limit at b)
+			// d.RANGEDEL.4:f
+			//
+			// In this case, the partition after b has only range deletions, so
+			// if we were to find the limit after the last written key at the
+			// split point (key a), we'd get the limit b again, and
+			// finishOutput() would not advance any further because the next
+			// range tombstone to write does not start until after the L0 split
+			// point.
+			firstKey = startKey
+		}
+		splitterSuggestion := splitter.onNewOutput(firstKey)
 
 		// Each inner loop iteration processes one key from the input iterator.
 		for ; key != nil; key, val = iter.Next() {