Skip to content

Commit

Permalink
eventpb: add schema telemetry event
Browse files Browse the repository at this point in the history
This commit introduces the event logic for logging SQL schema telemetry.
This is tested on the schema of a bootstrapped test cluster, the test
checks that the schema events match expectations.

Care is taken to redact all strings present in descriptors which might
unintentionally be leaking PIIs.

Informs cockroachdb#84284.

Release note: None
  • Loading branch information
Marius Posta committed Jul 26, 2022
1 parent 7ff7685 commit 0f207fa
Show file tree
Hide file tree
Showing 13 changed files with 546 additions and 14 deletions.
19 changes: 19 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2489,6 +2489,25 @@ contains common SQL event/execution details.
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |

### `schema`

An event of type `schema` is an event for schema telemetry.


| Field | Description | Sensitive |
|--|--|--|
| `CurrentPage` | | no |
| `NumPages` | | no |
| `Payload` | | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |

## Zone config events

Events in this category pertain to zone configuration changes on
Expand Down
9 changes: 9 additions & 0 deletions pkg/sql/catalog/schematelemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,24 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/schematelemetry/schematelemetrycontroller",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/parser",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/sem/tree",
"//pkg/sql/sqltelemetry",
"//pkg/sql/sqlutil",
"//pkg/util/hlc",
"//pkg/util/log/eventpb",
"//pkg/util/metric",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//types",
],
)

Expand Down
261 changes: 259 additions & 2 deletions pkg/sql/catalog/schematelemetry/schema_telemetry_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,22 @@ package schematelemetry
import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/errors"
gogotypes "github.com/gogo/protobuf/types"
)

// CollectClusterSchemaForTelemetry returns a projection of the cluster's SQL
Expand All @@ -26,6 +39,250 @@ import (
func CollectClusterSchemaForTelemetry(
ctx context.Context, cfg *sql.ExecutorConfig, asOf hlc.Timestamp,
) (events []eventpb.EventPayload, _ error) {
// TODO(postamar): implement
return nil, nil
// Scrape the raw catalog.
var raw nstree.Catalog
if err := sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
err := txn.SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}
raw, err = col.Direct().GetCatalogUnvalidated(ctx, txn)
return err
}); err != nil {
return nil, err
}
// Redact the descriptors.
redacted := make(map[descpb.ID]eventpb.Schema)
_ = raw.ForEachDescriptorEntry(func(rd catalog.Descriptor) error {
ev := eventpb.Schema{Id: uint32(rd.GetID())}
ev.Timestamp = asOf.WallTime
var errs []error
var pb *descpb.Descriptor
// Redact parts of the catalog which may contain PII.
{
mut := rd.NewBuilder().BuildCreatedMutable()
switch d := mut.(type) {
case *tabledesc.Mutable:
errs = redactTableDescriptor(d.TableDesc())
case *typedesc.Mutable:
redactTypeDescriptor(d.TypeDesc())
}
pb = mut.DescriptorProto()
}
// Marshal protobuf and add all errors to the event.
var anyErr error
ev.Desc, anyErr = gogotypes.MarshalAny(pb)
for _, err := range append(errs, errors.Wrap(anyErr, "marshal protobuf")) {
if err != nil {
ev.Errors = append(ev.Errors, err.Error())
}
}
redacted[rd.GetID()] = ev
return nil
})
// Build the log events for each of the namespace entries.
eventStructs := make([]eventpb.Schema, 0, len(redacted))
var logged catalog.DescriptorIDSet
_ = raw.ForEachNamespaceEntry(func(e catalog.NameEntry) error {
ev := eventpb.Schema{
ParentDatabaseId: uint32(e.GetParentID()),
ParentSchemaId: uint32(e.GetParentSchemaID()),
Name: e.GetName(),
Id: uint32(e.GetID()),
}
ev.Timestamp = asOf.WallTime
if src, ok := redacted[e.GetID()]; ok {
ev.Desc = src.Desc
ev.Errors = src.Errors
}
eventStructs = append(eventStructs, ev)
logged.Add(e.GetID())
return nil
})
// Add log events for orphaned descriptors.
for _, id := range raw.OrderedDescriptorIDs() {
if logged.Contains(id) {
continue
}
eventStructs = append(eventStructs, redacted[id])
}
// Build the returned slice of interfaces.
events = make([]eventpb.EventPayload, len(eventStructs))
for i := range eventStructs {
ev := &eventStructs[i]
ev.NumPages = uint32(len(eventStructs))
ev.CurrentPage = uint32(i + 1)
events[i] = ev
}
return events, nil
}

func redactTableDescriptor(d *descpb.TableDescriptor) (errs []error) {
handleErr := func(err error) {
errs = append(errs, err)
}
if d.ViewQuery != "" {
handleErr(errors.Wrap(redactQuery(&d.ViewQuery), "view query"))
}
if d.CreateQuery != "" {
handleErr(errors.Wrap(redactQuery(&d.CreateQuery), "create query"))
}
handleErr(redactIndex(&d.PrimaryIndex))
for i := range d.Indexes {
idx := &d.Indexes[i]
handleErr(errors.Wrapf(redactIndex(idx), "index #%d", idx.ID))
}
for i := range d.Columns {
col := &d.Columns[i]
for _, err := range redactColumn(col) {
handleErr(errors.Wrapf(err, "column #%d", col.ID))
}
}
for i := range d.Checks {
chk := d.Checks[i]
handleErr(errors.Wrapf(redactCheckConstraints(chk), "constraint #%d", chk.ConstraintID))
}
for i := range d.UniqueWithoutIndexConstraints {
uwi := &d.UniqueWithoutIndexConstraints[i]
handleErr(errors.Wrapf(redactUniqueWithoutIndexConstraint(uwi), "constraint #%d", uwi.ConstraintID))
}
for _, m := range d.Mutations {
if idx := m.GetIndex(); idx != nil {
handleErr(errors.Wrapf(redactIndex(idx), "index #%d", idx.ID))
} else if col := m.GetColumn(); col != nil {
for _, err := range redactColumn(col) {
handleErr(errors.Wrapf(err, "column #%d", col.ID))
}
} else if ctu := m.GetConstraint(); ctu != nil {
switch ctu.ConstraintType {
case descpb.ConstraintToUpdate_CHECK:
chk := &ctu.Check
handleErr(errors.Wrapf(redactCheckConstraints(chk), "constraint #%d", chk.ConstraintID))
case descpb.ConstraintToUpdate_UNIQUE_WITHOUT_INDEX:
uwi := &ctu.UniqueWithoutIndexConstraint
handleErr(errors.Wrapf(redactUniqueWithoutIndexConstraint(uwi), "constraint #%d", uwi.ConstraintID))
}
}
}
if scs := d.DeclarativeSchemaChangerState; scs != nil {
for i := range scs.RelevantStatements {
stmt := &scs.RelevantStatements[i]
stmt.Statement.Statement = stmt.Statement.RedactedStatement
}
for i := range scs.Targets {
t := &scs.Targets[i]
handleErr(errors.Wrapf(redactElement(t.Element()), "element #%d", i))
}
}
return errs
}

func redactQuery(sql *string) error {
q, err := parser.ParseOne(*sql)
if err != nil {
*sql = "_"
return err
}
fmtCtx := tree.NewFmtCtx(tree.FmtHideConstants)
q.AST.Format(fmtCtx)
*sql = fmtCtx.String()
return nil
}

func redactIndex(idx *descpb.IndexDescriptor) error {
redactPartitioning(&idx.Partitioning)
return errors.Wrap(redactExprStr(&idx.Predicate), "partial predicate")
}

func redactColumn(col *descpb.ColumnDescriptor) (errs []error) {
handleErr := func(err error) {
errs = append(errs, err)
}
if ce := col.ComputeExpr; ce != nil {
handleErr(errors.Wrap(redactExprStr(ce), "compute expr"))
}
if de := col.DefaultExpr; de != nil {
handleErr(errors.Wrap(redactExprStr(de), "default expr"))
}
if ue := col.OnUpdateExpr; ue != nil {
handleErr(errors.Wrap(redactExprStr(ue), "on-update expr"))
}
return errs
}

func redactCheckConstraints(chk *descpb.TableDescriptor_CheckConstraint) error {
return redactExprStr(&chk.Expr)
}

func redactUniqueWithoutIndexConstraint(uwi *descpb.UniqueWithoutIndexConstraint) error {
return redactExprStr(&uwi.Predicate)
}

func redactTypeDescriptor(d *descpb.TypeDescriptor) {
for i := range d.EnumMembers {
e := &d.EnumMembers[i]
e.LogicalRepresentation = "_"
e.PhysicalRepresentation = []byte("_")
}
}

// redactElement redacts literals which may contain PII from elements.
func redactElement(element scpb.Element) error {
switch e := element.(type) {
case *scpb.EnumTypeValue:
e.LogicalRepresentation = "_"
e.PhysicalRepresentation = []byte("_")
case *scpb.IndexPartitioning:
redactPartitioning(&e.PartitioningDescriptor)
case *scpb.SecondaryIndexPartial:
return redactExpr(&e.Expression.Expr)
case *scpb.CheckConstraint:
return redactExpr(&e.Expression.Expr)
case *scpb.ColumnDefaultExpression:
return redactExpr(&e.Expression.Expr)
case *scpb.ColumnOnUpdateExpression:
return redactExpr(&e.Expression.Expr)
case *scpb.ColumnType:
if e.ComputeExpr != nil {
return redactExpr(&e.ComputeExpr.Expr)
}
}
return nil
}

func redactPartitioning(p *catpb.PartitioningDescriptor) {
for i := range p.List {
l := &p.List[i]
for j := range l.Values {
l.Values[j] = []byte("_")
}
redactPartitioning(&l.Subpartitioning)
}
for i := range p.Range {
r := &p.Range[i]
r.FromInclusive = []byte("_")
r.ToExclusive = []byte("_")
}
}

func redactExpr(expr *catpb.Expression) error {
str := string(*expr)
err := redactExprStr(&str)
*expr = catpb.Expression(str)
return err
}

func redactExprStr(expr *string) error {
if *expr == "" {
return nil
}
parsedExpr, err := parser.ParseExpr(*expr)
if err != nil {
*expr = "_"
return err
}
fmtCtx := tree.NewFmtCtx(tree.FmtHideConstants)
parsedExpr.Format(fmtCtx)
*expr = fmtCtx.String()
return nil
}
4 changes: 4 additions & 0 deletions pkg/sql/catalog/schematelemetry/schema_telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ var (
)

const qHasJob = `SELECT count(*) FROM crdb_internal.jobs WHERE job_type = 'AUTO SCHEMA TELEMETRY' AND status = 'succeeded'`
const qHasLogs = `SELECT sign(count(*)) FROM system.eventlog WHERE "eventType" = 'schema'`

func TestSchemaTelemetrySchedule(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down Expand Up @@ -101,7 +102,10 @@ func TestSchemaTelemetryJob(t *testing.T) {
id := res[0][0]
tdb.ExecSucceedsSoon(t, fmt.Sprintf("PAUSE SCHEDULE %s", id))
tdb.CheckQueryResults(t, qHasJob, [][]string{{"0"}})
tdb.CheckQueryResults(t, qHasLogs, [][]string{{"0"}})
// Run a schema telemetry job and wait for it to succeed.
tdb.Exec(t, qJob)
tdb.CheckQueryResultsRetry(t, qHasJob, [][]string{{"1"}})
tdb.CheckQueryResults(t, qHasLogs, [][]string{{"1"}})

}
8 changes: 8 additions & 0 deletions pkg/sql/catalog/systemschema_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,25 @@ go_test(
data = glob(["testdata/**"]),
deps = [
"//pkg/base",
"//pkg/keys",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/schematelemetry",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/util/leaktest",
"//pkg/util/log/eventpb",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//types",
"@com_github_stretchr_testify//require",
],
)

Expand Down
Loading

0 comments on commit 0f207fa

Please sign in to comment.