From ac2c6f8d875a6f530d53bcd64ff3c5e4a3ce313b Mon Sep 17 00:00:00 2001 From: xiaolunzhou <51695571+JayL-zxl@users.noreply.github.com> Date: Fri, 29 Oct 2021 17:08:49 +0800 Subject: [PATCH] ddl : ddl forbid stale read for cache table and support history read (#29173) --- ddl/db_test.go | 11 ++++ ddl/ddl_api.go | 3 ++ ddl/table.go | 4 +- executor/admin_test.go | 7 ++- executor/builder.go | 34 ++++++++++--- executor/executor_test.go | 103 ++++++++++++++++++++++++++++++++++++++ executor/point_get.go | 2 +- 7 files changed, 155 insertions(+), 9 deletions(-) diff --git a/ddl/db_test.go b/ddl/db_test.go index 6defdb38266e5..4b9da4557f08f 100644 --- a/ddl/db_test.go +++ b/ddl/db_test.go @@ -5917,6 +5917,17 @@ func (s *testDBSuite2) TestAlterTableCache(c *C) { // Multiple alter cache is okay tk.MustExec("alter table t cache") tk.MustExec("alter table t cache") + // Test a temporary table + tk.MustExec("drop table if exists t") + tk.MustExec("create temporary table t (id int primary key auto_increment, u int unique, v int)") + tk.MustExec("drop table if exists tmp1") + // local temporary table alter is not supported + tk.MustGetErrCode("alter table t cache", errno.ErrUnsupportedDDLOperation) + // test global temporary table + tk.MustExec("create global temporary table tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))" + + "on commit delete rows") + tk.MustGetErrMsg("alter table tmp1 cache", ddl.ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache").Error()) } diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index a8e6e858b1adb..081c5246b5eac 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -6637,6 +6637,9 @@ func (d *ddl) AlterTableCache(ctx sessionctx.Context, ti ast.Ident) (err error) if t.Meta().TableCacheStatusType == model.TableCacheStatusEnable { return nil } + if t.Meta().TempTableType != model.TempTableNone { + return ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache") + } job := &model.Job{ SchemaID: schema.ID, SchemaName: schema.Name.L, diff --git a/ddl/table.go b/ddl/table.go index d7cd9fde44e8e..430f688c8fc2b 100644 --- a/ddl/table.go +++ b/ddl/table.go @@ -1475,7 +1475,9 @@ func onAlterCacheTable(t *meta.Meta, job *model.Job) (ver int64, err error) { job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tbInfo) return ver, nil } - + if tbInfo.TempTableType != model.TempTableNone { + return ver, errors.Trace(ErrOptOnTemporaryTable.GenWithStackByArgs("alter temporary table cache")) + } switch tbInfo.TableCacheStatusType { case model.TableCacheStatusDisable: // disable -> switching diff --git a/executor/admin_test.go b/executor/admin_test.go index 8d2f9c89b6e85..8707332688ad2 100644 --- a/executor/admin_test.go +++ b/executor/admin_test.go @@ -127,6 +127,7 @@ func (s *testSuite5) TestAdminCheckIndexInLocalTemporaryMode(c *C) { c.Assert(err.Error(), Equals, core.ErrOptOnTemporaryTable.GenWithStackByArgs("admin checksum table").Error()) tk.MustExec("drop table if exists local_temporary_admin_checksum_table_with_index_test,local_temporary_admin_checksum_table_without_index_test;") } + func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") @@ -137,6 +138,8 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) { tk.MustExec("admin check table cache_admin_test;") tk.MustExec("admin check index cache_admin_test c1;") tk.MustExec("admin check index cache_admin_test c2;") + tk.MustExec("drop table if exists cache_admin_test;") + tk.MustExec(`drop table if exists check_index_test;`) tk.MustExec(`create table check_index_test (a int, b varchar(10), index a_b (a, b), index b (b))`) tk.MustExec(`insert check_index_test values (3, "ab"),(2, "cd"),(1, "ef"),(-1, "hi")`) @@ -145,7 +148,8 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) { result.Check(testkit.Rows("1 ef 3", "2 cd 2")) result = tk.MustQuery("admin check index check_index_test a_b (3, 5);") result.Check(testkit.Rows("-1 hi 4", "1 ef 3")) - tk.MustExec("drop table if exists cache_admin_test;") + tk.MustExec("drop table if exists check_index_test;") + tk.MustExec("drop table if exists cache_admin_table_with_index_test;") tk.MustExec("drop table if exists cache_admin_table_without_index_test;") tk.MustExec("create table cache_admin_table_with_index_test (id int, count int, PRIMARY KEY(id), KEY(count))") @@ -156,6 +160,7 @@ func (s *testSuite5) TestAdminCheckIndexInCacheTable(c *C) { tk.MustExec("admin checksum table cache_admin_table_without_index_test;") tk.MustExec("drop table if exists cache_admin_table_with_index_test,cache_admin_table_without_index_test;") } + func (s *testSuite5) TestAdminRecoverIndex(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/builder.go b/executor/builder.go index df2b87dd5f620..09e288ea521c0 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -2848,7 +2848,7 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea return nil, err } ts := v.GetTableScan() - if err = b.validCanReadTemporaryTable(ts.Table); err != nil { + if err = b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil { return nil, err } @@ -2964,7 +2964,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E } ts := v.GetTableScan() - if err = b.validCanReadTemporaryTable(ts.Table); err != nil { + if err = b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil { b.err = err return nil } @@ -3187,7 +3187,7 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor { is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) - if err := b.validCanReadTemporaryTable(is.Table); err != nil { + if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil { b.err = err return nil } @@ -3346,7 +3346,7 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor { is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan) - if err := b.validCanReadTemporaryTable(is.Table); err != nil { + if err := b.validCanReadTemporaryOrCacheTable(is.Table); err != nil { b.err = err return nil } @@ -3461,7 +3461,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor { ts := v.TablePlans[0].(*plannercore.PhysicalTableScan) - if err := b.validCanReadTemporaryTable(ts.Table); err != nil { + if err := b.validCanReadTemporaryOrCacheTable(ts.Table); err != nil { b.err = err return nil } @@ -4266,7 +4266,7 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model } func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor { - if err := b.validCanReadTemporaryTable(plan.TblInfo); err != nil { + if err := b.validCanReadTemporaryOrCacheTable(plan.TblInfo); err != nil { b.err = err return nil } @@ -4551,6 +4551,28 @@ func (b *executorBuilder) buildCTETableReader(v *plannercore.PhysicalCTETable) E chkIdx: 0, } } +func (b *executorBuilder) validCanReadTemporaryOrCacheTable(tbl *model.TableInfo) error { + err := b.validCanReadTemporaryTable(tbl) + if err != nil { + return err + } + return b.validCanReadCacheTable(tbl) +} + +func (b *executorBuilder) validCanReadCacheTable(tbl *model.TableInfo) error { + if tbl.TableCacheStatusType == model.TableCacheStatusDisable { + return nil + } + + sessionVars := b.ctx.GetSessionVars() + + // Temporary table can't switch into cache table. so the following code will not cause confusion + if sessionVars.TxnCtx.IsStaleness || b.isStaleness { + return errors.Trace(errors.New("can not stale read cache table")) + } + + return nil +} func (b *executorBuilder) validCanReadTemporaryTable(tbl *model.TableInfo) error { if tbl.TempTableType == model.TempTableNone { diff --git a/executor/executor_test.go b/executor/executor_test.go index df531c4af7b91..f713709b6463f 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8990,6 +8990,109 @@ func (s *testStaleTxnSuite) TestInvalidReadTemporaryTable(c *C) { } } +func (s *testStaleTxnSuite) TestInvalidReadCacheTable(c *C) { + tk := testkit.NewTestKit(c, s.store) + // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. + safePointName := "tikv_gc_safe_point" + safePointValue := "20160102-15:04:05 -0700" + safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)" + updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s') + ON DUPLICATE KEY + UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment) + tk.MustExec(updateSafePoint) + tk.MustExec("use test") + tk.MustExec("drop table if exists cache_tmp1") + tk.MustExec("create table cache_tmp1 " + + "(id int not null primary key, code int not null, value int default null, unique key code(code))") + tk.MustExec("alter table cache_tmp1 cache") + tk.MustExec("drop table if exists cache_tmp2") + tk.MustExec("create table cache_tmp2 (id int not null primary key, code int not null, value int default null, unique key code(code));") + tk.MustExec("alter table cache_tmp2 cache") + tk.MustExec("drop table if exists cache_tmp3 , cache_tmp4, cache_tmp5") + tk.MustExec("create table cache_tmp3 (id int not null primary key, code int not null, value int default null, unique key code(code));") + tk.MustExec("create table cache_tmp4 (id int not null primary key, code int not null, value int default null, unique key code(code));") + tk.MustExec("create table cache_tmp5 (id int primary key);") + // sleep 1us to make test stale + time.Sleep(time.Microsecond) + + queries := []struct { + sql string + }{ + { + sql: "select * from cache_tmp1 where id=1", + }, + { + sql: "select * from cache_tmp1 where code=1", + }, + { + sql: "select * from cache_tmp1 where id in (1, 2, 3)", + }, + { + sql: "select * from cache_tmp1 where code in (1, 2, 3)", + }, + { + sql: "select * from cache_tmp1 where id > 1", + }, + { + sql: "select /*+use_index(cache_tmp1, code)*/ * from cache_tmp1 where code > 1", + }, + { + sql: "select /*+use_index(cache_tmp1, code)*/ code from cache_tmp1 where code > 1", + }, + } + + addStaleReadToSQL := func(sql string) string { + idx := strings.Index(sql, " where ") + if idx < 0 { + return "" + } + return sql[0:idx] + " as of timestamp NOW(6)" + sql[idx:] + } + for _, query := range queries { + sql := addStaleReadToSQL(query.sql) + if sql != "" { + tk.MustGetErrMsg(sql, "can not stale read cache table") + } + } + + tk.MustExec("start transaction read only as of timestamp NOW(6)") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read cache table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } + + // Test normal table when cache table exits. + tk.MustExec("insert into cache_tmp5 values(1);") + tk.MustExec("set @a=now(6);") + time.Sleep(time.Microsecond) + tk.MustExec("drop table cache_tmp5") + tk.MustExec("create table cache_tmp5 (id int primary key);") + tk.MustQuery("select * from cache_tmp5 as of timestamp(@a) where id=1;").Check(testkit.Rows("1")) + tk.MustQuery("select * from cache_tmp4 as of timestamp(@a), cache_tmp3 as of timestamp(@a) where cache_tmp3.id=1;") + tk.MustGetErrMsg("select * from cache_tmp4 as of timestamp(@a), cache_tmp2 as of timestamp(@a) where cache_tmp2.id=1;", "can not stale read cache table") + tk.MustExec("set transaction read only as of timestamp NOW(6)") + tk.MustExec("start transaction") + for _, query := range queries { + tk.MustGetErrMsg(query.sql, "can not stale read cache table") + } + tk.MustExec("commit") + + for _, query := range queries { + tk.MustExec(query.sql) + } + + tk.MustExec("set @@tidb_snapshot=NOW(6)") + for _, query := range queries { + // enable historical read cache table + tk.MustExec(query.sql) + + } +} + func (s *testSuite) TestTableSampleTemporaryTable(c *C) { tk := testkit.NewTestKit(c, s.store) // For mocktikv, safe point is not initialized, we manually insert it for snapshot to use. diff --git a/executor/point_get.go b/executor/point_get.go index 6b6a5248e19df..0d1de79d0d1cd 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -43,7 +43,7 @@ import ( ) func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { - if err := b.validCanReadTemporaryTable(p.TblInfo); err != nil { + if err := b.validCanReadTemporaryOrCacheTable(p.TblInfo); err != nil { b.err = err return nil }