From 682fe34e92da51407c137bcfdf32625338d97e38 Mon Sep 17 00:00:00 2001
From: Drew Kimball
Date: Tue, 15 Nov 2022 01:28:10 -0800
Subject: [PATCH] **sql: surface query request units consumed by network egress**

This commit adds a top-level field to the output of `EXPLAIN ANALYZE` that
shows the estimated number of RUs that would be consumed due to network
egress to the client. The estimate is obtained by buffering each value from
the query result in text format and then measuring the size of the buffer
before resetting it. The result is used to get the RU consumption with the
tenant cost config's `PGWireEgressCost` method.

**sql: surface query request units consumed due to cpu usage**

This commit adds the ability for clients to estimate the number of RUs
consumed by a query due to CPU usage. This is accomplished by keeping a
moving average of the CPU usage for the entire tenant process, then using
that to obtain an estimate for what the CPU usage *would* be if the query
wasn't running. This is then compared against the actual measured CPU usage
during the query's execution to get the estimate. For local flows this is
done at the `connExecutor` level; for remote flows this is handled by the
last outbox on the node (which gathers and sends the flow's metadata). The
resulting RU estimate is added to the existing estimate from network egress
and displayed in the output of `EXPLAIN ANALYZE`.

**sql: surface query request units consumed by IO**

This commit adds tracking for request units consumed by IO operations for
all execution operators that perform KV operations. The corresponding RU
count is recorded in the span and later aggregated with the RU consumption
due to network egress and CPU usage. The resulting query RU consumption
estimate is visible in the output of `EXPLAIN ANALYZE`.

**multitenantccl: add sanity testing for ru estimation**

This commit adds a sanity test for the RU estimates produced by running
queries with `EXPLAIN ANALYZE` on a tenant. The test runs each test query
several times with `EXPLAIN ANALYZE`, then runs all test queries without
`EXPLAIN ANALYZE` and compares the resulting actual RU measurement to the
aggregated estimates. For now, this test is disabled during builds because
it is flaky in the presence of background activity. For this reason it
should only be used as a manual sanity test.

Informs #74441

Release note (sql change): Added an estimate for the number of request
units consumed by a query to the output of `EXPLAIN ANALYZE` for tenant
sessions.
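As a rough illustration of the egress estimate described above (not part of the patch): each result value is rendered in text format into a scratch buffer, the buffer size is added to the running byte count, and the buffer is reset. The sketch below assumes a simplified `fmt`-based text encoding, a hypothetical `ruPerEgressByte` rate standing in for `PGWireEgressCost`, and the same 5-byte per-row message overhead used by the pgwire implementation in this patch.

```go
package main

import (
	"bytes"
	"fmt"
)

// estimateRowEgress approximates the number of bytes a row would add to
// network egress: a fixed per-message overhead plus the text-encoded size
// of each value, measured by buffering and then resetting a scratch buffer.
func estimateRowEgress(buf *bytes.Buffer, row []any) int64 {
	egress := int64(5) // message type + length overhead per row
	for _, d := range row {
		buf.Reset()
		fmt.Fprintf(buf, "%v", d) // stand-in for the pgwire text encoding
		egress += int64(buf.Len())
	}
	return egress
}

func main() {
	var buf bytes.Buffer
	bytesOut := estimateRowEgress(&buf, []any{1, "deadbeef", 3.14})
	// Hypothetical RU rate per egress byte; the real rate comes from the
	// tenant cost config's PGWireEgressCost method.
	const ruPerEgressByte = 1.0 / 1024
	fmt.Printf("~%d bytes of egress, ~%.4f RUs\n", bytesOut, float64(bytesOut)*ruPerEgressByte)
}
```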
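Similarly, a minimal sketch of the CPU-based estimate (again not part of the patch): an exponentially-weighted moving average of process CPU usage predicts what the process *would* have consumed without the query, and the difference from the measured delta, clamped to the elapsed wall-clock time, is converted to RUs. The `0.5` weight and the clamping mirror the patch; `ruPerCPUSecond` and the sample values are hypothetical placeholders for `PodCPUCost` and real measurements.

```go
package main

import (
	"fmt"
	"time"
)

// movingAvgCPUPerSecFactor is the weight applied to each new CPU sample.
const movingAvgCPUPerSecFactor = 0.5

// updateAvg folds a new CPU-seconds-per-second sample into the moving average.
func updateAvg(avgCPUPerSec, sample float64) float64 {
	return avgCPUPerSec*(1-movingAvgCPUPerSecFactor) + sample*movingAvgCPUPerSecFactor
}

// estimateQueryCPUSeconds attributes CPU time to a query: the measured CPU
// delta over its lifetime minus the CPU the process was expected to use
// anyway, clamped to [0, elapsed] to bound gross errors.
func estimateQueryCPUSeconds(startCPU, endCPU, avgCPUPerSec float64, elapsed time.Duration) float64 {
	cpuDelta := endCPU - startCPU
	expected := avgCPUPerSec * elapsed.Seconds()
	est := cpuDelta - expected
	if est < 0 {
		est = 0
	}
	if est > elapsed.Seconds() {
		est = elapsed.Seconds()
	}
	return est
}

func main() {
	avg := 0.0
	for _, sample := range []float64{0.10, 0.12, 0.11} { // background CPU per second
		avg = updateAvg(avg, sample)
	}
	// Hypothetical RU rate per CPU-second; the real rate comes from the
	// tenant cost config's PodCPUCost method.
	const ruPerCPUSecond = 333.0
	cpuSecs := estimateQueryCPUSeconds(10.0, 11.5, avg, 5*time.Second)
	fmt.Printf("avg background CPU/sec: %.3f, query CPU secs: %.2f, ~%.0f RUs\n",
		avg, cpuSecs, cpuSecs*ruPerCPUSecond)
}
```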
--- pkg/BUILD.bazel | 2 + .../tenantcostclient/BUILD.bazel | 7 + .../query_ru_estimate_test.go | 193 ++++++++++++++++++ .../tenantcostclient/tenant_side.go | 37 +++- pkg/multitenant/cost_controller.go | 8 + pkg/multitenant/multitenantcpu/BUILD.bazel | 17 ++ pkg/multitenant/multitenantcpu/cpu_usage.go | 82 ++++++++ pkg/server/BUILD.bazel | 1 + pkg/server/server_sql.go | 1 + pkg/server/tenant.go | 15 +- pkg/sql/BUILD.bazel | 2 + pkg/sql/colexec/columnarizer.go | 3 + pkg/sql/colflow/colrpc/outbox.go | 6 +- pkg/sql/colflow/stats.go | 1 + pkg/sql/colflow/vectorized_flow.go | 11 +- pkg/sql/colflow/vectorized_flow_test.go | 2 +- pkg/sql/conn_executor.go | 5 + pkg/sql/conn_executor_exec.go | 31 ++- pkg/sql/distsql_running.go | 78 ++++++- pkg/sql/execinfra/BUILD.bazel | 1 + pkg/sql/execinfra/flow_context.go | 6 + pkg/sql/execinfra/server_config.go | 3 + pkg/sql/execinfrapb/component_stats.go | 7 + pkg/sql/execinfrapb/component_stats.proto | 6 +- pkg/sql/execstats/stats.go | 33 +-- pkg/sql/execstats/traceanalyzer.go | 17 +- pkg/sql/execstats/traceanalyzer_test.go | 3 + pkg/sql/flowinfra/flow.go | 6 + pkg/sql/flowinfra/outbox.go | 1 + pkg/sql/instrumentation.go | 11 + pkg/sql/opt/exec/explain/output.go | 10 + pkg/sql/pgwire/conn.go | 71 +++++++ pkg/sql/plan_node_to_row_source.go | 36 +++- pkg/sql/rowexec/inverted_joiner.go | 1 + pkg/sql/rowexec/joinreader.go | 1 + pkg/sql/rowexec/tablereader.go | 1 + pkg/sql/rowexec/zigzagjoiner.go | 4 +- 37 files changed, 674 insertions(+), 46 deletions(-) create mode 100644 pkg/ccl/multitenantccl/tenantcostclient/query_ru_estimate_test.go create mode 100644 pkg/multitenant/multitenantcpu/BUILD.bazel create mode 100644 pkg/multitenant/multitenantcpu/cpu_usage.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index ce2cfd2f668f..351fa068bab0 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1202,6 +1202,7 @@ GO_TARGETS = [ "//pkg/kv/kvserver:kvserver_test", "//pkg/kv:kv", "//pkg/kv:kv_test", + "//pkg/multitenant/multitenantcpu:multitenantcpu", "//pkg/multitenant/multitenantio:multitenantio", "//pkg/multitenant/tenantcostmodel:tenantcostmodel", "//pkg/multitenant:multitenant", @@ -2478,6 +2479,7 @@ GET_X_DATA_TARGETS = [ "//pkg/kv/kvserver/txnwait:get_x_data", "//pkg/kv/kvserver/uncertainty:get_x_data", "//pkg/multitenant:get_x_data", + "//pkg/multitenant/multitenantcpu:get_x_data", "//pkg/multitenant/multitenantio:get_x_data", "//pkg/multitenant/tenantcostmodel:get_x_data", "//pkg/obs:get_x_data", diff --git a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel index ae6756c758e9..5093bd77185b 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel +++ b/pkg/ccl/multitenantccl/tenantcostclient/BUILD.bazel @@ -26,6 +26,8 @@ go_library( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "//pkg/util/tracing", + "//pkg/util/tracing/tracingpb", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//errorspb", ], @@ -36,6 +38,7 @@ go_test( srcs = [ "limiter_test.go", "main_test.go", + "query_ru_estimate_test.go", "tenant_side_test.go", "token_bucket_test.go", ], @@ -47,6 +50,8 @@ go_test( "//pkg/blobs", "//pkg/ccl", "//pkg/ccl/changefeedccl", + "//pkg/ccl/kvccl/kvtenantccl", + "//pkg/ccl/multitenantccl/tenantcostserver", "//pkg/ccl/utilccl", "//pkg/cloud", "//pkg/cloud/nodelocal", @@ -72,12 +77,14 @@ go_test( "//pkg/sql/stats", "//pkg/testutils", "//pkg/testutils/serverutils", + "//pkg/testutils/skip", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", 
"//pkg/util/ctxgroup", "//pkg/util/ioctx", "//pkg/util/leaktest", "//pkg/util/log", + "//pkg/util/protoutil", "//pkg/util/randutil", "//pkg/util/stop", "//pkg/util/syncutil", diff --git a/pkg/ccl/multitenantccl/tenantcostclient/query_ru_estimate_test.go b/pkg/ccl/multitenantccl/tenantcostclient/query_ru_estimate_test.go new file mode 100644 index 000000000000..d5a4e2176316 --- /dev/null +++ b/pkg/ccl/multitenantccl/tenantcostclient/query_ru_estimate_test.go @@ -0,0 +1,193 @@ +// Copyright 2022 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package tenantcostclient_test + +import ( + "context" + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + _ "github.com/cockroachdb/cockroach/pkg/ccl" // ccl init hooks + _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" + "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostclient" + _ "github.com/cockroachdb/cockroach/pkg/ccl/multitenantccl/tenantcostserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql/stats" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/stretchr/testify/require" +) + +// TestEstimateQueryRUConsumption is a sanity check for the RU estimates +// produced for queries that are run by a tenant under EXPLAIN ANALYZE. The RU +// consumption of a query is not deterministic, since it depends on inexact +// quantities like the (already estimated) CPU usage. Therefore, the test runs +// each query multiple times and then checks that the total estimated RU +// consumption is within reasonable distance from the actual measured RUs for +// the tenant. +func TestEstimateQueryRUConsumption(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // This test becomes flaky when the machine/cluster is under significant + // background load, so it should only be run manually. + skip.IgnoreLint(t, "intended to be manually run as a sanity test") + + ctx := context.Background() + + st := cluster.MakeTestingClusterSettings() + stats.AutomaticStatisticsClusterMode.Override(ctx, &st.SV, false) + stats.UseStatisticsOnSystemTables.Override(ctx, &st.SV, false) + stats.AutomaticStatisticsOnSystemTables.Override(ctx, &st.SV, false) + + // Lower the target duration for reporting tenant usage so that it can be + // measured accurately. Avoid decreasing too far, since doing so can add + // measurable overhead. 
+ tenantcostclient.TargetPeriodSetting.Override(ctx, &st.SV, time.Millisecond*500) + + params := base.TestServerArgs{ + Settings: st, + DisableDefaultTestTenant: true, + } + + s, mainDB, _ := serverutils.StartServer(t, params) + defer s.Stopper().Stop(ctx) + sysDB := sqlutils.MakeSQLRunner(mainDB) + + tenantID := serverutils.TestTenantID() + tenant1, tenantDB1 := serverutils.StartTenant(t, s, base.TestTenantArgs{ + TenantID: tenantID, + Settings: st, + }) + defer tenant1.Stopper().Stop(ctx) + defer tenantDB1.Close() + tdb := sqlutils.MakeSQLRunner(tenantDB1) + tdb.Exec(t, "SET CLUSTER SETTING sql.stats.automatic_collection.enabled=false") + tdb.Exec(t, "CREATE TABLE abcd (a INT, b INT, c INT, d INT, INDEX (a, b, c))") + + type testCase struct { + sql string + count int + } + testCases := []testCase{ + { // Insert statement + sql: "INSERT INTO abcd (SELECT t%2, t%3, t, -t FROM generate_series(1,50000) g(t))", + count: 1, + }, + { // Point query + sql: "SELECT a FROM abcd WHERE (a, b) = (1, 1)", + count: 10, + }, + { // Range query + sql: "SELECT a FROM abcd WHERE (a, b) = (1, 1) AND c > 0 AND c < 10000", + count: 10, + }, + { // Aggregate + sql: "SELECT count(*) FROM abcd", + count: 10, + }, + { // Distinct + sql: "SELECT DISTINCT ON (a, b) * FROM abcd", + count: 10, + }, + { // Full table scan + sql: "SELECT a FROM abcd", + count: 10, + }, + { // Lookup join + sql: "SELECT a FROM (VALUES (1, 1), (0, 2)) v(x, y) INNER LOOKUP JOIN abcd ON (a, b) = (x, y)", + count: 10, + }, + { // Index join + sql: "SELECT * FROM abcd WHERE (a, b) = (0, 0)", + count: 10, + }, + { // No kv IO, lots of network egress. + sql: "SELECT 'deadbeef' FROM generate_series(1, 50000)", + count: 10, + }, + } + + var err error + var tenantEstimatedRUs int + for _, tc := range testCases { + for i := 0; i < tc.count; i++ { + output := tdb.QueryStr(t, "EXPLAIN ANALYZE "+tc.sql) + var estimatedRU int + for _, row := range output { + if len(row) != 1 { + t.Fatalf("expected one column") + } + val := row[0] + if strings.Contains(val, "estimated RUs consumed") { + substr := strings.Split(val, " ") + require.Equalf(t, 4, len(substr), "expected RU consumption message to have four words") + ruCountStr := strings.Replace(strings.TrimSpace(substr[3]), ",", "", -1) + estimatedRU, err = strconv.Atoi(ruCountStr) + require.NoError(t, err, "failed to retrieve estimated RUs") + break + } + } + tenantEstimatedRUs += estimatedRU + } + } + + getTenantRUs := func() float64 { + // Sleep to ensure the measured RU consumption gets recorded in the + // tenant_usage table. + time.Sleep(time.Second) + var consumptionBytes []byte + var consumption roachpb.TenantConsumption + var tenantRUs float64 + rows := sysDB.Query(t, + fmt.Sprintf( + "SELECT total_consumption FROM system.tenant_usage WHERE tenant_id = %d AND instance_id = 0", + tenantID.ToUint64(), + ), + ) + for rows.Next() { + require.NoError(t, rows.Scan(&consumptionBytes)) + if len(consumptionBytes) == 0 { + continue + } + require.NoError(t, protoutil.Unmarshal(consumptionBytes, &consumption)) + tenantRUs += consumption.RU + } + return tenantRUs + } + tenantStartRUs := getTenantRUs() + + var tenantMeasuredRUs float64 + for _, tc := range testCases { + for i := 0; i < tc.count; i++ { + tdb.QueryStr(t, tc.sql) + } + } + + // Check the estimated RU aggregate for all the queries against the actual + // measured RU consumption for the tenant. 
+ tenantMeasuredRUs = getTenantRUs() - tenantStartRUs + const deltaFraction = 0.25 + allowedDelta := tenantMeasuredRUs * deltaFraction + require.InDeltaf(t, tenantMeasuredRUs, tenantEstimatedRUs, allowedDelta, + "estimated RUs (%d) were not within %f RUs of the expected value (%f)", + tenantEstimatedRUs, + allowedDelta, + tenantMeasuredRUs, + ) +} diff --git a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go index 6e0afdb62542..c2e235dfefdc 100644 --- a/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go +++ b/pkg/ccl/multitenantccl/tenantcostclient/tenant_side.go @@ -25,6 +25,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/errorspb" ) @@ -129,6 +131,9 @@ const defaultTickInterval = time.Second // 0.5^(1 second / tickInterval) const movingAvgRUPerSecFactor = 0.5 +// movingAvgCPUPerSecFactor is the weight applied to a new sample of CPU usage. +const movingAvgCPUPerSecFactor = 0.5 + // We request more tokens when the available RUs go below a threshold. The // threshold is a fraction of the last granted RUs. const notifyFraction = 0.1 @@ -260,6 +265,11 @@ type tenantSideCostController struct { // It is read and written on multiple goroutines and so must be protected // by a mutex. consumption roachpb.TenantConsumption + + // avgCPUPerSec is an exponentially-weighted moving average of the CPU usage + // per second; used to estimate the CPU usage of a query. It is only written + // in the main loop, but can be read by multiple goroutines so is protected. + avgCPUPerSec float64 } // lowRUNotifyChan is used when the number of available RUs is running low and @@ -389,12 +399,14 @@ func (c *tenantSideCostController) onTick(ctx context.Context, newTime time.Time // Update CPU consumption. deltaCPU := newExternalUsage.CPUSecs - c.run.externalUsage.CPUSecs - // Subtract any allowance that we consider free background usage. deltaTime := newTime.Sub(c.run.lastTick) if deltaTime > 0 { + // Subtract any allowance that we consider free background usage. allowance := CPUUsageAllowance.Get(&c.settings.SV).Seconds() * deltaTime.Seconds() deltaCPU -= allowance + avgCPU := deltaCPU / deltaTime.Seconds() + // If total CPU usage is small (less than 3% of a single CPU by default) // and there have been no recent read/write operations, then ignore the // recent usage altogether. This is intended to minimize RU usage when the @@ -406,6 +418,9 @@ func (c *tenantSideCostController) onTick(ctx context.Context, newTime time.Time deltaCPU = 0 } } + // Keep track of an exponential moving average of CPU usage. + c.mu.avgCPUPerSec *= 1 - movingAvgCPUPerSecFactor + c.mu.avgCPUPerSec += avgCPU * movingAvgCPUPerSecFactor c.mu.Unlock() } if deltaCPU < 0 { @@ -773,6 +788,13 @@ func (c *tenantSideCostController) OnResponseWait( return err } + // Record the number of RUs consumed by the IO request. 
+ if sp := tracing.SpanFromContext(ctx); sp != nil && sp.RecordingType() != tracingpb.RecordingOff { + sp.RecordStructured(&roachpb.TenantConsumption{ + RU: float64(totalRU), + }) + } + c.mu.Lock() defer c.mu.Unlock() @@ -853,3 +875,16 @@ func (c *tenantSideCostController) onExternalIO( return nil } + +// GetCPUMovingAvg is used to obtain an exponential moving average estimate +// for the CPU usage in seconds per each second of wall-clock time. +func (c *tenantSideCostController) GetCPUMovingAvg() float64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.mu.avgCPUPerSec +} + +// GetCostConfig is part of the multitenant.TenantSideCostController interface. +func (c *tenantSideCostController) GetCostConfig() *tenantcostmodel.Config { + return &c.costCfg +} diff --git a/pkg/multitenant/cost_controller.go b/pkg/multitenant/cost_controller.go index c8167fde8464..b7ebe39c17f1 100644 --- a/pkg/multitenant/cost_controller.go +++ b/pkg/multitenant/cost_controller.go @@ -32,6 +32,14 @@ type TenantSideCostController interface { nextLiveInstanceIDFn NextLiveInstanceIDFn, ) error + // GetCPUMovingAvg returns an exponential moving average used for estimating + // the CPU usage (in CPU secs) per wall-clock second. + GetCPUMovingAvg() float64 + + // GetCostConfig returns the cost model config this TenantSideCostController + // is using. + GetCostConfig() *tenantcostmodel.Config + TenantSideKVInterceptor TenantSideExternalIORecorder diff --git a/pkg/multitenant/multitenantcpu/BUILD.bazel b/pkg/multitenant/multitenantcpu/BUILD.bazel new file mode 100644 index 000000000000..c55ee4944a24 --- /dev/null +++ b/pkg/multitenant/multitenantcpu/BUILD.bazel @@ -0,0 +1,17 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "multitenantcpu", + srcs = ["cpu_usage.go"], + importpath = "github.com/cockroachdb/cockroach/pkg/multitenant/multitenantcpu", + visibility = ["//visibility:public"], + deps = [ + "//pkg/multitenant", + "//pkg/server/status", + "//pkg/util/log", + "//pkg/util/timeutil", + ], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/multitenant/multitenantcpu/cpu_usage.go b/pkg/multitenant/multitenantcpu/cpu_usage.go new file mode 100644 index 000000000000..3c52f3d4400a --- /dev/null +++ b/pkg/multitenant/multitenantcpu/cpu_usage.go @@ -0,0 +1,82 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package multitenantcpu + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/server/status" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// CPUUsageHelper is used to estimate the RUs consumed by a query due to CPU +// usage. It works by using a moving average for the process CPU usage to +// project what the usage *would* be if the query hadn't run, then subtracts +// that from the actual measured CPU usage. +// +// TODO(drewk): this isn't accurate when there are many concurrent queries. +// We should use the grunning library for a more accurate measurement. 
+type CPUUsageHelper struct { + startCPU float64 + avgCPUPerSecond float64 + startTime time.Time + costController multitenant.TenantSideCostController +} + +// StartCollection should be called at the beginning of execution for a flow. +// It is a no-op for non-tenants (e.g. when costController is nil). +func (h *CPUUsageHelper) StartCollection( + ctx context.Context, costController multitenant.TenantSideCostController, +) { + if costController == nil { + return + } + h.costController = costController + h.startTime = timeutil.Now() + h.startCPU = GetCPUSeconds(ctx) + h.avgCPUPerSecond = h.costController.GetCPUMovingAvg() +} + +// EndCollection should be called at the end of execution for a flow in order to +// get the estimated number of RUs consumed due to CPU usage. It returns zero +// for non-tenants. +func (h *CPUUsageHelper) EndCollection(ctx context.Context) (ruFomCPU float64) { + if h.costController == nil || h.costController.GetCostConfig() == nil { + return 0 + } + cpuDelta := GetCPUSeconds(ctx) - h.startCPU + timeElapsed := timeutil.Since(h.startTime).Seconds() + expectedCPUDelta := timeElapsed * h.avgCPUPerSecond + cpuUsageSeconds := cpuDelta - expectedCPUDelta + if cpuUsageSeconds < 0 { + cpuUsageSeconds = 0 + } + if cpuUsageSeconds > timeElapsed { + // It's unlikely that any single query will have such high CPU usage, so + // bound the estimate above to reduce the possibility of gross error. + cpuUsageSeconds = timeElapsed + } + return float64(h.costController.GetCostConfig().PodCPUCost(cpuUsageSeconds)) +} + +// GetCPUSeconds returns the total CPU usage of the current process in seconds. +// It is used for measuring tenant RU consumption. +func GetCPUSeconds(ctx context.Context) (cpuSecs float64) { + userTimeMillis, sysTimeMillis, err := status.GetCPUTime(ctx) + if err != nil { + log.Ops.Errorf(ctx, "unable to get cpu usage: %v", err) + return 0 + } + return float64(userTimeMillis+sysTimeMillis) * 1e-3 +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index a578b6b50cb2..1550c5eb62c4 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -116,6 +116,7 @@ go_library( "//pkg/kv/kvserver/rangefeed", "//pkg/kv/kvserver/reports", "//pkg/multitenant", + "//pkg/multitenant/multitenantcpu", "//pkg/multitenant/multitenantio", "//pkg/multitenant/tenantcostmodel", "//pkg/obs", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 69f66eebf9b3..2c6a4ee42514 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -684,6 +684,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { SQLSQLResponseAdmissionQ: cfg.sqlSQLResponseAdmissionQ, CollectionFactory: collectionFactory, ExternalIORecorder: cfg.costController, + TenantCostController: cfg.costController, RangeStatsFetcher: rangeStatsFetcher, AdmissionPacerFactory: cfg.admissionPacerFactory, } diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 668dd57d698c..d72a9b6c651f 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -29,6 +29,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptprovider" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts/ptreconcile" "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/multitenant/multitenantcpu" "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcostmodel" "github.com/cockroachdb/cockroach/pkg/obs" "github.com/cockroachdb/cockroach/pkg/obsservice/obspb" @@ -354,12 +355,8 @@ func startTenantInternal( 
args.eventsServer.SetResourceInfo(clusterID, int32(instanceID), "unknown" /* version */) externalUsageFn := func(ctx context.Context) multitenant.ExternalUsage { - userTimeMillis, sysTimeMillis, err := status.GetCPUTime(ctx) - if err != nil { - log.Ops.Errorf(ctx, "unable to get cpu usage: %v", err) - } return multitenant.ExternalUsage{ - CPUSecs: float64(userTimeMillis+sysTimeMillis) * 1e-3, + CPUSecs: multitenantcpu.GetCPUSeconds(ctx), PGWireEgressBytes: s.pgServer.BytesOut(), } } @@ -710,3 +707,11 @@ func (noopTenantSideCostController) OnExternalIO( ctx context.Context, usage multitenant.ExternalIOUsage, ) { } + +func (noopTenantSideCostController) GetCPUMovingAvg() float64 { + return 0 +} + +func (noopTenantSideCostController) GetCostConfig() *tenantcostmodel.Config { + return nil +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 55b3f032579f..4244378f33b6 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -294,6 +294,7 @@ go_library( "//pkg/kv/kvserver/liveness/livenesspb", "//pkg/kv/kvserver/protectedts", "//pkg/multitenant", + "//pkg/multitenant/multitenantcpu", "//pkg/obs", "//pkg/obsservice/obspb", "//pkg/obsservice/obspb/opentelemetry-proto/common/v1:common", @@ -486,6 +487,7 @@ go_library( "//pkg/util/memzipper", "//pkg/util/metric", "//pkg/util/mon", + "//pkg/util/optional", "//pkg/util/protoutil", "//pkg/util/quotapool", "//pkg/util/randutil", diff --git a/pkg/sql/colexec/columnarizer.go b/pkg/sql/colexec/columnarizer.go index d578edfac314..a9ba10c38eee 100644 --- a/pkg/sql/colexec/columnarizer.go +++ b/pkg/sql/colexec/columnarizer.go @@ -207,6 +207,9 @@ func (c *Columnarizer) GetStats() *execinfrapb.ComponentStats { return &execinfrapb.ComponentStats{Component: componentID} } s := c.ExecStatsForTrace() + if s == nil { + return &execinfrapb.ComponentStats{Component: componentID} + } s.Component = componentID return s } diff --git a/pkg/sql/colflow/colrpc/outbox.go b/pkg/sql/colflow/colrpc/outbox.go index d8dab29483d5..b3b138a10a76 100644 --- a/pkg/sql/colflow/colrpc/outbox.go +++ b/pkg/sql/colflow/colrpc/outbox.go @@ -72,7 +72,7 @@ type Outbox struct { // operators that are in the same tree as this Outbox. The stats will be // added into the span as Structured payload and returned to the gateway as // execinfrapb.ProducerMetadata. - getStats func() []*execinfrapb.ComponentStats + getStats func(context.Context) []*execinfrapb.ComponentStats // A copy of Run's caller ctx, with no StreamID tag. // Used to pass a clean context to the input.Next. @@ -86,7 +86,7 @@ func NewOutbox( unlimitedAllocator *colmem.Allocator, input colexecargs.OpWithMetaInfo, typs []*types.T, - getStats func() []*execinfrapb.ComponentStats, + getStats func(context.Context) []*execinfrapb.ComponentStats, ) (*Outbox, error) { c, err := colserde.NewArrowBatchConverter(typs) if err != nil { @@ -342,7 +342,7 @@ func (o *Outbox) sendMetadata(ctx context.Context, stream flowStreamClient, errT // Retrieving stats and draining the metadata is only safe if the input // to the outbox was properly initialized. 
if o.span != nil && o.getStats != nil { - for _, s := range o.getStats() { + for _, s := range o.getStats(ctx) { o.span.RecordStructured(s) } } diff --git a/pkg/sql/colflow/stats.go b/pkg/sql/colflow/stats.go index ee594604173a..4336dff83997 100644 --- a/pkg/sql/colflow/stats.go +++ b/pkg/sql/colflow/stats.go @@ -249,6 +249,7 @@ func (vsc *vectorizedStatsCollectorImpl) GetStats() *execinfrapb.ComponentStats s.KV.ContentionTime.Set(vsc.kvReader.GetCumulativeContentionTime()) scanStats := vsc.kvReader.GetScanStats() execstats.PopulateKVMVCCStats(&s.KV, &scanStats) + s.Exec.ConsumedRU.Set(scanStats.ConsumedRU) } else { s.Exec.ExecTime.Set(time) } diff --git a/pkg/sql/colflow/vectorized_flow.go b/pkg/sql/colflow/vectorized_flow.go index 295889f50a78..8a9806cc3dde 100644 --- a/pkg/sql/colflow/vectorized_flow.go +++ b/pkg/sql/colflow/vectorized_flow.go @@ -471,11 +471,11 @@ func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox( flowCtx *execinfra.FlowCtx, statsCollectors []colexecop.VectorizedStatsCollector, originSQLInstanceID base.SQLInstanceID, -) func() []*execinfrapb.ComponentStats { +) func(context.Context) []*execinfrapb.ComponentStats { if !s.recordingStats { return nil } - return func() []*execinfrapb.ComponentStats { + return func(ctx context.Context) []*execinfrapb.ComponentStats { lastOutboxOnRemoteNode := atomic.AddInt32(&s.numOutboxesDrained, 1) == atomic.LoadInt32(&s.numOutboxes) && !s.isGatewayNode numResults := len(statsCollectors) if lastOutboxOnRemoteNode { @@ -494,6 +494,7 @@ func (s *vectorizedFlowCreator) makeGetStatsFnForOutbox( FlowStats: execinfrapb.FlowStats{ MaxMemUsage: optional.MakeUint(uint64(flowCtx.EvalCtx.Mon.MaximumBytes())), MaxDiskUsage: optional.MakeUint(uint64(flowCtx.DiskMonitor.MaximumBytes())), + ConsumedRU: optional.MakeUint(uint64(flowCtx.TenantCPUMonitor.EndCollection(ctx))), }, }) } @@ -537,7 +538,7 @@ type remoteComponentCreator interface { allocator *colmem.Allocator, input colexecargs.OpWithMetaInfo, typs []*types.T, - getStats func() []*execinfrapb.ComponentStats, + getStats func(context.Context) []*execinfrapb.ComponentStats, ) (*colrpc.Outbox, error) newInbox( allocator *colmem.Allocator, @@ -554,7 +555,7 @@ func (vectorizedRemoteComponentCreator) newOutbox( allocator *colmem.Allocator, input colexecargs.OpWithMetaInfo, typs []*types.T, - getStats func() []*execinfrapb.ComponentStats, + getStats func(context.Context) []*execinfrapb.ComponentStats, ) (*colrpc.Outbox, error) { return colrpc.NewOutbox(allocator, input, typs, getStats) } @@ -737,7 +738,7 @@ func (s *vectorizedFlowCreator) setupRemoteOutputStream( outputTyps []*types.T, stream *execinfrapb.StreamEndpointSpec, factory coldata.ColumnFactory, - getStats func() []*execinfrapb.ComponentStats, + getStats func(context.Context) []*execinfrapb.ComponentStats, ) (execopnode.OpNode, error) { outbox, err := s.remoteComponentCreator.newOutbox( colmem.NewAllocator(ctx, s.monitorRegistry.NewStreamingMemAccount(flowCtx), factory), diff --git a/pkg/sql/colflow/vectorized_flow_test.go b/pkg/sql/colflow/vectorized_flow_test.go index 85d710aee968..58277df20376 100644 --- a/pkg/sql/colflow/vectorized_flow_test.go +++ b/pkg/sql/colflow/vectorized_flow_test.go @@ -47,7 +47,7 @@ func (c callbackRemoteComponentCreator) newOutbox( allocator *colmem.Allocator, input colexecargs.OpWithMetaInfo, typs []*types.T, - _ func() []*execinfrapb.ComponentStats, + _ func(context.Context) []*execinfrapb.ComponentStats, ) (*colrpc.Outbox, error) { return c.newOutboxFn(allocator, input, typs) } diff --git 
a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 00e863997009..8155664b03bf 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -25,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/multitenant/multitenantcpu" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/server/serverpb" @@ -1433,6 +1434,10 @@ type connExecutor struct { // transactions. statsCollector sqlstats.StatsCollector + // cpuStatsCollector is used to estimate RU consumption due to CPU usage for + // tenants. + cpuStatsCollector multitenantcpu.CPUUsageHelper + // applicationName is the same as sessionData.ApplicationName. It's copied // here as an atomic so that it can be read concurrently by serialize(). applicationName atomic.Value diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 0a40a7964608..1617b00e8ec4 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant/multitenantcpu" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" @@ -1037,6 +1038,11 @@ func (ex *connExecutor) dispatchToExecutionEngine( ex.sessionTracing.TracePlanStart(ctx, stmt.AST.StatementTag()) ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.PlannerStartLogicalPlan, timeutil.Now()) + if server := ex.server.cfg.DistSQLSrv; server != nil { + // Begin measuring CPU usage for tenants. This is a no-op for non-tenants. + ex.cpuStatsCollector.StartCollection(ctx, server.TenantCostController) + } + // If adminAuditLogging is enabled, we want to check for HasAdminRole // before the deferred maybeLogStatement. // We must check prior to execution in the case the txn is aborted due to @@ -1200,7 +1206,7 @@ func (ex *connExecutor) dispatchToExecutionEngine( ex.extraTxnState.bytesRead += stats.bytesRead ex.extraTxnState.rowsWritten += stats.rowsWritten - populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg) + populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector) // The transaction (from planner.txn) may already have been committed at this point, // due to one-phase commit optimization or an error. Since we use that transaction @@ -1235,7 +1241,13 @@ func (ex *connExecutor) dispatchToExecutionEngine( // Query-level execution statistics are collected using the statement's trace // and the plan's flow metadata. It also populates the regions field and // annotates the explainPlan field of the instrumentationHelper. 
-func populateQueryLevelStatsAndRegions(ctx context.Context, p *planner, cfg *ExecutorConfig) { +func populateQueryLevelStatsAndRegions( + ctx context.Context, + p *planner, + cfg *ExecutorConfig, + topLevelStats *topLevelQueryStats, + cpuStats *multitenantcpu.CPUUsageHelper, +) { ih := &p.instrumentation if _, ok := ih.Tracing(); !ok { return @@ -1257,6 +1269,18 @@ func populateQueryLevelStatsAndRegions(ctx context.Context, p *planner, cfg *Exe panic(fmt.Sprintf(msg, ih.fingerprint, err)) } log.VInfof(ctx, 1, msg, ih.fingerprint, err) + } else { + // If this query is being run by a tenant, record the RUs consumed by CPU + // usage and network egress to the client. + if cfg.DistSQLSrv != nil { + if costController := cfg.DistSQLSrv.TenantCostController; costController != nil { + if costCfg := costController.GetCostConfig(); costCfg != nil { + networkEgressRUEstimate := costCfg.PGWireEgressCost(topLevelStats.networkEgressEstimate) + ih.queryLevelStatsWithErr.Stats.RUEstimate += int64(networkEgressRUEstimate) + ih.queryLevelStatsWithErr.Stats.RUEstimate += int64(cpuStats.EndCollection(ctx)) + } + } + } } if ih.traceMetadata != nil && ih.explainPlan != nil { ih.regions = ih.traceMetadata.annotateExplain( @@ -1474,6 +1498,9 @@ type topLevelQueryStats struct { rowsRead int64 // rowsWritten is the number of rows written. rowsWritten int64 + // networkEgressEstimate is an estimate for the number of bytes sent to the + // client. It is used for estimating the number of RUs consumed by a query. + networkEgressEstimate int64 } // execWithDistSQLEngine converts a plan to a distributed SQL physical plan and diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 0a1ae0f77364..2d8b902603e8 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -631,6 +631,13 @@ func (dsp *DistSQLPlanner) Run( recv.outputTypes = plan.GetResultTypes() recv.contendedQueryMetric = dsp.distSQLSrv.Metrics.ContendedQueriesCount + if dsp.distSQLSrv.TenantCostController != nil && planCtx.planner != nil { + if instrumentation := planCtx.planner.curPlan.instrumentation; instrumentation != nil { + // Only collect the network egress estimate for a tenant that is running + // EXPLAIN ANALYZE, since the overhead is non-negligible. + recv.isTenantExplainAnalyze = instrumentation.outputMode != unmodifiedOutput + } + } if len(flows) == 1 { // We ended up planning everything locally, regardless of whether we @@ -793,6 +800,13 @@ type DistSQLReceiver struct { stats *topLevelQueryStats + // isTenantExplainAnalyze is used to indicate that network egress should be + // collected in order to estimate RU consumption for a tenant that is running + // a query with EXPLAIN ANALYZE. + isTenantExplainAnalyze bool + + egressCounter TenantNetworkEgressCounter + expectedRowsRead int64 progressAtomic *uint64 @@ -832,6 +846,21 @@ type MetadataResultWriter interface { AddMeta(ctx context.Context, meta *execinfrapb.ProducerMetadata) } +// TenantNetworkEgressCounter is used by tenants running EXPLAIN ANALYZE to +// measure the number of bytes that would be sent over the network if the +// query result was returned to the client. Its implementation lives in the +// pgwire package, in conn.go. +type TenantNetworkEgressCounter interface { + // GetRowNetworkEgress estimates network egress for a row. + GetRowNetworkEgress(ctx context.Context, row tree.Datums, typs []*types.T) int64 + // GetBatchNetworkEgress estimates network egress for a batch. 
+ GetBatchNetworkEgress(ctx context.Context, batch coldata.Batch) int64 +} + +// NewTenantNetworkEgressCounter is used to create a tenantNetworkEgressCounter. +// It hooks into pgwire code. +var NewTenantNetworkEgressCounter func() TenantNetworkEgressCounter + // MetadataCallbackWriter wraps a rowResultWriter to stream metadata in a // DistSQL flow. It executes a given callback when metadata is added. type MetadataCallbackWriter struct { @@ -1225,6 +1254,35 @@ func (r *DistSQLReceiver) Push( return r.status } + ensureDecodedRow := func() error { + if r.row == nil { + r.row = make(tree.Datums, len(row)) + } + for i, encDatum := range row { + err := encDatum.EnsureDecoded(r.outputTypes[i], &r.alloc) + if err != nil { + return err + } + r.row[i] = encDatum.Datum + } + return nil + } + + if r.isTenantExplainAnalyze { + if err := ensureDecodedRow(); err != nil { + r.SetError(err) + return r.status + } + if len(r.row) != len(r.outputTypes) { + r.SetError(errors.Errorf("expected number of columns and output types to be the same")) + return r.status + } + if r.egressCounter == nil { + r.egressCounter = NewTenantNetworkEgressCounter() + } + r.stats.networkEgressEstimate += r.egressCounter.GetRowNetworkEgress(r.ctx, r.row, r.outputTypes) + } + if r.discardRows { // Discard rows. return r.status @@ -1237,16 +1295,9 @@ func (r *DistSQLReceiver) Push( log.VEvent(r.ctx, 2, `a row is pushed in "exists" mode, so transition to draining`) r.status = execinfra.DrainRequested } else { - if r.row == nil { - r.row = make(tree.Datums, len(row)) - } - for i, encDatum := range row { - err := encDatum.EnsureDecoded(r.outputTypes[i], &r.alloc) - if err != nil { - r.SetError(err) - return r.status - } - r.row[i] = encDatum.Datum + if err := ensureDecodedRow(); err != nil { + r.SetError(err) + return r.status } } r.tracing.TraceExecRowsResult(r.ctx, r.row) @@ -1283,6 +1334,13 @@ func (r *DistSQLReceiver) PushBatch( return r.status } + if r.isTenantExplainAnalyze { + if r.egressCounter == nil { + r.egressCounter = NewTenantNetworkEgressCounter() + } + r.stats.networkEgressEstimate += r.egressCounter.GetBatchNetworkEgress(r.ctx, batch) + } + if r.discardRows { // Discard rows. return r.status diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index eebff2137b6c..0a2bcd23a134 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "//pkg/kv/kvserver/kvserverbase", "//pkg/kv/kvserver/protectedts", "//pkg/multitenant", + "//pkg/multitenant/multitenantcpu", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/sql/execinfra/flow_context.go b/pkg/sql/execinfra/flow_context.go index cdc998393c6f..8733d8353c02 100644 --- a/pkg/sql/execinfra/flow_context.go +++ b/pkg/sql/execinfra/flow_context.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/multitenant/multitenantcpu" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" @@ -95,6 +96,11 @@ type FlowCtx struct { // DiskMonitor is this flow's disk monitor. All disk usage for this flow must // be registered through this monitor. DiskMonitor *mon.BytesMonitor + + // TenantCPUMonitor is used to estimate a query's CPU usage for tenants + // running EXPLAIN ANALYZE. Currently, it is only used by remote flows. 
+ // The gateway flow is handled by the connExecutor. + TenantCPUMonitor multitenantcpu.CPUUsageHelper } // NewEvalCtx returns a modifiable copy of the FlowCtx's EvalContext. diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go index f4f04f5e5e96..2a8b2b51b1d4 100644 --- a/pkg/sql/execinfra/server_config.go +++ b/pkg/sql/execinfra/server_config.go @@ -192,6 +192,9 @@ type ServerConfig struct { // external services (such as external storage) ExternalIORecorder multitenant.TenantSideExternalIORecorder + // TenantCostController is used to measure and record RU consumption. + TenantCostController multitenant.TenantSideCostController + // RangeStatsFetcher is used to fetch range stats for keys. RangeStatsFetcher eval.RangeStatsFetcher diff --git a/pkg/sql/execinfrapb/component_stats.go b/pkg/sql/execinfrapb/component_stats.go index 9277127f562b..c4270b02bc54 100644 --- a/pkg/sql/execinfrapb/component_stats.go +++ b/pkg/sql/execinfrapb/component_stats.go @@ -266,6 +266,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats { if !result.Exec.MaxAllocatedDisk.HasValue() { result.Exec.MaxAllocatedDisk = other.Exec.MaxAllocatedDisk } + if !result.Exec.ConsumedRU.HasValue() { + result.Exec.ConsumedRU = other.Exec.ConsumedRU + } // Output stats. if !result.Output.NumBatches.HasValue() { @@ -282,6 +285,9 @@ func (s *ComponentStats) Union(other *ComponentStats) *ComponentStats { if !result.FlowStats.MaxDiskUsage.HasValue() { result.FlowStats.MaxDiskUsage = other.FlowStats.MaxDiskUsage } + if !result.FlowStats.ConsumedRU.HasValue() { + result.FlowStats.ConsumedRU = other.FlowStats.ConsumedRU + } return &result } @@ -355,6 +361,7 @@ func (s *ComponentStats) MakeDeterministic() { timeVal(&s.Exec.ExecTime) resetUint(&s.Exec.MaxAllocatedMem) resetUint(&s.Exec.MaxAllocatedDisk) + resetUint(&s.Exec.ConsumedRU) // Output. resetUint(&s.Output.NumBatches) diff --git a/pkg/sql/execinfrapb/component_stats.proto b/pkg/sql/execinfrapb/component_stats.proto index 014a183bb645..5ef05891d9c5 100644 --- a/pkg/sql/execinfrapb/component_stats.proto +++ b/pkg/sql/execinfrapb/component_stats.proto @@ -144,9 +144,10 @@ message ExecStats { optional util.optional.Duration exec_time = 1 [(gogoproto.nullable) = false]; // Maximum memory allocated by the component. optional util.optional.Uint max_allocated_mem = 2 [(gogoproto.nullable) = false]; - // Maximum scratch disk allocated by the component. optional util.optional.Uint max_allocated_disk = 3 [(gogoproto.nullable) = false]; + // Amount of RUs consumed while executing the component. + optional util.optional.Uint consumed_r_u = 4 [(gogoproto.nullable) = false]; } // OutputStats contains statistics about the output (results) of a component. @@ -162,4 +163,7 @@ message OutputStats { message FlowStats { optional util.optional.Uint max_mem_usage = 1 [(gogoproto.nullable) = false]; optional util.optional.Uint max_disk_usage = 2 [(gogoproto.nullable) = false]; + // Amount of RUs consumed due to CPU usage while executing the flow. Currently + // only used for remote flows. + optional util.optional.Uint consumed_r_u = 3 [(gogoproto.nullable) = false]; } diff --git a/pkg/sql/execstats/stats.go b/pkg/sql/execstats/stats.go index 94d2856c8ef8..653e3fdf2483 100644 --- a/pkg/sql/execstats/stats.go +++ b/pkg/sql/execstats/stats.go @@ -72,6 +72,9 @@ type ScanStats struct { // NumInternalSeeks is the number of times that MVCC seek was invoked // internally, including to step over internal, uncompacted Pebble versions. 
NumInternalSeeks uint64 + // ConsumedRU is the number of RUs that were consumed during the course of a + // scan. + ConsumedRU uint64 } // PopulateKVMVCCStats adds data from the input ScanStats to the input KVStats. @@ -85,25 +88,29 @@ func PopulateKVMVCCStats(kvStats *execinfrapb.KVStats, ss *ScanStats) { // GetScanStats is a helper function to calculate scan stats from the given // recording or, if the recording is nil, from the tracing span from the // context. -func GetScanStats(ctx context.Context, recording tracingpb.Recording) (ss ScanStats) { +func GetScanStats(ctx context.Context, recording tracingpb.Recording) (scanStats ScanStats) { if recording == nil { recording = tracing.SpanFromContext(ctx).GetConfiguredRecording() } - var ev roachpb.ScanStats + var ss roachpb.ScanStats + var tc roachpb.TenantConsumption for i := range recording { recording[i].Structured(func(any *pbtypes.Any, _ time.Time) { - if !pbtypes.Is(any, &ev) { - return - } - if err := pbtypes.UnmarshalAny(any, &ev); err != nil { - return + if pbtypes.Is(any, &ss) { + if err := pbtypes.UnmarshalAny(any, &ss); err != nil { + return + } + scanStats.NumInterfaceSteps += ss.NumInterfaceSteps + scanStats.NumInternalSteps += ss.NumInternalSteps + scanStats.NumInterfaceSeeks += ss.NumInterfaceSeeks + scanStats.NumInternalSeeks += ss.NumInternalSeeks + } else if pbtypes.Is(any, &tc) { + if err := pbtypes.UnmarshalAny(any, &tc); err != nil { + return + } + scanStats.ConsumedRU += uint64(tc.RU) } - - ss.NumInterfaceSteps += ev.NumInterfaceSteps - ss.NumInternalSteps += ev.NumInternalSteps - ss.NumInterfaceSeeks += ev.NumInterfaceSeeks - ss.NumInternalSeeks += ev.NumInternalSeeks }) } - return ss + return scanStats } diff --git a/pkg/sql/execstats/traceanalyzer.go b/pkg/sql/execstats/traceanalyzer.go index ca0253d2bf95..585ff4abca8c 100644 --- a/pkg/sql/execstats/traceanalyzer.go +++ b/pkg/sql/execstats/traceanalyzer.go @@ -108,6 +108,7 @@ type NodeLevelStats struct { KVTimeGroupedByNode map[base.SQLInstanceID]time.Duration NetworkMessagesGroupedByNode map[base.SQLInstanceID]int64 ContentionTimeGroupedByNode map[base.SQLInstanceID]time.Duration + RUEstimateGroupedByNode map[base.SQLInstanceID]int64 } // QueryLevelStats returns all the query level stats that correspond to the @@ -123,6 +124,7 @@ type QueryLevelStats struct { KVTime time.Duration NetworkMessages int64 ContentionTime time.Duration + RUEstimate int64 } // QueryLevelStatsWithErr is the same as QueryLevelStats, but also tracks @@ -156,6 +158,7 @@ func (s *QueryLevelStats) Accumulate(other QueryLevelStats) { s.KVTime += other.KVTime s.NetworkMessages += other.NetworkMessages s.ContentionTime += other.ContentionTime + s.RUEstimate += other.RUEstimate } // TraceAnalyzer is a struct that helps calculate top-level statistics from a @@ -231,6 +234,7 @@ func (a *TraceAnalyzer) ProcessStats() error { KVTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), NetworkMessagesGroupedByNode: make(map[base.SQLInstanceID]int64), ContentionTimeGroupedByNode: make(map[base.SQLInstanceID]time.Duration), + RUEstimateGroupedByNode: make(map[base.SQLInstanceID]int64), } var errs error @@ -245,6 +249,7 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.KVBatchRequestsIssuedGroupedByNode[instanceID] += int64(stats.KV.BatchRequestsIssued.Value()) a.nodeLevelStats.KVTimeGroupedByNode[instanceID] += stats.KV.KVTime.Value() a.nodeLevelStats.ContentionTimeGroupedByNode[instanceID] += stats.KV.ContentionTime.Value() + a.nodeLevelStats.RUEstimateGroupedByNode[instanceID] 
+= int64(stats.Exec.ConsumedRU.Value()) } // Process streamStats. @@ -282,6 +287,9 @@ func (a *TraceAnalyzer) ProcessStats() error { a.nodeLevelStats.MaxDiskUsageGroupedByNode[originInstanceID] = diskUsage } } + if stats.stats.FlowStats.ConsumedRU.HasValue() { + a.nodeLevelStats.RUEstimateGroupedByNode[originInstanceID] += int64(stats.stats.FlowStats.ConsumedRU.Value()) + } numMessages, err := getNumNetworkMessagesFromComponentsStats(stats.stats) if err != nil { @@ -307,7 +315,9 @@ func (a *TraceAnalyzer) ProcessStats() error { if diskUsage := int64(v.FlowStats.MaxDiskUsage.Value()); diskUsage > a.nodeLevelStats.MaxDiskUsageGroupedByNode[instanceID] { a.nodeLevelStats.MaxDiskUsageGroupedByNode[instanceID] = diskUsage } - + } + if v.FlowStats.ConsumedRU.HasValue() { + a.nodeLevelStats.RUEstimateGroupedByNode[instanceID] += int64(v.FlowStats.ConsumedRU.Value()) } } } @@ -354,6 +364,11 @@ func (a *TraceAnalyzer) ProcessStats() error { for _, contentionTime := range a.nodeLevelStats.ContentionTimeGroupedByNode { a.queryLevelStats.ContentionTime += contentionTime } + + for _, estimatedRU := range a.nodeLevelStats.RUEstimateGroupedByNode { + a.queryLevelStats.RUEstimate += estimatedRU + } + return errs } diff --git a/pkg/sql/execstats/traceanalyzer_test.go b/pkg/sql/execstats/traceanalyzer_test.go index bf2503729fdc..870f00614cdf 100644 --- a/pkg/sql/execstats/traceanalyzer_test.go +++ b/pkg/sql/execstats/traceanalyzer_test.go @@ -250,6 +250,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { NetworkMessages: 6, ContentionTime: 7 * time.Second, MaxDiskUsage: 8, + RUEstimate: 9, } b := execstats.QueryLevelStats{ NetworkBytesSent: 8, @@ -261,6 +262,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { NetworkMessages: 13, ContentionTime: 14 * time.Second, MaxDiskUsage: 15, + RUEstimate: 16, } expected := execstats.QueryLevelStats{ NetworkBytesSent: 9, @@ -272,6 +274,7 @@ func TestQueryLevelStatsAccumulate(t *testing.T) { NetworkMessages: 19, ContentionTime: 21 * time.Second, MaxDiskUsage: 15, + RUEstimate: 25, } aCopy := a diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index c3b529f13fdc..28636ebc2b05 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -412,6 +412,12 @@ func (f *FlowBase) StartInternal( f.status = flowRunning + if !f.Gateway && f.CollectStats { + // Remote flows begin collecting CPU usage here, and finish when the last + // outbox finishes. Gateway flows are handled by the connExecutor. + f.FlowCtx.TenantCPUMonitor.StartCollection(ctx, f.Cfg.TenantCostController) + } + if log.V(1) { log.Infof(ctx, "registered flow %s", f.ID.Short()) } diff --git a/pkg/sql/flowinfra/outbox.go b/pkg/sql/flowinfra/outbox.go index 4e97bd04d4e0..aaf22edaa6d4 100644 --- a/pkg/sql/flowinfra/outbox.go +++ b/pkg/sql/flowinfra/outbox.go @@ -290,6 +290,7 @@ func (m *Outbox) mainLoop(ctx context.Context) error { // maxMemUsage from streamStats should be removed as well. 
m.stats.FlowStats.MaxMemUsage.Set(uint64(m.flowCtx.EvalCtx.Mon.MaximumBytes())) m.stats.FlowStats.MaxDiskUsage.Set(uint64(m.flowCtx.DiskMonitor.MaximumBytes())) + m.stats.FlowStats.ConsumedRU.Set(uint64(m.flowCtx.TenantCPUMonitor.EndCollection(ctx))) } span.RecordStructured(&m.stats) if trace := tracing.SpanFromContext(ctx).GetConfiguredRecording(); trace != nil { diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index 3bda13dbbbb3..6468236c026c 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -92,6 +92,9 @@ type instrumentationHelper struct { // statement. collectExecStats bool + // isTenant is set when the query is being executed on behalf of a tenant. + isTenant bool + // discardRows is set if we want to discard any results rather than sending // them back to the client. Used for testing/benchmarking. Note that the // resulting schema or the plan are not affected. @@ -247,6 +250,7 @@ func (ih *instrumentationHelper) Setup( ih.implicitTxn = implicitTxn ih.codec = cfg.Codec ih.origCtx = ctx + ih.isTenant = cfg.DistSQLSrv != nil && cfg.DistSQLSrv.TenantCostController != nil switch ih.outputMode { case explainAnalyzeDebugOutput: @@ -530,6 +534,13 @@ func (ih *instrumentationHelper) emitExplainAnalyzePlanToOutputBuilder( ob.AddMaxMemUsage(queryStats.MaxMemUsage) ob.AddNetworkStats(queryStats.NetworkMessages, queryStats.NetworkBytesSent) ob.AddMaxDiskUsage(queryStats.MaxDiskUsage) + if ih.isTenant && ih.outputMode != unmodifiedOutput && ih.vectorized { + // Only output RU estimate if this is a tenant running EXPLAIN ANALYZE. + // Additionally, RUs aren't correctly propagated in all cases for plans + // that aren't vectorized - for example, EXPORT statements. For now, + // only output RU estimates for vectorized plans. + ob.AddRUEstimate(queryStats.RUEstimate) + } } if len(ih.regions) > 0 { diff --git a/pkg/sql/opt/exec/explain/output.go b/pkg/sql/opt/exec/explain/output.go index 625aebec7e99..a408fce7578b 100644 --- a/pkg/sql/opt/exec/explain/output.go +++ b/pkg/sql/opt/exec/explain/output.go @@ -361,6 +361,16 @@ func (ob *OutputBuilder) AddMaxDiskUsage(bytes int64) { } } +// AddRUEstimate adds a top-level field for the estimated number of RUs consumed +// by the query. +func (ob *OutputBuilder) AddRUEstimate(ru int64) { + ob.AddRedactableTopLevelField( + RedactVolatile, + "estimated RUs consumed", + string(humanizeutil.Count(uint64(ru))), + ) +} + // AddRegionsStats adds a top-level field for regions executed on statistics. func (ob *OutputBuilder) AddRegionsStats(regions []string) { ob.AddRedactableTopLevelField( diff --git a/pkg/sql/pgwire/conn.go b/pkg/sql/pgwire/conn.go index 5ab40faa3634..79d2d63f5b4b 100644 --- a/pkg/sql/pgwire/conn.go +++ b/pkg/sql/pgwire/conn.go @@ -1406,6 +1406,77 @@ func (c *conn) bufferBatch(ctx context.Context, batch coldata.Batch, r *commandR return nil } +// tenantEgressCounter implements the sql.TenantNetworkEgressCounter interface. +type tenantEgressCounter struct { + buf writeBuffer + vecs coldata.TypedVecs +} + +var _ sql.TenantNetworkEgressCounter = &tenantEgressCounter{} + +func newTenantEgressCounter() sql.TenantNetworkEgressCounter { + counter := &tenantEgressCounter{} + counter.buf.init(nil /* byteCount */) + return counter +} + +func init() { + sql.NewTenantNetworkEgressCounter = newTenantEgressCounter +} + +// GetRowNetworkEgress returns an estimate of the number of bytes that would be +// sent over the network if the given row was written to the client. 
It does this +// by encoding and buffering the row in text format, then measuring the buffer +// size before clearing it. +func (c *tenantEgressCounter) GetRowNetworkEgress( + ctx context.Context, row tree.Datums, typs []*types.T, +) (egress int64) { + // Each row uses 5 bytes for the message type and length. + egress = 5 + + var conv sessiondatapb.DataConversionConfig + for i := range row { + // Use the default values for the DataConversionConfig and location. + // We use the writeText variant here because this function will only ever + // be called in the context of EXPLAIN ANALYZE, which obfuscates the format + // the client will use when actually executing the query. This should still + // provide an accurate estimate, since most or all of the common data types + // take up the same amount of space between the text and binary formats. + c.buf.writeTextDatum(ctx, row[i], conv, nil /* sessionLoc */, typs[i]) + egress += int64(c.buf.Len()) + c.buf.reset() + } + return egress +} + +// GetBatchNetworkEgress returns an estimate of the number of bytes that would +// be sent over the network if the given batch was written to the client. +func (c *tenantEgressCounter) GetBatchNetworkEgress( + ctx context.Context, batch coldata.Batch, +) (egress int64) { + // Each row uses 5 bytes for the message type and length. + egress = 5 * int64(batch.Length()) + + var conv sessiondatapb.DataConversionConfig + c.vecs.SetBatch(batch) + sel := batch.Selection() + for vecIdx := range c.vecs.Vecs { + for i := 0; i < batch.Length(); i++ { + rowIdx := i + if sel != nil { + rowIdx = sel[i] + } + // Use the default values for the DataConversionConfig and location. + // See the comment in getRowNetworkEgress for why the writeText variant + // is used here instead of writeBinary. + c.buf.writeTextColumnarElement(ctx, &c.vecs, vecIdx, rowIdx, conv, nil /* sessionLoc */) + egress += int64(c.buf.Len()) + c.buf.reset() + } + } + return egress +} + func (c *conn) bufferReadyForQuery(txnStatus byte) { c.msgBuilder.initMsg(pgwirebase.ServerMsgReady) c.msgBuilder.writeByte(txnStatus) diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index 2b332ffe0432..18401059a5b8 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -18,10 +18,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execopnode" "github.com/cockroachdb/cockroach/pkg/sql/execinfra/execreleasable" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/execstats" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/optional" "github.com/cockroachdb/errors" ) @@ -99,7 +101,7 @@ func (p *planNodeToRowSource) MustBeStreaming() bool { func (p *planNodeToRowSource) InitWithOutput( flowCtx *execinfra.FlowCtx, post *execinfrapb.PostProcessSpec, output execinfra.RowReceiver, ) error { - return p.InitWithEvalCtx( + if err := p.InitWithEvalCtx( p, post, p.outputTypes, @@ -116,7 +118,15 @@ func (p *planNodeToRowSource) InitWithOutput( // Input to drain is added in SetInput. TrailingMetaCallback: p.trailingMetaCallback, }, - ) + ); err != nil { + return err + } + // We can't use execstats.ShouldCollectStats here because the context isn't + // passed to InitWithOutput. 
+ if flowCtx.CollectStats { + p.ExecStatsForTrace = p.execStatsForTrace + } + return nil } // SetInput implements the LocalProcessor interface. @@ -149,7 +159,11 @@ func (p *planNodeToRowSource) SetInput(ctx context.Context, input execinfra.RowS } func (p *planNodeToRowSource) Start(ctx context.Context) { - ctx = p.StartInternalNoSpan(ctx) + if p.FlowCtx.CollectStats { + ctx = p.StartInternal(ctx, nodeName(p.node)) + } else { + ctx = p.StartInternalNoSpan(ctx) + } p.params.ctx = ctx // This starts all of the nodes below this node. if err := startExec(p.params, p.node); err != nil { @@ -239,6 +253,22 @@ func (p *planNodeToRowSource) trailingMetaCallback() []execinfrapb.ProducerMetad return meta } +// execStatsForTrace implements ProcessorBase.ExecStatsForTrace. +func (p *planNodeToRowSource) execStatsForTrace() *execinfrapb.ComponentStats { + // Propagate RUs from IO requests. + // TODO(drewk): we should consider propagating other stats for planNode + // operators. + scanStats := execstats.GetScanStats(p.Ctx(), p.ExecStatsTrace) + if scanStats.ConsumedRU == 0 { + return nil + } + return &execinfrapb.ComponentStats{ + Exec: execinfrapb.ExecStats{ + ConsumedRU: optional.MakeUint(scanStats.ConsumedRU), + }, + } +} + // Release releases this planNodeToRowSource back to the pool. func (p *planNodeToRowSource) Release() { p.ProcessorBase.Reset() diff --git a/pkg/sql/rowexec/inverted_joiner.go b/pkg/sql/rowexec/inverted_joiner.go index a946d0e8947b..3176e388904d 100644 --- a/pkg/sql/rowexec/inverted_joiner.go +++ b/pkg/sql/rowexec/inverted_joiner.go @@ -778,6 +778,7 @@ func (ij *invertedJoiner) execStatsForTrace() *execinfrapb.ComponentStats { }, Output: ij.OutputHelper.Stats(), } + ret.Exec.ConsumedRU = optional.MakeUint(ij.scanStats.ConsumedRU) execstats.PopulateKVMVCCStats(&ret.KV, &ij.scanStats) return &ret } diff --git a/pkg/sql/rowexec/joinreader.go b/pkg/sql/rowexec/joinreader.go index 883ba873d5c7..9879f9988542 100644 --- a/pkg/sql/rowexec/joinreader.go +++ b/pkg/sql/rowexec/joinreader.go @@ -1193,6 +1193,7 @@ func (jr *joinReader) execStatsForTrace() *execinfrapb.ComponentStats { ret.Exec.MaxAllocatedDisk.Add(jr.streamerInfo.diskMonitor.MaximumBytes()) } } + ret.Exec.ConsumedRU = optional.MakeUint(jr.scanStats.ConsumedRU) execstats.PopulateKVMVCCStats(&ret.KV, &jr.scanStats) return ret } diff --git a/pkg/sql/rowexec/tablereader.go b/pkg/sql/rowexec/tablereader.go index 40745a862d94..c7cb4ebf110c 100644 --- a/pkg/sql/rowexec/tablereader.go +++ b/pkg/sql/rowexec/tablereader.go @@ -317,6 +317,7 @@ func (tr *tableReader) execStatsForTrace() *execinfrapb.ComponentStats { }, Output: tr.OutputHelper.Stats(), } + ret.Exec.ConsumedRU = optional.MakeUint(tr.scanStats.ConsumedRU) execstats.PopulateKVMVCCStats(&ret.KV, &tr.scanStats) return ret } diff --git a/pkg/sql/rowexec/zigzagjoiner.go b/pkg/sql/rowexec/zigzagjoiner.go index 25c9927d10b0..181bd63c0808 100644 --- a/pkg/sql/rowexec/zigzagjoiner.go +++ b/pkg/sql/rowexec/zigzagjoiner.go @@ -859,10 +859,12 @@ func (z *zigzagJoiner) execStatsForTrace() *execinfrapb.ComponentStats { kvStats.TuplesRead.MaybeAdd(fis.NumTuples) kvStats.KVTime.MaybeAdd(fis.WaitTime) } - return &execinfrapb.ComponentStats{ + ret := &execinfrapb.ComponentStats{ KV: kvStats, Output: z.OutputHelper.Stats(), } + ret.Exec.ConsumedRU = optional.MakeUint(z.scanStats.ConsumedRU) + return ret } func (z *zigzagJoiner) getBytesRead() int64 {