Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

txn: Update client-go to fix the issue that GC BatchResolveLcok may miss primary pessimistic locks (#45143) #45151

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -972,6 +972,14 @@ func (do *Domain) GetEtcdClient() *clientv3.Client {
return do.etcdClient
}

// GetPDClient returns the PD client.
func (do *Domain) GetPDClient() pd.Client {
if store, ok := do.store.(kv.StorageWithPD); ok {
return store.GetPDClient()
}
return nil
}

// LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it
// should be called only once in BootstrapSession.
func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5
github.com/coocood/freecache v1.2.1
github.com/coreos/go-semver v0.3.0
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect
github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548
github.com/cznic/sortutil v0.0.0-20181122101858-f5f958428db8
github.com/danjacques/gofslock v0.0.0-20191023191349-0a45f885bc37
github.com/dgraph-io/ristretto v0.1.1-0.20220403145359-8e850b710d6d
Expand Down Expand Up @@ -63,7 +63,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.1-0.20230630082913-bf42043d98b6
github.com/tikv/client-go/v2 v2.0.1-0.20230704072904-34f99fa358b6
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,8 @@ github.com/stretchr/testify v1.7.2-0.20220504104629-106ec21d14df/go.mod h1:6Fq8o
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
github.com/tikv/client-go/v2 v2.0.1-0.20230630082913-bf42043d98b6 h1:CWnu4ji9owlkK+E7B5X9rg6EiwTbyDJY96lspfDVYkg=
github.com/tikv/client-go/v2 v2.0.1-0.20230630082913-bf42043d98b6/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/client-go/v2 v2.0.1-0.20230704072904-34f99fa358b6 h1:+RE7e74gaujHFwxfpZ+tRQxF7H1ekd7sYm/tkACq5OA=
github.com/tikv/client-go/v2 v2.0.1-0.20230704072904-34f99fa358b6/go.mod h1:VTlli8fRRpcpISj9I2IqroQmcAFfaTyBquiRhofOcDs=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710 h1:jxgmKOscXSjaFEKQGRyY5qOpK8hLqxs2irb/uDJMtwk=
github.com/tikv/pd/client v0.0.0-20220307081149-841fa61e9710/go.mod h1:AtvppPwkiyUgQlR1W9qSqfTB+OsOIu19jDCOxOsPkmU=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
Expand Down
7 changes: 1 addition & 6 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -1267,12 +1267,7 @@ func (store *MVCCStore) Cleanup(reqCtx *requestCtx, key []byte, startTS, current
func (store *MVCCStore) appendScannedLock(locks []*kvrpcpb.LockInfo, it *lockstore.Iterator, maxTS uint64) []*kvrpcpb.LockInfo {
lock := mvcc.DecodeLock(it.Value())
if lock.StartTS < maxTS {
locks = append(locks, &kvrpcpb.LockInfo{
PrimaryLock: lock.Primary,
LockVersion: lock.StartTS,
Key: safeCopy(it.Key()),
LockTtl: uint64(lock.TTL),
})
locks = append(locks, lock.ToLockInfo(append([]byte{}, it.Key()...)))
}
return locks
}
Expand Down
142 changes: 142 additions & 0 deletions tests/realtikvtest/pessimistictest/pessimistic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/auth"
Expand All @@ -37,6 +38,7 @@ import (
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
storeerr "github.com/pingcap/tidb/store/driver/error"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/testkit"
Expand All @@ -47,6 +49,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/testutils"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/txnkv/transaction"
)

Expand Down Expand Up @@ -3284,3 +3287,142 @@ func TestRCUpdateWithPointGet(t *testing.T) {
require.Equal(t, uint64(1), tk1.Session().AffectedRows())
tk1.MustExec("commit")
}

func mustTimeout[T interface{}](t *testing.T, ch <-chan T, timeout time.Duration) {
select {
case res := <-ch:
require.FailNow(t, fmt.Sprintf("received signal when not expected: %v", res))
case <-time.After(timeout):
}
}
func mustRecv[T interface{}](t *testing.T, ch <-chan T) T {
select {
case <-time.After(time.Second):
case res := <-ch:
return res
}
require.FailNow(t, "signal not received after waiting for one second")
panic("unreachable")
}

func mustLocked(t *testing.T, store kv.Storage, stmt string) {
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("begin pessimistic")
tk.MustGetErrCode(stmt, errno.ErrLockAcquireFailAndNoWaitSet)
tk.MustExec("rollback")
}

func TestBatchResolveLocks(t *testing.T) {
if !*realtikvtest.WithRealTiKV {
t.Skip("this test doesn't work due to some problem in unistore's implementation")
}

store, domain, clean := realtikvtest.CreateMockStoreAndDomainAndSetup(t)
defer clean()

if *realtikvtest.WithRealTiKV {
// Disable in-memory pessimistic lock since it cannot be scanned in current implementation.
// TODO: Remove this after supporting scan lock for in-memory pessimistic lock.
tkcfg := testkit.NewTestKit(t, store)
res := tkcfg.MustQuery("show config where name = 'pessimistic-txn.in-memory' and type = 'tikv'").Rows()
if len(res) > 0 && res[0][3].(string) == "true" {
tkcfg.MustExec("set config tikv `pessimistic-txn.in-memory`=\"false\"")
tkcfg.MustQuery("show warnings").Check(testkit.Rows())
defer func() {
tkcfg.MustExec("set config tikv `pessimistic-txn.in-memory`=\"true\"")
}()
time.Sleep(time.Second)
} else {
t.Log("skip disabling in-memory pessimistic lock, current config:", res)
}
}

tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t1 (id int primary key, v int)")
tk.MustExec("create table t2 (id int primary key, v int)")
tk.MustExec("create table t3 (id int primary key, v int)")
tk.MustExec("insert into t1 values (1, 1), (2, 2)")
tk.MustExec("insert into t2 values (1, 1), (2, 2), (3, 3), (4, 4), (5, 5)")
tk.MustExec("insert into t3 values (1, 1)")
tk.MustExec("set @@tidb_enable_async_commit=0")
tk.MustExec("set @@tidb_enable_1pc=0")

// Split region
{
tableID, err := strconv.ParseInt(tk.MustQuery(`select tidb_table_id from information_schema.tables where table_schema = "test" and table_name = "t2"`).Rows()[0][0].(string), 10, 64)
require.NoError(t, err)
key := tablecodec.EncodeTablePrefix(tableID)
_, err = domain.GetPDClient().SplitRegions(context.Background(), [][]byte{key})
require.NoError(t, err)
}

tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk3 := testkit.NewTestKit(t, store)
tk3.MustExec("use test")

require.NoError(t, failpoint.Enable("tikvclient/beforeAsyncPessimisticRollback", `return("skip")`))
require.NoError(t, failpoint.Enable("tikvclient/beforeCommitSecondaries", `return("skip")`))
require.NoError(t, failpoint.Enable("tikvclient/twoPCRequestBatchSizeLimit", `return`))
require.NoError(t, failpoint.Enable("tikvclient/onRollback", `return("skipRollbackPessimisticLock")`))
defer func() {
require.NoError(t, failpoint.Disable("tikvclient/beforeAsyncPessimisticRollback"))
require.NoError(t, failpoint.Disable("tikvclient/beforeCommitSecondaries"))
require.NoError(t, failpoint.Disable("tikvclient/twoPCRequestBatchSizeLimit"))
require.NoError(t, failpoint.Disable("tikvclient/onRollback"))
}()

// ----------------
// Simulate issue https://github.com/pingcap/tidb/issues/43243

tk.MustExec("begin pessimistic")
tk2.MustExec("begin pessimistic")
tk2.MustExec("update t2 set v = v + 1 where id = 2")
ch := make(chan struct{})
go func() {
// The `MERGE()` hint is not supported in release-6.1 branch. We use an equivalent `join` statement instead.
// tk.MustExec(`
// with
// c as (select /*+ MERGE() */ v from t1 where id in (1, 2))
// update c join t2 on c.v = t2.id set t2.v = t2.v + 10`)
tk.MustExec(`update t1 join t2 on t1.v = t2.id set t2.v = t2.v + 10 where t1.id in (1, 2)`)
ch <- struct{}{}
}()
// tk blocked on row 2
mustTimeout(t, ch, time.Millisecond*100)
// Change the rows that should be locked by tk.
tk3.MustExec("update t1 set v = v + 3")
// Release row 2 and resume tk.
tk2.MustExec("commit")
mustRecv(t, ch)

// tk should have updated row 4 and row 5, and 4 should be the primary.
// At the same time row 1 should be the old primary, row2 points to row 1.
// Add another secondary that's smaller than the current primary.
tk.MustExec("update t2 set v = v + 10 where id = 3")
tk.MustExec("commit")

// ----------------
// Simulate issue https://github.com/pingcap/tidb/issues/45134
tk.MustExec("begin pessimistic")
tk.MustQuery("select * from t3 where id = 1 for update").Check(testkit.Rows("1 1"))
tk.MustExec("rollback")
// tk leaves a pessimistic lock on row 6. Try to ensure it.
mustLocked(t, store, "select * from t3 where id = 1 for update nowait")

// Simulate a later GC that should resolve all stale lock produced in above steps.
currentTS, err := store.CurrentVersion(kv.GlobalTxnScope)
require.NoError(t, err)
_, err = gcworker.RunResolveLocks(context.Background(), store.(tikv.Storage), domain.GetPDClient(), currentTS.Ver, "gc-worker-test-batch-resolve-locks", 1, false)
require.NoError(t, err)

// Check row 6 unlocked
tk3.MustExec("begin pessimistic")
tk3.MustQuery("select * from t3 where id = 1 for update nowait").Check(testkit.Rows("1 1"))
tk3.MustExec("rollback")

// Check data consistency
tk.MustQuery("select * from t2 order by id").Check(testkit.Rows("1 1", "2 3", "3 13", "4 14", "5 15"))
}