*: Introduce read triggered compaction heuristic
Previously, our compactions were triggered by write-based heuristics.
This change introduces compactions based on high read activity to improve
read performance in read-heavy workloads. It is inspired by LevelDB's
read-based compaction heuristic, which is outlined in the following issue:
cockroachdb#29.

These compactions are triggered using an `AllowedSeeks` parameter on
each file's `FileMetadata`. Reads are sampled at a rate defined in
`iterator.MaybeSampleRead()`. If a read is sampled, `AllowedSeeks` is
decremented on the file in the top-most level containing the key. Once
`AllowedSeeks` reaches 0, a compaction for the key range is scheduled.
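
A minimal sketch of that mechanism, for illustration only
(`iterator.MaybeSampleRead()` lives in a file not shown below; the
sampling rate, the file-lookup loop, and the queue here are assumptions,
not the code from this change):

```go
package sketch

import (
	"bytes"
	"math/rand"
)

type fileMetadata struct {
	AllowedSeeks      int64
	Smallest, Largest []byte
}

type readCompaction struct {
	level      int
	start, end []byte
}

// maybeSampleRead models the behavior described above: sample roughly one
// read in every sampleEvery, charge the file in the top-most level whose
// bounds contain the key, and queue a read compaction for the file's key
// range once its AllowedSeeks budget is exhausted.
func maybeSampleRead(key []byte, levels [][]*fileMetadata, queue *[]readCompaction) {
	const sampleEvery = 16 // illustrative rate, not the one Pebble uses
	if rand.Intn(sampleEvery) != 0 {
		return
	}
	for level, files := range levels {
		for _, f := range files {
			if bytes.Compare(f.Smallest, key) <= 0 && bytes.Compare(key, f.Largest) <= 0 {
				f.AllowedSeeks--
				if f.AllowedSeeks == 0 {
					*queue = append(*queue, readCompaction{level: level, start: f.Smallest, end: f.Largest})
				}
				return // only the top-most level containing the key is charged
			}
		}
	}
}
```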

Read-triggered compactions are only considered if no other compaction is
possible at the time. This helps prioritize score-based compactions, which
maintain a healthy LSM shape.
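
Condensed, the ordering looks roughly like this (a sketch mirroring the
`pickAuto` and flushing checks in the diff below; the parameterized pick
functions are stand-ins, not Pebble's API):

```go
package sketch

type pickedCompaction struct{}

type compactionEnv struct{ flushing bool }

// pickAutoSketch mirrors the priority introduced by this change: score-based
// (and elision-only) compactions are tried first, and read-triggered
// compactions only as a last resort, never while a flush is running or
// imminent, since write-focused compactions will be needed soon.
func pickAutoSketch(
	env compactionEnv,
	pickScoreBased, pickReadTriggered func(compactionEnv) *pickedCompaction,
) *pickedCompaction {
	if pc := pickScoreBased(env); pc != nil {
		return pc
	}
	if env.flushing {
		return nil
	}
	return pickReadTriggered(env)
}
```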

Benchmark results for this change are outlined in the GitHub
issue: cockroachdb#29.
aadityasondhi committed Dec 17, 2020
1 parent 100e1a4 commit d5d9287
Showing 22 changed files with 1,362 additions and 53 deletions.
2 changes: 1 addition & 1 deletion cmd/pebble/compact.go
@@ -240,7 +240,7 @@ func runReplay(cmd *cobra.Command, args []string) error {
// read amplification.
var bve manifest.BulkVersionEdit
bve.Accumulate(&li.ve)
-ref, _, err = bve.Apply(ref, mvccCompare, nil, 0)
+ref, _, err = bve.Apply(ref, mvccCompare, nil, 0, 0)
if err != nil {
return err
}
28 changes: 24 additions & 4 deletions compaction.go
@@ -1131,6 +1131,14 @@ type manualCompaction struct {
end InternalKey
}

type readCompaction struct {
level int
// Key ranges are used instead of file handles as versions could change
// between the read sampling and scheduling a compaction.
start []byte
end []byte
}

func (d *DB) addInProgressCompaction(c *compaction) {
d.mu.compact.inProgress[c] = struct{}{}
var isBase, isIntraL0 bool
@@ -1263,6 +1271,15 @@ func (d *DB) maybeScheduleFlush() {
return
}

if !d.passedFlushThreshold() {
return
}

d.mu.compact.flushing = true
go d.flush()
}

func (d *DB) passedFlushThreshold() bool {
var n int
var size uint64
for ; n < len(d.mu.mem.queue)-1; n++ {
@@ -1279,7 +1296,7 @@ }
}
if n == 0 {
// None of the immutable memtables are ready for flushing.
-return
+return false
}

// Only flush once the sum of the queued memtable sizes exceeds half the
@@ -1288,11 +1305,10 @@
// DB.newMemTable().
minFlushSize := uint64(d.opts.MemTableSize) / 2
if size < minFlushSize {
-return
+return false
}

-d.mu.compact.flushing = true
-go d.flush()
+return true
}

func (d *DB) maybeScheduleDelayedFlush(tbl *memTable) {
@@ -1577,6 +1593,10 @@ func (d *DB) maybeScheduleCompactionPicker(

for !d.opts.private.disableAutomaticCompactions && d.mu.compact.compactingCount < d.opts.MaxConcurrentCompactions {
env.inProgressCompactions = d.getInProgressCompactionInfoLocked(nil)
env.readCompactionEnv = readCompactionEnv{
readCompactions: &d.mu.compact.readCompactions,
flushing: d.mu.compact.flushing || d.passedFlushThreshold(),
}
pc := pickFunc(d.mu.versions.picker, env)
if pc == nil {
break
86 changes: 81 additions & 5 deletions compaction_picker.go
@@ -24,6 +24,7 @@ type compactionEnv struct {
earliestUnflushedSeqNum uint64
earliestSnapshotSeqNum uint64
inProgressCompactions []compactionInfo
readCompactionEnv readCompactionEnv
}

type compactionPicker interface {
@@ -34,10 +35,16 @@ type compactionPicker interface {
pickAuto(env compactionEnv) (pc *pickedCompaction)
pickManual(env compactionEnv, manual *manualCompaction) (c *pickedCompaction, retryLater bool)
pickElisionOnlyCompaction(env compactionEnv) (pc *pickedCompaction)

pickReadTriggeredCompaction(env compactionEnv) (pc *pickedCompaction)
forceBaseLevel1()
}

// readCompactionEnv holds the data required to perform read compactions.
type readCompactionEnv struct {
readCompactions *[]readCompaction
flushing bool
}

// Information about in-progress compactions provided to the compaction picker. These are used to
// constrain the new compactions that will be picked.
type compactionInfo struct {
@@ -408,10 +415,7 @@ func expandToAtomicUnit(
}

func newCompactionPicker(
-v *version,
-opts *Options,
-inProgressCompactions []compactionInfo,
-levelSizes [numLevels]int64,
+v *version, opts *Options, inProgressCompactions []compactionInfo, levelSizes [numLevels]int64,
) compactionPicker {
p := &compactionPickerByScore{
opts: opts,
@@ -991,6 +995,11 @@ func (p *compactionPickerByScore) pickAuto(env compactionEnv) (pc *pickedCompaction) {
if pc := p.pickElisionOnlyCompaction(env); pc != nil {
return pc
}

if pc := p.pickReadTriggeredCompaction(env); pc != nil {
return pc
}

return nil
}

@@ -1323,6 +1332,73 @@ func pickManualHelper(
return pc
}

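// pickReadTriggeredCompaction pops scheduled read compactions off the queue
// until one of them yields a viable compaction under the current version.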
func (p *compactionPickerByScore) pickReadTriggeredCompaction(
env compactionEnv,
) (pc *pickedCompaction) {
// If a flush is in-progress or expected to happen soon, it means more writes
// are taking place. We would soon be scheduling more write-focused
// compactions. In this case, skip read compactions as they are lower priority.
if env.readCompactionEnv.flushing || env.readCompactionEnv.readCompactions == nil {
return nil
}
for len(*env.readCompactionEnv.readCompactions) > 0 {
rc := (*env.readCompactionEnv.readCompactions)[0]
*env.readCompactionEnv.readCompactions = (*env.readCompactionEnv.readCompactions)[1:]
if pc = pickReadTriggeredCompactionHelper(p, &rc, env); pc != nil {
break
}
}
return pc
}

func pickReadTriggeredCompactionHelper(
p *compactionPickerByScore, rc *readCompaction, env compactionEnv,
) (pc *pickedCompaction) {
cmp := p.opts.Comparer.Compare
overlapSlice := p.vers.Overlaps(rc.level, cmp, rc.start, rc.end)
if overlapSlice.Empty() {
var shouldCompact bool
// If the file for the given key range has moved levels since the compaction
// was scheduled, check whether the range still has overlapping files.
overlapSlice, shouldCompact = updateReadCompaction(p.vers, cmp, rc)
if !shouldCompact {
return nil
}
}
pc = newPickedCompaction(p.opts, p.vers, rc.level, p.baseLevel)
pc.startLevel.files = overlapSlice
if !pc.setupInputs() {
return nil
}
if inputRangeAlreadyCompacting(env, pc) {
return nil
}
return pc
}

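// updateReadCompaction rechecks the compaction's key range against the
// current version: it returns the files of the top-most level that overlaps
// the range, retargeting rc.level at that level, but only if at least two
// levels still overlap the range.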
func updateReadCompaction(
vers *version, cmp Compare, rc *readCompaction,
) (slice manifest.LevelSlice, shouldCompact bool) {
numOverlap, topLevel := 0, 0
var topOverlaps manifest.LevelSlice
for l := 0; l < numLevels; l++ {
overlaps := vers.Overlaps(l, cmp, rc.start, rc.end)
if !overlaps.Empty() {
numOverlap++
if numOverlap >= 2 {
break
}
topOverlaps = overlaps
topLevel = l
}
}
if numOverlap >= 2 {
rc.level = topLevel
return topOverlaps, true
}
return manifest.LevelSlice{}, false
}

func (p *compactionPickerByScore) forceBaseLevel1() {
p.baseLevel = 1
}
154 changes: 154 additions & 0 deletions compaction_picker_test.go
@@ -875,6 +875,160 @@ func TestCompactionPickerConcurrency(t *testing.T) {
})
}

func TestCompactionPickerPickReadTriggered(t *testing.T) {
opts := (*Options)(nil).EnsureDefaults()
var picker *compactionPickerByScore
var rcList []readCompaction
var vers *version

fileNums := func(files manifest.LevelSlice) string {
var ss []string
files.Each(func(f *fileMetadata) {
ss = append(ss, f.FileNum.String())
})
sort.Strings(ss)
return strings.Join(ss, ",")
}

parseMeta := func(s string) (*fileMetadata, error) {
parts := strings.Split(s, ":")
fileNum, err := strconv.Atoi(parts[0])
if err != nil {
return nil, err
}
fields := strings.Fields(parts[1])
parts = strings.Split(fields[0], "-")
if len(parts) != 2 {
return nil, errors.Errorf("malformed table spec: %s", s)
}
m := &fileMetadata{
FileNum: base.FileNum(fileNum),
Size: 1028,
Smallest: base.ParseInternalKey(strings.TrimSpace(parts[0])),
Largest: base.ParseInternalKey(strings.TrimSpace(parts[1])),
}
for _, p := range fields[1:] {
if strings.HasPrefix(p, "size=") {
v, err := strconv.Atoi(strings.TrimPrefix(p, "size="))
if err != nil {
return nil, err
}
m.Size = uint64(v)
}
}
m.SmallestSeqNum = m.Smallest.SeqNum()
m.LargestSeqNum = m.Largest.SeqNum()
return m, nil
}

datadriven.RunTest(t, "testdata/compaction_picker_read_triggered", func(td *datadriven.TestData) string {
switch td.Cmd {
case "define":
rcList = []readCompaction{}
fileMetas := [manifest.NumLevels][]*fileMetadata{}
level := 0
var err error
lines := strings.Split(td.Input, "\n")

for len(lines) > 0 {
data := strings.TrimSpace(lines[0])
lines = lines[1:]
switch data {
case "L0", "L1", "L2", "L3", "L4", "L5", "L6":
level, err = strconv.Atoi(data[1:])
if err != nil {
return err.Error()
}
default:
meta, err := parseMeta(data)
if err != nil {
return err.Error()
}
fileMetas[level] = append(fileMetas[level], meta)
}
}

vers = newVersion(opts, fileMetas)
vs := &versionSet{
opts: opts,
cmp: DefaultComparer.Compare,
cmpName: DefaultComparer.Name,
}
vs.versions.Init(nil)
vs.append(vers)
var sizes [numLevels]int64
for l := 0; l < len(sizes); l++ {
vers.Levels[l].Slice().Each(func(m *fileMetadata) {
sizes[l] += int64(m.Size)
})
}
var inProgressCompactions []compactionInfo
picker = newCompactionPicker(vers, opts, inProgressCompactions, sizes).(*compactionPickerByScore)
vs.picker = picker

var buf bytes.Buffer
fmt.Fprint(&buf, vers.DebugString(base.DefaultFormatter))
return buf.String()

case "add-read-compaction":
for _, line := range strings.Split(td.Input, "\n") {
if line == "" {
continue
}
parts := strings.Split(line, " ")
if len(parts) != 2 {
return "error: malformed data for add-read-compaction. usage: <level>: <start>-<end>"
}
if l, err := strconv.Atoi(parts[0][:1]); err == nil {
keys := strings.Split(parts[1], "-")

rc := readCompaction{
level: l,
start: []byte(keys[0]),
end: []byte(keys[1]),
}
rcList = append(rcList, rc)
} else {
return err.Error()
}
}
return ""

case "show-read-compactions":
var sb strings.Builder
if len(rcList) == 0 {
sb.WriteString("(none)")
}
for _, rc := range rcList {
sb.WriteString(fmt.Sprintf("(level: %d, start: %s, end: %s)\n", rc.level, string(rc.start), string(rc.end)))
}
return sb.String()

case "pick-auto":
pc := picker.pickAuto(compactionEnv{
bytesCompacted: new(uint64),
earliestUnflushedSeqNum: math.MaxUint64,
readCompactionEnv: readCompactionEnv{
readCompactions: &rcList,
flushing: false,
},
})
var result strings.Builder
if pc != nil {
fmt.Fprintf(&result, "L%d -> L%d\n", pc.startLevel.level, pc.outputLevel.level)
fmt.Fprintf(&result, "L%d: %s\n", pc.startLevel.level, fileNums(pc.startLevel.files))
if !pc.outputLevel.files.Empty() {
fmt.Fprintf(&result, "L%d: %s\n", pc.outputLevel.level, fileNums(pc.outputLevel.files))
}
} else {
return "nil"
}
return result.String()
}
return fmt.Sprintf("unrecognized command: %s", td.Cmd)
})
}

func TestPickedCompactionSetupInputs(t *testing.T) {
opts := &Options{}
opts.EnsureDefaults()
