db: add ingest-time splitting of ssts into virtual ones
Currently, if we identify boundary overlap in a level during ingest
target level calculation, but no data overlap, we are forced to pick
a target level above the file we overlapped with whenever we cannot
slot below it (as when the existing file is in L6, which is common).

This change takes advantage of virtual sstables to split an existing
sstable into two virtual sstables when an ingested sstable could have
gone into the same level had the existing sstable been split that way
to begin with. Doing this split significantly reduces write-amp, as
it avoids having to compact the newly-ingested sstable with the
sstable it boundary-overlapped with.

Biggest part of cockroachdb#1683. The first commit is cockroachdb#2538,
with which this shares a lot of logic (mostly the excise function).
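To make the split concrete, here is a toy, self-contained Go sketch of
the idea (invented types and names, not pebble code; the real
implementation works on *fileMetadata and performs the split via the
excise function, truncating virtual bounds to keys actually present):

package main

import "fmt"

// table is a toy stand-in for an sstable's user-key bounds. The real code
// splits a *fileMetadata into two virtual sstables that share one backing
// file, rather than constructing new tables.
type table struct{ smallest, largest string }

// splitAround carves existing into two virtual tables around ingested,
// assuming boundary overlap but no data overlap (no key of existing falls
// inside ingested's key range).
func splitAround(existing, ingested table) (left, right table) {
    left = table{existing.smallest, ingested.smallest}
    right = table{ingested.largest, existing.largest}
    return left, right
}

func main() {
    existing := table{"a", "z"} // lives in L6
    ingested := table{"m", "p"} // boundary-overlaps existing, no data overlap
    l, r := splitAround(existing, ingested)
    // Previously the ingested table was forced into L5 or above and later
    // compacted down into existing; now all three can share L6.
    fmt.Printf("L6: [%s,%s] [%s,%s] [%s,%s]\n",
        l.smallest, l.largest, ingested.smallest, ingested.largest,
        r.smallest, r.largest)
}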
itsbilal committed May 31, 2023
1 parent 5057eb4 commit ee0931e
Showing 7 changed files with 425 additions and 31 deletions.
3 changes: 2 additions & 1 deletion compaction.go
@@ -1855,9 +1855,10 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
var level int
var err error
for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
level, err = ingestTargetLevel(
level, _, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.cmp,
c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
false, /* suggestSplit */
)
if err != nil {
return nil, err
5 changes: 3 additions & 2 deletions data_test.go
@@ -1229,8 +1229,9 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
int,
map[*compaction]struct{},
*fileMetadata,
) (int, error) {
return level, nil
bool,
) (int, *fileMetadata, error) {
return level, nil, nil
}, nil, KeyRange{})
return err
}
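Both hunks above reflect the new ingestTargetLevel contract: callers
now pass a suggestSplit flag and receive, alongside the target level,
an optional file that must be split for the ingested sstable to land
there. A toy sketch of consuming that contract (invented stand-ins;
the real function also takes iterator constructors, a *Version, and
the set of in-progress compactions):

package main

import "fmt"

type fileMetadata struct{ num int }

// pickLevel mimics the shape of the new ingestTargetLevel return values:
// the chosen level plus, when suggestSplit is set, a file that has to be
// split into two virtual sstables for the ingest to use that level.
func pickLevel(suggestSplit bool) (int, *fileMetadata, error) {
    if suggestSplit {
        return 6, &fileMetadata{num: 42}, nil
    }
    return 5, nil, nil
}

func main() {
    level, fileToSplit, err := pickLevel(true)
    if err != nil {
        panic(err)
    }
    if fileToSplit != nil {
        // The caller queues (ingested file, fileToSplit, level) so that
        // ingestSplit can excise fileToSplit before the version edit lands.
        fmt.Printf("split file %d, then ingest into L%d\n", fileToSplit.num, level)
    }
}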
199 changes: 177 additions & 22 deletions ingest.go
@@ -12,6 +12,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/invariants"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/private"
@@ -612,7 +613,8 @@ func ingestTargetLevel(
baseLevel int,
compactions map[*compaction]struct{},
meta *fileMetadata,
) (int, error) {
suggestSplit bool,
) (int, *fileMetadata, error) {
// Find the lowest level which does not have any files which overlap meta. We
// search from L0 to L6 looking for whether there are any files in the level
// which overlap meta. We want the "lowest" level (where lower means
@@ -627,6 +629,12 @@
// violate the sequence number invariant.
// - no file boundary overlap with level i, since that will violate the
// invariant that files do not overlap in levels i > 0.
// - if there is only a file boundary overlap at a given level, and no data overlap,
// we can still slot a file at that level. We return the fileMetadata with
// which we have file boundary overlap (must be only one file by definition, as
// sstable bounds are tight on user keys) and the caller is expected to split
// that sstable into two virtual sstables, allowing this file to go into that
// level.
//
// The file boundary overlap check is simpler to conceptualize. Consider the
// following example, in which the ingested file lies completely before or
Expand Down Expand Up @@ -676,7 +684,7 @@ func ingestTargetLevel(
// This assertion implicitly checks that we have the current version of
// the metadata.
if v.L0Sublevels == nil {
return 0, errors.AssertionFailedf("could not read L0 sublevels")
return 0, nil, errors.AssertionFailedf("could not read L0 sublevels")
}
// Check for overlap over the keys of L0 by iterating over the sublevels.
for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ {
@@ -702,14 +710,15 @@
err := iter.Close() // Closes range del iter as well.
err = firstError(err, levelIter.Close())
if err != nil {
return 0, err
return 0, nil, err
}
if overlap {
return targetLevel, nil
return targetLevel, nil, nil
}
}

level := baseLevel
var fileToSplit *fileMetadata
for ; level < numLevels; level++ {
levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters,
v.Levels[level].Iter(), manifest.Level(level), nil)
@@ -732,17 +741,35 @@
err := levelIter.Close() // Closes range del iter as well.
err = firstError(err, rkeyLevelIter.Close())
if err != nil {
return 0, err
return 0, nil, err
}
if overlap {
return targetLevel, nil
return targetLevel, fileToSplit, nil
}

// Check boundary overlap.
fileToSplit = nil
boundaryOverlaps := v.Overlaps(level, cmp, meta.Smallest.UserKey,
meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel())
if !boundaryOverlaps.Empty() {
continue
// We are already guaranteed to not have any data overlaps with files
// in boundaryOverlaps, otherwise we'd have gone into the above if
// statements. Use this, plus boundaryOverlaps.Len() == 1, to detect the
// case where we can slot this file into the current level despite
// a boundary overlap, by splitting one existing file into two virtual
// sstables.
if suggestSplit && boundaryOverlaps.Len() == 1 {
iter := boundaryOverlaps.Iter()
fileToSplit = iter.First()
} else {
// We either don't want to suggest ingest-time splits (i.e.
// !suggestSplit), or we boundary-overlapped with more than one file. The
// latter should never happen today, but a future version could start
// writing FileMetadatas with key bounds that are loose on user keys.
// Handle that case defensively.
fileToSplit = nil
continue
}
}

// Check boundary overlap with any ongoing compactions.
@@ -765,9 +792,13 @@
}
if !overlaps {
targetLevel = level
} else {
// We found an overlap with a compaction. Splitting a file won't resolve
// this, so reset fileToSplit.
fileToSplit = nil
}
}
return targetLevel, nil
return targetLevel, fileToSplit, nil
}

// Ingest ingests a set of sstables into the DB. Ingestion of the files is
@@ -1538,7 +1569,94 @@ type ingestTargetLevelFunc func(
baseLevel int,
compactions map[*compaction]struct{},
meta *fileMetadata,
) (int, error)
suggestSplit bool,
) (int, *fileMetadata, error)

type ingestSplitFile struct {
// ingestFile is the file being ingested, and splitFile is the file that
// needs to be split to allow ingestFile to slot into `level` level.
ingestFile, splitFile *fileMetadata
level int
}

// ingestSplit splits the files specified in `files` and updates ve in place
// to account for existing files getting split into two virtual sstables. The
// map `replacedFiles` is an in-progress map of all files that have been
// replaced with new virtual sstables in this version edit so far; it is also
// updated in place.
//
// d.mu must be held when calling this method; it is unconditionally released
// and then re-acquired while the method does IO.
func (d *DB) ingestSplit(
ve *versionEdit,
updateMetrics func(*fileMetadata, int, []newFileEntry),
files []ingestSplitFile,
exciseSpan KeyRange,
replacedFiles map[base.FileNum][]newFileEntry,
) error {
// Note the opposite order: Unlock() followed by Lock().
d.mu.Unlock()
defer d.mu.Lock()
for _, s := range files {
// replacedFiles can be thought of as a tree, where we start iterating with
// s.splitFile and run its fileNum through replacedFiles, then find which
// of the replaced files overlaps with s.ingestFile, check its
// replacements in replacedFiles again for overlap with s.ingestFile, and
// so on until we can't find the current splitFile in replacedFiles.
splitFile := s.splitFile
for splitFile != nil {
replaced, ok := replacedFiles[splitFile.FileNum]
if !ok {
break
}
updatedSplitFile := false
for i := range replaced {
if replaced[i].Meta.Overlaps(d.cmp, s.ingestFile.Smallest.UserKey, s.ingestFile.Largest.UserKey, s.ingestFile.Largest.IsExclusiveSentinel()) {
splitFile = replaced[i].Meta
updatedSplitFile = true
}
}
if !updatedSplitFile {
// None of the replaced files overlapped with the file being ingested.
// This is only okay if we have an excise span that overlaps with
// the ingested file.
if !exciseSpan.Valid() || !s.ingestFile.Overlaps(d.cmp, exciseSpan.Start, exciseSpan.End, true /* exclusiveEnd */) {
panic("could not find a file to split that overlaps with ingested file")
}
// No file to split.
splitFile = nil
}
}
if splitFile == nil {
continue
}
// NB: excise operates on [start, end). We're splitting at [start, end]
// (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation
// of exclusive vs inclusive end bounds should not make a difference here
// as we're guaranteed to not have any data overlap between splitFile and
// s.ingestFile, so panic if we do see a newly added file with an endKey
// equalling s.ingestFile.Largest, and !s.ingestFile.Largest.IsExclusiveSentinel()
added, err := d.excise(KeyRange{Start: s.ingestFile.Smallest.UserKey, End: s.ingestFile.Largest.UserKey}, splitFile, ve, s.level)
if err != nil {
return err
}
if _, ok := ve.DeletedFiles[deletedFileEntry{
Level: s.level,
FileNum: splitFile.FileNum,
}]; !ok {
panic("did not split file that was expected to be split")
}
replacedFiles[splitFile.FileNum] = added
for i := range added {
if s.ingestFile.Overlaps(d.cmp, added[i].Meta.Smallest.UserKey, added[i].Meta.Largest.UserKey, added[i].Meta.Largest.IsExclusiveSentinel()) {
panic("ingest-time split produced a file that overlaps with ingested file")
}
}
updateMetrics(splitFile, s.level, added)
}
return nil
}

func (d *DB) ingestApply(
jobID int,
@@ -1553,7 +1671,7 @@
ve := &versionEdit{
NewFiles: make([]newFileEntry, len(lr.localMeta)+len(lr.sharedMeta)),
}
if exciseSpan.Valid() {
if exciseSpan.Valid() || d.opts.Experimental.IngestSplit {
ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{}
}
metrics := make(map[int]*LevelMetrics)
@@ -1578,6 +1696,8 @@
current := d.mu.versions.currentVersion()
baseLevel := d.mu.versions.picker.getBaseLevel()
iterOps := IterOptions{logger: d.opts.Logger}
// filesToSplit is a list of (file being ingested, file to split) pairs.
filesToSplit := make([]ingestSplitFile, 0)
for i := 0; i < len(lr.localMeta)+len(lr.sharedMeta); i++ {
// Determine the lowest level in the LSM for which the sstable doesn't
// overlap any existing files in the level.
@@ -1598,6 +1718,7 @@
}
ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking)
} else {
var splitFile *fileMetadata
if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest.UserKey) &&
(exciseSpan.Contains(d.cmp, m.Largest.UserKey) || (d.cmp(m.Largest.UserKey, exciseSpan.End) == 0 && m.Largest.IsExclusiveSentinel())) {
// This file fits perfectly within the excise span. We can slot it at
@@ -1611,7 +1732,26 @@
f.Level = 6
}
} else {
f.Level, err = findTargetLevel(d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m)
// TODO(bilal): findTargetLevel does disk IO (reading files for data
// overlap) even though we're holding onto d.mu. Consider unlocking
// d.mu while we do this. We already hold versions.logLock so we should
// not see any version applications while we're at this. The one
// complication here would be pulling out the mu.compact.inProgress
// check from findTargetLevel, as that requires d.mu to be held.
//
// TODO(bilal): Once https://github.com/cockroachdb/pebble/issues/2557
// is completed, gate suggestSplit on the format major version as well.
f.Level, splitFile, err = findTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m, d.opts.Experimental.IngestSplit)
}

if splitFile != nil {
if invariants.Enabled {
if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf == nil {
panic("splitFile returned is not in level it should be")
}
}
filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level})
}
}
if err != nil {
@@ -1629,6 +1769,23 @@
levelMetrics.BytesIngested += m.Size
levelMetrics.TablesIngested++
}
// replacedFiles maps files excised due to exciseSpan (or splitFiles returned
// by ingestTargetLevel) to the files that were created to replace them. This
// map is used to resolve references to split files in filesToSplit, as it is
// possible for a file that we want to split to no longer exist or to have a
// newer fileMetadata due to a split induced by another ingestion file or an
// excise.
replacedFiles := make(map[base.FileNum][]newFileEntry)
updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) {
levelMetrics := metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
metrics[level] = levelMetrics
}
levelMetrics.NumFiles--
levelMetrics.Size -= int64(m.Size)
for i := range added {
levelMetrics.NumFiles++
levelMetrics.Size += int64(added[i].Meta.Size)
}
}
if exciseSpan.Valid() {
// Release the db mutex. We don't need to hold it while we do the excise;
// we just need to prevent any new versions from being installed between
@@ -1655,21 +1812,19 @@
// We did not excise this file.
continue
}
levelMetrics := metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
metrics[level] = levelMetrics
}
levelMetrics.NumFiles--
levelMetrics.Size -= int64(m.Size)
for i := range excised {
levelMetrics.NumFiles++
levelMetrics.Size += int64(excised[i].Meta.Size)
}
replacedFiles[m.FileNum] = excised
updateLevelMetricsOnExcise(m, level, excised)
}
}
d.mu.Lock()
}
if len(filesToSplit) > 0 {
// Similar to the above, we don't need to hold the DB mutex while we
// do IO in d.excise.
if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, exciseSpan, replacedFiles); err != nil {
return nil, err
}
}
if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(nil)
}); err != nil {
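A note on the replacedFiles bookkeeping in ingestSplit above: a file
chosen for splitting may itself have been replaced by an earlier split
or excise within the same version edit, so the loop chases
replacements until it reaches the current descendant that overlaps
the ingested file. A self-contained toy of that walk (invented types;
the real code compares user keys with d.cmp on *fileMetadata bounds):

package main

import "fmt"

// meta is a toy stand-in for *fileMetadata: a file number plus user-key
// bounds (single bytes here for brevity).
type meta struct {
    fileNum  int
    smallest byte
    largest  byte
}

func overlaps(m meta, lo, hi byte) bool { return m.smallest <= hi && m.largest >= lo }

// resolveSplitFile follows the replacement "tree": if splitFile was already
// replaced earlier in this version edit, chase the replacement that still
// overlaps the ingested range [lo, hi], until no further replacement exists.
func resolveSplitFile(splitFile meta, lo, hi byte, replaced map[int][]meta) (meta, bool) {
    for {
        repl, ok := replaced[splitFile.fileNum]
        if !ok {
            return splitFile, true // this is the current file to split
        }
        found := false
        for _, r := range repl {
            if overlaps(r, lo, hi) {
                splitFile, found = r, true
            }
        }
        if !found {
            // No replacement overlaps the ingested file; nothing to split
            // (in the real code this is only legal under an excise span).
            return meta{}, false
        }
    }
}

func main() {
    // File 1 was split into files 2 and 3 by an earlier ingested file.
    replaced := map[int][]meta{1: {{2, 'a', 'l'}, {3, 'q', 'z'}}}
    m, ok := resolveSplitFile(meta{1, 'a', 'z'}, 's', 'u', replaced)
    fmt.Println(m, ok) // file 3 is the one to split for an ingest of [s, u]
}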
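Since the new behavior is gated on d.opts.Experimental.IngestSplit
(per the diff above), enabling it from client code would look roughly
like the sketch below. This assumes the option is a plain bool field
at this commit; the TODO above notes it may later also be gated on
the format major version:

package main

import (
    "log"

    "github.com/cockroachdb/pebble"
    "github.com/cockroachdb/pebble/vfs"
)

func main() {
    opts := &pebble.Options{FS: vfs.NewMem()}
    // Opt in to ingest-time splitting of boundary-overlapping sstables.
    // Field name taken from the diff; assumed to be a plain bool here.
    opts.Experimental.IngestSplit = true

    db, err := pebble.Open("demo", opts)
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()
}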
