diff --git a/compaction.go b/compaction.go index 0d16d5d01c..870050b052 100644 --- a/compaction.go +++ b/compaction.go @@ -1855,9 +1855,10 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { var level int var err error for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - level, err = ingestTargetLevel( + level, _, err = ingestTargetLevel( d.newIters, d.tableNewRangeKeyIter, iterOpts, d.cmp, c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, + false, /* suggestSplit */ ) if err != nil { return nil, err diff --git a/data_test.go b/data_test.go index 873e682e04..7da8c5cd9f 100644 --- a/data_test.go +++ b/data_test.go @@ -1229,8 +1229,9 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error { int, map[*compaction]struct{}, *fileMetadata, - ) (int, error) { - return level, nil + bool, + ) (int, *fileMetadata, error) { + return level, nil, nil }, nil, KeyRange{}) return err } diff --git a/ingest.go b/ingest.go index 1b87205be4..7fab8c7ab6 100644 --- a/ingest.go +++ b/ingest.go @@ -12,6 +12,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/keyspan" "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/internal/private" @@ -612,7 +613,8 @@ func ingestTargetLevel( baseLevel int, compactions map[*compaction]struct{}, meta *fileMetadata, -) (int, error) { + suggestSplit bool, +) (int, *fileMetadata, error) { // Find the lowest level which does not have any files which overlap meta. We // search from L0 to L6 looking for whether there are any files in the level // which overlap meta. We want the "lowest" level (where lower means @@ -627,6 +629,12 @@ func ingestTargetLevel( // violate the sequence number invariant. // - no file boundary overlap with level i, since that will violate the // invariant that files do not overlap in levels i > 0. 
+ // - if there is only a file overlap at a given level, and no data overlap, + // we can still slot a file at that level. We return the fileMetadata with + // which we have file boundary overlap (must be only one file by definition, as + // sstable bounds are tight on user keys) and the caller is expected to split + // that sstable into two virtual sstables, allowing this file to go into that + // level. // // The file boundary overlap check is simpler to conceptualize. Consider the // following example, in which the ingested file lies completely before or @@ -676,7 +684,7 @@ func ingestTargetLevel( // This assertion implicitly checks that we have the current version of // the metadata. if v.L0Sublevels == nil { - return 0, errors.AssertionFailedf("could not read L0 sublevels") + return 0, nil, errors.AssertionFailedf("could not read L0 sublevels") } // Check for overlap over the keys of L0 by iterating over the sublevels. for subLevel := 0; subLevel < len(v.L0SublevelFiles); subLevel++ { @@ -702,14 +710,15 @@ func ingestTargetLevel( err := iter.Close() // Closes range del iter as well. err = firstError(err, levelIter.Close()) if err != nil { - return 0, err + return 0, nil, err } if overlap { - return targetLevel, nil + return targetLevel, nil, nil } } level := baseLevel + var fileToSplit *fileMetadata for ; level < numLevels; level++ { levelIter := newLevelIter(iterOps, cmp, nil /* split */, newIters, v.Levels[level].Iter(), manifest.Level(level), nil) @@ -732,17 +741,35 @@ func ingestTargetLevel( err := levelIter.Close() // Closes range del iter as well. err = firstError(err, rkeyLevelIter.Close()) if err != nil { - return 0, err + return 0, nil, err } if overlap { - return targetLevel, nil + return targetLevel, fileToSplit, nil } // Check boundary overlap. 
+ fileToSplit = nil boundaryOverlaps := v.Overlaps(level, cmp, meta.Smallest.UserKey, meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel()) if !boundaryOverlaps.Empty() { - continue + // We are already guaranteed to not have any data overlaps with files + // in boundaryOverlaps, otherwise we'd have gone into the above if + // statements. Use this, plus boundaryOverlaps.Len() == 1 to detect for + // the case where we can slot this file into the current level despite + // a boundary overlap, by splitting one existing file into two virtual + // sstables. + if suggestSplit && boundaryOverlaps.Len() == 1 { + iter := boundaryOverlaps.Iter() + fileToSplit = iter.First() + } else { + // We either don't want to suggest ingest-time splits (i.e. + // !suggestSplit), or we boundary-overlapped with more than one file. The + // latter should never happen today, but possibly some future version has + // started writing FileMetadatas with key bounds that are loose on user + // keys. Handle this defensively. + fileToSplit = nil + continue + } } // Check boundary overlap with any ongoing compactions. @@ -765,9 +792,13 @@ func ingestTargetLevel( } if !overlaps { targetLevel = level + } else { + // We found an overlap with a compaction. Splitting a file won't resolve + // this, so reset fileToSplit. + fileToSplit = nil } } - return targetLevel, nil + return targetLevel, fileToSplit, nil } // Ingest ingests a set of sstables into the DB. Ingestion of the files is @@ -1538,7 +1569,94 @@ type ingestTargetLevelFunc func( baseLevel int, compactions map[*compaction]struct{}, meta *fileMetadata, -) (int, error) + suggestSplit bool, +) (int, *fileMetadata, error) + +type ingestSplitFile struct { + // ingestFile is the file being ingested, and splitFile is the file that + // needs to be split to allow ingestFile to slot into `level` level. 
+ ingestFile, splitFile *fileMetadata + level int +} + +// ingestSplit splits files specified in `files` and updates ve in-place to +// account for existing files getting split into two virtual sstables. The map +// `replacedFiles` contains an in-progress map of all files that have been +// replaced with new virtual sstables in this version edit so far, which is also +// updated in-place. +// +// d.mu must be held when calling this method, however it will be +// unconditionally released and then re-acquired during this method as it does +// IO. +func (d *DB) ingestSplit( + ve *versionEdit, + updateMetrics func(*fileMetadata, int, []newFileEntry), + files []ingestSplitFile, + exciseSpan KeyRange, + replacedFiles map[base.FileNum][]newFileEntry, +) error { + // Note the opposite order: Unlock() followed by Lock(). + d.mu.Unlock() + defer d.mu.Lock() + for _, s := range files { + // replacedFiles can be thought of as a tree, where we start iterating with + // s.splitFile and run its fileNum through replacedFiles, then find which + // of the replaced files overlaps with s.ingestFile, check its + // replacements in replacedFiles again for overlap with s.ingestFile, and + // so on until we can't find the current splitFile in replacedFiles. + splitFile := s.splitFile + for splitFile != nil { + replaced, ok := replacedFiles[splitFile.FileNum] + if !ok { + break + } + updatedSplitFile := false + for i := range replaced { + if replaced[i].Meta.Overlaps(d.cmp, s.ingestFile.Smallest.UserKey, s.ingestFile.Largest.UserKey, s.ingestFile.Largest.IsExclusiveSentinel()) { + splitFile = replaced[i].Meta + updatedSplitFile = true + } + } + if !updatedSplitFile { + // None of the replaced files overlapped with the file being ingested. + // This is only okay if we have an excise span that overlaps with + // the ingested file. 
+ if !exciseSpan.Valid() || !s.ingestFile.Overlaps(d.cmp, exciseSpan.Start, exciseSpan.End, true /* exclusiveEnd */) { + panic("could not find a file to split that overlaps with ingested file") + } + // No file to split. + splitFile = nil + } + } + if splitFile == nil { + continue + } + // NB: excise operates on [start, end). We're splitting at [start, end] + // (assuming !s.ingestFile.Largest.IsExclusiveSentinel()). The conflation + // of exclusive vs inclusive end bounds should not make a difference here + // as we're guaranteed to not have any data overlap between splitFile and + // s.ingestFile, so panic if we do see a newly added file with an endKey + // equalling s.ingestFile.Largest, and !s.ingestFile.Largest.IsExclusiveSentinel() + added, err := d.excise(KeyRange{Start: s.ingestFile.Smallest.UserKey, End: s.ingestFile.Largest.UserKey}, splitFile, ve, s.level) + if err != nil { + return err + } + if _, ok := ve.DeletedFiles[deletedFileEntry{ + Level: s.level, + FileNum: splitFile.FileNum, + }]; !ok { + panic("did not split file that was expected to be split") + } + replacedFiles[splitFile.FileNum] = added + for i := range added { + if s.ingestFile.Overlaps(d.cmp, added[i].Meta.Smallest.UserKey, added[i].Meta.Largest.UserKey, added[i].Meta.Largest.IsExclusiveSentinel()) { + panic("ingest-time split produced a file that overlaps with ingested file") + } + } + updateMetrics(splitFile, s.level, added) + } + return nil +} func (d *DB) ingestApply( jobID int, @@ -1553,7 +1671,7 @@ func (d *DB) ingestApply( ve := &versionEdit{ NewFiles: make([]newFileEntry, len(lr.localMeta)+len(lr.sharedMeta)), } - if exciseSpan.Valid() { + if exciseSpan.Valid() || d.opts.Experimental.IngestSplit { ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{} } metrics := make(map[int]*LevelMetrics) @@ -1578,6 +1696,8 @@ func (d *DB) ingestApply( current := d.mu.versions.currentVersion() baseLevel := d.mu.versions.picker.getBaseLevel() iterOps := IterOptions{logger: 
d.opts.Logger} + // filesToSplit is a map of files being ingested -> files being split. + filesToSplit := make([]ingestSplitFile, 0) for i := 0; i < len(lr.localMeta)+len(lr.sharedMeta); i++ { // Determine the lowest level in the LSM for which the sstable doesn't // overlap any existing files in the level. @@ -1598,6 +1718,7 @@ func (d *DB) ingestApply( } ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking) } else { + var splitFile *fileMetadata if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest.UserKey) && (exciseSpan.Contains(d.cmp, m.Largest.UserKey) || (d.cmp(m.Largest.UserKey, exciseSpan.End) == 0 && m.Largest.IsExclusiveSentinel())) { // This file fits perfectly within the excise span. We can slot it at @@ -1611,7 +1732,26 @@ func (d *DB) ingestApply( f.Level = 6 } } else { - f.Level, err = findTargetLevel(d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) + // TODO(bilal): findTargetLevel does disk IO (reading files for data + // overlap) even though we're holding onto d.mu. Consider unlocking + // d.mu while we do this. We already hold versions.logLock so we should + // not see any version applications while we're at this. The one + // complication here would be pulling out the mu.compact.inProgress + // check from findTargetLevel, as that requires d.mu to be held. + // + // TODO(bilal): Once https://github.com/cockroachdb/pebble/issues/2557 + // is completed, gate suggestSplit on the format major version as well. 
+ f.Level, splitFile, err = findTargetLevel( + d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m, d.opts.Experimental.IngestSplit) + } + + if splitFile != nil { + if invariants.Enabled { + if lf := current.Levels[f.Level].Find(d.cmp, splitFile); lf == nil { + panic("splitFile returned is not in level it should be") + } + } + filesToSplit = append(filesToSplit, ingestSplitFile{ingestFile: m, splitFile: splitFile, level: f.Level}) + } } if err != nil { @@ -1629,6 +1769,23 @@ levelMetrics.BytesIngested += m.Size levelMetrics.TablesIngested++ } + // replacedFiles maps files excised due to exciseSpan (or splitFiles returned + // by ingestTargetLevel), to files that were created to replace it. This map is updated in-place as replacement files are themselves replaced. + + replacedFiles := make(map[base.FileNum][]newFileEntry) + updateLevelMetricsOnExcise := func(m *fileMetadata, level int, added []newFileEntry) { + levelMetrics := metrics[level] + if levelMetrics == nil { + levelMetrics = &LevelMetrics{} + metrics[level] = levelMetrics + } + levelMetrics.NumFiles-- + levelMetrics.Size -= int64(m.Size) + for i := range added { + levelMetrics.NumFiles++ + levelMetrics.Size += int64(added[i].Meta.Size) + } + } if exciseSpan.Valid() { // Release the db mutex. We don't need to hold it while we do the excise; // we just need to prevent any new versions from being installed between @@ -1655,21 +1812,19 @@ // We did not excise this file. 
continue } - levelMetrics := metrics[level] - if levelMetrics == nil { - levelMetrics = &LevelMetrics{} - metrics[level] = levelMetrics - } - levelMetrics.NumFiles-- - levelMetrics.Size -= int64(m.Size) - for i := range excised { - levelMetrics.NumFiles++ - levelMetrics.Size += int64(excised[i].Meta.Size) - } + replacedFiles[m.FileNum] = excised + updateLevelMetricsOnExcise(m, level, excised) } } d.mu.Lock() } + if len(filesToSplit) > 0 { + // Similar to the above, we don't need to hold the DB mutex while we + // do IO in d.excise. + if err := d.ingestSplit(ve, updateLevelMetricsOnExcise, filesToSplit, exciseSpan, replacedFiles); err != nil { + return nil, err + } + } if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }); err != nil { diff --git a/ingest_test.go b/ingest_test.go index 6cd8239f8d..e456190edb 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -1049,16 +1049,28 @@ func TestIngestTargetLevel(t *testing.T) { case "target": var buf bytes.Buffer + suggestSplit := false + for _, cmd := range td.CmdArgs { + switch cmd.Key { + case "suggest-split": + suggestSplit = true + } + } for _, target := range strings.Split(td.Input, "\n") { meta := parseMeta(target) - level, err := ingestTargetLevel( + level, overlapFile, err := ingestTargetLevel( d.newIters, d.tableNewRangeKeyIter, IterOptions{logger: d.opts.Logger}, d.cmp, d.mu.versions.currentVersion(), 1, d.mu.compact.inProgress, meta, + suggestSplit, ) if err != nil { return err.Error() } - fmt.Fprintf(&buf, "%d\n", level) + if overlapFile != nil { + fmt.Fprintf(&buf, "%d (split file: %s)\n", level, overlapFile.FileNum) + } else { + fmt.Fprintf(&buf, "%d\n", level) + } } return buf.String() @@ -1076,7 +1088,7 @@ func TestIngest(t *testing.T) { require.NoError(t, d.Close()) }() - reset := func() { + reset := func(split bool) { if d != nil { require.NoError(t, d.Close()) } @@ -1093,6 +1105,7 @@ func 
TestIngest(t *testing.T) { }}, FormatMajorVersion: internalFormatNewest, } + opts.Experimental.IngestSplit = split // Disable automatic compactions because otherwise we'll race with // delete-only compactions triggered by ingesting range tombstones. opts.DisableAutomaticCompactions = true @@ -1101,12 +1114,21 @@ func TestIngest(t *testing.T) { d, err = Open("", opts) require.NoError(t, err) } - reset() + reset(false /* split */) datadriven.RunTest(t, "testdata/ingest", func(t *testing.T, td *datadriven.TestData) string { switch td.Cmd { case "reset": - reset() + split := false + for _, cmd := range td.CmdArgs { + switch cmd.Key { + case "enable-split": + split = true + default: + return fmt.Sprintf("unexpected key: %s", cmd.Key) + } + } + reset(split) return "" case "batch": b := d.NewIndexedBatch() diff --git a/options.go b/options.go index c415f77fa3..8da2f237bc 100644 --- a/options.go +++ b/options.go @@ -521,6 +521,11 @@ type Options struct { // concurrency slots as determined by the two options is chosen. CompactionDebtConcurrency int + // IngestSplit, if true, allows for ingest-time splitting of existing + // sstables into two virtual sstables to allow ingestion sstables to slot + // into a lower level than they otherwise would have. + IngestSplit bool + // MinDeletionRate is the minimum number of bytes per second that would // be deleted. 
Deletion pacing is used to slow down deletions when // compactions finish up or readers close, and newly-obsolete files need diff --git a/testdata/ingest b/testdata/ingest index 3334e58254..c32480a84d 100644 --- a/testdata/ingest +++ b/testdata/ingest @@ -53,7 +53,7 @@ zmemtbl 0 0 B titers 0 filter - - 0.0% (score == utility) ingest 1 - + iter seek-ge a @@ -904,3 +904,137 @@ lsm 000006:[f#12,SET-h#12,SET] 000010:[s#16,RANGEKEYDEL-x#inf,RANGEKEYDEL] 000009:[x#15,SET-y#15,SET] + +reset enable-split +---- + +build ext10 +set a foo +set e bar +---- + +ingest ext10 +---- + +lsm +---- +6: + 000004:[a#10,SET-e#10,SET] + +# The below ingestion should split one existing file. + +build ext11 +set b foobar +set d foobar +---- + +ingest ext11 +---- + +lsm +---- +6: + 000006:[a#10,SET-a#10,SET] + 000005:[b#11,SET-d#11,SET] + 000007:[e#10,SET-e#10,SET] + +iter +first +next +next +next +---- +a: (foo, .) +b: (foobar, .) +d: (foobar, .) +e: (bar, .) + +# This ingestion should not split any files due to data overlap. + +build ext12 +set c foobar +set e baz +---- + +ingest ext12 +---- + +lsm +---- +0.0: + 000008:[c#12,SET-e#12,SET] +6: + 000006:[a#10,SET-a#10,SET] + 000005:[b#11,SET-d#11,SET] + 000007:[e#10,SET-e#10,SET] + +# The below ingestion should fall through one existing file and split another +# file. + +build ext13 +set cc foo +set ccc foooo +---- + +ingest ext13 +---- + +lsm +---- +0.0: + 000008:[c#12,SET-e#12,SET] +6: + 000006:[a#10,SET-a#10,SET] + 000010:[b#11,SET-b#11,SET] + 000009:[cc#13,SET-ccc#13,SET] + 000011:[d#11,SET-d#11,SET] + 000007:[e#10,SET-e#10,SET] + +iter +seek-ge c +next +next +next +next +---- +c: (foobar, .) +cc: (foo, .) +ccc: (foooo, .) +d: (foobar, .) +e: (baz, .) + +# Ingestion splitting doesn't kick in at L0. 
+ +build ext14 +set d updated +set dd new +---- + +ingest ext14 +---- + +lsm +---- +0.1: + 000012:[d#14,SET-dd#14,SET] +0.0: + 000008:[c#12,SET-e#12,SET] +6: + 000006:[a#10,SET-a#10,SET] + 000010:[b#11,SET-b#11,SET] + 000009:[cc#13,SET-ccc#13,SET] + 000011:[d#11,SET-d#11,SET] + 000007:[e#10,SET-e#10,SET] + +iter +seek-lt d +next +next +next +next +---- +ccc: (foooo, .) +d: (updated, .) +dd: (new, .) +e: (baz, .) +. diff --git a/testdata/ingest_target_level b/testdata/ingest_target_level index 66d8d65924..e6b8edf1a3 100644 --- a/testdata/ingest_target_level +++ b/testdata/ingest_target_level @@ -246,3 +246,79 @@ target rkey:a-c ---- 4 + +# Cases with boundary overlap and no data overlap. With suggest-split off +# we get a target level of L0, but with suggest-split on, we get suggested +# a file split. + +define +L6 + a.SET.2:2 + d.SET.3:3 +L6 + f.SET.4:4 + k.SET.6:6 +---- +6: + 000004:[a#2,SET-d#3,SET] + 000005:[f#4,SET-k#6,SET] + +target +b-c +e-g +---- +5 +5 + +target suggest-split +b-c +e-g +---- +6 (split file: 000004) +5 + +target suggest-split +g-i +---- +6 (split file: 000005) + +# suggest-split recognizes and avoids in-progress compactions. + +define +L6 + a.SET.2:2 + d.SET.3:3 +L6 + f.SET.4:4 + k.SET.6:6 + compact:f-k +---- +6: + 000004:[a#2,SET-d#3,SET] + 000005:[f#4,SET-k#6,SET] + +target suggest-split +g-i +---- +5 + +# Ingestion splitting correctly recognizes data overlap in L6, and suggests +# split in L5. + +define +L5 + a.SET.2:2 + e.SET.3:3 +L6 + c.SET.1:1 + k.SET.1:1 +---- +5: + 000004:[a#2,SET-e#3,SET] +6: + 000005:[c#1,SET-k#1,SET] + +target suggest-split +b-c +---- +5 (split file: 000004)