Skip to content

Commit

Permalink
compaction: remove nonzero-seqnum splitting logic
Browse files Browse the repository at this point in the history
Remove the `nonZeroSeqNumSplitter`. The logic preventing splitting at
nonzero-seqnum keys was responsible for the creation of arbitrarily
large files (cockroachdb#1181). The condition that the `nonZeroSeqNumSplitter`
guards against is no longer possible.

Fix cockroachdb#1181.
  • Loading branch information
jbowens committed Jul 12, 2021
1 parent 4fcf409 commit 59f79a5
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 297 deletions.
236 changes: 90 additions & 146 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (a *splitterGroup) onNewOutput(key *InternalKey) []byte {
// the compaction output is at the boundary between two user keys (also
// the boundary between atomic compaction units). Use this splitter to wrap
// any splitters that don't guarantee user key splits (i.e. splitters that make
// their determinatino in ways other than comparing the current key against a
// their determination in ways other than comparing the current key against a
// limit key.
type userKeyChangeSplitter struct {
cmp Compare
Expand Down Expand Up @@ -344,49 +344,6 @@ func (u *userKeyChangeSplitter) onNewOutput(key *InternalKey) []byte {
return u.splitter.onNewOutput(key)
}

// nonZeroSeqNumSplitter is a compactionOutputSplitter that takes in a child
// splitter, and advises a split when 1) that child splitter advises a split,
// and 2) the compaction output is at a point where the previous point sequence
// number is nonzero.
type nonZeroSeqNumSplitter struct {
c *compaction
splitter compactionOutputSplitter
prevPointSeqNum uint64
splitOnNonZeroSeqNum bool
}

func (n *nonZeroSeqNumSplitter) shouldSplitBefore(
key *InternalKey, tw *sstable.Writer,
) compactionSplitSuggestion {
curSeqNum := key.SeqNum()
keyKind := key.Kind()
prevPointSeqNum := n.prevPointSeqNum
if keyKind != InternalKeyKindRangeDelete {
n.prevPointSeqNum = curSeqNum
}

if n.splitOnNonZeroSeqNum {
if prevPointSeqNum > 0 || n.c.rangeDelFrag.Empty() {
n.splitOnNonZeroSeqNum = false
return splitNow
}
} else if split := n.splitter.shouldSplitBefore(key, tw); split == splitNow {
userKeyChange := curSeqNum > prevPointSeqNum
if prevPointSeqNum > 0 || n.c.rangeDelFrag.Empty() || userKeyChange {
return splitNow
}
n.splitOnNonZeroSeqNum = true
return splitSoon
}
return noSplit
}

func (n *nonZeroSeqNumSplitter) onNewOutput(key *InternalKey) []byte {
n.prevPointSeqNum = InternalKeySeqNumMax
n.splitOnNonZeroSeqNum = false
return n.splitter.onNewOutput(key)
}

// compactionFile is a vfs.File wrapper that, on every write, updates a metric
// in `versions` on bytes written by in-progress compactions so far. It also
// increments a per-compaction `written` int.
Expand Down Expand Up @@ -951,7 +908,7 @@ func (c *compaction) errorOnUserKeyOverlap(ve *versionEdit) error {
// snapshots requiring them to be kept. It performs this determination by
// looking for an sstable which overlaps the bounds of the compaction at a
// lower level in the LSM.
func (c *compaction) allowZeroSeqNum(iter internalIterator) bool {
func (c *compaction) allowZeroSeqNum() bool {
return c.elideRangeTombstone(c.smallest.UserKey, c.largest.UserKey)
}

Expand Down Expand Up @@ -2060,7 +2017,7 @@ func (d *DB) runCompaction(
if err != nil {
return nil, pendingOutputs, err
}
c.allowedZeroSeqNum = c.allowZeroSeqNum(iiter)
c.allowedZeroSeqNum = c.allowZeroSeqNum()
iter := newCompactionIter(c.cmp, c.formatKey, d.merge, iiter, snapshots,
&c.rangeDelFrag, c.allowedZeroSeqNum, c.elideTombstone, c.elideRangeTombstone)

Expand Down Expand Up @@ -2149,14 +2106,15 @@ func (d *DB) runCompaction(

splitL0Outputs := c.outputLevel.level == 0 && d.opts.FlushSplitBytes > 0

// finishOutput is called for an sstable with the first key of the next sstable, and for the
// last sstable with an empty key.
finishOutput := func(key []byte) error {
// finishOutput is called with the a user key up to which all tombstones
// should be flushed. Typically, this is the first key of the next
// sstable or an empty key if this output is the final sstable.
finishOutput := func(splitKey []byte) error {
// If we haven't output any point records to the sstable (tw == nil)
// then the sstable will only contain range tombstones. The smallest
// key in the sstable will be the start key of the first range
// tombstone added. We need to ensure that this start key is distinct
// from the limit (key) passed to finishOutput (if set), otherwise we
// from the splitKey passed to finishOutput (if set), otherwise we
// would generate an sstable where the largest key is smaller than the
// smallest key due to how the largest key boundary is set below.
// NB: It is permissible for the range tombstone start key to be the
Expand All @@ -2169,15 +2127,15 @@ func (d *DB) runCompaction(
if len(iter.tombstones) > 0 {
startKey = iter.tombstones[0].Start.UserKey
}
if key != nil && d.cmp(startKey, key) == 0 {
if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
return nil
}
}

// NB: clone the key because the data can be held on to by the call to
// compactionIter.Tombstones via rangedel.Fragmenter.FlushTo.
key = append([]byte(nil), key...)
for _, v := range iter.Tombstones(key, splitL0Outputs) {
splitKey = append([]byte(nil), splitKey...)
for _, v := range iter.Tombstones(splitKey, splitL0Outputs) {
if tw == nil {
if err := newOutput(); err != nil {
return err
Expand Down Expand Up @@ -2294,7 +2252,7 @@ func (d *DB) runCompaction(
}
}

if key != nil && writerMeta.LargestRange.UserKey != nil {
if splitKey != nil && writerMeta.LargestRange.UserKey != nil {
// The current file is not the last output file and there is a range tombstone in it.
// If the tombstone extends into the next file, then truncate it for the purposes of
// computing meta.Largest. For example, say the next file's first key is c#7,1 and the
Expand All @@ -2303,8 +2261,8 @@ func (d *DB) runCompaction(
// c#inf where inf is the InternalKeyRangeDeleteSentinel. Note that this is just for
// purposes of bounds computation -- the current sstable will end up with a Largest key
// of c#7,1 so the range tombstone in the current file will be able to delete c#7.
if d.cmp(writerMeta.LargestRange.UserKey, key) >= 0 {
writerMeta.LargestRange.UserKey = key
if d.cmp(writerMeta.LargestRange.UserKey, splitKey) >= 0 {
writerMeta.LargestRange.UserKey = splitKey
writerMeta.LargestRange.Trailer = InternalKeyRangeDeleteSentinel
}
}
Expand Down Expand Up @@ -2365,27 +2323,11 @@ func (d *DB) runCompaction(
// we start off with splitters for file sizes, grandparent limits, and (for
// L0 splits) L0 limits, before wrapping them in an splitterGroup.
//
// There is a complication here: we can't split outputs where the largest
// key on the left side has a seqnum of zero. This limitation
// exists because a range tombstone which extends into the next sstable
// will cause the smallest key for the next sstable to have the same user
// key, but we need the two tables to be disjoint in key space. Consider
// the scenario:
//
// a#RANGEDEL-c,3 b#SET,0
//
// If b#SET,0 is the last key added to an sstable, the range tombstone
// [b-c)#3 will extend into the next sstable. The boundary generation
// code in finishOutput() will compute the smallest key for that sstable
// as b#RANGEDEL,3 which sorts before b#SET,0. Normally we just adjust
// the seqnum of this key, but that isn't possible for seqnum 0. To ensure
// we only split where the previous point key has a zero seqnum, we wrap
// our splitters with a nonZeroSeqNumSplitter.
//
// Another case where we may not be able to switch SSTables right away is
// when we are splitting an L0 output. We do not split the same user key
// across different sstables within one flush, so the userKeyChangeSplitter
// ensures we are at a user key change boundary when doing a split.
// There is a complication here: We may not be able to switch SSTables
// right away is when we are splitting an L0 output. We do not split the
// same user key across different sstables within one flush, so the
// userKeyChangeSplitter ensures we are at a user key change boundary when
// doing a split.
outputSplitters := []compactionOutputSplitter{
&fileSizeSplitter{maxFileSize: c.maxOutputFileSize},
&grandparentLimitSplitter{c: c, ve: ve},
Expand All @@ -2405,18 +2347,6 @@ func (d *DB) runCompaction(
cmp: c.cmp,
splitters: outputSplitters,
}
// Compactions to L0 don't need nonzero last-point-key seqnums at split
// boundaries because when writing to L0, we are able to guarantee that
// the end key of tombstones will also be truncated (through the
// TruncateAndFlushTo call), and no user keys will
// be split between sstables. So a nonZeroSeqNumSplitter is unnecessary
// in that case.
if !splitL0Outputs {
splitter = &nonZeroSeqNumSplitter{
c: c,
splitter: splitter,
}
}

// NB: we avoid calling maybeThrottle on a nilPacer because the cost of
// dynamic dispatch in the hot loop below is pronounced in CPU profiles (see
Expand All @@ -2435,36 +2365,22 @@ func (d *DB) runCompaction(
// progress guarantees ensure that eventually the input iterator will be
// exhausted and the range tombstone fragments will all be flushed.
for key, val := iter.First(); key != nil || !c.rangeDelFrag.Empty(); {
limit := splitter.onNewOutput(key)
splitterSuggestion := splitter.onNewOutput(key)

// Each inner loop iteration processes one key from the input iterator.
prevPointSeqNum := InternalKeySeqNumMax
for ; key != nil; key, val = iter.Next() {
if split := splitter.shouldSplitBefore(key, tw); split != noSplit {
if split == splitNow {
limit = key.UserKey
if splitL0Outputs {
// Flush all tombstones up until key.UserKey, and
// truncate them at that key.
//
// The fragmenter could save the passed-in key. As this
// key could live beyond the write into the current
// sstable output file, make a copy.
c.rangeDelFrag.TruncateAndFlushTo(key.Clone().UserKey)
}
break
if split := splitter.shouldSplitBefore(key, tw); split == splitNow {
if splitL0Outputs {
// Flush all tombstones up until key.UserKey, and
// truncate them at that key.
//
// The fragmenter could save the passed-in key. As this
// key could live beyond the write into the current
// sstable output file, make a copy.
c.rangeDelFrag.TruncateAndFlushTo(key.Clone().UserKey)
}
// split == splitSoon
//
// Invalidate the limit here. It has probably been exceeded
// by the current key, but we can't split just yet, such as to
// maintain the nonzero sequence number invariant mentioned
// above. Setting limit to nil is okay as it's just a transient
// setting, as when split eventually equals splitNow, we will
// set the limit to the key after that. If the compaction were
// to run out of keys before we get to that point, limit would
// be nil as it should be for all end-of-compaction cases.
limit = nil
break
}

atomic.StoreUint64(c.atomicBytesIterated, c.bytesIterated)
Expand Down Expand Up @@ -2492,49 +2408,77 @@ func (d *DB) runCompaction(
prevPointSeqNum = key.SeqNum()
}

// A splitter requested a split, and we're ready to finish the output.
// We need to choose the key at which to split any pending range
// tombstones.
//
// There's a complication here. We need to ensure that for a user key
// k we never end up with one output's largest key as k#0 and the
// next output's smallest key as k#RANGEDEL,#x where x > 0. This is a
// problem because k#RANGEDEL,#x sorts before k#0. Normally, we just
// adjust the seqnum of the next output's smallest boundary to be
// less, but that's not possible with the zero seqnum. We can avoid
// this case with careful picking of where to split pending range
// tombstones.
var splitKey []byte
switch {
case key == nil && prevPointSeqNum == 0 && !c.rangeDelFrag.Empty():
// We ran out of keys and the last key added to the sstable has a zero
// seqnum and there are buffered range tombstones, so we're unable to use
// the grandparent/flush limit for the sstable boundary. See the example in the
// in the loop above with range tombstones straddling sstables. Setting
// limit to nil ensures that we flush the entirety of the rangedel
// fragmenter when writing the last output.
limit = nil
case key == nil && splitL0Outputs && !c.rangeDelFrag.Empty():
// We ran out of keys with flush splits enabled, and have remaining
// buffered range tombstones. Set limit to nil so all range
// tombstones get flushed in the current sstable. Consider this
// example:
case key != nil:
// We hit the size, grandparent, or L0 limit for the sstable.
// The next key either has a greater user key than the previous
// key, or if not, the previous key must not have had a zero
// sequence number.

// TODO(jackson): If we hit the grandparent limit, the next
// grandparent's smallest key may be less than the current key.
// Splitting at the current key will cause this output to overlap
// a potentially unbounded number of grandparents.
splitKey = key.UserKey
case key == nil && splitL0Outputs:
// We ran out of keys with flush splits enabled. Set splitKey to
// nil so all range tombstones get flushed in the current sstable.
// Consider this example:
//
// a.SET.4
// d.MERGE.5
// d.RANGEDEL.3:f
// (no more keys remaining)
//
// Where d is a flush split key (i.e. limit = 'd'). Since d.MERGE.5
// has already been written to this output by this point (as it's
// <= limit), and flushes cannot have user keys split across
// multiple sstables, we have to set limit to a key greater than
// 'd' to ensure the range deletion also gets flushed. Setting
// the limit to nil is the simplest way to ensure that.
limit = nil
case key == nil /* && (prevPointSeqNum != 0 || c.rangeDelFrag.Empty()) */ :
// We ran out of keys. Because of the previous case, either rangeDelFrag
// is empty or the last record added to the sstable has a non-zero
// seqnum. If the rangeDelFragmenter is empty we have no concerns as
// there won't be another sstable generated by this compaction and the
// current limit is fine (it won't apply). Otherwise, if the last key
// added to the sstable had a non-zero seqnum we're also in the clear as
// we can decrement that seqnum to create a boundary key for the next
// sstable (if we end up generating a next sstable).
case key != nil:
// We either hit the size, grandparent, or L0 limit for the sstable.
// Where d is a flush split key (i.e. splitterSuggestion = 'd').
// Since d.MERGE.5 has already been written to this output by this
// point (as it's <= splitterSuggestion), and flushes cannot have
// user keys split across multiple sstables, we have to set
// splitKey to a key greater than 'd' to ensure the range deletion
// also gets flushed. Setting the splitKey to nil is the simplest
// way to ensure that.
//
// TODO(jackson): This case is only problematic if splitKey equals
// the user key of the last point key added. We don't need to
// flush *all* sstables to the current sstable. We could flush up
// to the next grandparent limit greater than `splitterSuggestion`
// instead.
splitKey = nil
case key == nil && prevPointSeqNum != 0:
// The last key added did not have a zero sequence number, so
// we'll always be able to adjust the next table's smallest key.
splitKey = splitterSuggestion
case key == nil && prevPointSeqNum == 0:
// The last key added DID have a zero sequence number. The
// splitters' suggested split point might have the same user key,
// which would cause the next output to have an unadjustable
// smallest key. To prevent that, we ignore the splitter's
// suggestion, leaving splitKey nil to flush all pending range
// tombstones.
// TODO(jackson): This case is only problematic if splitKey equals
// the user key of the last point key added. We don't need to
// flush *all* sstables to the current sstable. We could flush up
// to the next grandparent limit greater than `splitterSuggestion`
// instead.
splitKey = nil
default:
return nil, nil, errors.New("pebble: not reached")
}

if err := finishOutput(limit); err != nil {
if err := finishOutput(splitKey); err != nil {
return nil, pendingOutputs, err
}
}
Expand Down
Loading

0 comments on commit 59f79a5

Please sign in to comment.