*: add IngestAndExcise operation
This change adds an IngestAndExcise operation that performs the following
additional steps alongside a regular ingestion:

1) It ingests some SharedSSTMeta files, which are provider-backed
   sstables that could be owned by other nodes.
2) It excises existing sstables within the provided excise span (within
   which all sstables from 1 must fit) by creating new virtual sstables
   that exclude keys from the excise span.

While this change can be implemented independently of #2455, some of the
end-to-end tests in future changes will rely on both that change and this one.

Fixes #2520.
itsbilal committed Jun 12, 2023
1 parent 898fb2f commit 084bcfe
Showing 16 changed files with 1,318 additions and 193 deletions.
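
At the API level, the new operation takes a set of local sstable paths, an optional set of shared sstables, and the excise span. Below is a minimal usage sketch, assuming the IngestAndExcise signature that the test helper in data_test.go calls (the stats return value and SharedSSTMeta handling are not shown in this excerpt, and "ext1.sst" is a hypothetical pre-built sstable):

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

func main() {
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// All ingested sstables must fall within the excise span; existing data
	// in the span is excised by creating virtual sstables that exclude it.
	span := pebble.KeyRange{Start: []byte("d"), End: []byte("e")}

	// nil shared sstables, mirroring runIngestAndExciseCmd in data_test.go.
	if _, err := db.IngestAndExcise([]string{"ext1.sst"}, nil /* shared */, span); err != nil {
		log.Fatal(err)
	}
}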
25 changes: 18 additions & 7 deletions compaction.go
@@ -1333,10 +1333,10 @@ func (c *compaction) newInputIter(
// internal iterator interface). The resulting merged rangedel iterator is
// then included with the point levels in a single mergingIter.
newRangeDelIter := func(
-	f manifest.LevelFile, _ *IterOptions, bytesIterated *uint64,
+	f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64,
) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
-		nil /* iter options */, internalIterOpts{bytesIterated: &c.bytesIterated})
+		&IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated})
if err == nil {
// TODO(peter): It is mildly wasteful to open the point iterator only to
// immediately close it. One way to solve this would be to add new
@@ -1459,7 +1459,7 @@ func (c *compaction) newInputIter(
// mergingIter.
iter := level.files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
-	rangeDelIter, err := newRangeDelIter(iter.Take(), nil, &c.bytesIterated)
+	rangeDelIter, err := newRangeDelIter(iter.Take(), nil, l, &c.bytesIterated)
if err != nil {
// The error will already be annotated with the BackingFileNum, so
// we annotate it with the FileNum.
@@ -1501,7 +1501,7 @@ func (c *compaction) newInputIter(
}
return iter, err
}
-	li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
+	li.Init(keyspan.SpanIterOptions{Level: l}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
rangeKeyIters = append(rangeKeyIters, li)
}
return nil
@@ -2564,9 +2564,20 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
info.Duration = d.timeNow().Sub(startTime)
if err == nil {
d.mu.versions.logLock()
-		err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
-			return d.getInProgressCompactionInfoLocked(c)
-		})
+		// Confirm if any of this compaction's inputs were deleted while this
+		// compaction was ongoing.
+		for i := range c.inputs {
+			c.inputs[i].files.Each(func(m *manifest.FileMetadata) {
+				if m.Deleted {
+					err = firstError(err, errors.New("pebble: file deleted by a concurrent operation, will retry compaction"))
+				}
+			})
+		}
+		if err == nil {
+			err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
+				return d.getInProgressCompactionInfoLocked(c)
+			})
+		}
if err != nil {
// TODO(peter): untested.
for _, f := range pendingOutputs {
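
The guard above folds any number of deleted-input hits into a single error via firstError. Here is a small self-contained sketch of that accumulation pattern, assuming firstError keeps the first non-nil error (an inference from the call site, not a documented contract):

package main

import (
	"errors"
	"fmt"
)

// firstError returns the first non-nil of its two arguments; this mirrors
// how the compaction guard accumulates at most one error while visiting
// every input file (inferred from the call site above).
func firstError(err0, err1 error) error {
	if err0 != nil {
		return err0
	}
	return err1
}

func main() {
	var err error
	// Pretend the second and third inputs were excised mid-compaction.
	deleted := []bool{false, true, true}
	for _, d := range deleted {
		if d {
			err = firstError(err, errors.New("pebble: file deleted by a concurrent operation, will retry compaction"))
		}
	}
	fmt.Println(err) // only the first error is retained
}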
28 changes: 27 additions & 1 deletion data_test.go
@@ -1174,6 +1174,32 @@ func (d *DB) waitTableStats() {
}
}

+func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
+	var exciseSpan KeyRange
+	paths := make([]string, 0, len(td.CmdArgs))
+	for i, arg := range td.CmdArgs {
+		switch td.CmdArgs[i].Key {
+		case "excise":
+			if len(td.CmdArgs[i].Vals) != 1 {
+				return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
+			}
+			fields := strings.Split(td.CmdArgs[i].Vals[0], "-")
+			if len(fields) != 2 {
+				return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
+			}
+			exciseSpan.Start = []byte(fields[0])
+			exciseSpan.End = []byte(fields[1])
+		default:
+			paths = append(paths, arg.String())
+		}
+	}
+
+	if _, err := d.IngestAndExcise(paths, nil /* shared */, exciseSpan); err != nil {
+		return err
+	}
+	return nil
+}

func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
paths := make([]string, 0, len(td.CmdArgs))
for _, arg := range td.CmdArgs {
@@ -1212,7 +1238,7 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
*fileMetadata,
) (int, error) {
return level, nil
-	})
+	}, nil, KeyRange{})
return err
}

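
Going by the error message in runIngestAndExciseCmd, a datadriven test would invoke the new command roughly as follows (the file name and keys here are hypothetical):

ingest-and-excise ext1 excise="d-e"
----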
9 changes: 4 additions & 5 deletions flushable_test.go
@@ -58,9 +58,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
-	meta, paths, err := ingestLoad(
-		d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs,
-	)
+	lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, d.cacheID, pendingOutputs)
+	meta := lr.localMeta
if err != nil {
panic(err)
}
@@ -70,7 +69,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}

// Verify the sstables do not overlap.
-	if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil {
+	if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil {
panic("unsorted sstables")
}

@@ -79,7 +78,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
-	if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
+	if err := ingestLink(jobID, d.opts, d.objProvider, lr, nil /* shared */); err != nil {
panic("couldn't hard link sstables")
}

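
ingestLoad now returns a single result value rather than parallel slices. Its full definition is not visible in this excerpt; the following is a hypothetical reconstruction inferred from the lr.localMeta access above and the shared-sstable plumbing elsewhere in the commit, so the real struct may differ:

package pebble

// fileMetadata is a stand-in for the real metadata type in internal/manifest.
type fileMetadata struct{}

// ingestLoadResult is a guess at the new return type: only localMeta is
// confirmed by this diff; the other fields are suggested by the shared
// sstable and path arguments passed alongside it.
type ingestLoadResult struct {
	localMeta  []*fileMetadata // metadata for locally ingested sstables
	sharedMeta []*fileMetadata // metadata for provider-backed shared sstables
	localPaths []string        // paths corresponding to localMeta entries
}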