Skip to content

Commit

Permalink
*: add IngestAndExcise operation
Browse files Browse the repository at this point in the history
This change adds an IngestAndExcise operation that does the below
additional things alongside a regular ingestion:

1) It ingests some SharedSSTMeta files, which are provider-backed
   sstables that could be owned by other nodes.
2) It excises existing sstables within the provided excise span (within
   which all sstables from 1 must fit) by creating new virtual sstables
   that exclude keys from the excise span.

While this change can be implemented independently of #2455, some of the
end-to-end tests in future changes will rely on both that and this.

Fixes #2520.
  • Loading branch information
itsbilal committed May 24, 2023
1 parent 5a00746 commit 8182b6a
Show file tree
Hide file tree
Showing 12 changed files with 1,128 additions and 81 deletions.
17 changes: 14 additions & 3 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -2558,9 +2558,20 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
info.Duration = d.timeNow().Sub(startTime)
if err == nil {
d.mu.versions.logLock()
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(c)
})
// Confirm if any of this compaction's inputs were deleted while this
// compaction was ongoing.
for i := range c.inputs {
c.inputs[i].files.Each(func(m *manifest.FileMetadata) {
if m.Deleted {
err = firstError(err, errors.New("pebble: file deleted by a concurrent operation, will retry compaction"))
}
})
}
if err == nil {
err = d.mu.versions.logAndApply(jobID, ve, c.metrics, false /* forceRotation */, func() []compactionInfo {
return d.getInProgressCompactionInfoLocked(c)
})
}
if err != nil {
// TODO(peter): untested.
for _, f := range pendingOutputs {
Expand Down
28 changes: 27 additions & 1 deletion data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,6 +1167,32 @@ func (d *DB) waitTableStats() {
}
}

func runIngestAndExciseCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
var exciseSpan KeyRange
paths := make([]string, 0, len(td.CmdArgs))
for i, arg := range td.CmdArgs {
switch td.CmdArgs[i].Key {
case "excise":
if len(td.CmdArgs[i].Vals) != 1 {
return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
}
fields := strings.Split(td.CmdArgs[i].Vals[0], "-")
if len(fields) != 2 {
return errors.New("expected 2 values for excise separated by -, eg. ingest-and-excise foo1 excise=\"start-end\"")
}
exciseSpan.Start = []byte(fields[0])
exciseSpan.End = []byte(fields[1])
default:
paths = append(paths, arg.String())
}
}

if _, err := d.IngestAndExcise(paths, nil /* shared */, exciseSpan); err != nil {
return err
}
return nil
}

func runIngestCmd(td *datadriven.TestData, d *DB, fs vfs.FS) error {
paths := make([]string, 0, len(td.CmdArgs))
for _, arg := range td.CmdArgs {
Expand Down Expand Up @@ -1205,7 +1231,7 @@ func runForceIngestCmd(td *datadriven.TestData, d *DB) error {
*fileMetadata,
) (int, error) {
return level, nil
})
}, nil, KeyRange{})
return err
}

Expand Down
7 changes: 4 additions & 3 deletions flushable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,10 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {

// We can reuse the ingestLoad function for this test even if we're
// not actually ingesting a file.
meta, paths, err := ingestLoad(
d.opts, d.FormatMajorVersion(), paths, d.cacheID, pendingOutputs,
lr, err := ingestLoad(
d.opts, d.FormatMajorVersion(), paths, nil /* shared */, d.cacheID, pendingOutputs, d.objProvider,
)
meta := lr.localMeta
if err != nil {
panic(err)
}
Expand All @@ -70,7 +71,7 @@ func TestIngestedSSTFlushableAPI(t *testing.T) {
}

// Verify the sstables do not overlap.
if err := ingestSortAndVerify(d.cmp, meta, paths); err != nil {
if err := ingestSortAndVerify(d.cmp, lr, KeyRange{}); err != nil {
panic("unsorted sstables")
}

Expand Down
Loading

0 comments on commit 8182b6a

Please sign in to comment.