Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tracing,testutils: detect span leaks #58902

Merged
merged 3 commits into from
Jan 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/ccl/importccl/import_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,15 @@ func (idp *readImportDataProcessor) Next() (rowenc.EncDatumRow, *execinfrapb.Pro
// Once the import is done, send back to the controller the serialized
// summary of the import operation. For more info see roachpb.BulkOpSummary.
countsBytes, err := protoutil.Marshal(idp.summary)
idp.MoveToDraining(err)
if err != nil {
idp.MoveToDraining(err)
return nil, idp.DrainHelper()
}

idp.MoveToDraining(nil /* err */)
return rowenc.EncDatumRow{
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes(countsBytes))),
rowenc.DatumToEncDatum(types.Bytes, tree.NewDBytes(tree.DBytes([]byte{}))),
}, idp.DrainHelper()
}, nil
}

// ConsumerDone is part of the RowSource interface.
Expand Down
5 changes: 5 additions & 0 deletions pkg/server/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,6 +1264,11 @@ func (ts *TestServer) ExecutorConfig() interface{} {
return *ts.sqlServer.execCfg
}

// Tracer is part of TestServerInterface. It hands back the tracer stored on
// the node's store config AmbientCtx, boxed in an interface{}.
func (ts *TestServer) Tracer() interface{} {
	tr := ts.node.storeCfg.AmbientCtx.Tracer
	return tr
}

// GCSystemLog deletes entries in the given system log table between
// timestamp and timestampUpperBound if the server is the lease holder
// for range 1.
Expand Down
5 changes: 3 additions & 2 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1496,11 +1496,12 @@ func (ex *connExecutor) recordTransaction(ev txnEvent, implicit bool, txnStart t
// child span if one is found. A context derived from parentCtx which
// additionally contains the new span is also returned.
func createRootOrChildSpan(
parentCtx context.Context, opName string, tr *tracing.Tracer,
parentCtx context.Context, opName string, tr *tracing.Tracer, os ...tracing.SpanOption,
) (context.Context, *tracing.Span) {
// WithForceRealSpan is used to support the use of session tracing, which
// may start recording on this span.
return tracing.EnsureChildSpan(parentCtx, tr, opName, tracing.WithForceRealSpan())
os = append(os, tracing.WithForceRealSpan())
return tracing.EnsureChildSpan(parentCtx, tr, opName, os...)
}

// logTraceAboveThreshold logs a span's recording if the duration is above a
Expand Down
3 changes: 1 addition & 2 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -644,8 +644,7 @@ func (r *DistSQLReceiver) Push(
if len(meta.TraceData) > 0 {
span := tracing.SpanFromContext(r.ctx)
if span == nil {
r.resultWriter.SetError(
errors.New("trying to ingest remote spans but there is no recording span set up"))
// Nothing to do.
} else if err := span.ImportRemoteSpans(meta.TraceData); err != nil {
r.resultWriter.SetError(errors.Errorf("error ingesting remote spans: %s", err))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/processorsbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ const (
// returned from now on. In this state, the processor is expected to drain its
// inputs (commonly by using DrainHelper()).
//
// If the processor has no input (ProcStateOpts.intputToDrain was not specified
// If the processor has no input (ProcStateOpts.inputToDrain was not specified
// at init() time), then we move straight to the StateTrailingMeta.
//
// An error can be optionally passed. It will be the first piece of metadata
Expand Down
15 changes: 15 additions & 0 deletions pkg/sql/flowinfra/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func TestClusterFlow(t *testing.T) {
metas = ignoreMisplannedRanges(metas)
metas = ignoreLeafTxnState(metas)
metas = ignoreMetricsMeta(metas)
metas = ignoreTraceData(metas)
if len(metas) != 0 {
t.Fatalf("unexpected metadata (%d): %+v", len(metas), metas)
}
Expand Down Expand Up @@ -327,6 +328,18 @@ func ignoreMetricsMeta(metas []execinfrapb.ProducerMetadata) []execinfrapb.Produ
return res
}

// ignoreTraceData filters a slice of metadata, returning only the entries
// that carry no trace data. The input slice is left unmodified and the result
// is always a non-nil slice.
func ignoreTraceData(metas []execinfrapb.ProducerMetadata) []execinfrapb.ProducerMetadata {
	kept := make([]execinfrapb.ProducerMetadata, 0)
	for i := range metas {
		if metas[i].TraceData != nil {
			// This entry holds trace data; leave it out of the result.
			continue
		}
		kept = append(kept, metas[i])
	}
	return kept
}

// TestLimitedBufferingDeadlock sets up a scenario which leads to deadlock if
// a single consumer can block the entire router (#17097).
func TestLimitedBufferingDeadlock(t *testing.T) {
Expand Down Expand Up @@ -545,6 +558,7 @@ func TestLimitedBufferingDeadlock(t *testing.T) {
metas = ignoreMisplannedRanges(metas)
metas = ignoreLeafTxnState(metas)
metas = ignoreMetricsMeta(metas)
metas = ignoreTraceData(metas)
if len(metas) != 0 {
t.Errorf("unexpected metadata (%d): %+v", len(metas), metas)
}
Expand Down Expand Up @@ -852,6 +866,7 @@ func BenchmarkInfrastructure(b *testing.B) {
metas = ignoreMisplannedRanges(metas)
metas = ignoreLeafTxnState(metas)
metas = ignoreMetricsMeta(metas)
metas = ignoreTraceData(metas)
if len(metas) != 0 {
b.Fatalf("unexpected metadata (%d): %+v", len(metas), metas)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/flowinfra/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func TestServer(t *testing.T) {
}
metas = ignoreLeafTxnState(metas)
metas = ignoreMetricsMeta(metas)
metas = ignoreTraceData(metas)
if len(metas) != 0 {
t.Errorf("unexpected metadata: %v", metas)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/physicalplan/aggregator_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func runTestFlow(
for {
row, meta := rowBuf.Next()
if meta != nil {
if meta.LeafTxnFinalState != nil || meta.Metrics != nil {
if meta.LeafTxnFinalState != nil || meta.Metrics != nil || meta.TraceData != nil {
continue
}
t.Fatalf("unexpected metadata: %v", meta)
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/txn_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (ts *txnState) resetForNewSQLTxn(
// TODO(andrei): figure out how to close these spans on server shutdown? Ties
// into a larger discussion about how to drain SQL and rollback open txns.
opName := sqlTxnName
txnCtx, sp := createRootOrChildSpan(connCtx, opName, tranCtx.tracer)
txnCtx, sp := createRootOrChildSpan(connCtx, opName, tranCtx.tracer, tracing.WithBypassRegistry())
if txnType == implicitTxn {
sp.SetTag("implicit", "true")
}
Expand Down
3 changes: 3 additions & 0 deletions pkg/testutils/serverutils/test_server_shim.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ type TestServerInterface interface {
// The real return type is sql.ExecutorConfig.
ExecutorConfig() interface{}

// Tracer returns a *tracing.Tracer as an interface{}.
Tracer() interface{}

// GossipI returns the gossip used by the TestServer.
// The real return type is *gossip.Gossip.
GossipI() interface{}
Expand Down
1 change: 1 addition & 0 deletions pkg/testutils/testcluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ go_library(
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/tracing",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
],
Expand Down
44 changes: 37 additions & 7 deletions pkg/testutils/testcluster/testcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
)
Expand All @@ -56,6 +57,8 @@ type TestCluster struct {
}
serverArgs []base.TestServerArgs
clusterArgs base.TestClusterArgs

t testing.TB
}

var _ serverutils.TestClusterInterface = &TestCluster{}
Expand Down Expand Up @@ -105,18 +108,45 @@ func (tc *TestCluster) stopServers(ctx context.Context) {
}
wg.Wait()

for i := range tc.mu.serverStoppers {
if tc.mu.serverStoppers[i] != nil {
tc.mu.serverStoppers[i].Stop(context.TODO())
tc.mu.serverStoppers[i] = nil
}
for i := 0; i < tc.NumServers(); i++ {
tc.stopServerLocked(i)
}

// TODO(irfansharif): Instead of checking for empty tracing registries after
// shutting down each node, we're doing it after shutting down all nodes.
// This is because the servers in a TestCluster share the same Tracer object.
// Perhaps a saner thing to do is to separate out individual TestServers
// entirely. The component sharing within TestCluster has bitten us in the
// past as well, and it's not clear why it has to be this way.
for i := 0; i < tc.NumServers(); i++ {
// Wait until a server's span registry is emptied out. This helps us check
// to see that there are no un-Finish()ed spans. We need to wrap this in a
// SucceedsSoon block because it's possible for us to issue requests during
// server shut down, where the requests in turn would create (registered)
// spans. Cleaning up temporary objects created by the session[1] is one
// example of this.
//
// [1]: cleanupSessionTempObjects
tracer := tc.Server(i).Tracer().(*tracing.Tracer)
testutils.SucceedsSoon(tc.t, func() error {
var err error
tracer.VisitSpans(func(span *tracing.Span) {
err = errors.Newf("expected to find no active spans, found %s", span.Meta())
})
return err
})
}
}

// StopServer stops an individual server in the cluster.
//
// It acquires tc.mu for the duration of the call and delegates the actual
// work to stopServerLocked for the server at index idx.
func (tc *TestCluster) StopServer(idx int) {
// Hold the cluster mutex until stopServerLocked has returned.
tc.mu.Lock()
defer tc.mu.Unlock()

tc.stopServerLocked(idx)
}

func (tc *TestCluster) stopServerLocked(idx int) {
if tc.mu.serverStoppers[idx] != nil {
tc.mu.serverStoppers[idx].Stop(context.TODO())
tc.mu.serverStoppers[idx] = nil
Expand Down Expand Up @@ -155,6 +185,7 @@ func NewTestCluster(t testing.TB, nodes int, clusterArgs base.TestClusterArgs) *
tc := &TestCluster{
stopper: stop.NewStopper(),
clusterArgs: clusterArgs,
t: t,
}

// Check if any of the args have a locality set.
Expand Down Expand Up @@ -409,8 +440,7 @@ func (tc *TestCluster) AddServer(serverArgs base.TestServerArgs) (*server.TestSe

tc.mu.Lock()
defer tc.mu.Unlock()
thisStopper := s.Stopper()
tc.mu.serverStoppers = append(tc.mu.serverStoppers, thisStopper)
tc.mu.serverStoppers = append(tc.mu.serverStoppers, s.Stopper())
return s, nil
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/util/tracing/span.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ type SpanMeta struct {
Baggage map[string]string
}

// String renders the span ID and trace ID of the SpanMeta in the form
// "[spanID: <id>, traceID: <id>]".
func (sm *SpanMeta) String() string {
	const format = "[spanID: %d, traceID: %d]"
	return fmt.Sprintf(format, sm.spanID, sm.traceID)
}

func (s *Span) isNoop() bool {
return s.crdb == nil && s.netTr == nil && s.ot == (otSpan{})
}
Expand Down Expand Up @@ -172,8 +176,8 @@ func (s *Span) IsBlackHole() bool {
// isNilOrNoop returns true if the Span context is either nil
// or corresponds to a "no-op" Span. If this is true, any Span
// derived from this context will be a "black hole Span".
func (sc *SpanMeta) isNilOrNoop() bool {
return sc == nil || (sc.recordingType == RecordingOff && sc.shadowTracerType == "")
func (sm *SpanMeta) isNilOrNoop() bool {
return sm == nil || (sm.recordingType == RecordingOff && sm.shadowTracerType == "")
}

// SpanStats are stats that can be added to a Span.
Expand Down
32 changes: 26 additions & 6 deletions pkg/util/tracing/span_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,13 @@ import (
// field comment below are invoked as arguments to `Tracer.StartSpan`.
// See the SpanOption interface for a synopsis.
type spanOptions struct {
Parent *Span // see WithParentAndAutoCollection
RemoteParent *SpanMeta // see WithParentAndManualCollection
RefType opentracing.SpanReferenceType // see WithFollowsFrom
LogTags *logtags.Buffer // see WithLogTags
Tags map[string]interface{} // see WithTags
ForceRealSpan bool // see WithForceRealSpan
Parent *Span // see WithParentAndAutoCollection
RemoteParent *SpanMeta // see WithParentAndManualCollection
RefType opentracing.SpanReferenceType // see WithFollowsFrom
LogTags *logtags.Buffer // see WithLogTags
Tags map[string]interface{} // see WithTags
ForceRealSpan bool // see WithForceRealSpan
BypassRegistry bool // See WithBypassRegistry
}

func (opts *spanOptions) parentTraceID() uint64 {
Expand Down Expand Up @@ -203,3 +204,22 @@ func (forceRealSpanOption) apply(opts spanOptions) spanOptions {
opts.ForceRealSpan = true
return opts
}

// bypassRegistryOption is the SpanOption returned by WithBypassRegistry. It
// carries no state of its own.
type bypassRegistryOption struct{}

// WithBypassRegistry instructs StartSpan to not record the span in the
// top-level registry. Spans started with this option are not inspectable.
// This was introduced as a stop-gap for long-lived Span objects that are
// never explicitly Finish()-ed (#58721). Recording these Spans in our
// registry would cause us to OOM.
//
// TODO(irfansharif,asubiotto): Purge all instances of this option; we should be
// recording all spans.
func WithBypassRegistry() SpanOption {
return bypassRegistryOption{}
}

// apply sets BypassRegistry on the given options and returns the updated
// copy.
func (bypassRegistryOption) apply(opts spanOptions) spanOptions {
opts.BypassRegistry = true
return opts
}
63 changes: 32 additions & 31 deletions pkg/util/tracing/tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,30 @@ const (
modeBackground
)

// tracingMode informs the creation of noop spans
// and the default recording mode of created spans.
// tracingMode informs the creation of noop spans and the default recording mode
// of created spans.
//
// If set to 'background', trace spans will be created for all operations, but
// these will record sparse structured information, unless an operation
// explicitly requests the verbose form. It's optimized for low overhead, and
// powers fine-grained statistics and alerts.
//
// If set to 'legacy', trace spans will not be created by default, unless an
// internal code path explicitly requests one or an auxiliary tracer (such as
// Lightstep or Zipkin) is configured. This tracing mode always
// records in the verbose form. Using this mode has two effects: the
// observability of the cluster may be degraded (as most trace spans are elided)
// and where trace spans are created, they may consume large amounts of memory.
//
// Note that regardless of this setting, configuring an auxiliary trace sink
// will cause verbose traces to be created for all operations, which may lead to
// high memory consumption. It is not currently possible to send non-verbose
// traces to auxiliary sinks.
var tracingMode = settings.RegisterEnumSetting(
"trace.mode",
`configures the CockroachDB-internal tracing subsystem.

If set to 'background', trace spans will be created for all operations, but
these trace spans will only be recording sparse structured information,
unless an operation explicitly requests verbose recording. This is
optimized for low overhead, and powers fine-grained statistics and alerts.

If set to 'legacy', trace spans will not be created (unless an
auxiliary tracer such as Lightstep or Zipkin, is configured, or an
internal code path explicitly requests a trace to be created) but
when they are, they record verbose information. This has two effects:
the observability of the cluster may be degraded (as some trace spans
are elided) and where trace spans are created, they may consume large
amounts of memory. This mode should not be used with auxiliary tracing
sinks as that leads to expensive trace spans being created throughout.

Note that regardless of this setting, configuring an auxiliary
trace sink will cause verbose traces to be created for all
operations, which may lead to high memory consumption. It is not
currently possible to send non-verbose traces to auxiliary sinks.
`,
"legacy",
"if set to 'background', traces will be created for all operations (in"+
"'legacy' mode it's created when explicitly requested or when auxiliary tracers are configured)",
"background",
map[int64]string{
int64(modeLegacy): "legacy",
int64(modeBackground): "background",
Expand Down Expand Up @@ -142,8 +140,9 @@ type Tracer struct {
// Pointer to shadowTracer, if using one.
shadowTracer unsafe.Pointer

// activeSpans is a map that references all non-Finish'ed local root spans
// (i.e. those for which no WithLocalParent(<non-nil>) option was supplied).
// activeSpans is a map that references all non-Finish'ed local root spans,
// i.e. those for which no WithLocalParent(<non-nil>) option was supplied.
// It also elides spans created using WithBypassRegistry.
//
// In normal operation, a local root Span is inserted on creation and
// removed on .Finish().
Expand Down Expand Up @@ -428,11 +427,13 @@ func (t *Tracer) startSpanGeneric(
opts.Parent.crdb.mu.Unlock()
}
} else {
// Local root span - put it into the registry of active local root spans.
// `Span.Finish` takes care of deleting it again.
t.activeSpans.Lock()
t.activeSpans.m[s] = struct{}{}
t.activeSpans.Unlock()
if !opts.BypassRegistry {
// Local root span - put it into the registry of active local root
// spans. `Span.Finish` takes care of deleting it again.
t.activeSpans.Lock()
t.activeSpans.m[s] = struct{}{}
t.activeSpans.Unlock()
}

if opts.RemoteParent != nil {
for k, v := range opts.RemoteParent.Baggage {
Expand Down