Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: optimize levelIter for non-matching bloom filter #1091

Merged
merged 1 commit into from
Mar 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,8 +1028,8 @@ func (c *compaction) newInputIter(newIters tableNewIters) (_ internalIterator, r

iterOpts := IterOptions{logger: c.logger}
addItersForLevel := func(iters []internalIterator, level *compactionLevel) ([]internalIterator, error) {
iters = append(iters, newLevelIter(iterOpts, c.cmp, newIters, level.files.Iter(),
manifest.Level(level.level), &c.bytesIterated))
iters = append(iters, newLevelIter(iterOpts, c.cmp, nil /* split */, newIters,
level.files.Iter(), manifest.Level(level.level), &c.bytesIterated))
// Add the range deletion iterator for each file as an independent level
// in mergingIter, as opposed to making a levelIter out of those. This
// is safer as levelIter expects all keys coming from underlying
Expand Down
2 changes: 1 addition & 1 deletion db.go
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ func finishInitializingIter(buf *iterAlloc) *Iterator {
li = &levelIter{}
}

li.init(dbi.opts, dbi.cmp, dbi.newIters, files, level, nil)
li.init(dbi.opts, dbi.cmp, dbi.split, dbi.newIters, files, level, nil)
li.initRangeDel(&mlevels[0].rangeDelIter)
li.initSmallestLargestUserKey(&mlevels[0].smallestUserKey, &mlevels[0].largestUserKey,
&mlevels[0].isLargestUserKeyRangeDelSentinel)
Expand Down
5 changes: 3 additions & 2 deletions get_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,8 @@ func (g *getIter) Next() (*InternalKey, []byte) {
files := g.l0[n-1].Iter()
g.l0 = g.l0[:n-1]
iterOpts := IterOptions{logger: g.logger}
g.levelIter.init(iterOpts, g.cmp, g.newIters, files, manifest.L0Sublevel(n), nil)
g.levelIter.init(iterOpts, g.cmp, nil /* split */, g.newIters,
files, manifest.L0Sublevel(n), nil)
g.levelIter.initRangeDel(&g.rangeDelIter)
g.iter = &g.levelIter
g.iterKey, g.iterValue = g.iter.SeekGE(g.key)
Expand All @@ -163,7 +164,7 @@ func (g *getIter) Next() (*InternalKey, []byte) {
}

iterOpts := IterOptions{logger: g.logger}
g.levelIter.init(iterOpts, g.cmp, g.newIters,
g.levelIter.init(iterOpts, g.cmp, nil /* split */, g.newIters,
g.version.Levels[g.level].Iter(), manifest.Level(g.level), nil)
g.levelIter.initRangeDel(&g.rangeDelIter)
g.level++
Expand Down
3 changes: 2 additions & 1 deletion ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ func ingestTargetLevel(

level := baseLevel
for ; level < numLevels; level++ {
levelIter := newLevelIter(iterOps, cmp, newIters, v.Levels[level].Iter(), manifest.Level(level), nil)
levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.Levels[level].Iter(), manifest.Level(level), nil)
var rangeDelIter internalIterator
// Pass in a non-nil pointer to rangeDelIter so that levelIter.findFileGE sets it up for the target file.
levelIter.initRangeDel(&rangeDelIter)
Expand Down
119 changes: 70 additions & 49 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/datadriven"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/sstable"
"github.com/cockroachdb/pebble/vfs"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1003,10 +1004,31 @@ func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) {
const restartInterval = 16
const levelCount = 5
const keyOffset = 100000
readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, levelCount, keyOffset, false)
readersWithTombstone, levelSlicesWithTombstone, _ := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, 1, keyOffset, true)

var readers [4][][]*sstable.Reader
var levelSlices [4][]manifest.LevelSlice
indexFunc := func(bloom bool, withTombstone bool) int {
index := 0
if bloom {
index = 2
}
if withTombstone {
index++
}
return index
}
for _, bloom := range []bool{false, true} {
for _, withTombstone := range []bool{false, true} {
index := indexFunc(bloom, withTombstone)
levels := levelCount
if withTombstone {
levels = 1
}
readers[index], levelSlices[index], _ = buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, levels, keyOffset, withTombstone, bloom)

}
}
// We will not be seeking to the keys that were written but instead to
// keys before the written keys. This is to validate that the optimization
// to use Next still functions when mergingIter checks for the prefix
Expand All @@ -1018,53 +1040,52 @@ func BenchmarkIteratorSeqSeekPrefixGENotFound(b *testing.B) {
keys = append(keys, []byte(fmt.Sprintf("%08d", i)))
}
for _, skip := range []int{1, 2, 4} {
for _, withTombstone := range []bool{false, true} {
b.Run(fmt.Sprintf("skip=%d/with-tombstone=%t", skip, withTombstone),
func(b *testing.B) {
readers := readers
levelSlices := levelSlices
if withTombstone {
readers = readersWithTombstone
levelSlices = levelSlicesWithTombstone
}
m := buildMergingIter(readers, levelSlices)
iter := Iterator{
cmp: DefaultComparer.Compare,
equal: DefaultComparer.Equal,
split: func(a []byte) int { return len(a) },
merge: DefaultMerger.Merge,
iter: m,
}
pos := 0
b.ResetTimer()
for i := 0; i < b.N; i++ {
// When withTombstone=true, and prior to the
// optimization to stop early due to a range
// tombstone, the iteration would continue into the
// next file, and not be able to use Next at the lower
// level in the next SeekPrefixGE call. So we would
// incur the cost of iterating over all the deleted
// keys for every seek. Note that it is not possible
// to do a noop optimization in Iterator for the
// prefix case, unlike SeekGE/SeekLT, since we don't
// know if the iterators inside mergingIter are all
// appropriately positioned -- some may not be due to
// bloom filters not matching.
valid := iter.SeekPrefixGE(keys[pos])
if valid {
b.Fatalf("key should not be found")
for _, bloom := range []bool{false, true} {
for _, withTombstone := range []bool{false, true} {
b.Run(fmt.Sprintf("skip=%d/bloom=%t/with-tombstone=%t", skip, bloom, withTombstone),
func(b *testing.B) {
index := indexFunc(bloom, withTombstone)
readers := readers[index]
levelSlices := levelSlices[index]
m := buildMergingIter(readers, levelSlices)
iter := Iterator{
cmp: DefaultComparer.Compare,
equal: DefaultComparer.Equal,
split: func(a []byte) int { return len(a) },
merge: DefaultMerger.Merge,
iter: m,
}
pos += skip
if pos >= keyOffset {
pos = 0
pos := 0
b.ResetTimer()
for i := 0; i < b.N; i++ {
// When withTombstone=true, and prior to the
// optimization to stop early due to a range
// tombstone, the iteration would continue into the
// next file, and not be able to use Next at the lower
// level in the next SeekPrefixGE call. So we would
// incur the cost of iterating over all the deleted
// keys for every seek. Note that it is not possible
// to do a noop optimization in Iterator for the
// prefix case, unlike SeekGE/SeekLT, since we don't
// know if the iterators inside mergingIter are all
// appropriately positioned -- some may not be due to
// bloom filters not matching.
valid := iter.SeekPrefixGE(keys[pos])
if valid {
b.Fatalf("key should not be found")
}
pos += skip
if pos >= keyOffset {
pos = 0
}
}
}
b.StopTimer()
iter.Close()
})
b.StopTimer()
iter.Close()
})
}
}
}
for _, r := range [][][]*sstable.Reader{readers, readersWithTombstone} {
for _, r := range readers {
for i := range r {
for j := range r[i] {
r[i][j].Close()
Expand All @@ -1081,7 +1102,7 @@ func BenchmarkIteratorSeqSeekGEWithBounds(b *testing.B) {
const restartInterval = 16
const levelCount = 5
readers, levelSlices, keys := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false)
b, blockSize, restartInterval, levelCount, 0 /* keyOffset */, false, false)
m := buildMergingIter(readers, levelSlices)
iter := Iterator{
cmp: DefaultComparer.Compare,
Expand Down Expand Up @@ -1118,7 +1139,7 @@ func BenchmarkIteratorSeekGENoop(b *testing.B) {
const levelCount = 5
const keyOffset = 10000
readers, levelSlices, _ := buildLevelsForMergingIterSeqSeek(
b, blockSize, restartInterval, levelCount, keyOffset, false)
b, blockSize, restartInterval, levelCount, keyOffset, false, false)
var keys [][]byte
for i := 0; i < keyOffset; i++ {
keys = append(keys, []byte(fmt.Sprintf("%08d", i)))
Expand Down
5 changes: 3 additions & 2 deletions level_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ func checkLevelsInternal(c *checkConfig) (err error) {
manifestIter := current.L0Sublevels.Levels[sublevel].Iter()
iterOpts := IterOptions{logger: c.logger}
li := &levelIter{}
li.init(iterOpts, c.cmp, c.newIters, manifestIter,
li.init(iterOpts, c.cmp, nil /* split */, c.newIters, manifestIter,
manifest.L0Sublevel(sublevel), nil)
li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
li.initSmallestLargestUserKey(&mlevelAlloc[0].smallestUserKey, nil, nil)
Expand All @@ -650,7 +650,8 @@ func checkLevelsInternal(c *checkConfig) (err error) {

iterOpts := IterOptions{logger: c.logger}
li := &levelIter{}
li.init(iterOpts, c.cmp, c.newIters, current.Levels[level].Iter(), manifest.Level(level), nil)
li.init(iterOpts, c.cmp, nil /* split */, c.newIters,
current.Levels[level].Iter(), manifest.Level(level), nil)
li.initRangeDel(&mlevelAlloc[0].rangeDelIter)
li.initSmallestLargestUserKey(&mlevelAlloc[0].smallestUserKey, nil, nil)
mlevelAlloc[0].iter = li
Expand Down
19 changes: 18 additions & 1 deletion level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type tableNewIters func(
type levelIter struct {
logger Logger
cmp Compare
split Split
// The lower/upper bounds for iteration as specified at creation or the most
// recent call to SetBounds.
lower []byte
Expand Down Expand Up @@ -153,22 +154,26 @@ type levelIter struct {
// levelIter implements the base.InternalIterator interface.
var _ base.InternalIterator = (*levelIter)(nil)

// newLevelIter returns a levelIter. It is permissible to pass a nil split
// parameter if the caller is never going to call SeekPrefixGE.
func newLevelIter(
opts IterOptions,
cmp Compare,
split Split,
newIters tableNewIters,
files manifest.LevelIterator,
level manifest.Level,
bytesIterated *uint64,
) *levelIter {
l := &levelIter{}
l.init(opts, cmp, newIters, files, level, bytesIterated)
l.init(opts, cmp, split, newIters, files, level, bytesIterated)
return l
}

func (l *levelIter) init(
opts IterOptions,
cmp Compare,
split Split,
newIters tableNewIters,
files manifest.LevelIterator,
level manifest.Level,
Expand All @@ -181,6 +186,7 @@ func (l *levelIter) init(
l.upper = opts.UpperBound
l.tableOpts.TableFilter = opts.TableFilter
l.cmp = cmp
l.split = split
l.iterFile = nil
l.newIters = newIters
l.files = files
Expand Down Expand Up @@ -435,6 +441,17 @@ func (l *levelIter) SeekPrefixGE(
}
return l.verify(l.largestBoundary, nil)
}
// It is possible that we are here because bloom filter matching failed.
// In that case it is likely that all keys matching the prefix are wholly
// within the current file and cannot be in the subsequent file. In that
// case we don't want to go to the next file, since loading and seeking in
// there has some cost. Additionally, for sparse key spaces, loading the
// next file will defeat the optimization for the next SeekPrefixGE that
// is called with trySeekUsingNext=true, since for sparse key spaces it is
// likely that the next key will also be contained in the current file.
if n := l.split(l.iterFile.Largest.UserKey); l.cmp(prefix, l.iterFile.Largest.UserKey[:n]) < 0 {
return nil, nil
}
return l.verify(l.skipEmptyFileForward())
}

Expand Down
31 changes: 16 additions & 15 deletions level_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ func TestLevelIter(t *testing.T) {
}

iter := newLevelIter(opts, DefaultComparer.Compare,
newIters, files.Iter(), manifest.Level(level), nil)
func(a []byte) int { return len(a) }, newIters, files.Iter(), manifest.Level(level),
nil)
defer iter.Close()
// Fake up the range deletion initialization.
iter.initRangeDel(new(internalIterator))
Expand Down Expand Up @@ -122,7 +123,8 @@ func TestLevelIter(t *testing.T) {
}

iter := newLevelIter(opts, DefaultComparer.Compare,
newIters2, files.Iter(), manifest.Level(level), nil)
func(a []byte) int { return len(a) }, newIters2, files.Iter(),
manifest.Level(level), nil)
iter.SeekGE([]byte(key))
lower, upper := tableOpts.GetLowerBound(), tableOpts.GetUpperBound()
return fmt.Sprintf("[%s,%s]\n", lower, upper)
Expand Down Expand Up @@ -283,8 +285,9 @@ func TestLevelIterBoundaries(t *testing.T) {
}
if iter == nil {
slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas)
iter = newLevelIter(IterOptions{}, DefaultComparer.Compare, lt.newIters,
slice.Iter(), manifest.Level(level), nil)
iter = newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(),
manifest.Level(level), nil)
// Fake up the range deletion initialization.
iter.initRangeDel(new(internalIterator))
}
Expand Down Expand Up @@ -377,8 +380,9 @@ func TestLevelIterSeek(t *testing.T) {
case "iter":
slice := manifest.NewLevelSliceKeySorted(lt.cmp.Compare, lt.metas)
iter := &levelIterTestIter{
levelIter: newLevelIter(IterOptions{}, DefaultComparer.Compare, lt.newIters,
slice.Iter(), manifest.Level(level), nil),
levelIter: newLevelIter(IterOptions{}, DefaultComparer.Compare,
func(a []byte) int { return len(a) }, lt.newIters, slice.Iter(),
manifest.Level(level), nil),
}
defer iter.Close()
iter.initRangeDel(&iter.rangeDelIter)
Expand Down Expand Up @@ -479,8 +483,7 @@ func BenchmarkLevelIterSeekGE(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
rng := rand.New(rand.NewSource(uint64(time.Now().UnixNano())))

b.ResetTimer()
Expand Down Expand Up @@ -522,8 +525,7 @@ func BenchmarkLevelIterSeqSeekGEWithBounds(b *testing.B) {
opts.LowerBound, opts.UpperBound)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(internalIterator))
Expand Down Expand Up @@ -571,7 +573,8 @@ func BenchmarkLevelIterSeqSeekPrefixGE(b *testing.B) {
b.Run(fmt.Sprintf("skip=%d/use-next=%t", skip, useNext),
func(b *testing.B) {
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
newIters, metas.Iter(), manifest.Level(level), nil)
func(a []byte) int { return len(a) }, newIters, metas.Iter(),
manifest.Level(level), nil)
// Fake up the range deletion initialization, to resemble the usage
// in a mergingIter.
l.initRangeDel(new(internalIterator))
Expand Down Expand Up @@ -613,8 +616,7 @@ func BenchmarkLevelIterNext(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -648,8 +650,7 @@ func BenchmarkLevelIterPrev(b *testing.B) {
iter, err := readers[file.FileNum].NewIter(nil /* lower */, nil /* upper */)
return iter, nil, err
}
l := newLevelIter(IterOptions{}, DefaultComparer.Compare,
newIters, metas.Iter(), manifest.Level(level), nil)
l := newLevelIter(IterOptions{}, DefaultComparer.Compare, nil, newIters, metas.Iter(), manifest.Level(level), nil)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down
Loading