storage: add single delete callbacks to crash, or increment metrics
Handlers are added for the SingleDeleteInvariantViolationCallback and
IneffectualSingleDeleteCallback options in Pebble. By default, they crash
the process. If crashing is turned off, they instead increment the metrics
storage.single-delete.invariant-violation and
storage.single-delete.ineffectual.
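
As a quick orientation before the diff, the sketch below shows the shape of
these handlers, assuming a simplified setting lookup and counter. The names
makeInvariantViolationHandler, crashOnViolation, and violationCount are
illustrative only, not the identifiers used in this commit; see the
pkg/storage/pebble.go hunk below for the real wiring.

```go
// Minimal, self-contained sketch of the handler pattern described above (not
// the actual CockroachDB code): the callback either crashes the process or
// bumps an atomic counter that a metrics loop can later export.
package main

import (
	"log"
	"sync/atomic"
)

// makeInvariantViolationHandler builds a callback suitable for a SingleDelete
// invariant-violation hook. crashOnViolation stands in for the
// storage.single_delete.crash_on_invariant_violation.enabled cluster setting.
func makeInvariantViolationHandler(
	crashOnViolation func() bool, violationCount *int64,
) func(userKey []byte) {
	return func(userKey []byte) {
		if crashOnViolation() {
			// Crashing is the default, since a violation can point at a bug
			// capable of corrupting data.
			log.Fatalf("SingleDel invariant violation on key %q", userKey)
		}
		// Otherwise record the event; the counter would back a gauge such as
		// storage.single-delete.invariant-violation.
		atomic.AddInt64(violationCount, 1)
	}
}

func main() {
	var violations int64
	handler := makeInvariantViolationHandler(func() bool { return false }, &violations)
	handler([]byte("example-key"))
	log.Printf("violations recorded: %d", atomic.LoadInt64(&violations))
}
```

The ineffectual-SingleDelete handler follows the same pattern, gated on
storage.single_delete.crash_on_ineffectual.enabled.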

Informs #115881
Informs #114421

Epic: none

Release note (ops change): The cluster settings
storage.single_delete.crash_on_invariant_violation.enabled and
storage.single_delete.crash_on_ineffectual.enabled default to true,
since the conditions they detect can be indicative of a serious bug that
could corrupt data.
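
A hedged sketch (not part of the commit) of how the two new settings could be
overridden from Go, for example in a test harness, follows; on a running
cluster the equivalent change would be made with SET CLUSTER SETTING. It
assumes the exported setting variables added in pkg/storage/pebble.go and the
standard settings/cluster helpers, whose exact call sites may differ by
release.

```go
package main

import (
	"context"
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/storage"
)

func main() {
	ctx := context.Background()
	st := cluster.MakeTestingClusterSettings()

	// Both settings default to true (crash on violation), because a violation
	// can indicate a serious bug that could corrupt data. Overriding them to
	// false downgrades the crash to an error log plus a metric increment.
	storage.SingleDeleteCrashOnInvariantViolation.Override(ctx, &st.SV, false)
	storage.SingleDeleteCrashOnIneffectual.Override(ctx, &st.SV, false)

	fmt.Println(storage.SingleDeleteCrashOnInvariantViolation.Get(&st.SV)) // false
	fmt.Println(storage.SingleDeleteCrashOnIneffectual.Get(&st.SV))        // false
}
```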
sumeerbhola committed Dec 15, 2023
1 parent d17b97b commit 4299c39
Showing 7 changed files with 109 additions and 18 deletions.
2 changes: 2 additions & 0 deletions docs/generated/metrics/metrics.html
@@ -675,6 +675,8 @@
<tr><td>STORAGE</td><td>storage.secondary-cache.write-back-failures</td><td>The number of times writing a cache block to the secondary cache failed</td><td>Num failures</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.shared-storage.read</td><td>Bytes read from shared storage</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.shared-storage.write</td><td>Bytes written to external storage</td><td>Bytes</td><td>GAUGE</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.single-delete.ineffectual</td><td>Number of SingleDeletes that were ineffectual</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.single-delete.invariant-violation</td><td>Number of SingleDelete invariant violations</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.wal.bytes_in</td><td>The number of logical bytes the storage engine has written to the WAL</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.wal.bytes_written</td><td>The number of bytes the storage engine has written to the WAL</td><td>Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>storage.wal.fsync.latency</td><td>The write ahead log fsync latency</td><td>Fsync Latency</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
2 changes: 2 additions & 0 deletions docs/generated/settings/settings.html
@@ -265,6 +265,8 @@
<tr><td><div id="setting-storage-ingest-split-enabled" class="anchored"><code>storage.ingest_split.enabled</code></div></td><td>boolean</td><td><code>false</code></td><td>set to true to use ingest-time splitting to lower write-amplification (experimental)</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-storage-max-sync-duration" class="anchored"><code>storage.max_sync_duration</code></div></td><td>duration</td><td><code>20s</code></td><td>maximum duration for disk operations; any operations that take longer than this setting trigger a warning log entry or process crash</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
<tr><td><div id="setting-storage-max-sync-duration-fatal-enabled" class="anchored"><code>storage.max_sync_duration.fatal.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if true, fatal the process when a disk operation exceeds storage.max_sync_duration</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-storage-single-delete-crash-on-ineffectual-enabled" class="anchored"><code>storage.single_delete.crash_on_ineffectual.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to crash if the single delete was ineffectual</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-storage-single-delete-crash-on-invariant-violation-enabled" class="anchored"><code>storage.single_delete.crash_on_invariant_violation.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to crash if the single delete invariant is violated</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-storage-value-blocks-enabled" class="anchored"><code>storage.value_blocks.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>set to true to enable writing of value blocks in sstables</td><td>Serverless/Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-timeseries-storage-enabled" class="anchored"><code>timeseries.storage.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td><td>Dedicated/Self-Hosted</td></tr>
<tr><td><div id="setting-timeseries-storage-resolution-10s-ttl" class="anchored"><code>timeseries.storage.resolution_10s.ttl</code></div></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td><td>Serverless/Dedicated/Self-Hosted (read-only)</td></tr>
18 changes: 18 additions & 0 deletions pkg/kv/kvserver/metrics.go
@@ -781,6 +781,18 @@ bytes preserved during flushes and compactions over the lifetime of the process.
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaStorageSingleDelInvariantViolationCount = metric.Metadata{
Name: "storage.single-delete.invariant-violation",
Help: "Number of SingleDelete invariant violations",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaStorageSingleDelIneffectualCount = metric.Metadata{
Name: "storage.single-delete.ineffectual",
Help: "Number of SingleDeletes that were ineffectual",
Measurement: "Events",
Unit: metric.Unit_COUNT,
}
metaSharedStorageBytesWritten = metric.Metadata{
Name: "storage.shared-storage.write",
Help: "Bytes written to external storage",
@@ -2408,6 +2420,8 @@ type StoreMetrics struct {
RdbLevelScore [7]*metric.GaugeFloat64 // idx = level
RdbWriteStalls *metric.Gauge
RdbWriteStallNanos *metric.Gauge
SingleDelInvariantViolations *metric.Gauge
SingleDelIneffectualCount *metric.Gauge
SharedStorageBytesRead *metric.Gauge
SharedStorageBytesWritten *metric.Gauge
SecondaryCacheSize *metric.Gauge
@@ -3089,6 +3103,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
IterExternalSteps: metric.NewGauge(metaIterExternalSteps),
IterInternalSeeks: metric.NewGauge(metaIterInternalSeeks),
IterInternalSteps: metric.NewGauge(metaIterInternalSteps),
SingleDelInvariantViolations: metric.NewGauge(metaStorageSingleDelInvariantViolationCount),
SingleDelIneffectualCount: metric.NewGauge(metaStorageSingleDelIneffectualCount),
SharedStorageBytesRead: metric.NewGauge(metaSharedStorageBytesRead),
SharedStorageBytesWritten: metric.NewGauge(metaSharedStorageBytesWritten),
SecondaryCacheSize: metric.NewGauge(metaSecondaryCacheSize),
@@ -3488,6 +3504,8 @@ func (sm *StoreMetrics) updateEngineMetrics(m storage.Metrics) {
sm.StorageCompactionsPinnedKeys.Update(int64(m.Snapshots.PinnedKeys))
sm.StorageCompactionsPinnedBytes.Update(int64(m.Snapshots.PinnedSize))
sm.StorageCompactionsDuration.Update(int64(m.Compact.Duration))
sm.SingleDelInvariantViolations.Update(m.SingleDelInvariantViolationCount)
sm.SingleDelIneffectualCount.Update(m.SingleDelIneffectualCount)
sm.SharedStorageBytesRead.Update(m.SharedStorageReadBytes)
sm.SharedStorageBytesWritten.Update(m.SharedStorageWriteBytes)
sm.SecondaryCacheSize.Update(m.SecondaryCacheMetrics.Size)
7 changes: 7 additions & 0 deletions pkg/storage/engine.go
@@ -1173,6 +1173,13 @@ type Metrics struct {
// DiskStallCount counts the number of times Pebble observes slow writes
// on disk lasting longer than MaxSyncDuration (`storage.max_sync_duration`).
DiskStallCount int64
// SingleDelInvariantViolationCount counts the number of times a
// SingleDelete was found to violate the invariant that it should only be
// used when there is at most one older Set for it to delete.
SingleDelInvariantViolationCount int64
// SingleDelIneffectualCount counts the number of times a SingleDelete was
// ineffectual, i.e., it was elided without deleting anything.
SingleDelIneffectualCount int64
// SharedStorageWriteBytes counts the number of bytes written to shared storage.
SharedStorageWriteBytes int64
// SharedStorageReadBytes counts the number of bytes read from shared storage.
1 change: 1 addition & 0 deletions pkg/storage/metamorphic/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/uint128",
26 changes: 23 additions & 3 deletions pkg/storage/metamorphic/generator.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/pebble"
"github.com/cockroachdb/pebble/vfs"
)
@@ -61,6 +62,21 @@ func (e *engineConfig) create(path string, fs vfs.FS) (storage.Engine, error) {
pebbleConfig.Opts.Cache = pebble.NewCache(1 << 20)
defer pebbleConfig.Opts.Cache.Unref()

pebbleConfig.Opts.Experimental.IneffectualSingleDeleteCallback = func(userKey []byte) {
// TODO(sumeer): figure out what is causing these callbacks.
if false {
ek, ok := storage.DecodeEngineKey(userKey)
if !ok {
log.Fatalf(context.Background(), "unable to decode %s", roachpb.Key(userKey).String())
}
ltk, err := ek.ToLockTableKey()
if err != nil {
log.Fatalf(context.Background(), "%s", err.Error())
}
log.Fatalf(context.Background(), "Ineffectual SingleDel on k=%s str=%s txn=%s",
ltk.Key.String(), ltk.Strength.String(), ltk.TxnUUID.String())
}
}
return storage.NewPebble(context.Background(), pebbleConfig)
}

@@ -303,7 +319,7 @@ func (m *metaTestRunner) generateAndRun(n int) {
for i := range m.ops {
opRun := &m.ops[i]
output := opRun.op.run(m.ctx)
m.printOp(opRun.name, opRun.args, output)
m.printOp(i, opRun.name, opRun.args, output)
}
}

@@ -407,7 +423,7 @@ func (m *metaTestRunner) parseFileAndRun(f io.Reader) {
for i := range m.ops {
op := &m.ops[i]
actualOutput := op.op.run(m.ctx)
m.printOp(op.name, op.args, actualOutput)
actualOutput = m.printOp(i, op.name, op.args, actualOutput)
if strings.Compare(strings.TrimSpace(op.expectedOutput), strings.TrimSpace(actualOutput)) != 0 {
// Error messages can sometimes mismatch. If both outputs contain "error",
// consider this a pass.
@@ -482,15 +498,19 @@ func (m *metaTestRunner) resolveAndAddOp(op *opGenerator, fixedArgs ...string) {
}

// Print passed-in operation, arguments and output string to output file.
func (m *metaTestRunner) printOp(opName string, argStrings []string, output string) {
func (m *metaTestRunner) printOp(
index int, opName string, argStrings []string, output string,
) string {
fmt.Fprintf(m.w, "%s(", opName)
for i, arg := range argStrings {
if i > 0 {
fmt.Fprintf(m.w, ", ")
}
fmt.Fprintf(m.w, "%s", arg)
}
output = fmt.Sprintf("%s // #%d", output, index)
fmt.Fprintf(m.w, ") -> %s\n", output)
return output
}

// printComment prints a comment line into the output file. Supports single-line
71 changes: 56 additions & 15 deletions pkg/storage/pebble.go
@@ -159,6 +159,22 @@ var IngestAsFlushable = settings.RegisterBoolSetting(
util.ConstantWithMetamorphicTestBool(
"storage.ingest_as_flushable.enabled", true))

var SingleDeleteCrashOnInvariantViolation = settings.RegisterBoolSetting(
settings.SystemOnly,
"storage.single_delete.crash_on_invariant_violation.enabled",
"set to true to crash if the single delete invariant is violated",
true,
settings.WithPublic,
)

var SingleDeleteCrashOnIneffectual = settings.RegisterBoolSetting(
settings.SystemOnly,
"storage.single_delete.crash_on_ineffectual.enabled",
"set to true to crash if the single delete was ineffectual",
true,
settings.WithPublic,
)

// ShouldUseEFOS returns true if either of the UseEFOS or UseExciseForSnapshots
// cluster settings are enabled, and EventuallyFileOnlySnapshots must be used
// to guarantee snapshot-like semantics.
@@ -819,14 +835,16 @@ type Pebble struct {

// Stats updated by pebble.EventListener invocations, and returned in
// GetMetrics. Updated and retrieved atomically.
writeStallCount int64
writeStallDuration time.Duration
writeStallStartNanos int64
diskSlowCount int64
diskStallCount int64
sharedBytesRead int64
sharedBytesWritten int64
iterStats struct {
writeStallCount int64
writeStallDuration time.Duration
writeStallStartNanos int64
diskSlowCount int64
diskStallCount int64
singleDelInvariantViolationCount int64
singleDelIneffectualCount int64
sharedBytesRead int64
sharedBytesWritten int64
iterStats struct {
syncutil.Mutex
AggregatedIteratorStats
}
@@ -1178,6 +1196,27 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) {
})
}

opts.Experimental.SingleDeleteInvariantViolationCallback = func(userKey []byte) {
logFunc := log.Errorf
if SingleDeleteCrashOnInvariantViolation.Get(&cfg.Settings.SV) {
logFunc = log.Fatalf
}
logFunc(logCtx, "SingleDel invariant violation on key %s", roachpb.Key(userKey))
atomic.AddInt64(&p.singleDelInvariantViolationCount, 1)
}
// TODO(sumeer): remove this nil check once the metamorphic test is fixed,
// and does not need to override.
if opts.Experimental.IneffectualSingleDeleteCallback == nil {
opts.Experimental.IneffectualSingleDeleteCallback = func(userKey []byte) {
logFunc := log.Errorf
if SingleDeleteCrashOnIneffectual.Get(&cfg.Settings.SV) {
logFunc = log.Fatalf
}
logFunc(logCtx, "Ineffectual SingleDel on key %s", roachpb.Key(userKey))
atomic.AddInt64(&p.singleDelIneffectualCount, 1)
}
}

// MaxConcurrentCompactions can be set by multiple sources, but all the
// sources will eventually call NewPebble. So, we override
// Opts.MaxConcurrentCompactions to a closure which will return
@@ -1978,13 +2017,15 @@ func (p *Pebble) Flush() error {
// GetMetrics implements the Engine interface.
func (p *Pebble) GetMetrics() Metrics {
m := Metrics{
Metrics: p.db.Metrics(),
WriteStallCount: atomic.LoadInt64(&p.writeStallCount),
WriteStallDuration: time.Duration(atomic.LoadInt64((*int64)(&p.writeStallDuration))),
DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount),
DiskStallCount: atomic.LoadInt64(&p.diskStallCount),
SharedStorageReadBytes: atomic.LoadInt64(&p.sharedBytesRead),
SharedStorageWriteBytes: atomic.LoadInt64(&p.sharedBytesWritten),
Metrics: p.db.Metrics(),
WriteStallCount: atomic.LoadInt64(&p.writeStallCount),
WriteStallDuration: time.Duration(atomic.LoadInt64((*int64)(&p.writeStallDuration))),
DiskSlowCount: atomic.LoadInt64(&p.diskSlowCount),
DiskStallCount: atomic.LoadInt64(&p.diskStallCount),
SingleDelInvariantViolationCount: atomic.LoadInt64(&p.singleDelInvariantViolationCount),
SingleDelIneffectualCount: atomic.LoadInt64(&p.singleDelIneffectualCount),
SharedStorageReadBytes: atomic.LoadInt64(&p.sharedBytesRead),
SharedStorageWriteBytes: atomic.LoadInt64(&p.sharedBytesWritten),
}
p.iterStats.Lock()
m.Iterator = p.iterStats.AggregatedIteratorStats