Skip to content

Commit

Permalink
txn: fix issue innodb_lock_wait_timeout doesn't work in some case (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Oct 29, 2024
1 parent d330ab2 commit 9913e0d
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 1 deletion.
3 changes: 2 additions & 1 deletion pkg/executor/test/txn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@ go_test(
"txn_test.go",
],
flaky = True,
shard_count = 9,
shard_count = 10,
deps = [
"//pkg/config",
"//pkg/errno",
"//pkg/meta/autoid",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@io_opencensus_go//stats/view",
Expand Down
39 changes: 39 additions & 0 deletions pkg/executor/test/txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package txn_test

import (
"context"
"fmt"
"strconv"
"strings"
"sync"
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
Expand Down Expand Up @@ -706,3 +708,40 @@ func TestColumnNotMatchError(t *testing.T) {
tk.MustExec("delete from t where id=1")
tk.MustGetErrCode("commit", errno.ErrInfoSchemaChanged)
}

func TestInnodbLockWaitTimeout(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (id int auto_increment, k int,c varchar(255), unique index idx(id))")
tk.MustExec("insert into t (k,c) values (1,'abcdefg');")
for i := 0; i < 8; i++ {
tk.MustExec("insert into t (k,c) select k,c from t;")
}
tk.MustExec("update t set k= id, c = id")
tk.MustExec("split table t by (0), (50), (100);")
tk.MustExec("split table t index idx by (0), (50), (100);")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/store/mockstore/unistore/tikv/pessimisticLockReturnWriteConflict"))
}()
tk.MustExec("set @@innodb_lock_wait_timeout=1")
isolations := []string{"REPEATABLE READ", "READ COMMITTED"}
for _, isolation := range isolations {
tk.MustExec("SET SESSION TRANSACTION ISOLATION LEVEL " + isolation)
tk.MustExec("begin")
start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
res, err := tk.ExecWithContext(ctx, "update t use index (idx) set k=k+1 where id >0;")
cancel()
if res != nil {
require.NoError(t, res.Close())
}
require.Error(t, err)
msg := fmt.Sprintf("cost: %v", time.Since(start))
require.Equal(t, "lock wait timeout", err.Error(), msg)
require.Less(t, time.Since(start), time.Second*2)
tk.MustExec("commit")
}
}
5 changes: 5 additions & 0 deletions pkg/sessiontxn/isolation/readcommitted.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,11 @@ func (p *PessimisticRCTxnContextProvider) handleAfterPessimisticLockError(ctx co
return sessiontxn.ErrorAction(err)
}
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
sessVars := p.sctx.GetSessionVars()
waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime())
if waitTime.Milliseconds() >= sessVars.LockWaitTimeout {
return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout)
}
logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement",
zap.Uint64("txn", txnCtx.StartTS),
zap.Uint64("forUpdateTS", txnCtx.GetForUpdateTS()),
Expand Down
5 changes: 5 additions & 0 deletions pkg/sessiontxn/isolation/repeatable_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ func (p *PessimisticRRTxnContextProvider) handleAfterPessimisticLockError(ctx co
return sessiontxn.ErrorAction(err)
}
} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
waitTime := time.Since(sessVars.StmtCtx.GetLockWaitStartTime())
if waitTime.Milliseconds() >= sessVars.LockWaitTimeout {
return sessiontxn.ErrorAction(tikverr.ErrLockWaitTimeout)
}

// Always update forUpdateTS by getting a new timestamp from PD.
// If we use the conflict commitTS as the new forUpdateTS and async commit
// is used, the commitTS of this transaction may exceed the max timestamp
Expand Down
1 change: 1 addition & 0 deletions pkg/store/mockstore/unistore/tikv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ go_library(
"@com_github_pingcap_badger//:badger",
"@com_github_pingcap_badger//y",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_pingcap_kvproto//pkg/coprocessor",
"@com_github_pingcap_kvproto//pkg/deadlock",
"@com_github_pingcap_kvproto//pkg/errorpb",
Expand Down
13 changes: 13 additions & 0 deletions pkg/store/mockstore/unistore/tikv/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
deadlockPb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/errorpb"
Expand Down Expand Up @@ -193,6 +194,18 @@ func (svr *Server) KvScan(ctx context.Context, req *kvrpcpb.ScanRequest) (*kvrpc

// KvPessimisticLock implements the tikvpb.TikvServer interface.
func (svr *Server) KvPessimisticLock(ctx context.Context, req *kvrpcpb.PessimisticLockRequest) (*kvrpcpb.PessimisticLockResponse, error) {
failpoint.Inject("pessimisticLockReturnWriteConflict", func(val failpoint.Value) {
if val.(bool) {
time.Sleep(time.Millisecond * 100)
err := &kverrors.ErrConflict{
StartTS: req.GetForUpdateTs(),
ConflictTS: req.GetForUpdateTs() + 1,
ConflictCommitTS: req.GetForUpdateTs() + 2,
}
failpoint.Return(&kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil)
}
})

reqCtx, err := newRequestCtx(svr, req.Context, "PessimisticLock")
if err != nil {
return &kvrpcpb.PessimisticLockResponse{Errors: []*kvrpcpb.KeyError{convertToKeyError(err)}}, nil
Expand Down

0 comments on commit 9913e0d

Please sign in to comment.