db: populate return statistics for flushable ingests
The IngestWithStats ingestion entrypoint returns statistics about the ingest.
Cockroach uses these statistics to inform admission control. During a flushable
ingest, at commit time the number of bytes that will be ingested into L0 is
unknown. This commit adds an approximation based on which tables overlapped the
flushable queue. This required adjusting the memtable overlap logic to avoid
short circuiting as soon as overlap is found.

This approximation is rough and may be improved with future work, such as cockroachdb#2112.

Closes cockroachdb#2421.
Informs cockroachdb#2112.
jbowens committed Mar 31, 2023
1 parent 53a50a0 commit 853ec5e
Showing 2 changed files with 124 additions and 54 deletions.
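For context on how these statistics are consumed, here is a minimal, self-contained sketch of a caller using IngestWithStats. It mirrors the test changes below; the in-memory filesystem, the "ext" file name, and the final log line are illustrative stand-ins, not CockroachDB's actual admission-control integration.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
	"github.com/cockroachdb/pebble/sstable"
	"github.com/cockroachdb/pebble/vfs"
)

func main() {
	fs := vfs.NewMem()
	db, err := pebble.Open("", &pebble.Options{FS: fs})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Build a small external sstable to ingest.
	f, err := fs.Create("ext")
	if err != nil {
		log.Fatal(err)
	}
	w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
	if err := w.Set([]byte("a"), nil); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// IngestWithStats ingests like Ingest and additionally returns the
	// statistics this commit extends.
	stats, err := db.IngestWithStats([]string{"ext"})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("ingested %d bytes, ~%d bytes into L0, %d sstables overlapped flushables",
		stats.Bytes, stats.ApproxIngestedIntoL0Bytes, stats.MemtableOverlappingFiles)
}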
102 changes: 81 additions & 21 deletions ingest.go
@@ -392,6 +392,10 @@ func overlapWithIterator(
 			return true
 		}
 	}
+	// Assume overlap if iterator errored.
+	if err := iter.Error(); err != nil {
+		return true
+	}
 
 	computeOverlapWithSpans := func(rIter keyspan.FragmentIterator) bool {
 		// NB: The spans surfaced by the fragment iterator are non-overlapping.
@@ -417,6 +421,10 @@ func overlapWithIterator(
 				return true
 			}
 		}
+		// Assume overlap if iterator errored.
+		if err := rIter.Error(); err != nil {
+			return true
+		}
 		return false
 	}

@@ -657,11 +665,14 @@ type IngestOperationStats struct {
 	// Bytes is the total bytes in the ingested sstables.
 	Bytes uint64
 	// ApproxIngestedIntoL0Bytes is the approximate number of bytes ingested
-	// into L0.
-	// Currently, this value is completely accurate, but we are allowing this to
-	// be approximate once https://github.com/cockroachdb/pebble/issues/25 is
-	// implemented.
+	// into L0. This value is approximate when flushable ingests are active and
+	// an ingest overlaps an entry in the flushable queue. Currently, this
+	// approximation is very rough, only including tables that overlapped the
+	// memtable. This estimate may be improved with #2112.
 	ApproxIngestedIntoL0Bytes uint64
+	// MemtableOverlappingFiles is the count of ingested sstables
+	// that overlapped keys in the memtables.
+	MemtableOverlappingFiles int
 }
 
 // IngestWithStats does the same as Ingest, and additionally returns
@@ -831,6 +842,10 @@ func (d *DB) ingest(
 		return IngestOperationStats{}, err
 	}
 
+	// metaFlushableOverlaps is a slice parallel to meta indicating which of the
+	// ingested sstables overlap some table in the flushable queue. It's used to
+	// approximate ingest-into-L0 stats when using flushable ingests.
+	metaFlushableOverlaps := make([]bool, len(meta))
 	var mem *flushableEntry
 	// asFlushable indicates whether the sstable was ingested as a flushable.
 	var asFlushable bool
@@ -844,29 +859,59 @@ func (d *DB) ingest(
 		// is ordered from oldest to newest with the mutable memtable being the
 		// last element in the slice. We want to wait for the newest table that
 		// overlaps.
+
 		for i := len(d.mu.mem.queue) - 1; i >= 0; i-- {
 			m := d.mu.mem.queue[i]
-			if ingestMemtableOverlaps(d.cmp, m, meta) {
-				if (len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) ||
-					d.mu.formatVers.vers < FormatFlushableIngest ||
-					d.opts.Experimental.DisableIngestAsFlushable() {
-					mem = m
-					if mem.flushable == d.mu.mem.mutable {
-						err = d.makeRoomForWrite(nil)
-					}
-					mem.flushForced = true
-					d.maybeScheduleFlush()
-					return
-				}
-
-				// The ingestion overlaps with the memtable. Since there aren't
-				// too many memtables already queued up, we can slide the
-				// ingested sstables on top of the existing memtables.
-				err = d.handleIngestAsFlushable(meta, seqNum)
-				asFlushable = true
-				return
+			iter := m.newIter(nil)
+			rangeDelIter := m.newRangeDelIter(nil)
+			rkeyIter := m.newRangeKeyIter(nil)
+			for i := range meta {
+				if metaFlushableOverlaps[i] {
+					// This table already overlapped a more recent flushable.
+					continue
+				}
+				if overlapWithIterator(iter, &rangeDelIter, rkeyIter, meta[i], d.cmp) {
+					// If this is the first table to overlap a flushable, save
+					// the flushable. This ingest must be ingested or flushed
+					// after it.
+					if mem == nil {
+						mem = m
+					}
+					metaFlushableOverlaps[i] = true
+				}
+			}
+			err := iter.Close()
+			if rangeDelIter != nil {
+				err = firstError(err, rangeDelIter.Close())
+			}
+			if rkeyIter != nil {
+				err = firstError(err, rkeyIter.Close())
+			}
+			if err != nil {
+				d.opts.Logger.Infof("ingest error reading flushable for log %s: %s", m.logNum, err)
 			}
 		}
+		if mem == nil {
+			// No overlap with any of the queued flushables.
+			return
+		}
+		// The ingestion overlaps with some entry in the flushable queue.
+		if d.mu.formatVers.vers < FormatFlushableIngest ||
+			d.opts.Experimental.DisableIngestAsFlushable() ||
+			(len(d.mu.mem.queue) > d.opts.MemTableStopWritesThreshold-1) {
+			// We're not able to ingest as a flushable,
+			// so we must synchronously flush.
+			if mem.flushable == d.mu.mem.mutable {
+				err = d.makeRoomForWrite(nil)
+			}
+			mem.flushForced = true
+			d.maybeScheduleFlush()
+			return
+		}
+		// Since there aren't too many memtables already queued up, we can
+		// slide the ingested sstables on top of the existing memtables.
+		asFlushable = true
+		err = d.handleIngestAsFlushable(meta, seqNum)
 	}
 
 	var ve *versionEdit
@@ -933,6 +978,9 @@ func (d *DB) ingest(
 			if e.Level == 0 {
 				stats.ApproxIngestedIntoL0Bytes += e.Meta.Size
 			}
+			if metaFlushableOverlaps[i] {
+				stats.MemtableOverlappingFiles++
+			}
 		}
 	} else if asFlushable {
 		info.Tables = make([]struct {
@@ -942,6 +990,18 @@ func (d *DB) ingest(
 		for i, f := range meta {
 			info.Tables[i].Level = -1
 			info.Tables[i].TableInfo = f.TableInfo()
+			stats.Bytes += f.Size
+			// We don't have exact stats on which files were ingested into L0,
+			// because actual ingestion into the LSM has been deferred until
+			// flush time. Instead, we infer based on memtable overlap.
+			//
+			// TODO(jackson): If we optimistically compute data overlap (#2112)
+			// before entering the commit pipeline, we can use that overlap to
+			// improve our approximation.
+			if metaFlushableOverlaps[i] {
+				stats.ApproxIngestedIntoL0Bytes += f.Size
+				stats.MemtableOverlappingFiles++
+			}
 		}
 	}
 	d.opts.EventListener.TableIngested(info)
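The hunk above drops the short-circuiting ingestMemtableOverlaps check in favor of marking, per ingested table, whether it overlaps anything in the flushable queue. Below is a simplified, self-contained sketch of that marking pattern; the span type and string key bounds are hypothetical stand-ins for Pebble's file metadata and flushable iterators.

package main

import "fmt"

// span is a hypothetical stand-in for an sstable's inclusive key range.
type span struct{ lo, hi string }

// overlaps reports whether two inclusive key ranges intersect.
func overlaps(a, b span) bool { return a.lo <= b.hi && b.lo <= a.hi }

// markOverlaps walks the flushable queue newest-to-oldest and records, for
// each ingested table, whether it overlaps some queued flushable. Unlike the
// old code, it does not return at the first overlap, so the result can feed
// per-table ingest statistics.
func markOverlaps(flushables, tables []span) []bool {
	// overlapped is parallel to tables, like metaFlushableOverlaps.
	overlapped := make([]bool, len(tables))
	for i := len(flushables) - 1; i >= 0; i-- {
		for j := range tables {
			if overlapped[j] {
				// Already overlapped a more recent flushable.
				continue
			}
			if overlaps(flushables[i], tables[j]) {
				overlapped[j] = true
			}
		}
	}
	return overlapped
}

func main() {
	flushables := []span{{"a", "c"}, {"m", "p"}}
	tables := []span{{"b", "b"}, {"x", "z"}}
	fmt.Println(markOverlaps(flushables, tables)) // [true false]
}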
76 changes: 43 additions & 33 deletions ingest_test.go
@@ -1219,44 +1219,54 @@ func TestConcurrentIngestCompact(t *testing.T) {
 func TestIngestFlushQueuedMemTable(t *testing.T) {
 	// Verify that ingestion forces a flush of a queued memtable.
 
-	mem := vfs.NewMem()
-	d, err := Open("", &Options{
-		FS: mem,
-	})
-	require.NoError(t, err)
-
-	// Add the key "a" to the memtable, then fill up the memtable with the key
-	// "b". The ingested sstable will only overlap with the queued memtable.
-	require.NoError(t, d.Set([]byte("a"), nil, nil))
-	for {
-		require.NoError(t, d.Set([]byte("b"), nil, nil))
-		d.mu.Lock()
-		done := len(d.mu.mem.queue) == 2
-		d.mu.Unlock()
-		if done {
-			break
-		}
-	}
-
-	ingest := func(keys ...string) {
-		t.Helper()
-		f, err := mem.Create("ext")
-		require.NoError(t, err)
-
-		w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{})
-		for _, k := range keys {
-			require.NoError(t, w.Set([]byte(k), nil))
-		}
-		require.NoError(t, w.Close())
-		stats, err := d.IngestWithStats([]string{"ext"})
-		require.NoError(t, err)
-		require.Equal(t, stats.ApproxIngestedIntoL0Bytes, stats.Bytes)
-		require.Less(t, uint64(0), stats.Bytes)
-	}
-
-	ingest("a")
-
-	require.NoError(t, d.Close())
+	// Test with a format major version prior to FormatFlushableIngest and one
+	// after. Both should result in the same statistic calculations.
+	for _, fmv := range []FormatMajorVersion{FormatFlushableIngest - 1, FormatNewest} {
+		func(fmv FormatMajorVersion) {
+			mem := vfs.NewMem()
+			d, err := Open("", &Options{
+				FS:                 mem,
+				FormatMajorVersion: fmv,
+			})
+			require.NoError(t, err)
+
+			// Add the key "a" to the memtable, then fill up the memtable with the key
+			// "b". The ingested sstable will only overlap with the queued memtable.
+			require.NoError(t, d.Set([]byte("a"), nil, nil))
+			for {
+				require.NoError(t, d.Set([]byte("b"), nil, nil))
+				d.mu.Lock()
+				done := len(d.mu.mem.queue) == 2
+				d.mu.Unlock()
+				if done {
+					break
+				}
+			}
+
+			ingest := func(keys ...string) {
+				t.Helper()
+				f, err := mem.Create("ext")
+				require.NoError(t, err)
+
+				w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), sstable.WriterOptions{
+					TableFormat: fmv.MinTableFormat(),
+				})
+				for _, k := range keys {
+					require.NoError(t, w.Set([]byte(k), nil))
+				}
+				require.NoError(t, w.Close())
+				stats, err := d.IngestWithStats([]string{"ext"})
+				require.NoError(t, err)
+				require.Equal(t, stats.ApproxIngestedIntoL0Bytes, stats.Bytes)
+				require.Equal(t, stats.MemtableOverlappingFiles, 1)
+				require.Less(t, uint64(0), stats.Bytes)
+			}
+
+			ingest("a")
+
+			require.NoError(t, d.Close())
+		}(fmv)
+	}
 }
 
 func TestIngestStats(t *testing.T) {
