Skip to content

Commit

Permalink
This is an automated cherry-pick of #52373
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
qw4990 authored and ti-chi-bot committed May 31, 2024
1 parent 39ea2b3 commit 17f2ca7
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 41 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
a.PsStmt.Executor = nil
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PreparedAst.CachedPlan.(*plannercore.PointGetPlan)
pointGetPlan := a.PsStmt.CachedPlan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.Executor = exec
pointExecutor = exec
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PreparedAst.CachedPlan = nil
preparedObj.CachedPlan = nil
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,15 @@ func (e *DeallocateExec) Next(context.Context, *chunk.Chunk) error {
if !ok {
return errors.Errorf("invalid PlanCacheStmt type")
}
prepared := preparedObj.PreparedAst
delete(vars.PreparedStmtNameToID, e.Name)
if e.Ctx().GetSessionVars().EnablePreparedPlanCache {
<<<<<<< HEAD
bindSQL, _ := plannercore.GetBindSQL4PlanCache(e.Ctx(), preparedObj)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, prepared.SchemaVersion,
=======
bindSQL, _ := bindinfo.MatchSQLBindingForPlanCache(e.Ctx(), preparedObj.PreparedAst.Stmt, &preparedObj.BindingInfo)
cacheKey, err := plannercore.NewPlanCacheKey(vars, preparedObj.StmtText, preparedObj.StmtDB, preparedObj.SchemaVersion,
>>>>>>> 62d6f4737bf (planner: move fields from ast.Prepared to planner.PlanCacheStmt (#52373))
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
return err
Expand Down
8 changes: 2 additions & 6 deletions pkg/parser/ast/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,12 +552,8 @@ func (n *DeallocateStmt) Accept(v Visitor) (Node, bool) {

// Prepared represents a prepared statement.
type Prepared struct {
Stmt StmtNode
StmtType string
Params []ParamMarkerExpr
SchemaVersion int64
CachedPlan interface{}
CachedNames interface{}
Stmt StmtNode
StmtType string
}

// ExecuteStmt is a statement to execute PreparedStmt.
Expand Down
52 changes: 32 additions & 20 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,31 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
vars.StmtCtx.StmtType = stmtAst.StmtType

// step 1: check parameter number
<<<<<<< HEAD
if len(stmtAst.Params) != len(params) {
return errors.Trace(ErrWrongParamCount)
}

// step 2: set parameter values
if err := SetParameterValuesIntoSCtx(sctx, isNonPrepared, stmtAst.Params, params); err != nil {
=======
if len(stmt.Params) != len(params) {
return errors.Trace(plannererrors.ErrWrongParamCount)
}

// step 2: set parameter values
if err := SetParameterValuesIntoSCtx(sctx.GetPlanCtx(), isNonPrepared, stmt.Params, params); err != nil {
>>>>>>> 62d6f4737bf (planner: move fields from ast.Prepared to planner.PlanCacheStmt (#52373))
return errors.Trace(err)
}

// step 3: check schema version
if stmtAst.SchemaVersion != is.SchemaMetaVersion() {
if stmt.SchemaVersion != is.SchemaMetaVersion() {
// In order to avoid some correctness issues, we have to clear the
// cached plan once the schema version is changed.
// Cached plan in prepared struct does NOT have a "cache key" with
// schema version like prepared plan cache key
stmtAst.CachedPlan = nil
stmt.CachedPlan = nil
stmt.Executor = nil
stmt.ColumnInfos = nil
// If the schema version has changed we need to preprocess it again,
Expand All @@ -122,7 +131,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
if err != nil {
return ErrSchemaChanged.GenWithStack("Schema change caused error: %s", err.Error())
}
stmtAst.SchemaVersion = is.SchemaMetaVersion()
stmt.SchemaVersion = is.SchemaMetaVersion()
}

// step 4: handle expiration
Expand All @@ -133,7 +142,7 @@ func planCachePreprocess(ctx context.Context, sctx sessionctx.Context, isNonPrep
expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC()
if stmt.StmtCacheable && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 {
sctx.GetSessionPlanCache().DeleteAll()
stmtAst.CachedPlan = nil
stmt.CachedPlan = nil
vars.LastUpdateTime4PC = expiredTimeStamp4PC
}
return nil
Expand All @@ -153,7 +162,6 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
var cacheKey kvcache.Key
sessVars := sctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
stmtAst := stmt.PreparedAst
cacheEnabled := false
if isNonPrepared {
stmtCtx.CacheType = stmtctx.SessionNonPrepared
Expand Down Expand Up @@ -188,13 +196,13 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
latestSchemaVersion = domain.GetDomain(sctx).InfoSchema().SchemaMetaVersion()
}
if cacheKey, err = NewPlanCacheKey(sctx.GetSessionVars(), stmt.StmtText,
stmt.StmtDB, stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmt.StmtDB, stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
return nil, nil, err
}
}

if stmtCtx.UseCache && stmtAst.CachedPlan != nil { // special code path for fast point plan
if plan, names, ok, err := getCachedPointPlan(stmtAst, sessVars, stmtCtx); ok {
if stmtCtx.UseCache && stmt.CachedPlan != nil { // special code path for fast point plan
if plan, names, ok, err := getCachedPointPlan(stmt, sessVars, stmtCtx); ok {
return plan, names, err
}
}
Expand Down Expand Up @@ -232,7 +240,7 @@ func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (p
return
}

func getCachedPointPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmtCtx *stmtctx.StatementContext) (Plan,
func getCachedPointPlan(stmt *PlanCacheStmt, sessVars *variable.SessionVars, stmtCtx *stmtctx.StatementContext) (Plan,
[]*types.FieldName, bool, error) {
// short path for point-get plans
// Rewriting the expression in the select.where condition will convert its
Expand Down Expand Up @@ -333,7 +341,7 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
delete(sessVars.IsolationReadEngines, kv.TiFlash)
if cacheKey, err = NewPlanCacheKey(sessVars, stmt.StmtText, stmt.StmtDB,
stmtAst.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
stmt.SchemaVersion, latestSchemaVersion, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load()); err != nil {
return nil, nil, err
}
sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
Expand Down Expand Up @@ -762,9 +770,8 @@ func tryCachePointPlan(_ context.Context, sctx sessionctx.Context,
return nil
}
var (
stmtAst = stmt.PreparedAst
ok bool
err error
ok bool
err error
)

if plan, _ok := p.(*PointGetPlan); _ok {
Expand All @@ -779,8 +786,8 @@ func tryCachePointPlan(_ context.Context, sctx sessionctx.Context,

if ok {
// just cache point plan now
stmtAst.CachedPlan = p
stmtAst.CachedNames = names
stmt.CachedPlan = p
stmt.CachedNames = names
stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
sctx.GetSessionVars().StmtCtx.SetPlan(p)
sctx.GetSessionVars().StmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
Expand Down Expand Up @@ -825,33 +832,38 @@ func GetBindSQL4PlanCache(sctx sessionctx.Context, stmt *PlanCacheStmt) (string,
// IsPointPlanShortPathOK check if we can execute using plan cached in prepared structure
// Be careful with the short path, current precondition is ths cached plan satisfying
// IsPointGetWithPKOrUniqueKeyByAutoCommit
<<<<<<< HEAD
func IsPointPlanShortPathOK(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) (bool, error) {
stmtAst := stmt.PreparedAst
if stmtAst.CachedPlan == nil || staleread.IsStmtStaleness(sctx) {
=======
func IsPointGetPlanShortPathOK(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) (bool, error) {
if stmt.CachedPlan == nil || staleread.IsStmtStaleness(sctx) {
>>>>>>> 62d6f4737bf (planner: move fields from ast.Prepared to planner.PlanCacheStmt (#52373))
return false, nil
}
// check auto commit
if !IsAutoCommitTxn(sctx) {
return false, nil
}
if stmtAst.SchemaVersion != is.SchemaMetaVersion() {
stmtAst.CachedPlan = nil
if stmt.SchemaVersion != is.SchemaMetaVersion() {
stmt.CachedPlan = nil
stmt.ColumnInfos = nil
return false, nil
}
// maybe we'd better check cached plan type here, current
// only point select/update will be cached, see "getPhysicalPlan" func
var ok bool
var err error
switch stmtAst.CachedPlan.(type) {
switch stmt.CachedPlan.(type) {
case *PointGetPlan:
ok = true
case *Update:
pointUpdate := stmtAst.CachedPlan.(*Update)
pointUpdate := stmt.CachedPlan.(*Update)
_, ok = pointUpdate.SelectPlan.(*PointGetPlan)
if !ok {
err = errors.Errorf("cached update plan not point update")
stmtAst.CachedPlan = nil
stmt.CachedPlan = nil
return false, err
}
default:
Expand Down
22 changes: 16 additions & 6 deletions pkg/planner/core/plan_cache_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,8 @@ func GeneratePlanCacheStmtWithAST(ctx context.Context, sctx sessionctx.Context,
}

prepared := &ast.Prepared{
Stmt: paramStmt,
StmtType: ast.GetStmtLabel(paramStmt),
Params: extractor.markers,
SchemaVersion: ret.InfoSchema.SchemaMetaVersion(),
Stmt: paramStmt,
StmtType: ast.GetStmtLabel(paramStmt),
}
normalizedSQL, digest := parser.NormalizeDigest(prepared.Stmt.Text())

Expand Down Expand Up @@ -160,8 +158,8 @@ func GeneratePlanCacheStmtWithAST(ctx context.Context, sctx sessionctx.Context,
// parameters are unknown here, so regard them all as NULL.
// For non-prepared statements, all parameters are already initialized at `ParameterizeAST`, so no need to set NULL.
if isPrepStmt {
for i := range prepared.Params {
param := prepared.Params[i].(*driver.ParamMarkerExpr)
for i := range extractor.markers {
param := extractor.markers[i].(*driver.ParamMarkerExpr)
param.Datum.SetNull()
param.InExecute = false
}
Expand Down Expand Up @@ -191,6 +189,8 @@ func GeneratePlanCacheStmtWithAST(ctx context.Context, sctx sessionctx.Context,
StmtCacheable: cacheable,
UncacheableReason: reason,
QueryFeatures: features,
SchemaVersion: ret.InfoSchema.SchemaMetaVersion(),
Params: extractor.markers,
}
if err = CheckPreparedPriv(sctx, preparedObj, ret.InfoSchema); err != nil {
return nil, nil, 0, err
Expand Down Expand Up @@ -443,12 +443,22 @@ type PlanCacheStmt struct {
PreparedAst *ast.Prepared
StmtDB string // which DB the statement will be processed over
VisitInfos []visitInfo
<<<<<<< HEAD
ColumnInfos interface{}
=======
ColumnInfos any
Params []ast.ParamMarkerExpr
>>>>>>> 62d6f4737bf (planner: move fields from ast.Prepared to planner.PlanCacheStmt (#52373))
// Executor is only used for point get scene.
// Notice that we should only cache the PointGetExecutor that have a snapshot with MaxTS in it.
// If the current plan is not PointGet or does not use MaxTS optimization, this value should be nil here.
Executor interface{}

// below fields are for PointGet short path
SchemaVersion int64
CachedPlan any
CachedNames any

StmtCacheable bool // Whether this stmt is cacheable.
UncacheableReason string // Why this stmt is uncacheable.
QueryFeatures *PlanCacheQueryFeatures
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func (ts *TiDBStatement) Close() error {
}
bindSQL, _ := core.GetBindSQL4PlanCache(ts.ctx, preparedObj)
cacheKey, err := core.NewPlanCacheKey(ts.ctx.GetSessionVars(), preparedObj.StmtText, preparedObj.StmtDB,
preparedObj.PreparedAst.SchemaVersion, 0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
preparedObj.SchemaVersion, 0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
return err
}
Expand Down
14 changes: 9 additions & 5 deletions pkg/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,17 +341,21 @@ func (s *session) cleanRetryInfo() {
planCacheEnabled := s.GetSessionVars().EnablePreparedPlanCache
var cacheKey kvcache.Key
var err error
var preparedAst *ast.Prepared
var preparedObj *plannercore.PlanCacheStmt
var stmtText, stmtDB string
if planCacheEnabled {
firstStmtID := retryInfo.DroppedPreparedStmtIDs[0]
if preparedPointer, ok := s.sessionVars.PreparedStmts[firstStmtID]; ok {
preparedObj, ok := preparedPointer.(*plannercore.PlanCacheStmt)
preparedObj, ok = preparedPointer.(*plannercore.PlanCacheStmt)
if ok {
preparedAst = preparedObj.PreparedAst
stmtText, stmtDB = preparedObj.StmtText, preparedObj.StmtDB
<<<<<<< HEAD
bindSQL, _ := plannercore.GetBindSQL4PlanCache(s, preparedObj)
cacheKey, err = plannercore.NewPlanCacheKey(s.sessionVars, stmtText, stmtDB, preparedAst.SchemaVersion,
=======
bindSQL, _ := bindinfo.MatchSQLBindingForPlanCache(s.pctx, preparedObj.PreparedAst.Stmt, &preparedObj.BindingInfo)
cacheKey, err = plannercore.NewPlanCacheKey(s.sessionVars, stmtText, stmtDB, preparedObj.SchemaVersion,
>>>>>>> 62d6f4737bf (planner: move fields from ast.Prepared to planner.PlanCacheStmt (#52373))
0, bindSQL, expression.ExprPushDownBlackListReloadTimeStamp.Load())
if err != nil {
logutil.Logger(s.currentCtx).Warn("clean cached plan failed", zap.Error(err))
Expand All @@ -362,8 +366,8 @@ func (s *session) cleanRetryInfo() {
}
for i, stmtID := range retryInfo.DroppedPreparedStmtIDs {
if planCacheEnabled {
if i > 0 && preparedAst != nil {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtText, preparedAst.SchemaVersion, s.sessionVars.IsolationReadEngines)
if i > 0 && preparedObj != nil {
plannercore.SetPstmtIDSchemaVersion(cacheKey, stmtText, preparedObj.SchemaVersion, s.sessionVars.IsolationReadEngines)
}
if !s.sessionVars.IgnorePreparedCacheCloseStmt { // keep the plan in cache
s.GetSessionPlanCache().Delete(cacheKey)
Expand Down

0 comments on commit 17f2ca7

Please sign in to comment.