diff --git a/ddl/index.go b/ddl/index.go index 3921499ade8ec..92d5ae0712903 100644 --- a/ddl/index.go +++ b/ddl/index.go @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx // Lock the row key to notify us that someone delete or update the row, // then we should not backfill the index of it, otherwise the adding index is redundant. - err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key) + err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key) if err != nil { return errors.Trace(err) } diff --git a/executor/adapter.go b/executor/adapter.go index deb91247c48b2..ffa9f3b282ba9 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -460,7 +460,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error { return nil } forUpdateTS := txnCtx.GetForUpdateTS() - err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...) + err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...) if err == nil { return nil } diff --git a/executor/admin.go b/executor/admin.go index a177c9fff1e5f..e0528620ad6dc 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa } recordKey := e.table.RecordKey(row.handle) - err := txn.LockKeys(ctx, nil, 0, recordKey) + err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey) if err != nil { return result, err } diff --git a/executor/executor.go b/executor/executor.go index a9d2367ad24f0..0b1327607d314 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -785,10 +785,11 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error { } return nil } - return doLockKeys(ctx, e.ctx, e.keys...) + lockWaitTime := e.ctx.GetSessionVars().LockWaitTimeout + return doLockKeys(ctx, e.ctx, lockWaitTime, e.keys...) } -func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error { +func doLockKeys(ctx context.Context, se sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error { se.GetSessionVars().TxnCtx.ForUpdate = true // Lock keys only once when finished fetching all results. txn, err := se.Txn(true) @@ -796,7 +797,7 @@ func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) erro return err } forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS() - return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...) + return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, lockWaitTime, keys...) 
} // LimitExec represents limit executor diff --git a/executor/point_get.go b/executor/point_get.go index d480f1ac1f913..2da6cd6011e02 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -44,6 +44,7 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { handle: p.Handle, startTS: startTS, lock: p.Lock, + lockWaitTime: p.LockWaitTime, } b.isSelectForUpdate = p.IsForUpdate e.base().initCap = 1 @@ -55,14 +56,15 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor { type PointGetExecutor struct { baseExecutor - tblInfo *model.TableInfo - handle int64 - idxInfo *model.IndexInfo - idxVals []types.Datum - startTS uint64 - snapshot kv.Snapshot - done bool - lock bool + tblInfo *model.TableInfo + handle int64 + idxInfo *model.IndexInfo + idxVals []types.Datum + startTS uint64 + snapshot kv.Snapshot + done bool + lock bool + lockWaitTime int64 } // Open implements the Executor interface. @@ -145,7 +147,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error { if e.lock { - return doLockKeys(ctx, e.ctx, key) + return doLockKeys(ctx, e.ctx, e.lockWaitTime, key) } return nil } diff --git a/go.mod b/go.mod index ff24d6fa46cfa..491960312256f 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( github.com/pingcap/errors v0.11.4 github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e - github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 + github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd github.com/pingcap/parser v0.0.0-20191018040038-555b97093a2a github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd diff --git a/go.sum b/go.sum index 6955852681809..8c20c2d36c8d3 100644 --- a/go.sum +++ b/go.sum @@ -163,8 +163,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8= github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw= github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= -github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA= -github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= +github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d h1:zTHgLr8+0LTEJmjf8yHilgmNhdrVlCN/RW7NeO8IRsE= +github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg= github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw= github.com/pingcap/parser v0.0.0-20191018040038-555b97093a2a h1:PMjYrxWKdVUlJ77+9YHbYVciDQCyqZ/noS9nIni76KQ= diff --git a/kv/kv.go b/kv/kv.go index 7ca90087377e2..5194f99832623 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -136,7 +136,7 @@ type Transaction interface { // String implements fmt.Stringer interface. String() string // LockKeys tries to lock the entries with the keys in KV store. 
- LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error + LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error // SetOption sets an option with a value, when val is nil, uses the default // value of this option. SetOption(opt Option, val interface{}) @@ -304,3 +304,11 @@ type SplitableStore interface { WaitScatterRegionFinish(regionID uint64, backOff int) error CheckRegionInScattering(regionID uint64) (bool, error) } + +// LockAlwaysWait and LockNoWait are used for the pessimistic lock wait time. +// These two constants are special values in the lock protocol with TiKV: +// 0 means always wait, -1 means no wait, and other values are the lock wait time in milliseconds +var ( + LockAlwaysWait = int64(0) + LockNoWait = int64(-1) +) diff --git a/kv/mock.go b/kv/mock.go index 877ab09ff9344..352e3bbd29866 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -39,7 +39,7 @@ func (t *mockTxn) String() string { return "" } -func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error { +func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error { return nil } diff --git a/kv/mock_test.go b/kv/mock_test.go index 4cbe5631e0610..b6fd8b192638f 100644 --- a/kv/mock_test.go +++ b/kv/mock_test.go @@ -37,7 +37,7 @@ func (s testMockSuite) TestInterface(c *C) { transaction, err := storage.Begin() c.Check(err, IsNil) - err = transaction.LockKeys(context.Background(), nil, 0, Key("lock")) + err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock")) c.Check(err, IsNil) transaction.SetOption(Option(23), struct{}{}) if mock, ok := transaction.(*mockTxn); ok { diff --git a/planner/core/point_get_plan.go b/planner/core/point_get_plan.go index 1323171805f89..38312cd5a0f30 100644 --- a/planner/core/point_get_plan.go +++ b/planner/core/point_get_plan.go @@ -53,6 +53,7 @@ type PointGetPlan struct { IsTableDual bool Lock bool IsForUpdate bool + LockWaitTime int64 } type nameValuePair struct { @@ -161,6 +162,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan { if !sessVars.IsAutocommit() || sessVars.InTxn() { fp.Lock = true fp.IsForUpdate = true + fp.LockWaitTime = sessVars.LockWaitTimeout } } return fp @@ -284,10 +286,11 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.Schema, tbl *model.TableInfo) *PointGetPlan { p := &PointGetPlan{ - basePlan: newBasePlan(ctx, plancodec.TypePointGet), - dbName: dbName, - schema: schema, - TblInfo: tbl, + basePlan: newBasePlan(ctx, plancodec.TypePointGet), + dbName: dbName, + schema: schema, + TblInfo: tbl, + LockWaitTime: ctx.GetSessionVars().LockWaitTimeout, } ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}} return p diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index 7ec471c148f11..ef9693720c137 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -420,3 +420,88 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) { c.Assert(terror.ErrorEqual(err, tikv.ErrQueryInterrupted), IsTrue) tk.MustExec("rollback") } + +func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("drop table if exists tk") + tk.MustExec("create table tk (c1 int primary key, c2 int)") + tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)") + // tk set global + tk.MustExec("set global
innodb_lock_wait_timeout = 3") + tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50")) + + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk2.MustExec("set innodb_lock_wait_timeout = 2") + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 2")) + + tk3 := testkit.NewTestKitWithInit(c, s.store) + tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk3.MustExec("set innodb_lock_wait_timeout = 1") + tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + + tk2.MustExec("set @@autocommit = 0") + tk3.MustExec("set @@autocommit = 0") + + tk4 := testkit.NewTestKitWithInit(c, s.store) + tk4.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3")) + tk4.MustExec("set @@autocommit = 0") + + // tk2 lock c1 = 1 + tk2.MustExec("begin pessimistic") + tk2.MustExec("select * from tk where c1 = 1 for update") // lock succ c1 = 1 + + // tk3 try lock c1 = 1 timeout 1sec + tk3.MustExec("begin pessimistic") + start := time.Now() + _, err := tk3.Exec("select * from tk where c1 = 1 for update") + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk4.MustExec("begin pessimistic") + tk4.MustExec("update tk set c2 = c2 + 1 where c1 = 2") // lock succ c1 = 2 by update + start = time.Now() + _, err = tk2.Exec("update tk set c2 = c2 - 1 where c1 = 2") + c.Check(time.Since(start), GreaterEqual, time.Duration(2000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(2100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("set innodb_lock_wait_timeout = 1") + tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1")) + start = time.Now() + _, err = tk2.Exec("delete from tk where c1 = 2") + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("commit") + tk3.MustExec("commit") + tk4.MustExec("commit") + + tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50")) + tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 3")) // tk4 update commit work, tk2 delete should be rollbacked + + // test stmtRollBack caused by timeout but not the whole transaction + tk2.MustExec("begin pessimistic") + tk2.MustExec("update tk set c2 = c2 + 2 where c1 = 2") // tk2 lock succ c1 = 2 by update + tk2.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update c2 succ + + tk3.MustExec("begin pessimistic") + tk3.MustExec("select * from tk where c1 = 3 for update") // tk3 lock c1 = 3 succ + + start = time.Now() + _, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be 
kept + c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond)) + c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big + c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error()) + + tk2.MustExec("commit") + tk3.MustExec("commit") + + // tk2's update succeeded, tk2's delete should have failed + tk.MustQuery(`select * from tk`).Check(testkit.Rows("1 1", "2 5", "3 3", "4 4", "5 5")) + + // clean up + tk.MustExec("drop table if exists tk") +} diff --git a/session/session.go b/session/session.go index a68aa8ed4c131..f3062dd85a800 100644 --- a/session/session.go +++ b/session/session.go @@ -1673,6 +1673,7 @@ var builtinGlobalVariable = []string{ variable.InteractiveTimeout, variable.MaxPreparedStmtCount, variable.MaxExecutionTime, + variable.InnodbLockWaitTimeout, /* TiDB specific global variables: */ variable.TiDBSkipUTF8Check, variable.TiDBIndexJoinBatchSize, diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 349d0762c073d..0e37a9f5e1771 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -407,6 +407,10 @@ type SessionVars struct { // AllowRemoveAutoInc indicates whether a user can drop the auto_increment column attribute or not. AllowRemoveAutoInc bool + + // LockWaitTimeout is the duration (in milliseconds) to wait for a pessimistic lock: + // a negative value means no wait, 0 means the default behavior, and other values are the actual wait time + LockWaitTimeout int64 } // PreparedParams contains the parameters of the current prepared statement when executing it. @@ -470,6 +474,7 @@ func NewSessionVars() *SessionVars { WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc, + LockWaitTimeout: DefInnodbLockWaitTimeout * 1000, } vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, @@ -717,6 +722,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { case MaxExecutionTime: timeoutMS := tidbOptPositiveInt32(val, 0) s.MaxExecutionTime = uint64(timeoutMS) + case InnodbLockWaitTimeout: + lockWaitSec := tidbOptInt64(val, DefInnodbLockWaitTimeout) + s.LockWaitTimeout = int64(lockWaitSec * 1000) case TiDBSkipUTF8Check: s.SkipUTF8Check = TiDBOptOn(val) case TiDBOptAggPushDown: diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index a6996b154349b..381abbc14598c 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -570,7 +570,7 @@ var defaultSysVars = []*SysVar{ {ScopeNone, "basedir", "/usr/local/mysql"}, {ScopeGlobal, "innodb_old_blocks_time", "1000"}, {ScopeGlobal, "innodb_stats_method", "nulls_equal"}, - {ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, "50"}, + {ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, strconv.FormatInt(DefInnodbLockWaitTimeout, 10)}, {ScopeGlobal, LocalInFile, "1"}, {ScopeGlobal | ScopeSession, "myisam_stats_method", "nulls_unequal"}, {ScopeNone, "version_compile_os", "osx10.8"}, diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index 3d22e952775d3..8e7b65e65bcf0 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -359,6 +359,7 @@ const ( DefTiDBExpensiveQueryTimeThreshold = 60 // 60s DefWaitSplitRegionTimeout = 300 // 300s DefTiDBAllowRemoveAutoInc = false + DefInnodbLockWaitTimeout = 50 // 50s ) // Process global variables.
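The variable plumbing above stores the MySQL-compatible `innodb_lock_wait_timeout` (given in seconds) as the session's `LockWaitTimeout` field in milliseconds, defaulting to 50s. The following is a minimal, self-contained sketch of that conversion; the type and helper names are illustrative stand-ins, not the real `SessionVars`/`tidbOptInt64` code.

```go
package main

import (
	"fmt"
	"strconv"
)

// defInnodbLockWaitTimeout mirrors DefInnodbLockWaitTimeout above (50 seconds).
const defInnodbLockWaitTimeout = 50

// sessionVars is a stripped-down stand-in for variable.SessionVars.
type sessionVars struct {
	// LockWaitTimeout is kept in milliseconds, matching the new field above.
	LockWaitTimeout int64
}

// setInnodbLockWaitTimeout emulates the new SetSystemVar case: the value arrives
// in seconds and is stored in milliseconds, falling back to the default on bad input.
func (s *sessionVars) setInnodbLockWaitTimeout(val string) {
	sec, err := strconv.ParseInt(val, 10, 64)
	if err != nil {
		sec = defInnodbLockWaitTimeout
	}
	s.LockWaitTimeout = sec * 1000
}

func main() {
	vars := &sessionVars{LockWaitTimeout: defInnodbLockWaitTimeout * 1000}
	vars.setInnodbLockWaitTimeout("3")
	fmt.Println(vars.LockWaitTimeout) // prints 3000
}
```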
diff --git a/store/mockstore/mocktikv/errors.go b/store/mockstore/mocktikv/errors.go index 8f15c0f689b16..54e1e4b8720f0 100644 --- a/store/mockstore/mocktikv/errors.go +++ b/store/mockstore/mocktikv/errors.go @@ -13,16 +13,21 @@ package mocktikv -import "fmt" +import ( + "fmt" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" +) // ErrLocked is returned when trying to Read/Write on a locked key. Client should // backoff or cleanup the lock then retry. type ErrLocked struct { - Key MvccKey - Primary []byte - StartTS uint64 - TTL uint64 - TxnSize uint64 + Key MvccKey + Primary []byte + StartTS uint64 + TTL uint64 + TxnSize uint64 + LockType kvrpcpb.Op } // Error formats the lock to a string. diff --git a/store/mockstore/mocktikv/mvcc.go b/store/mockstore/mocktikv/mvcc.go index 543e6f4f92218..8bff6cfb473c8 100644 --- a/store/mockstore/mocktikv/mvcc.go +++ b/store/mockstore/mocktikv/mvcc.go @@ -196,11 +196,12 @@ func newEntry(key MvccKey) *mvccEntry { // Note that parameter key is raw key, while key in ErrLocked is mvcc key. func (l *mvccLock) lockErr(key []byte) error { return &ErrLocked{ - Key: mvccEncode(key, lockVer), - Primary: l.primary, - StartTS: l.startTS, - TTL: l.ttl, - TxnSize: l.txnSize, + Key: mvccEncode(key, lockVer), + Primary: l.primary, + StartTS: l.startTS, + TTL: l.ttl, + TxnSize: l.txnSize, + LockType: l.op, } } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index f566cae797e05..7bc7e24eab0e7 100755 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -59,6 +59,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError { LockVersion: locked.StartTS, LockTtl: locked.TTL, TxnSize: locked.TxnSize, + LockType: locked.LockType, }, } } @@ -311,8 +312,10 @@ func (h *rpcHandler) handleKvPessimisticLock(req *kvrpcpb.PessimisticLockRequest h.cluster.handleDelay(startTS, regionID) errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), req.GetLockTtl()) - // TODO: remove this when implement sever side wait. - h.simulateServerSideWaitLock(errs) + if req.WaitTimeout == kv.LockAlwaysWait { + // TODO: remove this when server-side wait is implemented.
+ h.simulateServerSideWaitLock(errs) + } return &kvrpcpb.PessimisticLockResponse{ Errors: convertToKeyErrors(errs), } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 1e8b710c7c504..c5de475de27d1 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -49,7 +49,10 @@ type twoPhaseCommitAction interface { type actionPrewrite struct{} type actionCommit struct{} type actionCleanup struct{} -type actionPessimisticLock struct{ killed *uint32 } +type actionPessimisticLock struct { + killed *uint32 + lockWaitTime int64 +} type actionPessimisticRollback struct{} var ( @@ -736,13 +739,24 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * ForUpdateTs: c.forUpdateTS, LockTtl: c.pessimisticTTL, IsFirstLock: c.isFirstLock, + WaitTimeout: action.lockWaitTime, }, Context: pb.Context{ Priority: c.priority, SyncLog: c.syncLog, }, } + lockWaitStartTime := time.Now() for { + // if lockWaitTime is set, refine the request's `WaitTimeout` field based on the remaining timeout limit + if action.lockWaitTime > 0 { + timeLeft := action.lockWaitTime - (time.Since(lockWaitStartTime)).Milliseconds() + if timeLeft <= 0 { + req.PessimisticLock.WaitTimeout = kv.LockNoWait + } else { + req.PessimisticLock.WaitTimeout = timeLeft + } + } resp, err := c.store.SendReq(bo, req, batch.region, readTimeoutShort) if err != nil { return errors.Trace(err) } @@ -756,7 +770,7 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err != nil { return errors.Trace(err) } - err = c.pessimisticLockKeys(bo, action.killed, batch.keys) + err = c.pessimisticLockKeys(bo, action.killed, action.lockWaitTime, batch.keys) return errors.Trace(err) } lockResp := resp.PessimisticLock @@ -787,6 +801,27 @@ func (action actionPessimisticLock) handleSingleBatch(c *twoPhaseCommitter, bo * if err1 != nil { return errors.Trace(err1) } + // Check the lock conflict error for nowait: if nowait is set and the key is locked by others, + // report the error immediately and do not resolve locks any further. + // If the lock left behind belongs to a txn that is already committed or rolled back + // (e.g. secondary locks that are not committed or rolled back yet), + // we can't return a "nowait conflict" directly. + if lock.LockType == pb.Op_PessimisticLock { + if action.lockWaitTime == kv.LockNoWait { + // not supported in the 3.0 release yet + return kv.ErrNotImplemented + } else if action.lockWaitTime == kv.LockAlwaysWait { + // do nothing but keep waiting + } else { + // lockWaitTime is set, so check whether the lock wait has timed out; + // the pessimistic lock found could be an invalid lock that has timed out but has not been recycled yet + if !c.store.oracle.IsExpired(lock.TxnID, lock.TTL) { + if time.Since(lockWaitStartTime).Milliseconds() >= action.lockWaitTime { + return ErrLockWaitTimeout + } + } + } + } locks = append(locks, lock) } // Because we already waited on tikv, no need to Backoff here.
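The retry loop above keeps a per-statement wait budget: on each retry of the pessimistic lock request it recomputes how much of `lockWaitTime` is left and downgrades the request to no-wait once the budget is spent, and the conflict check returns ErrLockWaitTimeout when the elapsed time exceeds the budget. Below is a small sketch of that bookkeeping under the same 0 / -1 / N-milliseconds convention; `remainingWaitTimeout` is an illustrative name, not a function in the PR.

```go
package main

import (
	"fmt"
	"time"
)

// Mirror kv.LockAlwaysWait and kv.LockNoWait from the diff.
const (
	lockAlwaysWait = int64(0)
	lockNoWait     = int64(-1)
)

// remainingWaitTimeout returns the WaitTimeout value for the next pessimistic-lock
// request: the caller's total budget minus the time already spent waiting, or
// no-wait once the budget is exhausted. Non-positive budgets (always-wait or
// no-wait) are passed through unchanged, matching the `lockWaitTime > 0` guard above.
func remainingWaitTimeout(lockWaitTime int64, lockWaitStartTime time.Time) int64 {
	if lockWaitTime <= 0 {
		return lockWaitTime
	}
	timeLeft := lockWaitTime - time.Since(lockWaitStartTime).Milliseconds()
	if timeLeft <= 0 {
		return lockNoWait
	}
	return timeLeft
}

func main() {
	start := time.Now().Add(-800 * time.Millisecond) // pretend we have already waited ~800ms
	fmt.Println(remainingWaitTimeout(3000, start))           // roughly 2200
	fmt.Println(remainingWaitTimeout(500, start))            // -1: budget spent, switch to no-wait
	fmt.Println(remainingWaitTimeout(lockAlwaysWait, start)) // 0: always wait, unchanged
}
```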
@@ -1024,8 +1059,8 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { return c.doActionOnKeys(bo, actionCleanup{}, keys) } -func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, keys [][]byte) error { - return c.doActionOnKeys(bo, actionPessimisticLock{killed}, keys) +func (c *twoPhaseCommitter) pessimisticLockKeys(bo *Backoffer, killed *uint32, lockWaitTime int64, keys [][]byte) error { + return c.doActionOnKeys(bo, actionPessimisticLock{killed, lockWaitTime}, keys) } func (c *twoPhaseCommitter) pessimisticRollbackKeys(bo *Backoffer, keys [][]byte) error { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 7ecb6bb1994ba..24c39c3b77775 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -507,7 +507,7 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { _, _ = txn.us.Get(key) c.Assert(txn.Set(key, key), IsNil) txn.DelOption(kv.PresumeKeyNotExists) - err := txn.LockKeys(context.Background(), nil, txn.startTS, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) c.Assert(err, NotNil) c.Assert(txn.Delete(key), IsNil) key2 := kv.Key("key2") @@ -519,9 +519,9 @@ func (s *testCommitterSuite) TestUnsetPrimaryKey(c *C) { func (s *testCommitterSuite) TestPessimisticLockedKeysDedup(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) - err := txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def")) + err := txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), nil, 100, kv.Key("abc"), kv.Key("def")) + err = txn.LockKeys(context.Background(), nil, 100, kv.LockAlwaysWait, kv.Key("abc"), kv.Key("def")) c.Assert(err, IsNil) c.Assert(txn.lockKeys, HasLen, 2) } @@ -531,11 +531,11 @@ func (s *testCommitterSuite) TestPessimisticTTL(c *C) { txn := s.begin(c) txn.SetOption(kv.Pessimistic, true) time.Sleep(time.Millisecond * 100) - err := txn.LockKeys(context.Background(), nil, txn.startTS, key) + err := txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key) c.Assert(err, IsNil) time.Sleep(time.Millisecond * 100) key2 := kv.Key("key2") - err = txn.LockKeys(context.Background(), nil, txn.startTS, key2) + err = txn.LockKeys(context.Background(), nil, txn.startTS, kv.LockAlwaysWait, key2) c.Assert(err, IsNil) lockInfo := s.getLockInfo(c, key) elapsedTTL := lockInfo.LockTtl - PessimisticLockTTL diff --git a/store/tikv/error.go b/store/tikv/error.go index 574e460454912..1e66e1eaafc24 100644 --- a/store/tikv/error.go +++ b/store/tikv/error.go @@ -39,6 +39,7 @@ var ( ErrTiKVServerBusy = terror.ClassTiKV.New(mysql.ErrTiKVServerBusy, mysql.MySQLErrName[mysql.ErrTiKVServerBusy]) ErrGCTooEarly = terror.ClassTiKV.New(mysql.ErrGCTooEarly, mysql.MySQLErrName[mysql.ErrGCTooEarly]) ErrQueryInterrupted = terror.ClassTiKV.New(mysql.ErrQueryInterrupted, mysql.MySQLErrName[mysql.ErrQueryInterrupted]) + ErrLockWaitTimeout = terror.ClassTiKV.New(mysql.ErrLockWaitTimeout, mysql.MySQLErrName[mysql.ErrLockWaitTimeout]) ) // ErrDeadlock wraps *kvrpcpb.Deadlock to implement the error interface. 
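ErrLockWaitTimeout is defined in the TiKV error class above and mapped to the MySQL error code in the init() hunk that follows, so clients can distinguish a lock-wait timeout from other failures. A hedged sketch of caller-side handling, consistent with what the new test asserts (a timeout only fails the statement, not the transaction); the function and import paths are illustrative:

```go
package example

import (
	"github.com/pingcap/parser/terror"
	"github.com/pingcap/tidb/store/tikv"
)

// retryOrFail sketches how a caller might react to the new error: ErrLockWaitTimeout
// only fails the current statement (the transaction stays usable and keeps its earlier
// changes), while any other error is returned as fatal.
func retryOrFail(err error) (statementFailedOnly bool, fatal error) {
	if err == nil {
		return false, nil
	}
	if terror.ErrorEqual(err, tikv.ErrLockWaitTimeout) {
		return true, nil // statement-level rollback; the transaction itself survives
	}
	return false, err
}
```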
@@ -62,6 +63,7 @@ func init() { mysql.ErrGCTooEarly: mysql.ErrGCTooEarly, mysql.ErrTruncatedWrongValue: mysql.ErrTruncatedWrongValue, mysql.ErrQueryInterrupted: mysql.ErrQueryInterrupted, + mysql.ErrLockWaitTimeout: mysql.ErrLockWaitTimeout, } terror.ErrClassToMySQLCodes[terror.ClassTiKV] = tikvMySQLErrCodes } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 935060ff72d88..9f1613cf82c8d 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -128,11 +128,12 @@ var ttlFactor = 6000 // Lock represents a lock from tikv server. type Lock struct { - Key []byte - Primary []byte - TxnID uint64 - TTL uint64 - TxnSize uint64 + Key []byte + Primary []byte + TxnID uint64 + TTL uint64 + TxnSize uint64 + LockType kvrpcpb.Op } func (l *Lock) String() string { @@ -142,11 +143,12 @@ func (l *Lock) String() string { // NewLock creates a new *Lock. func NewLock(l *kvrpcpb.LockInfo) *Lock { return &Lock{ - Key: l.GetKey(), - Primary: l.GetPrimaryLock(), - TxnID: l.GetLockVersion(), - TTL: l.GetLockTtl(), - TxnSize: l.GetTxnSize(), + Key: l.GetKey(), + Primary: l.GetPrimaryLock(), + TxnID: l.GetLockVersion(), + TTL: l.GetLockTtl(), + TxnSize: l.GetTxnSize(), + LockType: l.LockType, } } diff --git a/store/tikv/ticlient_test.go b/store/tikv/ticlient_test.go index ddbbe3e610f0b..a6869cec71004 100644 --- a/store/tikv/ticlient_test.go +++ b/store/tikv/ticlient_test.go @@ -119,7 +119,7 @@ func (s *testTiclientSuite) TestSingleKey(c *C) { txn := s.beginTxn(c) err := txn.Set(encodeKey(s.prefix, "key"), []byte("value")) c.Assert(err, IsNil) - err = txn.LockKeys(context.Background(), nil, 0, encodeKey(s.prefix, "key")) + err = txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, encodeKey(s.prefix, "key")) c.Assert(err, IsNil) err = txn.Commit(context.Background()) c.Assert(err, IsNil) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index cd87396cb867c..2819e1bd9e4c8 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -353,7 +353,8 @@ func (txn *tikvTxn) rollbackPessimisticLocks() error { return txn.committer.pessimisticRollbackKeys(NewBackoffer(context.Background(), cleanupMaxBackoff), txn.lockKeys) } -func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keysInput ...kv.Key) error { +func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, + lockWaitTime int64, keysInput ...kv.Key) error { // Exclude keys that are already locked. keys := make([][]byte, 0, len(keysInput)) txn.mu.Lock() @@ -397,7 +398,7 @@ func (txn *tikvTxn) LockKeys(ctx context.Context, killed *uint32, forUpdateTS ui // If the number of keys greater than 1, it can be on different region, // concurrently execute on multiple regions may lead to deadlock. txn.committer.isFirstLock = len(txn.lockKeys) == 0 && len(keys) == 1 - err := txn.committer.pessimisticLockKeys(bo, killed, keys) + err := txn.committer.pessimisticLockKeys(bo, killed, lockWaitTime, keys) if killed != nil { // If the kill signal is received during waiting for pessimisticLock, // pessimisticLockKeys would handle the error but it doesn't reset the flag.
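Taken together, the new parameter threads the session's wait budget from `innodb_lock_wait_timeout` down to the TiKV client, while internal callers keep the old always-wait behavior. A hedged sketch of the two calling patterns that appear in the diff (the helper names are illustrative, not part of the PR):

```go
package example

import (
	"context"

	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/sessionctx/variable"
)

// lockForDML mirrors handlePessimisticDML/doLockKeys after this change: user
// statements pass the session's LockWaitTimeout (milliseconds), so a blocked
// request eventually fails with ErrLockWaitTimeout instead of waiting forever.
func lockForDML(ctx context.Context, txn kv.Transaction, vars *variable.SessionVars, forUpdateTS uint64, keys ...kv.Key) error {
	return txn.LockKeys(ctx, &vars.Killed, forUpdateTS, vars.LockWaitTimeout, keys...)
}

// lockForInternalJob mirrors the DDL/admin call sites: background work passes
// kv.LockAlwaysWait and keeps the old behavior of waiting indefinitely.
func lockForInternalJob(ctx context.Context, txn kv.Transaction, key kv.Key) error {
	return txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, key)
}
```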