Skip to content

Commit

Permalink
compact: move interleaving iterators to compact.Iter
Browse files Browse the repository at this point in the history
During compaction, we obtain rangedel and rangekey spans by querying
the interleaving iterators that are used as input to `compact.Iter`.
To reduce this tight coupling, we move the interleaving iterators to
`compact.Iter`. We now pass the point/rangedel/rangekey iterators
separately and we use new `compact.Iter` methods to obtain the
relevant spans.
  • Loading branch information
RaduBerinde committed May 1, 2024
1 parent 426803e commit 902b6d0
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 88 deletions.
53 changes: 24 additions & 29 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/compact"
"github.com/cockroachdb/pebble/internal/invalidating"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/keyspan/keyspanimpl"
Expand Down Expand Up @@ -239,12 +238,6 @@ type compaction struct {
smallest InternalKey
largest InternalKey

// rangeDelInterleaving is an interleaving iterator for range deletions, that
// interleaves range tombstones among the point keys.
rangeDelInterleaving keyspan.InterleavingIter
// rangeKeyInterleaving is the interleaving iter for range keys.
rangeKeyInterleaving keyspan.InterleavingIter

// A list of objects to close when the compaction finishes. Used by input
// iteration to keep rangeDelIters open for the lifetime of the compaction,
// and only close them when the compaction finishes.
Expand Down Expand Up @@ -699,22 +692,27 @@ func (c *compaction) allowZeroSeqNum() bool {
return len(c.flushing) == 0 && c.delElision.ElidesEverything() && c.rangeKeyElision.ElidesEverything()
}

func (c *compaction) newInputIter(
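// newInputIters returns the iterators over all the input tables in a
// compaction: a point iterator plus range deletion and range key iterators.
// rangeDelIter and rangeKeyIter are nil when the inputs contain no such
// iterators.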
func (c *compaction) newInputIters(
newIters tableNewIters, newRangeKeyIter keyspanimpl.TableNewSpanIter,
) (_ internalIterator, retErr error) {
) (
pointIter internalIterator,
rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
retErr error,
) {
// Validate the ordering of compaction input files for defense in depth.
if len(c.flushing) == 0 {
if c.startLevel.level >= 0 {
err := manifest.CheckOrdering(c.cmp, c.formatKey,
manifest.Level(c.startLevel.level), c.startLevel.files.Iter())
if err != nil {
return nil, err
return nil, nil, nil, err
}
}
err := manifest.CheckOrdering(c.cmp, c.formatKey,
manifest.Level(c.outputLevel.level), c.outputLevel.files.Iter())
if err != nil {
return nil, err
return nil, nil, nil, err
}
if c.startLevel.level == 0 {
if c.startLevel.l0SublevelInfo == nil {
Expand All @@ -724,7 +722,7 @@ func (c *compaction) newInputIter(
err := manifest.CheckOrdering(c.cmp, c.formatKey,
info.sublevel, info.Iter())
if err != nil {
return nil, err
return nil, nil, nil, err
}
}
}
Expand All @@ -736,7 +734,7 @@ func (c *compaction) newInputIter(
err := manifest.CheckOrdering(c.cmp, c.formatKey,
manifest.Level(interLevel.level), interLevel.files.Iter())
if err != nil {
return nil, err
return nil, nil, nil, err
}
}
}
Expand Down Expand Up @@ -901,13 +899,13 @@ func (c *compaction) newInputIter(
for _, info := range c.startLevel.l0SublevelInfo {
sublevelCompactionLevel := &compactionLevel{0, info.LevelSlice, nil}
if err := addItersForLevel(sublevelCompactionLevel, info.sublevel); err != nil {
return nil, err
return nil, nil, nil, err
}
}
continue
}
if err := addItersForLevel(&c.inputs[i], manifest.Level(c.inputs[i].level)); err != nil {
return nil, err
return nil, nil, nil, err
}
}
}
Expand All @@ -916,9 +914,9 @@ func (c *compaction) newInputIter(
// of a *mergingIter. This is possible, for example, when performing a flush
// of a single memtable. Otherwise, combine all the iterators into a merging
// iter.
iter := iters[0]
pointIter = iters[0]
if len(iters) > 1 {
iter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
pointIter = newMergingIter(c.logger, &c.stats, c.cmp, nil, iters...)
}

// In normal operation, levelIter iterates over the point operations in a
Expand All @@ -937,21 +935,20 @@ func (c *compaction) newInputIter(
if len(rangeDelIters) > 0 {
mi := &keyspanimpl.MergingIter{}
mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeDelIters...)
c.rangeDelInterleaving.Init(c.comparer, iter, mi, keyspan.InterleavingIterOpts{})
iter = &c.rangeDelInterleaving
rangeDelIter = mi
}

// If there are range key iterators, we need to combine them using
// keyspanimpl.MergingIter and defragment them; compact.Iter interleaves the
// result among the points.
if len(rangeKeyIters) > 0 {
mi := &keyspanimpl.MergingIter{}
mi.Init(c.cmp, keyspan.NoopTransform, new(keyspanimpl.MergingBuffers), rangeKeyIters...)
// TODO(radu): why do we have a defragmenter here but not above?
di := &keyspan.DefragmentingIter{}
di.Init(c.comparer, mi, keyspan.DefragmentInternal, keyspan.StaticDefragmentReducer, new(keyspan.DefragmentingBuffers))
c.rangeKeyInterleaving.Init(c.comparer, iter, di, keyspan.InterleavingIterOpts{})
iter = &c.rangeKeyInterleaving
rangeKeyIter = di
}
return iter, nil
return pointIter, rangeDelIter, rangeKeyIter, nil
}

func (c *compaction) newRangeDelIter(
Expand Down Expand Up @@ -2580,15 +2577,13 @@ func (d *DB) runCompaction(
c.bufferPool.Init(12)
defer c.bufferPool.Release()

iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter)
pointIter, rangeDelIter, rangeKeyIter, err := c.newInputIters(d.newIters, d.tableNewRangeKeyIter)
if err != nil {
return nil, pendingOutputs, stats, err
}
c.allowedZeroSeqNum = c.allowZeroSeqNum()
iiter = invalidating.MaybeWrapIfInvariants(iiter)
cfg := compact.IterConfig{
Cmp: c.cmp,
Equal: c.equal,
Comparer: c.comparer,
Merge: d.merge,
TombstoneElision: c.delElision,
RangeKeyElision: c.rangeKeyElision,
Expand All @@ -2597,7 +2592,7 @@ func (d *DB) runCompaction(
IneffectualSingleDeleteCallback: d.opts.Experimental.IneffectualSingleDeleteCallback,
SingleDeleteInvariantViolationCallback: d.opts.Experimental.SingleDeleteInvariantViolationCallback,
}
iter := compact.NewIter(cfg, iiter)
iter := compact.NewIter(cfg, pointIter, rangeDelIter, rangeKeyIter)

var (
createdFiles []base.DiskFileNum
Expand Down Expand Up @@ -2943,14 +2938,14 @@ func (d *DB) runCompaction(
// Since the keys' Suffix and Value fields are not deep cloned, the
// underlying blockIter must be kept open for the lifetime of the
// compaction.
iter.AddTombstoneSpan(c.rangeDelInterleaving.Span())
iter.AddTombstoneSpan(iter.RangeDelSpan())
continue
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
// Range keys are handled in the same way as range tombstones.
// Since the keys' Suffix and Value fields are not deep cloned, the
// underlying blockIter must be kept open for the lifetime of the
// compaction.
iter.AddRangeKeySpan(c.rangeKeyInterleaving.Span())
iter.AddRangeKeySpan(iter.RangeKeySpan())
continue
}
if tw == nil {
Expand Down
2 changes: 1 addition & 1 deletion compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2457,7 +2457,7 @@ func TestCompactionCheckOrdering(t *testing.T) {
return iterSet{point: &errorIter{}}, nil
}
result := "OK"
_, err := c.newInputIter(newIters, nil)
_, _, _, err := c.newInputIters(newIters, nil)
if err != nil {
result = fmt.Sprint(err)
}
Expand Down
72 changes: 57 additions & 15 deletions internal/compact/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/internal/invalidating"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/rangekey"
Expand Down Expand Up @@ -154,7 +155,16 @@ type Iter struct {

cfg IterConfig

iter base.InternalIterator
// rangeDelInterleaving is an interleaving iterator for range deletions, that
// interleaves range tombstones among the point keys.
rangeDelInterleaving keyspan.InterleavingIter
// rangeKeyInterleaving is the interleaving iter for range keys.
rangeKeyInterleaving keyspan.InterleavingIter

// iter is the iterator which interleaves points with RANGEDELs and range
// keys.
iter base.InternalIterator

delElider pointTombstoneElider
rangeDelElider rangeTombstoneElider
rangeKeyElider rangeTombstoneElider
Expand Down Expand Up @@ -241,9 +251,8 @@ type Iter struct {

// IterConfig contains the parameters necessary to create a compaction iterator.
type IterConfig struct {
Cmp base.Compare
Equal base.Equal
Merge base.Merge
Comparer *base.Comparer
Merge base.Merge

// The snapshot sequence numbers that need to be maintained. These sequence
// numbers define the snapshot stripes.
Expand Down Expand Up @@ -290,17 +299,33 @@ const (

// NewIter creates a new compaction iterator. See the comment for Iter for a
// detailed description.
func NewIter(cfg IterConfig, iter base.InternalIterator) *Iter {
// rangeDelIter and rangeKeyIter can be nil.
func NewIter(
cfg IterConfig,
pointIter base.InternalIterator,
rangeDelIter, rangeKeyIter keyspan.FragmentIterator,
) *Iter {
cfg.ensureDefaults()
i := &Iter{
cmp: cfg.Cmp,
cfg: cfg,
iter: iter,
cmp: cfg.Comparer.Compare,
cfg: cfg,
}

iter := pointIter
if rangeDelIter != nil {
i.rangeDelInterleaving.Init(cfg.Comparer, iter, rangeDelIter, keyspan.InterleavingIterOpts{})
iter = &i.rangeDelInterleaving
}
if rangeKeyIter != nil {
i.rangeKeyInterleaving.Init(cfg.Comparer, iter, rangeKeyIter, keyspan.InterleavingIterOpts{})
iter = &i.rangeKeyInterleaving
}
i.frontiers.Init(cfg.Cmp)
i.delElider.Init(cfg.Cmp, cfg.TombstoneElision)
i.rangeDelElider.Init(cfg.Cmp, cfg.TombstoneElision)
i.rangeKeyElider.Init(cfg.Cmp, cfg.RangeKeyElision)
i.iter = invalidating.MaybeWrapIfInvariants(iter)

i.frontiers.Init(i.cmp)
i.delElider.Init(i.cmp, cfg.TombstoneElision)
i.rangeDelElider.Init(i.cmp, cfg.TombstoneElision)
i.rangeKeyElider.Init(i.cmp, cfg.RangeKeyElision)
return i
}

Expand Down Expand Up @@ -348,6 +373,9 @@ func (i *Iter) First() (*base.InternalKey, []byte) {
}

// Next has the same semantics as InternalIterator.Next.
// Note that when Next returns a RANGEDEL key, the caller can call
// RangeDelSpan() to get the corresponding span. Similarly, when Next returns a
// range key, the caller can use RangeKeySpan().
func (i *Iter) Next() (*base.InternalKey, []byte) {
if i.err != nil {
return nil, nil
Expand Down Expand Up @@ -581,6 +609,20 @@ func (i *Iter) Next() (*base.InternalKey, []byte) {
return nil, nil
}

// RangeDelSpan returns the range deletion span corresponding to the current
// key. Can only be called right after a Next() call that returned a RANGEDEL
// key.
//
// It delegates to the Span method of the range deletion interleaving iterator
// (rangeDelInterleaving).
func (i *Iter) RangeDelSpan() *keyspan.Span {
return i.rangeDelInterleaving.Span()
}

// RangeKeySpan returns the range key span corresponding to the current key.
// Can only be called right after a Next() call that returned a range key.
//
// It delegates to the Span method of the range key interleaving iterator
// (rangeKeyInterleaving).
func (i *Iter) RangeKeySpan() *keyspan.Span {
return i.rangeKeyInterleaving.Span()
}

func (i *Iter) closeValueCloser() error {
if i.valueCloser == nil {
return nil
Expand Down Expand Up @@ -670,7 +712,7 @@ func (i *Iter) nextInStripeHelper() stripeChangeType {
// number ordering within a user key. If the previous key was one
// of these keys, we consider the new key a `newStripeNewKey` to
// reflect that it's the beginning of a new stream of point keys.
if i.key.IsExclusiveSentinel() || !i.cfg.Equal(i.key.UserKey, kv.K.UserKey) {
if i.key.IsExclusiveSentinel() || !i.cfg.Comparer.Equal(i.key.UserKey, kv.K.UserKey) {
i.curSnapshotIdx, i.curSnapshotSeqNum = i.cfg.Snapshots.IndexAndSeqNum(kv.SeqNum())
return newStripeNewKey
}
Expand Down Expand Up @@ -1421,7 +1463,7 @@ func (i *Iter) RangeKeysUpTo(key []byte) []keyspan.Span {
}
if y > start {
keysDst := dst.Keys[usedLen:cap(dst.Keys)]
rangekey.Coalesce(i.cmp, i.cfg.Equal, s.Keys[start:y], &keysDst)
rangekey.Coalesce(i.cmp, i.cfg.Comparer.Equal, s.Keys[start:y], &keysDst)
if y == len(s.Keys) {
// This is the last snapshot stripe. Unsets and deletes can be elided.
keysDst = elideInLastStripe(keysDst)
Expand All @@ -1433,7 +1475,7 @@ func (i *Iter) RangeKeysUpTo(key []byte) []keyspan.Span {
}
if y < len(s.Keys) {
keysDst := dst.Keys[usedLen:cap(dst.Keys)]
rangekey.Coalesce(i.cmp, i.cfg.Equal, s.Keys[y:], &keysDst)
rangekey.Coalesce(i.cmp, i.cfg.Comparer.Equal, s.Keys[y:], &keysDst)
keysDst = elideInLastStripe(keysDst)
usedLen += len(keysDst)
dst.Keys = append(dst.Keys, keysDst...)
Expand Down
Loading

0 comments on commit 902b6d0

Please sign in to comment.