diff --git a/cmd/pebble/replay.go b/cmd/pebble/replay.go index 58bbf3dcf4..2ac92f3931 100644 --- a/cmd/pebble/replay.go +++ b/cmd/pebble/replay.go @@ -45,6 +45,8 @@ func initReplayCmd() *cobra.Command { &c.name, "name", "", "the name of the workload being replayed") cmd.Flags().VarPF( &c.pacer, "pacer", "p", "the pacer to use: unpaced, reference-ramp, or fixed-ramp=N") + cmd.Flags().Uint64Var( + &c.maxWritesMB, "max-writes", 0, "the maximum volume of writes (MB) to apply, with 0 denoting unlimited") cmd.Flags().StringVar( &c.optionsString, "options", "", "Pebble options to override, in the OPTIONS ini format but with any whitespace as field delimiters instead of newlines") cmd.Flags().StringVar( @@ -61,6 +63,7 @@ type replayConfig struct { pacer pacerFlag runDir string count int + maxWritesMB uint64 streamLogs bool ignoreCheckpoint bool optionsString string @@ -93,6 +96,9 @@ func (c *replayConfig) runOnce(stdout io.Writer, workloadPath string) error { Pacer: c.pacer, Opts: &pebble.Options{}, } + if c.maxWritesMB > 0 { + r.MaxWriteBytes = c.maxWritesMB * (1 << 20) + } if err := c.initRunDir(r); err != nil { return err } diff --git a/replay/replay.go b/replay/replay.go index a40f25d685..8c40662a3d 100644 --- a/replay/replay.go +++ b/replay/replay.go @@ -115,6 +115,7 @@ type Metrics struct { BytesWeightedByLevel uint64 } TotalWriteAmp float64 + WriteBytes uint64 WriteStalls uint64 WriteStallsDuration time.Duration } @@ -180,11 +181,12 @@ func (m *Metrics) BenchmarkString(name string) string { // Runner runs a captured workload against a test database, collecting // metrics on performance. type Runner struct { - RunDir string - WorkloadFS vfs.FS - WorkloadPath string - Pacer Pacer - Opts *pebble.Options + RunDir string + WorkloadFS vfs.FS + WorkloadPath string + Pacer Pacer + Opts *pebble.Options + MaxWriteBytes uint64 // Internal state. @@ -202,6 +204,7 @@ type Runner struct { stepsApplied chan workloadStep metrics struct { + writeBytes uint64 writeStalls uint64 writeStallsDurationNano uint64 } @@ -420,6 +423,7 @@ func (r *Runner) Wait() (Metrics, error) { m := Metrics{ Final: pm, TotalWriteAmp: total.WriteAmp(), + WriteBytes: r.metrics.writeBytes, WriteStalls: r.metrics.writeStalls, WriteStallsDuration: time.Duration(r.metrics.writeStallsDurationNano), } @@ -556,6 +560,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error { idx := r.workload.manifestIdx + var cumulativeWriteBytes uint64 var flushBufs flushBuffers var v *manifest.Version var previousVersion *manifest.Version @@ -576,6 +581,10 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error { } for ; idx < len(r.workload.manifests); idx++ { + if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes { + break + } + err := func() error { manifestName := r.workload.manifests[idx] f, err := r.WorkloadFS.Open(r.WorkloadFS.PathJoin(r.WorkloadPath, manifestName)) @@ -688,6 +697,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error { if err := loadFlushedSSTableKeys(s.flushBatch, r.WorkloadFS, r.WorkloadPath, newFiles, r.readerOpts, &flushBufs); err != nil { return errors.Wrapf(err, "flush in %q at offset %d", manifestName, rr.Offset()) } + cumulativeWriteBytes += uint64(s.flushBatch.Len()) case ingestStepKind: // Copy the ingested sstables into a staging area within the // run dir. This is necessary for two reasons: @@ -705,6 +715,11 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error { if err := vfs.CopyAcrossFS(r.WorkloadFS, src, r.Opts.FS, dst); err != nil { return errors.Wrapf(err, "ingest in %q at offset %d", manifestName, rr.Offset()) } + finfo, err := r.Opts.FS.Stat(dst) + if err != nil { + return errors.Wrapf(err, "stating %q", dst) + } + cumulativeWriteBytes += uint64(finfo.Size()) s.tablesToIngest = append(s.tablesToIngest, dst) } case compactionStepKind: @@ -716,6 +731,10 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error { return ctx.Err() case r.steps <- s: } + + if r.MaxWriteBytes != 0 && cumulativeWriteBytes > r.MaxWriteBytes { + break + } } return nil }() @@ -723,6 +742,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error { return err } } + atomic.StoreUint64(&r.metrics.writeBytes, cumulativeWriteBytes) return nil } diff --git a/replay/replay_test.go b/replay/replay_test.go index 75532ce0fa..476c11e0a7 100644 --- a/replay/replay_test.go +++ b/replay/replay_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/pebble" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/datatest" + "github.com/cockroachdb/pebble/internal/humanize" "github.com/cockroachdb/pebble/internal/invariants" "github.com/cockroachdb/pebble/internal/testkeys" "github.com/cockroachdb/pebble/rangekey" @@ -131,10 +132,11 @@ func runReplayTest(t *testing.T, path string) { ct.WaitForInflightCompactionsToEqual(target) return "" case "wait": - if _, err := r.Wait(); err != nil { + m, err := r.Wait() + if err != nil { return err.Error() } - return "" + return fmt.Sprintf("replayed %s in writes", humanize.Uint64(m.WriteBytes)) case "close": if err := r.Close(); err != nil { return err.Error() diff --git a/replay/testdata/replay b/replay/testdata/replay index 6ce5c31a41..ad588dd352 100644 --- a/replay/testdata/replay +++ b/replay/testdata/replay @@ -85,6 +85,7 @@ replay simple unpaced wait ---- +replayed 42 B in writes # NB: The file sizes are non-deterministic after replay (because compactions are # nondeterministic). We don't `tree` here as a result. diff --git a/replay/testdata/replay_paced b/replay/testdata/replay_paced index be818b7be0..8d5e506bec 100644 --- a/replay/testdata/replay_paced +++ b/replay/testdata/replay_paced @@ -39,6 +39,7 @@ wait-for-compactions wait ---- +replayed 42 B in writes scan-keys ----