Skip to content

Commit

Permalink
db: use delete-only compaction hints to do excises
Browse files Browse the repository at this point in the history
Previously, we'd only act on a delete-only compaction hint if
the hint covered the entirety of a file. This change leverages
the fact that we can now partially excise files using virtual
sstables: delete-only compactions now also excise the portion of
an sstable that a hint only partially covers, which reduces the
write-amplification (w-amp) impact of writing rangedels and rangekeydels.

Fixes #3820.
  • Loading branch information
itsbilal committed Sep 6, 2024
1 parent fc6eab0 commit e473e96
Show file tree
Hide file tree
Showing 6 changed files with 426 additions and 51 deletions.
220 changes: 182 additions & 38 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math"
"runtime/pprof"
"slices"
"sort"
"sync/atomic"
"time"

Expand Down Expand Up @@ -251,6 +252,11 @@ type compaction struct {
// lower level in the LSM during runCompaction.
allowedZeroSeqNum bool

// deletionHints are set if this is a compactionKindDeleteOnly. Used to figure
// out whether an input must be deleted in its entirety, or excised into
// virtual sstables.
deletionHints []deleteCompactionHint

metrics map[int]*LevelMetrics

pickerMetrics compactionPickerMetrics
Expand Down Expand Up @@ -390,18 +396,23 @@ func newCompaction(
}

func newDeleteOnlyCompaction(
opts *Options, cur *version, inputs []compactionLevel, beganAt time.Time,
opts *Options,
cur *version,
inputs []compactionLevel,
beganAt time.Time,
hints []deleteCompactionHint,
) *compaction {
c := &compaction{
kind: compactionKindDeleteOnly,
cmp: opts.Comparer.Compare,
equal: opts.Comparer.Equal,
comparer: opts.Comparer,
formatKey: opts.Comparer.FormatKey,
logger: opts.Logger,
version: cur,
beganAt: beganAt,
inputs: inputs,
kind: compactionKindDeleteOnly,
cmp: opts.Comparer.Compare,
equal: opts.Comparer.Equal,
comparer: opts.Comparer,
formatKey: opts.Comparer.FormatKey,
logger: opts.Logger,
version: cur,
beganAt: beganAt,
inputs: inputs,
deletionHints: hints,
}

// Set c.smallest, c.largest.
Expand Down Expand Up @@ -1013,10 +1024,13 @@ func (d *DB) clearCompactingState(c *compaction, rollback bool) {
d.opts.Logger.Fatalf("L%d->L%d: %s not being compacted", c.startLevel.level, c.outputLevel.level, f.FileNum)
}
if !rollback {
// On success all compactions other than move-compactions transition the
// file into the Compacted state. Move-compacted files become eligible
// for compaction again and transition back to NotCompacting.
if c.kind != compactionKindMove {
// On success all compactions other than move and delete-only compactions
// transition the file into the Compacted state. Move-compacted files
// become eligible for compaction again and transition back to NotCompacting.
// Delete-only compactions could, on rare occasion, leave files untouched
// (eg. if files have a loose bound), so we revert them all to NotCompacting
// just in case they need to be compacted again.
if c.kind != compactionKindMove && c.kind != compactionKindDeleteOnly {
f.SetCompactionState(manifest.CompactionStateCompacted)
} else {
f.SetCompactionState(manifest.CompactionStateNotCompacting)
Expand Down Expand Up @@ -1282,7 +1296,7 @@ func (d *DB) runIngestFlush(c *compaction) (*manifest.VersionEdit, error) {
iter := overlaps.Iter()

for m := iter.First(); m != nil; m = iter.Next() {
newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l)
newFiles, err := d.excise(context.TODO(), ingestFlushable.exciseSpan.UserKeyBounds(), m, ve, l, d.mu.versions.getNextFileNum)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1759,11 +1773,11 @@ func (d *DB) maybeScheduleCompactionPicker(
func (d *DB) tryScheduleDeleteOnlyCompaction() {
v := d.mu.versions.currentVersion()
snapshots := d.mu.snapshots.toSlice()
inputs, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots)
inputs, resolvedHints, unresolvedHints := checkDeleteCompactionHints(d.cmp, v, d.mu.compact.deletionHints, snapshots, d.FormatMajorVersion())
d.mu.compact.deletionHints = unresolvedHints

if len(inputs) > 0 {
c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow())
c := newDeleteOnlyCompaction(d.opts, v, inputs, d.timeNow(), resolvedHints)
d.mu.compact.compactingCount++
d.addInProgressCompaction(c)
go d.compact(c, nil)
Expand Down Expand Up @@ -1914,8 +1928,8 @@ func (h deleteCompactionHint) String() string {
)
}

func (h *deleteCompactionHint) canDelete(
cmp Compare, m *fileMetadata, snapshots compact.Snapshots,
func (h *deleteCompactionHint) canDeleteOrExcise(
cmp Compare, m *fileMetadata, snapshots compact.Snapshots, fmv FormatMajorVersion,
) bool {
// The file can only be deleted if all of its keys are older than the
// earliest tombstone aggregated into the hint. Note that we use
Expand Down Expand Up @@ -1960,18 +1974,30 @@ func (h *deleteCompactionHint) canDelete(
default:
panic(fmt.Sprintf("pebble: unknown delete compaction hint type: %d", h.hintType))
}

// The file's keys must be completely contained within the hint range.
return cmp(h.start, m.Smallest.UserKey) <= 0 && cmp(m.Largest.UserKey, h.end) < 0
if fmv < FormatVirtualSSTables {
// The file's keys must be completely contained within the hint range.
return cmp(h.start, m.Smallest.UserKey) <= 0 &&
base.UserKeyExclusive(h.end).CompareUpperBounds(cmp, m.UserKeyBounds().End) >= 0
}
// Check for any overlap. In cases of partial overlap, we excise the part of the file
// that overlaps with the deletion hint.
return cmp(h.end, m.Smallest.UserKey) > 0 &&
(m.UserKeyBounds().End.CompareUpperBounds(cmp, base.UserKeyInclusive(h.start)) >= 0)
}

func checkDeleteCompactionHints(
cmp Compare, v *version, hints []deleteCompactionHint, snapshots compact.Snapshots,
) ([]compactionLevel, []deleteCompactionHint) {
cmp Compare,
v *version,
hints []deleteCompactionHint,
snapshots compact.Snapshots,
fmv FormatMajorVersion,
) (levels []compactionLevel, resolved, unresolved []deleteCompactionHint) {
var files map[*fileMetadata]bool
var byLevel [numLevels][]*fileMetadata

unresolvedHints := hints[:0]
// Lazily populate resolvedHints, similar to files above.
resolvedHints := make([]deleteCompactionHint, 0)
for _, h := range hints {
// Check each compaction hint to see if it's resolvable. Resolvable
// hints are removed and trigger a delete-only compaction if any files
Expand Down Expand Up @@ -2025,7 +2051,7 @@ func checkDeleteCompactionHints(
overlaps := v.Overlaps(l, base.UserKeyBoundsEndExclusive(h.start, h.end))
iter := overlaps.Iter()
for m := iter.First(); m != nil; m = iter.Next() {
if m.IsCompacting() || !h.canDelete(cmp, m, snapshots) || files[m] {
if m.IsCompacting() || !h.canDeleteOrExcise(cmp, m, snapshots, fmv) || files[m] {
continue
}
if files == nil {
Expand All @@ -2037,6 +2063,7 @@ func checkDeleteCompactionHints(
byLevel[l] = append(byLevel[l], m)
}
}
resolvedHints = append(resolvedHints, h)
}

var compactLevels []compactionLevel
Expand All @@ -2049,7 +2076,7 @@ func checkDeleteCompactionHints(
files: manifest.NewLevelSliceKeySorted(cmp, files),
})
}
return compactLevels, unresolvedHints
return compactLevels, resolvedHints, unresolvedHints
}

// compact runs one compaction and maybe schedules another call to compact.
Expand Down Expand Up @@ -2349,24 +2376,137 @@ func (d *DB) runCopyCompaction(
return ve, compact.Stats{}, nil
}

// runDeleteOnlyCompactionForLevel applies c.deletionHints to the input files
// of a single level (cl) of a delete-only compaction and records the outcome
// in ve.
//
// A file that a hint covers in its entirety is added to ve.DeletedFiles and
// counted in levelMetrics.TablesDeleted. A file that a hint covers only
// partially is handed to d.excise together with ve and nextFileNum, and is
// counted in levelMetrics.TablesExcised once the excise is known to have
// modified it.
//
// The function walks two cursors in tandem: the files of cl.files and the
// hints of c.deletionHints. It relies on c.deletionHints being sorted by
// start key. ve.DeletedFiles must be a non-nil map.
//
// It returns a wrapped error if d.excise fails. It panics if a partial excise
// is required while the format major version is below FormatVirtualSSTables,
// or if it moves past a never-excised file that was not recorded as deleted.
func (d *DB) runDeleteOnlyCompactionForLevel(
	c *compaction,
	cl compactionLevel,
	levelMetrics *LevelMetrics,
	ve *versionEdit,
	nextFileNum func() base.FileNum,
	snapshots compact.Snapshots,
) error {
	iter := cl.files.Iter()
	// c.deletionHints[curHint] is the hint that overlaps with f. There
	// could be more than one hint overlapping with f.
	curHint := 0
	f := iter.First()
	// curFile usually matches f, except if f got excised in which case
	// it maps to the "current state" of f.
	curFile := f

	for f != nil && curHint < len(c.deletionHints) {
		h := c.deletionHints[curHint]
		if h.tombstoneLevel >= cl.level {
			// We cannot excise out the deletion tombstone itself, or anything
			// above it.
			curHint++
			continue
		}
		// Skip hints that end at or before the start of f, and hints that
		// canDeleteOrExcise rejects for f.
		if d.cmp(h.end, f.Smallest.UserKey) <= 0 || !h.canDeleteOrExcise(d.cmp, f, snapshots, d.FormatMajorVersion()) {
			curHint++
			continue
		}
		// The hint starts after the end of what remains of the current file,
		// so no later hint (they are sorted by start key) can touch it either.
		// Move on to the next file.
		if curFile.UserKeyBounds().End.CompareUpperBounds(d.cmp, base.UserKeyInclusive(h.start)) < 0 {
			if _, ok := ve.DeletedFiles[deletedFileEntry{
				Level:   cl.level,
				FileNum: f.FileNum,
			}]; !ok && f == curFile {
				panic("pebble: deletion only compaction scheduled with a hint that didn't delete a file")
			}
			f = iter.Next()
			curFile = f
			continue
		}

		// The hint overlaps with at least part of the file.
		if curFile == f && d.cmp(h.start, f.Smallest.UserKey) <= 0 &&
			base.UserKeyExclusive(h.end).CompareUpperBounds(d.cmp, f.UserKeyBounds().End) >= 0 {
			// The hint deletes the entirety of this file.
			ve.DeletedFiles[deletedFileEntry{
				Level:   cl.level,
				FileNum: f.FileNum,
			}] = f
			f = iter.Next()
			curFile = f
			levelMetrics.TablesDeleted++
			continue
		}
		// The hint overlaps with only a part of the file, not the entirety of it. We need
		// to use d.excise.
		if d.FormatMajorVersion() < FormatVirtualSSTables {
			panic("pebble: format major version too low to do an excise in deletion only compaction")
		}
		newFiles, err := d.excise(context.TODO(), base.UserKeyBoundsEndExclusive(h.start, h.end), curFile, ve, cl.level, nextFileNum)
		if err != nil {
			return errors.Wrap(err, "error when running excise for delete-only compaction")
		}
		if _, ok := ve.DeletedFiles[deletedFileEntry{
			Level:   cl.level,
			FileNum: curFile.FileNum,
		}]; !ok {
			// This hint did not touch this file. Try the next hint.
			curHint++
			continue
		}
		// Count the excise only now that we know it actually modified curFile.
		// A hint can overlap the original file f yet miss the remaining
		// virtual file curFile, in which case nothing was excised.
		levelMetrics.TablesExcised++
		if curFile != f {
			// We excised a file after having previously excised it. Remove curFile from both
			// deletedFiles and from ve.NewFiles.
			delete(ve.DeletedFiles, deletedFileEntry{
				Level:   cl.level,
				FileNum: curFile.FileNum,
			})
			for i := range ve.NewFiles {
				if ve.NewFiles[i].Meta == curFile {
					copy(ve.NewFiles[i:], ve.NewFiles[i+1:])
					ve.NewFiles = ve.NewFiles[:len(ve.NewFiles)-1]
					break
				}
			}
		}
		if len(newFiles) == 0 {
			// The entirety of this file was deleted by excise.
			f = iter.Next()
			curFile = f
			continue
		}
		// Since the deletion hints are sorted by start keys, we can set curFile to the
		// last of the new files. We could potentially excise this twice.
		curFile = newFiles[len(newFiles)-1].Meta
	}
	return nil
}

// Runs a delete-only compaction.
//
// d.mu must *not* be held when calling this.
func (d *DB) runDeleteOnlyCompaction(
jobID JobID, c *compaction,
jobID JobID, c *compaction, snapshots compact.Snapshots,
) (ve *versionEdit, stats compact.Stats, retErr error) {
nextFileNumGetter := func() base.FileNum {
d.mu.Lock()
defer d.mu.Unlock()
return d.mu.versions.getNextFileNum()
}

c.metrics = make(map[int]*LevelMetrics, len(c.inputs))
// Sort the deletion hints by start key. runDeleteOnlyCompactionForLevel
// requires this.
sort.Slice(c.deletionHints, func(i, j int) bool {
return d.cmp(c.deletionHints[i].start, c.deletionHints[j].start) < 0
})
ve = &versionEdit{
DeletedFiles: map[deletedFileEntry]*fileMetadata{},
}
for _, cl := range c.inputs {
levelMetrics := &LevelMetrics{}
iter := cl.files.Iter()
for f := iter.First(); f != nil; f = iter.Next() {
ve.DeletedFiles[deletedFileEntry{
Level: cl.level,
FileNum: f.FileNum,
}] = f
if err := d.runDeleteOnlyCompactionForLevel(c, cl, levelMetrics, ve, nextFileNumGetter, snapshots); err != nil {
return nil, stats, err
}
c.metrics[cl.level] = levelMetrics
}
// Refresh the disk available statistic whenever a compaction/flush
// completes, before re-acquiring the mutex.
d.calculateDiskAvailableBytes()
return ve, stats, nil
}

Expand Down Expand Up @@ -2409,9 +2549,17 @@ func (d *DB) runMoveCompaction(
func (d *DB) runCompaction(
jobID JobID, c *compaction,
) (ve *versionEdit, stats compact.Stats, retErr error) {
if c.cancel.Load() {
return ve, stats, ErrCancelledCompaction
}
switch c.kind {
case compactionKindDeleteOnly:
return d.runDeleteOnlyCompaction(jobID, c)
// Release the d.mu lock while doing I/O.
// Note the unusual order: Unlock and then Lock.
snapshots := d.mu.snapshots.toSlice()
d.mu.Unlock()
defer d.mu.Lock()
return d.runDeleteOnlyCompaction(jobID, c, snapshots)
case compactionKindMove:
return d.runMoveCompaction(jobID, c)
case compactionKindCopy:
Expand All @@ -2434,10 +2582,6 @@ func (d *DB) runCompaction(
defer vers.UnrefLocked()
}

if c.cancel.Load() {
return ve, stats, ErrCancelledCompaction
}

// The table is typically written at the maximum allowable format implied by
// the current format major version of the DB.
tableFormat := d.FormatMajorVersion().MaxTableFormat()
Expand Down
8 changes: 6 additions & 2 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,18 @@ func (i CompactionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
w.Printf("[JOB %d] compacting(%s) ",
redact.Safe(i.JobID),
redact.SafeString(i.Reason))
w.Printf("%s", i.Annotations)
if len(i.Annotations) > 0 {
w.Printf("%s ", i.Annotations)
}
w.Printf("%s; ", levelInfos(i.Input))
w.Printf("OverlappingRatio: Single %.2f, Multi %.2f", i.SingleLevelOverlappingRatio, i.MultiLevelOverlappingRatio)
return
}
outputSize := tablesTotalSize(i.Output.Tables)
w.Printf("[JOB %d] compacted(%s) ", redact.Safe(i.JobID), redact.SafeString(i.Reason))
w.Printf("%s", i.Annotations)
if len(i.Annotations) > 0 {
w.Printf("%s ", i.Annotations)
}
w.Print(levelInfos(i.Input))
w.Printf(" -> L%d [%s] (%s), in %.1fs (%.1fs total), output rate %s/s",
redact.Safe(i.Output.Level),
Expand Down
Loading

0 comments on commit e473e96

Please sign in to comment.