From 0581f61fec10b04410273cb05c61050c206ccd43 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Wed, 30 Mar 2022 11:15:50 +0200 Subject: [PATCH] kvserver: very rough prototype for storage load metrics https://github.com/cockroachdb/cockroach/issues/65414#issuecomment-1082188947 Release note: None --- pkg/kv/kvserver/batcheval/cmd_scan.go | 6 +++ pkg/kv/kvserver/batcheval/result/result.go | 1 + pkg/kv/kvserver/replica.go | 6 ++- pkg/kv/kvserver/replica_evaluate.go | 47 ++++++++++++++++++++++ pkg/storage/mvcc.go | 16 +++++++- 5 files changed, 74 insertions(+), 2 deletions(-) diff --git a/pkg/kv/kvserver/batcheval/cmd_scan.go b/pkg/kv/kvserver/batcheval/cmd_scan.go index 90a951db5187..b667882e20b5 100644 --- a/pkg/kv/kvserver/batcheval/cmd_scan.go +++ b/pkg/kv/kvserver/batcheval/cmd_scan.go @@ -76,6 +76,12 @@ func Scan( panic(fmt.Sprintf("Unknown scanFormat %d", args.ScanFormat)) } + // TODO: if this method returns with an error, it doesn't return ReadBytes, + // but the reads still happened. So worth thinking about that and perhaps + // passing the stats as a separate return value which always needs to be + // populated on all return paths. + res.ReadBytes = scanRes.ReadBytes + reply.NumKeys = scanRes.NumKeys reply.NumBytes = scanRes.NumBytes diff --git a/pkg/kv/kvserver/batcheval/result/result.go b/pkg/kv/kvserver/batcheval/result/result.go index d5da4ac5f202..b127bc9062f1 100644 --- a/pkg/kv/kvserver/batcheval/result/result.go +++ b/pkg/kv/kvserver/batcheval/result/result.go @@ -161,6 +161,7 @@ type Result struct { Replicated kvserverpb.ReplicatedEvalResult WriteBatch *kvserverpb.WriteBatch LogicalOpLog *kvserverpb.LogicalOpLog + ReadBytes int64 // TODO } // IsZero reports whether p is the zero value. diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index d35bd5c91e93..0b9556280a56 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -240,7 +240,7 @@ type Replica struct { // purpose. // // [1]: https://github.com/cockroachdb/cockroach/pull/16664 - writeStats *replicaStats + writeStats *replicaStats // TODO pinging for diff visibility // creatingReplica is set when a replica is created as uninitialized // via a raft message. @@ -1226,6 +1226,10 @@ func (r *Replica) raftBasicStatusRLocked() raft.BasicStatus { // State returns a copy of the internal state of the Replica, along with some // auxiliary information. +// +// TODO: rename this to RangeInfo, State is misleading +// TODO: include helpful per-replica write/read load information here, derived +// from the new metrics we're adding. func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo { var ri kvserverpb.RangeInfo diff --git a/pkg/kv/kvserver/replica_evaluate.go b/pkg/kv/kvserver/replica_evaluate.go index 9ce9b7ff58a9..3e4313100d47 100644 --- a/pkg/kv/kvserver/replica_evaluate.go +++ b/pkg/kv/kvserver/replica_evaluate.go @@ -227,6 +227,12 @@ func evaluateBatch( cantDeferWTOE bool } + writeBatchSize := func() int { return 0 } + if b, ok := readWriter.(storage.Batch); ok { + writeBatchSize = b.Len + } + curWriteSize := writeBatchSize() + // TODO(tbg): if we introduced an "executor" helper here that could carry state // across the slots in the batch while we execute them, this code could come // out a lot less ad-hoc. @@ -269,6 +275,47 @@ func evaluateBatch( // (which is sometimes deferred) it is fully populated. curResult, err := evaluateCommand( ctx, readWriter, rec, ms, baHeader, args, reply, ui) + { + newWriteSize := writeBatchSize() + delta := newWriteSize - curWriteSize + curWriteSize = newWriteSize + if delta > 0 { + // TODO: populate per-Method counters following the approach used here: + // https://github.com/cockroachdb/cockroach/blob/f7719998f4dab1d603b84ad2b0e7fe83d521af11/pkg/kv/kvclient/kvcoord/dist_sender.go#L260-L266 + // TODO: decide whether we need to "defer" the metrics recording. For + // reads, this likely makes no sense (if a request evaluates, it does + // the reads right then, so they already happened). But evaluating a + // write just means we're making a WriteBatch. Its writes may never be + // seen to the engine, for example if a later entry in the BatchRequest + // causes an error. The "write" really only happens when we append to + // the log (and even then it may not end up getting applied to the state + // machine, but distinguishing this is diminishing returns imo). So we + // could send the information along with the `result.Result` method and + // only record it upon log submission. I think this is best avoided for + // the first pass, we can just document that we're tracking "potential" + // writes. + // TODO: we also have `Replica.writeStats` which counts the number of + // mutations (sort of... it's complicated). There may be a chance to + // either replace it or we need to at least document better what it + // does. + // TODO: do we want to track write count (on top of write bytes) by + // request type? + // TODO: do we want to expose an EWMA of these per-req breakdowns on the + // (*Replica).State method? This could power a richer "hot ranges" + // dashboard, where you could for example find ranges that are "hot for + // AddSSTable in terms of bytes/sec ingested". Even if we don't break + // it down per request type we should at least expose write bytes/sec + // on that method as in past escalations we really would've wanted the + // hot writing ranges and had no way of getting that. + // TODO: the AddSSTable method will need special casing, as it does not + // create a large write batch but populates curResult.Replicated.AddSSTable.Data. + log.Infof(ctx, "XXX evaluated %s, wrote %d bytes", args.Method(), delta) + } + } + if curResult.ReadBytes > 0 { + // TODO: many of the TODOs for the write path apply here too. + log.Infof(ctx, "XXX evaluated %s, read %d bytes", args.Method(), curResult.ReadBytes) + } if filter := rec.EvalKnobs().TestingPostEvalFilter; filter != nil { filterArgs := kvserverbase.FilterArgs{ diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index dff7a57fe16b..f2a4296f5878 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -2439,6 +2439,8 @@ type MVCCScanResult struct { // used for encoding the uncompressed kv pairs contained in the result. NumBytes int64 + ReadBytes int64 // TODO + ResumeSpan *roachpb.Span ResumeReason roachpb.ResumeReason ResumeNextBytes int64 // populated if TargetBytes != 0, size of next resume kv @@ -2500,8 +2502,20 @@ func MVCCScanToBytes( opts MVCCScanOptions, ) (MVCCScanResult, error) { iter := newMVCCIterator(reader, timestamp.IsEmpty(), IterOptions{LowerBound: key, UpperBound: endKey}) + // TODO: is this better than KeyBytes+ValBytes? + // TODO: force all callers to newMVCCIterator to reckon with the stats + // requirement, to make sure all code paths pass them up. There are other + // ways to do the plumbing that we consider. Think about it and pick a good + // one. For example, newMVCCIterator could take a pebble stats struct that + // pebble would then associate to the iterator and record into directly. Then + // we'd have to plumb a value down, but not pass a value back up in return. + // Notably this sidesteps the question of whether stats are reliably recorded + // on error return paths. defer iter.Close() - return mvccScanToBytes(ctx, iter, key, endKey, timestamp, opts) + res, err := mvccScanToBytes(ctx, iter, key, endKey, timestamp, opts) + res.ReadBytes = int64(iter.Stats().Stats.InternalStats.BlockBytes) + res.ReadBytes++ // HACK: BlockBytes seems to always be zero at least in unit tests, maybe something about the in-mem engine? + return res, err } // MVCCScanAsTxn constructs a temporary transaction from the given transaction