Skip to content

Commit

Permalink
infoschema, executor, txn: implement DATA_LOCK_WAITS table (#24750)
Browse files Browse the repository at this point in the history
  • Loading branch information
longfangsong committed May 27, 2021
1 parent 415dae6 commit e87d035
Show file tree
Hide file tree
Showing 17 changed files with 208 additions and 45 deletions.
3 changes: 2 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,8 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executo
strings.ToLower(infoschema.TableTiDBTrx),
strings.ToLower(infoschema.ClusterTableTiDBTrx),
strings.ToLower(infoschema.TableDeadlocks),
strings.ToLower(infoschema.ClusterTableDeadlocks):
strings.ToLower(infoschema.ClusterTableDeadlocks),
strings.ToLower(infoschema.TableDataLockWaits):
return &MemTableReaderExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ID()),
table: v.Table,
Expand Down
69 changes: 44 additions & 25 deletions executor/infoschema_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -53,13 +54,16 @@ import (
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/deadlockhistory"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/sqlexec"
"github.com/pingcap/tidb/util/stmtsummary"
"github.com/pingcap/tidb/util/stringutil"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
)

type memtableRetriever struct {
Expand Down Expand Up @@ -158,6 +162,8 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex
err = e.setDataForDeadlock(sctx)
case infoschema.ClusterTableDeadlocks:
err = e.setDataForClusterDeadlock(sctx)
case infoschema.TableDataLockWaits:
err = e.setDataForTableDataLockWaits(sctx)
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -1005,6 +1011,40 @@ func (e *memtableRetriever) dataForTiKVStoreStatus(ctx sessionctx.Context) (err
return nil
}

func hasPriv(ctx sessionctx.Context, priv mysql.PrivilegeType) bool {
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
return pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", priv)
}
return false
}

func (e *memtableRetriever) setDataForTableDataLockWaits(ctx sessionctx.Context) error {
if !hasPriv(ctx, mysql.ProcessPriv) {
return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}
waits, err := ctx.GetStore().GetLockWaits()
if err != nil {
return err
}
for _, wait := range waits {
var digestStr interface{}
digest, err := resourcegrouptag.DecodeResourceGroupTag(wait.ResourceGroupTag)
if err != nil {
logutil.BgLogger().Warn("failed to decode resource group tag", zap.Error(err))
digestStr = nil
} else {
digestStr = hex.EncodeToString(digest)
}
e.rows = append(e.rows, types.MakeDatums(
wait.Key,
wait.Txn,
wait.WaitForTxn,
digestStr,
))
}
return nil
}

// DDLJobsReaderExec executes DDLJobs information retrieving.
type DDLJobsReaderExec struct {
baseExecutor
Expand Down Expand Up @@ -1189,13 +1229,7 @@ func (e *memtableRetriever) setDataForProcessList(ctx sessionctx.Context) {
}

loginUser := ctx.GetSessionVars().User
var hasProcessPriv bool
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
hasProcessPriv = true
}
}

hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
pl := sm.ShowProcessList()

records := make([][]types.Datum, 0, len(pl))
Expand Down Expand Up @@ -1946,13 +1980,8 @@ func (e *memtableRetriever) setDataForPlacementPolicy(ctx sessionctx.Context) er
func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context, tableName string) error {
// Seeing client errors should require the PROCESS privilege, with the exception of errors for your own user.
// This is similar to information_schema.processlist, which is the closest comparison.
var hasProcessPriv bool
hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
loginUser := ctx.GetSessionVars().User
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
hasProcessPriv = true
}
}

var rows [][]types.Datum
switch tableName {
Expand Down Expand Up @@ -2026,12 +2055,7 @@ func (e *memtableRetriever) setDataForTiDBTrx(ctx sessionctx.Context) {
}

loginUser := ctx.GetSessionVars().User
var hasProcessPriv bool
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
if pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv) {
hasProcessPriv = true
}
}
hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
infoList := sm.ShowTxnList()
for _, info := range infoList {
// If you have the PROCESS privilege, you can see all running transactions.
Expand All @@ -2054,12 +2078,7 @@ func (e *memtableRetriever) setDataForClusterTiDBTrx(ctx sessionctx.Context) err
}

func (e *memtableRetriever) setDataForDeadlock(ctx sessionctx.Context) error {
hasPriv := false
if pm := privilege.GetPrivilegeManager(ctx); pm != nil {
hasPriv = pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", mysql.ProcessPriv)
}

if !hasPriv {
if !hasPriv(ctx, mysql.ProcessPriv) {
return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
}

Expand Down
11 changes: 11 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,8 @@ const (
TableTiDBTrx = "TIDB_TRX"
// TableDeadlocks is the string constatnt of deadlock table.
TableDeadlocks = "DEADLOCKS"
// TableDataLockWaits is current lock waiting status table.
TableDataLockWaits = "DATA_LOCK_WAITS"
)

var tableIDMap = map[string]int64{
Expand Down Expand Up @@ -243,6 +245,7 @@ var tableIDMap = map[string]int64{
ClusterTableTiDBTrx: autoid.InformationSchemaDBID + 71,
TableDeadlocks: autoid.InformationSchemaDBID + 72,
ClusterTableDeadlocks: autoid.InformationSchemaDBID + 73,
TableDataLockWaits: autoid.InformationSchemaDBID + 74,
}

type columnInfo struct {
Expand Down Expand Up @@ -1368,6 +1371,13 @@ var tableDeadlocksCols = []columnInfo{
{name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"},
}

var tableDataLockWaitsCols = []columnInfo{
{name: "KEY", tp: mysql.TypeVarchar, size: 64, flag: mysql.NotNullFlag, comment: "The key that's being waiting on"},
{name: "TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "Current transaction that's waiting for the lock"},
{name: "CURRENT_HOLDING_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction that's holding the lock and blocks the current transaction"},
{name: "SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the SQL that's trying to acquire the lock"},
}

// GetShardingInfo returns a nil or description string for the sharding information of given TableInfo.
// The returned description string may be:
// - "NOT_SHARDED": for tables that SHARD_ROW_ID_BITS is not specified.
Expand Down Expand Up @@ -1739,6 +1749,7 @@ var tableNameToColumns = map[string][]columnInfo{
TableClientErrorsSummaryByHost: tableClientErrorsSummaryByHostCols,
TableTiDBTrx: tableTiDBTrxCols,
TableDeadlocks: tableDeadlocksCols,
TableDataLockWaits: tableDataLockWaitsCols,
}

func createInfoSchemaTable(_ autoid.Allocators, meta *model.TableInfo) (table.Table, error) {
Expand Down
58 changes: 58 additions & 0 deletions infoschema/tables_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/fn"
"github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/parser"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
Expand All @@ -46,9 +47,13 @@ import (
"github.com/pingcap/tidb/session/txninfo"
"github.com/pingcap/tidb/store/helper"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/store/mockstore/mockstorage"
"github.com/pingcap/tidb/store/mockstore/unistore"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/kvcache"
"github.com/pingcap/tidb/util/pdapi"
"github.com/pingcap/tidb/util/resourcegrouptag"
"github.com/pingcap/tidb/util/set"
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"
Expand All @@ -57,12 +62,17 @@ import (
)

var _ = Suite(&testTableSuite{&testTableSuiteBase{}})
var _ = Suite(&testDataLockWaitSuite{&testTableSuiteBase{}})
var _ = SerialSuites(&testClusterTableSuite{testTableSuiteBase: &testTableSuiteBase{}})

type testTableSuite struct {
*testTableSuiteBase
}

type testDataLockWaitSuite struct {
*testTableSuiteBase
}

type testTableSuiteBase struct {
store kv.Storage
dom *domain.Domain
Expand Down Expand Up @@ -1552,3 +1562,51 @@ func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) {
}, nil, nil), IsTrue)
_ = tk.MustQuery("select * from information_schema.deadlocks")
}

func (s *testDataLockWaitSuite) SetUpSuite(c *C) {
testleak.BeforeTest()

client, pdClient, cluster, err := unistore.New("")
c.Assert(err, IsNil)
unistore.BootstrapWithSingleStore(cluster)
kvstore, err := tikv.NewTestTiKVStore(client, pdClient, nil, nil, 0)
c.Assert(err, IsNil)
_, digest1 := parser.NormalizeDigest("select * from t1 for update;")
_, digest2 := parser.NormalizeDigest("update t1 set f1=1 where id=2;")
s.store, err = mockstorage.NewMockStorageWithLockWaits(kvstore, []*deadlock.WaitForEntry{
{Txn: 1, WaitForTxn: 2, KeyHash: 3, Key: []byte("a"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest1, nil)},
{Txn: 4, WaitForTxn: 5, KeyHash: 6, Key: []byte("b"), ResourceGroupTag: resourcegrouptag.EncodeResourceGroupTag(digest2, nil)},
})
c.Assert(err, IsNil)
session.DisableStats4Test()
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
}

func (s *testDataLockWaitSuite) TestDataLockWait(c *C) {
_, digest1 := parser.NormalizeDigest("select * from t1 for update;")
_, digest2 := parser.NormalizeDigest("update t1 set f1=1 where id=2;")
tk := s.newTestKitWithRoot(c)
tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS;").Check(testkit.Rows("a 1 2 "+digest1.String(), "b 4 5 "+digest2.String()))
}

func (s *testDataLockWaitSuite) TestDataLockPrivilege(c *C) {
tk := s.newTestKitWithRoot(c)
tk.MustExec("create user 'testuser'@'localhost'")
c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: "testuser",
Hostname: "localhost",
}, nil, nil), IsTrue)
err := tk.QueryToErr("select * from information_schema.DATA_LOCK_WAITS")
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[planner:1227]Access denied; you need (at least one of) the PROCESS privilege(s) for this operation")

tk = s.newTestKitWithRoot(c)
tk.MustExec("create user 'testuser2'@'localhost'")
tk.MustExec("grant process on *.* to 'testuser2'@'localhost'")
c.Assert(tk.Se.Auth(&auth.UserIdentity{
Username: "testuser2",
Hostname: "localhost",
}, nil, nil), IsTrue)
_ = tk.MustQuery("select * from information_schema.DATA_LOCK_WAITS")
}
13 changes: 13 additions & 0 deletions kv/interface_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kv
import (
"context"

deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -103,6 +104,10 @@ func (t *mockTxn) GetSnapshot() Snapshot {
return nil
}

func (t *mockTxn) GetUnionStore() UnionStore {
return nil
}

func (t *mockTxn) NewStagingBuffer() MemBuffer {
return nil
}
Expand Down Expand Up @@ -210,6 +215,10 @@ func (s *mockStorage) GetMemCache() MemManager {
return nil
}

func (s *mockStorage) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
return nil, nil
}

func (s *mockStorage) GetMinSafeTS(txnScope string) uint64 {
return 0
}
Expand Down Expand Up @@ -255,3 +264,7 @@ func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
}

func (s *mockSnapshot) SetOption(opt int, val interface{}) {}

func (s *mockSnapshot) GetLockWaits() []deadlockpb.WaitForEntry {
return nil
}
5 changes: 4 additions & 1 deletion kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"crypto/tls"
"time"

deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/config"
Expand Down Expand Up @@ -340,7 +341,7 @@ type Driver interface {
type Storage interface {
// Begin a global transaction
Begin() (Transaction, error)
// Begin a transaction with given option
// BeginWithOption begins a transaction with given option
BeginWithOption(option tikv.StartTSOption) (Transaction, error)
// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ver is MaxVersion or > current max committed version, we will use current version for this snapshot.
Expand Down Expand Up @@ -369,6 +370,8 @@ type Storage interface {
GetMemCache() MemManager
// GetMinSafeTS return the minimal SafeTS of the storage with given txnScope.
GetMinSafeTS(txnScope string) uint64
// GetLockWaits return all lock wait information
GetLockWaits() ([]*deadlockpb.WaitForEntry, error)
}

// EtcdBackend is used for judging a storage is a real TiKV.
Expand Down
23 changes: 23 additions & 0 deletions store/driver/tikv_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import (
"time"

"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/copr"
derr "github.com/pingcap/tidb/store/driver/error"
txn_driver "github.com/pingcap/tidb/store/driver/txn"
"github.com/pingcap/tidb/store/gcworker"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/config"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/store/tikv/util"
"github.com/pingcap/tidb/util/logutil"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -331,3 +334,23 @@ func (s *tikvStore) CurrentVersion(txnScope string) (kv.Version, error) {
func (s *tikvStore) ShowStatus(ctx context.Context, key string) (interface{}, error) {
return nil, kv.ErrNotImplemented
}

// GetLockWaits get return lock waits info
func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) {
stores := s.GetRegionCache().GetStoresByType(tikvrpc.TiKV)
var result []*deadlockpb.WaitForEntry
for _, store := range stores {
resp, err := s.GetTiKVClient().SendRequest(context.TODO(), store.GetAddr(), tikvrpc.NewRequest(tikvrpc.CmdLockWaitInfo, &kvrpcpb.GetLockWaitInfoRequest{}), time.Second*30)
if err != nil {
logutil.BgLogger().Warn("query lock wait info failed", zap.Error(err))
continue
}
if resp.Resp == nil {
logutil.BgLogger().Warn("lock wait info from store is nil")
continue
}
entries := resp.Resp.(*kvrpcpb.GetLockWaitInfoResponse).Entries
result = append(result, entries...)
}
return result, nil
}
2 changes: 2 additions & 0 deletions store/helper/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"time"

"github.com/pingcap/errors"
deadlockpb "github.com/pingcap/kvproto/pkg/deadlock"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/log"
"github.com/pingcap/parser/model"
Expand Down Expand Up @@ -72,6 +73,7 @@ type Storage interface {
GetTiKVClient() tikv.Client
Closed() <-chan struct{}
GetMinSafeTS(txnScope string) uint64
GetLockWaits() ([]*deadlockpb.WaitForEntry, error)
}

// Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table.
Expand Down
Loading

0 comments on commit e87d035

Please sign in to comment.