From e87d0359373f261d50106a29f0a9dd3d8de46214 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=BE=99=E6=96=B9=E6=B7=9E?= Date: Thu, 27 May 2021 17:57:36 +0800 Subject: [PATCH] infoschema, executor, txn: implement DATA_LOCK_WAITS table (#24750) --- executor/builder.go | 3 +- executor/infoschema_reader.go | 69 ++++++++++++------- infoschema/tables.go | 11 +++ infoschema/tables_test.go | 58 ++++++++++++++++ kv/interface_mock_test.go | 13 ++++ kv/kv.go | 5 +- store/driver/tikv_driver.go | 23 +++++++ store/helper/helper.go | 2 + store/mockstore/mockstorage/storage.go | 21 ++++-- store/mockstore/unistore/tikv/detector.go | 8 +-- .../mockstore/unistore/tikv/detector_test.go | 4 +- store/mockstore/unistore/tikv/errors.go | 4 +- store/tikv/kv.go | 4 +- store/tikv/region_cache.go | 8 ++- store/tikv/tests/lock_test.go | 4 +- store/tikv/tikvrpc/tikvrpc.go | 10 +++ util/mock/store.go | 6 ++ 17 files changed, 208 insertions(+), 45 deletions(-) diff --git a/executor/builder.go b/executor/builder.go index 7a66594222d09..2f644a6eed2c4 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -1537,7 +1537,8 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo strings.ToLower(infoschema.TableTiDBTrx), strings.ToLower(infoschema.ClusterTableTiDBTrx), strings.ToLower(infoschema.TableDeadlocks), - strings.ToLower(infoschema.ClusterTableDeadlocks): + strings.ToLower(infoschema.ClusterTableDeadlocks), + strings.ToLower(infoschema.TableDataLockWaits): return &MemTableReaderExec{ baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()), table: v.Table, diff --git a/executor/infoschema_reader.go b/executor/infoschema_reader.go index d049fb3eb39e8..3b4fc2e159e05 100644 --- a/executor/infoschema_reader.go +++ b/executor/infoschema_reader.go @@ -16,6 +16,7 @@ package executor import ( "bytes" "context" + "encoding/hex" "encoding/json" "fmt" "io" @@ -53,13 +54,16 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/deadlockhistory" + "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/pdapi" + "github.com/pingcap/tidb/util/resourcegrouptag" "github.com/pingcap/tidb/util/sem" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/sqlexec" "github.com/pingcap/tidb/util/stmtsummary" "github.com/pingcap/tidb/util/stringutil" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" ) type memtableRetriever struct { @@ -158,6 +162,8 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex err = e.setDataForDeadlock(sctx) case infoschema.ClusterTableDeadlocks: err = e.setDataForClusterDeadlock(sctx) + case infoschema.TableDataLockWaits: + err = e.setDataForTableDataLockWaits(sctx) } if err != nil { return nil, err @@ -1005,6 +1011,40 @@ func (e *memtableRetriever) dataForTiKVStoreStatus(ctx sessionctx.Context) (err return nil } +func hasPriv(ctx sessionctx.Context, priv mysql.PrivilegeType) bool { + if pm := privilege.GetPrivilegeManager(ctx); pm != nil { + return pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", priv) + } + return false +} + +func (e *memtableRetriever) setDataForTableDataLockWaits(ctx sessionctx.Context) error { + if !hasPriv(ctx, mysql.ProcessPriv) { + return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS") + } + waits, err := ctx.GetStore().GetLockWaits() + if err != nil { + return err + } + for _, wait := range waits { + var digestStr interface{} + digest, err := resourcegrouptag.DecodeResourceGroupTag(wait.ResourceGroupTag) + if err != nil { + logutil.BgLogger().Warn("failed to decode resource group tag", zap.Error(err)) + digestStr = nil + } else { + digestStr = hex.EncodeToString(digest) + } + e.rows = append(e.rows, types.MakeDatums( + wait.Key, + wait.Txn, + wait.WaitForTxn, + digestStr, + )) + } + return nil +} + // DDLJobsReaderExec executes DDLJobs information retrieving. type DDLJobsReaderExec struct { baseExecutor @@ -1189,13 +1229,7 @@ func (e *memtableRetriever) setDataForProcessList(ctx sessionctx.Context) { } loginUser := ctx.GetSessionVars().User - var hasProcessPriv bool - if pm := privilege.GetPrivilegeManager(ctx); pm != nil { - if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { - hasProcessPriv = true - } - } - + hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv) pl := sm.ShowProcessList() records := make([][]types.Datum, 0, len(pl)) @@ -1946,13 +1980,8 @@ func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) er func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context, tableName string) error { // Seeing client errors should require the PROCESS privilege, with the exception of errors for your own user. // This is similar to information_schema.processlist, which is the closest comparison. - var hasProcessPriv bool + hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv) loginUser := ctx.GetSessionVars().User - if pm := privilege.GetPrivilegeManager(ctx); pm != nil { - if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { - hasProcessPriv = true - } - } var rows [][]types.Datum switch tableName { @@ -2026,12 +2055,7 @@ func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) { } loginUser := ctx.GetSessionVars().User - var hasProcessPriv bool - if pm := privilege.GetPrivilegeManager(ctx); pm != nil { - if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) { - hasProcessPriv = true - } - } + hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv) infoList := sm.ShowTxnList() for _, info := range infoList { // If you have the PROCESS privilege, you can see all running transactions. @@ -2054,12 +2078,7 @@ func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) err } func (e *memtableRetriever) setDataForDeadlock(ctx sessionctx.Context) error { - hasPriv := false - if pm := privilege.GetPrivilegeManager(ctx); pm != nil { - hasPriv = pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) - } - - if !hasPriv { + if !hasPriv(ctx, mysql.ProcessPriv) { return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS") } diff --git a/infoschema/tables.go b/infoschema/tables.go index 04988cb07c2b7..fec9378a491b3 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -167,6 +167,8 @@ const ( TableTiDBTrx = "TIDB_TRX" // TableDeadlocks is the string constatnt of deadlock table. TableDeadlocks = "DEADLOCKS" + // TableDataLockWaits is current lock waiting status table. + TableDataLockWaits = "DATA_LOCK_WAITS" ) var tableIDMap = map[string]int64{ @@ -243,6 +245,7 @@ var tableIDMap = map[string]int64{ ClusterTableTiDBTrx: autoid.InformationSchemaDBID + 71, TableDeadlocks: autoid.InformationSchemaDBID + 72, ClusterTableDeadlocks: autoid.InformationSchemaDBID + 73, + TableDataLockWaits: autoid.InformationSchemaDBID + 74, } type columnInfo struct { @@ -1368,6 +1371,13 @@ var tableDeadlocksCols = []columnInfo{ {name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"}, } +var tableDataLockWaitsCols = []columnInfo{ + {name: "KEY", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "The key that's being waiting on"}, + {name: "TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "Current transaction that's waiting for the lock"}, + {name: "CURRENT_HOLDING_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction that's holding the lock and blocks the current transaction"}, + {name: "SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the SQL that's trying to acquire the lock"}, +} + // GetShardingInfo returns a nil or description string for the sharding information of given TableInfo. // The returned description string may be: // - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified. @@ -1739,6 +1749,7 @@ var tableNameToColumns = map[string][]columnInfo{ TableClientErrorsSummaryByHost: tableClientErrorsSummaryByHostCols, TableTiDBTrx: tableTiDBTrxCols, TableDeadlocks: tableDeadlocksCols, + TableDataLockWaits: tableDataLockWaitsCols, } func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) { diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 761bb75fb76ed..65b5a37a4ba2a 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -28,6 +28,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/fn" + "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/parser" "github.com/pingcap/parser/auth" "github.com/pingcap/parser/model" @@ -46,9 +47,13 @@ import ( "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/store/helper" "github.com/pingcap/tidb/store/mockstore" + "github.com/pingcap/tidb/store/mockstore/mockstorage" + "github.com/pingcap/tidb/store/mockstore/unistore" + "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/kvcache" "github.com/pingcap/tidb/util/pdapi" + "github.com/pingcap/tidb/util/resourcegrouptag" "github.com/pingcap/tidb/util/set" "github.com/pingcap/tidb/util/testkit" "github.com/pingcap/tidb/util/testleak" @@ -57,12 +62,17 @@ import ( ) var _ = Suite(&testTableSuite{&testTableSuiteBase{}}) +var _ = Suite(&testDataLockWaitSuite{&testTableSuiteBase{}}) var _ = SerialSuites(&testClusterTableSuite{testTableSuiteBase: &testTableSuiteBase{}}) type testTableSuite struct { *testTableSuiteBase } +type testDataLockWaitSuite struct { + *testTableSuiteBase +} + type testTableSuiteBase struct { store kv.Storage dom *domain.Domain @@ -1552,3 +1562,51 @@ func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) { }, nil, nil), IsTrue) _ = tk.MustQuery("select * from information_schema.deadlocks") } + +func (s *testDataLockWaitSuite) SetUpSuite(c *C) { + testleak.BeforeTest() + + client, pdClient, cluster, err := unistore.New("") + c.Assert(err, IsNil) + unistore.BootstrapWithSingleStore(cluster) + kvstore, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0) + c.Assert(err, IsNil) + _, digest1 := parser.NormalizeDigest("select * from t1 for update;") + _, digest2 := parser.NormalizeDigest("update t1 set f1=1 where id=2;") + s.store, err = mockstorage.NewMockStorageWithLockWaits(kvstore, []*deadlock.WaitForEntry{ + {Txn: 1, WaitForTxn: 2, KeyHash: 3, Key: []byte("a"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil)}, + {Txn: 4, WaitForTxn: 5, KeyHash: 6, Key: []byte("b"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil)}, + }) + c.Assert(err, IsNil) + session.DisableStats4Test() + s.dom, err = session.BootstrapSession(s.store) + c.Assert(err, IsNil) +} + +func (s *testDataLockWaitSuite) TestDataLockWait(c *C) { + _, digest1 := parser.NormalizeDigest("select * from t1 for update;") + _, digest2 := parser.NormalizeDigest("update t1 set f1=1 where id=2;") + tk := s.newTestKitWithRoot(c) + tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS;").Check(testkit.Rows("a 1 2 "+digest1.String(), "b 4 5 "+digest2.String())) +} + +func (s *testDataLockWaitSuite) TestDataLockPrivilege(c *C) { + tk := s.newTestKitWithRoot(c) + tk.MustExec("create user 'testuser'@'localhost'") + c.Assert(tk.Se.Auth(&auth.UserIdentity{ + Username: "testuser", + Hostname: "localhost", + }, nil, nil), IsTrue) + err := tk.QueryToErr("select * from information_schema.DATA_LOCK_WAITS") + c.Assert(err, NotNil) + c.Assert(err.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the PROCESS privilege(s) for this operation") + + tk = s.newTestKitWithRoot(c) + tk.MustExec("create user 'testuser2'@'localhost'") + tk.MustExec("grant process on *.* to 'testuser2'@'localhost'") + c.Assert(tk.Se.Auth(&auth.UserIdentity{ + Username: "testuser2", + Hostname: "localhost", + }, nil, nil), IsTrue) + _ = tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS") +} diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 0c471c6fb40e0..2e78b8725a7dc 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -16,6 +16,7 @@ package kv import ( "context" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" @@ -103,6 +104,10 @@ func (t *mockTxn) GetSnapshot() Snapshot { return nil } +func (t *mockTxn) GetUnionStore() UnionStore { + return nil +} + func (t *mockTxn) NewStagingBuffer() MemBuffer { return nil } @@ -210,6 +215,10 @@ func (s *mockStorage) GetMemCache() MemManager { return nil } +func (s *mockStorage) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) { + return nil, nil +} + func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 { return 0 } @@ -255,3 +264,7 @@ func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) { } func (s *mockSnapshot) SetOption(opt int, val interface{}) {} + +func (s *mockSnapshot) GetLockWaits() []deadlockpb.WaitForEntry { + return nil +} diff --git a/kv/kv.go b/kv/kv.go index fdcc7148247af..d65258131d184 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -18,6 +18,7 @@ import ( "crypto/tls" "time" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/config" @@ -340,7 +341,7 @@ type Driver interface { type Storage interface { // Begin a global transaction Begin() (Transaction, error) - // Begin a transaction with given option + // BeginWithOption begins a transaction with given option BeginWithOption(option tikv.StartTSOption) (Transaction, error) // GetSnapshot gets a snapshot that is able to read any data which data is <= ver. // if ver is MaxVersion or > current max committed version, we will use current version for this snapshot. @@ -369,6 +370,8 @@ type Storage interface { GetMemCache() MemManager // GetMinSafeTS return the minimal SafeTS of the storage with given txnScope. GetMinSafeTS(txnScope string) uint64 + // GetLockWaits return all lock wait information + GetLockWaits() ([]*deadlockpb.WaitForEntry, error) } // EtcdBackend is used for judging a storage is a real TiKV. diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 5f5471d8e7251..689c6ea170678 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -24,6 +24,8 @@ import ( "time" "github.com/pingcap/errors" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" + "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/copr" derr "github.com/pingcap/tidb/store/driver/error" @@ -31,6 +33,7 @@ import ( "github.com/pingcap/tidb/store/gcworker" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/config" + "github.com/pingcap/tidb/store/tikv/tikvrpc" "github.com/pingcap/tidb/store/tikv/util" "github.com/pingcap/tidb/util/logutil" pd "github.com/tikv/pd/client" @@ -331,3 +334,23 @@ func (s *tikvStore) CurrentVersion(txnScope string) (kv.Version, error) { func (s *tikvStore) ShowStatus(ctx context.Context, key string) (interface{}, error) { return nil, kv.ErrNotImplemented } + +// GetLockWaits get return lock waits info +func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) { + stores := s.GetRegionCache().GetStoresByType(tikvrpc.TiKV) + var result []*deadlockpb.WaitForEntry + for _, store := range stores { + resp, err := s.GetTiKVClient().SendRequest(context.TODO(), store.GetAddr(), tikvrpc.NewRequest(tikvrpc.CmdLockWaitInfo, &kvrpcpb.GetLockWaitInfoRequest{}), time.Second*30) + if err != nil { + logutil.BgLogger().Warn("query lock wait info failed", zap.Error(err)) + continue + } + if resp.Resp == nil { + logutil.BgLogger().Warn("lock wait info from store is nil") + continue + } + entries := resp.Resp.(*kvrpcpb.GetLockWaitInfoResponse).Entries + result = append(result, entries...) + } + return result, nil +} diff --git a/store/helper/helper.go b/store/helper/helper.go index 53525f9687fa2..533b1d66d576e 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -29,6 +29,7 @@ import ( "time" "github.com/pingcap/errors" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/log" "github.com/pingcap/parser/model" @@ -72,6 +73,7 @@ type Storage interface { GetTiKVClient() tikv.Client Closed() <-chan struct{} GetMinSafeTS(txnScope string) uint64 + GetLockWaits() ([]*deadlockpb.WaitForEntry, error) } // Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index 7d78d1a9b7418..4fa049fc69f42 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -17,6 +17,7 @@ import ( "context" "crypto/tls" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/copr" driver "github.com/pingcap/tidb/store/driver/txn" @@ -28,22 +29,28 @@ import ( type mockStorage struct { *tikv.KVStore *copr.Store - memCache kv.MemManager + memCache kv.MemManager + LockWaits []*deadlockpb.WaitForEntry } // NewMockStorage wraps tikv.KVStore as kv.Storage. func NewMockStorage(tikvStore *tikv.KVStore) (kv.Storage, error) { + return NewMockStorageWithLockWaits(tikvStore, nil) +} + +// NewMockStorageWithLockWaits wraps tikv.KVStore as kv.Storage, with mock LockWaits. +func NewMockStorageWithLockWaits(tikvStore *tikv.KVStore, lockWaits []*deadlockpb.WaitForEntry) (kv.Storage, error) { coprConfig := config.DefaultConfig().TiKVClient.CoprCache coprStore, err := copr.NewStore(tikvStore, &coprConfig) if err != nil { return nil, err } return &mockStorage{ - KVStore: tikvStore, - Store: coprStore, - memCache: kv.NewCacheDB(), + KVStore: tikvStore, + Store: coprStore, + memCache: kv.NewCacheDB(), + LockWaits: lockWaits, }, nil - } func (s *mockStorage) EtcdAddrs() ([]string, error) { @@ -111,6 +118,10 @@ func newTiKVTxn(txn *tikv.KVTxn, err error) (kv.Transaction, error) { return driver.NewTiKVTxn(txn), nil } +func (s *mockStorage) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) { + return s.LockWaits, nil +} + func (s *mockStorage) Close() error { s.Store.Close() return s.KVStore.Close() diff --git a/store/mockstore/unistore/tikv/detector.go b/store/mockstore/unistore/tikv/detector.go index 6a56a0d9fd73c..b86804696a670 100644 --- a/store/mockstore/unistore/tikv/detector.go +++ b/store/mockstore/unistore/tikv/detector.go @@ -30,7 +30,7 @@ import ( "sync" "time" - deadlockPB "github.com/pingcap/kvproto/pkg/deadlock" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/log" "go.uber.org/zap" ) @@ -93,7 +93,7 @@ func (d *Detector) Detect(sourceTxn, waitForTxn, keyHash uint64, diagCtx diagnos j := len(err.WaitChain) - i - 1 err.WaitChain[i], err.WaitChain[j] = err.WaitChain[j], err.WaitChain[i] } - err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{ + err.WaitChain = append(err.WaitChain, &deadlockpb.WaitForEntry{ Txn: sourceTxn, Key: diagCtx.key, KeyHash: keyHash, @@ -122,7 +122,7 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er } if keyHashPair.txn == sourceTxn { return &ErrDeadlock{DeadlockKeyHash: keyHashPair.keyHash, - WaitChain: []*deadlockPB.WaitForEntry{ + WaitChain: []*deadlockpb.WaitForEntry{ { Txn: waitForTxn, Key: keyHashPair.diagCtx.key, @@ -134,7 +134,7 @@ func (d *Detector) doDetect(nowTime time.Time, sourceTxn, waitForTxn uint64) *Er } } if err := d.doDetect(nowTime, sourceTxn, keyHashPair.txn); err != nil { - err.WaitChain = append(err.WaitChain, &deadlockPB.WaitForEntry{ + err.WaitChain = append(err.WaitChain, &deadlockpb.WaitForEntry{ Txn: waitForTxn, Key: keyHashPair.diagCtx.key, KeyHash: keyHashPair.keyHash, diff --git a/store/mockstore/unistore/tikv/detector_test.go b/store/mockstore/unistore/tikv/detector_test.go index 189a8b00b8217..c47260f886275 100644 --- a/store/mockstore/unistore/tikv/detector_test.go +++ b/store/mockstore/unistore/tikv/detector_test.go @@ -30,7 +30,7 @@ import ( "time" . "github.com/pingcap/check" - deadlockPB "github.com/pingcap/kvproto/pkg/deadlock" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" ) func TestT(t *testing.T) { @@ -48,7 +48,7 @@ func (s *testDeadlockSuite) TestDeadlock(c *C) { resourceGroupTag: []byte(resourceGroupTag), } } - checkWaitChainEntry := func(entry *deadlockPB.WaitForEntry, txn, waitForTxn uint64, key, resourceGroupTag string) { + checkWaitChainEntry := func(entry *deadlockpb.WaitForEntry, txn, waitForTxn uint64, key, resourceGroupTag string) { c.Assert(entry.Txn, Equals, txn) c.Assert(entry.WaitForTxn, Equals, waitForTxn) c.Assert(string(entry.Key), Equals, key) diff --git a/store/mockstore/unistore/tikv/errors.go b/store/mockstore/unistore/tikv/errors.go index 98a70951871d5..bce76319320c9 100644 --- a/store/mockstore/unistore/tikv/errors.go +++ b/store/mockstore/unistore/tikv/errors.go @@ -16,7 +16,7 @@ package tikv import ( "fmt" - deadlockPB "github.com/pingcap/kvproto/pkg/deadlock" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/store/mockstore/unistore/tikv/mvcc" ) @@ -91,7 +91,7 @@ type ErrDeadlock struct { LockKey []byte LockTS uint64 DeadlockKeyHash uint64 - WaitChain []*deadlockPB.WaitForEntry + WaitChain []*deadlockpb.WaitForEntry } func (e ErrDeadlock) Error() string { diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 563f985f95083..8adf5c1f58782 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -336,7 +336,7 @@ func (s *KVStore) GetTiKVClient() (client Client) { // GetMinSafeTS return the minimal safeTS of the storage with given txnScope. func (s *KVStore) GetMinSafeTS(txnScope string) uint64 { stores := make([]*Store, 0) - allStores := s.regionCache.getStoresByType(tikvrpc.TiKV) + allStores := s.regionCache.GetStoresByType(tikvrpc.TiKV) if txnScope != oracle.GlobalTxnScope { for _, store := range allStores { if store.IsLabelsMatch([]*metapb.StoreLabel{ @@ -402,7 +402,7 @@ func (s *KVStore) safeTSUpdater() { } func (s *KVStore) updateSafeTS(ctx context.Context) { - stores := s.regionCache.getStoresByType(tikvrpc.TiKV) + stores := s.regionCache.GetStoresByType(tikvrpc.TiKV) tikvClient := s.GetTiKVClient() wg := &sync.WaitGroup{} wg.Add(len(stores)) diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 2b0ddd99f6454..287edeb4170f5 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -1268,8 +1268,9 @@ func (c *RegionCache) getRegionByIDFromCache(regionID uint64) *Region { return latestRegion } +// GetStoresByType gets stores by type `typ` // TODO: revise it by get store by closure. -func (c *RegionCache) getStoresByType(typ tikvrpc.EndpointType) []*Store { +func (c *RegionCache) GetStoresByType(typ tikvrpc.EndpointType) []*Store { c.storeMu.Lock() defer c.storeMu.Unlock() stores := make([]*Store, 0) @@ -2236,6 +2237,11 @@ func (s *Store) requestLiveness(bo *Backoffer, c *RegionCache) (l livenessState) return } +// GetAddr returns the address of the store +func (s *Store) GetAddr() string { + return s.addr +} + func invokeKVStatusAPI(addr string, timeout time.Duration) (l livenessState) { start := time.Now() defer func() { diff --git a/store/tikv/tests/lock_test.go b/store/tikv/tests/lock_test.go index d64c1d102e6d1..d52329c50cfa5 100644 --- a/store/tikv/tests/lock_test.go +++ b/store/tikv/tests/lock_test.go @@ -25,7 +25,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/failpoint" - deadlockPB "github.com/pingcap/kvproto/pkg/deadlock" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/store/tikv" tikverr "github.com/pingcap/tidb/store/tikv/error" @@ -714,7 +714,7 @@ func (s *testLockSuite) TestDeadlockReportWaitChain(c *C) { } // Check the given WaitForEntry is caused by txn[i] waiting for txn[j]. - checkWaitChainEntry := func(txns []*txnWrapper, entry *deadlockPB.WaitForEntry, i, j int) { + checkWaitChainEntry := func(txns []*txnWrapper, entry *deadlockpb.WaitForEntry, i, j int) { c.Assert(entry.Txn, Equals, txns[i].StartTS()) c.Assert(entry.WaitForTxn, Equals, txns[j].StartTS()) c.Assert(entry.Key, BytesEquals, []byte{'k', byte(j)}) diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go index ac213e0da8239..9f1b4fc0806aa 100644 --- a/store/tikv/tikvrpc/tikvrpc.go +++ b/store/tikv/tikvrpc/tikvrpc.go @@ -69,6 +69,7 @@ const ( CmdPhysicalScanLock CmdStoreSafeTS + CmdLockWaitInfo CmdCop CmdType = 512 + iota CmdCopStream @@ -168,6 +169,8 @@ func (t CmdType) String() string { return "TxnHeartBeat" case CmdStoreSafeTS: return "StoreSafeTS" + case CmdLockWaitInfo: + return "LockWaitInfo" } return "Unknown" } @@ -427,6 +430,11 @@ func (req *Request) StoreSafeTS() *kvrpcpb.StoreSafeTSRequest { return req.Req.(*kvrpcpb.StoreSafeTSRequest) } +// LockWaitInfo returns GetLockWaitInfoRequest in request. +func (req *Request) LockWaitInfo() *kvrpcpb.GetLockWaitInfoRequest { + return req.Req.(*kvrpcpb.GetLockWaitInfoRequest) +} + // ToBatchCommandsRequest converts the request to an entry in BatchCommands request. func (req *Request) ToBatchCommandsRequest() *tikvpb.BatchCommandsRequest_Request { switch req.Type { @@ -924,6 +932,8 @@ func CallRPC(ctx context.Context, client tikvpb.TikvClient, req *Request) (*Resp resp.Resp, err = client.KvTxnHeartBeat(ctx, req.TxnHeartBeat()) case CmdStoreSafeTS: resp.Resp, err = client.GetStoreSafeTS(ctx, req.StoreSafeTS()) + case CmdLockWaitInfo: + resp.Resp, err = client.GetLockWaitInfo(ctx, req.LockWaitInfo()) default: return nil, errors.Errorf("invalid request type: %v", req.Type) } diff --git a/util/mock/store.go b/util/mock/store.go index 3adba59e115e5..beefae3dc7171 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -16,6 +16,7 @@ package mock import ( "context" + deadlockpb "github.com/pingcap/kvproto/pkg/deadlock" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" @@ -78,3 +79,8 @@ func (s *Store) ShowStatus(ctx context.Context, key string) (interface{}, error) func (s *Store) GetMinSafeTS(txnScope string) uint64 { return 0 } + +// GetLockWaits implements kv.Storage interface. +func (s *Store) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) { + return nil, nil +}