Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: add snapshot-pinned keys sstable properties and metrics #1814

Merged
merged 1 commit into from
Mar 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 59 additions & 16 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ var compactLabels = pprof.Labels("pebble", "compact")
var flushLabels = pprof.Labels("pebble", "flush")
var gcLabels = pprof.Labels("pebble", "gc")

// getInternalWriterProperties accesses a private variable (in the
// internal/private package) initialized by the sstable Writer. This indirection
// is necessary to ensure non-Pebble users constructing sstables for ingestion
// are unable to set internal-only properties.
var getInternalWriterProperties = private.SSTableInternalProperties.(func(*sstable.Writer) *sstable.Properties)

// expandedCompactionByteSizeLimit is the maximum number of bytes in all
// compacted files. We avoid expanding the lower level file set of a compaction
// if it would make the total compaction cover more than this many bytes.
Expand Down Expand Up @@ -1961,14 +1967,15 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {

var ve *manifest.VersionEdit
var pendingOutputs []physicalMeta
var stats compactStats
// To determine the target level of the files in the ingestedFlushable, we
// need to acquire the logLock, and not release it for that duration. Since,
// we need to acquire the logLock below to perform the logAndApply step
// anyway, we create the VersionEdit for ingestedFlushable outside of
// runCompaction. For all other flush cases, we construct the VersionEdit
// inside runCompaction.
if c.kind != compactionKindIngestedFlushable {
ve, pendingOutputs, err = d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err = d.runCompaction(jobID, c)
}

// Acquire logLock. This will be released either on an error, by way of
Expand Down Expand Up @@ -2040,6 +2047,9 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
}

bytesFlushed = c.bytesIterated
d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize

d.maybeUpdateDeleteCompactionHints(c)
d.removeInProgressCompaction(c, err != nil)
d.mu.versions.incrementCompactions(c.kind, c.extraLevels)
Expand Down Expand Up @@ -2519,7 +2529,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
d.opts.EventListener.CompactionBegin(info)
startTime := d.timeNow()

ve, pendingOutputs, err := d.runCompaction(jobID, c)
ve, pendingOutputs, stats, err := d.runCompaction(jobID, c)

info.Duration = d.timeNow().Sub(startTime)
if err == nil {
Expand Down Expand Up @@ -2548,6 +2558,9 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
}
}

d.mu.snapshots.cumulativePinnedCount += stats.cumulativePinnedKeys
d.mu.snapshots.cumulativePinnedSize += stats.cumulativePinnedSize

d.maybeUpdateDeleteCompactionHints(c)
d.removeInProgressCompaction(c, err != nil)
d.mu.versions.incrementCompactions(c.kind, c.extraLevels)
Expand All @@ -2569,14 +2582,19 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
return err
}

// compactStats accumulates per-compaction statistics about keys that were
// retained (rather than elided) solely because an open snapshot required
// them. The totals are folded into d.mu.snapshots' cumulative counters by
// the flush/compaction callers after runCompaction returns.
type compactStats struct {
// cumulativePinnedKeys counts point keys kept alive only by open snapshots.
cumulativePinnedKeys uint64
// cumulativePinnedSize is the total size in bytes (key, including the
// internal-key trailer, plus value) of those snapshot-pinned keys.
cumulativePinnedSize uint64
}

// runCompaction runs a compaction that produces new on-disk tables from
// memtables or old on-disk tables.
//
// d.mu must be held when calling this, but the mutex may be dropped and
// re-acquired during the course of this method.
func (d *DB) runCompaction(
jobID int, c *compaction,
) (ve *versionEdit, pendingOutputs []physicalMeta, retErr error) {
) (ve *versionEdit, pendingOutputs []physicalMeta, stats compactStats, retErr error) {
// As a sanity check, confirm that the smallest / largest keys for new and
// deleted files in the new versionEdit pass a validation function before
// returning the edit.
Expand Down Expand Up @@ -2612,7 +2630,7 @@ func (d *DB) runCompaction(
}
c.metrics[cl.level] = levelMetrics
}
return ve, nil, nil
return ve, nil, stats, nil
}

if c.kind == compactionKindIngestedFlushable {
Expand Down Expand Up @@ -2646,7 +2664,7 @@ func (d *DB) runCompaction(
{Level: c.outputLevel.level, Meta: meta},
},
}
return ve, nil, nil
return ve, nil, stats, nil
}

defer func() {
Expand All @@ -2665,16 +2683,19 @@ func (d *DB) runCompaction(

iiter, err := c.newInputIter(d.newIters, d.tableNewRangeKeyIter, snapshots)
if err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
c.allowedZeroSeqNum = c.allowZeroSeqNum()
iter := newCompactionIter(c.cmp, c.equal, c.formatKey, d.merge, iiter, snapshots,
&c.rangeDelFrag, &c.rangeKeyFrag, c.allowedZeroSeqNum, c.elideTombstone,
c.elideRangeTombstone, d.FormatMajorVersion())

var (
createdFiles []base.FileNum
tw *sstable.Writer
createdFiles []base.FileNum
tw *sstable.Writer
pinnedKeySize uint64
pinnedValueSize uint64
pinnedCount uint64
)
defer func() {
if iter != nil {
Expand Down Expand Up @@ -2782,7 +2803,6 @@ func (d *DB) runCompaction(
}
createdFiles = append(createdFiles, fileNum)
cacheOpts := private.SSTableCacheOpts(d.cacheID, fileNum).(sstable.WriterOption)
internalTableOpt := private.SSTableInternalTableOpt.(sstable.WriterOption)

const MaxFileWriteAdditionalCPUTime = time.Millisecond * 100
cpuWorkHandle = d.opts.Experimental.CPUWorkPermissionGranter.GetPermission(
Expand All @@ -2792,7 +2812,7 @@ func (d *DB) runCompaction(
d.opts.Experimental.MaxWriterConcurrency > 0 &&
(cpuWorkHandle.Permitted() || d.opts.Experimental.ForceWriterParallelism)

tw = sstable.NewWriter(writable, writerOpts, cacheOpts, internalTableOpt, &prevPointKey)
tw = sstable.NewWriter(writable, writerOpts, cacheOpts, &prevPointKey)

fileMeta.CreationTime = time.Now().Unix()
ve.NewFiles = append(ve.NewFiles, newFileEntry{
Expand Down Expand Up @@ -2891,7 +2911,22 @@ func (d *DB) runCompaction(
if tw == nil {
return nil
}

{
// Set internal sstable properties.
p := getInternalWriterProperties(tw)
// Set the external sst version to 0. This is what RocksDB expects for
// db-internal sstables; otherwise, it could apply a global sequence number.
p.ExternalFormatVersion = 0
// Set the snapshot pinned totals.
p.SnapshotPinnedKeys = pinnedCount
p.SnapshotPinnedKeySize = pinnedKeySize
p.SnapshotPinnedValueSize = pinnedValueSize
stats.cumulativePinnedKeys += pinnedCount
stats.cumulativePinnedSize += pinnedKeySize + pinnedValueSize
pinnedCount = 0
pinnedKeySize = 0
pinnedValueSize = 0
}
if err := tw.Close(); err != nil {
tw = nil
return err
Expand Down Expand Up @@ -3140,11 +3175,19 @@ func (d *DB) runCompaction(
}
if tw == nil {
if err := newOutput(); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
}
if err := tw.Add(*key, val); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
if iter.snapshotPinned {
// The kv pair we just added to the sstable was only surfaced by
// the compaction iterator because an open snapshot prevented
// its elision. Increment the stats.
pinnedCount++
pinnedKeySize += uint64(len(key.UserKey)) + base.InternalTrailerLen
pinnedValueSize += uint64(len(val))
}
}

Expand Down Expand Up @@ -3172,7 +3215,7 @@ func (d *DB) runCompaction(
splitKey = key.UserKey
}
if err := finishOutput(splitKey); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}
}

Expand All @@ -3189,14 +3232,14 @@ func (d *DB) runCompaction(
}

if err := d.objProvider.Sync(); err != nil {
return nil, pendingOutputs, err
return nil, pendingOutputs, stats, err
}

// Refresh the disk available statistic whenever a compaction/flush
// completes, before re-acquiring the mutex.
_ = d.calculateDiskAvailableBytes()

return ve, pendingOutputs, nil
return ve, pendingOutputs, stats, nil
}

// validateVersionEdit validates that start and end keys across new and deleted
Expand Down
Loading