From 5ac2253b21f96cf843a292222f668e7ba552198e Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 4 Nov 2019 17:59:05 +0800 Subject: [PATCH 1/3] support lock wait timeout --- executor/adapter.go | 2 +- executor/executor.go | 2 +- planner/core/point_get_plan.go | 4 +- session/pessimistic_test.go | 88 ++++++++++++++++++++++++++++++++ session/session.go | 1 + sessionctx/variable/session.go | 7 +++ sessionctx/variable/sysvar.go | 2 +- sessionctx/variable/tidb_vars.go | 1 + store/tikv/2pc.go | 22 ++++++-- store/tikv/error.go | 2 + 10 files changed, 122 insertions(+), 9 deletions(-) diff --git a/executor/adapter.go b/executor/adapter.go index 0568c1cd8bf81..b2f69f23afb4c 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -536,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { return nil } forUpdateTS := txnCtx.GetForUpdateTS() - err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...) + err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...) if err == nil { return nil } diff --git a/executor/executor.go b/executor/executor.go index ed9f9ed9a4282..cf5b8b588e41a 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -815,7 +815,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } return nil } - var lockWaitTime = kv.LockAlwaysWait + var lockWaitTime = e.ctx.GetSessionVars().LockWaitTimeout if e.Lock == ast.SelectLockForUpdateNoWait { lockWaitTime = kv.LockNoWait } diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index a7270e97d6904..fdb2984e1c1a2 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -262,7 +262,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan { if !sessVars.IsAutocommit() || sessVars.InTxn() { fp.Lock = true fp.IsForUpdate = true - fp.LockWaitTime = kv.LockAlwaysWait + fp.LockWaitTime = sessVars.LockWaitTimeout if x.LockTp == ast.SelectLockForUpdateNoWait { fp.LockWaitTime = kv.LockNoWait } @@ -621,7 +621,7 @@ func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.S schema: schema, TblInfo: tbl, outputNames: names, - LockWaitTime: kv.LockAlwaysWait, + LockWaitTime: ctx.GetSessionVars().LockWaitTimeout, } ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}} return p diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index d54a9dd659e22..d31c0fc976b15 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -567,3 +567,91 @@ func (s *testPessimisticSuite) TestKillStopTTLManager(c *C) { // This query should success rather than returning a ResolveLock error. tk2.MustExec("update test_kill set c = c + 1 where id = 1") } + +func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)") + // tk set global + tk.MustExec("set global innodb_lock_wait_timeout = 3") + tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50")) + + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk2.MustExec("set innodb_lock_wait_timeout = 2") + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 2")) + + tk3 := testkit.NewTestKitWithInit(c, s.store) + tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk3.MustExec("set innodb_lock_wait_timeout = 1") + tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + + tk2.MustExec("set @@autocommit = 0") + tk3.MustExec("set @@autocommit = 0") + + tk4 := testkit.NewTestKitWithInit(c, s.store) + tk4.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk4.MustExec("set @@autocommit = 0") + + // tk2 lock c1 = 1 + tk2.MustExec("begin pessimistic") + tk2.MustExec("select * from tk where c1 = 1 for update") // lock succ c1 = 1 + + // tk3 try lock c1 = 1 timeout 1sec + tk3.MustExec("begin pessimistic") + start := time.Now() + _, err := tk3.Exec("select * from tk where c1 = 1 for update") + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk4.MustExec("begin pessimistic") + tk4.MustExec("update tk set c2 = c2 + 1 where c1 = 2") // lock succ c1 = 2 by update + start = time.Now() + _, err = tk2.Exec("update tk set c2 = c2 - 1 where c1 = 2") + c.Check(time.Since(start), GreaterEqual, time.Duration(2000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(2100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("set innodb_lock_wait_timeout = 1") + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + start = time.Now() + _, err = tk2.Exec("delete from tk where c1 = 2") + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("commit") + tk3.MustExec("commit") + tk4.MustExec("commit") + + tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50")) + tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 3")) // tk4 update commit work, tk2 delete should be rollbacked + + // test stmtRollBack caused by timeout but not the whole transaction + tk2.MustExec("begin pessimistic") + tk2.MustExec("update tk set c2 = c2 + 2 where c1 = 2") // tk2 lock succ c1 = 2 by update + tk2.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update c2 succ + + tk3.MustExec("begin pessimistic") + tk3.MustExec("select * from tk where c1 = 3 for update") // tk3 lock c1 = 3 succ + + start = time.Now() + _, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be keeped + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("commit") + tk3.MustExec("commit") + + tk.MustQuery(`select * from tk where c1 = 1`).Check(testkit.Rows("1 1")) + tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update succ + tk.MustQuery(`select * from tk where c1 = 3`).Check(testkit.Rows("3 3")) // tk2 delete should fail + tk.MustQuery(`select * from tk where c1 = 4`).Check(testkit.Rows("4 4")) + tk.MustQuery(`select * from tk where c1 = 5`).Check(testkit.Rows("5 5")) + + // clean + tk.MustExec("drop table if exists tk") +} diff --git a/session/session.go b/session/session.go index e7775c4949f43..3bb83e2ac2f10 100644 --- a/session/session.go +++ b/session/session.go @@ -1793,6 +1793,7 @@ var builtinGlobalVariable = []string{ variable.CollationServer, variable.NetWriteTimeout, variable.MaxExecutionTime, + variable.InnodbLockWaitTimeout, /* TiDB specific global variables: */ variable.TiDBSkipUTF8Check, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index cdf3fd7b8d882..a6a6800b7e5e3 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -447,6 +447,9 @@ type SessionVars struct { isolationReadEngines map[kv.StoreType]struct{} PlannerSelectBlockAsName []ast.HintTable + + // Lock wait timeout for pessimistic transaction in milliseconds, `innodb_lock_wait_timeout` is in seconds + LockWaitTimeout int64 } // PreparedParams contains the parameters of the current prepared statement when executing it. @@ -524,6 +527,7 @@ func NewSessionVars() *SessionVars { AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc, UsePlanBaselines: DefTiDBUsePlanBaselines, isolationReadEngines: map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}}, + LockWaitTimeout: DefInnodbLockWaitTimeout * 1000, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, @@ -812,6 +816,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case MaxExecutionTime: timeoutMS := tidbOptPositiveInt32(val, 0) s.MaxExecutionTime = uint64(timeoutMS) + case InnodbLockWaitTimeout: + lockWaitSec := tidbOptInt64(val, DefInnodbLockWaitTimeout) + s.LockWaitTimeout = int64(lockWaitSec * 1000) case TiDBSkipUTF8Check: s.SkipUTF8Check = TiDBOptOn(val) case TiDBOptAggPushDown: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index e16cc14d3df73..6614ca9f38ee3 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -570,7 +570,7 @@ var defaultSysVars = []*SysVar{ {ScopeNone, "basedir", "/usr/local/mysql"}, {ScopeGlobal, "innodb_old_blocks_time", "1000"}, {ScopeGlobal, "innodb_stats_method", "nulls_equal"}, - {ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, "50"}, + {ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, strconv.FormatInt(DefInnodbLockWaitTimeout, 10)}, {ScopeGlobal, LocalInFile, "1"}, {ScopeGlobal | ScopeSession, "myisam_stats_method", "nulls_unequal"}, {ScopeNone, "version_compile_os", "osx10.8"}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 54265be1e8e68..f44774bc67e7b 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -413,6 +413,7 @@ const ( DefTiDBEnableNoopFuncs = false DefTiDBAllowRemoveAutoInc = false DefTiDBUsePlanBaselines = true + DefInnodbLockWaitTimeout = 50 // 50s ) // Process global variables. diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index d5d94a5c7cdaf..91a1a654dc4c3 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -676,6 +676,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * IsFirstLock: c.isFirstLock, WaitTimeout: action.lockWaitTime, }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) + lockWaitStartTime := time.Now() for { resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { @@ -726,15 +727,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * // if the lock left behind whose related txn is already committed or rollbacked, // (eg secondary locks not committed or rollbacked yet) // we cant return "nowait conflict" directly - if action.lockWaitTime == kv.LockNoWait && lock.LockType == pb.Op_PessimisticLock { - // the pessimistic lock found could be lock left behind(timeout but not recycled yet) - if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { - return ErrLockAcquireFailAndNoWaitSet + if lock.LockType == pb.Op_PessimisticLock { + if action.lockWaitTime == kv.LockNoWait { + // the pessimistic lock found could be lock left behind(timeout but not recycled yet) + if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { + return ErrLockAcquireFailAndNoWaitSet + } + } else if action.lockWaitTime == kv.LockAlwaysWait { + // do nothing but keep wait + } else { + // user has set the `InnodbLockWaitTimeout`, check timeout + // the pessimistic lock found could be lock left behind(timeout but not recycled yet) + if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { + if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime { + return ErrLockWaitTimeout + } + } } } locks = append(locks, lock) } // Because we already waited on tikv, no need to Backoff here. + // tikv default will wait 3s(also the maximum wait value) when lock error occurs _, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks) if err != nil { return errors.Trace(err) diff --git a/store/tikv/error.go b/store/tikv/error.go index 30879aa85b9e7..cc5118ce112e2 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -40,6 +40,7 @@ var ( ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted]) ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet]) + ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout]) ) // ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface. @@ -64,6 +65,7 @@ func init() { mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue, mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted, mysql.ErrLockAcquireFailAndNoWaitSet: mysql.ErrLockAcquireFailAndNoWaitSet, + mysql.ErrLockWaitTimeout: mysql.ErrLockWaitTimeout, } terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes } From a05099aaa8db22a186aa4961b2350d743d4d5b79 Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 4 Nov 2019 20:55:34 +0800 Subject: [PATCH 2/3] refine rpc reuqest timeout param --- executor/executor.go | 2 +- sessionctx/variable/session.go | 3 ++- store/tikv/2pc.go | 15 ++++++++++++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/executor/executor.go b/executor/executor.go index cf5b8b588e41a..482119db8e0a9 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -815,7 +815,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } return nil } - var lockWaitTime = e.ctx.GetSessionVars().LockWaitTimeout + lockWaitTime := e.ctx.GetSessionVars().LockWaitTimeout if e.Lock == ast.SelectLockForUpdateNoWait { lockWaitTime = kv.LockNoWait } diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index a6a6800b7e5e3..9f04af1451ead 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -448,7 +448,8 @@ type SessionVars struct { PlannerSelectBlockAsName []ast.HintTable - // Lock wait timeout for pessimistic transaction in milliseconds, `innodb_lock_wait_timeout` is in seconds + // LockWaitTimeout is the duration waiting for pessimistic lock in milliseconds + // negative value means nowait, 0 means default behavior, others means actual wait time LockWaitTimeout int64 } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 91a1a654dc4c3..abda50605e6b0 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -678,6 +678,15 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * }, pb.Context{Priority: c.priority, SyncLog: c.syncLog}) lockWaitStartTime := time.Now() for { + // if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit + if action.lockWaitTime > 0 { + timeLeft := action.lockWaitTime - (time.Since(lockWaitStartTime)).Milliseconds() + if timeLeft <= 0 { + req.PessimisticLock().WaitTimeout = kv.LockNoWait + } else { + req.PessimisticLock().WaitTimeout = timeLeft + } + } resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { return errors.Trace(err) @@ -729,15 +738,15 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * // we cant return "nowait conflict" directly if lock.LockType == pb.Op_PessimisticLock { if action.lockWaitTime == kv.LockNoWait { - // the pessimistic lock found could be lock left behind(timeout but not recycled yet) + // the pessimistic lock found could be invalid lock which is timeout but not recycled yet if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { return ErrLockAcquireFailAndNoWaitSet } } else if action.lockWaitTime == kv.LockAlwaysWait { // do nothing but keep wait } else { - // user has set the `InnodbLockWaitTimeout`, check timeout - // the pessimistic lock found could be lock left behind(timeout but not recycled yet) + // the lockWaitTime is set, check the lock wait timeout or not + // the pessimistic lock found could be invalid lock which is timeout but not recycled yet if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime { return ErrLockWaitTimeout From eb05a0f37f22814ae59bfff44ece6cf373777b7e Mon Sep 17 00:00:00 2001 From: cfzjywxk Date: Mon, 4 Nov 2019 21:11:11 +0800 Subject: [PATCH 3/3] comment --- store/tikv/2pc.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index abda50605e6b0..48ecfb4ab7d5c 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -738,7 +738,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * // we cant return "nowait conflict" directly if lock.LockType == pb.Op_PessimisticLock { if action.lockWaitTime == kv.LockNoWait { - // the pessimistic lock found could be invalid lock which is timeout but not recycled yet + // the pessimistic lock found could be invalid locks which is timeout but not recycled yet if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { return ErrLockAcquireFailAndNoWaitSet } @@ -746,7 +746,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * // do nothing but keep wait } else { // the lockWaitTime is set, check the lock wait timeout or not - // the pessimistic lock found could be invalid lock which is timeout but not recycled yet + // the pessimistic lock found could be invalid locks which is timeout but not recycled yet if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime { return ErrLockWaitTimeout