From 2e9d3f59c4a2a0616108273923187668fb33baab Mon Sep 17 00:00:00 2001
From: Arjun Nair
Date: Tue, 21 Feb 2023 15:09:38 -0500
Subject: [PATCH] .*: virtual sstable FileMetadata changes

This PR introduces the FileMetadata changes required for further
virtual sstable work. It doesn't deal with persisting the FileMetadata
to disk through version edits.

RFC: https://github.com/cockroachdb/pebble/blob/master/docs/RFCS/20221122_virtual_sstable.md
---
 checkpoint.go                            |  41 +++-
 compaction.go                            |  59 ++---
 compaction_picker_test.go                |   8 +
 compaction_test.go                       | 228 +++++++++++---------
 data_test.go                             |   1 +
 db.go                                    |  31 ++-
 flushable.go                             |  31 +--
 format_major_version_test.go             |  17 +-
 get_iter_test.go                         |   1 +
 ingest.go                                |  30 ++-
 ingest_test.go                           |   6 +
 internal/keyspan/level_iter_test.go      |   3 +
 internal/manifest/btree.go               |   8 +-
 internal/manifest/btree_test.go          |   2 +
 internal/manifest/l0_sublevels_test.go   |   3 +-
 internal/manifest/level_metadata_test.go |   1 +
 internal/manifest/version.go             | 260 +++++++++++++++++++++--
 internal/manifest/version_edit.go        | 114 +++++++++-
 internal/manifest/version_edit_test.go   |   7 +
 internal/manifest/version_test.go        |   4 +-
 level_checker_test.go                    |   1 +
 level_iter_test.go                       |   3 +
 merging_iter_test.go                     |   2 +
 open.go                                  |  20 +-
 open_test.go                             |   6 +-
 table_cache.go                           |   4 +-
 table_stats.go                           | 107 ++++++----
 tool/db.go                               |  19 +-
 tool/find.go                             |   8 +
 tool/manifest.go                         |   6 +-
 version_set.go                           |  82 +++++--
 version_set_test.go                      | 184 ++++++++++++++++
 32 files changed, 1029 insertions(+), 268 deletions(-)

diff --git a/checkpoint.go b/checkpoint.go
index d0c48f716c..36eebf9901 100644
--- a/checkpoint.go
+++ b/checkpoint.go
@@ -135,6 +135,9 @@ func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
 // space overhead for a checkpoint if hard links are disabled. Also beware that
 // even if hard links are used, the space overhead for the checkpoint will
 // increase over time as the DB performs compactions.
+//
+// TODO(bananabrick): Test checkpointing of virtual sstables once virtual
+// sstables are running e2e.
 func (d *DB) Checkpoint(
 	destDir string, opts ...CheckpointOption,
 ) (
@@ -188,7 +191,10 @@ func (d *DB) Checkpoint(
 	manifestFileNum := d.mu.versions.manifestFileNum
 	manifestSize := d.mu.versions.manifest.Size()
 	optionsFileNum := d.optionsFileNum
-
+	virtualBackingFiles := make(map[base.FileNum]struct{})
+	for fileNum := range d.mu.versions.fileBackingMap {
+		virtualBackingFiles[fileNum] = struct{}{}
+	}
 	// Release the manifest and DB.mu so we don't block other operations on
 	// the database.
 	d.mu.versions.logUnlock()
@@ -254,7 +260,9 @@ func (d *DB) Checkpoint(
 	}
 
 	var excludedFiles map[deletedFileEntry]*fileMetadata
-
+	// Set of FileBacking.FileNum which will be required by virtual sstables in
+	// the checkpoint.
+	requiredVirtualBackingFiles := make(map[base.FileNum]struct{})
 	// Link or copy the sstables.
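(The loop that follows links each distinct FileBacking.FileNum into the checkpoint at most once: several virtual sstables may share a single backing file, while for a physical sstable FileBacking.FileNum is simply its own FileNum. Backing files that no sstable in the checkpoint requires are collected afterwards into removeBackingTables, and a trailing version edit drops them from the checkpoint's manifest.)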
for l := range current.Levels { iter := current.Levels[l].Iter() @@ -270,7 +278,15 @@ func (d *DB) Checkpoint( continue } - srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, f.FileNum) + fileBacking := f.FileBacking + if f.Virtual { + if _, ok := requiredVirtualBackingFiles[fileBacking.FileNum]; ok { + continue + } + requiredVirtualBackingFiles[fileBacking.FileNum] = struct{}{} + } + + srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.FileNum) destPath := fs.PathJoin(destDir, fs.PathBase(srcPath)) ckErr = vfs.LinkOrCopy(fs, srcPath, destPath) if ckErr != nil { @@ -279,7 +295,19 @@ func (d *DB) Checkpoint( } } - ckErr = d.writeCheckpointManifest(fs, formatVers, destDir, dir, manifestFileNum, manifestSize, excludedFiles) + var removeBackingTables []base.FileNum + for fileNum := range virtualBackingFiles { + if _, ok := requiredVirtualBackingFiles[fileNum]; !ok { + // The backing sstable associated with fileNum is no longer + // required. + removeBackingTables = append(removeBackingTables, fileNum) + } + } + + ckErr = d.writeCheckpointManifest( + fs, formatVers, destDir, dir, manifestFileNum, manifestSize, + excludedFiles, removeBackingTables, + ) if ckErr != nil { return ckErr } @@ -318,6 +346,7 @@ func (d *DB) writeCheckpointManifest( manifestFileNum FileNum, manifestSize int64, excludedFiles map[deletedFileEntry]*fileMetadata, + removeBackingTables []base.FileNum, ) error { // Copy the MANIFEST, and create a pointer to it. We copy rather // than link because additional version edits added to the @@ -349,8 +378,10 @@ func (d *DB) writeCheckpointManifest( if len(excludedFiles) > 0 { // Write out an additional VersionEdit that deletes the excluded SST files. ve := versionEdit{ - DeletedFiles: excludedFiles, + DeletedFiles: excludedFiles, + RemovedBackingTables: removeBackingTables, } + rw := record.NewWriter(dst) w, err := rw.Next() if err != nil { diff --git a/compaction.go b/compaction.go index c24a8c8d71..ac348c86ee 100644 --- a/compaction.go +++ b/compaction.go @@ -1384,7 +1384,9 @@ func (c *compaction) newInputIter( // of range tombstones outside the file's internal key bounds. Skip // any range tombstones completely outside file bounds. rangeDelIter = keyspan.Truncate( - c.cmp, rangeDelIter, lowerBound.UserKey, upperBound.UserKey, &f.Smallest, &f.Largest) + c.cmp, rangeDelIter, lowerBound.UserKey, upperBound.UserKey, + &f.Smallest, &f.Largest, + ) } if rangeDelIter == nil { rangeDelIter = emptyKeyspanIter @@ -1836,12 +1838,12 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) { for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { level, err = ingestTargetLevel( d.newIters, d.tableNewRangeKeyIter, iterOpts, d.cmp, - c.version, baseLevel, d.mu.compact.inProgress, file, + c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata, ) if err != nil { return nil, err } - ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file}) + ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata}) levelMetrics := c.metrics[level] if levelMetrics == nil { levelMetrics = &LevelMetrics{} @@ -1958,7 +1960,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { startTime := d.timeNow() var ve *manifest.VersionEdit - var pendingOutputs []*manifest.FileMetadata + var pendingOutputs []physicalMeta // To determine the target level of the files in the ingestedFlushable, we // need to acquire the logLock, and not release it for that duration. 
Since, // we need to acquire the logLock below to perform the logAndApply step @@ -2023,7 +2025,12 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) { if err != nil { info.Err = err // TODO(peter): untested. - d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, pendingOutputs...) + for _, f := range pendingOutputs { + d.mu.versions.obsoleteTables = append( + d.mu.versions.obsoleteTables, + fileInfo{f.FileNum, f.Size}, + ) + } d.mu.versions.updateObsoleteTableMetricsLocked() } } else { @@ -2522,7 +2529,12 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { }) if err != nil { // TODO(peter): untested. - d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, pendingOutputs...) + for _, f := range pendingOutputs { + d.mu.versions.obsoleteTables = append( + d.mu.versions.obsoleteTables, + fileInfo{f.FileNum, f.Size}, + ) + } d.mu.versions.updateObsoleteTableMetricsLocked() } } @@ -2564,7 +2576,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { // re-acquired during the course of this method. func (d *DB) runCompaction( jobID int, c *compaction, -) (ve *versionEdit, pendingOutputs []*fileMetadata, retErr error) { +) (ve *versionEdit, pendingOutputs []physicalMeta, retErr error) { // As a sanity check, confirm that the smallest / largest keys for new and // deleted files in the new versionEdit pass a validation function before // returning the edit. @@ -2745,7 +2757,7 @@ func (d *DB) runCompaction( d.mu.Lock() fileNum := d.mu.versions.getNextFileNum() fileMeta.FileNum = fileNum - pendingOutputs = append(pendingOutputs, fileMeta) + pendingOutputs = append(pendingOutputs, fileMeta.PhysicalMeta()) d.mu.Unlock() writable, objMeta, err := d.objProvider.Create(context.TODO(), fileTypeTable, fileNum, objstorage.CreateOptions{} /* TODO */) @@ -2896,9 +2908,13 @@ func (d *DB) runCompaction( meta.Size = writerMeta.Size meta.SmallestSeqNum = writerMeta.SmallestSeqNum meta.LargestSeqNum = writerMeta.LargestSeqNum + meta.InitPhysicalBacking() + // If the file didn't contain any range deletions, we can fill its // table stats now, avoiding unnecessarily loading the table later. - maybeSetStatsFromProperties(meta, &writerMeta.Properties) + maybeSetStatsFromProperties( + meta.PhysicalMeta(), &writerMeta.Properties, + ) if c.flushing == nil { outputMetrics.TablesCompacted++ @@ -3250,7 +3266,7 @@ func (d *DB) scanObsoleteFiles(list []string) { manifestFileNum := d.mu.versions.manifestFileNum var obsoleteLogs []fileInfo - var obsoleteTables []*fileMetadata + var obsoleteTables []fileInfo var obsoleteManifests []fileInfo var obsoleteOptions []fileInfo @@ -3301,13 +3317,13 @@ func (d *DB) scanObsoleteFiles(list []string) { if _, ok := liveFileNums[obj.FileNum]; ok { continue } - fileMeta := &fileMetadata{ - FileNum: obj.FileNum, + fileInfo := fileInfo{ + fileNum: obj.FileNum, } if size, err := d.objProvider.Size(obj); err == nil { - fileMeta.Size = uint64(size) + fileInfo.fileSize = uint64(size) } - obsoleteTables = append(obsoleteTables, fileMeta) + obsoleteTables = append(obsoleteTables, fileInfo) default: // Ignore object types we don't know about. 
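The switch from []*fileMetadata to []fileInfo for obsolete-table tracking is deliberate: once sstables can be virtual, a FileMetadata no longer corresponds one-to-one with an on-disk object, and deleting an obsolete table only needs the backing file's number and size. A minimal sketch of the conversion, assuming the fileInfo type and physicalMeta wrapper used in this patch (toObsoleteFileInfo itself is a hypothetical helper, not part of the change):

	// toObsoleteFileInfo converts a compaction or flush output (always a
	// physical sstable) into the lightweight record the deletion path needs.
	func toObsoleteFileInfo(f physicalMeta) fileInfo {
		// For a physical sstable, FileNum and Size match the backing
		// file's FileNum and Size.
		return fileInfo{fileNum: f.FileNum, fileSize: f.Size}
	}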
@@ -3316,7 +3332,7 @@ func (d *DB) scanObsoleteFiles(list []string) { d.mu.log.queue = merge(d.mu.log.queue, obsoleteLogs) d.mu.versions.metrics.WAL.Files = int64(len(d.mu.log.queue)) - d.mu.versions.obsoleteTables = mergeFileMetas(d.mu.versions.obsoleteTables, obsoleteTables) + d.mu.versions.obsoleteTables = mergeFileInfo(d.mu.versions.obsoleteTables, obsoleteTables) d.mu.versions.updateObsoleteTableMetricsLocked() d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests) d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions) @@ -3432,12 +3448,7 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) { } } - for _, table := range d.mu.versions.obsoleteTables { - obsoleteTables = append(obsoleteTables, fileInfo{ - fileNum: table.FileNum, - fileSize: table.Size, - }) - } + obsoleteTables = append(obsoleteTables, d.mu.versions.obsoleteTables...) d.mu.versions.obsoleteTables = nil // Sort the manifests cause we want to delete some contiguous prefix @@ -3638,19 +3649,19 @@ func merge(a, b []fileInfo) []fileInfo { return a[:n] } -func mergeFileMetas(a, b []*fileMetadata) []*fileMetadata { +func mergeFileInfo(a, b []fileInfo) []fileInfo { if len(b) == 0 { return a } a = append(a, b...) sort.Slice(a, func(i, j int) bool { - return a[i].FileNum < a[j].FileNum + return a[i].fileNum < a[j].fileNum }) n := 0 for i := 0; i < len(a); i++ { - if n == 0 || a[i].FileNum != a[n-1].FileNum { + if n == 0 || a[i].fileNum != a[n-1].fileNum { a[n] = a[i] n++ } diff --git a/compaction_picker_test.go b/compaction_picker_test.go index 94d08a8fcb..b10ab0c95c 100644 --- a/compaction_picker_test.go +++ b/compaction_picker_test.go @@ -66,6 +66,7 @@ func loadVersion(d *datadriven.TestData) (*version, *Options, [numLevels]int64, LargestSeqNum: key.SeqNum(), Size: 1, }).ExtendPointKeyBounds(opts.Comparer.Compare, key, key) + m.InitPhysicalBacking() if size >= 100 { // If the requested size of the level is very large only add a single // file in order to avoid massive blow-up in the number of files in @@ -364,6 +365,7 @@ func TestCompactionPickerL0(t *testing.T) { ) m.SmallestSeqNum = m.Smallest.SeqNum() m.LargestSeqNum = m.Largest.SeqNum() + m.InitPhysicalBacking() return m, nil } @@ -595,6 +597,7 @@ func TestCompactionPickerConcurrency(t *testing.T) { base.ParseInternalKey(strings.TrimSpace(parts[0])), base.ParseInternalKey(strings.TrimSpace(parts[1])), ) + m.InitPhysicalBacking() for _, p := range fields[1:] { if strings.HasPrefix(p, "size=") { v, err := strconv.Atoi(strings.TrimPrefix(p, "size=")) @@ -812,6 +815,7 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) { base.ParseInternalKey(strings.TrimSpace(parts[0])), base.ParseInternalKey(strings.TrimSpace(parts[1])), ) + m.InitPhysicalBacking() for _, p := range fields[1:] { if strings.HasPrefix(p, "size=") { v, err := strconv.Atoi(strings.TrimPrefix(p, "size=")) @@ -984,6 +988,7 @@ func TestPickedCompactionSetupInputs(t *testing.T) { ) m.SmallestSeqNum = m.Smallest.SeqNum() m.LargestSeqNum = m.Largest.SeqNum() + m.InitPhysicalBacking() return m } @@ -1120,6 +1125,7 @@ func TestPickedCompactionExpandInputs(t *testing.T) { base.ParseInternalKey(parts[0]), base.ParseInternalKey(parts[1]), ) + m.InitPhysicalBacking() return m } @@ -1203,6 +1209,7 @@ func TestCompactionOutputFileSize(t *testing.T) { base.ParseInternalKey(strings.TrimSpace(parts[0])), base.ParseInternalKey(strings.TrimSpace(parts[1])), ) + m.InitPhysicalBacking() for _, p := range fields[1:] { if strings.HasPrefix(p, "size=") { v, err := 
strconv.Atoi(strings.TrimPrefix(p, "size=")) @@ -1342,6 +1349,7 @@ func TestCompactionPickerCompensatedSize(t *testing.T) { for _, tc := range testCases { t.Run("", func(t *testing.T) { f := &fileMetadata{Size: tc.size} + f.InitPhysicalBacking() f.Stats.PointDeletionsBytesEstimate = tc.pointDelEstimateBytes f.Stats.RangeDeletionsBytesEstimate = tc.rangeDelEstimateBytes gotBytes := compensatedSize(f, tc.pointTombstoneWeight) diff --git a/compaction_test.go b/compaction_test.go index 51f6695a28..0588bdbc68 100644 --- a/compaction_test.go +++ b/compaction_test.go @@ -138,6 +138,7 @@ func TestPickCompaction(t *testing.T) { FileNum: fileNum, Size: size, }).ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest) + m.InitPhysicalBacking() return m } @@ -541,6 +542,7 @@ func TestElideTombstone(t *testing.T) { newFileMeta := func(smallest, largest base.InternalKey) *fileMetadata { m := (&fileMetadata{}).ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest) + m.InitPhysicalBacking() return m } @@ -692,6 +694,7 @@ func TestElideRangeTombstone(t *testing.T) { m := (&fileMetadata{}).ExtendPointKeyBounds( opts.Comparer.Compare, smallest, largest, ) + m.InitPhysicalBacking() return m } @@ -1098,6 +1101,7 @@ func TestValidateVersionEdit(t *testing.T) { cmp := DefaultComparer.Compare newFileMeta := func(smallest, largest base.InternalKey) *fileMetadata { m := (&fileMetadata{}).ExtendPointKeyBounds(cmp, smallest, largest) + m.InitPhysicalBacking() return m } @@ -1633,6 +1637,7 @@ func TestCompactionFindGrandparentLimit(t *testing.T) { InternalKey{UserKey: []byte(parts[0])}, InternalKey{UserKey: []byte(parts[1])}, ) + m.InitPhysicalBacking() return m } @@ -1735,6 +1740,7 @@ func TestCompactionFindL0Limit(t *testing.T) { m.Size = size } } + m.InitPhysicalBacking() return m, nil } @@ -1867,6 +1873,7 @@ func TestCompactionAtomicUnitBounds(t *testing.T) { base.ParseInternalKey(parts[0]), base.ParseInternalKey(parts[1]), ) + m.InitPhysicalBacking() return m } @@ -2525,6 +2532,7 @@ func TestCompactionInuseKeyRanges(t *testing.T) { ) m.SmallestSeqNum = m.Smallest.SeqNum() m.LargestSeqNum = m.Largest.SeqNum() + m.InitPhysicalBacking() return m } @@ -2639,6 +2647,7 @@ func TestCompactionInuseKeyRangesRandomized(t *testing.T) { ) m.SmallestSeqNum = m.Smallest.SeqNum() m.LargestSeqNum = m.Largest.SeqNum() + m.InitPhysicalBacking() return m } overlaps := func(startA, endA, startB, endB []byte) bool { @@ -2737,6 +2746,7 @@ func TestCompactionAllowZeroSeqNum(t *testing.T) { InternalKey{UserKey: []byte(match[2])}, InternalKey{UserKey: []byte(match[3])}, ) + meta.InitPhysicalBacking() return level, meta } @@ -2842,6 +2852,7 @@ func TestCompactionErrorOnUserKeyOverlap(t *testing.T) { ) m.SmallestSeqNum = m.Smallest.SeqNum() m.LargestSeqNum = m.Largest.SeqNum() + m.InitPhysicalBacking() return m } @@ -2971,6 +2982,7 @@ func TestCompactionCheckOrdering(t *testing.T) { ) m.SmallestSeqNum = m.Smallest.SeqNum() m.LargestSeqNum = m.Largest.SeqNum() + m.InitPhysicalBacking() return m } @@ -3409,8 +3421,10 @@ func TestAdjustGrandparentOverlapBytesForFlush(t *testing.T) { var lbaseFiles []*manifest.FileMetadata const lbaseSize = 5 << 20 for i := 0; i < 100; i++ { + m := &manifest.FileMetadata{Size: lbaseSize, FileNum: FileNum(i)} + m.InitPhysicalBacking() lbaseFiles = - append(lbaseFiles, &manifest.FileMetadata{Size: lbaseSize, FileNum: FileNum(i)}) + append(lbaseFiles, m) } const maxOutputFileSize = 2 << 20 // 20MB max overlap, so flush split into 25 files. 
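The m.InitPhysicalBacking() calls threaded through these tests all follow one construction pattern, which the version.go changes later in this diff require: assign FileNum and Size first, then initialize the backing, because the new FileBacking snapshots both fields. A minimal sketch, assuming a comparator cmp and internal keys smallest/largest:

	m := (&manifest.FileMetadata{
		FileNum: 1,
		Size:    1,
	}).ExtendPointKeyBounds(cmp, smallest, largest)
	// Must run after FileNum and Size are set: the FileBacking copies both
	// to describe the physical file on disk.
	m.InitPhysicalBacking()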
@@ -3455,6 +3469,14 @@ func TestCompactionInvalidBounds(t *testing.T) { func Test_calculateInuseKeyRanges(t *testing.T) { opts := (*Options)(nil).EnsureDefaults() cmp := base.DefaultComparer.Compare + newFileMeta := func(fileNum FileNum, size uint64, smallest, largest base.InternalKey) *fileMetadata { + m := (&fileMetadata{ + FileNum: fileNum, + Size: size, + }).ExtendPointKeyBounds(opts.Comparer.Compare, smallest, largest) + m.InitPhysicalBacking() + return m + } tests := []struct { name string v *version @@ -3468,18 +3490,18 @@ func Test_calculateInuseKeyRanges(t *testing.T) { name: "No files in next level", v: newVersion(opts, [numLevels][]*fileMetadata{ 1: { - { - FileNum: 1, - Size: 1, - Smallest: base.ParseInternalKey("a.SET.2"), - Largest: base.ParseInternalKey("c.SET.2"), - }, - { - FileNum: 2, - Size: 1, - Smallest: base.ParseInternalKey("d.SET.2"), - Largest: base.ParseInternalKey("e.SET.2"), - }, + newFileMeta( + 1, + 1, + base.ParseInternalKey("a.SET.2"), + base.ParseInternalKey("c.SET.2"), + ), + newFileMeta( + 2, + 1, + base.ParseInternalKey("d.SET.2"), + base.ParseInternalKey("e.SET.2"), + ), }, }), level: 1, @@ -3501,32 +3523,32 @@ func Test_calculateInuseKeyRanges(t *testing.T) { name: "No overlapping key ranges", v: newVersion(opts, [numLevels][]*fileMetadata{ 1: { - { - FileNum: 1, - Size: 1, - Smallest: base.ParseInternalKey("a.SET.1"), - Largest: base.ParseInternalKey("c.SET.1"), - }, - { - FileNum: 2, - Size: 1, - Smallest: base.ParseInternalKey("l.SET.1"), - Largest: base.ParseInternalKey("p.SET.1"), - }, + newFileMeta( + 1, + 1, + base.ParseInternalKey("a.SET.1"), + base.ParseInternalKey("c.SET.1"), + ), + newFileMeta( + 2, + 1, + base.ParseInternalKey("l.SET.1"), + base.ParseInternalKey("p.SET.1"), + ), }, 2: { - { - FileNum: 3, - Size: 1, - Smallest: base.ParseInternalKey("d.SET.1"), - Largest: base.ParseInternalKey("i.SET.1"), - }, - { - FileNum: 4, - Size: 1, - Smallest: base.ParseInternalKey("s.SET.1"), - Largest: base.ParseInternalKey("w.SET.1"), - }, + newFileMeta( + 3, + 1, + base.ParseInternalKey("d.SET.1"), + base.ParseInternalKey("i.SET.1"), + ), + newFileMeta( + 4, + 1, + base.ParseInternalKey("s.SET.1"), + base.ParseInternalKey("w.SET.1"), + ), }, }), level: 1, @@ -3556,44 +3578,44 @@ func Test_calculateInuseKeyRanges(t *testing.T) { name: "First few non-overlapping, followed by overlapping", v: newVersion(opts, [numLevels][]*fileMetadata{ 1: { - { - FileNum: 1, - Size: 1, - Smallest: base.ParseInternalKey("a.SET.1"), - Largest: base.ParseInternalKey("c.SET.1"), - }, - { - FileNum: 2, - Size: 1, - Smallest: base.ParseInternalKey("d.SET.1"), - Largest: base.ParseInternalKey("e.SET.1"), - }, - { - FileNum: 3, - Size: 1, - Smallest: base.ParseInternalKey("n.SET.1"), - Largest: base.ParseInternalKey("o.SET.1"), - }, - { - FileNum: 4, - Size: 1, - Smallest: base.ParseInternalKey("p.SET.1"), - Largest: base.ParseInternalKey("q.SET.1"), - }, + newFileMeta( + 1, + 1, + base.ParseInternalKey("a.SET.1"), + base.ParseInternalKey("c.SET.1"), + ), + newFileMeta( + 2, + 1, + base.ParseInternalKey("d.SET.1"), + base.ParseInternalKey("e.SET.1"), + ), + newFileMeta( + 3, + 1, + base.ParseInternalKey("n.SET.1"), + base.ParseInternalKey("o.SET.1"), + ), + newFileMeta( + 4, + 1, + base.ParseInternalKey("p.SET.1"), + base.ParseInternalKey("q.SET.1"), + ), }, 2: { - { - FileNum: 5, - Size: 1, - Smallest: base.ParseInternalKey("m.SET.1"), - Largest: base.ParseInternalKey("q.SET.1"), - }, - { - FileNum: 6, - Size: 1, - Smallest: base.ParseInternalKey("s.SET.1"), - Largest: 
base.ParseInternalKey("w.SET.1"), - }, + newFileMeta( + 5, + 1, + base.ParseInternalKey("m.SET.1"), + base.ParseInternalKey("q.SET.1"), + ), + newFileMeta( + 6, + 1, + base.ParseInternalKey("s.SET.1"), + base.ParseInternalKey("w.SET.1"), + ), }, }), level: 1, @@ -3623,38 +3645,38 @@ func Test_calculateInuseKeyRanges(t *testing.T) { name: "All overlapping", v: newVersion(opts, [numLevels][]*fileMetadata{ 1: { - { - FileNum: 1, - Size: 1, - Smallest: base.ParseInternalKey("d.SET.1"), - Largest: base.ParseInternalKey("e.SET.1"), - }, - { - FileNum: 2, - Size: 1, - Smallest: base.ParseInternalKey("n.SET.1"), - Largest: base.ParseInternalKey("o.SET.1"), - }, - { - FileNum: 3, - Size: 1, - Smallest: base.ParseInternalKey("p.SET.1"), - Largest: base.ParseInternalKey("q.SET.1"), - }, + newFileMeta( + 1, + 1, + base.ParseInternalKey("d.SET.1"), + base.ParseInternalKey("e.SET.1"), + ), + newFileMeta( + 2, + 1, + base.ParseInternalKey("n.SET.1"), + base.ParseInternalKey("o.SET.1"), + ), + newFileMeta( + 3, + 1, + base.ParseInternalKey("p.SET.1"), + base.ParseInternalKey("q.SET.1"), + ), }, 2: { - { - FileNum: 4, - Size: 1, - Smallest: base.ParseInternalKey("a.SET.1"), - Largest: base.ParseInternalKey("c.SET.1"), - }, - { - FileNum: 5, - Size: 1, - Smallest: base.ParseInternalKey("d.SET.1"), - Largest: base.ParseInternalKey("w.SET.1"), - }, + newFileMeta( + 4, + 1, + base.ParseInternalKey("a.SET.1"), + base.ParseInternalKey("c.SET.1"), + ), + newFileMeta( + 5, + 1, + base.ParseInternalKey("d.SET.1"), + base.ParseInternalKey("w.SET.1"), + ), }, }), level: 1, diff --git a/data_test.go b/data_test.go index 5e11cfcf11..4bfddd07d1 100644 --- a/data_test.go +++ b/data_test.go @@ -909,6 +909,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) { InternalKey{UserKey: []byte(parts[0])}, InternalKey{UserKey: []byte(parts[1])}, ) + m.InitPhysicalBacking() return m, nil } diff --git a/db.go b/db.go index 23391ad641..bb3e4336bf 100644 --- a/db.go +++ b/db.go @@ -468,7 +468,8 @@ type DB struct { // job to validate one or more sstables. cond sync.Cond // pending is a slice of metadata for sstables waiting to be - // validated. + // validated. Only physical sstables should be added to the pending + // queue. pending []newFileEntry // validating is set to true when validation is running. validating bool @@ -1807,8 +1808,15 @@ func WithProperties() SSTablesOption { // SSTableInfo export manifest.TableInfo with sstable.Properties type SSTableInfo struct { manifest.TableInfo - - // Properties is the sstable properties of this table. + // Virtual indicates whether the sstable is virtual. + Virtual bool + // BackingSSTNum is the file number associated with backing sstable which + // backs the sstable associated with this SSTableInfo. If Virtual is false, + // then BackingSSTNum == FileNum. + BackingSSTNum base.FileNum + + // Properties is the sstable properties of this table. If Virtual is true, + // then the Properties are associated with the backing sst. 
Properties *sstable.Properties } @@ -1844,12 +1852,16 @@ func (d *DB) SSTables(opts ...SSTablesOption) ([][]SSTableInfo, error) { for m := iter.First(); m != nil; m = iter.Next() { destTables[j] = SSTableInfo{TableInfo: m.TableInfo()} if opt.withProperties { - p, err := d.tableCache.getTableProperties(m) + p, err := d.tableCache.getTableProperties( + m.PhysicalMeta(), + ) if err != nil { return nil, err } destTables[j].Properties = p } + destTables[j].Virtual = m.Virtual + destTables[j].BackingSSTNum = m.FileBacking.FileNum j++ } destLevels[i] = destTables[:j] @@ -1902,10 +1914,13 @@ func (d *DB) EstimateDiskUsage(start, end []byte) (uint64, error) { } else if d.opts.Comparer.Compare(file.Smallest.UserKey, end) <= 0 && d.opts.Comparer.Compare(start, file.Largest.UserKey) <= 0 { var size uint64 - err := d.tableCache.withReader(file, func(r *sstable.Reader) (err error) { - size, err = r.EstimateDiskUsage(start, end) - return err - }) + err := d.tableCache.withReader( + file, + func(r *sstable.Reader) (err error) { + size, err = r.EstimateDiskUsage(start, end) + return err + }, + ) if err != nil { return 0, err } diff --git a/flushable.go b/flushable.go index 13fca5fad8..0a5b2ae2bf 100644 --- a/flushable.go +++ b/flushable.go @@ -65,11 +65,11 @@ type flushableEntry struct { releaseMemAccounting func() // unrefFiles, if not nil, should be invoked to decrease the ref count of // files which are backing the flushable. - unrefFiles func() []*fileMetadata + unrefFiles func() []*fileBacking // deleteFnLocked should be called if the caller is holding DB.mu. - deleteFnLocked func(obsolete []*fileMetadata) + deleteFnLocked func(obsolete []*fileBacking) // deleteFn should be called if the caller is not holding DB.mu. - deleteFn func(obsolete []*fileMetadata) + deleteFn func(obsolete []*fileBacking) } func (e *flushableEntry) readerRef() { @@ -90,7 +90,7 @@ func (e *flushableEntry) readerUnrefLocked(deleteFiles bool) { } func (e *flushableEntry) readerUnrefHelper( - deleteFiles bool, deleteFn func(obsolete []*manifest.FileMetadata), + deleteFiles bool, deleteFn func(obsolete []*fileBacking), ) { switch v := atomic.AddInt32(&e.readerRefs, -1); { case v < 0: @@ -116,7 +116,7 @@ type flushableList []*flushableEntry // ingestedFlushable is the implementation of the flushable interface for the // ingesting sstables which are added to the flushable list. type ingestedFlushable struct { - files []*fileMetadata + files []physicalMeta cmp Compare split Split newIters tableNewIters @@ -136,21 +136,24 @@ func newIngestedFlushable( newIters tableNewIters, newRangeKeyIters keyspan.TableNewSpanIter, ) *ingestedFlushable { + var physicalFiles []physicalMeta + var hasRangeKeys bool + for _, f := range files { + if f.HasRangeKeys { + hasRangeKeys = true + } + physicalFiles = append(physicalFiles, f.PhysicalMeta()) + } + ret := &ingestedFlushable{ - files: files, + files: physicalFiles, cmp: cmp, split: split, newIters: newIters, newRangeKeyIters: newRangeKeyIters, // slice is immutable and can be set once and used many times. 
- slice: manifest.NewLevelSliceKeySorted(cmp, files), - } - - for _, f := range files { - if f.HasRangeKeys { - ret.hasRangeKeys = true - break - } + slice: manifest.NewLevelSliceKeySorted(cmp, files), + hasRangeKeys: hasRangeKeys, } return ret diff --git a/format_major_version_test.go b/format_major_version_test.go index 0e1078c1b8..28a4312643 100644 --- a/format_major_version_test.go +++ b/format_major_version_test.go @@ -439,14 +439,15 @@ func TestPebblev1Migration(t *testing.T) { for _, l := range v.Levels { iter := l.Iter() for m := iter.First(); m != nil; m = iter.Next() { - err := d.tableCache.withReader(m, func(r *sstable.Reader) error { - f, err := r.TableFormat() - if err != nil { - return err - } - tally[f]++ - return nil - }) + err := d.tableCache.withReader(m, + func(r *sstable.Reader) error { + f, err := r.TableFormat() + if err != nil { + return err + } + tally[f]++ + return nil + }) if err != nil { return err.Error() } diff --git a/get_iter_test.go b/get_iter_test.go index 1683c91a62..668454290e 100644 --- a/get_iter_test.go +++ b/get_iter_test.go @@ -490,6 +490,7 @@ func TestGetIter(t *testing.T) { meta := &fileMetadata{ FileNum: tt.fileNum, } + meta.InitPhysicalBacking() for i, datum := range tt.data { s := strings.Split(datum, " ") ikey := base.ParseInternalKey(s[0]) diff --git a/ingest.go b/ingest.go index 55812b6f7f..2b34c5282f 100644 --- a/ingest.go +++ b/ingest.go @@ -7,7 +7,6 @@ package pebble import ( "context" "sort" - "sync/atomic" "time" "github.com/cockroachdb/errors" @@ -82,6 +81,7 @@ func ingestLoad1( meta.FileNum = fileNum meta.Size = uint64(readable.Size()) meta.CreationTime = time.Now().Unix() + meta.InitPhysicalBacking() // Avoid loading into the table cache for collecting stats if we // don't need to. If there are no range deletions, we have all the @@ -92,7 +92,7 @@ func ingestLoad1( // disallowing removal of an open file. Under MemFS, if we don't populate // meta.Stats here, the file will be loaded into the table cache for // calculating stats before we can remove the original link. - maybeSetStatsFromProperties(meta, &r.Properties) + maybeSetStatsFromProperties(meta.PhysicalMeta(), &r.Properties) { iter, err := r.NewIter(nil /* lower */, nil /* upper */) @@ -703,13 +703,13 @@ func (d *DB) newIngestedFlushableEntry( // The flushable entry starts off with a single reader ref, so increment // the FileMetadata.Refs. for _, file := range f.files { - atomic.AddInt32(&file.Refs, 1) + file.Ref() } - entry.unrefFiles = func() []*fileMetadata { - var obsolete []*fileMetadata + entry.unrefFiles = func() []*fileBacking { + var obsolete []*fileBacking for _, file := range f.files { - if val := atomic.AddInt32(&file.Refs, -1); val == 0 { - obsolete = append(obsolete, file) + if file.Unref() == 0 { + obsolete = append(obsolete, file.FileMetadata.FileBacking) } } return obsolete @@ -1021,12 +1021,21 @@ func (d *DB) ingestApply( // maybeValidateSSTablesLocked adds the slice of newFileEntrys to the pending // queue of files to be validated, when the feature is enabled. // DB.mu must be locked when calling. +// +// TODO(bananabrick): Make sure that the ingestion step only passes in the +// physical sstables for validation here. func (d *DB) maybeValidateSSTablesLocked(newFiles []newFileEntry) { // Only add to the validation queue when the feature is enabled. 
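	// (Why physical-only: validation reads a table through its FileNum, and
	// only physical sstables guarantee FileNum == FileBacking.FileNum, i.e.
	// that the FileNum is also a valid read handle. Virtual sstables would
	// have to be validated through their backing file instead, hence the
	// guard added below.)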
if !d.opts.Experimental.ValidateOnIngest { return } + for _, f := range newFiles { + if f.Meta.Virtual { + panic("pebble: invalid call to maybeValidateSSTablesLocked") + } + } + d.mu.tableValidation.pending = append(d.mu.tableValidation.pending, newFiles...) if d.shouldValidateSSTablesLocked() { go d.validateSSTables() @@ -1086,9 +1095,10 @@ func (d *DB) validateSSTables() { } } - err := d.tableCache.withReader(f.Meta, func(r *sstable.Reader) error { - return r.ValidateBlockChecksums() - }) + err := d.tableCache.withReader( + f.Meta, func(r *sstable.Reader) error { + return r.ValidateBlockChecksums() + }) if err != nil { // TODO(travers): Hook into the corruption reporting pipeline, once // available. See pebble#1192. diff --git a/ingest_test.go b/ingest_test.go index 2e41654b19..fdd9d6a8fb 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -199,6 +199,7 @@ func TestIngestLoadRand(t *testing.T) { require.NoError(t, err) expected[i].Size = meta.Size + expected[i].InitPhysicalBacking() }() } @@ -264,6 +265,7 @@ func TestIngestSortAndVerify(t *testing.T) { return fmt.Sprintf("range %v-%v is not valid", smallest, largest) } m := (&fileMetadata{}).ExtendPointKeyBounds(cmp, smallest, largest) + m.InitPhysicalBacking() meta = append(meta, m) paths = append(paths, strconv.Itoa(i)) } @@ -305,6 +307,7 @@ func TestIngestLink(t *testing.T) { paths[j] = fmt.Sprintf("external%d", j) meta[j] = &fileMetadata{} meta[j].FileNum = FileNum(j) + meta[j].InitPhysicalBacking() f, err := opts.FS.Create(paths[j]) require.NoError(t, err) @@ -609,6 +612,7 @@ func TestIngestMemtableOverlaps(t *testing.T) { smallest, largest = largest, smallest } meta.ExtendPointKeyBounds(comparer.Compare, smallest, largest) + meta.InitPhysicalBacking() return meta } @@ -736,6 +740,7 @@ func TestIngestTargetLevel(t *testing.T) { InternalKey{UserKey: []byte(parts[1])}, ) } + m.InitPhysicalBacking() return m } @@ -1608,6 +1613,7 @@ func TestIngest_UpdateSequenceNumber(t *testing.T) { maybeUpdateUpperBound(wm.LargestRangeKey), ) } + m.InitPhysicalBacking() if err := m.Validate(cmp, base.DefaultFormatter); err != nil { return err.Error() } diff --git a/internal/keyspan/level_iter_test.go b/internal/keyspan/level_iter_test.go index 70f6dee72e..fe3cd33729 100644 --- a/internal/keyspan/level_iter_test.go +++ b/internal/keyspan/level_iter_test.go @@ -294,6 +294,7 @@ func TestLevelIterEquivalence(t *testing.T) { HasPointKeys: false, HasRangeKeys: true, } + meta.InitPhysicalBacking() meta.ExtendRangeKeyBounds(base.DefaultComparer.Compare, meta.SmallestRangeKey, meta.LargestRangeKey) metas = append(metas, meta) } @@ -376,6 +377,7 @@ func TestLevelIter(t *testing.T) { if len(pointKeys) != 0 { meta.ExtendPointKeyBounds(base.DefaultComparer.Compare, pointKeys[0], pointKeys[len(pointKeys)-1]) } + meta.InitPhysicalBacking() level = append(level, currentFile) metas = append(metas, meta) rangedels = append(rangedels, currentRangeDels) @@ -403,6 +405,7 @@ func TestLevelIter(t *testing.T) { meta := &manifest.FileMetadata{ FileNum: base.FileNum(len(level) + 1), } + meta.InitPhysicalBacking() level = append(level, currentFile) rangedels = append(rangedels, currentRangeDels) if len(currentFile) > 0 { diff --git a/internal/manifest/btree.go b/internal/manifest/btree.go index 66464c98cb..1a99de7953 100644 --- a/internal/manifest/btree.go +++ b/internal/manifest/btree.go @@ -207,7 +207,7 @@ func (n *node) decRef(contentsToo bool, obsolete *[]*FileMetadata) { // nodes, and they want to preserve the existing reference count. 
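	// (With this patch, f.Ref/f.Unref below operate on the shared
	// FileBacking refcount rather than a per-FileMetadata counter: three
	// virtual sstables over one backing file hold three refs on the same
	// FileBacking, and the backing file on disk becomes deletable only when
	// the last of those refs, across all versions and b-tree clones, is
	// released.)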
if contentsToo { for _, f := range n.items[:n.count] { - if atomic.AddInt32(&f.Refs, -1) == 0 { + if f.Unref() == 0 { // There are two sources of node dereferences: tree mutations // and Version dereferences. Files should only be made obsolete // during Version dereferences, during which `obsolete` will be @@ -241,7 +241,7 @@ func (n *node) clone() *node { c.subtreeCount = n.subtreeCount // Increase the refcount of each contained item. for _, f := range n.items[:n.count] { - atomic.AddInt32(&f.Refs, 1) + f.Ref() } if !c.leaf { // Copy children and increase each refcount. @@ -800,7 +800,7 @@ func (t *btree) Delete(item *FileMetadata) (obsolete bool) { return false } if out := mut(&t.root).Remove(t.cmp, item); out != nil { - obsolete = atomic.AddInt32(&out.Refs, -1) == 0 + obsolete = out.Unref() == 0 } if invariants.Enabled { t.root.verifyInvariants() @@ -832,7 +832,7 @@ func (t *btree) Insert(item *FileMetadata) error { newRoot.subtreeCount = t.root.subtreeCount + splitNode.subtreeCount + 1 t.root = newRoot } - atomic.AddInt32(&item.Refs, 1) + item.Ref() err := mut(&t.root).Insert(t.cmp, item) if invariants.Enabled { t.root.verifyInvariants() diff --git a/internal/manifest/btree_test.go b/internal/manifest/btree_test.go index 2cf03e870e..877e2dde65 100644 --- a/internal/manifest/btree_test.go +++ b/internal/manifest/btree_test.go @@ -22,6 +22,7 @@ func newItem(k InternalKey) *FileMetadata { m := (&FileMetadata{}).ExtendPointKeyBounds( base.DefaultComparer.Compare, k, k, ) + m.InitPhysicalBacking() return m } @@ -596,6 +597,7 @@ func TestRandomizedBTree(t *testing.T) { var metadataAlloc [maxFileNum]FileMetadata for i := 0; i < len(metadataAlloc); i++ { metadataAlloc[i].FileNum = base.FileNum(i) + metadataAlloc[i].InitPhysicalBacking() } // Use a btree comparator that sorts by file number to make it easier to diff --git a/internal/manifest/l0_sublevels_test.go b/internal/manifest/l0_sublevels_test.go index 414868cfa7..44b723ca8e 100644 --- a/internal/manifest/l0_sublevels_test.go +++ b/internal/manifest/l0_sublevels_test.go @@ -229,7 +229,7 @@ func TestL0Sublevels(t *testing.T) { } m.FileNum = base.FileNum(fileNum) m.Size = uint64(256) - + m.InitPhysicalBacking() if len(fields) > 1 { for _, field := range fields[1:] { parts := strings.Split(field, "=") @@ -592,6 +592,7 @@ func TestAddL0FilesEquivalence(t *testing.T) { base.MakeInternalKey(startKey, uint64(2*i+1), base.InternalKeyKindSet), base.MakeRangeDeleteSentinelKey(endKey), ) + meta.InitPhysicalBacking() fileMetas = append(fileMetas, meta) filesToAdd = append(filesToAdd, meta) } diff --git a/internal/manifest/level_metadata_test.go b/internal/manifest/level_metadata_test.go index 4e53368633..6bfe0b21f9 100644 --- a/internal/manifest/level_metadata_test.go +++ b/internal/manifest/level_metadata_test.go @@ -47,6 +47,7 @@ func TestLevelIterator(t *testing.T) { ) m.SmallestSeqNum = m.Smallest.SeqNum() m.LargestSeqNum = m.Largest.SeqNum() + m.InitPhysicalBacking() files = append(files, m) } } diff --git a/internal/manifest/version.go b/internal/manifest/version.go index ed8f4f1f2a..212bec9833 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -121,7 +121,33 @@ func (s CompactionState) String() string { } } -// FileMetadata holds the metadata for an on-disk table. +// FileMetadata is maintained for leveled-ssts, i.e., they belong to a level of +// some version. 
FileMetadata does not contain the actual level of the sst,
+// since such leveled-ssts can move across levels in different versions, while
+// sharing the same FileMetadata. There are two kinds of leveled-ssts, physical
+// and virtual. Underlying both leveled-ssts is a backing-sst, for which the
+// only state is FileBacking. A backing-sst is level-less. It is possible for a
+// backing-sst to be referred to by a physical sst in one version and by one or
+// more virtual ssts in one or more versions. A backing-sst becomes obsolete
+// and can be deleted once it is no longer required by any physical or virtual
+// sst in any version.
+//
+// We maintain some invariants:
+//
+// 1. Each physical and virtual sst will have a unique FileMetadata.FileNum,
+// and there will be exactly one FileMetadata associated with the FileNum.
+//
+// 2. Within a version, a backing-sst is either only referred to by one
+// physical sst or one or more virtual ssts.
+//
+// 3. Once a backing-sst is referred to by a virtual sst in the latest version,
+// it cannot go back to being referred to by a physical sst in any future
+// version.
+//
+// Once a physical sst is no longer needed by any version, we will no longer
+// maintain the file metadata associated with it. We will still maintain the
+// FileBacking associated with the physical sst if the backing sst is required
+// by any virtual ssts in any version.
 type FileMetadata struct {
 	// Atomic contains fields which are accessed atomically. Go allocations
 	// are guaranteed to be 64-bit aligned which we take advantage of by
@@ -140,24 +166,43 @@ type FileMetadata struct {
 		statsValid uint32
 	}
 
+	// FileBacking is the state which backs either a physical or a virtual
+	// sstable.
+	FileBacking *FileBacking
+
 	// InitAllowedSeeks is the initial value of allowed seeks. This is used
 	// to re-set allowed seeks on a file once it hits 0.
 	InitAllowedSeeks int64
-
-	// Reference count for the file: incremented when a file is added to a
-	// version and decremented when the version is unreferenced. The file is
-	// obsolete when the reference count falls to zero.
-	Refs int32
 	// FileNum is the file number.
+	//
+	// INVARIANT: when !FileMetadata.Virtual, FileNum == FileBacking.FileNum.
+	//
+	// TODO(bananabrick): Consider creating separate types for
+	// FileMetadata.FileNum and FileBacking.FileNum. FileNum is used both as
+	// an identifier for the FileMetadata in Pebble, and also as a handle to
+	// perform reads and writes. We should ensure through types that
+	// FileMetadata.FileNum isn't used to perform reads, and that
+	// FileBacking.FileNum isn't used as an identifier for the FileMetadata.
 	FileNum base.FileNum
-	// Size is the size of the file, in bytes.
+	// Size is the size of the file, in bytes. Size is an approximate value for
+	// virtual sstables.
+	//
+	// INVARIANT: when !FileMetadata.Virtual, Size == FileBacking.Size.
+	//
+	// TODO(bananabrick): Size is currently used in metrics, and for many key
+	// Pebble level heuristics. Make sure that the heuristics will still work
+	// appropriately with an approximate value of size.
 	Size uint64
 	// File creation time in seconds since the epoch (1970-01-01 00:00:00
 	// UTC). For ingested sstables, this corresponds to the time the file was
-	// ingested.
+	// ingested. For virtual sstables, this corresponds to the wall clock time
+	// when the FileMetadata for the virtual sstable was first created.
 	CreationTime int64
-	// Smallest and largest sequence numbers in the table, across both point and
-	// range keys.
+ // Lower and upper bounds for the smallest and largest sequence numbers in + // the table, across both point and range keys. For physical sstables, these + // values are tight bounds. For virtual sstables, there is no guarantee that + // there will be keys with SmallestSeqNum or LargestSeqNum within virtual + // sstable bounds. SmallestSeqNum uint64 LargestSeqNum uint64 // SmallestPointKey and LargestPointKey are the inclusive bounds for the @@ -180,6 +225,13 @@ type FileMetadata struct { Smallest InternalKey Largest InternalKey // Stats describe table statistics. Protected by DB.mu. + // + // For virtual sstables, set stats upon virtual sstable creation as + // asynchronous computation of stats is not currently supported. + // + // TODO(bananabrick): To support manifest replay for virtual sstables, we + // probably need to compute virtual sstable stats asynchronously. Otherwise, + // we'd have to write virtual sstable stats to the version edit. Stats TableStats SubLevel int @@ -228,6 +280,171 @@ type FileMetadata struct { // key type (point or range) corresponds to the smallest and largest overall // table bounds. boundTypeSmallest, boundTypeLargest boundType + // Virtual is true if the FileMetadata belongs to a virtual sstable. + Virtual bool +} + +// PhysicalFileMeta is used by functions which want a guarantee that their input +// belongs to a physical sst and not a virtual sst. +// +// NB: This type should only be constructed by calling +// FileMetadata.PhysicalMeta. +type PhysicalFileMeta struct { + *FileMetadata +} + +// VirtualFileMeta is used by functions which want a guarantee that their input +// belongs to a virtual sst and not a physical sst. +// +// NB: This type should only be constructed by calling FileMetadata.VirtualMeta. +type VirtualFileMeta struct { + *FileMetadata +} + +// PhysicalMeta should be the only source of creating the PhysicalFileMeta +// wrapper type. +func (m *FileMetadata) PhysicalMeta() PhysicalFileMeta { + if m.Virtual { + panic("pebble: file metadata does not belong to a physical sstable") + } + return PhysicalFileMeta{ + m, + } +} + +// VirtualMeta should be the only source of creating the VirtualFileMeta wrapper +// type. +func (m *FileMetadata) VirtualMeta() VirtualFileMeta { + if !m.Virtual { + panic("pebble: file metadata does not belong to a virtual sstable") + } + return VirtualFileMeta{ + m, + } +} + +// FileBacking either backs a single physical sstable, or one or more virtual +// sstables. +// +// See the comment above the FileMetadata type for sstable terminology. +type FileBacking struct { + Atomic struct { + // Reference count for the backing file on disk: incremented when a + // physical or virtual sstable which is backed by the FileBacking is + // added to a version and decremented when the version is unreferenced. + // We ref count in order to determine when it is safe to delete a + // backing sst file from disk. The backing file is obsolete when the + // reference count falls to zero. + refs atomic.Int32 + // latestVersionRefs are the references to the FileBacking in the + // latest version. This reference can be through a single physical + // sstable in the latest version, or one or more virtual sstables in the + // latest version. + // + // INVARIANT: latestVersionRefs <= refs. + latestVersionRefs atomic.Int32 + // VirtualizedSize is set iff the backing sst is only referred to by + // virtual ssts in the latest version. 
VirtualizedSize is the sum of the
+		// virtual sstable sizes of all of the virtual sstables in the latest
+		// version which are backed by the physical sstable. When a virtual
+		// sstable is removed from the latest version, we will decrement the
+		// VirtualizedSize. During compaction picking, we'll compensate a
+		// virtual sstable file size by
+		// (FileBacking.Size - FileBacking.VirtualizedSize) / latestVersionRefs.
+		// The intuition is that if FileBacking.Size - FileBacking.VirtualizedSize
+		// is high, then the space amplification due to virtual sstables is
+		// high, and we should pick the virtual sstable with higher priority.
+		//
+		// TODO(bananabrick): Compensate the virtual sstable file size using
+		// the VirtualizedSize during compaction picking and test.
+		VirtualizedSize atomic.Uint64
+	}
+	FileNum base.FileNum
+	Size    uint64
+}
+
+// InitPhysicalBacking allocates and sets the FileBacking which is required by a
+// physical sstable FileMetadata.
+//
+// Ensure that the state required by FileBacking, such as the FileNum, is
+// already set on the FileMetadata before InitPhysicalBacking is called. In
+// tests which don't rely on FileBacking, InitPhysicalBacking may be called
+// before all of the relevant state has been set on the FileMetadata.
+func (m *FileMetadata) InitPhysicalBacking() {
+	if m.Virtual {
+		panic("pebble: virtual sstables should use a pre-existing FileBacking")
+	}
+	if m.FileBacking == nil {
+		m.FileBacking = &FileBacking{Size: m.Size, FileNum: m.FileNum}
+	}
+}
+
+// ValidateVirtual should be called once the FileMetadata for a virtual sstable
+// is created to verify that the fields of the virtual sstable are sound.
+func (m *FileMetadata) ValidateVirtual(createdFrom *FileMetadata) {
+	if !m.Virtual {
+		panic("pebble: invalid virtual sstable")
+	}
+
+	if createdFrom.SmallestSeqNum != m.SmallestSeqNum {
+		panic("pebble: invalid smallest sequence number for virtual sstable")
+	}
+
+	if createdFrom.LargestSeqNum != m.LargestSeqNum {
+		panic("pebble: invalid largest sequence number for virtual sstable")
+	}
+
+	if createdFrom.FileBacking != nil && createdFrom.FileBacking != m.FileBacking {
+		panic("pebble: invalid physical sstable state for virtual sstable")
+	}
+}
+
+// Refs returns the refcount of the backing sstable.
+func (m *FileMetadata) Refs() int32 {
+	return m.FileBacking.Atomic.refs.Load()
+}
+
+// Ref increments the ref count associated with the backing sstable.
+func (m *FileMetadata) Ref() {
+	m.FileBacking.Atomic.refs.Add(1)
+}
+
+// Unref decrements the ref count associated with the backing sstable.
+func (m *FileMetadata) Unref() int32 {
+	v := m.FileBacking.Atomic.refs.Add(-1)
+	if invariants.Enabled && v < 0 {
+		panic("pebble: invalid FileMetadata refcounting")
+	}
+	return v
+}
+
+// LatestRef increments the latest ref count associated with the backing
+// sstable.
+func (m *FileMetadata) LatestRef() {
+	m.FileBacking.Atomic.latestVersionRefs.Add(1)
+
+	if m.Virtual {
+		m.FileBacking.Atomic.VirtualizedSize.Add(m.Size)
+	}
+}
+
+// LatestUnref decrements the latest ref count associated with the backing
+// sstable.
+func (m *FileMetadata) LatestUnref() int32 {
+	if m.Virtual {
+		m.FileBacking.Atomic.VirtualizedSize.Add(-m.Size)
+	}
+
+	v := m.FileBacking.Atomic.latestVersionRefs.Add(-1)
+	if invariants.Enabled && v < 0 {
+		panic("pebble: invalid FileMetadata latest refcounting")
+	}
+	return v
+}
+
+// LatestRefs returns the latest ref count associated with the backing sstable.
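Concretely, as a worked example of the compensation formula documented on VirtualizedSize (the numbers are made up, not from the patch): a 100 MB backing file carrying two virtual sstables of 10 MB and 20 MB has VirtualizedSize = 30 MB and latestVersionRefs = 2, so each virtual sstable would be compensated by (100 - 30) / 2 = 35 MB of otherwise-unaccounted backing-file space. LatestRefs, defined next, returns that divisor.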
+func (m *FileMetadata) LatestRefs() int32 { + return m.FileBacking.Atomic.latestVersionRefs.Load() } // SetCompactionState transitions this file's compaction state to the given @@ -524,6 +741,7 @@ func ParseFileMetadataDebug(s string) (m FileMetadata, err error) { m.SmallestPointKey, m.LargestPointKey = m.Smallest, m.Largest m.HasPointKeys = true } + m.InitPhysicalBacking() return } @@ -586,6 +804,11 @@ func (m *FileMetadata) Validate(cmp Compare, formatKey base.FormatKey) error { } } + // Ensure that FileMetadata.Init was called. + if m.FileBacking == nil { + return base.CorruptionErrorf("file metadata FileBacking not set") + } + return nil } @@ -822,7 +1045,7 @@ type Version struct { // The callback to invoke when the last reference to a version is // removed. Will be called with list.mu held. - Deleted func(obsolete []*FileMetadata) + Deleted func(obsolete []*FileBacking) // Stats holds aggregated stats about the version maintained from // version to version. @@ -928,11 +1151,15 @@ func (v *Version) Ref() { // locked. func (v *Version) Unref() { if atomic.AddInt32(&v.refs, -1) == 0 { - obsolete := v.unrefFiles() l := v.list l.mu.Lock() l.Remove(v) - v.Deleted(obsolete) + obsolete := v.unrefFiles() + fileBacking := make([]*FileBacking, len(obsolete)) + for i, f := range obsolete { + fileBacking[i] = f.FileBacking + } + v.Deleted(fileBacking) l.mu.Unlock() } } @@ -944,7 +1171,12 @@ func (v *Version) Unref() { func (v *Version) UnrefLocked() { if atomic.AddInt32(&v.refs, -1) == 0 { v.list.Remove(v) - v.Deleted(v.unrefFiles()) + obsolete := v.unrefFiles() + fileBacking := make([]*FileBacking, len(obsolete)) + for i, f := range obsolete { + fileBacking[i] = f.FileBacking + } + v.Deleted(fileBacking) } } diff --git a/internal/manifest/version_edit.go b/internal/manifest/version_edit.go index be97faac76..3dee300b3e 100644 --- a/internal/manifest/version_edit.go +++ b/internal/manifest/version_edit.go @@ -108,9 +108,40 @@ type VersionEdit struct { // found that there was no overlapping file at the higher level). DeletedFiles map[DeletedFileEntry]*FileMetadata NewFiles []NewFileEntry + // CreatedBackingTables can be used to preserve the FileBacking associated + // with a physical sstable. This is useful when virtual sstables in the + // latest version are reconstructed during manifest replay, and we also need + // to reconstruct the FileBacking which is required by these virtual + // sstables. + // + // INVARIANT: The FileBacking associated with a physical sstable must only + // be added as a backing file in the same version edit where the physical + // sstable is first virtualized. This means that the physical sstable must + // be present in DeletedFiles and that there must be at least one virtual + // sstable with the same FileBacking as the physical sstable in NewFiles. A + // file must be present in CreatedBackingTables in exactly one version edit. + // The physical sstable associated with the FileBacking must also not be + // present in NewFiles. + CreatedBackingTables []*FileBacking + // RemovedBackingTables is used to remove the FileBacking associated with a + // virtual sstable. Note that a backing sstable can be removed as soon as + // there are no virtual sstables in the latest version which are using the + // backing sstable, but the backing sstable doesn't necessarily have to be + // removed atomically with the version edit which removes the last virtual + // sstable associated with the backing sstable. The removal can happen in a + // future version edit. 
+	//
+	// INVARIANT: A file must only be added to RemovedBackingTables if it was
+	// added to CreatedBackingTables in a prior version edit. The same version
+	// edit also cannot have the same file present in both CreatedBackingTables
+	// and RemovedBackingTables. A file must be present in RemovedBackingTables
+	// in exactly one version edit.
+	RemovedBackingTables []base.FileNum
 }
 
 // Decode decodes an edit from the specified reader.
+//
+// TODO(bananabrick): Support decoding of virtual sstable state.
 func (v *VersionEdit) Decode(r io.Reader) error {
 	br, ok := r.(byteReader)
 	if !ok {
@@ -343,6 +374,7 @@ func (v *VersionEdit) Decode(r io.Reader) error {
 			}
 		}
 		m.boundsSet = true
+		m.InitPhysicalBacking()
 		v.NewFiles = append(v.NewFiles, NewFileEntry{
 			Level: level,
 			Meta:  m,
@@ -366,6 +398,8 @@ func (v *VersionEdit) Decode(r io.Reader) error {
 }
 
 // Encode encodes an edit to the specified writer.
+//
+// TODO(bananabrick): Support encoding of virtual sstable state.
 func (v *VersionEdit) Encode(w io.Writer) error {
 	e := versionEditEncoder{new(bytes.Buffer)}
 
@@ -556,6 +590,9 @@ type BulkVersionEdit struct {
 	Added   [NumLevels]map[base.FileNum]*FileMetadata
 	Deleted [NumLevels]map[base.FileNum]*FileMetadata
 
+	AddedFileBacking   []*FileBacking
+	RemovedFileBacking []base.FileNum
+
 	// AddedByFileNum maps file number to file metadata for all added files
 	// from accumulated version edits. AddedByFileNum is only populated if set
 	// to non-nil by a caller. It must be set to non-nil when replaying
@@ -635,36 +672,72 @@ func (b *BulkVersionEdit) Accumulate(ve *VersionEdit) error {
 			b.MarkedForCompactionCountDiff++
 		}
 	}
+
+	// Generate state for the backing files.
+	b.AddedFileBacking = append(b.AddedFileBacking, ve.CreatedBackingTables...)
+
+	// Since a file can be removed from backing files in exactly one version
+	// edit, it is safe to just append without any de-duplication.
+	b.RemovedFileBacking = append(b.RemovedFileBacking, ve.RemovedBackingTables...)
+
 	return nil
 }
 
-// AccumulateAndApplySingleVE should be called if a single version edit is to be
-// applied to the provided curr Version and if the caller needs to update the
-// versionSet.zombieTables map. This function exists separately from
+// AccumulateIncompleteAndApplySingleVE should be called if a single version edit
+// is to be applied to the provided curr Version and if the caller needs to
+// update the versionSet.zombieTables map. This function exists separately from
 // BulkVersionEdit.Apply because it is easier to reason about properties
 // regarding BulkVersionEdit.Accumulate/Apply and zombie table generation if we
 // know that exactly one version edit is being accumulated.
 //
+// Note that the version edit passed into this function may be incomplete
+// because compactions don't have the ref counting information necessary to
+// populate VersionEdit.RemovedBackingTables. This function will complete such a
+// version edit by populating RemovedBackingTables.
+//
 // Invariant: Any file being deleted through ve must belong to the curr Version.
 // We can't have a delete for some arbitrary file which does not exist in curr.
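A hypothetical caller-side sketch (the real call site in version_set.go is outside this excerpt; names follow the patch, with vs standing in for the version set): the caller passes the version set's backing-file map, and on return the previously incomplete edit has RemovedBackingTables populated, ready to be encoded:

	newVersion, zombies, err := manifest.AccumulateIncompleteAndApplySingleVE(
		ve, currentVersion, cmp, formatKey,
		flushSplitBytes, readCompactionRate,
		vs.fileBackingMap, // mutated in place; ve.RemovedBackingTables is filled in
	)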
-func AccumulateAndApplySingleVE(
+func AccumulateIncompleteAndApplySingleVE(
 	ve *VersionEdit,
 	curr *Version,
 	cmp Compare,
 	formatKey base.FormatKey,
 	flushSplitBytes int64,
 	readCompactionRate int64,
+	backingStateMap map[base.FileNum]*FileBacking,
 ) (_ *Version, zombies map[base.FileNum]uint64, _ error) {
+	if len(ve.RemovedBackingTables) != 0 {
+		panic("pebble: invalid incomplete version edit")
+	}
 	var b BulkVersionEdit
 	err := b.Accumulate(ve)
 	if err != nil {
 		return nil, nil, err
 	}
 	zombies = make(map[base.FileNum]uint64)
-	v, err := b.Apply(curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies)
+	v, err := b.Apply(
+		curr, cmp, formatKey, flushSplitBytes, readCompactionRate, zombies,
+	)
 	if err != nil {
 		return nil, nil, err
 	}
+
+	for _, s := range b.AddedFileBacking {
+		backingStateMap[s.FileNum] = s
+	}
+
+	for fileNum := range zombies {
+		if _, ok := backingStateMap[fileNum]; ok {
+			// This table was backing some virtual sstable in the latest version,
+			// but is now a zombie. We add RemovedBackingTables entries for
+			// these, before the version edit is written to disk.
+			ve.RemovedBackingTables = append(
+				ve.RemovedBackingTables, fileNum,
+			)
+			delete(backingStateMap, fileNum)
+		}
+	}
+
 	return v, zombies, nil
 }
 
@@ -686,14 +759,14 @@ func (b *BulkVersionEdit) Apply(
 	readCompactionRate int64,
 	zombies map[base.FileNum]uint64,
 ) (*Version, error) {
-	addZombie := func(fileNum base.FileNum, size uint64) {
+	addZombie := func(state *FileBacking) {
 		if zombies != nil {
-			zombies[fileNum] = size
+			zombies[state.FileNum] = state.Size
 		}
 	}
-	removeZombie := func(fileNum base.FileNum) {
+	removeZombie := func(state *FileBacking) {
 		if zombies != nil {
-			delete(zombies, fileNum)
+			delete(zombies, state.FileNum)
 		}
 	}
 
@@ -753,7 +826,6 @@ func (b *BulkVersionEdit) Apply(
 		// level.
 
 		for _, f := range deletedFilesMap {
-			addZombie(f.FileNum, f.Size)
 			if obsolete := v.Levels[level].tree.Delete(f); obsolete {
 				// Deleting a file from the B-Tree may decrement its
 				// reference count. However, because we cloned the
@@ -772,6 +844,23 @@ func (b *BulkVersionEdit) Apply(
 					return nil, err
 				}
 			}
+
+			// Note that a backing sst will only become a zombie if the number
+			// of references to it in the latest version is 0. We will remove
+			// the backing sst from the zombie list in the next loop if one of
+			// the addedFiles in any of the levels is referencing the backing
+			// sst. This is possible if a physical sstable is virtualized, or
+			// if it is moved.
+			latestRefCount := f.LatestRefs()
+			if latestRefCount <= 0 {
+				// If a file is present in deletedFilesMap for a level, then it
+				// must have already been added to the level previously, which
+				// means that its latest ref count cannot be 0.
+				err := errors.Errorf("pebble: internal error: incorrect latestRefs reference counting for file %s", f.FileNum)
+				return nil, err
+			} else if f.LatestUnref() == 0 {
+				addZombie(f.FileBacking)
+			}
 		}
 
 		addedFiles := make([]*FileMetadata, 0, len(addedFilesMap))
@@ -800,6 +889,9 @@ func (b *BulkVersionEdit) Apply(
 			f.InitAllowedSeeks = allowedSeeks
 
 			err := lm.tree.Insert(f)
+			// We're adding this file to the new version, so increment the
+			// latest refs count.
+			f.LatestRef()
 			if err != nil {
 				return nil, errors.Wrap(err, "pebble")
 			}
@@ -809,7 +901,7 @@ func (b *BulkVersionEdit) Apply(
 				return nil, errors.Wrap(err, "pebble")
 			}
 		}
-		removeZombie(f.FileNum)
+		removeZombie(f.FileBacking)
 
 		// Track the keys with the smallest and largest keys, so that we can
 		// check consistency of the modified span.
if sm == nil || base.InternalCompare(cmp, sm.Smallest, f.Smallest) > 0 { diff --git a/internal/manifest/version_edit_test.go b/internal/manifest/version_edit_test.go index ba51bd55fa..06e0f8aa68 100644 --- a/internal/manifest/version_edit_test.go +++ b/internal/manifest/version_edit_test.go @@ -49,6 +49,7 @@ func TestVersionEditRoundTrip(t *testing.T) { base.DecodeInternalKey([]byte("abc\x00\x01\x02\x03\x04\x05\x06\x07")), base.DecodeInternalKey([]byte("xyz\x01\xff\xfe\xfd\xfc\xfb\xfa\xf9")), ) + m1.InitPhysicalBacking() m2 := (&FileMetadata{ FileNum: 806, @@ -62,6 +63,7 @@ func TestVersionEditRoundTrip(t *testing.T) { base.DecodeInternalKey([]byte("A\x00\x01\x02\x03\x04\x05\x06\x07")), base.DecodeInternalKey([]byte("Z\x01\xff\xfe\xfd\xfc\xfb\xfa\xf9")), ) + m2.InitPhysicalBacking() m3 := (&FileMetadata{ FileNum: 807, @@ -72,6 +74,7 @@ func TestVersionEditRoundTrip(t *testing.T) { base.MakeInternalKey([]byte("aaa"), 0, base.InternalKeyKindRangeKeySet), base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeySet, []byte("zzz")), ) + m3.InitPhysicalBacking() m4 := (&FileMetadata{ FileNum: 809, @@ -88,6 +91,7 @@ func TestVersionEditRoundTrip(t *testing.T) { base.MakeInternalKey([]byte("l"), 0, base.InternalKeyKindRangeKeySet), base.MakeExclusiveSentinelKey(base.InternalKeyKindRangeKeySet, []byte("z")), ) + m4.InitPhysicalBacking() testCases := []VersionEdit{ // An empty version edit. @@ -148,6 +152,7 @@ func TestVersionEditDecode(t *testing.T) { base.MakeInternalKey([]byte("bar"), 5, base.InternalKeyKindDelete), base.MakeInternalKey([]byte("foo"), 4, base.InternalKeyKindSet), ) + m.InitPhysicalBacking() testCases := []struct { filename string @@ -312,6 +317,7 @@ func TestVersionEditApply(t *testing.T) { if m.SmallestSeqNum > m.LargestSeqNum { m.SmallestSeqNum, m.LargestSeqNum = m.LargestSeqNum, m.SmallestSeqNum } + m.InitPhysicalBacking() return m, nil } @@ -366,6 +372,7 @@ func TestVersionEditApply(t *testing.T) { } versionFiles[meta.FileNum] = &meta v.Levels[level].tree.Insert(&meta) + meta.LatestRef() } else { ve.NewFiles = append(ve.NewFiles, NewFileEntry{Level: level, Meta: &meta}) diff --git a/internal/manifest/version_test.go b/internal/manifest/version_test.go index f53ff3ec4b..ecb8e75583 100644 --- a/internal/manifest/version_test.go +++ b/internal/manifest/version_test.go @@ -74,6 +74,7 @@ func TestIkeyRange(t *testing.T) { m := (&FileMetadata{ FileNum: base.FileNum(i), }).ExtendPointKeyBounds(cmp, ikey(s[0:1]), ikey(s[2:3])) + m.InitPhysicalBacking() f = append(f, m) } } @@ -128,6 +129,7 @@ func TestContains(t *testing.T) { FileNum: fileNum, Size: size, }).ExtendPointKeyBounds(cmp, smallest, largest) + m.InitPhysicalBacking() return m } m00 := newFileMeta( @@ -272,7 +274,7 @@ func TestContains(t *testing.T) { func TestVersionUnref(t *testing.T) { list := &VersionList{} list.Init(&sync.Mutex{}) - v := &Version{Deleted: func([]*FileMetadata) {}} + v := &Version{Deleted: func([]*FileBacking) {}} v.Ref() list.PushBack(v) v.Unref() diff --git a/level_checker_test.go b/level_checker_test.go index 1d82a6fe04..77afcc88e1 100644 --- a/level_checker_test.go +++ b/level_checker_test.go @@ -141,6 +141,7 @@ func TestCheckLevelsCornerCases(t *testing.T) { m := (&fileMetadata{ FileNum: fileNum, }).ExtendPointKeyBounds(cmp, smallestKey, largestKey) + m.InitPhysicalBacking() *li = append(*li, m) i++ diff --git a/level_iter_test.go b/level_iter_test.go index 518b9af788..af65b943b4 100644 --- a/level_iter_test.go +++ b/level_iter_test.go @@ -63,6 +63,7 @@ func TestLevelIter(t *testing.T) { 
f.keys[0], f.keys[len(f.keys)-1], ) + meta.InitPhysicalBacking() metas = append(metas, meta) } files = manifest.NewLevelSliceKeySorted(base.DefaultComparer.Compare, metas) @@ -277,6 +278,7 @@ func (lt *levelIterTest) runBuild(d *datadriven.TestData) string { if meta.HasRangeKeys { m.ExtendRangeKeyBounds(lt.cmp.Compare, meta.SmallestRangeKey, meta.LargestRangeKey) } + m.InitPhysicalBacking() lt.metas = append(lt.metas, m) var buf bytes.Buffer @@ -511,6 +513,7 @@ func buildLevelIterTables( meta[i].FileNum = FileNum(i) largest, _ := iter.Last() meta[i].ExtendPointKeyBounds(opts.Comparer.Compare, (*smallest).Clone(), (*largest).Clone()) + meta[i].InitPhysicalBacking() } slice := manifest.NewLevelSliceKeySorted(base.DefaultComparer.Compare, meta) return readers, slice, keys, cleanup diff --git a/merging_iter_test.go b/merging_iter_test.go index 6d86a7bf99..8d5c8d6088 100644 --- a/merging_iter_test.go +++ b/merging_iter_test.go @@ -190,6 +190,7 @@ func TestMergingIterCornerCases(t *testing.T) { m := (&fileMetadata{ FileNum: fileNum, }).ExtendPointKeyBounds(cmp, smallestKey, largestKey) + m.InitPhysicalBacking() files[level] = append(files[level], m) i++ @@ -599,6 +600,7 @@ func buildLevelsForMergingIterSeqSeek( meta[j].FileNum = FileNum(j) largest, _ := iter.Last() meta[j].ExtendPointKeyBounds(opts.Comparer.Compare, smallest.Clone(), largest.Clone()) + meta[j].InitPhysicalBacking() } levelSlices[i] = manifest.NewLevelSliceSpecificOrder(meta) } diff --git a/open.go b/open.go index d5bfd3f883..9c28571f5a 100644 --- a/open.go +++ b/open.go @@ -843,7 +843,7 @@ func (d *DB) replayWAL( []*flushableEntry{entry}, ) for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files { - ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file}) + ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: 0, Meta: file.FileMetadata}) } } return toFlush, maxSeqNum, nil @@ -991,24 +991,32 @@ func checkConsistency(v *manifest.Version, dirname string, objProvider objstorag var buf bytes.Buffer var args []interface{} + dedup := make(map[base.FileNum]struct{}) for level, files := range v.Levels { iter := files.Iter() for f := iter.First(); f != nil; f = iter.Next() { - meta, err := objProvider.Lookup(base.FileTypeTable, f.FileNum) + backingState := f.FileBacking + if _, ok := dedup[backingState.FileNum]; ok { + continue + } + dedup[backingState.FileNum] = struct{}{} + fileNum := backingState.FileNum + fileSize := backingState.Size + meta, err := objProvider.Lookup(base.FileTypeTable, fileNum) var size int64 if err == nil { size, err = objProvider.Size(meta) } if err != nil { buf.WriteString("L%d: %s: %v\n") - args = append(args, errors.Safe(level), errors.Safe(f.FileNum), err) + args = append(args, errors.Safe(level), errors.Safe(fileNum), err) continue } - if size != int64(f.Size) { + if size != int64(fileSize) { buf.WriteString("L%d: %s: object size mismatch (%s): %d (disk) != %d (MANIFEST)\n") - args = append(args, errors.Safe(level), errors.Safe(f.FileNum), objProvider.Path(meta), - errors.Safe(size), errors.Safe(f.Size)) + args = append(args, errors.Safe(level), errors.Safe(fileNum), objProvider.Path(meta), + errors.Safe(size), errors.Safe(fileSize)) continue } } diff --git a/open_test.go b/open_test.go index c335d40f4d..6c485ea81c 100644 --- a/open_test.go +++ b/open_test.go @@ -1224,10 +1224,12 @@ func TestCheckConsistency(t *testing.T) { if err != nil { return nil, err } - return &manifest.FileMetadata{ + m := &manifest.FileMetadata{ FileNum: base.FileNum(fileNum), Size: uint64(size), - }, nil 
+ } + m.InitPhysicalBacking() + return m, nil } datadriven.RunTest(t, "testdata/version_check_consistency", diff --git a/table_cache.go b/table_cache.go index 4aeee18a68..575fb08d89 100644 --- a/table_cache.go +++ b/table_cache.go @@ -145,8 +145,8 @@ func (c *tableCacheContainer) newRangeKeyIter( return c.tableCache.getShard(file.FileNum).newRangeKeyIter(file, opts, &c.dbOpts) } -func (c *tableCacheContainer) getTableProperties(file *fileMetadata) (*sstable.Properties, error) { - return c.tableCache.getShard(file.FileNum).getTableProperties(file, &c.dbOpts) +func (c *tableCacheContainer) getTableProperties(file physicalMeta) (*sstable.Properties, error) { + return c.tableCache.getShard(file.FileNum).getTableProperties(file.FileMetadata, &c.dbOpts) } func (c *tableCacheContainer) evict(fileNum FileNum) { diff --git a/table_stats.go b/table_stats.go index 286ea517b3..4167467625 100644 --- a/table_stats.go +++ b/table_stats.go @@ -186,7 +186,10 @@ func (d *DB) loadNewFileStats( continue } - stats, newHints, err := d.loadTableStats(rs.current, nf.Level, nf.Meta) + stats, newHints, err := d.loadTableStats( + rs.current, nf.Level, + nf.Meta.PhysicalMeta(), + ) if err != nil { d.opts.EventListener.BackgroundError(err) continue @@ -233,7 +236,9 @@ func (d *DB) scanReadStateTableStats( return fill, hints, moreRemain } - stats, newHints, err := d.loadTableStats(rs.current, l, f) + stats, newHints, err := d.loadTableStats( + rs.current, l, f.PhysicalMeta(), + ) if err != nil { // Set `moreRemain` so we'll try again. moreRemain = true @@ -250,31 +255,38 @@ func (d *DB) scanReadStateTableStats( return fill, hints, moreRemain } +// loadTableStats currently only supports stats collection for physical +// sstables. +// +// TODO(bananabrick): Support stats collection for virtual sstables. func (d *DB) loadTableStats( - v *version, level int, meta *fileMetadata, + v *version, level int, meta physicalMeta, ) (manifest.TableStats, []deleteCompactionHint, error) { var stats manifest.TableStats var compactionHints []deleteCompactionHint - err := d.tableCache.withReader(meta, func(r *sstable.Reader) (err error) { - stats.NumEntries = r.Properties.NumEntries - stats.NumDeletions = r.Properties.NumDeletions - if r.Properties.NumPointDeletions() > 0 { - if err = d.loadTablePointKeyStats(r, v, level, meta, &stats); err != nil { - return + err := d.tableCache.withReader( + meta.FileMetadata, func(r *sstable.Reader) (err error) { + stats.NumEntries = r.Properties.NumEntries + stats.NumDeletions = r.Properties.NumDeletions + if r.Properties.NumPointDeletions() > 0 { + if err = d.loadTablePointKeyStats(r, v, level, meta, &stats); err != nil { + return + } } - } - if r.Properties.NumRangeDeletions > 0 || r.Properties.NumRangeKeyDels > 0 { - if compactionHints, err = d.loadTableRangeDelStats(r, v, level, meta, &stats); err != nil { - return + if r.Properties.NumRangeDeletions > 0 || r.Properties.NumRangeKeyDels > 0 { + if compactionHints, err = d.loadTableRangeDelStats( + r, v, level, meta, &stats, + ); err != nil { + return + } } - } - // TODO(travers): Once we have real-world data, consider collecting - // additional stats that may provide improved heuristics for compaction - // picking. - stats.NumRangeKeySets = r.Properties.NumRangeKeySets - stats.ValueBlocksSize = r.Properties.ValueBlocksSize - return - }) + // TODO(travers): Once we have real-world data, consider collecting + // additional stats that may provide improved heuristics for compaction + // picking. 
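
The physicalMeta (manifest.PhysicalFileMeta) parameter encodes in the signature that only physical sstables reach the stats helpers. A hedged sketch of the calling pattern (the explicit Virtual guard is an assumption for illustration; the patch's callers obtain the wrapper via FileMetadata.PhysicalMeta):

    func collectIfPhysical(d *DB, v *version, level int, f *fileMetadata) {
    	if f.Virtual {
    		// Skipped until stats collection supports virtual sstables
    		// (see the TODO above).
    		return
    	}
    	// PhysicalFileMeta wraps the *FileMetadata, so the callee can
    	// rely on the file being physical without re-checking.
    	_, _, _ = d.loadTableStats(v, level, f.PhysicalMeta())
    }
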
+ stats.NumRangeKeySets = r.Properties.NumRangeKeySets + stats.ValueBlocksSize = r.Properties.ValueBlocksSize + return + }) if err != nil { return stats, nil, err } @@ -284,7 +296,7 @@ func (d *DB) loadTableStats( // loadTablePointKeyStats calculates the point key statistics for the given // table. The provided manifest.TableStats are updated. func (d *DB) loadTablePointKeyStats( - r *sstable.Reader, v *version, level int, meta *fileMetadata, stats *manifest.TableStats, + r *sstable.Reader, v *version, level int, meta physicalMeta, stats *manifest.TableStats, ) error { // TODO(jackson): If the file has a wide keyspace, the average // value size beneath the entire file might not be representative @@ -304,9 +316,9 @@ func (d *DB) loadTablePointKeyStats( // loadTableRangeDelStats calculates the range deletion and range key deletion // statistics for the given table. func (d *DB) loadTableRangeDelStats( - r *sstable.Reader, v *version, level int, meta *fileMetadata, stats *manifest.TableStats, + r *sstable.Reader, v *version, level int, meta physicalMeta, stats *manifest.TableStats, ) ([]deleteCompactionHint, error) { - iter, err := newCombinedDeletionKeyspanIter(d.opts.Comparer, r, meta) + iter, err := newCombinedDeletionKeyspanIter(d.opts.Comparer, r, meta.FileMetadata) if err != nil { return nil, err } @@ -395,7 +407,7 @@ func (d *DB) loadTableRangeDelStats( hintType: hintType, start: make([]byte, len(start)), end: make([]byte, len(end)), - tombstoneFile: meta, + tombstoneFile: meta.FileMetadata, tombstoneLevel: level, tombstoneLargestSeqNum: s.LargestSeqNum(), tombstoneSmallestSeqNum: s.SmallestSeqNum(), @@ -409,7 +421,7 @@ func (d *DB) loadTableRangeDelStats( } func (d *DB) averageEntrySizeBeneath( - v *version, level int, meta *fileMetadata, + v *version, level int, meta physicalMeta, ) (avgKeySize, avgValueSize uint64, err error) { // Find all files in lower levels that overlap with meta, // summing their value sizes and entry counts. @@ -419,13 +431,23 @@ func (d *DB) averageEntrySizeBeneath( meta.Largest.UserKey, meta.Largest.IsExclusiveSentinel()) iter := overlaps.Iter() for file := iter.First(); file != nil; file = iter.Next() { - err := d.tableCache.withReader(file, func(r *sstable.Reader) (err error) { - fileSum += file.Size - entryCount += r.Properties.NumEntries - keySum += r.Properties.RawKeySize - valSum += r.Properties.RawValueSize - return nil - }) + var err error + if file.Virtual { + // TODO(bananabrick): Once we have Properties for the virtual + // sstables, use those here. + panic("pebble: not implemented") + } else { + err = d.tableCache.withReader( + file, + func(r *sstable.Reader) (err error) { + fileSum += file.Size + entryCount += r.Properties.NumEntries + keySum += r.Properties.RawKeySize + valSum += r.Properties.RawValueSize + return nil + }) + } + if err != nil { return 0, 0, err } @@ -471,6 +493,11 @@ func (d *DB) estimateReclaimedSizeBeneath( overlaps := v.Overlaps(l, d.cmp, start, end, true /* exclusiveEnd */) iter := overlaps.Iter() for file := iter.First(); file != nil; file = iter.Next() { + if file.Virtual { + // TODO(bananabrick): Remove this check once + // Reader.EstimatedDiskUsage works for virtual sstables. 
+ panic("pebble: unimplemented") + } startCmp := d.cmp(start, file.Smallest.UserKey) endCmp := d.cmp(file.Largest.UserKey, end) if startCmp <= 0 && (endCmp < 0 || endCmp == 0 && file.Largest.IsExclusiveSentinel()) { @@ -527,10 +554,11 @@ func (d *DB) estimateReclaimedSizeBeneath( continue } var size uint64 - err := d.tableCache.withReader(file, func(r *sstable.Reader) (err error) { - size, err = r.EstimateDiskUsage(start, end) - return err - }) + err := d.tableCache.withReader( + file, func(r *sstable.Reader) (err error) { + size, err = r.EstimateDiskUsage(start, end) + return err + }) if err != nil { return 0, hintSeqNum, err } @@ -541,7 +569,7 @@ func (d *DB) estimateReclaimedSizeBeneath( return estimate, hintSeqNum, nil } -func maybeSetStatsFromProperties(meta *fileMetadata, props *sstable.Properties) bool { +func maybeSetStatsFromProperties(meta physicalMeta, props *sstable.Properties) bool { // If a table contains range deletions or range key deletions, we defer the // stats collection. There are two main reasons for this: // @@ -755,7 +783,8 @@ func newCombinedDeletionKeyspanIter( // Truncate tombstones to the containing file's bounds if necessary. // See docs/range_deletions.md for why this is necessary. iter = keyspan.Truncate( - comparer.Compare, iter, m.Smallest.UserKey, m.Largest.UserKey, nil, nil, + comparer.Compare, iter, m.Smallest.UserKey, m.Largest.UserKey, + nil, nil, ) mIter.AddLevel(iter) } diff --git a/tool/db.go b/tool/db.go index 01586672b3..0598bbc419 100644 --- a/tool/db.go +++ b/tool/db.go @@ -485,7 +485,10 @@ func (d *dbT) runProperties(cmd *cobra.Command, args []string) { d.fmtValue.setForComparer(ve.ComparerName, d.comparers) } } - v, err := bve.Apply(nil /* version */, cmp.Compare, d.fmtKey.fn, d.opts.FlushSplitBytes, d.opts.Experimental.ReadCompactionRate, nil /* zombies */) + v, err := bve.Apply( + nil /* version */, cmp.Compare, d.fmtKey.fn, d.opts.FlushSplitBytes, + d.opts.Experimental.ReadCompactionRate, nil, /* zombies */ + ) if err != nil { return err } @@ -504,7 +507,15 @@ func (d *dbT) runProperties(cmd *cobra.Command, args []string) { iter := l.Iter() var level props for t := iter.First(); t != nil; t = iter.Next() { - err := d.addProps(objProvider, t, &level) + if t.Virtual { + // TODO(bananabrick): Handle virtual sstables here. We don't + // really have any stats or properties at this point. Maybe + // we could approximate some of these properties for virtual + // sstables by first grabbing properties for the backing + // physical sstable, and then extrapolating. + continue + } + err := d.addProps(objProvider, t.PhysicalMeta(), &level) if err != nil { return err } @@ -648,7 +659,9 @@ func (p *props) update(o props) { p.TopLevelIndexSize += o.TopLevelIndexSize } -func (d *dbT) addProps(objProvider objstorage.Provider, m *manifest.FileMetadata, p *props) error { +func (d *dbT) addProps( + objProvider objstorage.Provider, m manifest.PhysicalFileMeta, p *props, +) error { ctx := context.Background() f, err := objProvider.OpenForReading(ctx, base.FileTypeTable, m.FileNum, objstorage.OpenOptions{}) if err != nil { diff --git a/tool/find.go b/tool/find.go index c7c3bde25d..15970db0da 100644 --- a/tool/find.go +++ b/tool/find.go @@ -28,6 +28,14 @@ type findRef struct { } // findT implements the find tool. +// +// TODO(bananabrick): Add support for virtual sstables in this tool. Currently, +// the tool will work because we're parsing files from disk, so virtual sstables +// will never be added to findT.tables. 
The manifest could contain information
+// about virtual sstables. This is fine because the manifest is only used to
+// compute the findT.editRefs, and editRefs is only used if a file in
+// findT.tables contains a key. Of course, the tool won't be completely
+// accurate without dealing with the virtual sstable case.
 type findT struct {
 	Root *cobra.Command
 
diff --git a/tool/manifest.go b/tool/manifest.go
index ae4a2630e2..4fbddb2575 100644
--- a/tool/manifest.go
+++ b/tool/manifest.go
@@ -234,7 +234,11 @@ func (m *manifestT) runDump(cmd *cobra.Command, args []string) {
 		}
 
 		if cmp != nil {
-			v, err := bve.Apply(nil /* version */, cmp.Compare, m.fmtKey.fn, 0, m.opts.Experimental.ReadCompactionRate, nil /* zombies */)
+			v, err := bve.Apply(
+				nil /* version */, cmp.Compare, m.fmtKey.fn, 0,
+				m.opts.Experimental.ReadCompactionRate,
+				nil, /* zombies */
+			)
 			if err != nil {
 				fmt.Fprintf(stdout, "%s\n", err)
 				return
diff --git a/version_set.go b/version_set.go
index 78befd9296..bbc36f586d 100644
--- a/version_set.go
+++ b/version_set.go
@@ -30,6 +30,8 @@ const manifestMarkerName = `manifest`
 type bulkVersionEdit = manifest.BulkVersionEdit
 type deletedFileEntry = manifest.DeletedFileEntry
 type fileMetadata = manifest.FileMetadata
+type physicalMeta = manifest.PhysicalFileMeta
+type fileBacking = manifest.FileBacking
 type newFileEntry = manifest.NewFileEntry
 type version = manifest.Version
 type versionEdit = manifest.VersionEdit
@@ -83,8 +85,8 @@ type versionSet struct {
 
 	// A pointer to versionSet.addObsoleteLocked. Avoids allocating a new closure
 	// on the creation of every version.
-	obsoleteFn        func(obsolete []*manifest.FileMetadata)
-	obsoleteTables    []*manifest.FileMetadata
+	obsoleteFn        func(obsolete []*fileBacking)
+	obsoleteTables    []fileInfo
 	obsoleteManifests []fileInfo
 	obsoleteOptions   []fileInfo
 
@@ -92,6 +94,18 @@ type versionSet struct {
 	// still referenced by an inuse iterator.
 	zombieTables map[FileNum]uint64 // filenum -> size
 
+	// fileBackingMap maps a file number to the FileBacking of a backing
+	// sstable that supports virtual sstables in the latest version. Once a
+	// backing sstable no longer backs any virtual sstable in the latest
+	// version, it is removed from this map and its state is added to the
+	// zombieTables map. Note that we don't track a file backing which only
+	// supports virtual sstables outside of the latest version.
+	//
+	// fileBackingMap is protected by the versionSet.logLock. It's populated
+	// during Open in versionSet.load, but it's not used concurrently during
+	// load.
+	fileBackingMap map[FileNum]*fileBacking
+
 	// minUnflushedLogNum is the smallest WAL log file number corresponding to
 	// mutations that have not been flushed to an sstable.
 	minUnflushedLogNum FileNum
@@ -132,6 +146,7 @@ func (vs *versionSet) init(
 	vs.versions.Init(mu)
 	vs.obsoleteFn = vs.addObsoleteLocked
 	vs.zombieTables = make(map[FileNum]uint64)
+	vs.fileBackingMap = make(map[FileNum]*fileBacking)
 	vs.nextFileNum = 1
 	vs.manifestMarker = marker
 	vs.setCurrent = setCurrent
@@ -277,6 +292,16 @@ func (vs *versionSet) load(
 	}
 	vs.markFileNumUsed(vs.minUnflushedLogNum)
 
+	// Populate the fileBackingMap since we have finished version
+	// edit accumulation.
+ for _, s := range bve.AddedFileBacking { + vs.fileBackingMap[s.FileNum] = s + } + + for _, fileNum := range bve.RemovedFileBacking { + delete(vs.fileBackingMap, fileNum) + } + newVersion, err := bve.Apply( nil, vs.cmp, opts.Comparer.FormatKey, opts.FlushSplitBytes, opts.Experimental.ReadCompactionRate, nil, /* zombies */ @@ -464,9 +489,10 @@ func (vs *versionSet) logAndApply( defer vs.mu.Lock() var err error - newVersion, zombies, err = manifest.AccumulateAndApplySingleVE( + newVersion, zombies, err = manifest.AccumulateIncompleteAndApplySingleVE( ve, currentVersion, vs.cmp, vs.opts.Comparer.FormatKey, vs.opts.FlushSplitBytes, vs.opts.Experimental.ReadCompactionRate, + vs.fileBackingMap, ) if err != nil { return errors.Wrap(err, "MANIFEST apply failed") @@ -488,6 +514,7 @@ func (vs *versionSet) logAndApply( if err != nil { return errors.Wrap(err, "MANIFEST next record write failed") } + // NB: Any error from this point on is considered fatal as we don't now if // the MANIFEST write occurred or not. Trying to determine that is // fraught. Instead we rely on the standard recovery mechanism run when a @@ -652,6 +679,7 @@ func (vs *versionSet) createManifest( snapshot := versionEdit{ ComparerName: vs.cmpName, } + dedup := make(map[base.FileNum]struct{}) for level, levelMetadata := range vs.currentVersion().Levels { iter := levelMetadata.Iter() for meta := iter.First(); meta != nil; meta = iter.Next() { @@ -659,6 +687,14 @@ func (vs *versionSet) createManifest( Level: level, Meta: meta, }) + // TODO(bananabrick): Test snapshot changes. + if _, ok := dedup[meta.FileBacking.FileNum]; meta.Virtual && !ok { + dedup[meta.FileBacking.FileNum] = struct{}{} + snapshot.CreatedBackingTables = append( + snapshot.CreatedBackingTables, + meta.FileBacking, + ) + } } } @@ -729,7 +765,7 @@ func (vs *versionSet) addLiveFileNums(m map[FileNum]struct{}) { for _, lm := range v.Levels { iter := lm.Iter() for f := iter.First(); f != nil; f = iter.Next() { - m[f.FileNum] = struct{}{} + m[f.FileBacking.FileNum] = struct{}{} } } if v == current { @@ -738,27 +774,49 @@ func (vs *versionSet) addLiveFileNums(m map[FileNum]struct{}) { } } +// addObsoleteLocked will add the fileInfo associated with obsolete backing +// sstables to the obsolete tables list. +// +// The file backings in the obsolete list must not appear more than once. +// // DB.mu must be held when addObsoleteLocked is called. -func (vs *versionSet) addObsoleteLocked(obsolete []*manifest.FileMetadata) { +func (vs *versionSet) addObsoleteLocked(obsolete []*fileBacking) { if len(obsolete) == 0 { return } - for _, fileMeta := range obsolete { + obsoleteFileInfo := make([]fileInfo, len(obsolete)) + for i, bs := range obsolete { + obsoleteFileInfo[i].fileNum = bs.FileNum + obsoleteFileInfo[i].fileSize = bs.Size + } + + if invariants.Enabled { + dedup := make(map[base.FileNum]struct{}) + for _, fi := range obsoleteFileInfo { + dedup[fi.fileNum] = struct{}{} + } + if len(dedup) != len(obsoleteFileInfo) { + panic("pebble: duplicate FileBacking present in obsolete list") + } + } + + for _, fi := range obsoleteFileInfo { // Note that the obsolete tables are no longer zombie by the definition of // zombie, but we leave them in the zombie tables map until they are // deleted from disk. 
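
Because several virtual sstables can share a single FileBacking, walks over a version deduplicate on the backing file number so each backing file is processed exactly once. A hedged sketch of the pattern used by both checkConsistency and createManifest above (levelMetadata stands in for whichever level is being iterated):

    seen := make(map[base.FileNum]struct{})
    iter := levelMetadata.Iter()
    for f := iter.First(); f != nil; f = iter.Next() {
    	fn := f.FileBacking.FileNum
    	if _, ok := seen[fn]; ok {
    		continue // another virtual sstable already covered this backing
    	}
    	seen[fn] = struct{}{}
    	// Act once per backing file: link/copy it, size-check it, or
    	// record its FileBacking in CreatedBackingTables.
    }
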
- if _, ok := vs.zombieTables[fileMeta.FileNum]; !ok { - vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fileMeta.FileNum) + if _, ok := vs.zombieTables[fi.fileNum]; !ok { + vs.opts.Logger.Fatalf("MANIFEST obsolete table %s not marked as zombie", fi.fileNum) } } - vs.obsoleteTables = append(vs.obsoleteTables, obsolete...) + + vs.obsoleteTables = append(vs.obsoleteTables, obsoleteFileInfo...) vs.updateObsoleteTableMetricsLocked() } // addObsolete will acquire DB.mu, so DB.mu must not be held when this is // called. -func (vs *versionSet) addObsolete(obsolete []*manifest.FileMetadata) { +func (vs *versionSet) addObsolete(obsolete []*fileBacking) { vs.mu.Lock() defer vs.mu.Unlock() vs.addObsoleteLocked(obsolete) @@ -767,8 +825,8 @@ func (vs *versionSet) addObsolete(obsolete []*manifest.FileMetadata) { func (vs *versionSet) updateObsoleteTableMetricsLocked() { vs.metrics.Table.ObsoleteCount = int64(len(vs.obsoleteTables)) vs.metrics.Table.ObsoleteSize = 0 - for _, fileMeta := range vs.obsoleteTables { - vs.metrics.Table.ObsoleteSize += fileMeta.Size + for _, fi := range vs.obsoleteTables { + vs.metrics.Table.ObsoleteSize += fi.fileSize } } diff --git a/version_set_test.go b/version_set_test.go index dcf7b0899d..6fdf08d430 100644 --- a/version_set_test.go +++ b/version_set_test.go @@ -7,8 +7,10 @@ package pebble import ( "io" "testing" + "time" "github.com/cockroachdb/pebble/internal/base" + "github.com/cockroachdb/pebble/internal/manifest" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/sstable" @@ -26,6 +28,188 @@ func writeAndIngest(t *testing.T, mem vfs.FS, d *DB, k InternalKey, v []byte, fi require.NoError(t, d.Ingest([]string{path})) } +// TestLatestRefCounting sanity checks the ref counting implementation for +// FileMetadata.latestRefs, and makes sure that the zombie table implementation +// works when the version edit contains virtual sstables. It also checks that +// we're adding the physical sstable to the obsolete tables list iff the file is +// truly obsolete. +func TestLatestRefCounting(t *testing.T) { + mem := vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + + opts := &Options{ + FS: mem, + MaxManifestFileSize: 1, + DisableAutomaticCompactions: true, + } + d, err := Open("", opts) + require.NoError(t, err) + + err = d.Set([]byte{'a'}, []byte{'a'}, nil) + require.NoError(t, err) + err = d.Set([]byte{'b'}, []byte{'b'}, nil) + require.NoError(t, err) + + err = d.Flush() + require.NoError(t, err) + + iter := d.mu.versions.currentVersion().Levels[0].Iter() + var f *fileMetadata = iter.First() + require.NotNil(t, f) + require.Equal(t, 1, int(f.LatestRefs())) + require.Equal(t, 0, len(d.mu.versions.obsoleteTables)) + + // Grab some new file nums. 
+ d.mu.Lock() + f1 := d.mu.versions.nextFileNum + f2 := f1 + 1 + d.mu.versions.nextFileNum += 2 + d.mu.Unlock() + + m1 := &manifest.FileMetadata{ + FileBacking: f.FileBacking, + FileNum: f1, + CreationTime: time.Now().Unix(), + Size: f.Size / 2, + SmallestSeqNum: f.SmallestSeqNum, + LargestSeqNum: f.LargestSeqNum, + Smallest: base.MakeInternalKey([]byte{'a'}, f.Smallest.SeqNum(), InternalKeyKindSet), + Largest: base.MakeInternalKey([]byte{'a'}, f.Smallest.SeqNum(), InternalKeyKindSet), + HasPointKeys: true, + Virtual: true, + } + + m2 := &manifest.FileMetadata{ + FileBacking: f.FileBacking, + FileNum: f2, + CreationTime: time.Now().Unix(), + Size: f.Size / 2, + SmallestSeqNum: f.SmallestSeqNum, + LargestSeqNum: f.LargestSeqNum, + Smallest: base.MakeInternalKey([]byte{'b'}, f.Largest.SeqNum(), InternalKeyKindSet), + Largest: base.MakeInternalKey([]byte{'b'}, f.Largest.SeqNum(), InternalKeyKindSet), + HasPointKeys: true, + Virtual: true, + } + + m1.LargestPointKey = m1.Largest + m1.SmallestPointKey = m1.Smallest + + m2.LargestPointKey = m2.Largest + m2.SmallestPointKey = m2.Smallest + + m1.ValidateVirtual(f) + m2.ValidateVirtual(f) + + fileMetrics := func(ve *versionEdit) map[int]*LevelMetrics { + metrics := newFileMetrics(ve.NewFiles) + for de, f := range ve.DeletedFiles { + lm := metrics[de.Level] + if lm == nil { + lm = &LevelMetrics{} + metrics[de.Level] = lm + } + metrics[de.Level].NumFiles-- + metrics[de.Level].Size -= int64(f.Size) + } + return metrics + } + + d.mu.Lock() + defer d.mu.Unlock() + applyVE := func(ve *versionEdit) error { + d.mu.versions.logLock() + jobID := d.mu.nextJobID + d.mu.nextJobID++ + + err := d.mu.versions.logAndApply(jobID, ve, fileMetrics(ve), false, func() []compactionInfo { + return d.getInProgressCompactionInfoLocked(nil) + }) + d.updateReadStateLocked(nil) + return err + } + + // Virtualize f. + ve := manifest.VersionEdit{} + d1 := manifest.DeletedFileEntry{Level: 0, FileNum: f.FileNum} + n1 := manifest.NewFileEntry{Level: 0, Meta: m1} + n2 := manifest.NewFileEntry{Level: 0, Meta: m2} + + ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) + ve.DeletedFiles[d1] = f + ve.NewFiles = append(ve.NewFiles, n1) + ve.NewFiles = append(ve.NewFiles, n2) + ve.CreatedBackingTables = append(ve.CreatedBackingTables, f.FileBacking) + + require.NoError(t, applyVE(&ve)) + // 2 latestRefs from 2 virtual sstables in the latest version which refer + // to the physical sstable. + require.Equal(t, 2, int(m1.LatestRefs())) + require.Equal(t, 0, len(d.mu.versions.obsoleteTables)) + require.Equal(t, 1, len(d.mu.versions.fileBackingMap)) + _, ok := d.mu.versions.fileBackingMap[f.FileNum] + require.True(t, ok) + require.Equal(t, f.Size, m2.FileBacking.Atomic.VirtualizedSize.Load()) + + // Make sure that f is not present in zombie list, because it is not yet a + // zombie. + require.Equal(t, 0, len(d.mu.versions.zombieTables)) + + // Delete the virtual sstable m1. + ve = manifest.VersionEdit{} + d1 = manifest.DeletedFileEntry{Level: 0, FileNum: m1.FileNum} + ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) + ve.DeletedFiles[d1] = m1 + require.NoError(t, applyVE(&ve)) + + // Only one virtual sstable in the latest version, confirm that the latest + // version ref counting is correct. 
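
As a worked check of the size accounting the surrounding assertions rely on (assuming, per this test, that FileBacking.Atomic.VirtualizedSize tracks the summed Size of the latest version's virtual sstables that use the backing file):

    after virtualizing f into m1 and m2:  VirtualizedSize = m1.Size + m2.Size = f.Size
    after deleting m1:                    VirtualizedSize = m2.Size = f.Size/2
    after deleting m2:                    VirtualizedSize = 0, and the backing file becomes a zombie
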
+ require.Equal(t, 1, int(m2.LatestRefs())) + require.Equal(t, 0, len(d.mu.versions.zombieTables)) + require.Equal(t, 0, len(d.mu.versions.obsoleteTables)) + require.Equal(t, 1, len(d.mu.versions.fileBackingMap)) + _, ok = d.mu.versions.fileBackingMap[f.FileNum] + require.True(t, ok) + require.Equal(t, m2.Size, m2.FileBacking.Atomic.VirtualizedSize.Load()) + + // Move m2 from L0 to L6 to test the move compaction case. + ve = manifest.VersionEdit{} + d1 = manifest.DeletedFileEntry{Level: 0, FileNum: m2.FileNum} + n1 = manifest.NewFileEntry{Level: 6, Meta: m2} + ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) + ve.DeletedFiles[d1] = m2 + ve.NewFiles = append(ve.NewFiles, n1) + require.NoError(t, applyVE(&ve)) + + require.Equal(t, 1, int(m2.LatestRefs())) + require.Equal(t, 0, len(d.mu.versions.zombieTables)) + require.Equal(t, 0, len(d.mu.versions.obsoleteTables)) + require.Equal(t, 1, len(d.mu.versions.fileBackingMap)) + _, ok = d.mu.versions.fileBackingMap[f.FileNum] + require.True(t, ok) + require.Equal(t, m2.Size, m2.FileBacking.Atomic.VirtualizedSize.Load()) + + // Delete m2 from L6. + ve = manifest.VersionEdit{} + d1 = manifest.DeletedFileEntry{Level: 6, FileNum: m2.FileNum} + ve.DeletedFiles = make(map[manifest.DeletedFileEntry]*manifest.FileMetadata) + ve.DeletedFiles[d1] = m2 + require.NoError(t, applyVE(&ve)) + + // All virtual sstables are gone. + require.Equal(t, 0, int(m2.LatestRefs())) + require.Equal(t, 1, len(d.mu.versions.zombieTables)) + require.Equal(t, f.Size, d.mu.versions.zombieTables[f.FileNum]) + require.Equal(t, 0, len(d.mu.versions.fileBackingMap)) + _, ok = d.mu.versions.fileBackingMap[f.FileNum] + require.False(t, ok) + require.Equal(t, 0, int(m2.FileBacking.Atomic.VirtualizedSize.Load())) + + // Make sure that the backing file is added to the obsolete tables list. + require.Equal(t, 1, len(d.mu.versions.obsoleteTables)) + +} + func TestVersionSetCheckpoint(t *testing.T) { mem := vfs.NewMem() require.NoError(t, mem.MkdirAll("ext", 0755))