diff --git a/compaction.go b/compaction.go index a1c78ff62f..b8c1e00620 100644 --- a/compaction.go +++ b/compaction.go @@ -1333,10 +1333,10 @@ func (c *compaction) newInputIter( // internal iterator interface). The resulting merged rangedel iterator is // then included with the point levels in a single mergingIter. newRangeDelIter := func( - f manifest.LevelFile, _ *IterOptions, bytesIterated *uint64, + f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64, ) (keyspan.FragmentIterator, error) { iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata, - nil /* iter options */, internalIterOpts{bytesIterated: &c.bytesIterated}) + &IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated}) if err == nil { // TODO(peter): It is mildly wasteful to open the point iterator only to // immediately close it. One way to solve this would be to add new @@ -1459,7 +1459,7 @@ func (c *compaction) newInputIter( // mergingIter. iter := level.files.Iter() for f := iter.First(); f != nil; f = iter.Next() { - rangeDelIter, err := newRangeDelIter(iter.Take(), nil, &c.bytesIterated) + rangeDelIter, err := newRangeDelIter(iter.Take(), nil, l, &c.bytesIterated) if err != nil { // The error will already be annotated with the BackingFileNum, so // we annotate it with the FileNum. @@ -1501,7 +1501,7 @@ func (c *compaction) newInputIter( } return iter, err } - li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange) + li.Init(keyspan.SpanIterOptions{Level: l}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange) rangeKeyIters = append(rangeKeyIters, li) } return nil @@ -2564,9 +2564,20 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) { info.Duration = d.timeNow().Sub(startTime) if err == nil { d.mu.versions.logLock() - err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo { - return d.getInProgressCompactionInfoLocked(c) - }) + // Confirm if any of this compaction's inputs were deleted while this + // compaction was ongoing. + for i := range c.inputs { + c.inputs[i].files.Each(func(m *manifest.FileMetadata) { + if m.Deleted { + err = firstError(err, errors.New("pebble: file deleted by a concurrent operation, will retry compaction")) + } + }) + } + if err == nil { + err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo { + return d.getInProgressCompactionInfoLocked(c) + }) + } if err != nil { // TODO(peter): untested. for _, f := range pendingOutputs { diff --git a/data_test.go b/data_test.go index b4f17784a7..3fd5225e1a 100644 --- a/data_test.go +++ b/data_test.go @@ -1174,6 +1174,32 @@ func (d *DB) waitTableStats() { } } +func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { + var exciseSpan KeyRange + paths := make([]string, 0, len(td.CmdArgs)) + for i, arg := range td.CmdArgs { + switch td.CmdArgs[i].Key { + case "excise": + if len(td.CmdArgs[i].Vals) != 1 { + return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"") + } + fields := strings.Split(td.CmdArgs[i].Vals[0], "-") + if len(fields) != 2 { + return errors.New("expected 2 values for excise separated by -, eg. 
ingest-and-excise foo1 excise=\"start-end\"") + } + exciseSpan.Start = []byte(fields[0]) + exciseSpan.End = []byte(fields[1]) + default: + paths = append(paths, arg.String()) + } + } + + if _, err := d.IngestAndExcise(paths, nil /* shared */, exciseSpan); err != nil { + return err + } + return nil +} + func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error { paths := make([]string, 0, len(td.CmdArgs)) for _, arg := range td.CmdArgs { @@ -1212,7 +1238,7 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error { *fileMetadata, ) (int, error) { return level, nil - }) + }, nil, KeyRange{}) return err } diff --git a/flushable_test.go b/flushable_test.go index 7c5273da13..70f0a58ada 100644 --- a/flushable_test.go +++ b/flushable_test.go @@ -58,9 +58,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // We can reuse the ingestLoad function for this test even if we're // not actually ingesting a file. - meta, paths, err := ingestLoad( - d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs, - ) + lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, d.cacheID, pendingOutputs) + meta := lr.localMeta if err != nil { panic(err) } @@ -70,7 +69,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { } // Verify the sstables do not overlap. - if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil { + if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil { panic("unsorted sstables") } @@ -79,7 +78,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) { // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil { + if err := ingestLink(jobID, d.opts, d.objProvider, lr, nil /* shared */); err != nil { panic("couldn't hard link sstables") } diff --git a/ingest.go b/ingest.go index 7c91240ff1..46905a897e 100644 --- a/ingest.go +++ b/ingest.go @@ -33,6 +33,32 @@ func sstableKeyCompare(userCmp Compare, a, b InternalKey) int { return 0 } +// KeyRange encodes a key range in user key space. A KeyRange's Start is +// inclusive while its End is exclusive. +type KeyRange struct { + Start, End []byte +} + +// Valid returns true if the KeyRange is defined. +func (k *KeyRange) Valid() bool { + return k.Start != nil && k.End != nil +} + +// Contains returns whether the specified key exists in the KeyRange. +func (k *KeyRange) Contains(cmp base.Compare, key InternalKey) bool { + v := cmp(key.UserKey, k.End) + return (v < 0 || (v == 0 && key.IsExclusiveSentinel())) && cmp(k.Start, key.UserKey) <= 0 +} + +// Overlaps checks if the specified file has an overlap with the KeyRange. +// Note that we aren't checking for full containment of m within k, rather just +// that there's some intersection between m and k's bounds. +func (k *KeyRange) Overlaps(cmp base.Compare, m *fileMetadata) bool { + v := cmp(k.Start, m.Largest.UserKey) + return v <= 0 && !(m.Largest.IsExclusiveSentinel() && v == 0) && + cmp(k.End, m.Smallest.UserKey) > 0 +} + func ingestValidateKey(opts *Options, key *InternalKey) error { if key.Kind() == InternalKeyKindInvalid { return base.CorruptionErrorf("pebble: external sstable has corrupted key: %s", @@ -45,6 +71,54 @@ func ingestValidateKey(opts *Options, key *InternalKey) error { return nil } +// ingestLoad1Shared loads the fileMetadata for one shared sstable. It also +// sets the sequence numbers for a shared sstable. 
+func ingestLoad1Shared( + opts *Options, sm SharedSSTMeta, fileNum base.DiskFileNum, +) (*fileMetadata, error) { + // Don't load table stats. Doing a round trip to shared storage, one SST + // at a time is not worth it as it slows down ingestion. + meta := &fileMetadata{} + meta.FileNum = fileNum.FileNum() + meta.CreationTime = time.Now().Unix() + meta.Virtual = true + meta.Size = sm.Size + meta.InitProviderBacking(fileNum) + // Set the underlying FileBacking's size to the same size as the virtualized + // view of the sstable. This ensures that we don't over-prioritize this + // sstable for compaction just yet, as we do not have a clear sense of + // what parts of this sstable are referenced by other nodes. + meta.FileBacking.Size = sm.Size + seqNum := base.SeqNumForLevel(int(sm.Level)) + if sm.LargestRangeKey.Valid() && sm.LargestRangeKey.UserKey != nil { + meta.HasRangeKeys = true + meta.SmallestRangeKey = sm.SmallestRangeKey + meta.LargestRangeKey = sm.LargestRangeKey + meta.SmallestRangeKey.SetSeqNum(seqNum) + meta.LargestRangeKey.SetSeqNum(seqNum) + meta.SmallestSeqNum = seqNum + meta.LargestSeqNum = seqNum + // Initialize meta.{Smallest,Largest} and others by calling this. + meta.ExtendRangeKeyBounds(opts.Comparer.Compare, meta.SmallestRangeKey, meta.LargestRangeKey) + } + if sm.LargestPointKey.Valid() && sm.LargestPointKey.UserKey != nil { + meta.HasPointKeys = true + meta.SmallestPointKey = sm.SmallestPointKey + meta.LargestPointKey = sm.LargestPointKey + meta.SmallestPointKey.SetSeqNum(seqNum) + meta.LargestPointKey.SetSeqNum(seqNum) + meta.SmallestSeqNum = seqNum + meta.LargestSeqNum = seqNum + // Initialize meta.{Smallest,Largest} and others by calling this. + meta.ExtendPointKeyBounds(opts.Comparer.Compare, meta.SmallestPointKey, meta.LargestPointKey) + } + + if err := meta.Validate(opts.Comparer.Compare, opts.Comparer.FormatKey); err != nil { + return nil, err + } + return meta, nil +} + func ingestLoad1( opts *Options, fmv FormatMajorVersion, path string, cacheID uint64, fileNum base.DiskFileNum, ) (*fileMetadata, error) { @@ -195,22 +269,55 @@ func ingestLoad1( return meta, nil } +type ingestLoadResult struct { + localMeta, sharedMeta []*fileMetadata + localPaths []string + sharedLevels []uint8 +} + func ingestLoad( - opts *Options, fmv FormatMajorVersion, paths []string, cacheID uint64, pending []base.DiskFileNum, -) ([]*fileMetadata, []string, error) { + opts *Options, + fmv FormatMajorVersion, + paths []string, + shared []SharedSSTMeta, + cacheID uint64, + pending []base.DiskFileNum, +) (ingestLoadResult, error) { meta := make([]*fileMetadata, 0, len(paths)) newPaths := make([]string, 0, len(paths)) for i := range paths { m, err := ingestLoad1(opts, fmv, paths[i], cacheID, pending[i]) if err != nil { - return nil, nil, err + return ingestLoadResult{}, err } if m != nil { meta = append(meta, m) newPaths = append(newPaths, paths[i]) } } - return meta, newPaths, nil + if len(shared) == 0 { + return ingestLoadResult{localMeta: meta, localPaths: newPaths}, nil + } + sharedMeta := make([]*fileMetadata, 0, len(shared)) + levels := make([]uint8, 0, len(shared)) + for i := range shared { + m, err := ingestLoad1Shared(opts, shared[i], pending[len(paths)+i]) + if err != nil { + return ingestLoadResult{}, err + } + if shared[i].Level < sharedLevelsStart { + return ingestLoadResult{}, errors.New("cannot ingest shared file in level below sharedLevelsStart") + } + sharedMeta = append(sharedMeta, m) + levels = append(levels, shared[i].Level) + } + result := ingestLoadResult{ + 
localMeta: meta, + sharedMeta: sharedMeta, + localPaths: newPaths, + sharedLevels: levels, + } + return result, nil } // Struct for sorting metadatas by smallest user keys, while ensuring the @@ -235,20 +342,48 @@ func (m metaAndPaths) Swap(i, j int) { m.paths[i], m.paths[j] = m.paths[j], m.paths[i] } -func ingestSortAndVerify(cmp Compare, meta []*fileMetadata, paths []string) error { - if len(meta) <= 1 { +func ingestSortAndVerify(cmp Compare, lr ingestLoadResult, exciseSpan KeyRange) error { + // Verify that all the shared files (i.e. files in sharedMeta) + // fit within the exciseSpan. + for i := range lr.sharedMeta { + f := lr.sharedMeta[i] + if !exciseSpan.Contains(cmp, f.Smallest) || !exciseSpan.Contains(cmp, f.Largest) { + return errors.AssertionFailedf("pebble: shared file outside of excise span") + } + } + if len(lr.localMeta) <= 1 || len(lr.localPaths) <= 1 { return nil } sort.Sort(&metaAndPaths{ - meta: meta, - paths: paths, + meta: lr.localMeta, + paths: lr.localPaths, cmp: cmp, }) - for i := 1; i < len(meta); i++ { - if sstableKeyCompare(cmp, meta[i-1].Largest, meta[i].Smallest) >= 0 { - return errors.New("pebble: external sstables have overlapping ranges") + for i := 1; i < len(lr.localPaths); i++ { + if sstableKeyCompare(cmp, lr.localMeta[i-1].Largest, lr.localMeta[i].Smallest) >= 0 { + return errors.AssertionFailedf("pebble: external sstables have overlapping ranges") + } + } + if len(lr.sharedMeta) == 0 { + return nil + } + filesInLevel := make([]*fileMetadata, 0, len(lr.sharedMeta)) + for l := sharedLevelsStart; l < numLevels; l++ { + filesInLevel = filesInLevel[:0] + for i := range lr.sharedMeta { + if lr.sharedLevels[i] == uint8(l) { + filesInLevel = append(filesInLevel, lr.sharedMeta[i]) + } + } + sort.Slice(filesInLevel, func(i, j int) bool { + return cmp(filesInLevel[i].Smallest.UserKey, filesInLevel[j].Smallest.UserKey) < 0 + }) + for i := 1; i < len(filesInLevel); i++ { + if sstableKeyCompare(cmp, filesInLevel[i-1].Largest, filesInLevel[i].Smallest) >= 0 { + return errors.AssertionFailedf("pebble: external shared sstables have overlapping ranges") + } } } return nil @@ -265,17 +400,21 @@ func ingestCleanup(objProvider objstorage.Provider, meta []*fileMetadata) error } // ingestLink creates new objects which are backed by either hardlinks to or -// copies of the ingested files. +// copies of the ingested files. It also attaches shared objects to the provider. 
func ingestLink( - jobID int, opts *Options, objProvider objstorage.Provider, paths []string, meta []*fileMetadata, + jobID int, + opts *Options, + objProvider objstorage.Provider, + lr ingestLoadResult, + shared []SharedSSTMeta, ) error { - for i := range paths { + for i := range lr.localPaths { objMeta, err := objProvider.LinkOrCopyFromLocal( - context.TODO(), opts.FS, paths[i], fileTypeTable, meta[i].FileBacking.DiskFileNum, + context.TODO(), opts.FS, lr.localPaths[i], fileTypeTable, lr.localMeta[i].FileBacking.DiskFileNum, objstorage.CreateOptions{PreferSharedStorage: true}, ) if err != nil { - if err2 := ingestCleanup(objProvider, meta[:i]); err2 != nil { + if err2 := ingestCleanup(objProvider, lr.localMeta[:i]); err2 != nil { opts.Logger.Infof("ingest cleanup failed: %v", err2) } return err @@ -285,7 +424,33 @@ func ingestLink( JobID: jobID, Reason: "ingesting", Path: objProvider.Path(objMeta), - FileNum: meta[i].FileNum, + FileNum: lr.localMeta[i].FileNum, + }) + } + } + sharedObjs := make([]objstorage.SharedObjectToAttach, 0, len(shared)) + for i := range shared { + backing, err := shared[i].Backing.Get() + if err != nil { + return err + } + sharedObjs = append(sharedObjs, objstorage.SharedObjectToAttach{ + FileNum: lr.sharedMeta[i].FileBacking.DiskFileNum, + FileType: fileTypeTable, + Backing: backing, + }) + } + sharedObjMetas, err := objProvider.AttachSharedObjects(sharedObjs) + if err != nil { + return err + } + for i := range sharedObjMetas { + if opts.EventListener.TableCreated != nil { + opts.EventListener.TableCreated(TableCreateInfo{ + JobID: jobID, + Reason: "ingesting", + Path: objProvider.Path(sharedObjMetas[i]), + FileNum: lr.sharedMeta[i].FileNum, }) } } @@ -310,7 +475,8 @@ func ingestMemtableOverlaps(cmp Compare, mem flushable, meta []*fileMetadata) bo } for _, m := range meta { - if overlapWithIterator(iter, &rangeDelIter, rkeyIter, m, cmp) { + kr := internalKeyRange{smallest: m.Smallest, largest: m.Largest} + if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, cmp) { closeIters() return true } @@ -362,21 +528,25 @@ func ingestUpdateSeqNum( return nil } +type internalKeyRange struct { + smallest, largest InternalKey +} + func overlapWithIterator( iter internalIterator, rangeDelIter *keyspan.FragmentIterator, rkeyIter keyspan.FragmentIterator, - meta *fileMetadata, + keyRange internalKeyRange, cmp Compare, ) bool { // Check overlap with point operations. // // When using levelIter, it seeks to the SST whose boundaries - // contain meta.Smallest.UserKey(S). + // contain keyRange.smallest.UserKey(S). // It then tries to find a point in that SST that is >= S. // If there's no such point it means the SST ends in a tombstone in which case // levelIter.SeekGE generates a boundary range del sentinel. - // The comparison of this boundary with meta.Largest(L) below + // The comparison of this boundary with keyRange.largest(L) below // is subtle but maintains correctness. // 1) boundary < L, // since boundary is also > S (initial seek), @@ -388,9 +558,9 @@ func overlapWithIterator( // means boundary < L and hence is similar to 1). // 4) boundary == L and L is sentinel, // we'll always overlap since for any values of i,j ranges [i, k) and [j, k) always overlap. 
- key, _ := iter.SeekGE(meta.Smallest.UserKey, base.SeekGEFlagsNone) + key, _ := iter.SeekGE(keyRange.smallest.UserKey, base.SeekGEFlagsNone) if key != nil { - c := sstableKeyCompare(cmp, *key, meta.Largest) + c := sstableKeyCompare(cmp, *key, keyRange.largest) if c <= 0 { return true } @@ -402,7 +572,7 @@ func overlapWithIterator( computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) bool { // NB: The spans surfaced by the fragment iterator are non-overlapping. - span := rIter.SeekLT(meta.Smallest.UserKey) + span := rIter.SeekLT(keyRange.smallest.UserKey) if span == nil { span = rIter.Next() } @@ -411,13 +581,13 @@ func overlapWithIterator( continue } key := span.SmallestKey() - c := sstableKeyCompare(cmp, key, meta.Largest) + c := sstableKeyCompare(cmp, key, keyRange.largest) if c > 0 { // The start of the span is after the largest key in the // ingested table. return false } - if cmp(span.End, meta.Smallest.UserKey) > 0 { + if cmp(span.End, keyRange.smallest.UserKey) > 0 { // The end of the span is greater than the smallest in the // table. Note that the span end key is exclusive, thus ">0" // instead of ">=0". @@ -537,7 +707,11 @@ func ingestTargetLevel( v.L0Sublevels.Levels[subLevel].Iter(), manifest.Level(0), manifest.KeyTypeRange, ) - overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, meta, cmp) + kr := internalKeyRange{ + smallest: meta.Smallest, + largest: meta.Largest, + } + overlap := overlapWithIterator(iter, &rangeDelIter, &levelIter, kr, cmp) err := iter.Close() // Closes range del iter as well. err = firstError(err, levelIter.Close()) if err != nil { @@ -563,7 +737,11 @@ func ingestTargetLevel( v.Levels[level].Iter(), manifest.Level(level), manifest.KeyTypeRange, ) - overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, meta, cmp) + kr := internalKeyRange{ + smallest: meta.Smallest, + largest: meta.Largest, + } + overlap := overlapWithIterator(levelIter, &rangeDelIter, rkeyLevelIter, kr, cmp) err := levelIter.Close() // Closes range del iter as well. err = firstError(err, rkeyLevelIter.Close()) if err != nil { @@ -612,6 +790,13 @@ func ingestTargetLevel( // the same filesystem as the DB. Sstables can be created for ingestion using // sstable.Writer. On success, Ingest removes the input paths. // +// Two types of sstables are accepted for ingestion(s): one is sstables present +// in the instance's vfs.FS and can be referenced locally. The other is sstables +// present in shared.Storage, referred to as shared or foreign sstables. These +// shared sstables can be linked through objstorageprovider.Provider, and do not +// need to already be present on the local vfs.FS. Foreign sstables must all fit +// in an excise span, and are destined for a level specified in SharedSSTMeta. +// // All sstables *must* be Sync()'d by the caller after all bytes are written // and before its file handle is closed; failure to do so could violate // durability or lead to corrupted on-disk state. This method cannot, in a @@ -623,21 +808,28 @@ func ingestTargetLevel( // Ingestion loads each sstable into the lowest level of the LSM which it // doesn't overlap (see ingestTargetLevel). If an sstable overlaps a memtable, // ingestion forces the memtable to flush, and then waits for the flush to -// occur. +// occur. In some cases, such as with no foreign sstables and no excise span, +// ingestion that gets blocked on a memtable can join the flushable queue and +// finish even before the memtable has been flushed. // // The steps for ingestion are: // // 1. 
Allocate file numbers for every sstable being ingested. -// 2. Load the metadata for all sstables being ingest. -// 3. Sort the sstables by smallest key, verifying non overlap. -// 4. Hard link (or copy) the sstables into the DB directory. +// 2. Load the metadata for all sstables being ingested. +// 3. Sort the sstables by smallest key, verifying non overlap (for local +// sstables). +// 4. Hard link (or copy) the local sstables into the DB directory. // 5. Allocate a sequence number to use for all of the entries in the -// sstables. This is the step where overlap with memtables is +// local sstables. This is the step where overlap with memtables is // determined. If there is overlap, we remember the most recent memtable // that overlaps. -// 6. Update the sequence number in the ingested sstables. +// 6. Update the sequence number in the ingested local sstables. (Shared +// sstables get fixed sequence numbers that were determined at load time.) // 7. Wait for the most recent memtable that overlaps to flush (if any). // 8. Add the ingested sstables to the version (DB.ingestApply). +// 8.1. If an excise span was specified, figure out what sstables in the +// current version overlap with the excise span, and create new virtual +// sstables out of those sstables that exclude the excised span (DB.excise). // 9. Publish the ingestion sequence number. // // Note that if the mutable memtable overlaps with ingestion, a flush of the @@ -654,7 +846,7 @@ func (d *DB) Ingest(paths []string) error { if d.opts.ReadOnly { return ErrReadOnly } - _, err := d.ingest(paths, ingestTargetLevel) + _, err := d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}) return err } @@ -683,7 +875,29 @@ func (d *DB) IngestWithStats(paths []string) (IngestOperationStats, error) { if d.opts.ReadOnly { return IngestOperationStats{}, ErrReadOnly } - return d.ingest(paths, ingestTargetLevel) + return d.ingest(paths, ingestTargetLevel, nil /* shared */, KeyRange{}) +} + +// IngestAndExcise does the same as IngestWithStats, and additionally accepts a +// list of shared files to ingest that can be read from a shared.Storage through +// a Provider. All the shared files must live within exciseSpan, and any existing +// keys in exciseSpan are deleted by turning existing sstables into virtual +// sstables (if not virtual already) and shrinking their spans to exclude +// exciseSpan. See the comment at Ingest for a more complete picture of the +// ingestion process. +// +// Panics if this DB instance was not instantiated with a shared.Storage and +// shared sstables are present. +func (d *DB) IngestAndExcise( + paths []string, shared []SharedSSTMeta, exciseSpan KeyRange, +) (IngestOperationStats, error) { + if err := d.closed.Load(); err != nil { + panic(err) + } + if d.opts.ReadOnly { + return IngestOperationStats{}, ErrReadOnly + } + return d.ingest(paths, ingestTargetLevel, shared, exciseSpan) } // Both DB.mu and commitPipeline.mu must be held while this is called. @@ -793,37 +1007,50 @@ func (d *DB) handleIngestAsFlushable(meta []*fileMetadata, seqNum uint64) error return nil } +// See comment at Ingest() for details on how this works. 
func (d *DB) ingest( - paths []string, targetLevelFunc ingestTargetLevelFunc, + paths []string, + targetLevelFunc ingestTargetLevelFunc, + shared []SharedSSTMeta, + exciseSpan KeyRange, ) (IngestOperationStats, error) { + if len(shared) > 0 && d.opts.Experimental.SharedStorage == nil { + panic("cannot ingest shared sstables with nil SharedStorage") + } + if (exciseSpan.Valid() || len(shared) > 0) && d.opts.FormatMajorVersion < ExperimentalFormatVirtualSSTables { + return IngestOperationStats{}, errors.New("pebble: format major version too old for excise or shared sstable ingestion") + } // Allocate file numbers for all of the files being ingested and mark them as // pending in order to prevent them from being deleted. Note that this causes // the file number ordering to be out of alignment with sequence number // ordering. The sorting of L0 tables by sequence number avoids relying on // that (busted) invariant. d.mu.Lock() - pendingOutputs := make([]base.DiskFileNum, len(paths)) + pendingOutputs := make([]base.DiskFileNum, len(paths)+len(shared)) for i := range paths { pendingOutputs[i] = d.mu.versions.getNextFileNum().DiskFileNum() } + for i := range shared { + pendingOutputs[len(paths)+i] = d.mu.versions.getNextFileNum().DiskFileNum() + } jobID := d.mu.nextJobID d.mu.nextJobID++ d.mu.Unlock() // Load the metadata for all of the files being ingested. This step detects // and elides empty sstables. - meta, paths, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs) + loadResult, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, shared, d.cacheID, pendingOutputs) if err != nil { return IngestOperationStats{}, err } - if len(meta) == 0 { + if len(loadResult.localMeta) == 0 && len(loadResult.sharedMeta) == 0 { // All of the sstables to be ingested were empty. Nothing to do. return IngestOperationStats{}, nil } // Verify the sstables do not overlap. - if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil { + if err := ingestSortAndVerify(d.cmp, loadResult, exciseSpan); err != nil { return IngestOperationStats{}, err } @@ -832,7 +1059,7 @@ func (d *DB) ingest( // (e.g. because the files reside on a different filesystem), ingestLink will // fall back to copying, and if that fails we undo our work and return an // error. - if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil { + if err := ingestLink(jobID, d.opts, d.objProvider, loadResult, shared); err != nil { return IngestOperationStats{}, err } // Make the new tables durable. We need to do this at some point before we @@ -845,11 +1072,12 @@ func (d *DB) ingest( // metaFlushableOverlaps is a slice parallel to meta indicating which of the // ingested sstables overlap some table in the flushable queue. It's used to // approximate ingest-into-L0 stats when using flushable ingests. - metaFlushableOverlaps := make([]bool, len(meta)) + metaFlushableOverlaps := make([]bool, len(loadResult.localMeta)+len(loadResult.sharedMeta)) var mem *flushableEntry var mut *memTable // asFlushable indicates whether the sstable was ingested as a flushable. var asFlushable bool + var overlapWithExciseSpan bool prepare := func(seqNum uint64) { // Note that d.commit.mu is held by commitPipeline when calling prepare. 
@@ -866,12 +1094,17 @@ func (d *DB) ingest( iter := m.newIter(nil) rangeDelIter := m.newRangeDelIter(nil) rkeyIter := m.newRangeKeyIter(nil) - for i := range meta { + + checkForOverlap := func(i int, meta *fileMetadata) { if metaFlushableOverlaps[i] { // This table already overlapped a more recent flushable. - continue + return } - if overlapWithIterator(iter, &rangeDelIter, rkeyIter, meta[i], d.cmp) { + kr := internalKeyRange{ + smallest: meta.Smallest, + largest: meta.Largest, + } + if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) { // If this is the first table to overlap a flushable, save // the flushable. This ingest must be ingested or flushed // after it. @@ -881,6 +1114,24 @@ func (d *DB) ingest( metaFlushableOverlaps[i] = true } } + for i := range loadResult.localMeta { + checkForOverlap(i, loadResult.localMeta[i]) + } + for i := range loadResult.sharedMeta { + checkForOverlap(len(loadResult.localMeta)+i, loadResult.sharedMeta[i]) + } + if exciseSpan.Valid() { + kr := internalKeyRange{ + smallest: base.MakeInternalKey(exciseSpan.Start, InternalKeySeqNumMax, InternalKeyKindMax), + largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, exciseSpan.End), + } + if overlapWithIterator(iter, &rangeDelIter, rkeyIter, kr, d.cmp) { + if mem == nil { + mem = m + } + overlapWithExciseSpan = true + } + } err := iter.Close() if rangeDelIter != nil { err = firstError(err, rangeDelIter.Close()) @@ -911,9 +1162,16 @@ func (d *DB) ingest( // The ingestion overlaps with some entry in the flushable queue. if d.mu.formatVers.vers < FormatFlushableIngest || d.opts.Experimental.DisableIngestAsFlushable() || + len(shared) > 0 || overlapWithExciseSpan || (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) { // We're not able to ingest as a flushable, // so we must synchronously flush. + // + // TODO(bilal): Currently, if any of the files being ingested are shared or + // there's overlap between the memtable and an excise span, we cannot use + // flushable ingests and need to wait synchronously. Either remove this + // caveat by fleshing out flushable ingest logic to also account for these + // cases, or remove this TODO. if mem.flushable == d.mu.mem.mutable { err = d.makeRoomForWrite(nil) } @@ -933,7 +1191,7 @@ func (d *DB) ingest( // Since there aren't too many memtables already queued up, we can // slide the ingested sstables on top of the existing memtables. asFlushable = true - err = d.handleIngestAsFlushable(meta, seqNum) + err = d.handleIngestAsFlushable(loadResult.localMeta, seqNum) } var ve *versionEdit @@ -950,12 +1208,15 @@ func (d *DB) ingest( return } - // Update the sequence number for all of the sstables in the + // Update the sequence number for all local sstables in the // metadata. Writing the metadata to the manifest when the // version edit is applied is the mechanism that persists the // sequence number. The sstables themselves are left unmodified. + // + // For shared sstables, we do not need to update sequence numbers. These + // sequence numbers are already set in ingestLoad. if err = ingestUpdateSeqNum( - d.cmp, d.opts.Comparer.FormatKey, seqNum, meta, + d.cmp, d.opts.Comparer.FormatKey, seqNum, loadResult.localMeta, ); err != nil { if mut != nil { if mut.writerUnref() { @@ -975,28 +1236,34 @@ func (d *DB) ingest( // Assign the sstables to the correct level in the LSM and apply the // version edit. 
- ve, err = d.ingestApply(jobID, meta, targetLevelFunc, mut) + ve, err = d.ingestApply(jobID, loadResult, targetLevelFunc, mut, exciseSpan) } - d.commit.AllocateSeqNum(len(meta), prepare, apply) + d.commit.AllocateSeqNum(len(loadResult.localPaths), prepare, apply) if err != nil { - if err2 := ingestCleanup(d.objProvider, meta); err2 != nil { + if err2 := ingestCleanup(d.objProvider, loadResult.localMeta); err2 != nil { d.opts.Logger.Infof("ingest cleanup failed: %v", err2) } } else { // Since we either created a hard link to the ingesting files, or copied // them over, it is safe to remove the originals paths. - for _, path := range paths { + for _, path := range loadResult.localPaths { if err2 := d.opts.FS.Remove(path); err2 != nil { d.opts.Logger.Infof("ingest failed to remove original file: %s", err2) } } } + // NB: Shared-sstable-only ingestions do not assign a sequence number to + // any sstables. + globalSeqNum := uint64(0) + if len(loadResult.localMeta) > 0 { + globalSeqNum = loadResult.localMeta[0].SmallestSeqNum + } info := TableIngestInfo{ JobID: jobID, - GlobalSeqNum: meta[0].SmallestSeqNum, + GlobalSeqNum: globalSeqNum, Err: err, flushable: asFlushable, } @@ -1014,16 +1281,17 @@ func (d *DB) ingest( if e.Level == 0 { stats.ApproxIngestedIntoL0Bytes += e.Meta.Size } - if metaFlushableOverlaps[i] { + if i < len(metaFlushableOverlaps) && metaFlushableOverlaps[i] { stats.MemtableOverlappingFiles++ } } } else if asFlushable { + // NB: If asFlushable == true, there are no shared sstables. info.Tables = make([]struct { TableInfo Level int - }, len(meta)) - for i, f := range meta { + }, len(loadResult.localMeta)) + for i, f := range loadResult.localMeta { info.Tables[i].Level = -1 info.Tables[i].TableInfo = f.TableInfo() stats.Bytes += f.Size @@ -1046,6 +1314,266 @@ func (d *DB) ingest( return stats, err } +// excise updates ve to include a replacement of the file m with new virtual +// sstables that exclude exciseSpan, returning a slice of newly-created files if +// any. If the entirety of m is deleted by exciseSpan, no new sstables are added +// and m is deleted. Note that ve is updated in-place. +// +// The manifest lock must be held when calling this method. +func (d *DB) excise( + exciseSpan KeyRange, m *fileMetadata, ve *versionEdit, level int, +) ([]manifest.NewFileEntry, error) { + numCreatedFiles := 0 + // Check if there's actually an overlap between m and exciseSpan. + if !exciseSpan.Overlaps(d.cmp, m) { + return nil, nil + } + ve.DeletedFiles[deletedFileEntry{ + Level: level, + FileNum: m.FileNum, + }] = m + // Fast path: m sits entirely within the exciseSpan, so just delete it. + if exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) { + return nil, nil + } + var iter internalIterator + var rangeDelIter keyspan.FragmentIterator + var rangeKeyIter keyspan.FragmentIterator + backingTableCreated := false + // Create a file to the left of the excise span, if necessary. + // The bounds of this file will be [m.Smallest, lastKeyBefore(exciseSpan.Start)]. + // + // We create bounds that are tight on user keys, and we make the effort to find + // the last key in the original sstable that's smaller than exciseSpan.Start + // even though it requires some sstable reads. We could choose to create + // virtual sstables on loose userKey bounds, in which case we could just set + // leftFile.Largest to an exclusive sentinel at exciseSpan.Start. 
The biggest + // issue with that approach would be that it'd lead to lots of small virtual + // sstables in the LSM that have no guarantee on containing even a single user + // key within the file bounds. This has the potential to increase both read and + // write-amp as we will be opening up these sstables only to find no relevant + // keys in the read path, and compacting sstables on top of them instead of + // directly into the space occupied by them. We choose to incur the cost of + // calculating tight bounds at this time instead of creating more work in the + // future. + // + // TODO(bilal): Some of this work can happen without grabbing the manifest + // lock; we could grab one currentVersion, release the lock, calculate excised + // files, then grab the lock again and recalculate for just the files that + // have changed since our previous calculation. Do this optimiaztino as part of + // https://github.com/cockroachdb/pebble/issues/2112 . + if d.cmp(m.Smallest.UserKey, exciseSpan.Start) < 0 { + leftFile := &fileMetadata{ + Virtual: true, + FileBacking: m.FileBacking, + FileNum: d.mu.versions.getNextFileNum(), + } + leftFile.Smallest = m.Smallest + leftFile.SmallestRangeKey = m.SmallestRangeKey + leftFile.SmallestPointKey = m.SmallestPointKey + leftFile.HasPointKeys = m.HasPointKeys + leftFile.HasRangeKeys = m.HasRangeKeys + if m.HasPointKeys && exciseSpan.Contains(d.cmp, m.SmallestPointKey) { + // This file will not contain any point keys, but will contain range keys. + leftFile.HasPointKeys = false + leftFile.Smallest = m.SmallestRangeKey + } else if m.HasRangeKeys && exciseSpan.Contains(d.cmp, m.SmallestRangeKey) { + leftFile.HasRangeKeys = false + leftFile.Smallest = m.SmallestPointKey + } + if leftFile.HasPointKeys { + var err error + iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{level: manifest.Level(level)}, internalIterOpts{}) + if err != nil { + return nil, err + } + var key *InternalKey + if iter != nil { + defer iter.Close() + key, _ = iter.SeekLT(exciseSpan.Start, base.SeekLTFlagsNone) + } else { + iter = emptyIter + } + // Store the min of (exciseSpan.Start, rdel.End) in lastRangeDel. This + // needs to be a copy if the key is owned by the range del iter. + var lastRangeDel []byte + if rangeDelIter != nil { + defer rangeDelIter.Close() + rdel := rangeDelIter.SeekLT(exciseSpan.Start) + if rdel != nil { + lastRangeDel = append(lastRangeDel[:0], rdel.End...) + if d.cmp(lastRangeDel, exciseSpan.Start) > 0 { + lastRangeDel = exciseSpan.Start + } + } + } else { + rangeDelIter = emptyKeyspanIter + } + leftFile.HasPointKeys = key != nil || lastRangeDel != nil + if key != nil && (lastRangeDel == nil || d.cmp(lastRangeDel, key.UserKey) <= 0) { + leftFile.LargestPointKey = key.Clone() + } else if lastRangeDel != nil { + // key == nil || lastRangeDel > key.UserKey. + leftFile.LargestPointKey = base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, lastRangeDel) + } + leftFile.Largest = leftFile.LargestPointKey + } + if leftFile.HasRangeKeys { + var err error + rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{}) + if err != nil { + return nil, err + } + // Store the min of (exciseSpan.Start, rkey.End) in lastRangeKey. This + // needs to be a copy if the key is owned by the range key iter. + var lastRangeKey []byte + var lastRangeKeyKind InternalKeyKind + defer rangeKeyIter.Close() + rkey := rangeKeyIter.SeekLT(exciseSpan.Start) + if rkey != nil { + lastRangeKey = append(lastRangeKey[:0], rkey.End...) 
+ if d.cmp(lastRangeKey, exciseSpan.Start) > 0 { + lastRangeKey = exciseSpan.Start + } + lastRangeKeyKind = rkey.Keys[0].Kind() + } + leftFile.HasRangeKeys = lastRangeKey != nil + if leftFile.HasRangeKeys { + leftFile.LargestRangeKey = base.MakeExclusiveSentinelKey(lastRangeKeyKind, lastRangeKey) + if !leftFile.HasPointKeys || base.InternalCompare(d.cmp, leftFile.LargestPointKey, leftFile.LargestRangeKey) < 0 { + leftFile.Largest = leftFile.LargestRangeKey + } + } + } + if leftFile.HasRangeKeys || leftFile.HasPointKeys { + var err error + leftFile.Size, err = d.tableCache.estimateSize(m, leftFile.Smallest.UserKey, leftFile.Largest.UserKey) + if err != nil { + return nil, err + } + if err := leftFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil { + return nil, err + } + ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: leftFile}) + ve.CreatedBackingTables = append(ve.CreatedBackingTables, leftFile.FileBacking) + backingTableCreated = true + numCreatedFiles++ + } + } + // Create a file to the right, if necessary. + if exciseSpan.Contains(d.cmp, m.Largest) { + // No key exists to the right of the excise span in this file. + return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil + } + // Create a new file, rightFile, between [firstKeyAfter(exciseSpan.End), m.Largest]. + // + // See comment before the definition of leftFile for the motivation behind + // calculating tight user-key bounds. + rightFile := &fileMetadata{ + Virtual: true, + FileBacking: m.FileBacking, + FileNum: d.mu.versions.getNextFileNum(), + } + rightFile.Largest = m.Largest + rightFile.LargestRangeKey = m.LargestRangeKey + rightFile.LargestPointKey = m.LargestPointKey + rightFile.HasPointKeys = m.HasPointKeys + rightFile.HasRangeKeys = m.HasRangeKeys + if m.HasPointKeys && exciseSpan.Contains(d.cmp, m.LargestPointKey) { + // This file will not contain any point keys, but will contain range keys. + rightFile.HasPointKeys = false + rightFile.Largest = m.LargestRangeKey + } else if m.HasRangeKeys && exciseSpan.Contains(d.cmp, m.LargestRangeKey) { + rightFile.HasRangeKeys = false + rightFile.Largest = m.LargestPointKey + } + if rightFile.HasPointKeys { + var err error + if iter == nil && rangeDelIter == nil { + iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{level: manifest.Level(level)}, internalIterOpts{}) + if err != nil { + return nil, err + } + if iter != nil { + defer iter.Close() + } else { + iter = emptyIter + } + if rangeDelIter != nil { + defer rangeDelIter.Close() + } else { + rangeDelIter = emptyKeyspanIter + } + } + // Store the max of (exciseSpan.End, rdel.Start) in firstRangeDel. This + // needs to be a copy if the key is owned by the range del iter. + key, _ := iter.SeekGE(exciseSpan.End, base.SeekGEFlagsNone) + var firstRangeDel []byte + rdel := rangeDelIter.SeekGE(exciseSpan.End) + if rdel != nil { + firstRangeDel = append(firstRangeDel[:0], rdel.Start...) + if d.cmp(firstRangeDel, exciseSpan.End) < 0 { + firstRangeDel = exciseSpan.End + } + } + rightFile.HasPointKeys = key != nil || firstRangeDel != nil + if key != nil && (firstRangeDel == nil || base.InternalCompare(d.cmp, *key, rdel.SmallestKey()) < 0) { + rightFile.SmallestPointKey = key.Clone() + } else if firstRangeDel != nil { + // key == nil || firstRangeDel <= key.UserKey. 
+ rightFile.SmallestPointKey = rdel.SmallestKey() + rightFile.SmallestPointKey.UserKey = firstRangeDel + } + rightFile.Smallest = rightFile.SmallestPointKey + } + if rightFile.HasRangeKeys { + if rangeKeyIter == nil { + var err error + rangeKeyIter, err = d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{}) + if err != nil { + return nil, err + } + defer rangeKeyIter.Close() + } + // Store the max of (exciseSpan.End, rkey.Start) in firstRangeKey. This + // needs to be a copy if the key is owned by the range key iter. + var firstRangeKey []byte + rkey := rangeKeyIter.SeekGE(exciseSpan.End) + if rkey != nil { + firstRangeKey = append(firstRangeKey[:0], rkey.Start...) + if d.cmp(firstRangeKey, exciseSpan.End) < 0 { + firstRangeKey = exciseSpan.End + } + } + rightFile.HasRangeKeys = firstRangeKey != nil + if rightFile.HasRangeKeys { + rightFile.SmallestRangeKey = rkey.SmallestKey() + rightFile.SmallestRangeKey.UserKey = firstRangeKey + if !rightFile.HasPointKeys || base.InternalCompare(d.cmp, rightFile.SmallestPointKey, rightFile.SmallestRangeKey) > 0 { + rightFile.Smallest = rightFile.SmallestRangeKey + } + } + } + if rightFile.HasRangeKeys || rightFile.HasPointKeys { + var err error + rightFile.Size, err = d.tableCache.estimateSize(m, rightFile.Smallest.UserKey, rightFile.Largest.UserKey) + if err != nil { + return nil, err + } + ve.NewFiles = append(ve.NewFiles, newFileEntry{Level: level, Meta: rightFile}) + if !backingTableCreated { + ve.CreatedBackingTables = append(ve.CreatedBackingTables, rightFile.FileBacking) + backingTableCreated = true + } + numCreatedFiles++ + } + + if err := rightFile.Validate(d.cmp, d.opts.Comparer.FormatKey); err != nil { + return nil, err + } + return ve.NewFiles[len(ve.NewFiles)-numCreatedFiles:], nil +} + type ingestTargetLevelFunc func( newIters tableNewIters, newRangeKeyIter keyspan.TableNewSpanIter, @@ -1058,13 +1586,20 @@ type ingestTargetLevelFunc func( ) (int, error) func (d *DB) ingestApply( - jobID int, meta []*fileMetadata, findTargetLevel ingestTargetLevelFunc, mut *memTable, + jobID int, + lr ingestLoadResult, + findTargetLevel ingestTargetLevelFunc, + mut *memTable, + exciseSpan KeyRange, ) (*versionEdit, error) { d.mu.Lock() defer d.mu.Unlock() ve := &versionEdit{ - NewFiles: make([]newFileEntry, len(meta)), + NewFiles: make([]newFileEntry, len(lr.localMeta)+len(lr.sharedMeta)), + } + if exciseSpan.Valid() { + ve.DeletedFiles = map[manifest.DeletedFileEntry]*manifest.FileMetadata{} } metrics := make(map[int]*LevelMetrics) @@ -1089,13 +1624,43 @@ func (d *DB) ingestApply( current := d.mu.versions.currentVersion() baseLevel := d.mu.versions.picker.getBaseLevel() iterOps := IterOptions{logger: d.opts.Logger} - for i := range meta { + for i := 0; i < len(lr.localMeta)+len(lr.sharedMeta); i++ { // Determine the lowest level in the LSM for which the sstable doesn't // overlap any existing files in the level. 
- m := meta[i] + var m *fileMetadata + sharedIdx := -1 + sharedLevel := -1 + if i < len(lr.localMeta) { + m = lr.localMeta[i] + } else { + sharedIdx = i - len(lr.localMeta) + m = lr.sharedMeta[sharedIdx] + sharedLevel = int(lr.sharedLevels[sharedIdx]) + } f := &ve.NewFiles[i] var err error - f.Level, err = findTargetLevel(d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) + if sharedIdx >= 0 { + f.Level = sharedLevel + if f.Level < sharedLevelsStart { + panic("cannot slot a shared file higher than the highest shared level") + } + ve.CreatedBackingTables = append(ve.CreatedBackingTables, m.FileBacking) + } else { + if exciseSpan.Valid() && exciseSpan.Contains(d.cmp, m.Smallest) && exciseSpan.Contains(d.cmp, m.Largest) { + // This file fits perfectly within the excise span. We can slot it at + // L6, or sharedLevelsStart - 1 if we have shared files. + if len(lr.sharedMeta) > 0 { + f.Level = sharedLevelsStart - 1 + if baseLevel > f.Level { + f.Level = 0 + } + } else { + f.Level = 6 + } + } else { + f.Level, err = findTargetLevel(d.newIters, d.tableNewRangeKeyIter, iterOps, d.cmp, current, baseLevel, d.mu.compact.inProgress, m) + } + } if err != nil { d.mu.versions.logUnlock() return nil, err @@ -1111,6 +1676,51 @@ func (d *DB) ingestApply( levelMetrics.BytesIngested += m.Size levelMetrics.TablesIngested++ } + if exciseSpan.Valid() { + // Iterate through all levels and find files that intersect with exciseSpan. + // + // TODO(bilal): We could drop the DB mutex here as we don't need it for + // excises; we only need to hold the version lock which we already are + // holding. However releasing the DB mutex could mess with the + // ingestTargetLevel calculation that happened above, as it assumed that it + // had a complete view of in-progress compactions that wouldn't change + // until logAndApply is called. If we were to drop the mutex now, we could + // schedule another in-progress compaction that would go into the chosen target + // level and lead to file overlap within level (which would panic in + // logAndApply). We should drop the db mutex here, do the excise, then + // re-grab the DB mutex and rerun just the in-progress compaction check to + // see if any new compactions are conflicting with our chosen target levels + // for files, and if they are, we should signal those compactions to error + // out. + for level := range current.Levels { + iter := current.Levels[level].Iter() + for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() { + excised, err := d.excise(exciseSpan, m, ve, level) + if err != nil { + return nil, err + } + + if _, ok := ve.DeletedFiles[deletedFileEntry{ + Level: level, + FileNum: m.FileNum, + }]; !ok { + // We did not excise this file. 
+ continue + } + levelMetrics := metrics[level] + if levelMetrics == nil { + levelMetrics = &LevelMetrics{} + metrics[level] = levelMetrics + } + levelMetrics.NumFiles-- + levelMetrics.Size -= int64(m.Size) + for i := range excised { + levelMetrics.NumFiles++ + levelMetrics.Size += int64(excised[i].Meta.Size) + } + } + } + } if err := d.mu.versions.logAndApply(jobID, ve, metrics, false /* forceRotation */, func() []compactionInfo { return d.getInProgressCompactionInfoLocked(nil) }); err != nil { diff --git a/ingest_test.go b/ingest_test.go index 9f94ff3ea4..63bd4f1462 100644 --- a/ingest_test.go +++ b/ingest_test.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/objstorage" "github.com/cockroachdb/pebble/objstorage/objstorageprovider" + "github.com/cockroachdb/pebble/objstorage/shared" "github.com/cockroachdb/pebble/record" "github.com/cockroachdb/pebble/sstable" "github.com/cockroachdb/pebble/vfs" @@ -124,12 +125,12 @@ func TestIngestLoad(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - meta, _, err := ingestLoad(opts, dbVersion, []string{"ext"}, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}) + lr, err := ingestLoad(opts, dbVersion, []string{"ext"}, nil, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}) if err != nil { return err.Error() } var buf bytes.Buffer - for _, m := range meta { + for _, m := range lr.localMeta { fmt.Fprintf(&buf, "%d: %s-%s\n", m.FileNum, m.Smallest, m.Largest) fmt.Fprintf(&buf, " points: %s-%s\n", m.SmallestPointKey, m.LargestPointKey) fmt.Fprintf(&buf, " ranges: %s-%s\n", m.SmallestRangeKey, m.LargestRangeKey) @@ -211,13 +212,13 @@ func TestIngestLoadRand(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - meta, _, err := ingestLoad(opts, version, paths, 0, pending) + lr, err := ingestLoad(opts, version, paths, nil, 0, pending) require.NoError(t, err) - for _, m := range meta { + for _, m := range lr.localMeta { m.CreationTime = 0 } - if diff := pretty.Diff(expected, meta); diff != nil { + if diff := pretty.Diff(expected, lr.localMeta); diff != nil { t.Fatalf("%s", strings.Join(diff, "\n")) } } @@ -232,7 +233,7 @@ func TestIngestLoadInvalid(t *testing.T) { Comparer: DefaultComparer, FS: mem, }).WithFSDefaults() - if _, _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}); err == nil { + if _, err := ingestLoad(opts, internalFormatNewest, []string{"invalid"}, nil, 0, []base.DiskFileNum{base.FileNum(1).DiskFileNum()}); err == nil { t.Fatalf("expected error, but found success") } } @@ -273,7 +274,8 @@ func TestIngestSortAndVerify(t *testing.T) { meta = append(meta, m) paths = append(paths, strconv.Itoa(i)) } - err := ingestSortAndVerify(cmp, meta, paths) + lr := ingestLoadResult{localPaths: paths, localMeta: meta} + err := ingestSortAndVerify(cmp, lr, KeyRange{}) if err != nil { return fmt.Sprintf("%v\n", err) } @@ -327,7 +329,8 @@ func TestIngestLink(t *testing.T) { opts.FS.Remove(paths[i]) } - err = ingestLink(0 /* jobID */, opts, objProvider, paths, meta) + lr := ingestLoadResult{localMeta: meta, localPaths: paths} + err = ingestLink(0 /* jobID */, opts, objProvider, lr, nil /* shared */) if i < count { if err == nil { t.Fatalf("expected error, but found success") @@ -394,7 +397,8 @@ func TestIngestLinkFallback(t *testing.T) { meta := []*fileMetadata{{FileNum: 1}} meta[0].InitPhysicalBacking() - err = ingestLink(0, opts, objProvider, []string{"source"}, meta) + lr := 
ingestLoadResult{localMeta: meta, localPaths: []string{"source"}} + err = ingestLink(0, opts, objProvider, lr, nil /* shared */) require.NoError(t, err) dest, err := mem.Open("000001.sst") @@ -570,6 +574,267 @@ func TestOverlappingIngestedSSTs(t *testing.T) { }) } +func TestExcise(t *testing.T) { + var mem vfs.FS + var d *DB + var flushed bool + defer func() { + require.NoError(t, d.Close()) + }() + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + opts := &Options{ + FS: mem, + L0CompactionThreshold: 100, + L0StopWritesThreshold: 100, + DebugCheck: DebugCheckLevels, + EventListener: &EventListener{FlushEnd: func(info FlushInfo) { + flushed = true + }}, + FormatMajorVersion: ExperimentalFormatVirtualSSTables, + } + // Disable automatic compactions because otherwise we'll race with + // delete-only compactions triggered by ingesting range tombstones. + opts.DisableAutomaticCompactions = true + + var err error + d, err = Open("", opts) + require.NoError(t, err) + } + reset() + + datadriven.RunTest(t, "testdata/excise", func(t *testing.T, td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + reset() + return "" + case "batch": + b := d.NewIndexedBatch() + if err := runBatchDefineCmd(td, b); err != nil { + return err.Error() + } + if err := b.Commit(nil); err != nil { + return err.Error() + } + return "" + case "build": + if err := runBuildCmd(td, d, mem); err != nil { + return err.Error() + } + return "" + + case "flush": + if err := d.Flush(); err != nil { + return err.Error() + } + return "" + + case "ingest": + flushed = false + if err := runIngestCmd(td, d, mem); err != nil { + return err.Error() + } + // Wait for a possible flush. + d.mu.Lock() + for d.mu.compact.flushing { + d.mu.compact.cond.Wait() + } + d.mu.Unlock() + if flushed { + return "memtable flushed" + } + return "" + + case "ingest-and-excise": + flushed = false + if err := runIngestAndExciseCmd(td, d, mem); err != nil { + return err.Error() + } + // Wait for a possible flush. + d.mu.Lock() + for d.mu.compact.flushing { + d.mu.compact.cond.Wait() + } + d.mu.Unlock() + if flushed { + return "memtable flushed" + } + return "" + + case "get": + return runGetCmd(t, td, d) + + case "iter": + iter := d.NewIter(&IterOptions{ + KeyTypes: IterKeyTypePointsAndRanges, + }) + return runIterCmd(td, iter, true) + + case "lsm": + return runLSMCmd(td, d) + + case "metrics": + // The asynchronous loading of table stats can change metrics, so + // wait for all the tables' stats to be loaded. 
+ d.mu.Lock() + d.waitTableStats() + d.mu.Unlock() + + return d.Metrics().String() + + case "wait-pending-table-stats": + return runTableStatsCmd(td, d) + + case "excise": + ve := &versionEdit{ + DeletedFiles: map[deletedFileEntry]*fileMetadata{}, + } + var exciseSpan KeyRange + if len(td.CmdArgs) != 2 { + panic("insufficient args for compact command") + } + exciseSpan.Start = []byte(td.CmdArgs[0].Key) + exciseSpan.End = []byte(td.CmdArgs[1].Key) + + d.mu.Lock() + d.mu.versions.logLock() + d.mu.Unlock() + current := d.mu.versions.currentVersion() + for level := range current.Levels { + iter := current.Levels[level].Iter() + for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() { + _, err := d.excise(exciseSpan, m, ve, level) + if err != nil { + d.mu.Lock() + d.mu.versions.logUnlock() + d.mu.Unlock() + return fmt.Sprintf("error when excising %s: %s", m.FileNum, err.Error()) + } + } + } + d.mu.Lock() + d.mu.versions.logUnlock() + d.mu.Unlock() + return fmt.Sprintf("would excise %d files, use ingest-and-excise to excise.\n%s", len(ve.DeletedFiles), ve.String()) + + case "compact": + if len(td.CmdArgs) != 2 { + panic("insufficient args for compact command") + } + l := td.CmdArgs[0].Key + r := td.CmdArgs[1].Key + err := d.Compact([]byte(l), []byte(r), false) + if err != nil { + return err.Error() + } + return "" + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} + +func TestIngestShared(t *testing.T) { + mem := vfs.NewMem() + var d *DB + var provider2 objstorage.Provider + opts2 := Options{FS: vfs.NewMem(), FormatMajorVersion: ExperimentalFormatVirtualSSTables} + opts2.EnsureDefaults() + + // Create an objProvider where we will fake-create some sstables that can + // then be shared back to the db instance. + providerSettings := objstorageprovider.Settings{ + Logger: opts2.Logger, + FS: opts2.FS, + FSDirName: "", + FSDirInitialListing: nil, + FSCleaner: opts2.Cleaner, + NoSyncOnClose: opts2.NoSyncOnClose, + BytesPerSync: opts2.BytesPerSync, + } + providerSettings.Shared.Storage = shared.NewInMem() + + provider2, err := objstorageprovider.Open(providerSettings) + require.NoError(t, err) + creatorIDCounter := uint64(1) + provider2.SetCreatorID(objstorage.CreatorID(creatorIDCounter)) + creatorIDCounter++ + + defer func() { + require.NoError(t, d.Close()) + }() + + reset := func() { + if d != nil { + require.NoError(t, d.Close()) + } + + mem = vfs.NewMem() + require.NoError(t, mem.MkdirAll("ext", 0755)) + opts := &Options{ + FormatMajorVersion: ExperimentalFormatVirtualSSTables, + FS: mem, + L0CompactionThreshold: 100, + L0StopWritesThreshold: 100, + } + opts.Experimental.SharedStorage = providerSettings.Shared.Storage + + var err error + d, err = Open("", opts) + require.NoError(t, err) + require.NoError(t, d.SetCreatorID(creatorIDCounter)) + creatorIDCounter++ + } + reset() + + metaMap := map[base.DiskFileNum]objstorage.ObjectMetadata{} + + require.NoError(t, d.Set([]byte("d"), []byte("unexpected"), nil)) + require.NoError(t, d.Set([]byte("e"), []byte("unexpected"), nil)) + require.NoError(t, d.Set([]byte("a"), []byte("unexpected"), nil)) + require.NoError(t, d.Set([]byte("f"), []byte("unexpected"), nil)) + d.Flush() + + { + // Create a shared file. 
+ fn := base.FileNum(2) + f, meta, err := provider2.Create(context.TODO(), fileTypeTable, fn.DiskFileNum(), objstorage.CreateOptions{PreferSharedStorage: true}) + require.NoError(t, err) + w := sstable.NewWriter(f, d.opts.MakeWriterOptions(0, d.opts.FormatMajorVersion.MaxTableFormat())) + w.Set([]byte("d"), []byte("shared")) + w.Set([]byte("e"), []byte("shared")) + w.Close() + metaMap[fn.DiskFileNum()] = meta + } + + m := metaMap[base.FileNum(2).DiskFileNum()] + handle, err := provider2.SharedObjectBacking(&m) + require.NoError(t, err) + size, err := provider2.Size(m) + require.NoError(t, err) + + sharedSSTMeta := SharedSSTMeta{ + Backing: handle, + Smallest: base.MakeInternalKey([]byte("d"), 0, InternalKeyKindSet), + Largest: base.MakeInternalKey([]byte("e"), 0, InternalKeyKindSet), + SmallestPointKey: base.MakeInternalKey([]byte("d"), 0, InternalKeyKindSet), + LargestPointKey: base.MakeInternalKey([]byte("e"), 0, InternalKeyKindSet), + Level: 6, + Size: uint64(size + 5), + } + _, err = d.IngestAndExcise([]string{}, []SharedSSTMeta{sharedSSTMeta}, KeyRange{Start: []byte("d"), End: []byte("ee")}) + require.NoError(t, err) + + // TODO(bilal): Once reading of shared sstables is in, verify that the values + // of d and e have been updated. +} + func TestIngestMemtableOverlaps(t *testing.T) { comparers := []Comparer{ {Name: "default", Compare: DefaultComparer.Compare, FormatKey: DefaultComparer.FormatKey}, @@ -662,6 +927,38 @@ func TestIngestMemtableOverlaps(t *testing.T) { } } +func TestKeyRangeBasic(t *testing.T) { + cmp := base.DefaultComparer.Compare + k1 := KeyRange{Start: []byte("b"), End: []byte("c")} + + // Tests for Contains() + require.True(t, k1.Contains(cmp, base.MakeInternalKey([]byte("b"), 1, InternalKeyKindSet))) + require.False(t, k1.Contains(cmp, base.MakeInternalKey([]byte("c"), 1, InternalKeyKindSet))) + require.True(t, k1.Contains(cmp, base.MakeInternalKey([]byte("bb"), 1, InternalKeyKindSet))) + require.True(t, k1.Contains(cmp, base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, []byte("c")))) + + m1 := &fileMetadata{ + Smallest: base.MakeInternalKey([]byte("b"), 1, InternalKeyKindSet), + Largest: base.MakeInternalKey([]byte("c"), 1, InternalKeyKindSet), + } + require.True(t, k1.Overlaps(cmp, m1)) + m2 := &fileMetadata{ + Smallest: base.MakeInternalKey([]byte("c"), 1, InternalKeyKindSet), + Largest: base.MakeInternalKey([]byte("d"), 1, InternalKeyKindSet), + } + require.False(t, k1.Overlaps(cmp, m2)) + m3 := &fileMetadata{ + Smallest: base.MakeInternalKey([]byte("a"), 1, InternalKeyKindSet), + Largest: base.MakeExclusiveSentinelKey(InternalKeyKindRangeDelete, []byte("b")), + } + require.False(t, k1.Overlaps(cmp, m3)) + m4 := &fileMetadata{ + Smallest: base.MakeInternalKey([]byte("a"), 1, InternalKeyKindSet), + Largest: base.MakeInternalKey([]byte("b"), 1, InternalKeyKindSet), + } + require.True(t, k1.Overlaps(cmp, m4)) +} + func BenchmarkIngestOverlappingMemtable(b *testing.B) { assertNoError := func(err error) { b.Helper() diff --git a/internal/base/seqnums.go b/internal/base/seqnums.go index aa38226a17..cd95724e58 100644 --- a/internal/base/seqnums.go +++ b/internal/base/seqnums.go @@ -16,53 +16,40 @@ const ( // guarantee there are no keys underneath an internal key. SeqNumZero = uint64(0) - // SeqNumL6Point is the sequence number reserved for foreign point keys in L6. - // This sequence number must be lower than SeqNumL6RangeKey for range key - // masking to work correctly. 
- SeqNumL6Point = uint64(1) - - // SeqNumL6RangeKey is the sequence number reserved for foreign range keys in - // L6. Only RangeKeySets are expected at this level. - SeqNumL6RangeKey = uint64(2) - - // SeqNumL5RangeDel is the sequence number reserved for foreign range deletes - // in L5. These keys could delete L6 points. - SeqNumL5RangeDel = uint64(3) - - // SeqNumL5Point is the sequence number reserved for foreign point keys in L5. - // Any sst-local range deletions would have already been applied to these keys, - // so they can safely get a sequence number higher than SeqNumL5RangeDel. - // However they must have a sequence number lower than SeqNumL5RangeKey* for - // range key masking to work correctly. - SeqNumL5Point = uint64(4) - - // SeqNumL5RangeKeyUnsetDel is the sequence number reserved for foreign - // range key unsets/deletes in L5. These operations could apply to L6 - // RangeKeySets, so this sequence number must be > SeqNumL6RangeKey. - SeqNumL5RangeKeyUnsetDel = uint64(5) - - // SeqNumL5RangeKeySet is the sequence number reserved for foreign range key - // Sets in L5. These operations could apply to L6 RangeKeySets, so this - // sequence number must be > SeqNumL6RangeKey. Any SST-local rangekey - // unsets/dels have already been applied to them, so their sequence number must - // be > SeqNumL5RangeKeyUnsetDel. - SeqNumL5RangeKeySet = uint64(6) - - // Sequence numbers 7-9 are reserved for future use. + // SeqNumL6 is the sequence number reserved for foreign keys in L6. This seqnum + // will be used to expose any range key sets as well as point keys in L6. Range + // deletes do not need to be exposed in L6. + SeqNumL6 = uint64(1) + + // SeqNumL5 is the sequence number reserved for foreign keys in L5. This seqnum + // needs to be greater than SeqNumL6, so that range deletes in L5 can be + // exposed at this level and will correctly delete covering points in L6. Also + // range key unsets/dels will be exposed at this seqnum and will need to shadow + // overlapping range keys in L6. + // + // Note that we can use the same sequence number for all exposed keys in L5 as + // range dels do not delete points at the same seqnum, and range key + // unsets/deletes do not coalesce with range key sets at the same seqnum. Range + // key masking does not care about the sequence number of overlapping points + // (rather, it applies to points based on suffixes), so we can use this seqnum + // for all L5 keys exposed. + SeqNumL5 = uint64(2) + + // Sequence numbers 3-9 are reserved for future use. // SeqNumStart is the first sequence number assigned to a key written by // ourselves. SeqNumStart = uint64(10) ) -// PointSeqNumForLevel returns the appropriate reserved sequence number for -// point keys in foreign sstables at the specified level. -func PointSeqNumForLevel(level int) uint64 { +// SeqNumForLevel returns the appropriate reserved sequence number for keys in +// foreign sstables at the specified level. 
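// Only two levels are possible for foreign sstables, so the mapping is simply
// SeqNumForLevel(5) == SeqNumL5 (2) and SeqNumForLevel(6) == SeqNumL6 (1); any
// other level panics. The L5 value must stay above the L6 value so that foreign
// L5 range deletes and range key unsets/deletes shadow foreign L6 keys.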
+func SeqNumForLevel(level int) uint64 { switch level { case 5: - return SeqNumL5Point + return SeqNumL5 case 6: - return SeqNumL6Point + return SeqNumL6 default: panic(fmt.Sprintf("unexpected foreign sstable at level %d", level)) } diff --git a/internal/keyspan/level_iter.go b/internal/keyspan/level_iter.go index 718e58fffa..6dd7ac6e23 100644 --- a/internal/keyspan/level_iter.go +++ b/internal/keyspan/level_iter.go @@ -98,7 +98,7 @@ func (l *LevelIter) Init( ) { l.err = nil l.level = level - l.tableOpts.RangeKeyFilters = opts.RangeKeyFilters + l.tableOpts = opts l.cmp = cmp l.iterFile = nil l.newIter = newIter diff --git a/internal/manifest/version.go b/internal/manifest/version.go index ca16ba1369..a9da238070 100644 --- a/internal/manifest/version.go +++ b/internal/manifest/version.go @@ -227,6 +227,12 @@ type FileMetadata struct { // we'd have to write virtual sstable stats to the version edit. Stats TableStats + // Deleted is set to true if a VersionEdit gets installed that has deleted + // this file. Protected by the manifest lock (see versionSet.logLock()). + Deleted bool + + // For L0 files only. Protected by DB.mu. Used to generate L0 sublevels and + // pick L0 compactions. Only accurate for the most recent Version. SubLevel int L0Index int minIntervalIndex int @@ -235,9 +241,6 @@ type FileMetadata struct { // NB: the alignment of this struct is 8 bytes. We pack all the bools to // ensure an optimal packing. - // For L0 files only. Protected by DB.mu. Used to generate L0 sublevels and - // pick L0 compactions. Only accurate for the most recent Version. - // // IsIntraL0Compacting is set to True if this file is part of an intra-L0 // compaction. When it's true, IsCompacting must also return true. If // Compacting is true and IsIntraL0Compacting is false for an L0 file, the @@ -370,6 +373,17 @@ func (m *FileMetadata) InitPhysicalBacking() { } } +// InitProviderBacking creates a new FileBacking for a file backed by +// an objstorage.Provider. +func (m *FileMetadata) InitProviderBacking(fileNum base.DiskFileNum) { + if !m.Virtual { + panic("pebble: provider-backed sstables must be virtual") + } + if m.FileBacking == nil { + m.FileBacking = &FileBacking{DiskFileNum: fileNum} + } +} + // ValidateVirtual should be called once the FileMetadata for a virtual sstable // is created to verify that the fields of the virtual sstable are sound. func (m *FileMetadata) ValidateVirtual(createdFrom *FileMetadata) { diff --git a/internal/rangekey/coalesce.go b/internal/rangekey/coalesce.go index 18500e7a03..c1208ca47b 100644 --- a/internal/rangekey/coalesce.go +++ b/internal/rangekey/coalesce.go @@ -6,6 +6,7 @@ package rangekey import ( "bytes" + "fmt" "math" "sort" @@ -372,9 +373,9 @@ func coalesce( // foreign sstables (i.e. shared sstables not created by us). It is largely // similar to the Transform function implemented in UserIteratorConfig in that // it calls coalesce to remove range keys shadowed by other range keys, but also -// retains the range key that does the shadowing. In addition, it outputs range -// keys with sequence numbers that match reserved sequence numbers for that -// level (i.e. SeqNumL5RangeKeySet for L5 sets, while L6 unsets/dels are elided). +// retains the range key that does the shadowing. In addition, it elides +// RangeKey unsets/dels in L6 as they are inapplicable when reading from a +// different Pebble instance. 
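// For example: when reading a foreign L6 sstable, a RangeKeySet in a span is
// emitted (at the reserved L6 seqnum) while any RangeKeyUnset or RangeKeyDelete
// in the same sstable is dropped, since there is nothing beneath L6 in the
// reading instance for those tombstones to apply to. In L5, unsets and deletes
// are kept because they may still shadow range keys in L6.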
type ForeignSSTTransformer struct { Comparer *base.Comparer Level int @@ -398,7 +399,7 @@ func (f *ForeignSSTTransformer) Transform( keys := f.sortBuf.Keys dst.Keys = dst.Keys[:0] for i := range keys { - var seqNum uint64 + seqNum := keys[i].SeqNum() switch keys[i].Kind() { case base.InternalKeyKindRangeKeySet: if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 { @@ -406,9 +407,11 @@ func (f *ForeignSSTTransformer) Transform( } switch f.Level { case 5: - seqNum = base.SeqNumL5RangeKeySet + fallthrough case 6: - seqNum = base.SeqNumL6RangeKey + if seqNum != base.SeqNumForLevel(f.Level) { + panic(fmt.Sprintf("pebble: expected range key iter to return seqnum %d, got %d", base.SeqNumForLevel(f.Level), seqNum)) + } } case base.InternalKeyKindRangeKeyUnset: if invariants.Enabled && len(dst.Keys) > 0 && cmp(dst.Keys[len(dst.Keys)-1].Suffix, keys[i].Suffix) > 0 { @@ -418,7 +421,10 @@ func (f *ForeignSSTTransformer) Transform( case base.InternalKeyKindRangeKeyDelete: switch f.Level { case 5: - seqNum = base.SeqNumL5RangeKeyUnsetDel + // Emit this key. + if seqNum != base.SeqNumForLevel(f.Level) { + panic(fmt.Sprintf("pebble: expected range key iter to return seqnum %d, got %d", base.SeqNumForLevel(f.Level), seqNum)) + } case 6: // Skip this key, as foreign sstable in L6 do not need to emit range key // unsets/dels as they do not apply to any other sstables. diff --git a/open.go b/open.go index 54bab1d17d..12947e0ab2 100644 --- a/open.go +++ b/open.go @@ -804,13 +804,12 @@ func (d *DB) replayWAL( paths[i] = base.MakeFilepath(d.opts.FS, d.dirname, fileTypeTable, n) } - var meta []*manifest.FileMetadata - meta, _, err = ingestLoad( - d.opts, d.mu.formatVers.vers, paths, d.cacheID, fileNums, - ) + var lr ingestLoadResult + lr, err = ingestLoad(d.opts, d.mu.formatVers.vers, paths, nil, d.cacheID, fileNums) if err != nil { return nil, 0, err } + meta := lr.localMeta if uint32(len(meta)) != b.Count() { panic("pebble: couldn't load all files in WAL entry.") diff --git a/scan_internal.go b/scan_internal.go index 7f7a2378c5..f7d5aeb946 100644 --- a/scan_internal.go +++ b/scan_internal.go @@ -167,8 +167,6 @@ type pointCollapsingIterator struct { // position of the child iterator. savedKey InternalKey savedKeyBuf []byte - // Saved key for substituting sequence numbers. Reused to avoid an allocation. - seqNumKey InternalKey // elideRangeDeletes ignores range deletes returned by the interleaving // iterator if true. elideRangeDeletes bool @@ -180,8 +178,8 @@ type pointCollapsingIterator struct { // Used for Merge keys only. valueMerger ValueMerger valueBuf []byte - // If fixedSeqNum is non-zero, all emitted points have this fixed sequence - // number. + // If fixedSeqNum is non-zero, all emitted points are verified to have this + // fixed sequence number. fixedSeqNum uint64 } @@ -227,21 +225,23 @@ func (p *pointCollapsingIterator) SeekLT( func (p *pointCollapsingIterator) resetKey() { p.savedKey.UserKey = p.savedKeyBuf[:0] p.savedKey.Trailer = 0 - p.seqNumKey = InternalKey{} p.valueMerger = nil p.valueBuf = p.valueBuf[:0] p.iterKey = nil p.pos = pcIterPosCur } -func (p *pointCollapsingIterator) subSeqNum(key *base.InternalKey) *base.InternalKey { +func (p *pointCollapsingIterator) verifySeqNum(key *base.InternalKey) *base.InternalKey { + if !invariants.Enabled { + return key + } if p.fixedSeqNum == 0 || key == nil || key.Kind() == InternalKeyKindRangeDelete { return key } - // Reuse seqNumKey. This avoids an allocation. 
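	// Note on the change below: subSeqNum used to rewrite the emitted key's
	// trailer to the reserved sequence number, which required keeping seqNumKey
	// around to avoid an allocation. Foreign keys are now expected to already
	// carry the reserved per-level seqnum, so verifySeqNum only asserts that
	// (and only in invariants builds) and returns the key unchanged.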
- p.seqNumKey.UserKey = key.UserKey - p.seqNumKey.Trailer = base.MakeTrailer(p.fixedSeqNum, key.Kind()) - return &p.seqNumKey + if key.SeqNum() != p.fixedSeqNum { + panic(fmt.Sprintf("expected foreign point key to have seqnum %d, got %d", p.fixedSeqNum, key.SeqNum())) + } + return key } // finishAndReturnMerge finishes off the valueMerger and returns the saved key. @@ -257,7 +257,7 @@ func (p *pointCollapsingIterator) finishAndReturnMerge() (*base.InternalKey, bas } p.valueMerger = nil val := base.MakeInPlaceValue(p.valueBuf) - return p.subSeqNum(&p.savedKey), val + return p.verifySeqNum(&p.savedKey), val } // findNextEntry is called to return the next key. p.iter must be positioned at the @@ -316,7 +316,7 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV // of blocks and can determine user key changes without doing key saves // or comparisons. p.pos = pcIterPosCur - return p.subSeqNum(p.iterKey), p.iterValue + return p.verifySeqNum(p.iterKey), p.iterValue case InternalKeyKindSingleDelete: // Panic, as this iterator is not expected to observe single deletes. panic("cannot process singledel key in point collapsing iterator") @@ -377,7 +377,7 @@ func (p *pointCollapsingIterator) findNextEntry() (*base.InternalKey, base.LazyV // We should pass them as-is, but also account for any points ahead of // them. p.pos = pcIterPosCur - return p.subSeqNum(p.iterKey), p.iterValue + return p.verifySeqNum(p.iterKey), p.iterValue default: panic(fmt.Sprintf("unexpected kind: %d", p.iterKey.Kind())) } @@ -405,7 +405,7 @@ func (p *pointCollapsingIterator) findPrevEntry() (*base.InternalKey, base.LazyV for p.iterKey != nil { if !firstIteration && !p.comparer.Equal(p.iterKey.UserKey, p.savedKey.UserKey) { p.pos = pcIterPosPrev - return p.subSeqNum(&p.savedKey), p.savedValue + return p.verifySeqNum(&p.savedKey), p.savedValue } firstIteration = false if s := p.iter.Span(); s != nil && s.CoversAt(p.seqNum, p.iterKey.SeqNum()) { @@ -458,7 +458,7 @@ func (p *pointCollapsingIterator) findPrevEntry() (*base.InternalKey, base.LazyV // Prev() we encounter and return this rangedel. For now return the point ahead of // this range del (if any). p.pos = pcIterPosPrev - return p.subSeqNum(&p.savedKey), p.savedValue + return p.verifySeqNum(&p.savedKey), p.savedValue } // We take advantage of the fact that a Prev() *on* a RangeDel iterKey // always takes us to a different user key, so on the next iteration @@ -494,7 +494,7 @@ func (p *pointCollapsingIterator) findPrevEntry() (*base.InternalKey, base.LazyV } } p.pos = pcIterPosPrev - return p.subSeqNum(&p.savedKey), p.savedValue + return p.verifySeqNum(&p.savedKey), p.savedValue } // First implements the InternalIterator interface. @@ -736,10 +736,6 @@ func (d *DB) truncateSharedFile( // We will need to truncate file bounds in at least one direction. Open all // relevant iterators. - // - // TODO(bilal): Once virtual sstables go in, verify that the constraining of - // bounds to virtual sstable bounds happens below this method, so we aren't - // unintentionally exposing keys we shouldn't be exposing. 
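	// The iterators below are opened with bounds constrained to [lower, upper)
	// so that only keys inside the requested span contribute to the truncated
	// bounds (and, further down, the size estimate) of the SharedSSTMeta we
	// hand back.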
iter, rangeDelIter, err := d.newIters(ctx, file, &IterOptions{ LowerBound: lower, UpperBound: upper, @@ -843,6 +839,10 @@ func (d *DB) truncateSharedFile( if len(sst.Smallest.UserKey) == 0 { return nil, true, nil } + sst.Size, err = d.tableCache.estimateSize(file, sst.Smallest.UserKey, sst.Largest.UserKey) + if err != nil { + return nil, false, err + } return sst, false, nil } @@ -875,7 +875,7 @@ func scanInternalImpl( for f := files.SeekGE(cmp, lower); f != nil && cmp(f.Smallest.UserKey, upper) < 0; f = files.Next() { var objMeta objstorage.ObjectMetadata var err error - objMeta, err = provider.Lookup(fileTypeTable, f.FileNum.DiskFileNum()) + objMeta, err = provider.Lookup(fileTypeTable, f.FileBacking.DiskFileNum) if err != nil { return err } diff --git a/sstable/reader.go b/sstable/reader.go index 145e2c51c7..716b668bf4 100644 --- a/sstable/reader.go +++ b/sstable/reader.go @@ -2946,26 +2946,6 @@ func (v *VirtualReader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { ), nil } -// NewFixedSeqnumRangeDelIter wraps Reader.NewFixedSeqnumRangeDelIter. -func (v *VirtualReader) NewFixedSeqnumRangeDelIter( - seqNum uint64, -) (keyspan.FragmentIterator, error) { - iter, err := v.reader.NewFixedSeqnumRangeDelIter(seqNum) - if err != nil { - return nil, err - } - if iter == nil { - return nil, nil - } - - // There should be no spans which cross virtual sstable bounds. So, no - // truncation should occur. - return keyspan.Truncate( - v.reader.Compare, iter, v.vState.lower.UserKey, v.vState.upper.UserKey, - &v.vState.lower, &v.vState.upper, true, /* panicOnPartialOverlap */ - ), nil -} - // NewRawRangeKeyIter wraps Reader.NewRawRangeKeyIter. func (v *VirtualReader) NewRawRangeKeyIter() (keyspan.FragmentIterator, error) { iter, err := v.reader.NewRawRangeKeyIter() @@ -3213,17 +3193,6 @@ func (r *Reader) newCompactionIter( // TODO(sumeer): plumb context.Context since this path is relevant in the user-facing // iterator. Add WithContext methods since the existing ones are public. func (r *Reader) NewRawRangeDelIter() (keyspan.FragmentIterator, error) { - return r.NewFixedSeqnumRangeDelIter(r.Properties.GlobalSeqNum) -} - -// NewFixedSeqnumRangeDelIter returns an internal iterator for the contents of -// the range-del block of the table, with a custom sequence number to be used as -// the global sequence number for this block. Returns nil if the table does not -// contain any range deletions. -// -// TODO(sumeer): plumb context.Context since this path is relevant in the user-facing -// iterator. Add WithContext methods since the existing ones are public. 
-func (r *Reader) NewFixedSeqnumRangeDelIter(seqNum uint64) (keyspan.FragmentIterator, error) { if r.rangeDelBH.Length == 0 { return nil, nil } @@ -3232,7 +3201,7 @@ func (r *Reader) NewFixedSeqnumRangeDelIter(seqNum uint64) (keyspan.FragmentIter return nil, err } i := &fragmentBlockIter{elideSameSeqnum: true} - if err := i.blockIter.initHandle(r.Compare, h, seqNum, false); err != nil { + if err := i.blockIter.initHandle(r.Compare, h, r.Properties.GlobalSeqNum, false); err != nil { return nil, err } return i, nil diff --git a/table_cache.go b/table_cache.go index 9c3b9d7fc9..b0d62c6eae 100644 --- a/table_cache.go +++ b/table_cache.go @@ -170,6 +170,32 @@ func (c *tableCacheContainer) metrics() (CacheMetrics, FilterMetrics) { return m, f } +func (c *tableCacheContainer) estimateSize( + meta *fileMetadata, lower, upper []byte, +) (size uint64, err error) { + if meta.Virtual { + err = c.withVirtualReader( + meta.VirtualMeta(), + func(r sstable.VirtualReader) (err error) { + size, err = r.EstimateDiskUsage(lower, upper) + return err + }, + ) + } else { + err = c.withReader( + meta.PhysicalMeta(), + func(r *sstable.Reader) (err error) { + size, err = r.EstimateDiskUsage(lower, upper) + return err + }, + ) + } + if err != nil { + return 0, err + } + return size, nil +} + func (c *tableCacheContainer) withReader(meta physicalMeta, fn func(*sstable.Reader) error) error { s := c.tableCache.getShard(meta.FileBacking.DiskFileNum) v := s.findNode(meta.FileMetadata, &c.dbOpts) @@ -408,7 +434,6 @@ func (c *tableCacheShard) newIters( } type iterCreator interface { - NewFixedSeqnumRangeDelIter(seqNum uint64) (keyspan.FragmentIterator, error) NewRawRangeDelIter() (keyspan.FragmentIterator, error) NewIterWithBlockPropertyFiltersAndContextEtc(ctx context.Context, lower, upper []byte, filterer *sstable.BlockPropertiesFilterer, hideObsoletePoints, useFilterBlock bool, stats *base.InternalIteratorStats, rp sstable.ReaderProvider) (sstable.Iterator, error) NewCompactionIter( @@ -446,7 +471,7 @@ func (c *tableCacheShard) newIters( } switch manifest.LevelToInt(opts.level) { case 5: - rangeDelIter, err = ic.NewFixedSeqnumRangeDelIter(base.SeqNumL5RangeDel) + rangeDelIter, err = ic.NewRawRangeDelIter() case 6: // Let rangeDelIter remain nil. We don't need to return rangedels from // this file as they will not apply to any other files. For the purpose @@ -514,7 +539,7 @@ func (c *tableCacheShard) newIters( if provider.IsForeign(objMeta) { // NB: IsForeign() guarantees IsShared, so opts must not be nil as we've // already panicked on the nil case above. - pointKeySeqNum := base.PointSeqNumForLevel(manifest.LevelToInt(opts.level)) + pointKeySeqNum := base.SeqNumForLevel(manifest.LevelToInt(opts.level)) pcIter := pointCollapsingIterator{ comparer: dbOpts.opts.Comparer, merge: dbOpts.opts.Merge, diff --git a/table_stats.go b/table_stats.go index f02f76130b..b96651b057 100644 --- a/table_stats.go +++ b/table_stats.go @@ -186,6 +186,11 @@ func (d *DB) loadNewFileStats( continue } + if nf.Meta.Virtual { + // cannot load virtual table stats + continue + } + stats, newHints, err := d.loadTableStats( rs.current, nf.Level, nf.Meta.PhysicalMeta(), @@ -231,6 +236,11 @@ func (d *DB) scanReadStateTableStats( if f.StatsValid() { continue } + // TODO(bilal): Remove this guard when table stats collection is + // implemented for virtual sstables. + if f.Virtual { + continue + } // Limit how much work we do per read state. 
The older the read // state is, the higher the likelihood files are no longer being diff --git a/testdata/excise b/testdata/excise new file mode 100644 index 0000000000..8b0d09fcec --- /dev/null +++ b/testdata/excise @@ -0,0 +1,164 @@ + +build ext0 format=pebblev2 +set a 1 +set l 2 +---- + +ingest ext0 +---- + +lsm +---- +6: + 000004:[a#10,SET-l#10,SET] + + +batch +set d foo +set f bar +---- + +flush +---- + +lsm +---- +0.0: + 000006:[d#11,SET-f#12,SET] +6: + 000004:[a#10,SET-l#10,SET] + +excise c k +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L6 000007:[a#10,1-a#10,1] + added: L6 000008:[l#10,1-l#10,1] + + +excise a e +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000009:[f#12,1-f#12,1] + added: L6 000010:[l#10,1-l#10,1] + +excise e z +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000011:[d#11,1-d#11,1] + added: L6 000012:[a#10,1-a#10,1] + +excise f l +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000013:[d#11,1-d#11,1] + added: L6 000014:[a#10,1-a#10,1] + added: L6 000015:[l#10,1-l#10,1] + +excise f ll +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000006 + deleted: L6 000004 + added: L0 000016:[d#11,1-d#11,1] + added: L6 000017:[a#10,1-a#10,1] + +excise p q +---- +would excise 0 files, use ingest-and-excise to excise. + +lsm +---- +0.0: + 000006:[d#11,SET-f#12,SET] +6: + 000004:[a#10,SET-l#10,SET] + +build ext1 format=pebblev2 +set d foo3 +set e bar2 +---- + +ingest-and-excise ext1 excise="c-k" +---- + +lsm +---- +6: + 000018:[d#13,SET-e#13,SET] + 000019:[l#10,SET-l#10,SET] + +iter +first +next +next +next +---- +d: (foo3, .) +e: (bar2, .) +l: (2, .) +. + +# More complex cases, with the truncation of file bounds happening at rangedel +# and rangekey bounds. + +reset +---- + +build ext3 format=pebblev2 +range-key-set c f @4 foobar +---- + +ingest ext3 +---- + +build ext4 format=pebblev2 +set b bar +del-range g i +---- + +ingest ext4 +---- + +lsm +---- +0.0: + 000005:[b#11,SET-i#inf,RANGEDEL] +6: + 000004:[c#10,RANGEKEYSET-f#inf,RANGEKEYSET] + +excise f g +---- +would excise 1 files, use ingest-and-excise to excise. + deleted: L0 000005 + added: L0 000006:[b#11,1-b#11,1] + added: L0 000007:[g#11,15-i#72057594037927935,15] + +excise b c +---- +would excise 1 files, use ingest-and-excise to excise. + deleted: L0 000005 + added: L0 000008:[g#11,15-i#72057594037927935,15] + +excise i j +---- +would excise 0 files, use ingest-and-excise to excise. + +# Excise mid range key. This will not happen in practice, but excise() +# supports it. + +excise c d +---- +would excise 2 files, use ingest-and-excise to excise. + deleted: L0 000005 + deleted: L6 000004 + added: L0 000009:[b#11,1-b#11,1] + added: L0 000010:[g#11,15-i#72057594037927935,15] + added: L6 000011:[d#10,21-f#72057594037927935,21] diff --git a/version_set.go b/version_set.go index e85774c072..573f461681 100644 --- a/version_set.go +++ b/version_set.go @@ -572,6 +572,14 @@ func (vs *versionSet) logAndApply( for fileNum, size := range zombies { vs.zombieTables[fileNum] = size } + // Update the Deleted bools. We can't use the zombieTables struct for this + // as it works on FileBackings, not FileMetadatas. 
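	// Every FileMetadata named in DeletedFiles is marked Deleted; any metadata
	// the same edit re-adds (e.g. a table that is simply moved between levels)
	// gets the flag cleared again, so the flag ends up set only on files that
	// are truly gone from the new version. As noted on the field, the manifest
	// log lock must be held to read or write it.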
+ for _, f := range ve.DeletedFiles { + f.Deleted = true + } + for i := range ve.NewFiles { + ve.NewFiles[i].Meta.Deleted = false + } // Install the new version. vs.append(newVersion)
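A short usage sketch of the KeyRange helpers exercised in TestKeyRangeBasic above, assuming code inside the pebble package; the span and file below are illustrative only:

	cmp := base.DefaultComparer.Compare
	span := KeyRange{Start: []byte("c"), End: []byte("k")}

	// Contains treats Start as inclusive and End as exclusive, except that an
	// exclusive-sentinel key (e.g. a range deletion's end boundary) at End
	// still counts as inside the span.
	inside := span.Contains(cmp, base.MakeInternalKey([]byte("d"), 11, InternalKeyKindSet)) // true
	atEnd := span.Contains(cmp, base.MakeInternalKey([]byte("k"), 11, InternalKeyKindSet))  // false

	// Overlaps only checks for intersection with a file's bounds, not
	// containment, so a wide file like [a, l] still overlaps and would have to
	// be truncated or rewritten by an excise of this span.
	wide := &fileMetadata{
		Smallest: base.MakeInternalKey([]byte("a"), 10, InternalKeyKindSet),
		Largest:  base.MakeInternalKey([]byte("l"), 10, InternalKeyKindSet),
	}
	overlaps := span.Overlaps(cmp, wide) // true
	_, _, _ = inside, atEnd, overlaps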