Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: add history read compatibility for temporary table #25237

Merged
merged 7 commits into from
Jun 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 31 additions & 19 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1373,11 +1373,6 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu
return e
}

// IsStaleness returns if the query is staleness
func (b *executorBuilder) IsStaleness() bool {
return b.ctx.GetSessionVars().TxnCtx.IsStaleness || b.explicitStaleness
}

// `getSnapshotTS` returns the timestamp of the snapshot that a reader should read.
func (b *executorBuilder) getSnapshotTS() (uint64, error) {
// `refreshForUpdateTSForRC` should always be invoked before returning the cached value to
Expand Down Expand Up @@ -2649,8 +2644,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, err
}
ts := v.GetTableScan()
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
return nil, errors.New("can not stale read temporary table")
if err = b.validCanReadTemporaryTable(ts.Table); err != nil {
return nil, err
}

tbl, _ := b.is.TableByID(ts.Table.ID)
Expand Down Expand Up @@ -2747,8 +2742,8 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) E
}

ts := v.GetTableScan()
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
if err = b.validCanReadTemporaryTable(ts.Table); err != nil {
b.err = err
return nil
}

Expand Down Expand Up @@ -2961,8 +2956,8 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea

func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
if err := b.validCanReadTemporaryTable(is.Table); err != nil {
b.err = err
return nil
}

Expand Down Expand Up @@ -3113,8 +3108,8 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn

func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLookUpReader) Executor {
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
if is.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
if err := b.validCanReadTemporaryTable(is.Table); err != nil {
b.err = err
return nil
}

Expand Down Expand Up @@ -3224,8 +3219,8 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd

func (b *executorBuilder) buildIndexMergeReader(v *plannercore.PhysicalIndexMergeReader) Executor {
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
if ts.Table.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
if err := b.validCanReadTemporaryTable(ts.Table); err != nil {
b.err = err
return nil
}

Expand Down Expand Up @@ -4019,8 +4014,8 @@ func NewRowDecoder(ctx sessionctx.Context, schema *expression.Schema, tbl *model
}

func (b *executorBuilder) buildBatchPointGet(plan *plannercore.BatchPointGetPlan) Executor {
if plan.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
if err := b.validCanReadTemporaryTable(plan.TblInfo); err != nil {
b.err = err
return nil
}

Expand Down Expand Up @@ -4156,8 +4151,8 @@ func fullRangePartition(idxArr []int) bool {
}

func (b *executorBuilder) buildTableSample(v *plannercore.PhysicalTableSample) *TableSampleExecutor {
if v.TableInfo.Meta().TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
if err := b.validCanReadTemporaryTable(v.TableInfo.Meta()); err != nil {
b.err = err
return nil
}

Expand Down Expand Up @@ -4274,3 +4269,20 @@ func (b *executorBuilder) buildCTETableReader(v *plannercore.PhysicalCTETable) E
chkIdx: 0,
}
}

func (b *executorBuilder) validCanReadTemporaryTable(tbl *model.TableInfo) error {
if tbl.TempTableType == model.TempTableNone {
return nil
}

sessionVars := b.ctx.GetSessionVars()
if sessionVars.SnapshotTS != 0 {
return errors.New("can not read temporary table when 'tidb_snapshot' is set")
}

if sessionVars.TxnCtx.IsStaleness || b.explicitStaleness {
return errors.New("can not stale read temporary table")
}

return nil
}
95 changes: 95 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8521,3 +8521,98 @@ func (s *testResourceTagSuite) TestResourceGroupTag(c *C) {
c.Assert(checkCnt > 0, IsTrue, commentf)
}
}

func (s *testStaleTxnSuite) TestStaleOrHistoryReadTemporaryTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

tk.MustExec("set @@tidb_enable_global_temporary_table=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists tmp1")
tk.MustExec("create global temporary table tmp1 " +
"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
"on commit delete rows")

// sleep 1us to make test stale
time.Sleep(time.Microsecond)

queries := []struct {
sql string
}{
{
sql: "select * from tmp1 where id=1",
},
{
sql: "select * from tmp1 where code=1",
},
{
sql: "select * from tmp1 where id in (1, 2, 3)",
},
{
sql: "select * from tmp1 where code in (1, 2, 3)",
},
{
sql: "select * from tmp1 where id > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1",
},
{
sql: "select * from tmp1 tablesample regions()",
},
{
sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2",
},
}

addStaleReadToSQL := func(sql string) string {
idx := strings.Index(sql, " where ")
if idx < 0 {
return ""
}
return sql[0:idx] + " as of timestamp NOW(6)" + sql[idx:]
}

for _, query := range queries {
sql := addStaleReadToSQL(query.sql)
if sql != "" {
tk.MustGetErrMsg(sql, "can not stale read temporary table")
}
}

tk.MustExec("start transaction read only as of timestamp NOW(6)")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}

tk.MustExec("set transaction read only as of timestamp NOW(6)")
tk.MustExec("start transaction")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}

tk.MustExec("set @@tidb_snapshot=NOW(6)")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not read temporary table when 'tidb_snapshot' is set")
}
}
4 changes: 2 additions & 2 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ import (
)

func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
if p.TblInfo.TempTableType != model.TempTableNone && b.IsStaleness() {
b.err = errors.New("can not stale read temporary table")
if err := b.validCanReadTemporaryTable(p.TblInfo); err != nil {
b.err = err
return nil
}

Expand Down
90 changes: 0 additions & 90 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package executor_test

import (
"fmt"
"strings"
"time"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -773,92 +772,3 @@ func (s *testStaleTxnSuite) TestSetTransactionInfoSchema(c *C) {
tk.MustExec("commit")
c.Assert(tk.Se.GetInfoSchema().SchemaMetaVersion(), Equals, schemaVer3)
}

func (s *testStaleTxnSuite) TestStaleReadTemporaryTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
// For mocktikv, safe point is not initialized, we manually insert it for snapshot to use.
safePointName := "tikv_gc_safe_point"
safePointValue := "20160102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
tk.MustExec(updateSafePoint)

tk.MustExec("set @@tidb_enable_global_temporary_table=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists tmp1")
tk.MustExec("create global temporary table tmp1 " +
"(id int not null primary key, code int not null, value int default null, unique key code(code))" +
"on commit delete rows")
time.Sleep(time.Second)
tk.MustGetErrMsg("select * from tmp1 as of timestamp NOW() where id=1", "can not stale read temporary table")

queries := []struct {
sql string
}{
{
sql: "select * from tmp1 where id=1",
},
{
sql: "select * from tmp1 where code=1",
},
{
sql: "select * from tmp1 where id in (1, 2, 3)",
},
{
sql: "select * from tmp1 where code in (1, 2, 3)",
},
{
sql: "select * from tmp1 where id > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ * from tmp1 where code > 1",
},
{
sql: "select /*+use_index(tmp1, code)*/ code from tmp1 where code > 1",
},
{
sql: "select * from tmp1 tablesample regions()",
},
{
sql: "select /*+ use_index_merge(tmp1, primary, code) */ * from tmp1 where id > 1 or code > 2",
},
}

addStaleReadToSQL := func(sql string) string {
idx := strings.Index(sql, " where ")
if idx < 0 {
return ""
}
return sql[0:idx] + " as of timestamp NOW()" + sql[idx:]
}

for _, query := range queries {
sql := addStaleReadToSQL(query.sql)
if sql != "" {
tk.MustGetErrMsg(sql, "can not stale read temporary table")
}
}

tk.MustExec("start transaction read only as of timestamp NOW()")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}

tk.MustExec("set transaction read only as of timestamp NOW()")
tk.MustExec("start transaction")
for _, query := range queries {
tk.MustGetErrMsg(query.sql, "can not stale read temporary table")
}
tk.MustExec("commit")

for _, query := range queries {
tk.MustExec(query.sql)
}
}