sql: include regions information into the sampled query telemetry
This commit adds the regions (where SQL processors were planned for the
query) to the sampled query telemetry. This required a couple of minor
changes to derive the regions information stored in the instrumentation
helper earlier (before the logging takes place).

Release justification: low-risk improvement.

Release note (sql change): The structured payloads used for telemetry
logs now include the new `Regions` field which indicates the regions of
the nodes where SQL processing ran for the query.
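
For a concrete picture, a sampled_query event whose statement ran in two regions might look roughly like the following (abbreviated and illustrative: the real payload carries many more fields and redaction markers are omitted; the region names match the test fixture added below):

{"EventType":"sampled_query","Statement":"SELECT * FROM \"\".\"\".two_regions","Regions":["us-east1","us-east2"], ...}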
yuzefovich committed Aug 26, 2022
1 parent 5ca7023 commit 73e9ff0
Showing 15 changed files with 243 additions and 65 deletions.
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
@@ -2604,6 +2604,7 @@ contains common SQL event/execution details.
 | `ApplyJoinCount` | The number of apply joins in the query plan. | no |
 | `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no |
 | `ContentionNanos` | The duration of time in nanoseconds that the query experienced contention. | no |
+| `Regions` | The regions of the nodes where SQL processors ran. | no |
 
 
 #### Common fields
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -1920,6 +1920,7 @@ GO_TARGETS = [
     "//pkg/util/log/logcrash:logcrash_test",
     "//pkg/util/log/logflags:logflags",
     "//pkg/util/log/logpb:logpb",
+    "//pkg/util/log/logtestutils:logtestutils",
     "//pkg/util/log/severity:severity",
     "//pkg/util/log/testshout:testshout_test",
     "//pkg/util/log:log",
@@ -2873,6 +2874,7 @@ GET_X_DATA_TARGETS = [
     "//pkg/util/log/logcrash:get_x_data",
     "//pkg/util/log/logflags:get_x_data",
     "//pkg/util/log/logpb:get_x_data",
+    "//pkg/util/log/logtestutils:get_x_data",
     "//pkg/util/log/severity:get_x_data",
     "//pkg/util/log/testshout:get_x_data",
     "//pkg/util/memzipper:get_x_data",
5 changes: 5 additions & 0 deletions pkg/ccl/telemetryccl/BUILD.bazel
@@ -5,13 +5,15 @@ go_test(
     name = "telemetryccl_test",
     srcs = [
         "main_test.go",
+        "telemetry_logging_test.go",
         "telemetry_test.go",
     ],
     data = glob(["testdata/**"]),
     shard_count = 16,
     deps = [
         "//pkg/base",
         "//pkg/ccl",
+        "//pkg/ccl/multiregionccl/multiregionccltestutils",
         "//pkg/ccl/utilccl",
         "//pkg/roachpb",
         "//pkg/security/securityassets",
@@ -20,10 +22,13 @@ go_test(
         "//pkg/sql/sqltestutils",
         "//pkg/testutils/serverutils",
         "//pkg/testutils/skip",
+        "//pkg/testutils/sqlutils",
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/log/logtestutils",
         "//pkg/util/randutil",
+        "@com_github_cockroachdb_errors//:errors",
     ],
 )

126 changes: 126 additions & 0 deletions pkg/ccl/telemetryccl/telemetry_logging_test.go
@@ -0,0 +1,126 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//     https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
+
+package telemetryccl
+
+import (
+    "math"
+    "regexp"
+    "strings"
+    "testing"
+
+    "github.com/cockroachdb/cockroach/pkg/base"
+    "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils"
+    "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
+    "github.com/cockroachdb/cockroach/pkg/util/leaktest"
+    "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils"
+    "github.com/cockroachdb/errors"
+)
+
+func TestTelemetryLogRegions(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    sc := log.ScopeWithoutShowLogs(t)
+    defer sc.Close(t)
+
+    cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t)
+    defer cleanup()
+
+    _, db, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(
+        t, 3 /* numServers */, base.TestingKnobs{},
+        multiregionccltestutils.WithReplicationMode(base.ReplicationManual),
+    )
+    defer cleanup()
+    sqlDB := sqlutils.MakeSQLRunner(db)
+
+    // Create three tables, with each table touching one, two, and three
+    // regions, respectively.
+    sqlDB.Exec(t, `CREATE TABLE one_region (k INT PRIMARY KEY)`)
+    sqlDB.Exec(t, `INSERT INTO one_region SELECT generate_series(1, 1)`)
+    sqlDB.Exec(t, `ALTER TABLE one_region SPLIT AT SELECT generate_series(1, 1)`)
+    sqlDB.Exec(t, "ALTER TABLE one_region EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 1)")
+    sqlDB.Exec(t, `CREATE TABLE two_regions (k INT PRIMARY KEY)`)
+    sqlDB.Exec(t, `INSERT INTO two_regions SELECT generate_series(1, 2)`)
+    sqlDB.Exec(t, `ALTER TABLE two_regions SPLIT AT SELECT generate_series(1, 2)`)
+    sqlDB.Exec(t, "ALTER TABLE two_regions EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 1), (ARRAY[2], 2)")
+    sqlDB.Exec(t, `CREATE TABLE three_regions (k INT PRIMARY KEY)`)
+    sqlDB.Exec(t, `INSERT INTO three_regions SELECT generate_series(1, 3)`)
+    sqlDB.Exec(t, `ALTER TABLE three_regions SPLIT AT SELECT generate_series(1, 3)`)
+    sqlDB.Exec(t, "ALTER TABLE three_regions EXPERIMENTAL_RELOCATE VALUES (ARRAY[1], 1), (ARRAY[2], 2), (ARRAY[3], 3)")
+
+    // Enable the telemetry logging and increase the sampling frequency so that
+    // all statements are captured.
+    sqlDB.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.enabled = true;`)
+    sqlDB.Exec(t, `SET CLUSTER SETTING sql.telemetry.query_sampling.max_event_frequency = 1000000`)
+
+    testData := []struct {
+        name                 string
+        query                string
+        expectedLogStatement string
+        expectedRegions      []string
+    }{
+        {
+            name:                 "one-region",
+            query:                "SELECT * FROM one_region",
+            expectedLogStatement: `SELECT * FROM \"\".\"\".one_region`,
+            expectedRegions:      []string{"us-east1"},
+        },
+        {
+            name:                 "two-regions",
+            query:                "SELECT * FROM two_regions",
+            expectedLogStatement: `SELECT * FROM \"\".\"\".two_regions`,
+            expectedRegions:      []string{"us-east1", "us-east2"},
+        },
+        {
+            name:                 "three-regions",
+            query:                "SELECT * FROM three_regions",
+            expectedLogStatement: `SELECT * FROM \"\".\"\".three_regions`,
+            expectedRegions:      []string{"us-east1", "us-east2", "us-east3"},
+        },
+    }
+
+    for _, tc := range testData {
+        sqlDB.Exec(t, tc.query)
+    }
+
+    log.Flush()
+
+    entries, err := log.FetchEntriesFromFiles(
+        0,
+        math.MaxInt64,
+        10000,
+        regexp.MustCompile(`"EventType":"sampled_query"`),
+        log.WithMarkedSensitiveData,
+    )
+
+    if err != nil {
+        t.Fatal(err)
+    }
+
+    if len(entries) == 0 {
+        t.Fatal(errors.Newf("no entries found"))
+    }
+
+    for _, tc := range testData {
+        var logEntriesCount int
+        for i := len(entries) - 1; i >= 0; i-- {
+            e := entries[i]
+            if strings.Contains(e.Message, tc.expectedLogStatement) {
+                logEntriesCount++
+                for _, region := range tc.expectedRegions {
+                    if !strings.Contains(e.Message, region) {
+                        t.Errorf("didn't find region %q in the log entry %s", region, e.Message)
+                    }
+                }
+            }
+        }
+        if logEntriesCount != 1 {
+            t.Errorf("expected to find a single entry for %q: %v", tc.name, entries)
+        }
+    }
+}
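
A note on the fixture above, inferred from the statements themselves: each SPLIT AT carves the table into one range per row, and EXPERIMENTAL_RELOCATE pins those ranges to stores 1 through 3 of the three-node multi-region cluster, so scanning one_region, two_regions, or three_regions plans SQL processors on a known set of nodes, and therefore a known set of regions. Raising sql.telemetry.query_sampling.max_event_frequency to 1000000 ensures that none of the three probe queries is dropped by sampling.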
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
@@ -762,6 +762,7 @@ go_test(
         "//pkg/util/log/eventpb",
         "//pkg/util/log/logconfig",
         "//pkg/util/log/logpb",
+        "//pkg/util/log/logtestutils",
         "//pkg/util/metric",
         "//pkg/util/mon",
         "//pkg/util/protoutil",
19 changes: 14 additions & 5 deletions pkg/sql/conn_executor_exec.go
@@ -1190,7 +1190,7 @@ func (ex *connExecutor) dispatchToExecutionEngine(
     ex.extraTxnState.bytesRead += stats.bytesRead
     ex.extraTxnState.rowsWritten += stats.rowsWritten
 
-    populateQueryLevelStats(ctx, planner)
+    populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg)
 
     // The transaction (from planner.txn) may already have been committed at this point,
     // due to one-phase commit optimization or an error. Since we use that transaction
@@ -1220,11 +1220,12 @@
     return err
 }
 
-// populateQueryLevelStats collects query-level execution statistics and
-// populates it in the instrumentationHelper's queryLevelStatsWithErr field.
+// populateQueryLevelStatsAndRegions collects query-level execution statistics
+// and populates it in the instrumentationHelper's queryLevelStatsWithErr field.
 // Query-level execution statistics are collected using the statement's trace
-// and the plan's flow metadata.
-func populateQueryLevelStats(ctx context.Context, p *planner) {
+// and the plan's flow metadata. It also populates the regions field and
+// annotates the explainPlan field of the instrumentationHelper.
+func populateQueryLevelStatsAndRegions(ctx context.Context, p *planner, cfg *ExecutorConfig) {
     ih := &p.instrumentation
     if _, ok := ih.Tracing(); !ok {
         return
@@ -1247,6 +1248,14 @@
         }
         log.VInfof(ctx, 1, msg, ih.fingerprint, err)
     }
+    if ih.traceMetadata != nil && ih.explainPlan != nil {
+        ih.regions = ih.traceMetadata.annotateExplain(
+            ih.explainPlan,
+            trace,
+            cfg.TestingKnobs.DeterministicExplain,
+            p,
+        )
+    }
 }
 
 type txnRowsWrittenLimitErr struct {
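
Per the commit message, the regions must be stored in the instrumentation helper before the telemetry logging takes place, which is why the annotateExplain call moves out of instrumentationHelper.Finish (see instrumentation.go below) and into this earlier stats-collection step. A condensed sketch of the resulting ordering, with unrelated bookkeeping elided:

// Inside dispatchToExecutionEngine (sketch; names as in the hunks above):
populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg)
// ih.regions is now populated, so the telemetry event built later in
// maybeLogStatementInternal (see exec_log.go below) can pick it up.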
1 change: 1 addition & 0 deletions pkg/sql/exec_log.go
@@ -435,6 +435,7 @@ func (p *planner) maybeLogStatementInternal(
             ApplyJoinCount:  int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ApplyJoin]),
             ZigZagJoinCount: int64(p.curPlan.instrumentation.joinAlgorithmCounts[exec.ZigZagJoin]),
             ContentionNanos: contentionNanos,
+            Regions:         p.curPlan.instrumentation.regions,
         }
         p.logOperationalEventsOnlyExternally(ctx, &sampledQuery)
     } else {
19 changes: 9 additions & 10 deletions pkg/sql/instrumentation.go
@@ -255,6 +255,12 @@ func (ih *instrumentationHelper) Setup(
     ih.savePlanForStats =
         statsCollector.ShouldSaveLogicalPlanDesc(fingerprint, implicitTxn, p.SessionData().Database)
 
+    if ih.ShouldBuildExplainPlan() {
+        // Populate traceMetadata early in case we short-circuit the execution
+        // before reaching the bottom of this method.
+        ih.traceMetadata = make(execNodeTraceMetadata)
+    }
+
     if sp := tracing.SpanFromContext(ctx); sp != nil {
         if sp.IsVerbose() && !cfg.TestingKnobs.NoStatsCollectionWithVerboseTracing {
             // If verbose tracing was enabled at a higher level, stats
@@ -299,7 +305,9 @@
     }
 
     ih.collectExecStats = true
-    ih.traceMetadata = make(execNodeTraceMetadata)
+    if ih.traceMetadata == nil {
+        ih.traceMetadata = make(execNodeTraceMetadata)
+    }
     ih.evalCtx = p.EvalContext()
     newCtx, ih.sp = tracing.EnsureChildSpan(ctx, cfg.AmbientCtx.Tracer, "traced statement", tracing.WithRecording(tracingpb.RecordingVerbose))
     ih.shouldFinishSpan = true
@@ -336,15 +344,6 @@ func (ih *instrumentationHelper) Finish(
         ih.withStatementTrace(trace, stmtRawSQL)
     }
 
-    if ih.traceMetadata != nil && ih.explainPlan != nil {
-        ih.regions = ih.traceMetadata.annotateExplain(
-            ih.explainPlan,
-            trace,
-            cfg.TestingKnobs.DeterministicExplain,
-            p,
-        )
-    }
-
     queryLevelStats, ok := ih.GetQueryLevelStats()
     // Accumulate txn stats if no error was encountered while collecting
     // query-level statistics.
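
Taken together with the previous file: Setup now pre-allocates traceMetadata whenever an explain plan will be built, so populateQueryLevelStatsAndRegions has metadata to annotate even if execution short-circuits before the tracing setup at the bottom of Setup; the later assignment becomes a nil-guarded fallback; and the block deleted from Finish here is the same annotateExplain call that moved into conn_executor_exec.go above.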
3 changes: 1 addition & 2 deletions pkg/sql/scheduledlogging/BUILD.bazel
@@ -43,8 +43,7 @@ go_test(
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",
         "//pkg/util/log",
-        "//pkg/util/log/channel",
-        "//pkg/util/log/logconfig",
+        "//pkg/util/log/logtestutils",
         "//pkg/util/randutil",
         "//pkg/util/syncutil",
         "@com_github_cockroachdb_errors//:errors",
24 changes: 2 additions & 22 deletions pkg/sql/scheduledlogging/captured_index_usage_stats_test.go
@@ -26,8 +26,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
-    "github.com/cockroachdb/cockroach/pkg/util/log/channel"
-    "github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
+    "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/errors"
     "github.com/stretchr/testify/require"
@@ -63,31 +62,12 @@ func (s *stubDurations) getOverlapDuration() time.Duration {
     return s.overlapDuration
 }
 
-func installTelemetryLogFileSink(t *testing.T, sc *log.TestLogScope) func() {
-    // Enable logging channels.
-    log.TestingResetActive()
-    cfg := logconfig.DefaultConfig()
-    // Make a sink for just the session log.
-    cfg.Sinks.FileGroups = map[string]*logconfig.FileSinkConfig{
-        "telemetry": {
-            Channels: logconfig.SelectChannels(channel.TELEMETRY),
-        }}
-    dir := sc.GetDirectory()
-    require.NoError(t, cfg.Validate(&dir), "expected no errors validating log config")
-    cleanup, err := log.ApplyConfig(cfg)
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    return cleanup
-}
-
 func TestCaptureIndexUsageStats(t *testing.T) {
     defer leaktest.AfterTest(t)()
     sc := log.ScopeWithoutShowLogs(t)
     defer sc.Close(t)
 
-    cleanup := installTelemetryLogFileSink(t, sc)
+    cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t)
     defer cleanup()
 
     sd := stubDurations{}
30 changes: 4 additions & 26 deletions pkg/sql/telemetry_logging_test.go
@@ -27,8 +27,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
-    "github.com/cockroachdb/cockroach/pkg/util/log/channel"
-    "github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
+    "github.com/cockroachdb/cockroach/pkg/util/log/logtestutils"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/timeutil"
    "github.com/cockroachdb/errors"
@@ -69,27 +68,6 @@ func (s *stubQueryMetrics) ContentionNanos() int64 {
     return s.contentionNanos
 }
 
-func installTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() {
-    // Enable logging channels.
-    log.TestingResetActive()
-    cfg := logconfig.DefaultConfig()
-    // Make a sink for just the session log.
-    cfg.Sinks.FileGroups = map[string]*logconfig.FileSinkConfig{
-        "telemetry": {
-            Channels: logconfig.SelectChannels(channel.TELEMETRY),
-        }}
-    dir := sc.GetDirectory()
-    if err := cfg.Validate(&dir); err != nil {
-        t.Fatal(err)
-    }
-    cleanup, err := log.ApplyConfig(cfg)
-    if err != nil {
-        t.Fatal(err)
-    }
-
-    return cleanup
-}
-
 // TestTelemetryLogging verifies that telemetry events are logged to the telemetry log
 // and are sampled according to the configured sample rate.
 func TestTelemetryLogging(t *testing.T) {
@@ -98,7 +76,7 @@ func TestTelemetryLogging(t *testing.T) {
     sc := log.ScopeWithoutShowLogs(t)
     defer sc.Close(t)
 
-    cleanup := installTelemetryLogFileSink(sc, t)
+    cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t)
     defer cleanup()
 
     st := stubTime{}
@@ -493,7 +471,7 @@ func TestNoTelemetryLogOnTroubleshootMode(t *testing.T) {
     sc := log.ScopeWithoutShowLogs(t)
     defer sc.Close(t)
 
-    cleanup := installTelemetryLogFileSink(sc, t)
+    cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t)
     defer cleanup()
 
     st := stubTime{}
@@ -602,7 +580,7 @@ func TestTelemetryLogJoinTypesAndAlgorithms(t *testing.T) {
     sc := log.ScopeWithoutShowLogs(t)
     defer sc.Close(t)
 
-    cleanup := installTelemetryLogFileSink(sc, t)
+    cleanup := logtestutils.InstallTelemetryLogFileSink(sc, t)
     defer cleanup()
 
     s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
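
The two installTelemetryLogFileSink helpers deleted above were consolidated into the new pkg/util/log/logtestutils package registered in the Bazel files earlier; that file is among the changes not shown in this view. Reconstructed from the deleted copies and the new call sites, the shared helper presumably looks roughly like this (file layout and doc comment are assumptions):

package logtestutils

import (
    "testing"

    "github.com/cockroachdb/cockroach/pkg/util/log"
    "github.com/cockroachdb/cockroach/pkg/util/log/channel"
    "github.com/cockroachdb/cockroach/pkg/util/log/logconfig"
)

// InstallTelemetryLogFileSink routes the TELEMETRY channel to a dedicated
// file sink in the test log scope's directory and returns a cleanup func.
func InstallTelemetryLogFileSink(sc *log.TestLogScope, t *testing.T) func() {
    // Enable logging channels.
    log.TestingResetActive()
    cfg := logconfig.DefaultConfig()
    // Make a sink for just the telemetry log.
    cfg.Sinks.FileGroups = map[string]*logconfig.FileSinkConfig{
        "telemetry": {
            Channels: logconfig.SelectChannels(channel.TELEMETRY),
        }}
    dir := sc.GetDirectory()
    if err := cfg.Validate(&dir); err != nil {
        t.Fatal(err)
    }
    cleanup, err := log.ApplyConfig(cfg)
    if err != nil {
        t.Fatal(err)
    }
    return cleanup
}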
(4 more changed files not shown in this view.)
