Skip to content

Commit

Permalink
eventpb: add schema telemetry event
Browse files Browse the repository at this point in the history
This commit introduces the logic which will be used in a later commit
for logging SQL schema telemetry. In the meantime, this commit applies
it in a test which checks that the schema of a bootstrapped cluster
matches expectations.

Informs cockroachdb#84284.

Release note: None
  • Loading branch information
Marius Posta committed Jul 21, 2022
1 parent a3f3295 commit ce0fbbe
Show file tree
Hide file tree
Showing 15 changed files with 1,644 additions and 13 deletions.
19 changes: 19 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2489,6 +2489,25 @@ contains common SQL event/execution details.
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |

### `schema`

An event of type `schema` is an event for schema telemetry.


| Field | Description | Sensitive |
|--|--|--|
| `CurrentPage` | | no |
| `NumPages` | | no |
| `Payload` | | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |

## Zone config events

Events in this category pertain to zone configuration changes on
Expand Down
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -1246,6 +1246,7 @@ GO_TARGETS = [
"//pkg/sql/catalog/schemadesc:schemadesc_test",
"//pkg/sql/catalog/schemaexpr:schemaexpr",
"//pkg/sql/catalog/schemaexpr:schemaexpr_test",
"//pkg/sql/catalog/schematelemetry:schematelemetry",
"//pkg/sql/catalog/seqexpr:seqexpr",
"//pkg/sql/catalog/seqexpr:seqexpr_test",
"//pkg/sql/catalog/systemschema:systemschema",
Expand Down Expand Up @@ -2349,6 +2350,7 @@ GET_X_DATA_TARGETS = [
"//pkg/sql/catalog/rewrite:get_x_data",
"//pkg/sql/catalog/schemadesc:get_x_data",
"//pkg/sql/catalog/schemaexpr:get_x_data",
"//pkg/sql/catalog/schematelemetry:get_x_data",
"//pkg/sql/catalog/seqexpr:get_x_data",
"//pkg/sql/catalog/systemschema:get_x_data",
"//pkg/sql/catalog/systemschema_test:get_x_data",
Expand Down
26 changes: 26 additions & 0 deletions pkg/sql/catalog/schematelemetry/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "schematelemetry",
srcs = ["schema_telemetry_event.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/descmetadata",
"//pkg/sql/schemachanger/scdecomp",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/schemachanger/screl",
"//pkg/util/hlc",
"//pkg/util/log/eventpb",
"@com_github_gogo_protobuf//types",
],
)

get_x_data(name = "get_x_data")
137 changes: 137 additions & 0 deletions pkg/sql/catalog/schematelemetry/schema_telemetry_event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package schematelemetry

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scdecomp"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/screl"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
gogotypes "github.com/gogo/protobuf/types"
)

// MaxElementsPerPayload is the upper bound on how many elements can feature in
// the payload of one eventpb.Schema object.
const MaxElementsPerPayload = 50

func buildLogEvents(
ctx context.Context, cfg *sql.ExecutorConfig, aostOffset time.Duration,
) ([]eventpb.EventPayload, error) {
asOf := cfg.Clock.Now().Add(aostOffset.Nanoseconds(), 0)
ess, err := CollectClusterSchemaForTelemetry(ctx, cfg, asOf)
if err != nil {
return nil, err
}
// Bundle the element-status pairs into log events.
events := make([]eventpb.EventPayload, 0, len(ess)/MaxElementsPerPayload+1)
var pl scpb.TelemetryPayload
flush := func() error {
if len(pl.ElementStatuses) == 0 {
return nil
}
event := &eventpb.Schema{}
event.Timestamp = asOf.WallTime
event.CurrentPage = uint32(len(events) + 1)
any, err := gogotypes.MarshalAny(&pl)
if err != nil {
return err
}
event.Payload = any
events = append(events, event)
pl.ElementStatuses = pl.ElementStatuses[:0]
return nil
}

for _, es := range ess {
pl.ElementStatuses = append(pl.ElementStatuses, es)
if len(pl.ElementStatuses) >= MaxElementsPerPayload {
if err := flush(); err != nil {
return nil, err
}
}
}
if err := flush(); err != nil {
return nil, err
}
for i := range events {
events[i].(*eventpb.Schema).NumPages = uint32(len(events))
}
return events, nil
}

// Keep the linter happy.
// TODO(postamar): use this function
var _ = buildLogEvents

// CollectClusterSchemaForTelemetry returns a projection of the cluster's SQL
// schema as of the provided system time, suitably filtered for the purposes of
// schema telemetry.
//
// This function is tested in the systemschema package.
func CollectClusterSchemaForTelemetry(
ctx context.Context, cfg *sql.ExecutorConfig, asOf hlc.Timestamp,
) (ess []scpb.TelemetryPayload_ElementStatus, _ error) {
visitorFn := func(status scpb.Status, element scpb.Element) {
if es := filteredForTelemetry(status, element); es != nil {
ess = append(ess, *es)
}
}
if err := sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
err := txn.SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}
c, err := col.GetAllDescriptors(ctx, txn)
if err != nil {
return err
}
cc := descmetadata.NewCommentCache(txn, cfg.InternalExecutor)
_ = c.ForEachDescriptorEntry(func(desc catalog.Descriptor) error {
_ = scdecomp.WalkDescriptor(ctx, desc, c.LookupDescriptorEntry, visitorFn, cc)
return nil
})
return nil
}); err != nil {
return nil, err
}
return ess, nil
}

// filteredForTelemetry removes unwanted information from the element
// decomposition for the purposes of schema telemetry.
func filteredForTelemetry(
status scpb.Status, element scpb.Element,
) *scpb.TelemetryPayload_ElementStatus {
// Exclude elements related to system columns.
a, _ := screl.Schema.GetAttribute(screl.ColumnID, element)
if colID, ok := a.(descpb.ColumnID); ok && colinfo.IsColIDSystemColumn(colID) {
return nil
}
// Return the visited element-status pair as a telemetry payload element.
es := &scpb.TelemetryPayload_ElementStatus{}
es.SetValue(element)
// Only set non-public statuses, public status should be an implicit default.
if status != scpb.Status_PUBLIC {
es.Status = status
}
return es
}
4 changes: 4 additions & 0 deletions pkg/sql/catalog/systemschema_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,17 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/schematelemetry",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/leaktest",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_stretchr_testify//require",
],
)

Expand Down
33 changes: 25 additions & 8 deletions pkg/sql/catalog/systemschema_test/systemschema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@ import (

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/datadriven"
"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/require"
)

func createTestServerParams() base.TestServerArgs {
Expand All @@ -41,31 +45,44 @@ func TestValidateSystemSchemaAfterBootStrap(t *testing.T) {
// New database for each test file.
s, db, _ := serverutils.StartServer(t, createTestServerParams())
defer s.Stopper().Stop(ctx)
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)

datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string {
switch d.Cmd {
case "bootstrap":
case "show_create":
// Create a connection to the database cluster.
sqlRunner := sqlutils.MakeSQLRunner(db)

// Prepare the SQL query.
sql := `USE SYSTEM; SHOW CREATE ALL TABLES;`

// Execute the SQL query.
rows := sqlRunner.QueryStr(t, sql)
rows := sqlRunner.QueryStr(t, d.Input)

// Extract return and return.
// Extract results and return.
var sb strings.Builder
for _, row := range rows {
if len(row) != 1 {
d.Fatalf(t, "`SHOW CREATE ALL TABLES` returns has zero column.")
d.Fatalf(t, "expect 1 column in %q result set, instead found %d", d.Input, len(row))
}
sb.WriteString(row[0])
sb.WriteString("\n")
}
return sb.String()
}

case "schema_telemetry":
// Collect a projection of the bootstrapped cluster's schema.
ess, err := schematelemetry.CollectClusterSchemaForTelemetry(ctx, &execCfg, execCfg.Clock.Now())
require.NoError(t, err)

// Return the results, one element per line.
var sb strings.Builder
jsonEncoder := jsonpb.Marshaler{}
for _, es := range ess {
str, err := jsonEncoder.MarshalToString(&es)
require.NoError(t, err)
sb.WriteString(str)
sb.WriteRune('\n')
}
return sb.String()
}
d.Fatalf(t, "unsupported command: %s", d.Cmd)
return ""
})
Expand Down
Loading

0 comments on commit ce0fbbe

Please sign in to comment.