Skip to content

Commit

Permalink
*: Introduce read triggered compaction heuristic
Browse files Browse the repository at this point in the history
Previously, our compactions were triggered on write based heuristics.
This change introduces compactions based on high read activity to improve
read performance in read-heavy workloads. It is inspired by LevelDB's
read based compaction heuristic which is outlined in the following issue:
cockroachdb#29.

These compactions are triggered using an `AllowedSeeks` parameter on
each file's `FileMetadata`. Reads are sampled based on a rate defined in
`iterator.MaybeSampleRead()`. If a read is sampled, `AllowedSeeks` is
decremented on the file in the top-most level containing the key. Once
`AllowedSeeks` reaches 0, a compaction for the key range is scheduled.

Read triggered compactions are only considered if no other compaction is
possible at the time. This helps prioritize score based compactions to
maintain a healthy LSM shape.

The results of this change in benchmarks are outlined in the github
issue: cockroachdb#29.
  • Loading branch information
aadityasondhi committed Dec 10, 2020
1 parent e6d62a7 commit 19cbfd1
Show file tree
Hide file tree
Showing 19 changed files with 1,010 additions and 56 deletions.
2 changes: 1 addition & 1 deletion cmd/pebble/compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func runReplay(cmd *cobra.Command, args []string) error {
// read amplification.
var bve manifest.BulkVersionEdit
bve.Accumulate(&li.ve)
ref, _, err = bve.Apply(ref, mvccCompare, nil, 0)
ref, _, err = bve.Apply(ref, mvccCompare, nil, 0, 0)
if err != nil {
return err
}
Expand Down
28 changes: 24 additions & 4 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,14 @@ type manualCompaction struct {
end InternalKey
}

// readCompaction describes a compaction scheduled in response to high read
// activity on a key range (see the AllowedSeeks mechanism described in the
// commit message).
type readCompaction struct {
	// level is the LSM level to compact from; presumably the top-most level
	// containing the sampled key at scheduling time — confirm against the
	// read-sampling code.
	level int
	// Key ranges are used instead of file handles as versions could change
	// between the read sampling and scheduling a compaction.
	start []byte
	end   []byte
}

func (d *DB) addInProgressCompaction(c *compaction) {
d.mu.compact.inProgress[c] = struct{}{}
var isBase, isIntraL0 bool
Expand Down Expand Up @@ -1263,6 +1271,15 @@ func (d *DB) maybeScheduleFlush() {
return
}

if !d.passedFlushThreshold() {
return
}

d.mu.compact.flushing = true
go d.flush()
}

func (d *DB) passedFlushThreshold() bool {
var n int
var size uint64
for ; n < len(d.mu.mem.queue)-1; n++ {
Expand All @@ -1279,7 +1296,7 @@ func (d *DB) maybeScheduleFlush() {
}
if n == 0 {
// None of the immutable memtables are ready for flushing.
return
return false
}

// Only flush once the sum of the queued memtable sizes exceeds half the
Expand All @@ -1288,11 +1305,10 @@ func (d *DB) maybeScheduleFlush() {
// DB.newMemTable().
minFlushSize := uint64(d.opts.MemTableSize) / 2
if size < minFlushSize {
return
return false
}

d.mu.compact.flushing = true
go d.flush()
return true
}

func (d *DB) maybeScheduleDelayedFlush(tbl *memTable) {
Expand Down Expand Up @@ -1577,6 +1593,10 @@ func (d *DB) maybeScheduleCompactionPicker(

for !d.opts.private.disableAutomaticCompactions && d.mu.compact.compactingCount < d.opts.MaxConcurrentCompactions {
env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
env.readTriggeredCompactionEnv = readTriggeredCompactionEnv{
readCompactions: &d.mu.compact.readCompactions,
flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
}
pc := pickFunc(d.mu.versions.picker, env)
if pc == nil {
break
Expand Down
94 changes: 85 additions & 9 deletions compaction_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import (
const minIntraL0Count = 4

// compactionEnv bundles the state a compaction picker consults when choosing
// the next compaction.
type compactionEnv struct {
	bytesCompacted          *uint64
	earliestUnflushedSeqNum uint64
	earliestSnapshotSeqNum  uint64
	inProgressCompactions   []compactionInfo
	// readTriggeredCompactionEnv carries the queue of pending read-triggered
	// compactions and the flushing state used to deprioritize them.
	readTriggeredCompactionEnv readTriggeredCompactionEnv
}

type compactionPicker interface {
Expand All @@ -34,10 +35,16 @@ type compactionPicker interface {
pickAuto(env compactionEnv) (pc *pickedCompaction)
pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool)
pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)

pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
forceBaseLevel1()
}

// readTriggeredCompactionEnv is used to hold data required to perform read
// compactions.
type readTriggeredCompactionEnv struct {
	// readCompactions points at the pending read-triggered compaction queue;
	// pickReadTriggeredCompaction pops candidates from the front.
	readCompactions *[]readCompaction
	// flushing is true when a flush is in progress or expected soon, in which
	// case read compactions are skipped in favor of write-triggered work.
	flushing bool
}

// Information about in-progress compactions provided to the compaction picker. These are used to
// constrain the new compactions that will be picked.
type compactionInfo struct {
Expand Down Expand Up @@ -408,10 +415,7 @@ func expandToAtomicUnit(
}

func newCompactionPicker(
v *version,
opts *Options,
inProgressCompactions []compactionInfo,
levelSizes [numLevels]int64,
v *version, opts *Options, inProgressCompactions []compactionInfo, levelSizes [numLevels]int64,
) compactionPicker {
p := &compactionPickerByScore{
opts: opts,
Expand Down Expand Up @@ -990,6 +994,11 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompact
if pc := p.pickElisionOnlyCompaction(env); pc != nil {
return pc
}

if pc := p.pickReadTriggeredCompaction(env); pc != nil {
return pc
}

return nil
}

Expand Down Expand Up @@ -1322,6 +1331,73 @@ func pickManualHelper(
return pc
}

// pickReadTriggeredCompaction attempts to convert one of the queued
// read-triggered compactions into a pickedCompaction. Candidates that can no
// longer be compacted are discarded from the queue.
func (p *compactionPickerByScore) pickReadTriggeredCompaction(
	env compactionEnv,
) (pc *pickedCompaction) {
	// If a flush is in-progress or expected to happen soon, it means more
	// writes are taking place. We would then be scheduling more write-focused
	// compactions. In this case, skip read compactions as they are lower
	// priority.
	queue := env.readTriggeredCompactionEnv.readCompactions
	if env.readTriggeredCompactionEnv.flushing || queue == nil {
		return nil
	}
	// Pop candidates off the front of the queue until one yields a viable
	// compaction (or the queue is exhausted).
	for len(*queue) > 0 {
		candidate := (*queue)[0]
		*queue = (*queue)[1:]
		if pc = pickReadTriggeredCompactionHelper(p, &candidate, env); pc != nil {
			return pc
		}
	}
	return nil
}

// pickReadTriggeredCompactionHelper builds a pickedCompaction for a single
// read-compaction candidate, returning nil if the candidate is stale, has no
// usable overlap, or conflicts with an in-progress compaction.
func pickReadTriggeredCompactionHelper(
	p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
) (pc *pickedCompaction) {
	cmp := p.opts.Comparer.Compare
	inputs := p.vers.Overlaps(rc.level, cmp, rc.start, rc.end)
	if inputs.Empty() {
		// If the file for the given key range has moved levels since the
		// compaction was scheduled, check to see if the range still has
		// overlapping files. updateReadCompaction may rewrite rc.level.
		var ok bool
		if inputs, ok = updateReadCompaction(p.vers, cmp, rc); !ok {
			return nil
		}
	}
	pc = newPickedCompaction(p.opts, p.vers, rc.level, p.baseLevel)
	pc.startLevel.files = inputs
	if !pc.setupInputs() || inputRangeAlreadyCompacting(env, pc) {
		return nil
	}
	return pc
}

// updateReadCompaction re-resolves a stale read-compaction candidate against
// the current version. It reports a compaction as worthwhile only when the key
// range overlaps files in at least two levels, in which case it updates
// rc.level to the top-most overlapping level and returns that level's slice.
func updateReadCompaction(
	vers *version, cmp Compare, rc *readCompaction,
) (slice manifest.LevelSlice, shouldCompact bool) {
	var (
		overlapCount int
		firstLevel   int
		firstSlice   manifest.LevelSlice
	)
	// Scan top-down; stop as soon as a second overlapping level proves the
	// range spans multiple levels.
	for l := 0; l < numLevels && overlapCount < 2; l++ {
		if o := vers.Overlaps(l, cmp, rc.start, rc.end); !o.Empty() {
			if overlapCount == 0 {
				firstSlice, firstLevel = o, l
			}
			overlapCount++
		}
	}
	if overlapCount < 2 {
		return manifest.LevelSlice{}, false
	}
	rc.level = firstLevel
	return firstSlice, true
}

// forceBaseLevel1 unconditionally pins the picker's base level to L1.
func (p *compactionPickerByScore) forceBaseLevel1() {
	p.baseLevel = 1
}
Expand Down
118 changes: 118 additions & 0 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,12 @@ func (p *compactionPickerForTesting) pickManual(
return pickManualHelper(p.opts, manual, p.vers, p.baseLevel), false
}

// pickReadTriggeredCompaction satisfies the compactionPicker interface. The
// testing picker never schedules read-triggered compactions.
func (p *compactionPickerForTesting) pickReadTriggeredCompaction(
	env compactionEnv,
) (pc *pickedCompaction) {
	return nil
}

func TestPickCompaction(t *testing.T) {
fileNums := func(files manifest.LevelSlice) string {
var ss []string
Expand Down Expand Up @@ -1634,6 +1640,118 @@ func TestCompactionTombstoneElisionOnly(t *testing.T) {
})
}

// TestCompactionReadTriggered exercises read-triggered compactions via the
// data-driven commands define / set-read-compaction / maybe-compact / version.
func TestCompactionReadTriggered(t *testing.T) {
	var d *DB
	var compactInfo *CompactionInfo // protected by d.mu

	// compactionString waits for in-flight compactions to finish and renders
	// the last completed compaction (or "(none)") deterministically.
	compactionString := func() string {
		for d.mu.compact.compactingCount > 0 {
			d.mu.compact.cond.Wait()
		}

		s := "(none)"
		if compactInfo != nil {
			// JobID's aren't deterministic, especially w/ table stats
			// enabled. Use a fixed job ID for data-driven test output.
			compactInfo.JobID = 100
			s = compactInfo.String()
			compactInfo = nil
		}
		return s
	}

	datadriven.RunTest(t, "testdata/compaction_read_triggered",
		func(td *datadriven.TestData) string {
			switch td.Cmd {
			case "define":
				if d != nil {
					compactInfo = nil
					if err := d.Close(); err != nil {
						return err.Error()
					}
				}
				opts := &Options{
					FS:         vfs.NewMem(),
					DebugCheck: DebugCheckLevels,
					EventListener: EventListener{
						CompactionEnd: func(info CompactionInfo) {
							compactInfo = &info
						},
					},
				}
				var err error
				d, err = runDBDefineCmd(td, opts)
				if err != nil {
					return err.Error()
				}
				d.mu.Lock()
				// Use a monotonically advancing fake clock for deterministic
				// output. Named `now` rather than `t` to avoid shadowing the
				// *testing.T parameter.
				now := time.Now()
				d.timeNow = func() time.Time {
					now = now.Add(time.Second)
					return now
				}
				s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
				d.mu.Unlock()
				return s

			case "set-read-compaction":
				d.mu.Lock()
				for _, arg := range td.CmdArgs {
					switch arg.Key {
					case "flushing":
						// Any value other than "true" clears the flag, which
						// matches the previous true/false/default handling.
						d.mu.compact.flushing = arg.Vals[0] == "true"
					}
				}
				// Each input line has the form "<level> <start>-<end>".
				for _, line := range strings.Split(td.Input, "\n") {
					if line == "" {
						continue
					}
					parts := strings.Split(line, " ")
					if len(parts) != 2 {
						panic("malformed data for set-read-compaction")
					}
					if l, err := strconv.Atoi(parts[0][:1]); err == nil {
						keys := strings.Split(parts[1], "-")
						rc := readCompaction{
							level: l,
							start: []byte(keys[0]),
							end:   []byte(keys[1]),
						}
						d.mu.compact.readCompactions = append(d.mu.compact.readCompactions, rc)
					} else {
						return err.Error()
					}
				}
				d.mu.Unlock()
				return ""

			case "maybe-compact":
				d.mu.Lock()
				d.opts.private.disableAutomaticCompactions = false
				d.maybeScheduleCompaction()
				s := compactionString()
				d.mu.Unlock()
				return s

			case "version":
				d.mu.Lock()
				s := d.mu.versions.currentVersion().DebugString(base.DefaultFormatter)
				d.mu.Unlock()
				return s

			default:
				return fmt.Sprintf("unknown command: %s", td.Cmd)
			}
		})
}

func TestCompactionInuseKeyRanges(t *testing.T) {
parseMeta := func(s string) *fileMetadata {
parts := strings.Split(s, "-")
Expand Down
3 changes: 3 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,9 @@ type DB struct {
manual []*manualCompaction
// inProgress is the set of in-progress flushes and compactions.
inProgress map[*compaction]struct{}
// readCompactions is a list of read triggered compactions. The next
// compaction to perform is at the start. New entries are added to the end.
readCompactions []readCompaction
}

cleaner struct {
Expand Down
2 changes: 1 addition & 1 deletion internal/manifest/l0_sublevels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func readManifest(filename string) (*Version, error) {
if err := bve.Accumulate(&ve); err != nil {
return nil, err
}
if v, _, err = bve.Apply(v, base.DefaultComparer.Compare, base.DefaultFormatter, 10<<20); err != nil {
if v, _, err = bve.Apply(v, base.DefaultComparer.Compare, base.DefaultFormatter, 10<<20, 32000); err != nil {
return nil, err
}
}
Expand Down
5 changes: 5 additions & 0 deletions internal/manifest/level.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func makeLevel(level, sublevel int) Level {
return Level(((sublevel + 1) << levelBits) | level)
}

// LevelToInt returns the int representation of a Level. It masks off the
// sublevel bits stored in the upper bits of the encoding (see makeLevel),
// leaving only the LSM level number.
func LevelToInt(l Level) int {
	return int(l) & levelMask
}

// L0Sublevel returns a Level representing the specified L0 sublevel.
func L0Sublevel(sublevel int) Level {
if sublevel < 0 {
Expand Down
Loading

0 comments on commit 19cbfd1

Please sign in to comment.