diff --git a/DEPS.bzl b/DEPS.bzl
index 442749f514a5e..39b7c9e484d56 100644
--- a/DEPS.bzl
+++ b/DEPS.bzl
@@ -3582,8 +3582,8 @@ def go_deps():
         name = "com_github_tikv_client_go_v2",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/tikv/client-go/v2",
-        sum = "h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=",
-        version = "v2.0.5-0.20230110071533-f313ddf58d73",
+        sum = "h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers=",
+        version = "v2.0.5-0.20230112062023-fe5b35c5f5dc",
     )
     go_repository(
         name = "com_github_tikv_pd_client",
diff --git a/ddl/column.go b/ddl/column.go
index 9893d6528038b..833dd7a631f94 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -552,6 +552,10 @@ func (w *worker) onModifyColumn(d *ddlCtx, t *meta.Meta, job *model.Job) (ver in
         job.State = model.JobStateCancelled
         return ver, errors.Trace(err)
     }
+    if tblInfo.Partition != nil {
+        job.State = model.JobStateCancelled
+        return ver, errors.Trace(dbterror.ErrUnsupportedModifyColumn.GenWithStackByArgs("table is partition table"))
+    }
 
     changingCol := modifyInfo.changingCol
     if changingCol == nil {
diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go
index c61eeaf885aa6..c6f50fcf71874 100644
--- a/ddl/db_partition_test.go
+++ b/ddl/db_partition_test.go
@@ -20,6 +20,7 @@ import (
     "fmt"
     "math/rand"
     "strings"
+    "sync"
     "sync/atomic"
     "testing"
    "time"
@@ -4528,6 +4529,43 @@ func TestPartitionTableWithAnsiQuotes(t *testing.T) {
         ` PARTITION "pMax" VALUES LESS THAN (MAXVALUE,MAXVALUE))`))
 }
 
+func TestIssue40135Ver2(t *testing.T) {
+    store, dom := testkit.CreateMockStoreAndDomain(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+
+    tk1 := testkit.NewTestKit(t, store)
+    tk1.MustExec("use test")
+
+    tk3 := testkit.NewTestKit(t, store)
+    tk3.MustExec("use test")
+
+    tk.MustExec("CREATE TABLE t40135 ( a int DEFAULT NULL, b varchar(32) DEFAULT 'md', index(a)) PARTITION BY HASH (a) PARTITIONS 6")
+    tk.MustExec("insert into t40135 values (1, 'md'), (2, 'ma'), (3, 'md'), (4, 'ma'), (5, 'md'), (6, 'ma')")
+    one := true
+    hook := &ddl.TestDDLCallback{Do: dom}
+    var checkErr error
+    var wg sync.WaitGroup
+    wg.Add(1)
+    hook.OnJobRunBeforeExported = func(job *model.Job) {
+        if job.SchemaState == model.StateDeleteOnly {
+            tk3.MustExec("delete from t40135 where a = 1")
+        }
+        if one {
+            one = false
+            go func() {
+                _, checkErr = tk1.Exec("alter table t40135 modify column a int NULL")
+                wg.Done()
+            }()
+        }
+    }
+    dom.DDL().SetHook(hook)
+    tk.MustExec("alter table t40135 modify column a bigint NULL DEFAULT '6243108' FIRST")
+    wg.Wait()
+    require.ErrorContains(t, checkErr, "[ddl:8200]Unsupported modify column: table is partition table")
+    tk.MustExec("admin check table t40135")
+}
+
 func TestAlterModifyPartitionColTruncateWarning(t *testing.T) {
     t.Skip("waiting for supporting Modify Partition Column again")
     store := testkit.CreateMockStore(t)
diff --git a/ddl/fktest/foreign_key_test.go b/ddl/fktest/foreign_key_test.go
index df461fa048e5c..489e125dc8a3c 100644
--- a/ddl/fktest/foreign_key_test.go
+++ b/ddl/fktest/foreign_key_test.go
@@ -1810,3 +1810,28 @@ func TestForeignKeyAndConcurrentDDL(t *testing.T) {
         }
     }
 }
+
+func TestForeignKeyAndRenameIndex(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("set @@foreign_key_checks=1;")
+    tk.MustExec("use test")
+    tk.MustExec("create table t1 (id int key, b int, index idx1(b));")
+    tk.MustExec("create table t2 (id int key, b int, constraint fk foreign key (b) references t1(b));")
+    tk.MustExec("insert into t1 values (1,1),(2,2)")
+    tk.MustExec("insert into t2 values (1,1),(2,2)")
+    tk.MustGetDBError("insert into t2 values (3,3)", plannercore.ErrNoReferencedRow2)
+    tk.MustGetDBError("delete from t1 where id=1", plannercore.ErrRowIsReferenced2)
+    tk.MustExec("alter table t1 rename index idx1 to idx2")
+    tk.MustExec("alter table t2 rename index fk to idx")
+    tk.MustGetDBError("insert into t2 values (3,3)", plannercore.ErrNoReferencedRow2)
+    tk.MustGetDBError("delete from t1 where id=1", plannercore.ErrRowIsReferenced2)
+    tk.MustExec("alter table t2 drop foreign key fk")
+    tk.MustExec("alter table t2 add foreign key fk (b) references t1(b) on delete cascade on update cascade")
+    tk.MustExec("alter table t1 rename index idx2 to idx3")
+    tk.MustExec("alter table t2 rename index idx to idx0")
+    tk.MustExec("delete from t1 where id=1")
+    tk.MustQuery("select * from t1").Check(testkit.Rows("2 2"))
+    tk.MustQuery("select * from t2").Check(testkit.Rows("2 2"))
+    tk.MustExec("admin check table t1,t2")
+}
diff --git a/executor/executor.go b/executor/executor.go
index 81afc5620daca..c2fcdaa2d7887 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -1393,6 +1393,9 @@ type LimitExec struct {
 
     // columnIdxsUsedByChild keep column indexes of child executor used for inline projection
     columnIdxsUsedByChild []int
+
+    // Log the close time when opentracing is enabled.
+    span opentracing.Span
 }
 
 // Next implements the Executor Next interface.
@@ -1470,13 +1473,29 @@ func (e *LimitExec) Open(ctx context.Context) error {
     e.childResult = tryNewCacheChunk(e.children[0])
     e.cursor = 0
     e.meetFirstBatch = e.begin == 0
+    if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+        e.span = span
+    }
     return nil
 }
 
 // Close implements the Executor Close interface.
 func (e *LimitExec) Close() error {
+    start := time.Now()
+
     e.childResult = nil
-    return e.baseExecutor.Close()
+    err := e.baseExecutor.Close()
+
+    elapsed := time.Since(start)
+    if elapsed > time.Millisecond {
+        logutil.BgLogger().Info("limit executor close takes a long time",
+            zap.Duration("elapsed", elapsed))
+        if e.span != nil {
+            span1 := e.span.Tracer().StartSpan("limitExec.Close", opentracing.ChildOf(e.span.Context()), opentracing.StartTime(start))
+            defer span1.Finish()
+        }
+    }
+    return err
 }
 
 func (e *LimitExec) adjustRequiredRows(chk *chunk.Chunk) *chunk.Chunk {
diff --git a/executor/executor_test.go b/executor/executor_test.go
index a3b2ba0b221ab..6a454bb85a9ea 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -5556,6 +5556,8 @@ func TestAdmin(t *testing.T) {
     }))
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
+    tk2 := testkit.NewTestKit(t, store)
+    tk2.MustExec("use test")
     tk.MustExec("drop table if exists admin_test")
     tk.MustExec("create table admin_test (c1 int, c2 int, c3 int default 1, index (c1))")
     tk.MustExec("insert admin_test (c1) values (1),(2),(NULL)")
@@ -5680,7 +5682,7 @@ func TestAdmin(t *testing.T) {
         // check that the result set has no duplication
         defer wg.Done()
         for i := 0; i < 10; i++ {
-            result := tk.MustQuery(`admin show ddl job queries 20`)
+            result := tk2.MustQuery(`admin show ddl job queries 20`)
             rows := result.Rows()
             rowIDs := make(map[string]struct{})
             for _, row := range rows {
@@ -5711,7 +5713,7 @@ func TestAdmin(t *testing.T) {
         // check that the result set has no duplication
         defer wg2.Done()
         for i := 0; i < 10; i++ {
-            result := tk.MustQuery(`admin show ddl job queries limit 3 offset 2`)
+            result := tk2.MustQuery(`admin show ddl job queries limit 3 offset 2`)
             rows := result.Rows()
             rowIDs := make(map[string]struct{})
             for _, row := range rows {
diff --git a/executor/fktest/foreign_key_test.go b/executor/fktest/foreign_key_test.go
index 670c273b4cb1c..fb29d391aaf09 100644
--- a/executor/fktest/foreign_key_test.go
+++ b/executor/fktest/foreign_key_test.go
@@ -2745,3 +2745,97 @@ func TestForeignKeyMetaInKeyColumnUsage(t *testing.T) {
         "INFORMATION_SCHEMA.KEY_COLUMN_USAGE where CONSTRAINT_SCHEMA='test' and TABLE_NAME='t2' and REFERENCED_TABLE_SCHEMA is not null and REFERENCED_COLUMN_NAME is not null;").
         Check(testkit.Rows("fk test t2 a test t1 a", "fk test t2 b test t1 b"))
 }
+
+func TestForeignKeyAndGeneratedColumn(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("set @@foreign_key_checks=1")
+    tk.MustExec("use test")
+    // Test foreign key with parent column is virtual generated column.
+    tk.MustExec("create table t1 (a int, b int as (a+1) virtual, index(b));")
+    tk.MustGetErrMsg("create table t2 (a int, b int, constraint fk foreign key(b) references t1(b));", "[schema:3733]Foreign key 'fk' uses virtual column 'b' which is not supported.")
+    // Test foreign key with child column is virtual generated column.
+    tk.MustExec("drop table t1")
+    tk.MustExec("create table t1 (a int key);")
+    tk.MustGetErrMsg("create table t2 (a int, c int as (a+1) virtual, constraint fk foreign key(c) references t1(a));", "[schema:3733]Foreign key 'fk' uses virtual column 'c' which is not supported.")
+    // Test foreign key with parent column is stored generated column.
+ tk.MustExec("drop table if exists t1,t2") + tk.MustExec("create table t1 (a int, b int as (a) stored, index(b));") + tk.MustExec("create table t2 (a int, b int, constraint fk foreign key(b) references t1(b) on delete cascade on update cascade);") + tk.MustExec("insert into t1 (a) values (1),(2)") + tk.MustExec("insert into t2 (a) values (1),(2)") + tk.MustExec("update t2 set b=a") + tk.MustExec("insert into t2 values (1,1),(2,2)") + tk.MustGetDBError("insert into t2 values (3,3)", plannercore.ErrNoReferencedRow2) + tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1", "1 1", "2 2", "2 2")) + tk.MustExec("update t1 set a=a+10 where a=1") + tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("2 2", "11 11")) + tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 11", "1 11", "2 2", "2 2")) + tk.MustExec("delete from t1 where a=2") + tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("11 11")) + tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 11", "1 11")) + // Test foreign key with parent and child column is stored generated column. + tk.MustExec("drop table if exists t1,t2") + tk.MustExec("create table t1 (a int, b int as (a) stored, index(b));") + tk.MustGetErrMsg("create table t2 (a int, b int as (a) stored, constraint fk foreign key(b) references t1(b) on update cascade);", "[ddl:3104]Cannot define foreign key with ON UPDATE CASCADE clause on a generated column.") + tk.MustGetErrMsg("create table t2 (a int, b int as (a) stored, constraint fk foreign key(b) references t1(b) on delete set null);", "[ddl:3104]Cannot define foreign key with ON DELETE SET NULL clause on a generated column.") + tk.MustExec("create table t2 (a int, b int as (a) stored, constraint fk foreign key(b) references t1(b));") + tk.MustExec("insert into t1 (a) values (1),(2)") + tk.MustExec("insert into t2 (a) values (1),(2)") + tk.MustGetDBError("insert into t2 (a) values (3)", plannercore.ErrNoReferencedRow2) + tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 1", "2 2")) + tk.MustGetDBError("delete from t1 where b=1", plannercore.ErrRowIsReferenced2) + tk.MustGetDBError("update t1 set a=a+10 where a=1", plannercore.ErrRowIsReferenced2) + tk.MustExec("alter table t2 drop foreign key fk") + tk.MustExec("alter table t2 add foreign key fk (b) references t1(b) on delete cascade") + tk.MustExec("delete from t1 where a=1") + tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("2 2")) + tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("2 2")) +} + +func TestForeignKeyAndExpressionIndex(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + tk.MustExec("create table t1 (a int, b int, index idx1 (b), index idx2 ((b*2)));") + tk.MustExec("create table t2 (a int, b int, index((b*2)), constraint fk foreign key(b) references t1(b));") + tk.MustExec("insert into t1 values (1,1),(2,2)") + tk.MustExec("insert into t2 values (1,1),(2,2)") + tk.MustGetDBError("insert into t2 values (3,3)", plannercore.ErrNoReferencedRow2) + tk.MustGetDBError("update t1 set b=b+10 where b=1", plannercore.ErrRowIsReferenced2) + tk.MustGetDBError("delete from t1 where b=1", plannercore.ErrRowIsReferenced2) + tk.MustGetErrMsg("alter table t1 drop index idx1", "[ddl:1553]Cannot drop index 'idx1': needed in a foreign key constraint") + tk.MustGetErrMsg("alter table t2 drop index fk", "[ddl:1553]Cannot drop index 'fk': needed in a foreign key 
constraint") + tk.MustExec("alter table t2 drop foreign key fk") + tk.MustExec("alter table t2 add foreign key fk (b) references t1(b) on delete set null on update cascade") + tk.MustExec("update t1 set b=b+10 where b=1") + tk.MustExec("delete from t1 where b=2") + tk.MustQuery("select * from t1 order by a").Check(testkit.Rows("1 11")) + tk.MustQuery("select * from t2 order by a").Check(testkit.Rows("1 11", "2 ")) + tk.MustExec("admin check table t1") + tk.MustExec("admin check table t2") +} + +func TestForeignKeyAndMultiValuedIndex(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("set @@foreign_key_checks=1") + tk.MustExec("use test") + tk.MustExec("create table t1 (id int primary key, a json, b int generated always as (a->'$.id') stored, index idx1(b), index idx2((cast(a ->'$.data' as signed array))))") + tk.MustExec("create table t2 (id int, b int, constraint fk foreign key(b) references t1(b));") + tk.MustExec(`insert into t1 (id, a) values (1, '{"id": "1", "data": [1,11,111]}')`) + tk.MustExec(`insert into t1 (id, a) values (2, '{"id": "2", "data": [2,22,222]}')`) + tk.MustExec("insert into t2 values (1,1),(2,2)") + tk.MustGetDBError("insert into t2 values (3,3)", plannercore.ErrNoReferencedRow2) + tk.MustGetDBError(`update t1 set a='{"id": "10", "data": [1,11,111]}' where id=1`, plannercore.ErrRowIsReferenced2) + tk.MustGetDBError(`delete from t1 where id=1`, plannercore.ErrRowIsReferenced2) + tk.MustExec("alter table t2 drop foreign key fk") + tk.MustExec("alter table t2 add foreign key fk (b) references t1(b) on delete set null on update cascade") + tk.MustExec(`update t1 set a='{"id": "10", "data": [1,11,111]}' where id=1`) + tk.MustExec(`delete from t1 where id=2`) + tk.MustQuery("select id,b from t1 order by id").Check(testkit.Rows("1 10")) + tk.MustQuery("select id,b from t2 order by id").Check(testkit.Rows("1 10", "2 ")) + tk.MustExec("admin check table t1") + tk.MustExec("admin check table t2") +} diff --git a/go.mod b/go.mod index 65440ab9d18cb..2e6cbf201b043 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73 + github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index e34b4c5935340..a75624ea224ee 100644 --- a/go.sum +++ b/go.sum @@ -936,8 +936,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73 h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM= -github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73/go.mod h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI= +github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc h1:B2FNmPDaGirXpIOgQbqxiukIkT8eOT4tKEahqYE2ers= +github.com/tikv/client-go/v2 v2.0.5-0.20230112062023-fe5b35c5f5dc/go.mod 
h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/planner/core/encode.go b/planner/core/encode.go index 14931d4d1ef0a..bf02059dafdda 100644 --- a/planner/core/encode.go +++ b/planner/core/encode.go @@ -262,6 +262,9 @@ type planDigester struct { // NormalizeFlatPlan normalizes a FlatPhysicalPlan and generates plan digest. func NormalizeFlatPlan(flat *FlatPhysicalPlan) (normalized string, digest *parser.Digest) { + if flat == nil { + return "", parser.NewDigest(nil) + } selectPlan, selectPlanOffset := flat.Main.GetSelectPlan() if len(selectPlan) == 0 || !selectPlan[0].IsPhysicalPlan { return "", parser.NewDigest(nil) diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index 867541b97fb99..c2d3e1a62d7bc 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -1136,6 +1136,7 @@ func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, c cop.tablePlan = ts cop.idxMergePartPlans = scans cop.idxMergeIsIntersection = path.IndexMergeIsIntersection + cop.idxMergeAccessMVIndex = path.IndexMergeAccessMVIndex if remainingFilters != nil { cop.rootTaskConds = remainingFilters } diff --git a/planner/core/indexmerge_path.go b/planner/core/indexmerge_path.go index 1b384aef1fd02..22c5e75cdd023 100644 --- a/planner/core/indexmerge_path.go +++ b/planner/core/indexmerge_path.go @@ -618,7 +618,7 @@ func (ds *DataSource) generateIndexMerge4MVIndex(normalPathCnt int, filters []ex // buildPartialPathUp4MVIndex builds these partial paths up to a complete index merge path. 
 func (ds *DataSource) buildPartialPathUp4MVIndex(partialPaths []*util.AccessPath, isIntersection bool, remainingFilters []expression.Expression) *util.AccessPath {
-    indexMergePath := &util.AccessPath{PartialIndexPaths: partialPaths}
+    indexMergePath := &util.AccessPath{PartialIndexPaths: partialPaths, IndexMergeAccessMVIndex: true}
     indexMergePath.IndexMergeIsIntersection = isIntersection
     indexMergePath.TableFilters = remainingFilters
diff --git a/planner/core/indexmerge_path_test.go b/planner/core/indexmerge_path_test.go
index ecb0d60a395e4..1119cfb5c666e 100644
--- a/planner/core/indexmerge_path_test.go
+++ b/planner/core/indexmerge_path_test.go
@@ -136,3 +136,16 @@ index i_int((cast(j->'$.int' as signed array))))`)
         result.Check(testkit.Rows(output[i].Plan...))
     }
 }
+
+func TestMVIndexIndexMergePlanCache(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec(`create table t(j json, index kj((cast(j as signed array))))`)
+
+    tk.MustExec("prepare st from 'select /*+ use_index_merge(t, kj) */ * from t where (1 member of (j))'")
+    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query accesses generated columns is un-cacheable"))
+    tk.MustExec("execute st")
+    tk.MustExec("execute st")
+    tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
+}
diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go
index 38ba8df4e37b9..2d73534fc2e1e 100644
--- a/planner/core/logical_plan_builder.go
+++ b/planner/core/logical_plan_builder.go
@@ -2017,7 +2017,7 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
             return 0, false, true
         }
         if mustInt64orUint64 {
-            if expected := checkParamTypeInt64orUint64(v); !expected {
+            if expected, _ := CheckParamTypeInt64orUint64(v); !expected {
                 return 0, false, false
             }
         }
@@ -2054,19 +2054,19 @@ func getUintFromNode(ctx sessionctx.Context, n ast.Node, mustInt64orUint64 bool)
     return 0, false, false
 }
 
-// check param type for plan cache limit, only allow int64 and uint64 now
+// CheckParamTypeInt64orUint64 check param type for plan cache limit, only allow int64 and uint64 now
 // eg: set @a = 1;
-func checkParamTypeInt64orUint64(param *driver.ParamMarkerExpr) bool {
+func CheckParamTypeInt64orUint64(param *driver.ParamMarkerExpr) (bool, uint64) {
     val := param.GetValue()
     switch v := val.(type) {
     case int64:
         if v >= 0 {
-            return true
+            return true, uint64(v)
         }
     case uint64:
-        return true
+        return true, v
     }
-    return false
+    return false, 0
 }
 
 func extractLimitCountOffset(ctx sessionctx.Context, limit *ast.Limit) (count uint64,
diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go
index 9ea2fdf89c006..a5db249fd2be1 100644
--- a/planner/core/optimizer.go
+++ b/planner/core/optimizer.go
@@ -393,7 +393,6 @@ func postOptimize(ctx context.Context, sctx sessionctx.Context, plan PhysicalPla
     plan = eliminateUnionScanAndLock(sctx, plan)
     plan = enableParallelApply(sctx, plan)
     handleFineGrainedShuffle(ctx, sctx, plan)
-    checkPlanCacheable(sctx, plan)
     propagateProbeParents(plan, nil)
     countStarRewrite(plan)
     return plan, nil
@@ -966,16 +965,6 @@ func setupFineGrainedShuffleInternal(ctx context.Context, sctx sessionctx.Contex
     }
 }
 
-// checkPlanCacheable used to check whether a plan can be cached. Plans that
-// meet the following characteristics cannot be cached:
-// 1. Use the TiFlash engine.
-// Todo: make more careful check here.
-func checkPlanCacheable(sctx sessionctx.Context, plan PhysicalPlan) {
-    if sctx.GetSessionVars().StmtCtx.UseCache && useTiFlash(plan) {
-        sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: TiFlash plan is un-cacheable"))
-    }
-}
-
 // propagateProbeParents doesn't affect the execution plan, it only sets the probeParents field of a PhysicalPlan.
 // It's for handling the inconsistency between row count in the statsInfo and the recorded actual row count. Please
 // see comments in PhysicalPlan for details.
@@ -1012,26 +1001,6 @@ func propagateProbeParents(plan PhysicalPlan, probeParents []PhysicalPlan) {
     }
 }
 
-// useTiFlash used to check whether the plan use the TiFlash engine.
-func useTiFlash(p PhysicalPlan) bool {
-    switch x := p.(type) {
-    case *PhysicalTableReader:
-        switch x.StoreType {
-        case kv.TiFlash:
-            return true
-        default:
-            return false
-        }
-    default:
-        if len(p.Children()) > 0 {
-            for _, plan := range p.Children() {
-                return useTiFlash(plan)
-            }
-        }
-    }
-    return false
-}
-
 func enableParallelApply(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPlan {
     if !sctx.GetSessionVars().EnableParallelApply {
         return plan
diff --git a/planner/core/physical_plans.go b/planner/core/physical_plans.go
index 537b0826eb381..02d01aad5db4c 100644
--- a/planner/core/physical_plans.go
+++ b/planner/core/physical_plans.go
@@ -551,6 +551,8 @@ type PhysicalIndexMergeReader struct {
     // IsIntersectionType means whether it's intersection type or union type.
     // Intersection type is for expressions connected by `AND` and union type is for `OR`.
     IsIntersectionType bool
+    // AccessMVIndex indicates whether this IndexMergeReader access a MVIndex.
+    AccessMVIndex bool
 
     // PartialPlans flats the partialPlans to construct executor pb.
     PartialPlans [][]PhysicalPlan
diff --git a/planner/core/plan_cache.go b/planner/core/plan_cache.go
index 677b9afe4e29d..cbe16438ba8c3 100644
--- a/planner/core/plan_cache.go
+++ b/planner/core/plan_cache.go
@@ -127,7 +127,7 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
     sessVars := sctx.GetSessionVars()
     stmtCtx := sessVars.StmtCtx
     stmtAst := stmt.PreparedAst
-    stmtCtx.UseCache = stmt.StmtCacheable
+    stmtCtx.UseCache = true
     if !stmt.StmtCacheable {
         stmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: %s", stmt.UncacheableReason))
     }
@@ -158,27 +158,29 @@ func GetPlanFromSessionPlanCache(ctx context.Context, sctx sessionctx.Context,
         }
     }
 
-    paramNum, paramTypes := parseParamTypes(sctx, params)
+    paramTypes := parseParamTypes(sctx, params)
     if stmtCtx.UseCache && stmtAst.CachedPlan != nil { // for point query plan
         if plan, names, ok, err := getCachedPointPlan(stmtAst, sessVars, stmtCtx); ok {
             return plan, names, err
         }
     }
-
+    limitCountAndOffset, paramErr := ExtractLimitFromAst(stmt.PreparedAst.Stmt, sctx)
+    if paramErr != nil {
+        return nil, nil, paramErr
+    }
     if stmtCtx.UseCache { // for non-point plans
         if plan, names, ok, err := getCachedPlan(sctx, isNonPrepared, cacheKey, bindSQL, is, stmt,
-            paramTypes); err != nil || ok {
+            paramTypes, limitCountAndOffset); err != nil || ok {
             return plan, names, err
         }
     }
 
-    return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramNum, paramTypes, bindSQL)
+    return generateNewPlan(ctx, sctx, isNonPrepared, is, stmt, cacheKey, latestSchemaVersion, paramTypes, bindSQL, limitCountAndOffset)
 }
 
 // parseParamTypes get parameters' types in PREPARE statement
-func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (paramNum int, paramTypes []*types.FieldType) {
-    paramNum = len(params)
+func parseParamTypes(sctx sessionctx.Context, params []expression.Expression) (paramTypes []*types.FieldType) {
     for _, param := range params {
         if c, ok := param.(*expression.Constant); ok { // from binary protocol
             paramTypes = append(paramTypes, c.GetType())
@@ -221,12 +223,12 @@ func getCachedPointPlan(stmt *ast.Prepared, sessVars *variable.SessionVars, stmt
 }
 
 func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache.Key, bindSQL string,
-    is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType) (Plan,
+    is infoschema.InfoSchema, stmt *PlanCacheStmt, paramTypes []*types.FieldType, limitParams []uint64) (Plan,
     []*types.FieldName, bool, error) {
     sessVars := sctx.GetSessionVars()
     stmtCtx := sessVars.StmtCtx
 
-    candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes)
+    candidate, exist := sctx.GetPlanCache(isNonPrepared).Get(cacheKey, paramTypes, limitParams)
     if !exist {
         return nil, nil, false, nil
     }
@@ -264,8 +266,9 @@ func getCachedPlan(sctx sessionctx.Context, isNonPrepared bool, cacheKey kvcache
 
 // generateNewPlan call the optimizer to generate a new plan for current statement
 // and try to add it to cache
-func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema, stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramNum int,
-    paramTypes []*types.FieldType, bindSQL string) (Plan, []*types.FieldName, error) {
+func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared bool, is infoschema.InfoSchema,
+    stmt *PlanCacheStmt, cacheKey kvcache.Key, latestSchemaVersion int64, paramTypes []*types.FieldType,
+    bindSQL string, limitParams []uint64) (Plan, []*types.FieldName, error) {
     stmtAst := stmt.PreparedAst
     sessVars := sctx.GetSessionVars()
     stmtCtx := sessVars.StmtCtx
@@ -282,10 +285,12 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
         return nil, nil, err
     }
 
-    // We only cache the tableDual plan when the number of parameters are zero.
-    if containTableDual(p) && paramNum > 0 {
-        stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: get a TableDual plan"))
+    // check whether this plan is cacheable.
+    if stmtCtx.UseCache {
+        checkPlanCacheability(sctx, p, len(paramTypes))
     }
+
+    // put this plan into the plan cache.
     if stmtCtx.UseCache {
         // rebuild key to exclude kv.TiFlash when stmt is not read only
         if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmtAst.Stmt, sessVars) {
@@ -296,16 +301,54 @@ func generateNewPlan(ctx context.Context, sctx sessionctx.Context, isNonPrepared
             }
             sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{}
         }
-        cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes)
+        cached := NewPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, paramTypes, limitParams)
         stmt.NormalizedPlan, stmt.PlanDigest = NormalizePlan(p)
         stmtCtx.SetPlan(p)
         stmtCtx.SetPlanDigest(stmt.NormalizedPlan, stmt.PlanDigest)
-        sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes)
+        sctx.GetPlanCache(isNonPrepared).Put(cacheKey, cached, paramTypes, limitParams)
     }
     sessVars.FoundInPlanCache = false
     return p, names, err
 }
+
+// checkPlanCacheability checks whether this plan is cacheable and set to skip plan cache if it's uncacheable.
+func checkPlanCacheability(sctx sessionctx.Context, p Plan, paramNum int) {
+    stmtCtx := sctx.GetSessionVars().StmtCtx
+    var pp PhysicalPlan
+    switch x := p.(type) {
+    case *Insert:
+        pp = x.SelectPlan
+    case *Update:
+        pp = x.SelectPlan
+    case *Delete:
+        pp = x.SelectPlan
+    case PhysicalPlan:
+        pp = x
+    default:
+        stmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: unexpected un-cacheable plan %v", p.ExplainID().String()))
+        return
+    }
+    if pp == nil { // simple DML statements
+        return
+    }
+
+    if useTiFlash(pp) {
+        stmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: TiFlash plan is un-cacheable"))
+        return
+    }
+
+    // We only cache the tableDual plan when the number of parameters are zero.
+    if containTableDual(pp) && paramNum > 0 {
+        stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: get a TableDual plan"))
+        return
+    }
+
+    if accessMVIndexWithIndexMerge(pp) {
+        stmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: the plan with IndexMerge accessing Multi-Valued Index is un-cacheable"))
+        return
+    }
+}
+
 // RebuildPlan4CachedPlan will rebuild this plan under current user parameters.
 func RebuildPlan4CachedPlan(p Plan) error {
     sc := p.SCtx().GetSessionVars().StmtCtx
@@ -675,22 +718,53 @@ func tryCachePointPlan(_ context.Context, sctx sessionctx.Context,
     return err
 }
 
-func containTableDual(p Plan) bool {
+func containTableDual(p PhysicalPlan) bool {
     _, isTableDual := p.(*PhysicalTableDual)
     if isTableDual {
         return true
     }
-    physicalPlan, ok := p.(PhysicalPlan)
-    if !ok {
-        return false
-    }
     childContainTableDual := false
-    for _, child := range physicalPlan.Children() {
+    for _, child := range p.Children() {
         childContainTableDual = childContainTableDual || containTableDual(child)
     }
     return childContainTableDual
 }
 
+func accessMVIndexWithIndexMerge(p PhysicalPlan) bool {
+    if idxMerge, ok := p.(*PhysicalIndexMergeReader); ok {
+        if idxMerge.AccessMVIndex {
+            return true
+        }
+    }
+
+    for _, c := range p.Children() {
+        if accessMVIndexWithIndexMerge(c) {
+            return true
+        }
+    }
+    return false
+}
+
+// useTiFlash used to check whether the plan use the TiFlash engine.
+func useTiFlash(p PhysicalPlan) bool {
+    switch x := p.(type) {
+    case *PhysicalTableReader:
+        switch x.StoreType {
+        case kv.TiFlash:
+            return true
+        default:
+            return false
+        }
+    default:
+        if len(p.Children()) > 0 {
+            for _, plan := range p.Children() {
+                return useTiFlash(plan)
+            }
+        }
+    }
+    return false
+}
+
 // GetBindSQL4PlanCache used to get the bindSQL for plan cache to build the plan cache key.
 func GetBindSQL4PlanCache(sctx sessionctx.Context, stmt *PlanCacheStmt) (string, bool) {
     useBinding := sctx.GetSessionVars().UsePlanBaselines
diff --git a/planner/core/plan_cache_lru.go b/planner/core/plan_cache_lru.go
index 413dd37e8f5a2..20fa4c3f5c85c 100644
--- a/planner/core/plan_cache_lru.go
+++ b/planner/core/plan_cache_lru.go
@@ -53,7 +53,7 @@ type LRUPlanCache struct {
     lock sync.Mutex
 
     // pickFromBucket get one element from bucket. The LRUPlanCache can not work if it is nil
-    pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool)
+    pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool)
     // onEvict will be called if any eviction happened, only for test use now
     onEvict func(kvcache.Key, kvcache.Value)
 
@@ -68,7 +68,7 @@ type LRUPlanCache struct {
 // NewLRUPlanCache creates a PCLRUCache object, whose capacity is "capacity".
 // NOTE: "capacity" should be a positive value.
 func NewLRUPlanCache(capacity uint, guard float64, quota uint64,
-    pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
+    pickFromBucket func(map[*list.Element]struct{}, []*types.FieldType, []uint64) (*list.Element, bool), sctx sessionctx.Context) *LRUPlanCache {
     if capacity < 1 {
         capacity = 100
         logutil.BgLogger().Info("capacity of LRU cache is less than 1, will use default value(100) init cache")
@@ -94,13 +94,13 @@ func strHashKey(key kvcache.Key, deepCopy bool) string {
 }
 
 // Get tries to find the corresponding value according to the given key.
-func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (value kvcache.Value, ok bool) {
+func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType, limitParams []uint64) (value kvcache.Value, ok bool) {
     l.lock.Lock()
     defer l.lock.Unlock()
 
     bucket, bucketExist := l.buckets[strHashKey(key, false)]
     if bucketExist {
-        if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
+        if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
             l.lruList.MoveToFront(element)
             return element.Value.(*planCacheEntry).PlanValue, true
         }
@@ -109,14 +109,14 @@ func (l *LRUPlanCache) Get(key kvcache.Key, paramTypes []*types.FieldType) (valu
 }
 
 // Put puts the (key, value) pair into the LRU Cache.
-func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType) {
+func (l *LRUPlanCache) Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType, limitParams []uint64) {
     l.lock.Lock()
     defer l.lock.Unlock()
 
     hash := strHashKey(key, true)
     bucket, bucketExist := l.buckets[hash]
     if bucketExist {
-        if element, exist := l.pickFromBucket(bucket, paramTypes); exist {
+        if element, exist := l.pickFromBucket(bucket, paramTypes, limitParams); exist {
             l.updateInstanceMetric(&planCacheEntry{PlanKey: key, PlanValue: value}, element.Value.(*planCacheEntry))
             element.Value.(*planCacheEntry).PlanValue = value
             l.lruList.MoveToFront(element)
@@ -252,16 +252,36 @@ func (l *LRUPlanCache) memoryControl() {
 }
 
 // PickPlanFromBucket pick one plan from bucket
-func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType) (*list.Element, bool) {
+func PickPlanFromBucket(bucket map[*list.Element]struct{}, paramTypes []*types.FieldType, limitParams []uint64) (*list.Element, bool) {
     for k := range bucket {
         plan := k.Value.(*planCacheEntry).PlanValue.(*PlanCacheValue)
-        if plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes) {
+        ok1 := plan.ParamTypes.CheckTypesCompatibility4PC(paramTypes)
+        if !ok1 {
+            continue
+        }
+        ok2 := checkUint64SliceIfEqual(plan.limitOffsetAndCount, limitParams)
+        if ok2 {
             return k, true
         }
     }
     return nil, false
 }
 
+func checkUint64SliceIfEqual(a, b []uint64) bool {
+    if (a == nil && b != nil) || (a != nil && b == nil) {
+        return false
+    }
+    if len(a) != len(b) {
+        return false
+    }
+    for i := range a {
+        if a[i] != b[i] {
+            return false
+        }
+    }
+    return true
+}
+
 // updateInstanceMetric update the memory usage and plan num for show in grafana
 func (l *LRUPlanCache) updateInstanceMetric(in, out *planCacheEntry) {
     updateInstancePlanNum(in, out)
diff --git a/planner/core/plan_cache_lru_test.go b/planner/core/plan_cache_lru_test.go
index 74b6b2c92c3bb..f51480401ce62 100644
--- a/planner/core/plan_cache_lru_test.go
+++ b/planner/core/plan_cache_lru_test.go
@@ -65,14 +65,18 @@ func TestLRUPCPut(t *testing.T) {
         {types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
         {types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
     }
+    limitParams := [][]uint64{
+        {1}, {2}, {3}, {4}, {5},
+    }
     // one key corresponding to multi values
     for i := 0; i < 5; i++ {
         keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
         vals[i] = &PlanCacheValue{
-            ParamTypes: pTypes[i],
+            ParamTypes:          pTypes[i],
+            limitOffsetAndCount: limitParams[i],
         }
-        lru.Put(keys[i], vals[i], pTypes[i])
+        lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
     }
     require.Equal(t, lru.size, lru.capacity)
     require.Equal(t, uint(3), lru.size)
@@ -103,7 +107,7 @@ func TestLRUPCPut(t *testing.T) {
 
         bucket, exist := lru.buckets[string(hack.String(keys[i].Hash()))]
         require.True(t, exist)
-        element, exist := lru.pickFromBucket(bucket, pTypes[i])
+        element, exist := lru.pickFromBucket(bucket, pTypes[i], limitParams[i])
         require.NotNil(t, element)
         require.True(t, exist)
         require.Equal(t, root, element)
@@ -131,22 +135,25 @@ func TestLRUPCGet(t *testing.T) {
         {types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeLong)},
         {types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeInt24)},
     }
+    limitParams := [][]uint64{
+        {1}, {2}, {3}, {4}, {5},
+    }
     // 5 bucket
     for i := 0; i < 5; i++ {
         keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i%4), 10)}
-        vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
-        lru.Put(keys[i], vals[i], pTypes[i])
+        vals[i] = &PlanCacheValue{ParamTypes: pTypes[i], limitOffsetAndCount: limitParams[i]}
+        lru.Put(keys[i], vals[i], pTypes[i], limitParams[i])
     }
 
     // test for non-existent elements
     for i := 0; i < 2; i++ {
-        value, exists := lru.Get(keys[i], pTypes[i])
+        value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
         require.False(t, exists)
         require.Nil(t, value)
     }
 
     for i := 2; i < 5; i++ {
-        value, exists := lru.Get(keys[i], pTypes[i])
+        value, exists := lru.Get(keys[i], pTypes[i], limitParams[i])
         require.True(t, exists)
         require.NotNil(t, value)
         require.Equal(t, vals[i], value)
@@ -175,23 +182,29 @@ func TestLRUPCDelete(t *testing.T) {
         {types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeEnum)},
         {types.NewFieldType(mysql.TypeFloat), types.NewFieldType(mysql.TypeDate)},
     }
+    limitParams := [][]uint64{
+        {1}, {2}, {3},
+    }
     for i := 0; i < 3; i++ {
         keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
-        vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
-        lru.Put(keys[i], vals[i], pTypes[i])
+        vals[i] = &PlanCacheValue{
+            ParamTypes:          pTypes[i],
+            limitOffsetAndCount: limitParams[i],
+        }
+        lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
     }
     require.Equal(t, 3, int(lru.size))
 
     lru.Delete(keys[1])
-    value, exists := lru.Get(keys[1], pTypes[1])
+    value, exists := lru.Get(keys[1], pTypes[1], limitParams[1])
     require.False(t, exists)
     require.Nil(t, value)
     require.Equal(t, 2, int(lru.size))
 
-    _, exists = lru.Get(keys[0], pTypes[0])
+    _, exists = lru.Get(keys[0], pTypes[0], limitParams[0])
     require.True(t, exists)
-    _, exists = lru.Get(keys[2], pTypes[2])
+    _, exists = lru.Get(keys[2], pTypes[2], limitParams[2])
     require.True(t, exists)
 }
 
@@ -207,14 +220,14 @@ func TestLRUPCDeleteAll(t *testing.T) {
     for i := 0; i < 3; i++ {
         keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
         vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
-        lru.Put(keys[i], vals[i], pTypes[i])
+        lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
     }
     require.Equal(t, 3, int(lru.size))
 
     lru.DeleteAll()
 
     for i := 0; i < 3; i++ {
-        value, exists := lru.Get(keys[i], pTypes[i])
+        value, exists := lru.Get(keys[i], pTypes[i], []uint64{})
         require.False(t, exists)
         require.Nil(t, value)
         require.Equal(t, 0, int(lru.size))
@@ -242,7 +255,7 @@ func TestLRUPCSetCapacity(t *testing.T) {
     for i := 0; i < 5; i++ {
         keys[i] = &planCacheKey{database: strconv.FormatInt(int64(1), 10)}
         vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
-        lru.Put(keys[i], vals[i], pTypes[i])
+        lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
     }
     require.Equal(t, lru.size, lru.capacity)
     require.Equal(t, uint(5), lru.size)
@@ -292,7 +305,7 @@ func TestIssue37914(t *testing.T) {
     val := &PlanCacheValue{ParamTypes: pTypes}
 
     require.NotPanics(t, func() {
-        lru.Put(key, val, pTypes)
+        lru.Put(key, val, pTypes, []uint64{})
     })
 }
 
@@ -313,7 +326,7 @@ func TestIssue38244(t *testing.T) {
     for i := 0; i < 5; i++ {
         keys[i] = &planCacheKey{database: strconv.FormatInt(int64(i), 10)}
         vals[i] = &PlanCacheValue{ParamTypes: pTypes[i]}
-        lru.Put(keys[i], vals[i], pTypes[i])
+        lru.Put(keys[i], vals[i], pTypes[i], []uint64{})
     }
     require.Equal(t, lru.size, lru.capacity)
     require.Equal(t, uint(3), lru.size)
@@ -334,7 +347,7 @@ func TestLRUPlanCacheMemoryUsage(t *testing.T) {
     for i := 0; i < 3; i++ {
         k := randomPlanCacheKey()
         v := randomPlanCacheValue(pTypes)
-        lru.Put(k, v, pTypes)
+        lru.Put(k, v, pTypes, []uint64{})
         res += k.MemoryUsage() + v.MemoryUsage()
         require.Equal(t, lru.MemoryUsage(), res)
     }
@@ -342,7 +355,7 @@ func TestLRUPlanCacheMemoryUsage(t *testing.T) {
     p := &PhysicalTableScan{}
     k := &planCacheKey{database: "3"}
     v := &PlanCacheValue{Plan: p}
-    lru.Put(k, v, pTypes)
+    lru.Put(k, v, pTypes, []uint64{})
     res += k.MemoryUsage() + v.MemoryUsage()
     for kk, vv := range evict {
         res -= kk.(*planCacheKey).MemoryUsage() + vv.(*PlanCacheValue).MemoryUsage()
diff --git a/planner/core/plan_cache_test.go b/planner/core/plan_cache_test.go
index fe76180291edf..05fc7d338a850 100644
--- a/planner/core/plan_cache_test.go
+++ b/planner/core/plan_cache_test.go
@@ -385,12 +385,6 @@ func TestPlanCacheDiagInfo(t *testing.T) {
     tk.MustExec("prepare stmt from 'select /*+ ignore_plan_cache() */ * from t'")
     tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: ignore plan cache by hint"))
 
-    tk.MustExec("prepare stmt from 'select * from t limit ?'")
-    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query has 'limit ?' is un-cacheable"))
-
-    tk.MustExec("prepare stmt from 'select * from t limit ?, 1'")
-    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query has 'limit ?, 10' is un-cacheable"))
-
     tk.MustExec("prepare stmt from 'select * from t order by ?'")
     tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query has 'order by ?' is un-cacheable"))
 
@@ -463,18 +457,64 @@ func TestIssue40225(t *testing.T) {
     tk.MustQuery("select @@last_plan_from_binding").Check(testkit.Rows("1"))
 }
 
-func TestUncacheableReason(t *testing.T) {
+func TestPlanCacheWithLimit(t *testing.T) {
     store := testkit.CreateMockStore(t)
     tk := testkit.NewTestKit(t, store)
     tk.MustExec("use test")
-    tk.MustExec("create table t (a int)")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t(a int primary key, b int)")
+
+    testCases := []struct {
+        sql    string
+        params []int
+    }{
+        {"prepare stmt from 'select * from t limit ?'", []int{1}},
+        {"prepare stmt from 'select * from t limit ?, ?'", []int{1, 2}},
+        {"prepare stmt from 'delete from t order by a limit ?'", []int{1}},
+        {"prepare stmt from 'insert into t select * from t order by a desc limit ?'", []int{1}},
+        {"prepare stmt from 'insert into t select * from t order by a desc limit ?, ?'", []int{1, 2}},
+        {"prepare stmt from 'update t set a = 1 limit ?'", []int{1}},
+        {"prepare stmt from '(select * from t order by a limit ?) union (select * from t order by a desc limit ?)'", []int{1, 2}},
+        {"prepare stmt from 'select * from t where a = ? limit ?, ?'", []int{1, 1, 1}},
+        {"prepare stmt from 'select * from t where a in (?, ?) limit ?, ?'", []int{1, 2, 1, 1}},
+    }
 
-    tk.MustExec("prepare st from 'select * from t limit ?'")
-    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query has 'limit ?' is un-cacheable"))
+    for idx, testCase := range testCases {
+        tk.MustExec(testCase.sql)
+        var using []string
+        for i, p := range testCase.params {
+            tk.MustExec(fmt.Sprintf("set @a%d = %d", i, p))
+            using = append(using, fmt.Sprintf("@a%d", i))
+        }
 
-    tk.MustExec("set @a=1")
-    tk.MustQuery("execute st using @a").Check(testkit.Rows())
-    tk.MustExec("prepare st from 'select * from t limit ?'")
-    // show the corresponding un-cacheable reason at execute-stage as well
-    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: query has 'limit ?' is un-cacheable"))
+        tk.MustExec("execute stmt using " + strings.Join(using, ", "))
+        tk.MustExec("execute stmt using " + strings.Join(using, ", "))
+        tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1"))
+
+        if idx < 6 {
+            tk.MustExec("set @a0 = 6")
+            tk.MustExec("execute stmt using " + strings.Join(using, ", "))
+            tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
+        }
+    }
+
+    tk.MustExec("prepare stmt from 'select * from t limit ?'")
+    tk.MustExec("set @a = 10001")
+    tk.MustExec("execute stmt using @a")
+    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: limit count more than 10000"))
+}
+
+func TestUncacheableReason(t *testing.T) {
+    store := testkit.CreateMockStore(t)
+    tk := testkit.NewTestKit(t, store)
+    tk.MustExec("use test")
+    tk.MustExec("drop table if exists t")
+    tk.MustExec("create table t(a int)")
+
+    tk.MustExec("prepare st from 'select /*+ ignore_plan_cache() */ * from t'")
+    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: ignore plan cache by hint"))
+    tk.MustExec("execute st") // show the un-cacheable reason when executing the statement
+    tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: ignore plan cache by hint"))
+    tk.MustExec("execute st")
+    tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0"))
 }
diff --git a/planner/core/plan_cache_utils.go b/planner/core/plan_cache_utils.go
index 3fe4ee38bfe45..082637506e590 100644
--- a/planner/core/plan_cache_utils.go
+++ b/planner/core/plan_cache_utils.go
@@ -343,6 +343,9 @@ type PlanCacheValue struct {
     TblInfo2UnionScan map[*model.TableInfo]bool
     ParamTypes        FieldSlice
     memoryUsage       int64
+    // limitOffsetAndCount stores all the offset and key parameters extract from limit statement
+    // only used for cache and pick plan with parameters
+    limitOffsetAndCount []uint64
 }
 
 func (v *PlanCacheValue) varTypesUnchanged(txtVarTps []*types.FieldType) bool {
@@ -390,7 +393,7 @@ func (v *PlanCacheValue) MemoryUsage() (sum int64) {
 
 // NewPlanCacheValue creates a SQLCacheValue.
 func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.TableInfo]bool,
-    paramTypes []*types.FieldType) *PlanCacheValue {
+    paramTypes []*types.FieldType, limitParams []uint64) *PlanCacheValue {
     dstMap := make(map[*model.TableInfo]bool)
     for k, v := range srcMap {
         dstMap[k] = v
@@ -400,10 +403,11 @@ func NewPlanCacheValue(plan Plan, names []*types.FieldName, srcMap map[*model.Ta
         userParamTypes[i] = tp.Clone()
     }
     return &PlanCacheValue{
-        Plan:              plan,
-        OutPutNames:       names,
-        TblInfo2UnionScan: dstMap,
-        ParamTypes:        userParamTypes,
+        Plan:                plan,
+        OutPutNames:         names,
+        TblInfo2UnionScan:   dstMap,
+        ParamTypes:          userParamTypes,
+        limitOffsetAndCount: limitParams,
     }
 }
 
@@ -453,3 +457,69 @@ func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*PlanCa
     }
     return nil, ErrStmtNotFound
 }
+
+type limitExtractor struct {
+    cacheable         bool // For safety considerations, check if limit count less than 10000
+    offsetAndCount    []uint64
+    unCacheableReason string
+    paramTypeErr      error
+}
+
+// Enter implements Visitor interface.
+func (checker *limitExtractor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
+    switch node := in.(type) {
+    case *ast.Limit:
+        if node.Count != nil {
+            if count, isParamMarker := node.Count.(*driver.ParamMarkerExpr); isParamMarker {
+                typeExpected, val := CheckParamTypeInt64orUint64(count)
+                if typeExpected {
+                    if val > 10000 {
+                        checker.cacheable = false
+                        checker.unCacheableReason = "limit count more than 10000"
+                        return in, true
+                    }
+                    checker.offsetAndCount = append(checker.offsetAndCount, val)
+                } else {
+                    checker.paramTypeErr = ErrWrongArguments.GenWithStackByArgs("LIMIT")
+                    return in, true
+                }
+            }
+        }
+        if node.Offset != nil {
+            if offset, isParamMarker := node.Offset.(*driver.ParamMarkerExpr); isParamMarker {
+                typeExpected, val := CheckParamTypeInt64orUint64(offset)
+                if typeExpected {
+                    checker.offsetAndCount = append(checker.offsetAndCount, val)
+                } else {
+                    checker.paramTypeErr = ErrWrongArguments.GenWithStackByArgs("LIMIT")
+                    return in, true
+                }
+            }
+        }
+    }
+    return in, false
+}
+
+// Leave implements Visitor interface.
+func (checker *limitExtractor) Leave(in ast.Node) (out ast.Node, ok bool) {
+    return in, checker.cacheable
+}
+
+// ExtractLimitFromAst extract limit offset and count from ast for plan cache key encode
+func ExtractLimitFromAst(node ast.Node, sctx sessionctx.Context) ([]uint64, error) {
+    if node == nil {
+        return nil, nil
+    }
+    checker := limitExtractor{
+        cacheable:      true,
+        offsetAndCount: []uint64{},
+    }
+    node.Accept(&checker)
+    if checker.paramTypeErr != nil {
+        return nil, checker.paramTypeErr
+    }
+    if sctx != nil && !checker.cacheable {
+        sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.New("skip plan-cache: " + checker.unCacheableReason))
+    }
+    return checker.offsetAndCount, nil
+}
diff --git a/planner/core/plan_cacheable_checker.go b/planner/core/plan_cacheable_checker.go
index 0074cff434221..2ff9e51823ee2 100644
--- a/planner/core/plan_cacheable_checker.go
+++ b/planner/core/plan_cacheable_checker.go
@@ -135,21 +135,22 @@ func (checker *cacheableChecker) Enter(in ast.Node) (out ast.Node, skipChildren
                 return in, true
             }
         }
-    case *ast.Limit:
-        if node.Count != nil {
-            if _, isParamMarker := node.Count.(*driver.ParamMarkerExpr); isParamMarker {
-                checker.cacheable = false
-                checker.reason = "query has 'limit ?' is un-cacheable"
-                return in, true
-            }
-        }
-        if node.Offset != nil {
-            if _, isParamMarker := node.Offset.(*driver.ParamMarkerExpr); isParamMarker {
-                checker.cacheable = false
-                checker.reason = "query has 'limit ?, 10' is un-cacheable"
-                return in, true
-            }
-        }
+    // todo: these comment is used to add switch in the later pr
+    //case *ast.Limit:
+    //    if node.Count != nil {
+    //        if _, isParamMarker := node.Count.(*driver.ParamMarkerExpr); isParamMarker {
+    //            checker.cacheable = false
+    //            checker.reason = "query has 'limit ?' is un-cacheable"
+    //            return in, true
+    //        }
+    //    }
+    //    if node.Offset != nil {
+    //        if _, isParamMarker := node.Offset.(*driver.ParamMarkerExpr); isParamMarker {
+    //            checker.cacheable = false
+    //            checker.reason = "query has 'limit ?, 10' is un-cacheable"
+    //            return in, true
+    //        }
+    //    }
     case *ast.FrameBound:
         if _, ok := node.Expr.(*driver.ParamMarkerExpr); ok {
             checker.cacheable = false
diff --git a/planner/core/plan_cacheable_checker_test.go b/planner/core/plan_cacheable_checker_test.go
index e87a08592eb16..7d417e377888f 100644
--- a/planner/core/plan_cacheable_checker_test.go
+++ b/planner/core/plan_cacheable_checker_test.go
@@ -87,7 +87,7 @@ func TestCacheable(t *testing.T) {
         TableRefs: tableRefsClause,
         Limit:     limitStmt,
     }
-    require.False(t, core.Cacheable(stmt, is))
+    require.True(t, core.Cacheable(stmt, is))
 
     limitStmt = &ast.Limit{
         Offset: &driver.ParamMarkerExpr{},
@@ -96,7 +96,7 @@ func TestCacheable(t *testing.T) {
         TableRefs: tableRefsClause,
         Limit:     limitStmt,
     }
-    require.False(t, core.Cacheable(stmt, is))
+    require.True(t, core.Cacheable(stmt, is))
 
     limitStmt = &ast.Limit{}
     stmt = &ast.DeleteStmt{
@@ -139,7 +139,7 @@ func TestCacheable(t *testing.T) {
         TableRefs: tableRefsClause,
         Limit:     limitStmt,
     }
-    require.False(t, core.Cacheable(stmt, is))
+    require.True(t, core.Cacheable(stmt, is))
 
     limitStmt = &ast.Limit{
         Offset: &driver.ParamMarkerExpr{},
@@ -148,7 +148,7 @@ func TestCacheable(t *testing.T) {
         TableRefs: tableRefsClause,
         Limit:     limitStmt,
     }
-    require.False(t, core.Cacheable(stmt, is))
+    require.True(t, core.Cacheable(stmt, is))
 
     limitStmt = &ast.Limit{}
     stmt = &ast.UpdateStmt{
@@ -188,7 +188,7 @@ func TestCacheable(t *testing.T) {
     stmt = &ast.SelectStmt{
         Limit: limitStmt,
     }
-    require.False(t, core.Cacheable(stmt, is))
+    require.True(t, core.Cacheable(stmt, is))
 
     limitStmt = &ast.Limit{
         Offset: &driver.ParamMarkerExpr{},
@@ -196,7 +196,7 @@ func TestCacheable(t *testing.T) {
     stmt = &ast.SelectStmt{
         Limit: limitStmt,
     }
-    require.False(t, core.Cacheable(stmt, is))
+    require.True(t, core.Cacheable(stmt, is))
 
     limitStmt = &ast.Limit{}
     stmt = &ast.SelectStmt{
diff --git a/planner/core/task.go b/planner/core/task.go
index d68806c5ef1d8..5d7ca6e5fd424 100644
--- a/planner/core/task.go
+++ b/planner/core/task.go
@@ -86,6 +86,7 @@ type copTask struct {
 
     idxMergePartPlans      []PhysicalPlan
     idxMergeIsIntersection bool
+    idxMergeAccessMVIndex  bool
 
     // rootTaskConds stores select conditions containing virtual columns.
     // These conditions can't push to TiKV, so we have to add a selection for rootTask
@@ -688,6 +689,7 @@ func (t *copTask) convertToRootTaskImpl(ctx sessionctx.Context) *rootTask {
             partialPlans:       t.idxMergePartPlans,
             tablePlan:          t.tablePlan,
             IsIntersectionType: t.idxMergeIsIntersection,
+            AccessMVIndex:      t.idxMergeAccessMVIndex,
         }.Init(ctx, t.idxMergePartPlans[0].SelectBlockOffset())
         p.PartitionInfo = t.partitionInfo
         setTableScanToTableRowIDScan(p.tablePlan)
diff --git a/planner/util/path.go b/planner/util/path.go
index 2ee67eea8b6aa..68b11bbf5751f 100644
--- a/planner/util/path.go
+++ b/planner/util/path.go
@@ -54,6 +54,8 @@ type AccessPath struct {
     // It's only valid for a IndexMerge path.
     // Intersection type is for expressions connected by `AND` and union type is for `OR`.
     IndexMergeIsIntersection bool
+    // IndexMergeAccessMVIndex indicates whether this IndexMerge path accesses a MVIndex.
+    IndexMergeAccessMVIndex bool
 
     StoreType kv.StoreType
diff --git a/resourcemanager/BUILD.bazel b/resourcemanager/BUILD.bazel
index 968a73b4a0f95..c6e7983df34be 100644
--- a/resourcemanager/BUILD.bazel
+++ b/resourcemanager/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
         "//resourcemanager/util",
         "//util",
         "//util/cpu",
+        "@com_github_google_uuid//:uuid",
         "@com_github_pingcap_log//:log",
         "@org_uber_go_zap//:zap",
     ],
diff --git a/resourcemanager/rm.go b/resourcemanager/rm.go
index aa2af6f949465..ed73cda8e1abf 100644
--- a/resourcemanager/rm.go
+++ b/resourcemanager/rm.go
@@ -17,6 +17,7 @@ package resourcemanager
 import (
     "time"
 
+    "github.com/google/uuid"
     "github.com/pingcap/tidb/resourcemanager/scheduler"
     "github.com/pingcap/tidb/resourcemanager/util"
     tidbutil "github.com/pingcap/tidb/util"
@@ -26,6 +27,11 @@ import (
 // GlobalResourceManager is a global resource manager
 var GlobalResourceManager = NewResourceManger()
 
+// RandomName is to get a random name for register pool. It is just for test.
+func RandomName() string {
+    return uuid.New().String()
+}
+
 // ResourceManager is a resource manager
 type ResourceManager struct {
     poolMap *util.ShardPoolMap
@@ -85,3 +91,8 @@ func (r *ResourceManager) registerPool(name string, pool *util.PoolContainer) er
 func (r *ResourceManager) Unregister(name string) {
     r.poolMap.Del(name)
 }
+
+// Reset is to Reset resource manager. it is just for test.
+func (r *ResourceManager) Reset() {
+    r.poolMap = util.NewShardPoolMap()
+}
diff --git a/sessionctx/context.go b/sessionctx/context.go
index 0e38fbdaba3d5..0999b2396cae0 100644
--- a/sessionctx/context.go
+++ b/sessionctx/context.go
@@ -54,8 +54,8 @@ type SessionStatesHandler interface {
 
 // PlanCache is an interface for prepare and non-prepared plan cache
 type PlanCache interface {
-    Get(key kvcache.Key, paramTypes []*types.FieldType) (value kvcache.Value, ok bool)
-    Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType)
+    Get(key kvcache.Key, paramTypes []*types.FieldType, limitParams []uint64) (value kvcache.Value, ok bool)
+    Put(key kvcache.Key, value kvcache.Value, paramTypes []*types.FieldType, limitParams []uint64)
     Delete(key kvcache.Key)
     DeleteAll()
     Size() int
diff --git a/statistics/handle/BUILD.bazel b/statistics/handle/BUILD.bazel
index 81dff92c5b143..d52847495d539 100644
--- a/statistics/handle/BUILD.bazel
+++ b/statistics/handle/BUILD.bazel
@@ -42,6 +42,7 @@ go_library(
         "//util/memory",
         "//util/ranger",
         "//util/sqlexec",
+        "//util/syncutil",
        "//util/timeutil",
        "@com_github_ngaut_pools//:pools",
        "@com_github_pingcap_errors//:errors",
@@ -72,6 +73,7 @@ go_test(
     ],
     embed = [":handle"],
     flaky = True,
+    race = "on",
     shard_count = 50,
     deps = [
         "//config",
diff --git a/statistics/handle/handle.go b/statistics/handle/handle.go
index 79a5382779208..fc4f86dc54fb8 100644
--- a/statistics/handle/handle.go
+++ b/statistics/handle/handle.go
@@ -48,6 +48,7 @@ import (
     "github.com/pingcap/tidb/util/mathutil"
     "github.com/pingcap/tidb/util/memory"
     "github.com/pingcap/tidb/util/sqlexec"
+    "github.com/pingcap/tidb/util/syncutil"
     "github.com/pingcap/tidb/util/timeutil"
     "github.com/prometheus/client_golang/prometheus"
     "github.com/tikv/client-go/v2/oracle"
     atomic2 "go.uber.org/atomic"
@@ -70,7 +71,7 @@ type Handle struct {
     initStatsCtx sessionctx.Context
 
     mu struct {
-        sync.RWMutex
+        syncutil.RWMutex
         ctx sessionctx.Context
         // rateMap contains the error rate delta from feedback.
        rateMap errorRateDeltaMap
@@ -361,8 +362,15 @@ func (h *Handle) RemoveLockedTables(tids []int64, pids []int64, tables []*ast.Ta
     return "", err
 }
 
-// IsTableLocked check whether table is locked in handle
+// IsTableLocked check whether table is locked in handle with Handle.Mutex
 func (h *Handle) IsTableLocked(tableID int64) bool {
+    h.mu.RLock()
+    defer h.mu.RUnlock()
+    return h.isTableLocked(tableID)
+}
+
+// IsTableLocked check whether table is locked in handle without Handle.Mutex
+func (h *Handle) isTableLocked(tableID int64) bool {
     return isTableLocked(h.tableLocked, tableID)
 }
 
diff --git a/statistics/handle/update.go b/statistics/handle/update.go
index 68aa9cebbcf05..e245a3ea0bca5 100644
--- a/statistics/handle/update.go
+++ b/statistics/handle/update.go
@@ -549,7 +549,8 @@ func (h *Handle) dumpTableStatCountToKV(id int64, delta variable.TableDelta) (up
     startTS := txn.StartTS()
     updateStatsMeta := func(id int64) error {
         var err error
-        if h.IsTableLocked(id) {
+        // This lock is already locked on it so it use isTableLocked without lock.
+        if h.isTableLocked(id) {
             if delta.Delta < 0 {
                 _, err = exec.ExecuteInternal(ctx, "update mysql.stats_table_locked set version = %?, count = count - %?, modify_count = modify_count + %? where table_id = %? and count >= %?", startTS, -delta.Delta, delta.Count, id, -delta.Delta)
             } else {
diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel
index 9ea8467d01dfa..a7cdd81453fd7 100644
--- a/store/copr/BUILD.bazel
+++ b/store/copr/BUILD.bazel
@@ -34,6 +34,7 @@ go_library(
         "//util/trxevents",
         "@com_github_dgraph_io_ristretto//:ristretto",
         "@com_github_gogo_protobuf//proto",
+        "@com_github_opentracing_opentracing_go//:opentracing-go",
        "@com_github_pingcap_errors//:errors",
        "@com_github_pingcap_failpoint//:failpoint",
        "@com_github_pingcap_kvproto//pkg/coprocessor",
diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go
index 08f0f055a31e3..f446912ef33af 100644
--- a/store/copr/coprocessor.go
+++ b/store/copr/coprocessor.go
@@ -26,6 +26,7 @@ import (
     "unsafe"
 
     "github.com/gogo/protobuf/proto"
+    "github.com/opentracing/opentracing-go"
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
     "github.com/pingcap/kvproto/pkg/coprocessor"
@@ -384,12 +385,20 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *KeyRanges, req *kv
         builder.reverse()
     }
     tasks := builder.build()
-    if elapsed := time.Since(start); elapsed > time.Millisecond*500 {
+    elapsed := time.Since(start)
+    if elapsed > time.Millisecond*500 {
         logutil.BgLogger().Warn("buildCopTasks takes too much time",
             zap.Duration("elapsed", elapsed),
             zap.Int("range len", rangesLen),
             zap.Int("task len", len(tasks)))
     }
+    if elapsed > time.Millisecond {
+        ctx := bo.GetCtx()
+        if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
+            span1 := span.Tracer().StartSpan("copr.buildCopTasks", opentracing.ChildOf(span.Context()), opentracing.StartTime(start))
+            defer span1.Finish()
+        }
+    }
     metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum()))
     return tasks, nil
 }
diff --git a/testkit/BUILD.bazel b/testkit/BUILD.bazel
index 4e0e24091db27..c28ef0614eb04 100644
--- a/testkit/BUILD.bazel
+++ b/testkit/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
         "//parser/ast",
         "//parser/terror",
         "//planner/core",
+        "//resourcemanager",
        "//session",
        "//session/txninfo",
        "//sessionctx/variable",
diff --git a/testkit/mockstore.go b/testkit/mockstore.go
index 12afe0e0f2f68..9756d5bb65804 100644
--- a/testkit/mockstore.go
+++ b/testkit/mockstore.go
@@ -24,6 +24,7 @@ import (
     "github.com/pingcap/tidb/ddl/schematracker"
     "github.com/pingcap/tidb/domain"
     "github.com/pingcap/tidb/kv"
+    "github.com/pingcap/tidb/resourcemanager"
     "github.com/pingcap/tidb/session"
     "github.com/pingcap/tidb/store/driver"
     "github.com/pingcap/tidb/store/mockstore"
@@ -91,6 +92,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma
         err := store.Close()
         require.NoError(t, err)
         view.Stop()
+        resourcemanager.GlobalResourceManager.Reset()
     })
     return dom
 }