Skip to content

Commit

Permalink
eventpb: add schema telemetry event
Browse files Browse the repository at this point in the history
This commit introduces the event logic for logging SQL schema telemetry.
This is tested on the schema of a bootstrapped test cluster, the test
checks that the schema events match expectations.

Care is taken to redact all strings present in descriptors which might
unintentionally leak PII.

Informs cockroachdb#84284.

Release note (general change): CRDB will now collect schema info if
phoning home is enabled. This schema info is added to the telemetry log
on a daily basis by default.
  • Loading branch information
Marius Posta committed Jul 28, 2022
1 parent cc22beb commit 31d1825
Show file tree
Hide file tree
Showing 13 changed files with 683 additions and 14 deletions.
53 changes: 53 additions & 0 deletions docs/generated/eventlog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2489,6 +2489,59 @@ contains common SQL event/execution details.
| `FullIndexScan` | Whether the query contains a full secondary index scan of a non-partial index. | no |
| `TxnCounter` | The sequence number of the SQL transaction inside its session. | no |

### `schema_snapshot_metadata`

An event of type `schema_snapshot_metadata` is an event describing a schema snapshot, which
is a set of SchemaSnapshotPage messages sharing the same SnapshotID.


| Field | Description | Sensitive |
|--|--|--|
| `SnapshotID` | SnapshotID is the unique identifier of this snapshot. | no |
| `NumPages` | NumPages is how many SchemaSnapshotPage events are in the snapshot. | no |
| `AsOfTimestamp` | AsOfTimestamp is when the snapshot was taken. This is equivalent to the timestamp given in the AS OF SYSTEM TIME clause when querying the namespace and descriptor tables in the system database. Expressed as nanoseconds since the Unix epoch. | no |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |

### `schema_snapshot_page`

An event of type `schema_snapshot_page` is an event for schema telemetry, whose purpose is to
take periodic snapshots of the cluster's SQL schema and publish them in the
telemetry log channel. For all intents and purposes, the data in such a
snapshot can be thought of as the outer join of certain system tables:
namespace, descriptor, and at some point perhaps zones, etc.

Snapshots are too large to conveniently be published as a single log event,
so instead they're broken down into pages (SchemaSnapshotPage events) which
contain the data in one record of this outer join projection. These pages
are prefixed by a header (a SchemaSnapshotMetadata event).


| Field | Description | Sensitive |
|--|--|--|
| `SnapshotID` | SnapshotID is the unique identifier of the snapshot that this event is part of. | no |
| `CurrentPage` | CurrentPage is the ordinal of the event within the snapshot. The first event has value 1, therefore the last event satisfies CurrentPage = SchemaSnapshotMetadata.NumPages. | no |
| `ParentDatabaseID` | ParentDatabaseID matches the same key column in system.namespace. | no |
| `ParentSchemaID` | ParentSchemaID matches the same key column in system.namespace. | no |
| `Name` | Name matches the same key column in system.namespace. | no |
| `DescID` | DescID matches the 'id' column in system.namespace and system.descriptor. | no |
| `Desc` | Desc matches the 'descriptor' column in system.descriptor. Some contents of the descriptor may be redacted to prevent leaking PII. Couldn't use descpb.Descriptor directly due to a circular dependency. | no |
| `Errors` | Errors records any errors encountered when post-processing this data, which includes the redaction of any potential PII. | yes |


#### Common fields

| Field | Description | Sensitive |
|--|--|--|
| `Timestamp` | The timestamp of the event. Expressed as nanoseconds since the Unix epoch. | no |
| `EventType` | The type of the event. | no |

## Zone config events

Events in this category pertain to zone configuration changes on
Expand Down
10 changes: 10 additions & 0 deletions pkg/sql/catalog/schematelemetry/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,16 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/schematelemetry/schematelemetrycontroller",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/parser",
"//pkg/sql/schemachanger/scpb",
"//pkg/sql/sem/builtins",
"//pkg/sql/sem/builtins/builtinconstants",
"//pkg/sql/sem/tree",
Expand All @@ -30,7 +38,9 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/log/eventpb",
"//pkg/util/metric",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_gogo_protobuf//types",
],
)

Expand Down
273 changes: 271 additions & 2 deletions pkg/sql/catalog/schematelemetry/schema_telemetry_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,288 @@ package schematelemetry
import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
gogotypes "github.com/gogo/protobuf/types"
)

// CollectClusterSchemaForTelemetry returns a projection of the cluster's SQL
// schema as of the provided system time, suitably filtered for the purposes of
// schema telemetry.
//
// The result starts with a single SchemaSnapshotMetadata event, followed by
// one SchemaSnapshotPage event per namespace entry and then one per descriptor
// which is not referenced by any namespace entry. All events share the same
// freshly generated snapshot ID and are timestamped with asOf's wall time.
//
// Table and type descriptors are redacted before being marshaled into the
// pages; redaction and marshaling failures are recorded in the page's Errors
// field instead of failing the whole collection. An error is returned only if
// reading the catalog fails.
//
// This function is tested in the systemschema package.
//
// TODO(postamar): monitor memory usage
func CollectClusterSchemaForTelemetry(
	ctx context.Context, cfg *sql.ExecutorConfig, asOf hlc.Timestamp,
) (events []eventpb.EventPayload, _ error) {
	// Scrape the raw catalog.
	var raw nstree.Catalog
	if err := sql.DescsTxn(ctx, cfg, func(ctx context.Context, txn *kv.Txn, col *descs.Collection) error {
		err := txn.SetFixedTimestamp(ctx, asOf)
		if err != nil {
			return err
		}
		raw, err = col.Direct().GetCatalogUnvalidated(ctx, txn)
		return err
	}); err != nil {
		return nil, err
	}
	// Redact the descriptors. The callback below never returns an error, so
	// the result of the iteration is deliberately discarded.
	redacted := make(map[descpb.ID]eventpb.SchemaSnapshotPage)
	_ = raw.ForEachDescriptorEntry(func(rd catalog.Descriptor) error {
		ev := eventpb.SchemaSnapshotPage{DescID: uint32(rd.GetID())}
		var errs []error
		var pb *descpb.Descriptor
		// Redact parts of the catalog which may contain PII. The redaction is
		// applied to a mutable copy built from the descriptor, not to rd.
		{
			mut := rd.NewBuilder().BuildCreatedMutable()
			switch d := mut.(type) {
			case *tabledesc.Mutable:
				errs = redactTableDescriptor(d.TableDesc())
			case *typedesc.Mutable:
				redactTypeDescriptor(d.TypeDesc())
			}
			pb = mut.DescriptorProto()
		}
		// Marshal protobuf and add all errors to the event. Wrapping a nil
		// error yields nil, hence the nil check when copying the messages.
		var anyErr error
		ev.Desc, anyErr = gogotypes.MarshalAny(pb)
		for _, err := range append(errs, errors.Wrap(anyErr, "marshal protobuf")) {
			if err != nil {
				ev.Errors = append(ev.Errors, err.Error())
			}
		}
		redacted[rd.GetID()] = ev
		return nil
	})
	// Build the log events for each of the namespace entries. As above, the
	// callback never fails, so the iteration result is discarded.
	pages := make([]eventpb.SchemaSnapshotPage, 0, len(redacted))
	var logged catalog.DescriptorIDSet
	_ = raw.ForEachNamespaceEntry(func(e catalog.NameEntry) error {
		ev := eventpb.SchemaSnapshotPage{
			ParentDatabaseID: uint32(e.GetParentID()),
			ParentSchemaID:   uint32(e.GetParentSchemaID()),
			Name:             e.GetName(),
			DescID:           uint32(e.GetID()),
		}
		// A namespace entry may have no matching descriptor, in which case
		// the page only carries the namespace columns.
		if src, ok := redacted[e.GetID()]; ok {
			ev.Desc = src.Desc
			ev.Errors = src.Errors
		}
		pages = append(pages, ev)
		logged.Add(e.GetID())
		return nil
	})
	// Add log events for orphaned descriptors, i.e. those which were not
	// reached through any namespace entry.
	for _, id := range raw.OrderedDescriptorIDs() {
		if logged.Contains(id) {
			continue
		}
		pages = append(pages, redacted[id])
	}
	// Build the returned slice of interfaces: the metadata header first,
	// followed by the pages numbered starting at 1.
	events = make([]eventpb.EventPayload, 1+len(pages))
	meta := &eventpb.SchemaSnapshotMetadata{
		CommonEventDetails: eventpb.CommonEventDetails{
			Timestamp: asOf.WallTime,
		},
		SnapshotID:    uuid.MakeV4().String(),
		NumPages:      uint32(len(pages)),
		AsOfTimestamp: asOf.WallTime,
	}
	events[0] = meta
	for i := range pages {
		ev := &pages[i]
		ev.Timestamp = asOf.WallTime
		ev.SnapshotID = meta.SnapshotID
		ev.CurrentPage = uint32(i + 1)
		events[1+i] = ev
	}
	return events, nil
}

// redactTableDescriptor redacts, in place, the parts of the table descriptor
// which hold SQL text or partitioning values: the view and create queries,
// index predicates and partitioning, column expressions, check and
// unique-without-index constraints, pending mutations of those kinds, and the
// declarative schema changer state.
//
// Redaction is best-effort: every failure is collected and the remaining
// fields are still processed. The returned slice contains only non-nil
// errors, each wrapped with a description of the field it pertains to; it is
// nil when nothing failed.
func redactTableDescriptor(d *descpb.TableDescriptor) (errs []error) {
	// errors.Wrap and errors.Wrapf return nil for a nil error, so successful
	// redactions reach this function as nil and must not be recorded.
	handleErr := func(err error) {
		if err != nil {
			errs = append(errs, err)
		}
	}
	if d.ViewQuery != "" {
		handleErr(errors.Wrap(redactQuery(&d.ViewQuery), "view query"))
	}
	if d.CreateQuery != "" {
		handleErr(errors.Wrap(redactQuery(&d.CreateQuery), "create query"))
	}
	handleErr(redactIndex(&d.PrimaryIndex))
	for i := range d.Indexes {
		idx := &d.Indexes[i]
		handleErr(errors.Wrapf(redactIndex(idx), "index #%d", idx.ID))
	}
	for i := range d.Columns {
		col := &d.Columns[i]
		for _, err := range redactColumn(col) {
			handleErr(errors.Wrapf(err, "column #%d", col.ID))
		}
	}
	for i := range d.Checks {
		chk := d.Checks[i]
		handleErr(errors.Wrapf(redactCheckConstraints(chk), "constraint #%d", chk.ConstraintID))
	}
	for i := range d.UniqueWithoutIndexConstraints {
		uwi := &d.UniqueWithoutIndexConstraints[i]
		handleErr(errors.Wrapf(redactUniqueWithoutIndexConstraint(uwi), "constraint #%d", uwi.ConstraintID))
	}
	// Pending mutations embed the same kinds of objects as above and are
	// redacted the same way.
	for _, m := range d.Mutations {
		if idx := m.GetIndex(); idx != nil {
			handleErr(errors.Wrapf(redactIndex(idx), "index #%d", idx.ID))
		} else if col := m.GetColumn(); col != nil {
			for _, err := range redactColumn(col) {
				handleErr(errors.Wrapf(err, "column #%d", col.ID))
			}
		} else if ctu := m.GetConstraint(); ctu != nil {
			switch ctu.ConstraintType {
			case descpb.ConstraintToUpdate_CHECK:
				chk := &ctu.Check
				handleErr(errors.Wrapf(redactCheckConstraints(chk), "constraint #%d", chk.ConstraintID))
			case descpb.ConstraintToUpdate_UNIQUE_WITHOUT_INDEX:
				uwi := &ctu.UniqueWithoutIndexConstraint
				handleErr(errors.Wrapf(redactUniqueWithoutIndexConstraint(uwi), "constraint #%d", uwi.ConstraintID))
			}
		}
	}
	if scs := d.DeclarativeSchemaChangerState; scs != nil {
		// Overwrite each statement's text with its redacted form.
		for i := range scs.RelevantStatements {
			stmt := &scs.RelevantStatements[i]
			stmt.Statement.Statement = stmt.Statement.RedactedStatement
		}
		for i := range scs.Targets {
			t := &scs.Targets[i]
			handleErr(errors.Wrapf(redactElement(t.Element()), "element #%d", i))
		}
	}
	return errs
}

// redactQuery parses the SQL statement pointed to by sql and overwrites it
// with the statement re-formatted using tree.FmtHideConstants. If the
// statement cannot be parsed it is replaced by "_" and the parse error is
// returned.
func redactQuery(sql *string) error {
	stmt, parseErr := parser.ParseOne(*sql)
	if parseErr == nil {
		out := tree.NewFmtCtx(tree.FmtHideConstants)
		stmt.AST.Format(out)
		*sql = out.String()
		return nil
	}
	// Unparseable text cannot be selectively redacted: drop it entirely.
	*sql = "_"
	return parseErr
}

// redactIndex redacts, in place, the index's partitioning values and its
// predicate expression. Any error from redacting the predicate is returned
// wrapped with "partial predicate".
func redactIndex(idx *descpb.IndexDescriptor) error {
	redactPartitioning(&idx.Partitioning)
	predicateErr := redactExprStr(&idx.Predicate)
	return errors.Wrap(predicateErr, "partial predicate")
}

// redactColumn redacts, in place, the compute, default and on-update
// expressions of the column, when they are set.
//
// All three expressions are always attempted. The returned slice contains
// only non-nil errors, each wrapped with the name of the expression which
// failed to be redacted; it is nil when nothing failed.
func redactColumn(col *descpb.ColumnDescriptor) (errs []error) {
	// errors.Wrap returns nil for a nil error, so successful redactions reach
	// this function as nil and must not be recorded.
	handleErr := func(err error) {
		if err != nil {
			errs = append(errs, err)
		}
	}
	if ce := col.ComputeExpr; ce != nil {
		handleErr(errors.Wrap(redactExprStr(ce), "compute expr"))
	}
	if de := col.DefaultExpr; de != nil {
		handleErr(errors.Wrap(redactExprStr(de), "default expr"))
	}
	if ue := col.OnUpdateExpr; ue != nil {
		handleErr(errors.Wrap(redactExprStr(ue), "on-update expr"))
	}
	return errs
}

// redactCheckConstraints redacts the check constraint's expression in place
// by delegating to redactExprStr, whose error it returns unchanged.
func redactCheckConstraints(chk *descpb.TableDescriptor_CheckConstraint) error {
	checkExpr := &chk.Expr
	return redactExprStr(checkExpr)
}

// redactUniqueWithoutIndexConstraint redacts the constraint's predicate in
// place by delegating to redactExprStr, whose error it returns unchanged.
func redactUniqueWithoutIndexConstraint(uwi *descpb.UniqueWithoutIndexConstraint) error {
	predicate := &uwi.Predicate
	return redactExprStr(predicate)
}

// redactTypeDescriptor overwrites the logical and physical representations of
// every enum member of the type descriptor with "_".
func redactTypeDescriptor(d *descpb.TypeDescriptor) {
	members := d.EnumMembers
	for i := 0; i < len(members); i++ {
		members[i].LogicalRepresentation = "_"
		members[i].PhysicalRepresentation = []byte("_")
	}
}

// redactElement redacts literals which may contain PII from elements.
//
// Enum values and index partitionings are overwritten directly. For the
// element types which carry an expression, that expression is redacted via
// redactExpr and its error, if any, is returned. All other element types are
// left untouched.
func redactElement(element scpb.Element) error {
	// toRedact points at the expression to rewrite, if the element has one.
	var toRedact *catpb.Expression
	switch e := element.(type) {
	case *scpb.EnumTypeValue:
		e.LogicalRepresentation = "_"
		e.PhysicalRepresentation = []byte("_")
	case *scpb.IndexPartitioning:
		redactPartitioning(&e.PartitioningDescriptor)
	case *scpb.SecondaryIndexPartial:
		toRedact = &e.Expression.Expr
	case *scpb.CheckConstraint:
		toRedact = &e.Expression.Expr
	case *scpb.ColumnDefaultExpression:
		toRedact = &e.Expression.Expr
	case *scpb.ColumnOnUpdateExpression:
		toRedact = &e.Expression.Expr
	case *scpb.ColumnType:
		// The compute expression is optional.
		if e.ComputeExpr != nil {
			toRedact = &e.ComputeExpr.Expr
		}
	}
	if toRedact == nil {
		return nil
	}
	return redactExpr(toRedact)
}

// redactPartitioning overwrites, in place, every value stored in the
// partitioning descriptor with "_": each value of each list partition,
// recursively including their subpartitionings, and both bounds of each range
// partition.
func redactPartitioning(p *catpb.PartitioningDescriptor) {
	for li := range p.List {
		values := p.List[li].Values
		for vi := range values {
			values[vi] = []byte("_")
		}
		redactPartitioning(&p.List[li].Subpartitioning)
	}
	for ri := range p.Range {
		p.Range[ri].FromInclusive = []byte("_")
		p.Range[ri].ToExclusive = []byte("_")
	}
}

// redactExpr is the catpb.Expression counterpart of redactExprStr: it redacts
// the expression through a temporary string and stores the result back into
// expr, whether or not redactExprStr reported an error.
func redactExpr(expr *catpb.Expression) error {
	text := string(*expr)
	redactErr := redactExprStr(&text)
	*expr = catpb.Expression(text)
	return redactErr
}

// redactExprStr parses the SQL expression pointed to by expr and overwrites it
// with the expression re-formatted using tree.FmtHideConstants. An empty
// expression is left untouched. If the expression cannot be parsed it is
// replaced by "_" and the parse error is returned.
//
// NOTE(review): Format is invoked directly on the root node; confirm that
// FmtHideConstants also masks an expression which is itself a bare literal.
func redactExprStr(expr *string) error {
	if len(*expr) == 0 {
		return nil
	}
	parsed, parseErr := parser.ParseExpr(*expr)
	if parseErr == nil {
		out := tree.NewFmtCtx(tree.FmtHideConstants)
		parsed.Format(out)
		*expr = out.String()
		return nil
	}
	// Unparseable text cannot be selectively redacted: drop it entirely.
	*expr = "_"
	return parseErr
}
Loading

0 comments on commit 31d1825

Please sign in to comment.