.*: virtual sstable FileMetadata changes
This PR introduces the FileMetadata changes that will be
required to make further virtual sstable changes.

This PR doesn't deal with persisting the FileMetadata
to disk through version edits.

RFC: https://github.com/cockroachdb/pebble/blob/master/docs/RFCS/20221122_virtual_sstable.md
bananabrick committed Mar 28, 2023
1 parent fb9bced commit 2e9d3f5
Showing 32 changed files with 1,029 additions and 268 deletions.
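The core idea of the metadata change, per the RFC: a FileMetadata may now be Virtual, in which case it references a shared FileBacking describing the physical sstable, while a physical file initializes a backing of its own via InitPhysicalBacking. Below is a minimal runnable sketch of that relationship; the field sets are simplified assumptions, and only the names FileBacking, Virtual, FileNum, Size, and InitPhysicalBacking come from this diff.

package main

import "fmt"

// FileNum stands in for pebble's base.FileNum.
type FileNum uint64

// FileBacking describes the physical sstable on disk. Several virtual
// sstables may share one backing. The exact field set is an assumption;
// only the FileNum field is visible in this diff.
type FileBacking struct {
	FileNum FileNum
	Size    uint64
}

// FileMetadata is a simplified stand-in for manifest.FileMetadata.
type FileMetadata struct {
	FileNum     FileNum
	Size        uint64
	Virtual     bool
	FileBacking *FileBacking
}

// InitPhysicalBacking mirrors the calls added throughout this diff: a
// physical sstable backs itself, so the backing reuses its own FileNum.
func (m *FileMetadata) InitPhysicalBacking() {
	if !m.Virtual && m.FileBacking == nil {
		m.FileBacking = &FileBacking{FileNum: m.FileNum, Size: m.Size}
	}
}

func main() {
	physical := &FileMetadata{FileNum: 7, Size: 4096}
	physical.InitPhysicalBacking()

	// Two virtual sstables carved out of the same physical file share its
	// backing; only FileBacking.FileNum corresponds to a file on disk.
	v1 := &FileMetadata{FileNum: 8, Size: 1024, Virtual: true, FileBacking: physical.FileBacking}
	v2 := &FileMetadata{FileNum: 9, Size: 2048, Virtual: true, FileBacking: physical.FileBacking}
	fmt.Println(v1.FileBacking == v2.FileBacking) // true
}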
41 changes: 36 additions & 5 deletions checkpoint.go
@@ -135,6 +135,9 @@ func mkdirAllAndSyncParents(fs vfs.FS, destDir string) (vfs.File, error) {
// space overhead for a checkpoint if hard links are disabled. Also beware that
// even if hard links are used, the space overhead for the checkpoint will
// increase over time as the DB performs compactions.
+//
+// TODO(bananabrick): Test checkpointing of virtual sstables once virtual
+// sstables is running e2e.
func (d *DB) Checkpoint(
destDir string, opts ...CheckpointOption,
) (
@@ -188,7 +191,10 @@ func (d *DB) Checkpoint(
manifestFileNum := d.mu.versions.manifestFileNum
manifestSize := d.mu.versions.manifest.Size()
	optionsFileNum := d.optionsFileNum
-
+	virtualBackingFiles := make(map[base.FileNum]struct{})
+	for fileNum := range d.mu.versions.fileBackingMap {
+		virtualBackingFiles[fileNum] = struct{}{}
+	}
// Release the manifest and DB.mu so we don't block other operations on
// the database.
d.mu.versions.logUnlock()
@@ -254,7 +260,9 @@ func (d *DB) Checkpoint(
}

	var excludedFiles map[deletedFileEntry]*fileMetadata
-
+	// Set of FileBacking.FileNum which will be required by virtual sstables in
+	// the checkpoint.
+	requiredVirtualBackingFiles := make(map[base.FileNum]struct{})
// Link or copy the sstables.
for l := range current.Levels {
iter := current.Levels[l].Iter()
@@ -270,7 +278,15 @@
continue
}

-			srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, f.FileNum)
+			fileBacking := f.FileBacking
+			if f.Virtual {
+				if _, ok := requiredVirtualBackingFiles[fileBacking.FileNum]; ok {
+					continue
+				}
+				requiredVirtualBackingFiles[fileBacking.FileNum] = struct{}{}
+			}
+
+			srcPath := base.MakeFilepath(fs, d.dirname, fileTypeTable, fileBacking.FileNum)
destPath := fs.PathJoin(destDir, fs.PathBase(srcPath))
ckErr = vfs.LinkOrCopy(fs, srcPath, destPath)
if ckErr != nil {
@@ -279,7 +295,19 @@
}
}

-	ckErr = d.writeCheckpointManifest(fs, formatVers, destDir, dir, manifestFileNum, manifestSize, excludedFiles)
+	var removeBackingTables []base.FileNum
+	for fileNum := range virtualBackingFiles {
+		if _, ok := requiredVirtualBackingFiles[fileNum]; !ok {
+			// The backing sstable associated with fileNum is no longer
+			// required.
+			removeBackingTables = append(removeBackingTables, fileNum)
+		}
+	}
+
+	ckErr = d.writeCheckpointManifest(
+		fs, formatVers, destDir, dir, manifestFileNum, manifestSize,
+		excludedFiles, removeBackingTables,
+	)
if ckErr != nil {
return ckErr
}
@@ -318,6 +346,7 @@ func (d *DB) writeCheckpointManifest(
manifestFileNum FileNum,
manifestSize int64,
excludedFiles map[deletedFileEntry]*fileMetadata,
+	removeBackingTables []base.FileNum,
) error {
// Copy the MANIFEST, and create a pointer to it. We copy rather
// than link because additional version edits added to the
@@ -349,8 +378,10 @@ func (d *DB) writeCheckpointManifest(
if len(excludedFiles) > 0 {
// Write out an additional VersionEdit that deletes the excluded SST files.
ve := versionEdit{
-			DeletedFiles: excludedFiles,
+			DeletedFiles:         excludedFiles,
+			RemovedBackingTables: removeBackingTables,
		}
+
rw := record.NewWriter(dst)
w, err := rw.Next()
if err != nil {
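The checkpoint change above has two parts: each physical file is linked or copied exactly once even when several virtual sstables share it (the requiredVirtualBackingFiles set dedupes by FileBacking.FileNum), and any backing file known to the live version but unused by the checkpointed sstables is recorded in RemovedBackingTables of the trailing versionEdit. A self-contained sketch of that set difference, with FileNum as a stand-in type:

package main

import (
	"fmt"
	"sort"
)

type FileNum uint64

// removedBackingTables computes the backing files present in the live
// version but not required by any sstable in the checkpoint; the same
// map-as-set difference Checkpoint performs above.
func removedBackingTables(live, required map[FileNum]struct{}) []FileNum {
	var remove []FileNum
	for fileNum := range live {
		if _, ok := required[fileNum]; !ok {
			remove = append(remove, fileNum)
		}
	}
	// Sorted here only to make the example deterministic.
	sort.Slice(remove, func(i, j int) bool { return remove[i] < remove[j] })
	return remove
}

func main() {
	live := map[FileNum]struct{}{1: {}, 2: {}, 3: {}}
	required := map[FileNum]struct{}{2: {}}
	fmt.Println(removedBackingTables(live, required)) // [1 3]
}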
59 changes: 35 additions & 24 deletions compaction.go
@@ -1384,7 +1384,9 @@ func (c *compaction) newInputIter(
// of range tombstones outside the file's internal key bounds. Skip
// any range tombstones completely outside file bounds.
rangeDelIter = keyspan.Truncate(
-				c.cmp, rangeDelIter, lowerBound.UserKey, upperBound.UserKey, &f.Smallest, &f.Largest)
+				c.cmp, rangeDelIter, lowerBound.UserKey, upperBound.UserKey,
+				&f.Smallest, &f.Largest,
+			)
}
if rangeDelIter == nil {
rangeDelIter = emptyKeyspanIter
@@ -1836,12 +1838,12 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
for _, file := range c.flushing[0].flushable.(*ingestedFlushable).files {
level, err = ingestTargetLevel(
d.newIters, d.tableNewRangeKeyIter, iterOpts, d.cmp,
-			c.version, baseLevel, d.mu.compact.inProgress, file,
+			c.version, baseLevel, d.mu.compact.inProgress, file.FileMetadata,
)
if err != nil {
return nil, err
}
-		ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file})
+		ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: file.FileMetadata})
levelMetrics := c.metrics[level]
if levelMetrics == nil {
levelMetrics = &LevelMetrics{}
@@ -1958,7 +1960,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
startTime := d.timeNow()

var ve *manifest.VersionEdit
-	var pendingOutputs []*manifest.FileMetadata
+	var pendingOutputs []physicalMeta
// To determine the target level of the files in the ingestedFlushable, we
// need to acquire the logLock, and not release it for that duration. Since,
// we need to acquire the logLock below to perform the logAndApply step
@@ -2023,7 +2025,12 @@
if err != nil {
info.Err = err
// TODO(peter): untested.
-		d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, pendingOutputs...)
+		for _, f := range pendingOutputs {
+			d.mu.versions.obsoleteTables = append(
+				d.mu.versions.obsoleteTables,
+				fileInfo{f.FileNum, f.Size},
+			)
+		}
d.mu.versions.updateObsoleteTableMetricsLocked()
}
} else {
@@ -2522,7 +2529,12 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
})
if err != nil {
// TODO(peter): untested.
-		d.mu.versions.obsoleteTables = append(d.mu.versions.obsoleteTables, pendingOutputs...)
+		for _, f := range pendingOutputs {
+			d.mu.versions.obsoleteTables = append(
+				d.mu.versions.obsoleteTables,
+				fileInfo{f.FileNum, f.Size},
+			)
+		}
d.mu.versions.updateObsoleteTableMetricsLocked()
}
}
@@ -2564,7 +2576,7 @@
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
-) (ve *versionEdit, pendingOutputs []*fileMetadata, retErr error) {
+) (ve *versionEdit, pendingOutputs []physicalMeta, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
// returning the edit.
@@ -2745,7 +2757,7 @@
d.mu.Lock()
fileNum := d.mu.versions.getNextFileNum()
fileMeta.FileNum = fileNum
-	pendingOutputs = append(pendingOutputs, fileMeta)
+	pendingOutputs = append(pendingOutputs, fileMeta.PhysicalMeta())
d.mu.Unlock()

writable, objMeta, err := d.objProvider.Create(context.TODO(), fileTypeTable, fileNum, objstorage.CreateOptions{} /* TODO */)
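pendingOutputs now carries physicalMeta values instead of *fileMetadata, and compaction outputs are appended via fileMeta.PhysicalMeta(). The wrapper's definition is not part of this diff; the following is a plausible sketch, assuming physicalMeta is a thin typed view asserting that the metadata belongs to a physical (non-virtual) sstable:

// Minimal stand-in; the real type lives in the manifest package.
type fileMetadata struct {
	FileNum FileNum
	Size    uint64
	Virtual bool
}

// physicalMeta is a typed view over metadata known to be physical.
type physicalMeta struct {
	*fileMetadata
}

// PhysicalMeta vends the view. The panic on virtual metadata is an
// assumed contract, not something this diff shows.
func (m *fileMetadata) PhysicalMeta() physicalMeta {
	if m.Virtual {
		panic("pebble: metadata does not belong to a physical sstable")
	}
	return physicalMeta{m}
}

This also explains why maybeSetStatsFromProperties takes meta.PhysicalMeta() below: stats derived purely from table properties only make sense when the properties describe the whole physical file.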
@@ -2896,9 +2908,13 @@
meta.Size = writerMeta.Size
meta.SmallestSeqNum = writerMeta.SmallestSeqNum
meta.LargestSeqNum = writerMeta.LargestSeqNum
+	meta.InitPhysicalBacking()
+
// If the file didn't contain any range deletions, we can fill its
// table stats now, avoiding unnecessarily loading the table later.
-	maybeSetStatsFromProperties(meta, &writerMeta.Properties)
+	maybeSetStatsFromProperties(
+		meta.PhysicalMeta(), &writerMeta.Properties,
+	)

if c.flushing == nil {
outputMetrics.TablesCompacted++
@@ -3250,7 +3266,7 @@ func (d *DB) scanObsoleteFiles(list []string) {
manifestFileNum := d.mu.versions.manifestFileNum

var obsoleteLogs []fileInfo
-	var obsoleteTables []*fileMetadata
+	var obsoleteTables []fileInfo
var obsoleteManifests []fileInfo
var obsoleteOptions []fileInfo

@@ -3301,13 +3317,13 @@
if _, ok := liveFileNums[obj.FileNum]; ok {
continue
}
-			fileMeta := &fileMetadata{
-				FileNum: obj.FileNum,
+			fileInfo := fileInfo{
+				fileNum: obj.FileNum,
}
if size, err := d.objProvider.Size(obj); err == nil {
-				fileMeta.Size = uint64(size)
+				fileInfo.fileSize = uint64(size)
}
-			obsoleteTables = append(obsoleteTables, fileMeta)
+			obsoleteTables = append(obsoleteTables, fileInfo)

default:
// Ignore object types we don't know about.
@@ -3316,7 +3332,7 @@

d.mu.log.queue = merge(d.mu.log.queue, obsoleteLogs)
d.mu.versions.metrics.WAL.Files = int64(len(d.mu.log.queue))
-	d.mu.versions.obsoleteTables = mergeFileMetas(d.mu.versions.obsoleteTables, obsoleteTables)
+	d.mu.versions.obsoleteTables = mergeFileInfo(d.mu.versions.obsoleteTables, obsoleteTables)
d.mu.versions.updateObsoleteTableMetricsLocked()
d.mu.versions.obsoleteManifests = merge(d.mu.versions.obsoleteManifests, obsoleteManifests)
d.mu.versions.obsoleteOptions = merge(d.mu.versions.obsoleteOptions, obsoleteOptions)
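obsoleteTables shrinks from []*fileMetadata to []fileInfo, since deletion bookkeeping only needs a file number and size. fileInfo already exists in this file for WAL and manifest accounting; as implied by the field accesses above, its shape is roughly:

// fileInfo as implied by the accesses above; the actual definition
// predates this diff.
type fileInfo struct {
	fileNum  FileNum
	fileSize uint64
}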
@@ -3432,12 +3448,7 @@ func (d *DB) doDeleteObsoleteFiles(jobID int) {
}
}

-	for _, table := range d.mu.versions.obsoleteTables {
-		obsoleteTables = append(obsoleteTables, fileInfo{
-			fileNum:  table.FileNum,
-			fileSize: table.Size,
-		})
-	}
+	obsoleteTables = append(obsoleteTables, d.mu.versions.obsoleteTables...)
d.mu.versions.obsoleteTables = nil

// Sort the manifests cause we want to delete some contiguous prefix
@@ -3638,19 +3649,19 @@ func merge(a, b []fileInfo) []fileInfo {
return a[:n]
}

-func mergeFileMetas(a, b []*fileMetadata) []*fileMetadata {
+func mergeFileInfo(a, b []fileInfo) []fileInfo {
if len(b) == 0 {
return a
}

a = append(a, b...)
sort.Slice(a, func(i, j int) bool {
-		return a[i].FileNum < a[j].FileNum
+		return a[i].fileNum < a[j].fileNum
})

n := 0
for i := 0; i < len(a); i++ {
-		if n == 0 || a[i].FileNum != a[n-1].FileNum {
+		if n == 0 || a[i].fileNum != a[n-1].fileNum {
a[n] = a[i]
n++
}
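mergeFileInfo replaces mergeFileMetas with the same merge-then-dedupe shape as the merge helper above: concatenate, sort by fileNum, and compact duplicates in place. An illustrative call (values invented for the example):

a := []fileInfo{{fileNum: 1, fileSize: 10}, {fileNum: 4, fileSize: 40}}
b := []fileInfo{{fileNum: 1, fileSize: 10}, {fileNum: 3, fileSize: 30}}
merged := mergeFileInfo(a, b)
// merged now holds fileNums 1, 3, 4; the duplicate entry for 1 is dropped.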
8 changes: 8 additions & 0 deletions compaction_picker_test.go
@@ -66,6 +66,7 @@ func loadVersion(d *datadriven.TestData) (*version, *Options, [numLevels]int64,
LargestSeqNum: key.SeqNum(),
Size: 1,
}).ExtendPointKeyBounds(opts.Comparer.Compare, key, key)
+	m.InitPhysicalBacking()
if size >= 100 {
// If the requested size of the level is very large only add a single
// file in order to avoid massive blow-up in the number of files in
@@ -364,6 +365,7 @@ func TestCompactionPickerL0(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
+	m.InitPhysicalBacking()
return m, nil
}

@@ -595,6 +597,7 @@ func TestCompactionPickerConcurrency(t *testing.T) {
base.ParseInternalKey(strings.TrimSpace(parts[0])),
base.ParseInternalKey(strings.TrimSpace(parts[1])),
)
+	m.InitPhysicalBacking()
for _, p := range fields[1:] {
if strings.HasPrefix(p, "size=") {
v, err := strconv.Atoi(strings.TrimPrefix(p, "size="))
@@ -812,6 +815,7 @@ func TestCompactionPickerPickReadTriggered(t *testing.T) {
base.ParseInternalKey(strings.TrimSpace(parts[0])),
base.ParseInternalKey(strings.TrimSpace(parts[1])),
)
+	m.InitPhysicalBacking()
for _, p := range fields[1:] {
if strings.HasPrefix(p, "size=") {
v, err := strconv.Atoi(strings.TrimPrefix(p, "size="))
@@ -984,6 +988,7 @@ func TestPickedCompactionSetupInputs(t *testing.T) {
)
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
+	m.InitPhysicalBacking()
return m
}

@@ -1120,6 +1125,7 @@ func TestPickedCompactionExpandInputs(t *testing.T) {
base.ParseInternalKey(parts[0]),
base.ParseInternalKey(parts[1]),
)
+	m.InitPhysicalBacking()
return m
}

@@ -1203,6 +1209,7 @@ func TestCompactionOutputFileSize(t *testing.T) {
base.ParseInternalKey(strings.TrimSpace(parts[0])),
base.ParseInternalKey(strings.TrimSpace(parts[1])),
)
+	m.InitPhysicalBacking()
for _, p := range fields[1:] {
if strings.HasPrefix(p, "size=") {
v, err := strconv.Atoi(strings.TrimPrefix(p, "size="))
@@ -1342,6 +1349,7 @@ func TestCompactionPickerCompensatedSize(t *testing.T) {
for _, tc := range testCases {
t.Run("", func(t *testing.T) {
f := &fileMetadata{Size: tc.size}
+		f.InitPhysicalBacking()
f.Stats.PointDeletionsBytesEstimate = tc.pointDelEstimateBytes
f.Stats.RangeDeletionsBytesEstimate = tc.rangeDelEstimateBytes
gotBytes := compensatedSize(f, tc.pointTombstoneWeight)
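The test changes are mechanical: every test that hand-builds a fileMetadata now calls InitPhysicalBacking() after setting its bounds and size, so that downstream code can assume FileBacking is always populated. The recurring pattern, with cmp and key as placeholder inputs:

m := (&fileMetadata{
	SmallestSeqNum: key.SeqNum(),
	LargestSeqNum:  key.SeqNum(),
	Size:           1,
}).ExtendPointKeyBounds(cmp, key, key)
m.InitPhysicalBacking() // required now that every physical file needs a backing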