Skip to content

Commit

Permalink
txn: fix issue innodb_lock_wait_timeout doesn't work in some case (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 29, 2024
1 parent 41f1b9e commit 4af46a5
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 0 deletions.
38 changes: 38 additions & 0 deletions executor/executor_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/testkit"
Expand Down Expand Up @@ -798,3 +799,40 @@ func (m mockPumpClient) WriteBinlog(ctx context.Context, in *binlog.WriteBinlogR
func (m mockPumpClient) PullBinlogs(ctx context.Context, in *binlog.PullBinlogReq, opts ...grpc.CallOption) (binlog.Pump_PullBinlogsClient, error) {
return nil, nil
}

func TestInnodbLockWaitTimeout(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int auto_increment, k int,c varchar(255), unique index idx(id))")
tk.MustExec("insert into t (k,c) values (1,'abcdefg');")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t (k,c) select k,c from t;")
}
tk.MustExec("update t set k= id, c = id")
tk.MustExec("split table t by (0), (50), (100);")
tk.MustExec("split table t index idx by (0), (50), (100);")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict"))
}()
tk.MustExec("set @@innodb_lock_wait_timeout=1")
isolations := []string{"REPEATABLE READ", "READ COMMITTED"}
for _, isolation := range isolations {
tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL " + isolation)
tk.MustExec("begin")
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
res, err := tk.ExecWithContext(ctx, "update t use index (idx) set k=k+1 where id >0;")
cancel()
if res != nil {
require.NoError(t, res.Close())
}
require.Error(t, err)
msg := fmt.Sprintf("cost: %v", time.Since(start))
require.Equal(t, "lock wait timeout", err.Error(), msg)
require.Less(t, time.Since(start), time.Second*2)
tk.MustExec("commit")
}
}
5 changes: 5 additions & 0 deletions sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(ctx co
return sessiontxn.ErrorAction(err)
}
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
sessVars := p.sctx.GetSessionVars()
waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime())
if waitTime.Milliseconds() >= sessVars.LockWaitTimeout {
return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout)
}
logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()),
Expand Down
5 changes: 5 additions & 0 deletions sessiontxn/isolation/repeatable_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,11 @@ func (p *PessimisticRRTxnContextProvider) handleAfterPessimisticLockError(ctx co
return sessiontxn.ErrorAction(err)
}
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime())
if waitTime.Milliseconds() >= sessVars.LockWaitTimeout {
return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout)
}

// Always update forUpdateTS by getting a new timestamp from PD.
// If we use the conflict commitTS as the new forUpdateTS and async commit
// is used, the commitTS of this transaction may exceed the max timestamp
Expand Down
1 change: 1 addition & 0 deletions store/mockstore/unistore/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_library(
"@com_github_pingcap_badger//:badger",
"@com_github_pingcap_badger//y",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/deadlock",
"@com_github_pingcap_kvproto//pkg/errorpb",
Expand Down
13 changes: 13 additions & 0 deletions store/mockstore/unistore/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
deadlockPb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/errorpb"
Expand Down Expand Up @@ -193,6 +194,18 @@ func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpc

// KvPessimisticLock implements the tikvpb.TikvServer interface.
func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) {
failpoint.Inject("pessimisticLockReturnWriteConflict", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Millisecond * 100)
err := &kverrors.ErrConflict{
StartTS: req.GetForUpdateTs(),
ConflictTS: req.GetForUpdateTs() + 1,
ConflictCommitTS: req.GetForUpdateTs() + 2,
}
failpoint.Return(&kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil)
}
})

reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticLock")
if err != nil {
return &kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil
Expand Down

0 comments on commit 4af46a5

Please sign in to comment.