Skip to content

Commit

Permalink
sql: surface query request units consumed by IO
Browse files Browse the repository at this point in the history
This commit adds tracking for request units (RUs) consumed by IO operations
for all execution operators that perform KV operations. The corresponding
RU count is recorded in the span and later aggregated with the RU consumption
due to network egress and CPU usage. The resulting query RU consumption
estimate is visible in the output of `EXPLAIN ANALYZE`.

Release note (sql change): Added an estimate for the number of request units
consumed by a query to the output of `EXPLAIN ANALYZE` for tenant sessions.
  • Loading branch information
DrewKimball committed Oct 25, 2022
1 parent a87c513 commit 1543bb0
Show file tree
Hide file tree
Showing 11 changed files with 61 additions and 23 deletions.
2 changes: 2 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/tracingpb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_errors//errorspb",
],
Expand Down
11 changes: 10 additions & 1 deletion pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/errorspb"
)
Expand Down Expand Up @@ -789,6 +791,13 @@ func (c *tenantSideCostController) OnResponseWait(
return err
}

// Record the number of RUs consumed by the IO request.
if sp := tracing.SpanFromContext(ctx); sp != nil && sp.RecordingType() != tracingpb.RecordingOff {
sp.RecordStructured(&roachpb.TenantConsumption{
RU: float64(totalRU),
})
}

c.mu.Lock()
defer c.mu.Unlock()

Expand Down Expand Up @@ -830,7 +839,7 @@ func (c *tenantSideCostController) OnExternalIOWait(
}

// OnExternalIO is part of the multitenant.TenantSideExternalIORecorder
// interface.
// interface. TODO(drewk): collect this for queries.
func (c *tenantSideCostController) OnExternalIO(
ctx context.Context, usage multitenant.ExternalIOUsage,
) {
Expand Down
17 changes: 9 additions & 8 deletions pkg/sql/colflow/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,21 +212,22 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats
}

if vsc.kvReader != nil {
// Note that kvReader is non-nil only for ColBatchScans, and this is the
// only case when we want to add the number of rows read, bytes read,
// and the contention time (because the wrapped row-execution KV reading
// processors - joinReaders, tableReaders, zigzagJoiners, and
// invertedJoiners - will add these statistics themselves). Similarly,
// for those wrapped processors it is ok to show the time as "execution
// time" since "KV time" would only make sense for tableReaders, and
// they are less likely to be wrapped than others.
// Note that kvReader is non-nil only for vectorized operators that perform
// KV operations, and this is the only case when we want to add the number
// of rows read, bytes read, and the contention time (because the wrapped
// row-execution KV reading processors - joinReaders, tableReaders,
// zigzagJoiners, and invertedJoiners - will add these statistics
// themselves). Similarly, for those wrapped processors it is ok to show the
// time as "execution time" since "KV time" would only make sense for
// tableReaders, and they are less likely to be wrapped than others.
s.KV.KVTime.Set(time)
s.KV.TuplesRead.Set(uint64(vsc.kvReader.GetRowsRead()))
s.KV.BytesRead.Set(uint64(vsc.kvReader.GetBytesRead()))
s.KV.BatchRequestsIssued.Set(uint64(vsc.kvReader.GetBatchRequestsIssued()))
s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime())
scanStats := vsc.kvReader.GetScanStats()
execstats.PopulateKVMVCCStats(&s.KV, &scanStats)
s.Exec.ConsumedRU.Set(scanStats.RuConsumed)
} else {
s.Exec.ExecTime.Set(time)
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/sql/execinfrapb/component_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func (s *ComponentStats) formatStats(fn func(suffix string, value interface{}))
if s.Exec.MaxAllocatedDisk.HasValue() {
fn("max sql temp disk usage", humanize.IBytes(s.Exec.MaxAllocatedDisk.Value()))
}
if s.Exec.ConsumedRU.HasValue() {
fn("request units consumed", humanizeutil.Count(s.Exec.ConsumedRU.Value()))
}

// Output stats.
if s.Output.NumBatches.HasValue() {
Expand Down Expand Up @@ -264,6 +267,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats {
if !result.Exec.MaxAllocatedDisk.HasValue() {
result.Exec.MaxAllocatedDisk = other.Exec.MaxAllocatedDisk
}
if !result.Exec.ConsumedRU.HasValue() {
result.Exec.ConsumedRU = other.Exec.ConsumedRU
}

// Output stats.
if !result.Output.NumBatches.HasValue() {
Expand Down Expand Up @@ -356,6 +362,7 @@ func (s *ComponentStats) MakeDeterministic() {
timeVal(&s.Exec.ExecTime)
resetUint(&s.Exec.MaxAllocatedMem)
resetUint(&s.Exec.MaxAllocatedDisk)
resetUint(&s.Exec.ConsumedRU)

// Output.
resetUint(&s.Output.NumBatches)
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/execinfrapb/component_stats.proto
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,10 @@ message ExecStats {
optional util.optional.Duration exec_time = 1 [(gogoproto.nullable) = false];
// Maximum memory allocated by the component.
optional util.optional.Uint max_allocated_mem = 2 [(gogoproto.nullable) = false];

// Maximum scratch disk allocated by the component.
optional util.optional.Uint max_allocated_disk = 3 [(gogoproto.nullable) = false];
// Amount of RUs consumed while executing the component.
optional util.optional.Uint consumed_r_u = 4 [(gogoproto.nullable) = false];
}

// OutputStats contains statistics about the output (results) of a component.
Expand Down
33 changes: 20 additions & 13 deletions pkg/sql/execstats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type ScanStats struct {
// NumInternalSeeks is the number of times that MVCC seek was invoked
// internally, including to step over internal, uncompacted Pebble versions.
NumInternalSeeks uint64
// RuConsumed is the number of RUs that were consumed during the course of a
// scan.
RuConsumed uint64
}

// PopulateKVMVCCStats adds data from the input ScanStats to the input KVStats.
Expand All @@ -85,25 +88,29 @@ func PopulateKVMVCCStats(kvStats *execinfrapb.KVStats, ss *ScanStats) {
// GetScanStats is a helper function to calculate scan stats from the given
// recording or, if the recording is nil, from the tracing span from the
// context.
func GetScanStats(ctx context.Context, recording tracingpb.Recording) (ss ScanStats) {
func GetScanStats(ctx context.Context, recording tracingpb.Recording) (scanStats ScanStats) {
if recording == nil {
recording = tracing.SpanFromContext(ctx).GetConfiguredRecording()
}
var ev roachpb.ScanStats
var ss roachpb.ScanStats
var tc roachpb.TenantConsumption
for i := range recording {
recording[i].Structured(func(any *pbtypes.Any, _ time.Time) {
if !pbtypes.Is(any, &ev) {
return
}
if err := pbtypes.UnmarshalAny(any, &ev); err != nil {
return
if pbtypes.Is(any, &ss) {
if err := pbtypes.UnmarshalAny(any, &ss); err != nil {
return
}
scanStats.NumInterfaceSteps += ss.NumInterfaceSteps
scanStats.NumInternalSteps += ss.NumInternalSteps
scanStats.NumInterfaceSeeks += ss.NumInterfaceSeeks
scanStats.NumInternalSeeks += ss.NumInternalSeeks
} else if pbtypes.Is(any, &tc) {
if err := pbtypes.UnmarshalAny(any, &tc); err != nil {
return
}
scanStats.RuConsumed += uint64(tc.RU)
}

ss.NumInterfaceSteps += ev.NumInterfaceSteps
ss.NumInternalSteps += ev.NumInternalSteps
ss.NumInterfaceSeeks += ev.NumInterfaceSeeks
ss.NumInternalSeeks += ev.NumInternalSeeks
})
}
return ss
return scanStats
}
1 change: 1 addition & 0 deletions pkg/sql/execstats/traceanalyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ func (a *TraceAnalyzer) ProcessStats() error {
a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode[instanceID] += int64(stats.KV.BatchRequestsIssued.Value())
a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value()
a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value()
a.nodeLevelStats.RUEstimateGroupedByNode[instanceID] += int64(stats.Exec.ConsumedRU.Value())
}

// Process streamStats.
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/rowexec/inverted_joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,6 +775,7 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats {
Exec: execinfrapb.ExecStats{
MaxAllocatedMem: optional.MakeUint(uint64(ij.MemMonitor.MaximumBytes())),
MaxAllocatedDisk: optional.MakeUint(uint64(ij.diskMonitor.MaximumBytes())),
ConsumedRU: optional.MakeUint(ij.scanStats.RuConsumed),
},
Output: ij.OutputHelper.Stats(),
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1181,6 +1181,9 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats {
BatchRequestsIssued: optional.MakeUint(uint64(jr.fetcher.GetBatchRequestsIssued())),
},
Output: jr.OutputHelper.Stats(),
Exec: execinfrapb.ExecStats{
ConsumedRU: optional.MakeUint(jr.scanStats.RuConsumed),
},
}
// Note that there is no need to include the maximum bytes of
// jr.limitedMemMonitor because it is a child of jr.MemMonitor.
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/rowexec/tablereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,9 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats {
ContentionTime: optional.MakeTimeValue(execstats.GetCumulativeContentionTime(tr.Ctx, tr.ExecStatsTrace)),
BatchRequestsIssued: optional.MakeUint(uint64(tr.fetcher.GetBatchRequestsIssued())),
},
Exec: execinfrapb.ExecStats{
ConsumedRU: optional.MakeUint(tr.scanStats.RuConsumed),
},
Output: tr.OutputHelper.Stats(),
}
execstats.PopulateKVMVCCStats(&ret.KV, &tr.scanStats)
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/rowexec/zigzagjoiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,9 @@ func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats {
return &execinfrapb.ComponentStats{
KV: kvStats,
Output: z.OutputHelper.Stats(),
Exec: execinfrapb.ExecStats{
ConsumedRU: optional.MakeUint(z.scanStats.RuConsumed),
},
}
}

Expand Down

0 comments on commit 1543bb0

Please sign in to comment.