Skip to content

Commit

Permalink
sql: trace execution stats for query diagnostics
Browse files Browse the repository at this point in the history
The distsql processors are initialized with the Context in the eval context. If
this context contains a tracing span that is recording, the processors will set
up statistics collection and put them in the span as tags.

The statement diagnostics code sets up a span but doesn't change this context,
so statistics collection doesn't happen. We want these statistics in the trace,
as they will soon be used to generate EXPLAIN ANALYZE diagrams for the bundles.

This change fixes this issue and moves up the initialization of the planner so
we can tweak it directly, which simplifies code.

Release note (bug fix): statement diagnostics traces now contain processor
statistics.

Release justification: Bug fixes and low-risk updates to new functionality
  • Loading branch information
RaduBerinde committed Mar 16, 2020
1 parent 4ed99d9 commit e7afadb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
8 changes: 5 additions & 3 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1783,7 +1783,7 @@ func (ex *connExecutor) execCopyIn(
ex.statsCollector = ex.newStatsCollector()
ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
ex.initPlanner(ctx, p)
ex.resetPlanner(ctx, p, txn, stmtTS, 0 /* numAnnotations */)
ex.resetPlanner(ctx, p, txn, stmtTS)
}
if table := cmd.Stmt.Table; table.Table() == fileUploadTable && table.Schema() == crdbInternalName {
cm, err = newFileUploadMachine(cmd.Conn, cmd.Stmt, ex.server.cfg, resetPlanner)
Expand Down Expand Up @@ -2069,7 +2069,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) {
}

func (ex *connExecutor) resetPlanner(
ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time, numAnnotations tree.AnnotationIdx,
ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time,
) {
p.txn = txn
p.stmt = nil
Expand All @@ -2080,13 +2080,15 @@ func (ex *connExecutor) resetPlanner(
p.semaCtx.Location = &ex.sessionData.DataConversion.Location
p.semaCtx.SearchPath = ex.sessionData.SearchPath
p.semaCtx.AsOfTimestamp = nil
p.semaCtx.Annotations = tree.MakeAnnotations(numAnnotations)
p.semaCtx.Annotations = nil

ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS)

p.autoCommit = false
p.isPreparing = false
p.avoidCachedDescriptors = false
p.discardRows = false
p.collectBundle = false
}

// txnStateTransitionsApplyWrapper is a wrapper on top of Machine built with the
Expand Down
54 changes: 31 additions & 23 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,13 @@ func (ex *connExecutor) execStmtInOpenState(
}
}()

var discardRows bool
var stmtDiagPlan *planTop
p := &ex.planner
stmtTS := ex.server.cfg.Clock.PhysicalTime()
ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
p.sessionDataMutator.paramStatusUpdater = res
p.noticeSender = res

var shouldCollectDiagnostics bool
var diagHelper *stmtDiagnosticsHelper

Expand All @@ -196,30 +201,33 @@ func (ex *connExecutor) execStmtInOpenState(
// returns some text which includes a URL.
// TODO(radu): maybe capture some of the rows and include them in the
// bundle.
discardRows = true
p.discardRows = true
} else {
shouldCollectDiagnostics, diagHelper = ex.stmtInfoRegistry.shouldCollectDiagnostics(ctx, stmt.AST)
}

if shouldCollectDiagnostics {
p.collectBundle = true
tr := ex.server.cfg.AmbientCtx.Tracer
origCtx := ctx
var sp opentracing.Span
ctx, sp = tracing.StartSnowballTrace(ctx, tr, "traced statement")
// TODO(radu): consider removing this if/when #46164 is addressed.
p.extendedEvalCtx.Context = ctx
defer func() {
// Record the statement information that we've collected.
// Note that in case of implicit transactions, the trace contains the auto-commit too.
sp.Finish()
trace := tracing.GetRecording(sp)

if diagHelper != nil {
diagHelper.Finish(origCtx, trace, stmtDiagPlan)
diagHelper.Finish(origCtx, trace, &p.curPlan)
} else {
// Handle EXPLAIN BUNDLE.
// If there was a communication error, no point in setting any results.
if retErr == nil {
retErr = setExplainBundleResult(
origCtx, res, stmt.AST, trace, stmtDiagPlan, ex.server.cfg,
origCtx, res, stmt.AST, trace, &p.curPlan, ex.server.cfg,
)
}
}
Expand Down Expand Up @@ -354,20 +362,16 @@ func (ex *connExecutor) execStmtInOpenState(
stmt.AnonymizedStr = ps.AnonymizedStr
res.ResetStmtType(ps.AST)

discardRows = discardRows || s.DiscardRows
if s.DiscardRows {
p.discardRows = true
}
}

p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)

// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.

p := &ex.planner
stmtTS := ex.server.cfg.Clock.PhysicalTime()
ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS, stmt.NumAnnotations)
p.sessionDataMutator.paramStatusUpdater = res
p.noticeSender = res
stmtDiagPlan = &p.curPlan

if os.ImplicitTxn.Get() {
asOfTs, err := p.isAsOf(stmt.AST)
if err != nil {
Expand Down Expand Up @@ -457,10 +461,8 @@ func (ex *connExecutor) execStmtInOpenState(
p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations
ex.phaseTimes[plannerStartExecStmt] = timeutil.Now()
p.stmt = &stmt
p.discardRows = discardRows
p.cancelChecker = sqlbase.NewCancelChecker(ctx)
p.autoCommit = os.ImplicitTxn.Get() && !ex.server.cfg.TestingKnobs.DisableAutoCommit
p.collectBundle = shouldCollectDiagnostics
if err := ex.dispatchToExecutionEngine(ctx, p, res); err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -832,13 +834,19 @@ func (ex *connExecutor) execWithDistSQLEngine(

var evalCtxFactory func() *extendedEvalContext
if len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.postqueryPlans) != 0 {
var evalCtx extendedEvalContext
ex.initEvalCtx(ctx, &evalCtx, planner)
// The factory reuses the same object because the contexts are not used
// concurrently.
var factoryEvalCtx extendedEvalContext
ex.initEvalCtx(ctx, &factoryEvalCtx, planner)
evalCtxFactory = func() *extendedEvalContext {
ex.resetEvalCtx(&evalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
evalCtx.Placeholders = &planner.semaCtx.Placeholders
evalCtx.Annotations = &planner.semaCtx.Annotations
return &evalCtx
ex.resetEvalCtx(&factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp)
factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders
factoryEvalCtx.Annotations = &planner.semaCtx.Annotations
// Query diagnostics can change the Context; make sure we are using the
// same one.
// TODO(radu): consider removing this if/when #46164 is addressed.
factoryEvalCtx.Context = evalCtx.Context
return &factoryEvalCtx
}
}

Expand Down Expand Up @@ -895,7 +903,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode(
}
ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
p := &ex.planner
ex.resetPlanner(ctx, p, nil /* txn */, now.GoTime(), 0 /* numAnnotations */)
ex.resetPlanner(ctx, p, nil /* txn */, now.GoTime())
ts, err := p.EvalAsOfTimestamp(s.Modes.AsOf)
if err != nil {
return 0, time.Time{}, nil, err
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/conn_executor_prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,9 @@ func (ex *connExecutor) prepare(

ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes)
p := &ex.planner
ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */, stmt.NumAnnotations)
ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */)
p.stmt = &stmt
p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations)
flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p)
if err != nil {
txn.CleanupOnError(ctx, err)
Expand Down

0 comments on commit e7afadb

Please sign in to comment.