Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix PointGet will return an stale value when tidb_enable_plan_replayer_capture is set #40197

Merged
merged 6 commits into from
Dec 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"fmt"
"math"
"runtime/trace"
"strconv"
"strings"
Expand Down Expand Up @@ -295,8 +296,12 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
}
a.Ctx.GetSessionVars().StmtCtx.Priority = kv.PriorityHigh

var pointExecutor *PointGetExecutor
useMaxTS := startTs == math.MaxUint64

// try to reuse point get executor
if a.PsStmt.Executor != nil {
// We should only use the cached the executor when the startTS is MaxUint64
if a.PsStmt.Executor != nil && useMaxTS {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
exec, ok := a.PsStmt.Executor.(*PointGetExecutor)
if !ok {
logutil.Logger(ctx).Error("invalid executor type, not PointGetExecutor for point get path")
Expand All @@ -306,17 +311,21 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
pointExecutor = exec
}
}
if a.PsStmt.Executor == nil {

if pointExecutor == nil {
b := newExecutorBuilder(a.Ctx, a.InfoSchema, a.Ti)
newExecutor := b.build(a.Plan)
pointExecutor = b.build(a.Plan).(*PointGetExecutor)
if b.err != nil {
return nil, b.err
}
a.PsStmt.Executor = newExecutor

if useMaxTS {
a.PsStmt.Executor = pointExecutor
}
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
Expand Down
5 changes: 5 additions & 0 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,11 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
}

if err = sessiontxn.OptimizeWithPlanAndThenWarmUp(c.Ctx, stmt.Plan); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why move the OptimizeWithPlanAndThenWarmUp here from ExecuteStmt?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If IsPlanReplayerCaptureEnabled is true, it will invoke GetReadStmt and activate the txn. Moving OptimizeWithPlanAndThenWarmUp here is to keep the pointget still be optimized before it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to add some comments about this constraint here or the possible implicit activation on the interface GetStmtReadTS and GetStmtForUpdateTS.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added some comments

return nil, err
}

if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
Expand Down
17 changes: 17 additions & 0 deletions executor/point_get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,3 +825,20 @@ func TestPointGetIssue25167(t *testing.T) {
tk.MustExec("insert into t values (1)")
tk.MustQuery("select * from t as of timestamp @a where a = 1").Check(testkit.Rows())
}

func TestPointGetIssue40194(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1(id int primary key, v int)")
tk.MustExec("insert into t1 values(1, 10)")
tk.MustExec("prepare s from 'select * from t1 where id=1'")
tk.MustExec("set @@tidb_enable_plan_replayer_capture=1")
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("update t1 set v=v+1")
tk.MustQuery("execute s").Check(testkit.Rows("1 11"))
}
11 changes: 7 additions & 4 deletions planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,10 +405,13 @@ func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.Ta

// PlanCacheStmt store prepared ast from PrepareExec and other related fields
type PlanCacheStmt struct {
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
ColumnInfos interface{}
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
ColumnInfos interface{}
// Executor is only used for point get scene.
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor interface{}
NormalizedSQL string
NormalizedPlan string
Expand Down
4 changes: 0 additions & 4 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2180,10 +2180,6 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex
// Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt).
compiler := executor.Compiler{Ctx: s}
stmt, err := compiler.Compile(ctx, stmtNode)
if err == nil {
err = sessiontxn.OptimizeWithPlanAndThenWarmUp(s, stmt.Plan)
}

if err != nil {
s.rollbackOnError(ctx)

Expand Down
4 changes: 2 additions & 2 deletions sessiontxn/isolation/optimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (p *OptimisticTxnContextProvider) GetStmtForUpdateTS() (uint64, error) {
// AdviseOptimizeWithPlan providers optimization according to the plan
// It will use MaxTS as the startTS in autocommit txn for some plans.
func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{}) (err error) {
if p.optimizeWithMaxTS || p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() {
if p.optimizeWithMaxTS || p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() || p.txn != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add this condition?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AdviseOptimizeWithPlan is an optimization and should not have too many strong assumptions. If the txn is already active, it's better to keep the statement goes well without error.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's more clear to use a method like IsActive to check whether it's already activated compared with p.txn != nil, maybe we could do some improvements in later PRs.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some comments added

return nil
}

Expand Down Expand Up @@ -141,7 +141,7 @@ func (p *OptimisticTxnContextProvider) AdviseOptimizeWithPlan(plan interface{})
zap.Uint64("conn", sessVars.ConnectionID),
zap.String("text", sessVars.StmtCtx.OriginalSQL),
)
return nil
return err
}

p.optimizeWithMaxTS = true
Expand Down