*: add IngestAndExcise operation #2538

Merged 1 commit on Jun 12, 2023
25 changes: 18 additions & 7 deletions compaction.go
@@ -1333,10 +1333,10 @@ func (c *compaction) newInputIter(
// internal iterator interface). The resulting merged rangedel iterator is
// then included with the point levels in a single mergingIter.
newRangeDelIter := func(
- f manifest.LevelFile, _ *IterOptions, bytesIterated *uint64,
+ f manifest.LevelFile, _ *IterOptions, l manifest.Level, bytesIterated *uint64,
) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
- nil /* iter options */, internalIterOpts{bytesIterated: &c.bytesIterated})
+ &IterOptions{level: l}, internalIterOpts{bytesIterated: &c.bytesIterated})
if err == nil {
// TODO(peter): It is mildly wasteful to open the point iterator only to
// immediately close it. One way to solve this would be to add new
@@ -1459,7 +1459,7 @@ func (c *compaction) newInputIter(
// mergingIter.
iter := level.files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
- rangeDelIter, err := newRangeDelIter(iter.Take(), nil, &c.bytesIterated)
+ rangeDelIter, err := newRangeDelIter(iter.Take(), nil, l, &c.bytesIterated)
if err != nil {
// The error will already be annotated with the BackingFileNum, so
// we annotate it with the FileNum.
@@ -1501,7 +1501,7 @@ func (c *compaction) newInputIter(
}
return iter, err
}
- li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
+ li.Init(keyspan.SpanIterOptions{Level: l}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, manifest.KeyTypeRange)
rangeKeyIters = append(rangeKeyIters, li)
}
return nil
@@ -2564,9 +2564,20 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
info.Duration = d.timeNow().Sub(startTime)
if err == nil {
d.mu.versions.logLock()
- err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
- return d.getInProgressCompactionInfoLocked(c)
- })
+ // Confirm if any of this compaction's inputs were deleted while this
+ // compaction was ongoing.
+ for i := range c.inputs {
+ c.inputs[i].files.Each(func(m *manifest.FileMetadata) {
+ if m.Deleted {
+ err = firstError(err, errors.New("pebble: file deleted by a concurrent operation, will retry compaction"))
+ }
+ })
+ }
+ if err == nil {
+ err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
+ return d.getInProgressCompactionInfoLocked(c)
+ })
+ }
if err != nil {
// TODO(peter): untested.
for _, f := range pendingOutputs {
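
The new block above guards against a concurrent excise: if an ingest-and-excise deleted one of this compaction's input files while the compaction was running, the version edit must not be applied, and the compaction is retried instead. As a rough sketch (not part of this PR, and relying on the pebble-internal identifiers visible in the diff: the compaction's inputs, manifest.FileMetadata, and firstError), the same check could be factored into a helper:

// anyInputDeleted reports, as an error, whether any of the compaction's
// input files were deleted by a concurrent operation (e.g. an excise).
// Sketch only; it assumes the unexported compaction type used in the diff
// above and pebble's internal firstError helper.
func anyInputDeleted(c *compaction) error {
    var err error
    for i := range c.inputs {
        c.inputs[i].files.Each(func(m *manifest.FileMetadata) {
            if m.Deleted {
                err = firstError(err, errors.New(
                    "pebble: file deleted by a concurrent operation, will retry compaction"))
            }
        })
    }
    return err
}

compact1 could then call anyInputDeleted(c) after taking the version log lock and skip logAndApply when it returns a non-nil error, which is exactly what the inlined loop in the diff does.
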
28 changes: 27 additions & 1 deletion data_test.go
@@ -1174,6 +1174,32 @@ func (d *DB) waitTableStats() {
}
}

+ func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
+ var exciseSpan KeyRange
+ paths := make([]string, 0, len(td.CmdArgs))
+ for i, arg := range td.CmdArgs {
+ switch td.CmdArgs[i].Key {
+ case "excise":
+ if len(td.CmdArgs[i].Vals) != 1 {
+ return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
+ }
+ fields := strings.Split(td.CmdArgs[i].Vals[0], "-")
+ if len(fields) != 2 {
+ return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
+ }
+ exciseSpan.Start = []byte(fields[0])
+ exciseSpan.End = []byte(fields[1])
+ default:
+ paths = append(paths, arg.String())
+ }
+ }
+
+ if _, err := d.IngestAndExcise(paths, nil /* shared */, exciseSpan); err != nil {
+ return err
+ }
+ return nil
+ }
+
func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
paths := make([]string, 0, len(td.CmdArgs))
for _, arg := range td.CmdArgs {
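
The parser above treats bare arguments as sstable paths and takes the excise span as a single excise="<start>-<end>" argument, mirroring the format hinted at in its own error message. A hypothetical datadriven test case (the file name, keys, and empty expected output are illustrative only) would look like:

ingest-and-excise ext1 excise="d-f"
----
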
@@ -1212,7 +1238,7 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
*fileMetadata,
) (int, error) {
return level, nil
- })
+ }, nil, KeyRange{})
return err
}

9 changes: 4 additions & 5 deletions flushable_test.go
@@ -58,9 +58,8 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
- meta, paths, err := ingestLoad(
- d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs,
- )
+ lr, err := ingestLoad(d.opts, d.FormatMajorVersion(), paths, nil, d.cacheID, pendingOutputs)
+ meta := lr.localMeta
if err != nil {
panic(err)
}
@@ -70,7 +69,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}

// Verify the sstables do not overlap.
- if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil {
+ if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil {
panic("unsorted sstables")
}

@@ -79,7 +78,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
// (e.g. because the files reside on a different filesystem), ingestLink will
// fall back to copying, and if that fails we undo our work and return an
// error.
- if err := ingestLink(jobID, d.opts, d.objProvider, paths, meta); err != nil {
+ if err := ingestLink(jobID, d.opts, d.objProvider, lr, nil /* shared */); err != nil {
panic("couldn't hard link sstables")
}

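
Taken together, the changes thread an excise span and optional shared sstables through the internal ingest path (ingestLoad, ingestSortAndVerify, ingestLink) and expose the operation as DB.IngestAndExcise. A minimal caller-side sketch, assuming only what the call in data_test.go above shows (a slice of sstable paths, a nil shared argument, a KeyRange with Start/End keys, and a discarded first return value); the database path, file name, and options are illustrative, and depending on the Pebble version the operation may additionally require a sufficiently recent format major version in Options:

package main

import (
    "log"

    "github.com/cockroachdb/pebble"
)

func main() {
    // Open a throwaway database; the options here are illustrative only.
    db, err := pebble.Open("demo-db", &pebble.Options{})
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // Atomically ingest ext1.sst and excise (delete) any existing keys in
    // the span from d to f, so the ingested data replaces whatever was there.
    span := pebble.KeyRange{Start: []byte("d"), End: []byte("f")}
    if _, err := db.IngestAndExcise([]string{"ext1.sst"}, nil /* shared */, span); err != nil {
        log.Fatal(err)
    }
}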