Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing: untangle local/remote parent vs recording collection #72898

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/replica_application_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,9 +152,9 @@ func (d *replicaDecoder) createTracingSpans(ctx context.Context) {
cmd.ctx, cmd.sp = d.r.AmbientContext.Tracer.StartSpanCtx(
ctx,
opName,
// NB: we are lying here - we are not actually going to propagate
// the recording towards the root. That seems ok.
tracing.WithParentAndManualCollection(spanMeta),
// NB: Nobody is collecting the recording of this span; we have no
// mechanism for it.
tracing.WithRemoteParent(spanMeta),
tracing.WithFollowsFrom(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica_range_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (p *pendingLeaseRequest) requestLeaseAsync(
ctx, sp = tr.StartSpanCtx(
ctx,
opName,
tracing.WithParentAndAutoCollection(parentSp),
tracing.WithParent(parentSp),
tracing.WithFollowsFrom(),
tagsOpt,
)
Expand Down
16 changes: 7 additions & 9 deletions pkg/migration/migrationmanager/manager_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,13 @@ RETURNING id;`).Scan(&secondID))
}()

testutils.SucceedsSoon(t, func() error {
// TODO(yuzefovich): this check is quite unfortunate since it relies on
// the assumption that all recordings from the child spans are imported
// into the tracer. However, this is not the case for the DistSQL
// processors where child spans are created with
// WithParentAndManualCollection option which requires explicitly
// importing the recordings from the children. This only happens when
// the execution flow is drained which cannot happen until we close
// the 'unblock' channel, and this we cannot do until we see the
// expected message in the trace.
// TODO(yuzefovich): this check is quite unfortunate since it relies on the
// assumption that all recordings from the child spans are imported into the
// tracer. However, this is not the case for the DistSQL processors whose
// recordings require explicit importing. This only happens when the
// execution flow is drained which cannot happen until we close the
// 'unblock' channel, and this we cannot do until we see the expected
// message in the trace.
//
// At the moment it works in a very fragile manner (by making sure that
// no processors actually create their own spans). Instead, a different
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/crdb_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ CREATE TABLE crdb_internal.node_inflight_trace_spans (
"only users with the admin role are allowed to read crdb_internal.node_inflight_trace_spans")
}
return p.ExecCfg().AmbientCtx.Tracer.VisitSpans(func(span tracing.RegistrySpan) error {
for _, rec := range span.GetRecording(tracing.RecordingVerbose) {
for _, rec := range span.GetFullRecording(tracing.RecordingVerbose) {
traceID := rec.TraceID
parentSpanID := rec.ParentSpanID
spanID := rec.SpanID
Expand Down
33 changes: 16 additions & 17 deletions pkg/sql/crdb_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,13 +700,13 @@ func TestDistSQLFlowsVirtualTables(t *testing.T) {
//
// Traces on node1:
// -------------
// root <-- traceID1
// root.child <-- traceID1
// root.child.remotechild <-- traceID1
// root <-- traceID1
// root.child <-- traceID1
// root.child.detached_child <-- traceID1
//
// Traces on node2:
// -------------
// root.child.remotechild2 <-- traceID1
// root.child.remotechild <-- traceID1
// root.child.remotechilddone <-- traceID1
// root2 <-- traceID2
// root2.child <-- traceID2
Expand All @@ -717,33 +717,33 @@ func setupTraces(t1, t2 *tracing.Tracer) (tracingpb.TraceID, func()) {
time.Sleep(10 * time.Millisecond)

// Start a child span on "node 1".
child := t1.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))
child := t1.StartSpan("root.child", tracing.WithParent(root))

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting is not deterministic.
time.Sleep(10 * time.Millisecond)

// Start a forked child span on "node 1".
childRemoteChild := t1.StartSpan("root.child.remotechild", tracing.WithParentAndManualCollection(child.Meta()))
childDetachedChild := t1.StartSpan("root.child.detached_child", tracing.WithParent(child), tracing.WithDetachedRecording())

// Start a remote child span on "node 2".
childRemoteChild2 := t2.StartSpan("root.child.remotechild2", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChild := t2.StartSpan("root.child.remotechild", tracing.WithRemoteParent(child.Meta()))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 2" that we finish.
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithRemoteParent(child.Meta()))
child.ImportRemoteSpans(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))

// Start a root span on "node 2". This will have a different trace_id from
// the spans created above.
root2 := t2.StartSpan("root2", tracing.WithRecording(tracing.RecordingVerbose))

// Start a child span on "node 2".
child2 := t2.StartSpan("root2.child", tracing.WithParentAndAutoCollection(root2))
child2 := t2.StartSpan("root2.child", tracing.WithParent(root2))
return root.TraceID(), func() {
for _, span := range []*tracing.Span{root, child, childRemoteChild,
childRemoteChild2, root2, child2} {
for _, span := range []*tracing.Span{root, child, childDetachedChild,
childRemoteChild, root2, child2} {
span.Finish()
}
}
Expand All @@ -766,13 +766,16 @@ func TestClusterInflightTracesVirtualTable(t *testing.T) {
traceID, cleanup := setupTraces(node1Tracer, node2Tracer)
defer cleanup()

// The cluster_inflight_traces table is magic and only returns results when
// the query contains an index constraint.

t.Run("no-index-constraint", func(t *testing.T) {
sqlDB.CheckQueryResults(t, `SELECT * from crdb_internal.cluster_inflight_traces`, [][]string{})
})

t.Run("with-index-constraint", func(t *testing.T) {
// We expect there to be 2 tracing.Recordings rooted at
// root, root.child.remotechild, root.child.remotechild2.
// root and root.child.remotechild.
expectedRows := []struct {
traceID int
nodeID int
Expand All @@ -781,10 +784,6 @@ func TestClusterInflightTracesVirtualTable(t *testing.T) {
traceID: int(traceID),
nodeID: 1,
},
{
traceID: int(traceID),
nodeID: 1,
},
{
traceID: int(traceID),
nodeID: 2,
Expand All @@ -799,8 +798,8 @@ func TestClusterInflightTracesVirtualTable(t *testing.T) {
require.NoError(t, rows.Scan(&traceID, &nodeID, &traceStr, &jaegarJSON))
require.Less(t, rowIdx, len(expectedRows))
expected := expectedRows[rowIdx]
require.Equal(t, expected.nodeID, nodeID)
require.Equal(t, expected.traceID, traceID)
require.Equal(t, expected.nodeID, nodeID)
require.NotEmpty(t, traceStr)
require.NotEmpty(t, jaegarJSON)
rowIdx++
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsql/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,13 @@ func (ds *ServerImpl) setupFlow(
// TODO(andrei): localState.IsLocal is not quite the right thing to use.
// If that field is unset, we might still want to create a child span if
// this flow is run synchronously.
ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName, tracing.WithParentAndAutoCollection(parentSpan))
ctx, sp = ds.Tracer.StartSpanCtx(ctx, opName, tracing.WithParent(parentSpan))
} else {
// We use FollowsFrom because the flow's span outlives the SetupFlow request.
ctx, sp = ds.Tracer.StartSpanCtx(
ctx,
opName,
tracing.WithParentAndAutoCollection(parentSpan),
tracing.WithParent(parentSpan),
tracing.WithFollowsFrom(),
)
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,12 @@ func (pb *ProcessorBase) AppendTrailingMeta(meta execinfrapb.ProducerMetadata) {
// ProcessorSpan creates a child span for a processor (if we are doing any
// tracing). The returned span needs to be finished using tracing.FinishSpan.
func ProcessorSpan(ctx context.Context, name string) (context.Context, *tracing.Span) {
return tracing.ChildSpanRemote(ctx, name)
sp := tracing.SpanFromContext(ctx)
if sp == nil {
return ctx, nil
}
return sp.Tracer().StartSpanCtx(ctx, name,
tracing.WithParent(sp), tracing.WithDetachedRecording())
}

// StartInternal prepares the ProcessorBase for execution. It returns the
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sem/builtins/generator_builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -1816,7 +1816,7 @@ func (p *payloadsForSpanGenerator) Start(_ context.Context, _ *kv.Txn) error {
// managing the iterator's position needs to start at -1 instead of 0.
p.payloadIndex = -1

rec := p.span.GetRecording(tracing.RecordingStructured)
rec := p.span.GetFullRecording(tracing.RecordingStructured)
if rec == nil {
// No structured records.
return nil
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/tests/tracing_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ func TestSetTraceSpansVerbosityBuiltin(t *testing.T) {
defer root.Finish()
require.False(t, root.IsVerbose())

child := tr.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))
child := tr.StartSpan("root.child", tracing.WithParent(root))
defer child.Finish()
require.False(t, child.IsVerbose())

childChild := tr.StartSpan("root.child.child", tracing.WithParentAndAutoCollection(child))
childChild := tr.StartSpan("root.child.child", tracing.WithParent(child))
defer childChild.Finish()
require.False(t, childChild.IsVerbose())

Expand Down Expand Up @@ -79,7 +79,7 @@ func TestSetTraceSpansVerbosityBuiltin(t *testing.T) {
require.False(t, childChild.IsVerbose())

// New child of verbose child span should also be verbose by default.
newChild := tr.StartSpan("root.child.newchild", tracing.WithParentAndAutoCollection(root))
newChild := tr.StartSpan("root.child.newchild", tracing.WithParent(root))
defer newChild.Finish()
require.True(t, newChild.IsVerbose())

Expand Down
2 changes: 1 addition & 1 deletion pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (tc *TestCluster) stopServers(ctx context.Context) {
var buf strings.Builder
fmt.Fprintf(&buf, "unexpectedly found %d active spans:\n", len(sps))
for _, sp := range sps {
fmt.Fprintln(&buf, sp.GetRecording(tracing.RecordingVerbose))
fmt.Fprintln(&buf, sp.GetFullRecording(tracing.RecordingVerbose))
fmt.Fprintln(&buf)
}
return errors.Newf("%s", buf.String())
Expand Down
2 changes: 1 addition & 1 deletion pkg/util/stop/stopper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,7 +733,7 @@ func TestStopperRunAsyncTaskTracing(t *testing.T) {
errC <- errors.Errorf("missing span")
return
}
sp = tr.StartSpan("child", tracing.WithParentAndAutoCollection(sp))
sp = tr.StartSpan("child", tracing.WithParent(sp))
if sp.TraceID() == traceID {
errC <- errors.Errorf("expected different trace")
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/util/tracing/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,10 @@ func BenchmarkTracer_StartSpanCtx(b *testing.B) {
WithForceRealSpan(), WithLogTags(&staticLogTags),
}},
{"real,autoparent", []SpanOption{
WithForceRealSpan(), WithParentAndAutoCollection(parSp),
WithForceRealSpan(), WithParent(parSp),
}},
{"real,manualparent", []SpanOption{
WithForceRealSpan(), WithParentAndManualCollection(parSp.Meta()),
WithForceRealSpan(), WithParent(parSp), WithDetachedRecording(),
}},
} {
b.Run(fmt.Sprintf("opts=%s", tc.name), func(b *testing.B) {
Expand Down Expand Up @@ -91,7 +91,7 @@ func BenchmarkSpan_GetRecording(b *testing.B) {
run(b, sp)
})

child := tr.StartSpan("bar", WithParentAndAutoCollection(sp), WithForceRealSpan())
child := tr.StartSpan("bar", WithParent(sp), WithForceRealSpan())
b.Run("child-only", func(b *testing.B) {
run(b, child)
})
Expand All @@ -110,7 +110,7 @@ func BenchmarkRecordingWithStructuredEvent(b *testing.B) {
for i := 0; i < b.N; i++ {
root := tr.StartSpan("foo")
root.RecordStructured(ev)
child := tr.StartSpan("bar", WithParentAndAutoCollection(root))
child := tr.StartSpan("bar", WithParent(root))
child.RecordStructured(ev)
child.Finish()
_ = root.GetRecording(RecordingStructured)
Expand Down
12 changes: 6 additions & 6 deletions pkg/util/tracing/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,35 +72,35 @@ func setupTraces(t1, t2 *tracing.Tracer) (tracingpb.TraceID, tracingpb.TraceID,
time.Sleep(10 * time.Millisecond)

// Start a child span on "node 1".
child := t1.StartSpan("root.child", tracing.WithParentAndAutoCollection(root))
child := t1.StartSpan("root.child", tracing.WithParent(root))

// Sleep a bit so that everything that comes afterwards has higher timestamps
// than the one we just assigned. Otherwise the sorting is not deterministic.
time.Sleep(10 * time.Millisecond)

// Start a remote child span on "node 2".
childRemoteChild := t2.StartSpan("root.child.remotechild", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChild := t2.StartSpan("root.child.remotechild", tracing.WithRemoteParent(child.Meta()))
childRemoteChild.RecordStructured(newTestStructured("root.child.remotechild"))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 2" that we finish.
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithParentAndManualCollection(child.Meta()))
childRemoteChildFinished := t2.StartSpan("root.child.remotechilddone", tracing.WithRemoteParent(child.Meta()))
child.ImportRemoteSpans(childRemoteChildFinished.FinishAndGetRecording(tracing.RecordingVerbose))

// Start a root span on "node 2".
root2 := t2.StartSpan("root2", tracing.WithRecording(tracing.RecordingVerbose))
root2.RecordStructured(newTestStructured("root2"))

// Start a child span on "node 2".
child2 := t2.StartSpan("root2.child", tracing.WithParentAndAutoCollection(root2))
child2 := t2.StartSpan("root2.child", tracing.WithParent(root2))
// Start a remote child span on "node 1".
child2RemoteChild := t1.StartSpan("root2.child.remotechild", tracing.WithParentAndManualCollection(child2.Meta()))
child2RemoteChild := t1.StartSpan("root2.child.remotechild", tracing.WithRemoteParent(child2.Meta()))

time.Sleep(10 * time.Millisecond)

// Start another remote child span on "node 1".
anotherChild2RemoteChild := t1.StartSpan("root2.child.remotechild2", tracing.WithParentAndManualCollection(child2.Meta()))
anotherChild2RemoteChild := t1.StartSpan("root2.child.remotechild2", tracing.WithRemoteParent(child2.Meta()))
return root.TraceID(), root2.TraceID(), func() {
for _, span := range []*tracing.Span{root, child, childRemoteChild, root2, child2,
child2RemoteChild, anotherChild2RemoteChild} {
Expand Down
Loading