Skip to content

Commit

Permalink
executor: support innodb_lock_wait_timeout for pessimistic transaction (
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored and XiaTianliang committed Dec 21, 2019
1 parent 74feccb commit 1609b2c
Show file tree
Hide file tree
Showing 10 changed files with 132 additions and 9 deletions.
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, kv.LockAlwaysWait, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...)
if err == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
var lockWaitTime = kv.LockAlwaysWait
lockWaitTime := e.ctx.GetSessionVars().LockWaitTimeout
if e.Lock == ast.SelectLockForUpdateNoWait {
lockWaitTime = kv.LockNoWait
}
Expand Down
4 changes: 2 additions & 2 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
if !sessVars.IsAutocommit() || sessVars.InTxn() {
fp.Lock = true
fp.IsForUpdate = true
fp.LockWaitTime = kv.LockAlwaysWait
fp.LockWaitTime = sessVars.LockWaitTimeout
if x.LockTp == ast.SelectLockForUpdateNoWait {
fp.LockWaitTime = kv.LockNoWait
}
Expand Down Expand Up @@ -621,7 +621,7 @@ func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.S
schema: schema,
TblInfo: tbl,
outputNames: names,
LockWaitTime: kv.LockAlwaysWait,
LockWaitTime: ctx.GetSessionVars().LockWaitTimeout,
}
ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}}
return p
Expand Down
88 changes: 88 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,3 +567,91 @@ func (s *testPessimisticSuite) TestKillStopTTLManager(c *C) {
// This query should success rather than returning a ResolveLock error.
tk2.MustExec("update test_kill set c = c + 1 where id = 1")
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)")
// tk set global
tk.MustExec("set global innodb_lock_wait_timeout = 3")
tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50"))

tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk2.MustExec("set innodb_lock_wait_timeout = 2")
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 2"))

tk3 := testkit.NewTestKitWithInit(c, s.store)
tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk3.MustExec("set innodb_lock_wait_timeout = 1")
tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))

tk2.MustExec("set @@autocommit = 0")
tk3.MustExec("set @@autocommit = 0")

tk4 := testkit.NewTestKitWithInit(c, s.store)
tk4.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk4.MustExec("set @@autocommit = 0")

// tk2 lock c1 = 1
tk2.MustExec("begin pessimistic")
tk2.MustExec("select * from tk where c1 = 1 for update") // lock succ c1 = 1

// tk3 try lock c1 = 1 timeout 1sec
tk3.MustExec("begin pessimistic")
start := time.Now()
_, err := tk3.Exec("select * from tk where c1 = 1 for update")
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk4.MustExec("begin pessimistic")
tk4.MustExec("update tk set c2 = c2 + 1 where c1 = 2") // lock succ c1 = 2 by update
start = time.Now()
_, err = tk2.Exec("update tk set c2 = c2 - 1 where c1 = 2")
c.Check(time.Since(start), GreaterEqual, time.Duration(2000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(2100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("set innodb_lock_wait_timeout = 1")
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))
start = time.Now()
_, err = tk2.Exec("delete from tk where c1 = 2")
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")
tk4.MustExec("commit")

tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50"))
tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 3")) // tk4 update commit work, tk2 delete should be rollbacked

// test stmtRollBack caused by timeout but not the whole transaction
tk2.MustExec("begin pessimistic")
tk2.MustExec("update tk set c2 = c2 + 2 where c1 = 2") // tk2 lock succ c1 = 2 by update
tk2.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update c2 succ

tk3.MustExec("begin pessimistic")
tk3.MustExec("select * from tk where c1 = 3 for update") // tk3 lock c1 = 3 succ

start = time.Now()
_, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be keeped
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")

tk.MustQuery(`select * from tk where c1 = 1`).Check(testkit.Rows("1 1"))
tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update succ
tk.MustQuery(`select * from tk where c1 = 3`).Check(testkit.Rows("3 3")) // tk2 delete should fail
tk.MustQuery(`select * from tk where c1 = 4`).Check(testkit.Rows("4 4"))
tk.MustQuery(`select * from tk where c1 = 5`).Check(testkit.Rows("5 5"))

// clean
tk.MustExec("drop table if exists tk")
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,7 @@ var builtinGlobalVariable = []string{
variable.CollationServer,
variable.NetWriteTimeout,
variable.MaxExecutionTime,
variable.InnodbLockWaitTimeout,

/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check,
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,10 @@ type SessionVars struct {
isolationReadEngines map[kv.StoreType]struct{}

PlannerSelectBlockAsName []ast.HintTable

// LockWaitTimeout is the duration waiting for pessimistic lock in milliseconds
// negative value means nowait, 0 means default behavior, others means actual wait time
LockWaitTimeout int64
}

// PreparedParams contains the parameters of the current prepared statement when executing it.
Expand Down Expand Up @@ -524,6 +528,7 @@ func NewSessionVars() *SessionVars {
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
UsePlanBaselines: DefTiDBUsePlanBaselines,
isolationReadEngines: map[kv.StoreType]struct{}{kv.TiKV: {}, kv.TiFlash: {}},
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -812,6 +817,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case MaxExecutionTime:
timeoutMS := tidbOptPositiveInt32(val, 0)
s.MaxExecutionTime = uint64(timeoutMS)
case InnodbLockWaitTimeout:
lockWaitSec := tidbOptInt64(val, DefInnodbLockWaitTimeout)
s.LockWaitTimeout = int64(lockWaitSec * 1000)
case TiDBSkipUTF8Check:
s.SkipUTF8Check = TiDBOptOn(val)
case TiDBOptAggPushDown:
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ var defaultSysVars = []*SysVar{
{ScopeNone, "basedir", "/usr/local/mysql"},
{ScopeGlobal, "innodb_old_blocks_time", "1000"},
{ScopeGlobal, "innodb_stats_method", "nulls_equal"},
{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, "50"},
{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, strconv.FormatInt(DefInnodbLockWaitTimeout, 10)},
{ScopeGlobal, LocalInFile, "1"},
{ScopeGlobal | ScopeSession, "myisam_stats_method", "nulls_unequal"},
{ScopeNone, "version_compile_os", "osx10.8"},
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ const (
DefTiDBEnableNoopFuncs = false
DefTiDBAllowRemoveAutoInc = false
DefTiDBUsePlanBaselines = true
DefInnodbLockWaitTimeout = 50 // 50s
)

// Process global variables.
Expand Down
31 changes: 27 additions & 4 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,17 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
IsFirstLock: c.isFirstLock,
WaitTimeout: action.lockWaitTime,
}, pb.Context{Priority: c.priority, SyncLog: c.syncLog})
lockWaitStartTime := time.Now()
for {
// if lockWaitTime set, refine the request `WaitTimeout` field based on timeout limit
if action.lockWaitTime > 0 {
timeLeft := action.lockWaitTime - (time.Since(lockWaitStartTime)).Milliseconds()
if timeLeft <= 0 {
req.PessimisticLock().WaitTimeout = kv.LockNoWait
} else {
req.PessimisticLock().WaitTimeout = timeLeft
}
}
resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -726,15 +736,28 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo *
// if the lock left behind whose related txn is already committed or rollbacked,
// (eg secondary locks not committed or rollbacked yet)
// we cant return "nowait conflict" directly
if action.lockWaitTime == kv.LockNoWait && lock.LockType == pb.Op_PessimisticLock {
// the pessimistic lock found could be lock left behind(timeout but not recycled yet)
if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
return ErrLockAcquireFailAndNoWaitSet
if lock.LockType == pb.Op_PessimisticLock {
if action.lockWaitTime == kv.LockNoWait {
// the pessimistic lock found could be invalid locks which is timeout but not recycled yet
if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
return ErrLockAcquireFailAndNoWaitSet
}
} else if action.lockWaitTime == kv.LockAlwaysWait {
// do nothing but keep wait
} else {
// the lockWaitTime is set, check the lock wait timeout or not
// the pessimistic lock found could be invalid locks which is timeout but not recycled yet
if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) {
if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime {
return ErrLockWaitTimeout
}
}
}
}
locks = append(locks, lock)
}
// Because we already waited on tikv, no need to Backoff here.
// tikv default will wait 3s(also the maximum wait value) when lock error occurs
_, err = c.store.lockResolver.ResolveLocks(bo, c.startTS, locks)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 2 additions & 0 deletions store/tikv/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly])
ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted])
ErrLockAcquireFailAndNoWaitSet = terror.ClassTiKV.New(mysql.ErrLockAcquireFailAndNoWaitSet, mysql.MySQLErrName[mysql.ErrLockAcquireFailAndNoWaitSet])
ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout])
)

// ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface.
Expand All @@ -64,6 +65,7 @@ func init() {
mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue,
mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted,
mysql.ErrLockAcquireFailAndNoWaitSet: mysql.ErrLockAcquireFailAndNoWaitSet,
mysql.ErrLockWaitTimeout: mysql.ErrLockWaitTimeout,
}
terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes
}

0 comments on commit 1609b2c

Please sign in to comment.