Skip to content

Commit

Permalink
*: fix PointGet will return an stale value when `tidb_enable_plan_rep…
Browse files Browse the repository at this point in the history
…layer_capture` is set (#40197)

close #40194
  • Loading branch information
lcwangchao authored Dec 28, 2022
1 parent 11f5c17 commit b268c65
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 14 deletions.
19 changes: 14 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"runtime/trace"
"strconv"
"strings"
Expand Down Expand Up @@ -295,8 +296,12 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

var pointExecutor *PointGetExecutor
useMaxTS := startTs == math.MaxUint64

// try to reuse point get executor
if a.PsStmt.Executor != nil {
// We should only use the cached the executor when the startTS is MaxUint64
if a.PsStmt.Executor != nil && useMaxTS {
exec, ok := a.PsStmt.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
Expand All @@ -306,17 +311,21 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
pointExecutor = exec
}
}
if a.PsStmt.Executor == nil {

if pointExecutor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
newExecutor := b.build(a.Plan)
pointExecutor = b.build(a.Plan).(*PointGetExecutor)
if b.err != nil {
return nil, b.err
}
a.PsStmt.Executor = newExecutor

if useMaxTS {
a.PsStmt.Executor = pointExecutor
}
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
Expand Down
5 changes: 5 additions & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
}

if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(c.Ctx, stmt.Plan); err != nil {
return nil, err
}

if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,3 +825,20 @@ func TestPointGetIssue25167(t *testing.T) {
tk.MustExec("insert into t values (1)")
tk.MustQuery("select * from t as of timestamp @a where a = 1").Check(testkit.Rows())
}

func TestPointGetIssue40194(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1(id int primary key, v int)")
tk.MustExec("insert into t1 values(1, 10)")
tk.MustExec("prepare s from 'select * from t1 where id=1'")
tk.MustExec("set @@tidb_enable_plan_replayer_capture=1")
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("update t1 set v=v+1")
tk.MustQuery("execute s").Check(testkit.Rows("1 11"))
}
11 changes: 7 additions & 4 deletions planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,13 @@ func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.Ta

// PlanCacheStmt store prepared ast from PrepareExec and other related fields
type PlanCacheStmt struct {
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
ColumnInfos interface{}
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
ColumnInfos interface{}
// Executor is only used for point get scene.
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor interface{}
NormalizedSQL string
NormalizedPlan string
Expand Down
4 changes: 0 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,10 +2180,6 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
// Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt).
compiler := executor.Compiler{Ctx: s}
stmt, err := compiler.Compile(ctx, stmtNode)
if err == nil {
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan)
}

if err != nil {
s.rollbackOnError(ctx)

Expand Down
2 changes: 2 additions & 0 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,10 @@ type TxnManager interface {
// GetReadReplicaScope returns the read replica scope
GetReadReplicaScope() string
// GetStmtReadTS returns the read timestamp used by select statement (not for select ... for update)
// Calling this method will activate the txn implicitly if current read is not stale/historical read
GetStmtReadTS() (uint64, error)
// GetStmtForUpdateTS returns the read timestamp used by update/insert/delete or select ... for update
// Calling this method will activate the txn implicitly if current read is not stale/historical read
GetStmtForUpdateTS() (uint64, error)
// GetContextProvider returns the current TxnContextProvider
GetContextProvider() TxnContextProvider
Expand Down
8 changes: 7 additions & 1 deletion sessiontxn/isolation/optimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{})
return nil
}

if p.txn != nil {
// `p.txn != nil` means the txn has already been activated, we should not optimize the startTS because the startTS
// has already been used.
return nil
}

realPlan, ok := plan.(plannercore.Plan)
if !ok {
return nil
Expand Down Expand Up @@ -141,7 +147,7 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{})
zap.Uint64("conn", sessVars.ConnectionID),
zap.String("text", sessVars.StmtCtx.OriginalSQL),
)
return nil
return err
}

p.optimizeWithMaxTS = true
Expand Down

0 comments on commit b268c65

Please sign in to comment.