Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: fix the issue that some PointGet plans generated in physical-stage cannot be cached #28478

Merged
merged 17 commits into from
Oct 9, 2021
10 changes: 4 additions & 6 deletions executor/explainfor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,10 +509,9 @@ func (s *testPrepareSerialSuite) TestPointGetUserVarPlanCache(c *C) {
tkProcess := tk.Se.ShowProcess()
ps := []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
// t2 should use PointGet.
rows := tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][0]), "Point_Get"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][3]), "table:t2"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][0]), "IndexRangeScan"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][3]), "table:t2"), IsTrue)

tk.MustExec("set @a=2")
tk.MustQuery("execute stmt using @a").Check(testkit.Rows(
Expand All @@ -521,10 +520,9 @@ func (s *testPrepareSerialSuite) TestPointGetUserVarPlanCache(c *C) {
tkProcess = tk.Se.ShowProcess()
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
// t2 should use PointGet, range is changed to [2,2].
rows = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][0]), "Point_Get"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[3][3]), "table:t2"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][0]), "IndexRangeScan"), IsTrue)
c.Assert(strings.Contains(fmt.Sprintf("%v", rows[4][3]), "table:t2"), IsTrue)
tk.MustQuery("execute stmt using @a").Check(testkit.Rows(
"2 4 2 2",
))
Expand Down
2 changes: 1 addition & 1 deletion executor/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ func (s *testSerialSuite) TestPlanCacheClusterIndex(c *C) {
ps = []*util.ProcessInfo{tkProcess}
tk.Se.SetSessionManager(&mockSessionManager1{PS: ps})
rows = tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows()
c.Assert(strings.Index(rows[1][0].(string), `Point_Get`), Equals, 6)
c.Assert(strings.Contains(rows[3][0].(string), `TableRangeScan`), IsTrue)

// case 3:
tk.MustExec(`drop table if exists ta, tb`)
Expand Down
4 changes: 2 additions & 2 deletions executor/seqtest/prepared_serial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,14 +523,14 @@ func TestPreparedInsert(t *testing.T) {
err = counter.Write(pb)
require.NoError(t, err)
hit := pb.GetCounter().GetValue()
require.Equal(t, float64(2), hit)
require.Equal(t, float64(3), hit)
}
tk.MustExec(`set @a=3; execute stmt_insert_select using @a;`)
if flag {
err = counter.Write(pb)
require.NoError(t, err)
hit := pb.GetCounter().GetValue()
require.Equal(t, float64(2), hit)
require.Equal(t, float64(4), hit)
}

result = tk.MustQuery("select id, c1 from prepare_test where id = ?", 101)
Expand Down
2 changes: 1 addition & 1 deletion expression/expression.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,7 @@ func SplitDNFItems(onExpr Expression) []Expression {
// If the Expression is a non-constant value, it means the result is unknown.
func EvaluateExprWithNull(ctx sessionctx.Context, schema *Schema, expr Expression) Expression {
if MaybeOverOptimized4PlanCache(ctx, []Expression{expr}) {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
return expr
}
return evaluateExprWithNull(ctx, schema, expr)
}
Expand Down
26 changes: 26 additions & 0 deletions expression/expression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,32 @@ func (s *testEvaluatorSuite) TestEvaluateExprWithNull(c *C) {
c.Assert(res.Equal(s.ctx, NewOne()), IsTrue)
}

func (s *testEvaluatorSerialSuites) TestEvaluateExprWithNullAndParameters(c *C) {
tblInfo := newTestTableBuilder("").add("col0", mysql.TypeLonglong, 0).build()
schema := tableInfoToSchemaForTest(tblInfo)
col0 := schema.Columns[0]

defer func(original bool) {
s.ctx.GetSessionVars().StmtCtx.UseCache = original
}(s.ctx.GetSessionVars().StmtCtx.UseCache)
s.ctx.GetSessionVars().StmtCtx.UseCache = true

// cases for parameters
ltWithoutParam, err := newFunctionForTest(s.ctx, ast.LT, col0, NewOne())
c.Assert(err, IsNil)
res := EvaluateExprWithNull(s.ctx, schema, ltWithoutParam)
c.Assert(res.Equal(s.ctx, NewNull()), IsTrue) // the expression is evaluated to null

param := NewOne()
param.ParamMarker = &ParamMarker{ctx: s.ctx, order: 0}
s.ctx.GetSessionVars().PreparedParams = append(s.ctx.GetSessionVars().PreparedParams, types.NewIntDatum(10))
ltWithParam, err := newFunctionForTest(s.ctx, ast.LT, col0, param)
c.Assert(err, IsNil)
res = EvaluateExprWithNull(s.ctx, schema, ltWithParam)
_, isScalarFunc := res.(*ScalarFunction)
c.Assert(isScalarFunc, IsTrue) // the expression with parameters is not evaluated
}

func (s *testEvaluatorSuite) TestConstant(c *C) {
sc := &stmtctx.StatementContext{TimeZone: time.Local}
c.Assert(NewZero().IsCorrelated(), IsFalse)
Expand Down
5 changes: 4 additions & 1 deletion planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,10 @@ func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter
p: dual,
}, cntPlan, nil
}
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema()
canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema() &&
// to avoid the over-optimized risk, do not generate PointGet for plan cache, for example,
// `pk>=$a and pk<=$b` can be optimized to a PointGet when `$a==$b`, but it can cause wrong results when `$a!=$b`.
!ds.ctx.GetSessionVars().StmtCtx.UseCache
if canConvertPointGet && !path.IsIntHandlePath {
// We simply do not build [batch] point get for prefix indexes. This can be optimized.
canConvertPointGet = path.Index.Unique && !path.Index.HasPrefixIndex()
Expand Down
104 changes: 84 additions & 20 deletions planner/core/prepare_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,48 @@ func (s *testPrepareSerialSuite) TestPrepareTableAsNameOnGroupByWithCache(c *C)
tk.MustQuery("execute stmt").Sort().Check(testkit.Rows("partner1", "partner2", "partner3", "partner4"))
}

func (s *testPrepareSerialSuite) TestPrepareCachePointGetInsert(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
err = store.Close()
c.Assert(err, IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)
tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t1 (a int, b int, primary key(a))")
tk.MustExec("insert into t1 values (1, 1), (2, 2), (3, 3)")

tk.MustExec("create table t2 (a int, b int, primary key(a))")
tk.MustExec(`prepare stmt1 from "insert into t2 select * from t1 where a=?"`)

tk.MustExec("set @a=1")
tk.MustExec("execute stmt1 using @a")
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("0"))
tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1"))

tk.MustExec("set @a=2")
tk.MustExec("execute stmt1 using @a")
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1", "2 2"))

tk.MustExec("set @a=3")
tk.MustExec("execute stmt1 using @a")
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1", "2 2", "3 3"))
}

// nolint:unused
func readGaugeInt(g prometheus.Gauge) int {
ch := make(chan prometheus.Metric, 1)
Expand Down Expand Up @@ -1102,76 +1144,98 @@ func (s *testPlanSerialSuite) TestPlanCachePointGetAndTableDual(c *C) {
tk.MustExec("insert into t0 values('0000','7777',1)")
tk.MustExec("prepare s0 from 'select * from t0 where c1=? and c2>=? and c2<=?'")
tk.MustExec("set @a0='0000', @b0='9999'")
// TableDual plan would be built, we should not cache it.
// TableDual is forbidden for plan-cache, a TableReader be built and cached.
tk.MustQuery("execute s0 using @a0, @b0, @a0").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous TableDual plan.
tk.MustQuery("execute s0 using @a0, @a0, @b0").Check(testkit.Rows("0000 7777 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t1(c1 varchar(20), c2 varchar(20), c3 bigint(20), primary key(c1, c2))")
tk.MustExec("insert into t1 values('0000','7777',1)")
tk.MustExec("prepare s1 from 'select * from t1 where c1=? and c2>=? and c2<=?'")
tk.MustExec("set @a1='0000', @b1='9999'")
// PointGet plan would be built, we should not cache it.
// IndexLookup plan would be built, we should cache it.
tk.MustQuery("execute s1 using @a1, @b1, @b1").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous PointGet plan.
tk.MustQuery("execute s1 using @a1, @a1, @b1").Check(testkit.Rows("0000 7777 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t2(c1 bigint(20) primary key, c2 varchar(20))")
tk.MustExec("insert into t2 values(1,'7777')")
tk.MustExec("prepare s2 from 'select * from t2 where c1>=? and c1<=?'")
tk.MustExec("set @a2=0, @b2=9")
// PointGet plan would be built, we should not cache it.
// TableReader plan would be built, we should cache it.
tk.MustQuery("execute s2 using @a2, @a2").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous PointGet plan.
tk.MustQuery("execute s2 using @a2, @b2").Check(testkit.Rows("1 7777"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t3(c1 int, c2 int, c3 int, unique key(c1), key(c2))")
tk.MustExec("insert into t3 values(2,1,1)")
tk.MustExec("prepare s3 from 'select /*+ use_index_merge(t3) */ * from t3 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a3=1,@b3=3")
// PointGet partial plan would be built, we should not cache it.
// TableReader plan would be built, we should cache it.
tk.MustQuery("execute s3 using @a3,@a3").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial PointGet plan.
tk.MustQuery("execute s3 using @a3,@b3").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("prepare s3 from 'select /*+ use_index_merge(t3) */ * from t3 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a3=1,@b3=3")
// TableDual partial plan would be built, we should not cache it.
// TableReader plan would be built, we should cache it.
tk.MustQuery("execute s3 using @b3,@a3").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial TableDual plan.
tk.MustQuery("execute s3 using @a3,@b3").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))

tk.MustExec("create table t4(c1 int primary key, c2 int, c3 int, key(c2))")
tk.MustExec("insert into t4 values(2,1,1)")
tk.MustExec("prepare s4 from 'select /*+ use_index_merge(t4) */ * from t4 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a4=1,@b4=3")
// PointGet partial plan would be built, we should not cache it.
// IndexMerge plan would be built, we should not cache it.
tk.MustQuery("execute s4 using @a4,@a4").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial PointGet plan.
tk.MustQuery("execute s4 using @a4,@b4").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))

tk.MustExec("prepare s4 from 'select /*+ use_index_merge(t4) */ * from t4 where (c1 >= ? and c1 <= ?) or c2 > 1'")
tk.MustExec("set @a4=1,@b4=3")
// TableDual partial plan would be built, we should not cache it.
// IndexMerge plan would be built, we should not cache it.
tk.MustQuery("execute s4 using @b4,@a4").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
// Must not reuse the previous IndexMerge with partial TableDual plan.
tk.MustQuery("execute s4 using @a4,@b4").Check(testkit.Rows("2 1 1"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
}

func (s *testPrepareSuite) TestIssue26873(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
tk := testkit.NewTestKit(c, store)
orgEnable := core.PreparedPlanCacheEnabled()
defer func() {
dom.Close()
c.Assert(store.Close(), IsNil)
core.SetPreparedPlanCache(orgEnable)
}()
core.SetPreparedPlanCache(true)

tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{
PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64),
})
c.Assert(err, IsNil)

tk.MustExec("use test")
tk.MustExec("drop table if exists t")

tk.MustExec("create table t(a int primary key, b int, c int)")
tk.MustExec("prepare stmt from 'select * from t where a = 2 or a = ?'")
tk.MustExec("set @p = 3")
tk.MustQuery("execute stmt using @p").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("execute stmt using @p").Check(testkit.Rows())
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}

func (s *testPlanSerialSuite) TestIssue23671(c *C) {
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1199,7 +1263,7 @@ func (s *testPlanSerialSuite) TestIssue23671(c *C) {
tk.MustQuery("execute s1 using @a, @b, @c").Check(testkit.Rows("1 1"))
tk.MustExec("set @a=1, @b=1, @c=10")
tk.MustQuery("execute s1 using @a, @b, @c").Check(testkit.Rows("1 1", "2 2"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
}

func (s *testPlanSerialSuite) TestPartitionTable(c *C) {
Expand Down
2 changes: 0 additions & 2 deletions planner/core/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,6 @@ func (ds *DataSource) derivePathStatsAndTryHeuristics() error {
if selected != nil {
ds.possibleAccessPaths[0] = selected
ds.possibleAccessPaths = ds.possibleAccessPaths[:1]
// TODO: Can we make a more careful check on whether the optimization depends on mutable constants?
ds.ctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
Comment on lines -346 to -347
Copy link
Contributor

@Reminiscent Reminiscent Oct 8, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the test cases from #26873.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new test case TestIssue26873 is added.

if ds.ctx.GetSessionVars().StmtCtx.InVerboseExplain {
var tableName string
if ds.TableAsName.O == "" {
Expand Down
13 changes: 8 additions & 5 deletions util/ranger/detacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,8 @@ func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex
points[offset] = rb.build(accesses[offset])
}
points[offset] = rb.intersection(points[offset], rb.build(cond))
// Early termination if false expression found
if len(points[offset]) == 0 {
if len(points[offset]) == 0 && // Early termination if false expression found
!expression.MaybeOverOptimized4PlanCache(sctx, conditions) { // cannot return an empty-range for plan-cache since parameters may change later
return nil, nil, nil, nil, true
}
}
Expand All @@ -554,8 +554,8 @@ func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex
if points[i] == nil {
// There exists an interval whose length is larger than 0
accesses[i] = nil
} else if len(points[i]) == 0 {
// Early termination if false expression found
} else if len(points[i]) == 0 && // Early termination if false expression found
!expression.MaybeOverOptimized4PlanCache(sctx, conditions) { // cannot return an empty-range for plan-cache since parameters may change later
return nil, nil, nil, nil, true
} else {
// All Intervals are single points
Expand All @@ -566,7 +566,10 @@ func ExtractEqAndInCondition(sctx sessionctx.Context, conditions []expression.Ex
// Maybe we can improve it later.
columnValues[i] = &valueInfo{mutable: true}
}
sctx.GetSessionVars().StmtCtx.MaybeOverOptimized4PlanCache = true
if expression.MaybeOverOptimized4PlanCache(sctx, conditions) {
// TODO: optimize it more elaborately, e.g. return [2 3, 2 3] as accesses for 'where a = 2 and b = 3 and c >= ? and c <= ?'
return nil, conditions, nil, nil, false
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
for i, offset := range offsets {
Expand Down