Skip to content

Commit

Permalink
executor: support innodb_lock_wait_timeout for pessimistic transaction (
Browse files Browse the repository at this point in the history
  • Loading branch information
cfzjywxk authored and sre-bot committed Nov 9, 2019
1 parent f89feef commit a870c23
Show file tree
Hide file tree
Showing 25 changed files with 219 additions and 61 deletions.
2 changes: 1 addition & 1 deletion ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ func (w *addIndexWorker) backfillIndexInTxn(handleRange reorgIndexTask) (taskCtx

// Lock the row key to notify us that someone delete or update the row,
// then we should not backfill the index of it, otherwise the adding index is redundant.
err := txn.LockKeys(context.Background(), nil, 0, idxRecord.key)
err := txn.LockKeys(context.Background(), nil, 0, kv.LockAlwaysWait, idxRecord.key)
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,7 @@ func (a *ExecStmt) handlePessimisticDML(ctx context.Context, e Executor) error {
return nil
}
forUpdateTS := txnCtx.GetForUpdateTS()
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, keys...)
err = txn.LockKeys(ctx, &sctx.GetSessionVars().Killed, forUpdateTS, sctx.GetSessionVars().LockWaitTimeout, keys...)
if err == nil {
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion executor/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (e *RecoverIndexExec) backfillIndexInTxn(ctx context.Context, txn kv.Transa
}

recordKey := e.table.RecordKey(row.handle)
err := txn.LockKeys(ctx, nil, 0, recordKey)
err := txn.LockKeys(ctx, nil, 0, kv.LockAlwaysWait, recordKey)
if err != nil {
return result, err
}
Expand Down
7 changes: 4 additions & 3 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,18 +785,19 @@ func (e *SelectLockExec) Next(ctx context.Context, req *chunk.Chunk) error {
}
return nil
}
return doLockKeys(ctx, e.ctx, e.keys...)
lockWaitTime := e.ctx.GetSessionVars().LockWaitTimeout
return doLockKeys(ctx, e.ctx, lockWaitTime, e.keys...)
}

func doLockKeys(ctx context.Context, se sessionctx.Context, keys ...kv.Key) error {
func doLockKeys(ctx context.Context, se sessionctx.Context, lockWaitTime int64, keys ...kv.Key) error {
se.GetSessionVars().TxnCtx.ForUpdate = true
// Lock keys only once when finished fetching all results.
txn, err := se.Txn(true)
if err != nil {
return err
}
forUpdateTS := se.GetSessionVars().TxnCtx.GetForUpdateTS()
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, keys...)
return txn.LockKeys(ctx, &se.GetSessionVars().Killed, forUpdateTS, lockWaitTime, keys...)
}

// LimitExec represents limit executor
Expand Down
20 changes: 11 additions & 9 deletions executor/point_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
handle: p.Handle,
startTS: startTS,
lock: p.Lock,
lockWaitTime: p.LockWaitTime,
}
b.isSelectForUpdate = p.IsForUpdate
e.base().initCap = 1
Expand All @@ -55,14 +56,15 @@ func (b *executorBuilder) buildPointGet(p *plannercore.PointGetPlan) Executor {
type PointGetExecutor struct {
baseExecutor

tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
tblInfo *model.TableInfo
handle int64
idxInfo *model.IndexInfo
idxVals []types.Datum
startTS uint64
snapshot kv.Snapshot
done bool
lock bool
lockWaitTime int64
}

// Open implements the Executor interface.
Expand Down Expand Up @@ -145,7 +147,7 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error {

func (e *PointGetExecutor) lockKeyIfNeeded(ctx context.Context, key []byte) error {
if e.lock {
return doLockKeys(ctx, e.ctx, key)
return doLockKeys(ctx, e.ctx, e.lockWaitTime, key)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ require (
github.com/pingcap/errors v0.11.4
github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191018040038-555b97093a2a
github.com/pingcap/pd v1.1.0-beta.0.20190912093418-dc03c839debd
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ github.com/pingcap/failpoint v0.0.0-20190512135322-30cc7431d99c/go.mod h1:DNS3Qg
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rGrobssy1nVy2VaVpNCuLpCbr+FEaTA8=
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
github.com/pingcap/kvproto v0.0.0-20190516013202-4cf58ad90b6c/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18 h1:5vQV8S/8B9nE+I+0Me6vZGyASeXl/QymwqtaOL5e5ZA=
github.com/pingcap/kvproto v0.0.0-20190918085321-44e3817e1f18/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d h1:zTHgLr8+0LTEJmjf8yHilgmNhdrVlCN/RW7NeO8IRsE=
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191018040038-555b97093a2a h1:PMjYrxWKdVUlJ77+9YHbYVciDQCyqZ/noS9nIni76KQ=
Expand Down
10 changes: 9 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ type Transaction interface {
// String implements fmt.Stringer interface.
String() string
// LockKeys tries to lock the entries with the keys in KV store.
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, keys ...Key) error
LockKeys(ctx context.Context, killed *uint32, forUpdateTS uint64, lockWaitTime int64, keys ...Key) error
// SetOption sets an option with a value, when val is nil, uses the default
// value of this option.
SetOption(opt Option, val interface{})
Expand Down Expand Up @@ -304,3 +304,11 @@ type SplitableStore interface {
WaitScatterRegionFinish(regionID uint64, backOff int) error
CheckRegionInScattering(regionID uint64) (bool, error)
}

// Used for pessimistic lock wait time
// these two constants are special for lock protocol with tikv
// 0 means always wait, -1 means nowait, others meaning lock wait in milliseconds
var (
LockAlwaysWait = int64(0)
LockNoWait = int64(-1)
)
2 changes: 1 addition & 1 deletion kv/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (t *mockTxn) String() string {
return ""
}

func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ ...Key) error {
func (t *mockTxn) LockKeys(_ context.Context, _ *uint32, _ uint64, _ int64, _ ...Key) error {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion kv/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (s testMockSuite) TestInterface(c *C) {

transaction, err := storage.Begin()
c.Check(err, IsNil)
err = transaction.LockKeys(context.Background(), nil, 0, Key("lock"))
err = transaction.LockKeys(context.Background(), nil, 0, LockAlwaysWait, Key("lock"))
c.Check(err, IsNil)
transaction.SetOption(Option(23), struct{}{})
if mock, ok := transaction.(*mockTxn); ok {
Expand Down
11 changes: 7 additions & 4 deletions planner/core/point_get_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type PointGetPlan struct {
IsTableDual bool
Lock bool
IsForUpdate bool
LockWaitTime int64
}

type nameValuePair struct {
Expand Down Expand Up @@ -161,6 +162,7 @@ func TryFastPlan(ctx sessionctx.Context, node ast.Node) Plan {
if !sessVars.IsAutocommit() || sessVars.InTxn() {
fp.Lock = true
fp.IsForUpdate = true
fp.LockWaitTime = sessVars.LockWaitTimeout
}
}
return fp
Expand Down Expand Up @@ -284,10 +286,11 @@ func tryPointGetPlan(ctx sessionctx.Context, selStmt *ast.SelectStmt) *PointGetP

func newPointGetPlan(ctx sessionctx.Context, dbName string, schema *expression.Schema, tbl *model.TableInfo) *PointGetPlan {
p := &PointGetPlan{
basePlan: newBasePlan(ctx, plancodec.TypePointGet),
dbName: dbName,
schema: schema,
TblInfo: tbl,
basePlan: newBasePlan(ctx, plancodec.TypePointGet),
dbName: dbName,
schema: schema,
TblInfo: tbl,
LockWaitTime: ctx.GetSessionVars().LockWaitTimeout,
}
ctx.GetSessionVars().StmtCtx.Tables = []stmtctx.TableEntry{{DB: ctx.GetSessionVars().CurrentDB, Table: tbl.Name.L}}
return p
Expand Down
85 changes: 85 additions & 0 deletions session/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,88 @@ func (s *testPessimisticSuite) TestWaitLockKill(c *C) {
c.Assert(terror.ErrorEqual(err, tikv.ErrQueryInterrupted), IsTrue)
tk.MustExec("rollback")
}

func (s *testPessimisticSuite) TestInnodbLockWaitTimeout(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists tk")
tk.MustExec("create table tk (c1 int primary key, c2 int)")
tk.MustExec("insert into tk values(1,1),(2,2),(3,3),(4,4),(5,5)")
// tk set global
tk.MustExec("set global innodb_lock_wait_timeout = 3")
tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50"))

tk2 := testkit.NewTestKitWithInit(c, s.store)
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk2.MustExec("set innodb_lock_wait_timeout = 2")
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 2"))

tk3 := testkit.NewTestKitWithInit(c, s.store)
tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk3.MustExec("set innodb_lock_wait_timeout = 1")
tk3.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))

tk2.MustExec("set @@autocommit = 0")
tk3.MustExec("set @@autocommit = 0")

tk4 := testkit.NewTestKitWithInit(c, s.store)
tk4.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 3"))
tk4.MustExec("set @@autocommit = 0")

// tk2 lock c1 = 1
tk2.MustExec("begin pessimistic")
tk2.MustExec("select * from tk where c1 = 1 for update") // lock succ c1 = 1

// tk3 try lock c1 = 1 timeout 1sec
tk3.MustExec("begin pessimistic")
start := time.Now()
_, err := tk3.Exec("select * from tk where c1 = 1 for update")
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk4.MustExec("begin pessimistic")
tk4.MustExec("update tk set c2 = c2 + 1 where c1 = 2") // lock succ c1 = 2 by update
start = time.Now()
_, err = tk2.Exec("update tk set c2 = c2 - 1 where c1 = 2")
c.Check(time.Since(start), GreaterEqual, time.Duration(2000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(2100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("set innodb_lock_wait_timeout = 1")
tk2.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 1"))
start = time.Now()
_, err = tk2.Exec("delete from tk where c1 = 2")
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")
tk4.MustExec("commit")

tk.MustQuery(`show variables like "innodb_lock_wait_timeout"`).Check(testkit.Rows("innodb_lock_wait_timeout 50"))
tk.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 3")) // tk4 update commit work, tk2 delete should be rollbacked

// test stmtRollBack caused by timeout but not the whole transaction
tk2.MustExec("begin pessimistic")
tk2.MustExec("update tk set c2 = c2 + 2 where c1 = 2") // tk2 lock succ c1 = 2 by update
tk2.MustQuery(`select * from tk where c1 = 2`).Check(testkit.Rows("2 5")) // tk2 update c2 succ

tk3.MustExec("begin pessimistic")
tk3.MustExec("select * from tk where c1 = 3 for update") // tk3 lock c1 = 3 succ

start = time.Now()
_, err = tk2.Exec("delete from tk where c1 = 3") // tk2 tries to lock c1 = 3 fail, this delete should be rollback, but previous update should be keeped
c.Check(time.Since(start), GreaterEqual, time.Duration(1000*time.Millisecond))
c.Check(time.Since(start), LessEqual, time.Duration(1100*time.Millisecond)) // unit test diff should not be too big
c.Check(err.Error(), Equals, tikv.ErrLockWaitTimeout.Error())

tk2.MustExec("commit")
tk3.MustExec("commit")

// tk2 update succ, tk2 delete should fail
tk.MustQuery(`select * from tk`).Check(testkit.Rows("1 1", "2 5", "3 3", "4 4", "5 5"))

// clean
tk.MustExec("drop table if exists tk")
}
1 change: 1 addition & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1673,6 +1673,7 @@ var builtinGlobalVariable = []string{
variable.InteractiveTimeout,
variable.MaxPreparedStmtCount,
variable.MaxExecutionTime,
variable.InnodbLockWaitTimeout,
/* TiDB specific global variables: */
variable.TiDBSkipUTF8Check,
variable.TiDBIndexJoinBatchSize,
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,10 @@ type SessionVars struct {

// AllowRemoveAutoInc indicates whether a user can drop the auto_increment column attribute or not.
AllowRemoveAutoInc bool

// LockWaitTimeout is the duration waiting for pessimistic lock in milliseconds
// negative value means nowait, 0 means default behavior, others means actual wait time
LockWaitTimeout int64
}

// PreparedParams contains the parameters of the current prepared statement when executing it.
Expand Down Expand Up @@ -470,6 +474,7 @@ func NewSessionVars() *SessionVars {
WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish,
WaitSplitRegionTimeout: DefWaitSplitRegionTimeout,
AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc,
LockWaitTimeout: DefInnodbLockWaitTimeout * 1000,
}
vars.Concurrency = Concurrency{
IndexLookupConcurrency: DefIndexLookupConcurrency,
Expand Down Expand Up @@ -717,6 +722,9 @@ func (s *SessionVars) SetSystemVar(name string, val string) error {
case MaxExecutionTime:
timeoutMS := tidbOptPositiveInt32(val, 0)
s.MaxExecutionTime = uint64(timeoutMS)
case InnodbLockWaitTimeout:
lockWaitSec := tidbOptInt64(val, DefInnodbLockWaitTimeout)
s.LockWaitTimeout = int64(lockWaitSec * 1000)
case TiDBSkipUTF8Check:
s.SkipUTF8Check = TiDBOptOn(val)
case TiDBOptAggPushDown:
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ var defaultSysVars = []*SysVar{
{ScopeNone, "basedir", "/usr/local/mysql"},
{ScopeGlobal, "innodb_old_blocks_time", "1000"},
{ScopeGlobal, "innodb_stats_method", "nulls_equal"},
{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, "50"},
{ScopeGlobal | ScopeSession, InnodbLockWaitTimeout, strconv.FormatInt(DefInnodbLockWaitTimeout, 10)},
{ScopeGlobal, LocalInFile, "1"},
{ScopeGlobal | ScopeSession, "myisam_stats_method", "nulls_unequal"},
{ScopeNone, "version_compile_os", "osx10.8"},
Expand Down
1 change: 1 addition & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ const (
DefTiDBExpensiveQueryTimeThreshold = 60 // 60s
DefWaitSplitRegionTimeout = 300 // 300s
DefTiDBAllowRemoveAutoInc = false
DefInnodbLockWaitTimeout = 50 // 50s
)

// Process global variables.
Expand Down
17 changes: 11 additions & 6 deletions store/mockstore/mocktikv/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,21 @@

package mocktikv

import "fmt"
import (
"fmt"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
)

// ErrLocked is returned when trying to Read/Write on a locked key. Client should
// backoff or cleanup the lock then retry.
type ErrLocked struct {
Key MvccKey
Primary []byte
StartTS uint64
TTL uint64
TxnSize uint64
Key MvccKey
Primary []byte
StartTS uint64
TTL uint64
TxnSize uint64
LockType kvrpcpb.Op
}

// Error formats the lock to a string.
Expand Down
11 changes: 6 additions & 5 deletions store/mockstore/mocktikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,12 @@ func newEntry(key MvccKey) *mvccEntry {
// Note that parameter key is raw key, while key in ErrLocked is mvcc key.
func (l *mvccLock) lockErr(key []byte) error {
return &ErrLocked{
Key: mvccEncode(key, lockVer),
Primary: l.primary,
StartTS: l.startTS,
TTL: l.ttl,
TxnSize: l.txnSize,
Key: mvccEncode(key, lockVer),
Primary: l.primary,
StartTS: l.startTS,
TTL: l.ttl,
TxnSize: l.txnSize,
LockType: l.op,
}
}

Expand Down
7 changes: 5 additions & 2 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func convertToKeyError(err error) *kvrpcpb.KeyError {
LockVersion: locked.StartTS,
LockTtl: locked.TTL,
TxnSize: locked.TxnSize,
LockType: locked.LockType,
},
}
}
Expand Down Expand Up @@ -311,8 +312,10 @@ func (h *rpcHandler) handleKvPessimisticLock(req *kvrpcpb.PessimisticLockRequest
h.cluster.handleDelay(startTS, regionID)
errs := h.mvccStore.PessimisticLock(req.Mutations, req.PrimaryLock, req.GetStartVersion(), req.GetForUpdateTs(), req.GetLockTtl())

// TODO: remove this when implement sever side wait.
h.simulateServerSideWaitLock(errs)
if req.WaitTimeout == kv.LockAlwaysWait {
// TODO: remove this when implement sever side wait.
h.simulateServerSideWaitLock(errs)
}
return &kvrpcpb.PessimisticLockResponse{
Errors: convertToKeyErrors(errs),
}
Expand Down
Loading

0 comments on commit a870c23

Please sign in to comment.