Skip to content

Commit

Permalink
db: compute table properties for snapshot-pinned keys
Browse files Browse the repository at this point in the history
Add three new built-in sstable properties that record the count, the cumulative
raw key size, and the cumulative raw value size of point keys that would have
been elided were it not for open snapshots. For now these properties include
only point keys, not range deletions or range keys.

Also, add two new fields to pebble.Metrics: monotonically increasing cumulative
totals of the count and sum of keys written to sstables due to snapshots.
  • Loading branch information
jbowens committed Mar 30, 2023
1 parent 2e9d3f5 commit 53a50a0
Show file tree
Hide file tree
Showing 26 changed files with 557 additions and 271 deletions.
75 changes: 59 additions & 16 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ var compactLabels = pprof.Labels("pebble", "compact")
var flushLabels = pprof.Labels("pebble", "flush")
var gcLabels = pprof.Labels("pebble", "gc")

// getInternalWriterProperties accesses a private variable (in the
// internal/private package) initialized by the sstable Writer. This indirection
// is necessary to ensure non-Pebble users constructing sstables for ingestion
// are unable to set internal-only properties.
//
// The returned *sstable.Properties is the writer's live properties struct, so
// callers may mutate it (e.g. to set snapshot-pinned totals or clear the
// external format version) before the writer is closed.
var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)

// expandedCompactionByteSizeLimit is the maximum number of bytes in all
// compacted files. We avoid expanding the lower level file set of a compaction
// if it would make the total compaction cover more than this many bytes.
Expand Down Expand Up @@ -1961,14 +1967,15 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {

var ve *manifest.VersionEdit
var pendingOutputs []physicalMeta
var stats compactStats
// To determine the target level of the files in the ingestedFlushable, we
// need to acquire the logLock, and not release it for that duration. Since,
// we need to acquire the logLock below to perform the logAndApply step
// anyway, we create the VersionEdit for ingestedFlushable outside of
// runCompaction. For all other flush cases, we construct the VersionEdit
// inside runCompaction.
if c.kind != compactionKindIngestedFlushable {
ve, pendingOutputs, err = d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
}

// Acquire logLock. This will be released either on an error, by way of
Expand Down Expand Up @@ -2040,6 +2047,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}

bytesFlushed = c.bytesIterated
d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize

d.maybeUpdateDeleteCompactionHints(c)
d.removeInProgressCompaction(c, err != nil)
d.mu.versions.incrementCompactions(c.kind, c.extraLevels)
Expand Down Expand Up @@ -2519,7 +2529,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.opts.EventListener.CompactionBegin(info)
startTime := d.timeNow()

ve, pendingOutputs, err := d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)

info.Duration = d.timeNow().Sub(startTime)
if err == nil {
Expand Down Expand Up @@ -2548,6 +2558,9 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
}
}

d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize

d.maybeUpdateDeleteCompactionHints(c)
d.removeInProgressCompaction(c, err != nil)
d.mu.versions.incrementCompactions(c.kind, c.extraLevels)
Expand All @@ -2569,14 +2582,19 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
return err
}

// compactStats accumulates statistics over the course of a single compaction
// or flush. The caller (flush1 / compact1) folds these into the DB-wide
// cumulative totals under d.mu.snapshots.
type compactStats struct {
	// cumulativePinnedKeys is the count of point keys written to output
	// sstables that would have been elided were it not for an open snapshot.
	cumulativePinnedKeys uint64
	// cumulativePinnedSize is the sum of the raw key sizes (user key plus
	// internal trailer) and raw value sizes of those snapshot-pinned keys.
	cumulativePinnedSize uint64
}

// runCompaction runs a compaction that produces new on-disk tables from
// memtables or old on-disk tables.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
) (ve *versionEdit, pendingOutputs []physicalMeta, retErr error) {
) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
// returning the edit.
Expand Down Expand Up @@ -2612,7 +2630,7 @@ func (d *DB) runCompaction(
}
c.metrics[cl.level] = levelMetrics
}
return ve, nil, nil
return ve, nil, stats, nil
}

if c.kind == compactionKindIngestedFlushable {
Expand Down Expand Up @@ -2646,7 +2664,7 @@ func (d *DB) runCompaction(
{Level: c.outputLevel.level, Meta: meta},
},
}
return ve, nil, nil
return ve, nil, stats, nil
}

defer func() {
Expand All @@ -2665,16 +2683,19 @@ func (d *DB) runCompaction(

iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
if err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
c.allowedZeroSeqNum = c.allowZeroSeqNum()
iter := newCompactionIter(c.cmp, c.equal, c.formatKey, d.merge, iiter, snapshots,
&c.rangeDelFrag, &c.rangeKeyFrag, c.allowedZeroSeqNum, c.elideTombstone,
c.elideRangeTombstone, d.FormatMajorVersion())

var (
createdFiles []base.FileNum
tw *sstable.Writer
createdFiles []base.FileNum
tw *sstable.Writer
pinnedKeySize uint64
pinnedValueSize uint64
pinnedCount uint64
)
defer func() {
if iter != nil {
Expand Down Expand Up @@ -2782,7 +2803,6 @@ func (d *DB) runCompaction(
}
createdFiles = append(createdFiles, fileNum)
cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption)
internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption)

const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
cpuWorkHandle = d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
Expand All @@ -2792,7 +2812,7 @@ func (d *DB) runCompaction(
d.opts.Experimental.MaxWriterConcurrency > 0 &&
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)

tw = sstable.NewWriter(writable, writerOpts, cacheOpts, internalTableOpt, &prevPointKey)
tw = sstable.NewWriter(writable, writerOpts, cacheOpts, &prevPointKey)

fileMeta.CreationTime = time.Now().Unix()
ve.NewFiles = append(ve.NewFiles, newFileEntry{
Expand Down Expand Up @@ -2891,7 +2911,22 @@ func (d *DB) runCompaction(
if tw == nil {
return nil
}

{
// Set internal sstable properties.
p := getInternalWriterProperties(tw)
// Set the external sst version to 0. This is what RocksDB expects for
// db-internal sstables; otherwise, it could apply a global sequence number.
p.ExternalFormatVersion = 0
// Set the snapshot pinned totals.
p.SnapshotPinnedKeys = pinnedCount
p.SnapshotPinnedKeySize = pinnedKeySize
p.SnapshotPinnedValueSize = pinnedValueSize
stats.cumulativePinnedKeys += pinnedCount
stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize
pinnedCount = 0
pinnedKeySize = 0
pinnedValueSize = 0
}
if err := tw.Close(); err != nil {
tw = nil
return err
Expand Down Expand Up @@ -3140,11 +3175,19 @@ func (d *DB) runCompaction(
}
if tw == nil {
if err := newOutput(); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
}
if err := tw.Add(*key, val); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
if iter.snapshotPinned {
// The kv pair we just added to the sstable was only surfaced by
// the compaction iterator because an open snapshot prevented
// its elision. Increment the stats.
pinnedCount++
pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
pinnedValueSize += uint64(len(val))
}
}

Expand Down Expand Up @@ -3172,7 +3215,7 @@ func (d *DB) runCompaction(
splitKey = key.UserKey
}
if err := finishOutput(splitKey); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
}

Expand All @@ -3189,14 +3232,14 @@ func (d *DB) runCompaction(
}

if err := d.objProvider.Sync(); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}

// Refresh the disk available statistic whenever a compaction/flush
// completes, before re-acquiring the mutex.
_ = d.calculateDiskAvailableBytes()

return ve, pendingOutputs, nil
return ve, pendingOutputs, stats, nil
}

// validateVersionEdit validates that start and end keys across new and deleted
Expand Down
Loading

0 comments on commit 53a50a0

Please sign in to comment.