diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index cc2e9b0f8250..b7d6010a2e39 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -1783,7 +1783,7 @@ func (ex *connExecutor) execCopyIn( ex.statsCollector = ex.newStatsCollector() ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes) ex.initPlanner(ctx, p) - ex.resetPlanner(ctx, p, txn, stmtTS, 0 /* numAnnotations */) + ex.resetPlanner(ctx, p, txn, stmtTS) } if table := cmd.Stmt.Table; table.Table() == fileUploadTable && table.Schema() == crdbInternalName { cm, err = newFileUploadMachine(cmd.Conn, cmd.Stmt, ex.server.cfg, resetPlanner) @@ -2069,7 +2069,7 @@ func (ex *connExecutor) initPlanner(ctx context.Context, p *planner) { } func (ex *connExecutor) resetPlanner( - ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time, numAnnotations tree.AnnotationIdx, + ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time, ) { p.txn = txn p.stmt = nil @@ -2080,13 +2080,15 @@ func (ex *connExecutor) resetPlanner( p.semaCtx.Location = &ex.sessionData.DataConversion.Location p.semaCtx.SearchPath = ex.sessionData.SearchPath p.semaCtx.AsOfTimestamp = nil - p.semaCtx.Annotations = tree.MakeAnnotations(numAnnotations) + p.semaCtx.Annotations = nil ex.resetEvalCtx(&p.extendedEvalCtx, txn, stmtTS) p.autoCommit = false p.isPreparing = false p.avoidCachedDescriptors = false + p.discardRows = false + p.collectBundle = false } // txnStateTransitionsApplyWrapper is a wrapper on top of Machine built with the diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 8f1f22ca20c2..94f00af5ba0c 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -180,8 +180,13 @@ func (ex *connExecutor) execStmtInOpenState( } }() - var discardRows bool - var stmtDiagPlan *planTop + p := &ex.planner + stmtTS := ex.server.cfg.Clock.PhysicalTime() + ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes) + ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS) + p.sessionDataMutator.paramStatusUpdater = res + p.noticeSender = res + var shouldCollectDiagnostics bool var diagHelper *stmtDiagnosticsHelper @@ -196,16 +201,19 @@ func (ex *connExecutor) execStmtInOpenState( // returns some text which includes a URL. // TODO(radu): maybe capture some of the rows and include them in the // bundle. - discardRows = true + p.discardRows = true } else { shouldCollectDiagnostics, diagHelper = ex.stmtInfoRegistry.shouldCollectDiagnostics(ctx, stmt.AST) } if shouldCollectDiagnostics { + p.collectBundle = true tr := ex.server.cfg.AmbientCtx.Tracer origCtx := ctx var sp opentracing.Span ctx, sp = tracing.StartSnowballTrace(ctx, tr, "traced statement") + // TODO(radu): consider removing this if/when #46164 is addressed. + p.extendedEvalCtx.Context = ctx defer func() { // Record the statement information that we've collected. // Note that in case of implicit transactions, the trace contains the auto-commit too. @@ -213,13 +221,13 @@ func (ex *connExecutor) execStmtInOpenState( trace := tracing.GetRecording(sp) if diagHelper != nil { - diagHelper.Finish(origCtx, trace, stmtDiagPlan) + diagHelper.Finish(origCtx, trace, &p.curPlan) } else { // Handle EXPLAIN BUNDLE. // If there was a communication error, no point in setting any results. if retErr == nil { retErr = setExplainBundleResult( - origCtx, res, stmt.AST, trace, stmtDiagPlan, ex.server.cfg, + origCtx, res, stmt.AST, trace, &p.curPlan, ex.server.cfg, ) } } @@ -354,20 +362,16 @@ func (ex *connExecutor) execStmtInOpenState( stmt.AnonymizedStr = ps.AnonymizedStr res.ResetStmtType(ps.AST) - discardRows = discardRows || s.DiscardRows + if s.DiscardRows { + p.discardRows = true + } } + p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations) + // For regular statements (the ones that get to this point), we // don't return any event unless an error happens. - p := &ex.planner - stmtTS := ex.server.cfg.Clock.PhysicalTime() - ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes) - ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS, stmt.NumAnnotations) - p.sessionDataMutator.paramStatusUpdater = res - p.noticeSender = res - stmtDiagPlan = &p.curPlan - if os.ImplicitTxn.Get() { asOfTs, err := p.isAsOf(stmt.AST) if err != nil { @@ -457,10 +461,8 @@ func (ex *connExecutor) execStmtInOpenState( p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations ex.phaseTimes[plannerStartExecStmt] = timeutil.Now() p.stmt = &stmt - p.discardRows = discardRows p.cancelChecker = sqlbase.NewCancelChecker(ctx) p.autoCommit = os.ImplicitTxn.Get() && !ex.server.cfg.TestingKnobs.DisableAutoCommit - p.collectBundle = shouldCollectDiagnostics if err := ex.dispatchToExecutionEngine(ctx, p, res); err != nil { return nil, nil, err } @@ -832,13 +834,19 @@ func (ex *connExecutor) execWithDistSQLEngine( var evalCtxFactory func() *extendedEvalContext if len(planner.curPlan.subqueryPlans) != 0 || len(planner.curPlan.postqueryPlans) != 0 { - var evalCtx extendedEvalContext - ex.initEvalCtx(ctx, &evalCtx, planner) + // The factory reuses the same object because the contexts are not used + // concurrently. + var factoryEvalCtx extendedEvalContext + ex.initEvalCtx(ctx, &factoryEvalCtx, planner) evalCtxFactory = func() *extendedEvalContext { - ex.resetEvalCtx(&evalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp) - evalCtx.Placeholders = &planner.semaCtx.Placeholders - evalCtx.Annotations = &planner.semaCtx.Annotations - return &evalCtx + ex.resetEvalCtx(&factoryEvalCtx, planner.txn, planner.ExtendedEvalContext().StmtTimestamp) + factoryEvalCtx.Placeholders = &planner.semaCtx.Placeholders + factoryEvalCtx.Annotations = &planner.semaCtx.Annotations + // Query diagnostics can change the Context; make sure we are using the + // same one. + // TODO(radu): consider removing this if/when #46164 is addressed. + factoryEvalCtx.Context = evalCtx.Context + return &factoryEvalCtx } } @@ -895,7 +903,7 @@ func (ex *connExecutor) beginTransactionTimestampsAndReadMode( } ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes) p := &ex.planner - ex.resetPlanner(ctx, p, nil /* txn */, now.GoTime(), 0 /* numAnnotations */) + ex.resetPlanner(ctx, p, nil /* txn */, now.GoTime()) ts, err := p.EvalAsOfTimestamp(s.Modes.AsOf) if err != nil { return 0, time.Time{}, nil, err diff --git a/pkg/sql/conn_executor_prepare.go b/pkg/sql/conn_executor_prepare.go index a3186855f13b..d741698f857b 100644 --- a/pkg/sql/conn_executor_prepare.go +++ b/pkg/sql/conn_executor_prepare.go @@ -169,8 +169,9 @@ func (ex *connExecutor) prepare( ex.statsCollector.reset(&ex.server.sqlStats, ex.appStats, &ex.phaseTimes) p := &ex.planner - ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */, stmt.NumAnnotations) + ex.resetPlanner(ctx, p, txn, ex.server.cfg.Clock.PhysicalTime() /* stmtTS */) p.stmt = &stmt + p.semaCtx.Annotations = tree.MakeAnnotations(stmt.NumAnnotations) flags, err := ex.populatePrepared(ctx, txn, placeholderHints, p) if err != nil { txn.CleanupOnError(ctx, err)