executor: pessimistic lock on the temporary table should not be written to TiKV #24737

Merged (6 commits) on May 26, 2021
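The change moves lock elimination for global temporary tables out of the planner and into the executor: before pessimistic lock keys are sent to TiKV, every key whose table ID belongs to a global temporary table of the current transaction is dropped. The standalone Go sketch below illustrates only that filtering idea; the `key` struct and the set of temporary table IDs are illustrative stand-ins, while the real code in this diff decodes the table ID from an encoded `kv.Key` with `tablecodec.DecodeTableID` and checks it against `TxnCtx.GlobalTemporaryTables`.

```go
package main

import "fmt"

// key is an illustrative stand-in for kv.Key; real TiDB keys are byte slices
// whose prefix encodes the owning table's ID.
type key struct {
	tableID int64
	handle  int64
}

// filterTemporaryTableKeys drops keys that belong to global temporary tables,
// mirroring the filter added in executor/executor.go in this diff.
func filterTemporaryTableKeys(tempTables map[int64]struct{}, keys []key) []key {
	if len(tempTables) == 0 {
		return keys
	}
	filtered := keys[:0] // filter in place, reusing the backing array
	for _, k := range keys {
		if _, isTemp := tempTables[k.tableID]; !isTemp {
			filtered = append(filtered, k)
		}
	}
	return filtered
}

func main() {
	tempTables := map[int64]struct{}{101: {}} // table 101 is a global temporary table
	keys := []key{{tableID: 100, handle: 1}, {tableID: 101, handle: 1}, {tableID: 100, handle: 2}}
	fmt.Println(filterTemporaryTableKeys(tempTables, keys)) // [{100 1} {100 2}]
}
```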
1 change: 1 addition & 0 deletions executor/adapter.go
@@ -579,6 +579,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
if len(keys) == 0 {
return nil
}
keys = filterTemporaryTableKeys(sctx.GetSessionVars(), keys)
seVars := sctx.GetSessionVars()
lockCtx := newLockCtx(seVars, seVars.LockWaitTimeout)
var lockKeyStats *util.LockKeysDetails
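The same filter is applied on both locking paths: `handlePessimisticDML` above locks the keys a pessimistic DML statement needs, while `doLockKeys` in executor/executor.go below serves `SELECT ... FOR UPDATE` and the point-get executors, so temporary-table keys are stripped in both places before `LockKeys` sends them to TiKV.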
23 changes: 22 additions & 1 deletion executor/executor.go
@@ -1005,7 +1005,8 @@ func newLockCtx(seVars *variable.SessionVars, lockWaitTime int64) *tikvstore.Loc
// locked by others. used for (select for update nowait) situation
// except 0 means alwaysWait 1 means nowait
func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *tikvstore.LockCtx, keys ...kv.Key) error {
sctx := se.GetSessionVars().StmtCtx
sessVars := se.GetSessionVars()
sctx := sessVars.StmtCtx
if !sctx.InUpdateStmt && !sctx.InDeleteStmt {
atomic.StoreUint32(&se.GetSessionVars().TxnCtx.ForUpdate, 1)
}
@@ -1014,6 +1015,10 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *tikvstore.L
if err != nil {
return err
}

// Skip the temporary table keys.
keys = filterTemporaryTableKeys(sessVars, keys)

var lockKeyStats *tikvutil.LockKeysDetails
ctx = context.WithValue(ctx, tikvutil.LockKeysDetailCtxKey, &lockKeyStats)
err = txn.LockKeys(tikvutil.SetSessionID(ctx, se.GetSessionVars().ConnectionID), lockCtx, keys...)
@@ -1023,6 +1028,22 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, lockCtx *tikvstore.L
return err
}

func filterTemporaryTableKeys(vars *variable.SessionVars, keys []kv.Key) []kv.Key {
txnCtx := vars.TxnCtx
if txnCtx == nil || txnCtx.GlobalTemporaryTables == nil {
return keys
}

newKeys := keys[:0] // filter in place, reusing the backing array of keys
for _, key := range keys {
tblID := tablecodec.DecodeTableID(key)
if _, ok := txnCtx.GlobalTemporaryTables[tblID]; !ok {
newKeys = append(newKeys, key)
}
}
return newKeys
}

// LimitExec represents limit executor
// It ignores 'Offset' rows from src, then returns 'Count' rows at maximum.
type LimitExec struct {
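Filtering inside `doLockKeys` works uniformly for every plan type because TiDB row keys and index keys both carry the owning table's ID in their prefix, which `tablecodec.DecodeTableID` recovers. A minimal sketch of that round trip, assuming the `tablecodec` and `kv` packages of this repository at roughly this version (exact signatures may differ in other releases):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/tablecodec"
)

func main() {
	// A row key is encoded as t{tableID}_r{handle}, so the owning table
	// can be recovered from the key prefix alone.
	rowKey := tablecodec.EncodeRowKeyWithHandle(101, kv.IntHandle(7))
	fmt.Println(tablecodec.DecodeTableID(rowKey)) // 101
}
```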
38 changes: 38 additions & 0 deletions executor/executor_test.go
@@ -8240,6 +8240,44 @@ func (s *testSerialSuite) TestDeadlockTable(c *C) {
))
}

func (s *testSuite1) TestTemporaryTableNoPessimisticLock(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create global temporary table t (a int primary key, b int) on commit delete rows")
tk.MustExec("insert into t values (1, 1)")

// Do something on the temporary table, pessimistic transaction mode.
tk.MustExec("begin pessimistic")
tk.MustExec("insert into t values (2, 2)")
tk.MustExec("update t set b = b + 1 where a = 1")
tk.MustExec("delete from t where a > 1")
tk.MustQuery("select count(*) from t where b >= 2 for update")

// Get the temporary table ID.
schema := tk.Se.GetInfoSchema().(infoschema.InfoSchema)
tbl, err := schema.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
c.Assert(err, IsNil)
meta := tbl.Meta()
c.Assert(meta.TempTableType, Equals, model.TempTableGlobal)

// Scan the table range to check there is no lock.
// It would be better to use the rawkv client, but the txnkv client also works:
// if a lock had been written, the txnkv client would report a lock error.
txn, err := s.store.Begin()
c.Assert(err, IsNil)
seekKey := tablecodec.EncodeTablePrefix(meta.ID)
endKey := tablecodec.EncodeTablePrefix(meta.ID + 1)
scanner, err := txn.Iter(seekKey, endKey)
c.Assert(err, IsNil)
for scanner.Valid() {
// No lock should have been written to TiKV for the temporary table.
c.FailNow()
}

tk.MustExec("rollback")
}

func (s testSerialSuite) TestExprBlackListForEnum(c *C) {
tk := testkit.NewTestKit(c, s.store)

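The scan-based assertion above works because a global temporary table declared with `ON COMMIT DELETE ROWS` keeps its rows in session memory rather than in TiKV, so the table's key range should be completely empty; if the pessimistic-lock path had written anything there, the fresh transaction's iterator would either return a key or report a lock error and the test would fail.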
26 changes: 0 additions & 26 deletions planner/core/integration_test.go
@@ -3846,29 +3846,3 @@ func (s *testIntegrationSerialSuite) TestEnforceMPP(c *C) {
" └─Selection_20 10.00 285020.00 batchCop[tiflash] eq(test.t.a, 1)",
" └─TableFullScan_19 10000.00 255020.00 batchCop[tiflash] table:t keep order:false, stats:pseudo"))
}

func (s *testIntegrationSuite) TestEliminateLockForTemporaryTable(c *C) {
tk := testkit.NewTestKit(c, s.store)

tk.MustExec("use test;")
tk.MustExec("create global temporary table t1 (a int primary key, b int, c int, index i_b(b)) on commit delete rows;")
defer func() {
tk.MustExec("drop global temporary table if exists t1;")
}()
tk.MustExec("begin;")
tk.MustExec("insert t1 values (8,8,9);")

var input []string
var output []struct {
SQL string
Plan []string
}
s.testData.GetTestCases(c, &input, &output)
for i, tt := range input {
s.testData.OnRecord(func() {
output[i].SQL = tt
output[i].Plan = s.testData.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + tt).Rows())
})
tk.MustQuery("explain format = 'brief' " + tt).Check(testkit.Rows(output[i].Plan...))
}
}
25 changes: 0 additions & 25 deletions planner/core/optimizer.go
@@ -20,7 +20,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/expression"
@@ -187,7 +186,6 @@ func postOptimize(sctx sessionctx.Context, plan PhysicalPlan) PhysicalPlan {
plan = InjectExtraProjection(plan)
mergeContinuousSelections(plan)
plan = eliminateUnionScanAndLock(sctx, plan)
plan = eliminateLockForTemporaryTable(plan)
plan = enableParallelApply(sctx, plan)
return plan
}
@@ -324,29 +322,6 @@ func eliminateUnionScanAndLock(sctx sessionctx.Context, p PhysicalPlan) Physical
})
}

// eliminateLockForTemporaryTable eliminates lock for the temporary table.
func eliminateLockForTemporaryTable(p PhysicalPlan) PhysicalPlan {
iteratePhysicalPlan(p, func(p PhysicalPlan) bool {
if len(p.Children()) > 1 {
return false
}
switch x := p.(type) {
case *PointGetPlan:
if x.TblInfo.TempTableType != model.TempTableNone {
x.Lock = false
x.LockWaitTime = 0
}
case *BatchPointGetPlan:
if x.TblInfo.TempTableType != model.TempTableNone {
x.Lock = false
x.LockWaitTime = 0
}
}
return true
})
return p
}

func iteratePhysicalPlan(p PhysicalPlan, f func(p PhysicalPlan) bool) {
if !f(p) {
return
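With the key filter in the executor, the plan-level workaround is no longer needed: `eliminateLockForTemporaryTable` only covered `PointGetPlan` and `BatchPointGetPlan`, whereas filtering in `doLockKeys` and `handlePessimisticDML` covers every statement that acquires pessimistic locks. The optimizer rule, the fast-plan special cases in point_get_plan.go, and the corresponding plan tests are therefore removed.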
10 changes: 2 additions & 8 deletions planner/core/point_get_plan.go
@@ -463,10 +463,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
if tidbutil.IsMemDB(fp.dbName) {
return nil
}
// ignore lock for temporary table.
if fp.TblInfo.TempTableType == model.TempTableNone {
fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
}
fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
p = fp
return
}
@@ -484,10 +481,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) (p Plan) {
p = tableDual.Init(ctx, &property.StatsInfo{}, 0)
return
}
// ignore lock for temporary table.
if fp.TblInfo.TempTableType == model.TempTableNone {
fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
}
fp.Lock, fp.LockWaitTime = getLockWaitTime(ctx, x.LockInfo)
p = fp
return
}
9 changes: 0 additions & 9 deletions planner/core/testdata/integration_suite_in.json
@@ -294,14 +294,5 @@
"select sum(1) from s1",
"select count(1) as cnt from s1 union select count(1) as cnt from s2"
]
},
{
"name": "TestEliminateLockForTemporaryTable",
"cases": [
"select * from t1 where a = 2 for update",
"select * from t1 where a in (1,2) for update",
"select c + 1 from t1 where a = 2 and c = 2 for update",
"select c + 1 from t1 where a in (1,2) and c = 2 for update"
]
}
]
33 changes: 0 additions & 33 deletions planner/core/testdata/integration_suite_out.json
@@ -1564,38 +1564,5 @@
]
}
]
},
{
"Name": "TestEliminateLockForTemporaryTable",
"Cases": [
{
"SQL": "select * from t1 where a = 2 for update",
"Plan": [
"Point_Get 1.00 root table:t1 handle:2"
]
},
{
"SQL": "select * from t1 where a in (1,2) for update",
"Plan": [
"Batch_Point_Get 2.00 root table:t1 handle:[1 2], keep order:false, desc:false"
]
},
{
"SQL": "select c + 1 from t1 where a = 2 and c = 2 for update",
"Plan": [
"Projection 0.00 root plus(test.t1.c, 1)->Column#4",
"└─Selection 0.00 root eq(test.t1.c, 2)",
" └─Point_Get 1.00 root table:t1 handle:2"
]
},
{
"SQL": "select c + 1 from t1 where a in (1,2) and c = 2 for update",
"Plan": [
"Projection 0.00 root plus(test.t1.c, 1)->Column#4",
"└─Selection 0.00 root eq(test.t1.c, 2)",
" └─Batch_Point_Get 2.00 root table:t1 handle:[1 2], keep order:false, desc:false"
]
}
]
}
]