diff --git a/executor/executor_txn_test.go b/executor/executor_txn_test.go index d2bd8f862830e..32bd8d738167a 100644 --- a/executor/executor_txn_test.go +++ b/executor/executor_txn_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/executor" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/testkit" @@ -798,3 +799,40 @@ func (m mockPumpClient) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogR func (m mockPumpClient) PullBinlogs(ctx context.Context, in *binlog.PullBinlogReq, opts ...grpc.CallOption) (binlog.Pump_PullBinlogsClient, error) { return nil, nil } + +func TestInnodbLockWaitTimeout(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int auto_increment, k int,c varchar(255), unique index idx(id))") + tk.MustExec("insert into t (k,c) values (1,'abcdefg');") + for i := 0; i < 8; i++ { + tk.MustExec("insert into t (k,c) select k,c from t;") + } + tk.MustExec("update t set k= id, c = id") + tk.MustExec("split table t by (0), (50), (100);") + tk.MustExec("split table t index idx by (0), (50), (100);") + require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict", `return(true)`)) + defer func() { + require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict")) + }() + tk.MustExec("set @@innodb_lock_wait_timeout=1") + isolations := []string{"REPEATABLE READ", "READ COMMITTED"} + for _, isolation := range isolations { + tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL " + isolation) + tk.MustExec("begin") + start := time.Now() + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + res, err := tk.ExecWithContext(ctx, "update t use index (idx) set k=k+1 where id >0;") + cancel() + if res != nil { + require.NoError(t, res.Close()) + } + require.Error(t, err) + msg := fmt.Sprintf("cost: %v", time.Since(start)) + require.Equal(t, "lock wait timeout", err.Error(), msg) + require.Less(t, time.Since(start), time.Second*2) + tk.MustExec("commit") + } +} diff --git a/sessiontxn/isolation/readcommitted.go b/sessiontxn/isolation/readcommitted.go index 510f6b407c9a7..ee8e89df798f2 100644 --- a/sessiontxn/isolation/readcommitted.go +++ b/sessiontxn/isolation/readcommitted.go @@ -230,6 +230,11 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(ctx co return sessiontxn.ErrorAction(err) } } else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) { + sessVars := p.sctx.GetSessionVars() + waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime()) + if waitTime.Milliseconds() >= sessVars.LockWaitTimeout { + return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout) + } logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement", zap.Uint64("txn", txnCtx.StartTS), zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()), diff --git a/sessiontxn/isolation/repeatable_read.go b/sessiontxn/isolation/repeatable_read.go index e64a066d47d89..33003e198e773 100644 --- a/sessiontxn/isolation/repeatable_read.go +++ b/sessiontxn/isolation/repeatable_read.go @@ -257,6 +257,11 @@ func (p *PessimisticRRTxnContextProvider) handleAfterPessimisticLockError(ctx co return sessiontxn.ErrorAction(err) } } else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) { + waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime()) + if waitTime.Milliseconds() >= sessVars.LockWaitTimeout { + return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout) + } + // Always update forUpdateTS by getting a new timestamp from PD. // If we use the conflict commitTS as the new forUpdateTS and async commit // is used, the commitTS of this transaction may exceed the max timestamp diff --git a/store/mockstore/unistore/tikv/BUILD.bazel b/store/mockstore/unistore/tikv/BUILD.bazel index 52578cacbc78a..9ccbe11969142 100644 --- a/store/mockstore/unistore/tikv/BUILD.bazel +++ b/store/mockstore/unistore/tikv/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "@com_github_pingcap_badger//:badger", "@com_github_pingcap_badger//y", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/coprocessor", "@com_github_pingcap_kvproto//pkg/deadlock", "@com_github_pingcap_kvproto//pkg/errorpb", diff --git a/store/mockstore/unistore/tikv/server.go b/store/mockstore/unistore/tikv/server.go index bf8eb592dbae8..afe7647784348 100644 --- a/store/mockstore/unistore/tikv/server.go +++ b/store/mockstore/unistore/tikv/server.go @@ -21,6 +21,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" deadlockPb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/errorpb" @@ -193,6 +194,18 @@ func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpc // KvPessimisticLock implements the tikvpb.TikvServer interface. func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) { + failpoint.Inject("pessimisticLockReturnWriteConflict", func(val failpoint.Value) { + if val.(bool) { + time.Sleep(time.Millisecond * 100) + err := &kverrors.ErrConflict{ + StartTS: req.GetForUpdateTs(), + ConflictTS: req.GetForUpdateTs() + 1, + ConflictCommitTS: req.GetForUpdateTs() + 2, + } + failpoint.Return(&kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil) + } + }) + reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticLock") if err != nil { return &kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil