From 58c53122c11323d7c877dc46c574bccf1257d9a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E8=B6=85?= Date: Wed, 28 Dec 2022 16:00:17 +0800 Subject: [PATCH] This is an automated cherry-pick of #40197 Signed-off-by: ti-chi-bot --- executor/adapter.go | 19 ++++++++++++++----- executor/compiler.go | 18 ++++++++++++++++++ executor/point_get_test.go | 17 +++++++++++++++++ planner/core/plan_cache_utils.go | 11 +++++++---- session/session.go | 4 ---- sessiontxn/interface.go | 2 ++ sessiontxn/isolation/optimistic.go | 8 +++++++- 7 files changed, 65 insertions(+), 14 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 61a10f9807b74..33782e33be9fa 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -18,6 +18,7 @@ import ( "bytes" "context" "fmt" + "math" "runtime/trace" "strconv" "strings" @@ -295,8 +296,12 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { } a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh + var pointExecutor *PointGetExecutor + useMaxTS := startTs == math.MaxUint64 + // try to reuse point get executor - if a.PsStmt.Executor != nil { + // We should only use the cached the executor when the startTS is MaxUint64 + if a.PsStmt.Executor != nil && useMaxTS { exec, ok := a.PsStmt.Executor.(*PointGetExecutor) if !ok { logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path") @@ -306,17 +311,21 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan) exec.Init(pointGetPlan) a.PsStmt.Executor = exec + pointExecutor = exec } } - if a.PsStmt.Executor == nil { + + if pointExecutor == nil { b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti) - newExecutor := b.build(a.Plan) + pointExecutor = b.build(a.Plan).(*PointGetExecutor) if b.err != nil { return nil, b.err } - a.PsStmt.Executor = newExecutor + + if useMaxTS { + a.PsStmt.Executor = pointExecutor + } } - pointExecutor := a.PsStmt.Executor.(*PointGetExecutor) if err = pointExecutor.Open(ctx); err != nil { terror.Call(pointExecutor.Close) diff --git a/executor/compiler.go b/executor/compiler.go index 6d62b6e3272cf..6ec74443952ed 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -156,8 +156,26 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS } } } +<<<<<<< HEAD if c.Ctx.GetSessionVars().EnablePlanReplayerCapture && !c.Ctx.GetSessionVars().InRestrictedSQL { checkPlanReplayerCaptureTask(c.Ctx, stmtNode) +======= + + if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(c.Ctx, stmt.Plan); err != nil { + return nil, err + } + + if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL { + startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS() + if err != nil { + return nil, err + } + if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture { + checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS) + } else { + checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS) + } +>>>>>>> b268c65710b (*: fix PointGet will return an stale value when `tidb_enable_plan_replayer_capture` is set (#40197)) } return stmt, nil diff --git a/executor/point_get_test.go b/executor/point_get_test.go index c615c3a75cb1a..8f13675457481 100644 --- a/executor/point_get_test.go +++ b/executor/point_get_test.go @@ -825,3 +825,20 @@ func TestPointGetIssue25167(t *testing.T) { tk.MustExec("insert into t values (1)") tk.MustQuery("select * from t as of timestamp @a where a = 1").Check(testkit.Rows()) } + +func TestPointGetIssue40194(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t1(id int primary key, v int)") + tk.MustExec("insert into t1 values(1, 10)") + tk.MustExec("prepare s from 'select * from t1 where id=1'") + tk.MustExec("set @@tidb_enable_plan_replayer_capture=1") + tk.MustQuery("execute s").Check(testkit.Rows("1 10")) + tk.MustQuery("execute s").Check(testkit.Rows("1 10")) + + tk2 := testkit.NewTestKit(t, store) + tk2.MustExec("use test") + tk2.MustExec("update t1 set v=v+1") + tk.MustQuery("execute s").Check(testkit.Rows("1 11")) +} diff --git a/planner/core/plan_cache_utils.go b/planner/core/plan_cache_utils.go index 4229e2b134f06..5431f7ef71c27 100644 --- a/planner/core/plan_cache_utils.go +++ b/planner/core/plan_cache_utils.go @@ -414,10 +414,13 @@ func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.Ta // PlanCacheStmt store prepared ast from PrepareExec and other related fields type PlanCacheStmt struct { - PreparedAst *ast.Prepared - StmtDB string // which DB the statement will be processed over - VisitInfos []visitInfo - ColumnInfos interface{} + PreparedAst *ast.Prepared + StmtDB string // which DB the statement will be processed over + VisitInfos []visitInfo + ColumnInfos interface{} + // Executor is only used for point get scene. + // Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it. + // If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here. Executor interface{} NormalizedSQL string NormalizedPlan string diff --git a/session/session.go b/session/session.go index 57fdf69f87288..db47606345983 100644 --- a/session/session.go +++ b/session/session.go @@ -2187,10 +2187,6 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex // Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). compiler := executor.Compiler{Ctx: s} stmt, err := compiler.Compile(ctx, stmtNode) - if err == nil { - err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan) - } - if err != nil { s.rollbackOnError(ctx) diff --git a/sessiontxn/interface.go b/sessiontxn/interface.go index 85d9217a90c0f..d91f1c291b588 100644 --- a/sessiontxn/interface.go +++ b/sessiontxn/interface.go @@ -160,8 +160,10 @@ type TxnManager interface { // GetReadReplicaScope returns the read replica scope GetReadReplicaScope() string // GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update) + // Calling this method will activate the txn implicitly if current read is not stale/historical read GetStmtReadTS() (uint64, error) // GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update + // Calling this method will activate the txn implicitly if current read is not stale/historical read GetStmtForUpdateTS() (uint64, error) // GetContextProvider returns the current TxnContextProvider GetContextProvider() TxnContextProvider diff --git a/sessiontxn/isolation/optimistic.go b/sessiontxn/isolation/optimistic.go index 3c60eba09331b..9a1f8d58aabbd 100644 --- a/sessiontxn/isolation/optimistic.go +++ b/sessiontxn/isolation/optimistic.go @@ -114,6 +114,12 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{}) return nil } + if p.txn != nil { + // `p.txn != nil` means the txn has already been activated, we should not optimize the startTS because the startTS + // has already been used. + return nil + } + realPlan, ok := plan.(plannercore.Plan) if !ok { return nil @@ -141,7 +147,7 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{}) zap.Uint64("conn", sessVars.ConnectionID), zap.String("text", sessVars.StmtCtx.OriginalSQL), ) - return nil + return err } p.optimizeWithMaxTS = true