Skip to content

Commit

Permalink
planner: fix wrong cached plan in cachedPlanExec (#34579)
Browse files Browse the repository at this point in the history
close #32371
  • Loading branch information
qw4990 authored May 12, 2022
1 parent 12ee8f2 commit cd297b9
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
38 changes: 38 additions & 0 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,51 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/testkit"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/hint"
"github.com/pingcap/tidb/util/kvcache"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
)

func TestPointGetPreparedPlan4PlanCache(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
orgEnable := core.PreparedPlanCacheEnabled()
defer core.SetPreparedPlanCache(orgEnable)
core.SetPreparedPlanCache(true)
se, err := session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
require.NoError(t, err)
tk1 := testkit.NewTestKitWithSession(t, store, se)
tk1.MustExec("drop database if exists ps_text")
defer tk1.MustExec("drop database if exists ps_text")
tk1.MustExec("create database ps_text")
tk1.MustExec("use ps_text")

tk1.MustExec(`create table t (a int, b int, c int,
primary key k_a(a),
unique key k_b(b))`)
tk1.MustExec("insert into t values (1, 1, 1)")
tk1.MustExec("insert into t values (2, 2, 2)")
tk1.MustExec("insert into t values (3, 3, 3)")

pspk1Id, _, _, err := tk1.Session().PrepareStmt("select * from t where a = ?")
require.NoError(t, err)
tk1.Session().GetSessionVars().PreparedStmts[pspk1Id].(*core.CachedPrepareStmt).PreparedAst.UseCache = false

ctx := context.Background()
// first time plan generated
_, err = tk1.Session().ExecutePreparedStmt(ctx, pspk1Id, []types.Datum{types.NewDatum(0)})
require.NoError(t, err)

// using the generated plan but with different params
_, err = tk1.Session().ExecutePreparedStmt(ctx, pspk1Id, []types.Datum{types.NewDatum(nil)})
require.NoError(t, err)
}

func TestPreparePointGetWithDML(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down
25 changes: 15 additions & 10 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2213,15 +2213,15 @@ func (s *session) preparedStmtExec(ctx context.Context,
return runStmt(ctx, s, st)
}

// cachedPlanExec short path currently ONLY for cached "point select plan" execution
func (s *session) cachedPlanExec(ctx context.Context,
is infoschema.InfoSchema, stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, replicaReadScope string, args []types.Datum) (sqlexec.RecordSet, error) {
// cachedPointPlanExec is a short path currently ONLY for cached "point select plan" execution
func (s *session) cachedPointPlanExec(ctx context.Context,
is infoschema.InfoSchema, stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, replicaReadScope string, args []types.Datum) (sqlexec.RecordSet, bool, error) {

prepared := prepareStmt.PreparedAst
// compile ExecStmt
execAst := &ast.ExecuteStmt{ExecID: stmtID}
if err := executor.ResetContextOfStmt(s, execAst); err != nil {
return nil, err
return nil, false, err
}

failpoint.Inject("assertTxnManagerInCachedPlanExec", func() {
Expand All @@ -2234,7 +2234,7 @@ func (s *session) cachedPlanExec(ctx context.Context,
execAst.BinaryArgs = args
execPlan, err := planner.OptimizeExecStmt(ctx, s, execAst, is)
if err != nil {
return nil, err
return nil, false, err
}

stmtCtx := s.GetSessionVars().StmtCtx
Expand Down Expand Up @@ -2272,7 +2272,7 @@ func (s *session) cachedPlanExec(ctx context.Context,

// run ExecStmt
var resultSet sqlexec.RecordSet
switch prepared.CachedPlan.(type) {
switch execPlan.(type) {
case *plannercore.PointGetPlan:
resultSet, err = stmt.PointGet(ctx, is)
s.txn.changeToInvalid()
Expand All @@ -2287,11 +2287,10 @@ func (s *session) cachedPlanExec(ctx context.Context,
}
resultSet, err = runStmt(ctx, s, stmt)
default:
err = errors.Errorf("invalid cached plan type %T", prepared.CachedPlan)
prepared.CachedPlan = nil
return nil, err
return nil, false, nil
}
return resultSet, err
return resultSet, true, err
}

// IsCachedExecOk check if we can execute using plan cached in prepared structure
Expand Down Expand Up @@ -2398,7 +2397,13 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [
}

if ok {
return s.cachedPlanExec(ctx, txnManager.GetTxnInfoSchema(), stmtID, preparedStmt, replicaReadScope, args)
rs, ok, err := s.cachedPointPlanExec(ctx, txnManager.GetTxnInfoSchema(), stmtID, preparedStmt, replicaReadScope, args)
if err != nil {
return nil, err
}
if ok { // fallback to preparedStmtExec if we cannot get a valid point select plan in cachedPointPlanExec
return rs, nil
}
}
return s.preparedStmtExec(ctx, txnManager.GetTxnInfoSchema(), snapshotTS, stmtID, preparedStmt, replicaReadScope, args)
}
Expand Down

0 comments on commit cd297b9

Please sign in to comment.