Skip to content

Commit

Permalink
Merge #66679 #66837
Browse files Browse the repository at this point in the history
66679: sql,tracing: introduce crdb_internal.cluster_inflight_traces r=abarganier a=adityamaru

This change adds a new indexed, virtual table
`crdb_internal.cluster_inflight_traces`. This table surfaces
cluster-wide inflight traces for the trace_id specified via
an index constraint.

Each row in the virtual table corresponds to a
`tracing.Recording` on a particular node for the given
trace ID. A `tracing.Recording` is the trace of a single
operation rooted at a root span on that node. Under the hood,
the virtual table contacts all "live" nodes in the cluster
via the trace collector, which streams back one recording at a
time.

The table has 3 additional columns that surface the raw JSON,
string, and Jaeger JSON representations of the recording. These
formats are what we dump in a stmt bundle as well, and have
been considered the best way to consume traces. This table
is not meant to be consumed directly via the SQL shell; instead,
a CLI wrapper will be built on top of it that will assimilate
the traces and write them to files, similar to how we dump a
stmt bundle.

This change also tweaks some of the recording->string methods
to include StructuredRecords.

Informs: #64992

Release note (sql change): adds a virtual table
`crdb_internal.cluster_inflight_traces` which surfaces
cluster-wide inflight traces for the trace_id specified via
an index constraint. The output of this table is not appropriate
to consume over a SQL connection; follow-up changes will add
CLI wrappers to make the interaction more user-friendly.


66837: sql/schemachanger: reorder args on Build, clone nodes, minor renaming r=fqazi a=ajwerner

Release note: None

Co-authored-by: Aditya Maru <adityamaru@gmail.com>
Co-authored-by: Andrew Werner <awerner32@gmail.com>
  • Loading branch information
3 people committed Jun 28, 2021
3 parents 9785a7b + 3d5c000 + 06dc96e commit 53dcc3a
Show file tree
Hide file tree
Showing 57 changed files with 2,330 additions and 1,925 deletions.
1 change: 1 addition & 0 deletions pkg/cli/zip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ table_name NOT IN (
'cluster_contended_keys',
'cluster_contended_indexes',
'cluster_contended_tables',
'cluster_inflight_traces',
'create_statements',
'create_type_statements',
'cross_db_references',
Expand Down
626 changes: 313 additions & 313 deletions pkg/jobs/jobspb/jobs.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/jobs/jobspb/jobs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ message NewSchemaChangeDetails {

// NewSchemaChangeProgress is the persisted progress for the new schema change job.
message NewSchemaChangeProgress {
repeated cockroach.sql.schemachanger.scpb.State states = 1;
repeated cockroach.sql.schemachanger.scpb.Status states = 1;
}


Expand Down
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/collector",
"//pkg/util/tracing/service",
"//pkg/util/tracing/tracingpb",
"//pkg/util/tracing/tracingservicepb:tracingservicepb_go_proto",
Expand Down
15 changes: 13 additions & 2 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing/collector"
"github.com/cockroachdb/cockroach/pkg/util/tracing/service"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingservicepb"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -491,8 +492,8 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}

var isAvailable func(roachpb.NodeID) bool
nodeLiveness, ok := cfg.nodeLiveness.Optional(47900)
if ok {
nodeLiveness, hasNodeLiveness := cfg.nodeLiveness.Optional(47900)
if hasNodeLiveness {
// TODO(erikgrinaker): We may want to use IsAvailableNotDraining instead, to
// avoid scheduling long-running flows (e.g. rangefeeds or backups) on nodes
// that are being drained/decommissioned. However, these nodes can still be
Expand All @@ -507,6 +508,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}
}

// Setup the trace collector that is used to fetch inflight trace spans from
// all nodes in the cluster.
// The collector requires nodeliveness to get a list of all the nodes in the
// cluster.
var traceCollector *collector.TraceCollector
if hasNodeLiveness {
traceCollector = collector.New(cfg.nodeDialer, nodeLiveness, cfg.Settings.Tracer)
}

*execCfg = sql.ExecutorConfig{
Settings: cfg.Settings,
NodeInfo: nodeInfo,
Expand Down Expand Up @@ -537,6 +547,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
RootMemoryMonitor: rootSQLMemoryMonitor,
TestingKnobs: sqlExecutorTestingKnobs,
CompactEngineSpanFunc: compactEngineSpanFunc,
TraceCollector: traceCollector,

DistSQLPlanner: sql.NewDistSQLPlanner(
ctx,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"//pkg/util/tracing/collector",
"//pkg/util/tracing/tracingpb",
"//pkg/util/uint128",
"//pkg/util/uuid",
Expand All @@ -393,7 +394,6 @@ go_library(
"@com_github_cockroachdb_errors//hintdetail",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_lib_pq//:pq",
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/catalog/catconstants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ const (
CrdbInternalInterleaved
CrdbInternalCrossDbRefrences
CrdbInternalLostTableDescriptors
CrdbInternalClusterInflightTracesTable
InformationSchemaID
InformationSchemaAdministrableRoleAuthorizationsID
InformationSchemaApplicableRolesID
Expand Down
26 changes: 13 additions & 13 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2699,7 +2699,7 @@ func (ex *connExecutor) notifyStatsRefresherOfNewTables(ctx context.Context) {
// mutate descriptors prior to committing a SQL transaction.
func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
scs := &ex.extraTxnState.schemaChangerState
if len(scs.nodes) == 0 {
if len(scs.state) == 0 {
return nil
}
executor := scexec.NewExecutor(
Expand All @@ -2710,29 +2710,29 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
after, err := runNewSchemaChanger(
ctx,
scplan.PreCommitPhase,
ex.extraTxnState.schemaChangerState.nodes,
ex.extraTxnState.schemaChangerState.state,
executor,
scs.stmts,
)
if err != nil {
return err
}
scs.nodes = after
targetSlice := make([]*scpb.Target, len(scs.nodes))
states := make([]scpb.State, len(scs.nodes))
scs.state = after
targetSlice := make([]*scpb.Target, len(scs.state))
states := make([]scpb.Status, len(scs.state))
// TODO(ajwerner): It may be better in the future to have the builder be
// responsible for determining this set of descriptors. As of the time of
// writing, the descriptors to be "locked," descriptors that need schema
// change jobs, and descriptors with schema change mutations all coincide. But
// there are future schema changes to be implemented in the new schema changer
// (e.g., RENAME TABLE) for which this may no longer be true.
descIDSet := catalog.MakeDescriptorIDSet()
for i := range scs.nodes {
targetSlice[i] = scs.nodes[i].Target
states[i] = scs.nodes[i].State
for i := range scs.state {
targetSlice[i] = scs.state[i].Target
states[i] = scs.state[i].Status
// Depending on the element type either a single descriptor ID
// will exist or multiple (i.e. foreign keys).
if id := scpb.GetDescID(scs.nodes[i].Element()); id != descpb.InvalidID {
if id := scpb.GetDescID(scs.state[i].Element()); id != descpb.InvalidID {
descIDSet.Add(id)
}
}
Expand Down Expand Up @@ -2765,18 +2765,18 @@ func (ex *connExecutor) runPreCommitStages(ctx context.Context) error {
func runNewSchemaChanger(
ctx context.Context,
phase scplan.Phase,
nodes []*scpb.Node,
state scpb.State,
executor *scexec.Executor,
stmts []string,
) (after []*scpb.Node, _ error) {
sc, err := scplan.MakePlan(nodes, scplan.Params{
) (after scpb.State, _ error) {
sc, err := scplan.MakePlan(state, scplan.Params{
ExecutionPhase: phase,
// TODO(ajwerner): Populate the set of new descriptors
})
if err != nil {
return nil, err
}
after = nodes
after = state
for _, s := range sc.Stages {
if err := executor.ExecuteOps(ctx, s.Ops,
scexec.TestingKnobMetadata{
Expand Down
77 changes: 77 additions & 0 deletions pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ var crdbInternal = virtualSchema{
catconstants.CrdbInternalInterleaved: crdbInternalInterleaved,
catconstants.CrdbInternalCrossDbRefrences: crdbInternalCrossDbReferences,
catconstants.CrdbInternalLostTableDescriptors: crdbLostTableDescriptors,
catconstants.CrdbInternalClusterInflightTracesTable: crdbInternalClusterInflightTracesTable,
},
validWithNoDatabaseContext: true,
}
Expand Down Expand Up @@ -1197,6 +1198,82 @@ CREATE TABLE crdb_internal.session_trace (
},
}

// crdbInternalClusterInflightTracesTable exposes cluster-wide inflight spans
// for a trace_id.
//
// crdbInternalClusterInflightTracesTable is an indexed, virtual table that only
// returns rows when accessed with an index constraint specifying the trace_id
// for which inflight spans need to be aggregated from all nodes in the cluster.
// A full scan (no constraint on trace_id) deliberately produces no rows.
//
// Each row in the virtual table corresponds to a single `tracing.Recording` on
// a particular node. A `tracing.Recording` is the trace of a single operation
// rooted at a root span on that node. Under the hood, the virtual table
// contacts all "live" nodes in the cluster via the trace collector, which
// streams back one recording at a time.
//
// The underlying trace collector only buffers recordings one node at a time.
// The virtual table also produces rows lazily, i.e. as and when they are
// consumed by the consumer. Therefore, the memory overhead of querying this
// table will be the size of all the `tracing.Recordings` of a particular
// `trace_id` on a single node in the cluster. Each `tracing.Recording` has its
// own memory protections via ring buffers, and so we do not expect this
// overhead to grow in an unbounded manner.
var crdbInternalClusterInflightTracesTable = virtualSchemaTable{
	comment: `traces for in-flight spans across all nodes in the cluster (cluster RPC; expensive!)`,
	schema: `
CREATE TABLE crdb_internal.cluster_inflight_traces (
trace_id INT NOT NULL, -- The trace's ID.
node_id INT NOT NULL, -- The node's ID.
trace_json STRING NULL, -- JSON representation of the traced remote operation.
trace_str STRING NULL, -- human readable representation of the traced remote operation.
jaeger_json STRING NULL, -- Jaeger JSON representation of the traced remote operation.
INDEX(trace_id)
)`,
	// The single virtual index is on trace_id. Its populate function receives
	// the constrained trace_id value and emits one row per recording returned
	// by the trace collector for that trace.
	indexes: []virtualIndex{{populate: func(ctx context.Context, constraint tree.Datum, p *planner,
		db catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) (matched bool, err error) {
		var traceID uint64
		d := tree.UnwrapDatum(p.EvalContext(), constraint)
		if d == tree.DNull {
			// A NULL trace_id cannot match any trace.
			return false, nil
		}
		switch t := d.(type) {
		case *tree.DInt:
			traceID = uint64(*t)
		default:
			return false, errors.AssertionFailedf(
				"unexpected type %T for trace_id column in virtual table crdb_internal.cluster_inflight_traces", d)
		}

		// The server only constructs a TraceCollector when node liveness is
		// available, so the collector may legitimately be nil here. Return an
		// explicit error rather than dereferencing a nil collector.
		traceCollector := p.ExecCfg().TraceCollector
		if traceCollector == nil {
			return false, errors.New(
				"trace collector is unavailable on this server; cannot collect cluster-wide inflight traces")
		}

		for iter := traceCollector.StartIter(ctx, traceID); iter.Valid(); iter.Next() {
			nodeID, recording := iter.Value()
			traceJSON, err := tracing.TraceToJSON(recording)
			if err != nil {
				return false, err
			}
			traceString := recording.String()
			traceJaegerJSON, err := recording.ToJaegerJSON("", "", fmt.Sprintf("node %d", nodeID))
			if err != nil {
				return false, err
			}
			if err := addRow(tree.NewDInt(tree.DInt(traceID)), tree.NewDInt(tree.DInt(nodeID)),
				tree.NewDString(traceJSON), tree.NewDString(traceString),
				tree.NewDString(traceJaegerJSON)); err != nil {
				return false, err
			}
		}

		return true, nil
	}}},
	populate: func(ctx context.Context, p *planner, db catalog.DatabaseDescriptor,
		addRow func(...tree.Datum) error) error {
		// We only want to generate rows when an index constraint is provided on the
		// query accessing this vtable. This index constraint will provide the
		// trace_id for which we will collect inflight trace spans from the cluster.
		return nil
	},
}

// crdbInternalInflightTraceSpanTable exposes the node-local registry of in-flight spans.
var crdbInternalInflightTraceSpanTable = virtualSchemaTable{
comment: `in-flight spans (RAM; local node only)`,
Expand Down
120 changes: 120 additions & 0 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/pgtype"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -808,3 +809,122 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
})
}
}

// setupTraces takes two tracers (potentially on different nodes), and creates
// two span hierarchies as depicted below. The method returns the trace ID of
// the first hierarchy (the one rooted at `root`), along with a cleanup method
// to Finish() all the spans that are still open when setupTraces returns.
//
// Traces on node1:
// -------------
// root <-- traceID1
// root.child <-- traceID1
// root.child.remotechild <-- traceID1
//
// Traces on node2:
// -------------
// root.child.remotechild2 <-- traceID1
// root.child.remotechilddone <-- traceID1
// root2 <-- traceID2
// root2.child <-- traceID2
//
// Note that root.child.remotechilddone is finished inside setupTraces itself,
// and its recording is imported into root.child; it is therefore not part of
// the set of spans finished by the returned cleanup function.
func setupTraces(t1, t2 *tracing.Tracer) (uint64, func()) {
// Start a root span on "node 1".
root := t1.StartSpan("root", tracing.WithForceRealSpan())
root.SetVerbose(true)

// Sleep a bit so that the child span below starts strictly after the root.
time.Sleep(10 * time.Millisecond)

// Start a child span on "node 1".
child := t1.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting is not deterministic.
time.Sleep(10 * time.Millisecond)

// Start a forked child span on "node 1".
childRemoteChild := t1.StartSpan("root.child.remotechild", tracing.WithParentAndManualCollection(child.Meta()))

// Start a remote child span on "node 2".
childRemoteChild2 := t2.StartSpan("root.child.remotechild2", tracing.WithParentAndManualCollection(child.Meta()))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 2" that we finish immediately, and
// import its recording into the parent span on "node 1".
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChildFinished.Finish()
child.ImportRemoteSpans(childRemoteChildFinished.GetRecording())

// Start a second root span on "node 2". This will have a different trace_id
// from the spans created above.
root2 := t2.StartSpan("root2", tracing.WithForceRealSpan())
root2.SetVerbose(true)

// Start a child span on "node 2".
child2 := t2.StartSpan("root2.child", tracing.WithParentAndAutoCollection(root2))
// Only the ID of the first trace is returned; the cleanup closure finishes
// every span that is still open (childRemoteChildFinished is already done).
return root.TraceID(), func() {
for _, span := range []*tracing.Span{root, child, childRemoteChild,
childRemoteChild2, root2, child2} {
span.Finish()
}
}
}

// TestClusterInflightTracesVirtualTable starts a two-node test cluster, opens
// spans on both nodes' tracers via setupTraces, and verifies that
// crdb_internal.cluster_inflight_traces returns no rows without a trace_id
// constraint and one row per expected recording when constrained by trace_id.
func TestClusterInflightTracesVirtualTable(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	args := base.TestClusterArgs{}
	tc := testcluster.StartTestCluster(t, 2 /* nodes */, args)
	defer tc.Stopper().Stop(ctx)
	sqlDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

	node1Tracer := tc.Server(0).Tracer().(*tracing.Tracer)
	node2Tracer := tc.Server(1).Tracer().(*tracing.Tracer)

	traceID, cleanup := setupTraces(node1Tracer, node2Tracer)
	defer cleanup()

	t.Run("no-index-constraint", func(t *testing.T) {
		sqlDB.CheckQueryResults(t, `SELECT * from crdb_internal.cluster_inflight_traces`, [][]string{})
	})

	t.Run("with-index-constraint", func(t *testing.T) {
		// We expect there to be 3 tracing.Recordings rooted at
		// root, root.child.remotechild, root.child.remotechild2.
		expectedRows := []struct {
			traceID int
			nodeID  int
		}{
			{
				traceID: int(traceID),
				nodeID:  1,
			},
			{
				traceID: int(traceID),
				nodeID:  1,
			},
			{
				traceID: int(traceID),
				nodeID:  2,
			},
		}
		var rowIdx int
		rows := sqlDB.Query(t, `SELECT trace_id, node_id, trace_json, trace_str, jaeger_json from crdb_internal.cluster_inflight_traces WHERE trace_id=$1`, traceID)
		defer rows.Close()
		for rows.Next() {
			var traceID, nodeID int
			var traceJSON, traceStr, jaegarJSON string
			require.NoError(t, rows.Scan(&traceID, &nodeID, &traceJSON, &traceStr, &jaegarJSON))
			// Guard against more rows than expected before indexing.
			require.Less(t, rowIdx, len(expectedRows))
			expected := expectedRows[rowIdx]
			require.Equal(t, expected.nodeID, nodeID)
			require.Equal(t, expected.traceID, traceID)
			require.NotEmpty(t, traceJSON)
			require.NotEmpty(t, traceStr)
			require.NotEmpty(t, jaegarJSON)
			rowIdx++
		}
		// rows.Next() returning false may mean an iteration error rather than
		// the end of the result set; surface it.
		require.NoError(t, rows.Err())
		// Without this check the subtest would pass vacuously if the query
		// returned fewer rows than expected (including zero).
		require.Equal(t, len(expectedRows), rowIdx, "unexpected number of rows returned")
	})
}
Loading

0 comments on commit 53dcc3a

Please sign in to comment.