diff --git a/executor/adapter.go b/executor/adapter.go
index 0568c1cd8bf81..b2f69f23afb4c 100644
--- a/executor/adapter.go
+++ b/executor/adapter.go
@@ -536,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
 			return nil
 		}
 		forUpdateTS := txnCtx.GetForUpdateTS()
-		err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...)
+		err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...)
 		if err == nil {
 			return nil
 		}
diff --git a/executor/executor.go b/executor/executor.go
index ed9f9ed9a4282..482119db8e0a9 100644
--- a/executor/executor.go
+++ b/executor/executor.go
@@ -815,7 +815,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
 		}
 		return nil
 	}
-	var lockWaitTime = kv.LockAlwaysWait
+	lockWaitTime := e.ctx.GetSessionVars().LockWaitTimeout
 	if e.Lock == ast.SelectLockForUpdateNoWait {
 		lockWaitTime = kv.LockNoWait
 	}
diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go
index a7270e97d6904..fdb2984e1c1a2 100644
--- a/planner/core/point_get_plan.go
+++ b/planner/core/point_get_plan.go
@@ -262,7 +262,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
 		if !sessVars.IsAutocommit() || sessVars.InTxn() {
 			fp.Lock = true
 			fp.IsForUpdate = true
-			fp.LockWaitTime = kv.LockAlwaysWait
+			fp.LockWaitTime = sessVars.LockWaitTimeout
 			if x.LockTp == ast.SelectLockForUpdateNoWait {
 				fp.LockWaitTime = kv.LockNoWait
 			}
@@ -621,7 +621,7 @@ func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.S
 		schema:       schema,
 		TblInfo:      tbl,
 		outputNames:  names,
-		LockWaitTime: kv.LockAlwaysWait,
+		LockWaitTime: ctx.GetSessionVars().LockWaitTimeout,
 	}
 	ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}}
 	return p
diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go
index d54a9dd659e22..d31c0fc976b15 100644
--- a/session/pessimistic_test.go
+++ b/session/pessimistic_test.go
@@ -567,3 +567,91 @@ func (s *testPessimisticSuite) TestKillStopTTLManager(c *C) {
 	// This query should success rather than returning a ResolveLock error.
tk2.MustExec("update test_kill set c = c + 1 where id = 1") } + +func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)") + // tk set global + tk.MustExec("set global innodb_lock_wait_timeout = 3") + tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50")) + + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk2.MustExec("set innodb_lock_wait_timeout = 2") + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 2")) + + tk3 := testkit.NewTestKitWithInit(c, s.store) + tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk3.MustExec("set innodb_lock_wait_timeout = 1") + tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + + tk2.MustExec("set @@autocommit = 0") + tk3.MustExec("set @@autocommit = 0") + + tk4 := testkit.NewTestKitWithInit(c, s.store) + tk4.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk4.MustExec("set @@autocommit = 0") + + // tk2 lock c1 = 1 + tk2.MustExec("begin pessimistic") + tk2.MustExec("select * from tk where c1 = 1 for update") // lock succ c1 = 1 + + // tk3 try lock c1 = 1 timeout 1sec + tk3.MustExec("begin pessimistic") + start := time.Now() + _, err := tk3.Exec("select * from tk where c1 = 1 for update") + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk4.MustExec("begin pessimistic") + tk4.MustExec("update tk set c2 = c2 + 1 where c1 = 2") // lock succ c1 = 2 by update + start = time.Now() + _, err = tk2.Exec("update tk set c2 = c2 - 1 where c1 = 2") + c.Check(time.Since(start), GreaterEqual, time.Duration(2000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(2100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("set innodb_lock_wait_timeout = 1") + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + start = time.Now() + _, err = tk2.Exec("delete from tk where c1 = 2") + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("commit") + tk3.MustExec("commit") + tk4.MustExec("commit") + + tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50")) + tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 3")) // tk4 update commit work, tk2 delete should be rollbacked + + // test stmtRollBack caused by timeout but not the whole transaction + tk2.MustExec("begin pessimistic") + tk2.MustExec("update tk set c2 = c2 + 2 where c1 = 2") // tk2 lock succ c1 = 2 by 
+	tk2.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2's update of c2 succeeded
+
+	tk3.MustExec("begin pessimistic")
+	tk3.MustExec("select * from tk where c1 = 3 for update") // tk3 locks c1 = 3 successfully
+
+	start = time.Now()
+	_, err = tk2.Exec("delete from tk where c1 = 3") // tk2 fails to lock c1 = 3; this delete should be rolled back, but the previous update should be kept
+	c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
+	c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // the timing deviation in a unit test should not be too large
+	c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())
+
+	tk2.MustExec("commit")
+	tk3.MustExec("commit")
+
+	tk.MustQuery(`select * from tk where c1 = 1`).Check(testkit.Rows("1 1"))
+	tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2's update succeeded
+	tk.MustQuery(`select * from tk where c1 = 3`).Check(testkit.Rows("3 3")) // tk2's delete failed, so the row remains
+	tk.MustQuery(`select * from tk where c1 = 4`).Check(testkit.Rows("4 4"))
+	tk.MustQuery(`select * from tk where c1 = 5`).Check(testkit.Rows("5 5"))
+
+	// clean up
+	tk.MustExec("drop table if exists tk")
+}
diff --git a/session/session.go b/session/session.go
index e7775c4949f43..3bb83e2ac2f10 100644
--- a/session/session.go
+++ b/session/session.go
@@ -1793,6 +1793,7 @@ var builtinGlobalVariable = []string{
 	variable.CollationServer,
 	variable.NetWriteTimeout,
 	variable.MaxExecutionTime,
+	variable.InnodbLockWaitTimeout,

 	/* TiDB specific global variables: */
 	variable.TiDBSkipUTF8Check,
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index cdf3fd7b8d882..9f04af1451ead 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -447,6 +447,10 @@ type SessionVars struct {
 	isolationReadEngines map[kv.StoreType]struct{}

 	PlannerSelectBlockAsName []ast.HintTable
+
+	// LockWaitTimeout is the duration (in milliseconds) to wait for a pessimistic lock:
+	// a negative value means no wait, 0 means the default behavior, and any other value is the actual wait time
+	LockWaitTimeout int64
 }

 // PreparedParams contains the parameters of the current prepared statement when executing it.
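Note on the pattern: every call site touched earlier in this patch (handlePessimisticDML, SelectLockExec.Next, TryFastPlan, newPointGetPlan) does the same thing — seed the wait time from the new SessionVars.LockWaitTimeout field, then let a statement-level NOWAIT override it. The sketch below is not code from this patch; it is a minimal, runnable restatement of that selection logic. The sentinel values are an assumption consistent with the field comment above (negative means no wait, 0 means the default behavior), and effectiveLockWaitTime is a made-up helper name.

```go
package main

import "fmt"

// Sentinels mirroring the kv package constants the patch relies on; the exact
// values are an assumption consistent with the LockWaitTimeout field comment.
const (
	LockAlwaysWait int64 = 0  // default behavior: wait without a client-side limit
	LockNoWait     int64 = -1 // fail immediately if the lock is held
)

// sessionVars stands in for TiDB's SessionVars.
type sessionVars struct {
	LockWaitTimeout int64 // milliseconds, seeded from innodb_lock_wait_timeout
}

// effectiveLockWaitTime shows the selection pattern: start from the
// session-level timeout, then let SELECT ... FOR UPDATE NOWAIT override it.
func effectiveLockWaitTime(vars *sessionVars, noWait bool) int64 {
	lockWaitTime := vars.LockWaitTimeout
	if noWait {
		lockWaitTime = LockNoWait
	}
	return lockWaitTime
}

func main() {
	vars := &sessionVars{LockWaitTimeout: 50 * 1000} // the 50s default, in ms
	fmt.Println(effectiveLockWaitTime(vars, false))  // 50000
	fmt.Println(effectiveLockWaitTime(vars, true))   // -1
}
```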
@@ -524,6 +528,7 @@ func NewSessionVars() *SessionVars {
 		AllowRemoveAutoInc:    DefTiDBAllowRemoveAutoInc,
 		UsePlanBaselines:      DefTiDBUsePlanBaselines,
 		isolationReadEngines:  map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}},
+		LockWaitTimeout:       DefInnodbLockWaitTimeout * 1000,
 	}
 	vars.Concurrency = Concurrency{
 		IndexLookupConcurrency: DefIndexLookupConcurrency,
@@ -812,6 +817,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
 	case MaxExecutionTime:
 		timeoutMS := tidbOptPositiveInt32(val, 0)
 		s.MaxExecutionTime = uint64(timeoutMS)
+	case InnodbLockWaitTimeout:
+		lockWaitSec := tidbOptInt64(val, DefInnodbLockWaitTimeout)
+		s.LockWaitTimeout = lockWaitSec * 1000
 	case TiDBSkipUTF8Check:
 		s.SkipUTF8Check = TiDBOptOn(val)
 	case TiDBOptAggPushDown:
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index e16cc14d3df73..6614ca9f38ee3 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -570,7 +570,7 @@ var defaultSysVars = []*SysVar{
 	{ScopeNone, "basedir", "/usr/local/mysql"},
 	{ScopeGlobal, "innodb_old_blocks_time", "1000"},
 	{ScopeGlobal, "innodb_stats_method", "nulls_equal"},
-	{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, "50"},
+	{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, strconv.FormatInt(DefInnodbLockWaitTimeout, 10)},
 	{ScopeGlobal, LocalInFile, "1"},
 	{ScopeGlobal | ScopeSession, "myisam_stats_method", "nulls_unequal"},
 	{ScopeNone, "version_compile_os", "osx10.8"},
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index 54265be1e8e68..f44774bc67e7b 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -413,6 +413,7 @@ const (
 	DefTiDBEnableNoopFuncs      = false
 	DefTiDBAllowRemoveAutoInc   = false
 	DefTiDBUsePlanBaselines     = true
+	DefInnodbLockWaitTimeout    = 50 // 50s
 )

 // Process global variables.
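The hunks above carry the whole unit story: innodb_lock_wait_timeout is exposed in seconds for MySQL compatibility, while the internal SessionVars.LockWaitTimeout field stores milliseconds, converted once in SetSystemVar. Seeding the field in NewSessionVars keeps it valid even before the session loads global variables. A small self-contained sketch of that conversion — tidbOptInt64 is re-implemented here under the assumption that TiDB's helper of the same name parses the string and falls back to the default on error:

```go
package main

import (
	"fmt"
	"strconv"
)

// DefInnodbLockWaitTimeout matches the new default in tidb_vars.go (seconds).
const DefInnodbLockWaitTimeout int64 = 50

// tidbOptInt64 is a stand-in for TiDB's internal helper (assumption: it
// returns defaultVal when the string does not parse as an integer).
func tidbOptInt64(val string, defaultVal int64) int64 {
	n, err := strconv.ParseInt(val, 10, 64)
	if err != nil {
		return defaultVal
	}
	return n
}

func main() {
	// SET innodb_lock_wait_timeout = 3  ->  3000 ms stored in LockWaitTimeout.
	fmt.Println(tidbOptInt64("3", DefInnodbLockWaitTimeout) * 1000) // 3000
	// Unparseable input falls back to the 50s default  ->  50000 ms.
	fmt.Println(tidbOptInt64("oops", DefInnodbLockWaitTimeout) * 1000) // 50000
}
```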
diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go
index 9391ade5b5c2c..e623821139829 100644
--- a/store/tikv/2pc.go
+++ b/store/tikv/2pc.go
@@ -676,7 +676,17 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 		IsFirstLock: c.isFirstLock,
 		WaitTimeout: action.lockWaitTime,
 	}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
+	lockWaitStartTime := time.Now()
 	for {
+		// if lockWaitTime is set, refine the request's `WaitTimeout` field using the remaining wait budget
+		if action.lockWaitTime > 0 {
+			timeLeft := action.lockWaitTime - time.Since(lockWaitStartTime).Milliseconds()
+			if timeLeft <= 0 {
+				req.PessimisticLock().WaitTimeout = kv.LockNoWait
+			} else {
+				req.PessimisticLock().WaitTimeout = timeLeft
+			}
+		}
 		resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
 		if err != nil {
 			return errors.Trace(err)
 		}
@@ -726,15 +736,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
 			// if the lock left behind whose related txn is already committed or rollbacked,
 			// (eg secondary locks not committed or rollbacked yet)
 			// we cant return "nowait conflict" directly
-			if action.lockWaitTime == kv.LockNoWait && lock.LockType == pb.Op_PessimisticLock {
-				// the pessimistic lock found could be lock left behind(timeout but not recycled yet)
-				if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
-					return ErrLockAcquireFailAndNoWaitSet
+			if lock.LockType == pb.Op_PessimisticLock {
+				if action.lockWaitTime == kv.LockNoWait {
+					// the pessimistic lock found could be an invalid lock that has timed out but has not been recycled yet
+					if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
+						return ErrLockAcquireFailAndNoWaitSet
+					}
+				} else if action.lockWaitTime == kv.LockAlwaysWait {
+					// do nothing, keep waiting
+				} else {
+					// lockWaitTime is set, so check whether the lock wait has timed out
+					// the pessimistic lock found could be an invalid lock that has timed out but has not been recycled yet
+					if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
+						if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
+							return ErrLockWaitTimeout
+						}
+					}
 				}
 			}
 			locks = append(locks, lock)
 		}
 		// Because we already waited on tikv, no need to Backoff here.
+		// TiKV by default waits up to 3s (which is also the maximum wait value) when a lock error occurs
 		_, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
 		if err != nil {
 			return errors.Trace(err)
 		}
diff --git a/store/tikv/error.go b/store/tikv/error.go
index 30879aa85b9e7..cc5118ce112e2 100644
--- a/store/tikv/error.go
+++ b/store/tikv/error.go
@@ -40,6 +40,7 @@ var (
 	ErrGCTooEarly                  = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly])
 	ErrQueryInterrupted            = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
 	ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet])
+	ErrLockWaitTimeout             = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout])
 )

 // ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
@@ -64,6 +65,7 @@ func init() {
 		mysql.ErrTruncatedWrongValue:         mysql.ErrTruncatedWrongValue,
 		mysql.ErrQueryInterrupted:            mysql.ErrQueryInterrupted,
 		mysql.ErrLockAcquireFailAndNoWaitSet: mysql.ErrLockAcquireFailAndNoWaitSet,
+		mysql.ErrLockWaitTimeout:             mysql.ErrLockWaitTimeout,
 	}
 	terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes
 }
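The heart of the 2pc.go change is a wait budget recomputed from the wall clock on every pass through the retry loop, so that repeated region errors or lock conflicts cannot stretch the total wait beyond action.lockWaitTime. Below is a condensed, self-contained sketch of that pattern — only the LockNoWait sentinel and the error name come from the patch; remainingWait is an illustrative helper, not TiDB code:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// LockNoWait mirrors the kv package sentinel (assumption: -1 means "fail fast").
const LockNoWait int64 = -1

var ErrLockWaitTimeout = errors.New("lock wait timeout exceeded")

// remainingWait computes the WaitTimeout to send on the next pessimistic-lock
// RPC: the configured budget minus the time already spent waiting. Once the
// budget is exhausted it degrades to LockNoWait, so the server fails fast
// instead of queueing the request again.
func remainingWait(start time.Time, lockWaitTimeMS int64) int64 {
	timeLeft := lockWaitTimeMS - time.Since(start).Milliseconds()
	if timeLeft <= 0 {
		return LockNoWait
	}
	return timeLeft
}

func main() {
	start := time.Now()
	budget := int64(2000) // e.g. innodb_lock_wait_timeout = 2
	for i := 0; i < 3; i++ {
		fmt.Println(remainingWait(start, budget)) // shrinks on every retry
		time.Sleep(900 * time.Millisecond)
	}
	if remainingWait(start, budget) == LockNoWait {
		fmt.Println(ErrLockWaitTimeout) // what the caller ultimately returns
	}
}
```

On the error side, ErrLockWaitTimeout is registered in error.go with MySQL error code 1205 (ER_LOCK_WAIT_TIMEOUT), so clients see the same "Lock wait timeout exceeded; try restarting transaction" error they would get from MySQL/InnoDB.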