diff --git a/executor/executor_test.go b/executor/executor_test.go index 65c60f392f689..47f070641e8d6 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -8187,14 +8187,14 @@ func (s *testSerialSuite) TestDeadlockTable(c *C) { TryLockTxn: 101, SQLDigest: "aabbccdd", Key: []byte("k1"), - AllSQLs: nil, + AllSQLDigests: nil, TxnHoldingLock: 102, }, { TryLockTxn: 102, SQLDigest: "ddccbbaa", Key: []byte("k2"), - AllSQLs: []string{"sql1"}, + AllSQLDigests: []string{"sql1"}, TxnHoldingLock: 101, }, }, @@ -8208,12 +8208,12 @@ func (s *testSerialSuite) TestDeadlockTable(c *C) { WaitChain: []deadlockhistory.WaitChainItem{ { TryLockTxn: 201, - AllSQLs: []string{}, + AllSQLDigests: []string{}, TxnHoldingLock: 202, }, { TryLockTxn: 202, - AllSQLs: []string{"sql1", "sql2, sql3"}, + AllSQLDigests: []string{"sql1", "sql2, sql3"}, TxnHoldingLock: 203, }, { diff --git a/infoschema/infoschema_test.go b/infoschema/infoschema_test.go index 87276ef1452b9..61f34032942fe 100644 --- a/infoschema/infoschema_test.go +++ b/infoschema/infoschema_test.go @@ -298,6 +298,7 @@ func (*testSuite) TestInfoTables(c *C) { "COLLATION_CHARACTER_SET_APPLICABILITY", "PROCESSLIST", "TIDB_TRX", + "DEADLOCKS", } for _, t := range infoTables { tb, err1 := is.TableByName(util.InformationSchemaName, model.NewCIStr(t)) diff --git a/infoschema/tables.go b/infoschema/tables.go index fec9378a491b3..d4851c245d016 100644 --- a/infoschema/tables.go +++ b/infoschema/tables.go @@ -1349,26 +1349,27 @@ var tableClientErrorsSummaryByHostCols = []columnInfo{ var tableTiDBTrxCols = []columnInfo{ {name: "ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.PriKeyFlag | mysql.NotNullFlag | mysql.UnsignedFlag}, - {name: "START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Start time of the transaction"}, - {name: "DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the sql the transaction are currently running"}, + {name: "START_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "Start time of the transaction"}, + {name: "CURRENT_SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "Digest of the sql the transaction are currently running"}, {name: "STATE", tp: mysql.TypeEnum, enumElems: txninfo.TxnRunningStateStrs, comment: "Current running state of the transaction"}, - {name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, size: 26, comment: "Current lock waiting's start time"}, + {name: "WAITING_START_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "Current lock waiting's start time"}, {name: "LEN", tp: mysql.TypeLonglong, size: 64, comment: "How many entries are in MemDB"}, {name: "SIZE", tp: mysql.TypeLonglong, size: 64, comment: "MemDB used memory"}, {name: "SESSION_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.UnsignedFlag, comment: "Which session this transaction belongs to"}, {name: "USER", tp: mysql.TypeVarchar, size: 16, comment: "The user who open this session"}, {name: "DB", tp: mysql.TypeVarchar, size: 64, comment: "The schema this transaction works on"}, + {name: "ALL_SQL_DIGESTS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, } var tableDeadlocksCols = []columnInfo{ - {name: "DEADLOCK_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The ID to dinstinguish different deadlock events"}, + {name: "DEADLOCK_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The ID to distinguish different deadlock events"}, {name: "OCCUR_TIME", tp: mysql.TypeTimestamp, decimal: 6, size: 26, comment: "The physical time when the deadlock occurs"}, {name: "RETRYABLE", tp: mysql.TypeTiny, size: 1, flag: mysql.NotNullFlag, comment: "Whether the deadlock is retryable. Retryable deadlocks are usually not reported to the client"}, - {name: "TRY_LOCK_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's trying to acquire the lock"}, + {name: "TRY_LOCK_TRX_ID", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction ID (start ts) of the transaction that's trying to acquire the lock"}, {name: "CURRENT_SQL_DIGEST", tp: mysql.TypeVarchar, size: 64, comment: "The digest of the SQL that's being blocked"}, {name: "KEY", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "The key on which a transaction is waiting for another"}, - {name: "ALL_SQLS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, - {name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"}, + {name: "ALL_SQL_DIGESTS", tp: mysql.TypeBlob, size: types.UnspecifiedLength, comment: "A list of the digests of SQL statements that the transaction has executed"}, + {name: "TRX_HOLDING_LOCK", tp: mysql.TypeLonglong, size: 21, flag: mysql.NotNullFlag | mysql.UnsignedFlag, comment: "The transaction ID (start ts) of the transaction that's currently holding the lock"}, } var tableDataLockWaitsCols = []columnInfo{ diff --git a/infoschema/tables_test.go b/infoschema/tables_test.go index 65b5a37a4ba2a..33394e57a204c 100644 --- a/infoschema/tables_test.go +++ b/infoschema/tables_test.go @@ -23,6 +23,7 @@ import ( "runtime" "strings" "time" + "unsafe" "github.com/gorilla/mux" . "github.com/pingcap/check" @@ -1524,7 +1525,7 @@ func (s *testTableSuite) TestInfoschemaClientErrors(c *C) { func (s *testTableSuite) TestTrx(c *C) { tk := s.newTestKitWithRoot(c) _, digest := parser.NormalizeDigest("select * from trx for update;") - sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 1)} + sm := &mockSessionManager{nil, make([]*txninfo.TxnInfo, 2)} sm.txnInfo[0] = &txninfo.TxnInfo{ StartTS: 424768545227014155, CurrentSQLDigest: digest.String(), @@ -1536,10 +1537,21 @@ func (s *testTableSuite) TestTrx(c *C) { Username: "root", CurrentDB: "test", } + blockTime2 := time.Date(2021, 05, 20, 13, 18, 30, 123456000, time.UTC) + sm.txnInfo[1] = &txninfo.TxnInfo{ + StartTS: 425070846483628033, + CurrentSQLDigest: "", + AllSQLDigests: []string{"sql1", "sql2"}, + State: txninfo.TxnLockWaiting, + BlockStartTime: unsafe.Pointer(&blockTime2), + ConnectionID: 10, + Username: "user1", + CurrentDB: "db1", + } tk.Se.SetSessionManager(sm) - tk.MustQuery("select * from information_schema.TIDB_TRX;").Check( - testkit.Rows("424768545227014155 2021-05-07 12:56:48 " + digest.String() + " Normal 1 19 2 root test"), - ) + tk.MustQuery("select * from information_schema.TIDB_TRX;").Check(testkit.Rows( + "424768545227014155 2021-05-07 04:56:48.001000 "+digest.String()+" Normal 1 19 2 root test []", + "425070846483628033 2021-05-20 13:16:35.778000 LockWaiting 2021-05-20 13:18:30.123456 0 0 10 user1 db1 [sql1, sql2]")) } func (s *testTableSuite) TestInfoschemaDeadlockPrivilege(c *C) { diff --git a/session/session.go b/session/session.go index bed1e6e2f7ae3..03860c6320b2e 100644 --- a/session/session.go +++ b/session/session.go @@ -450,7 +450,6 @@ func (s *session) TxnInfo() *txninfo.TxnInfo { return nil } processInfo := s.ShowProcess() - txnInfo.CurrentSQLDigest = processInfo.Digest txnInfo.ConnectionID = processInfo.ID txnInfo.Username = processInfo.User txnInfo.CurrentDB = processInfo.DB @@ -1502,6 +1501,9 @@ func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlex // Uncorrelated subqueries will execute once when building plan, so we reset process info before building plan. cmd32 := atomic.LoadUint32(&s.GetSessionVars().CommandValue) s.SetProcessInfo(stmtNode.Text(), time.Now(), byte(cmd32), 0) + _, digest := s.sessionVars.StmtCtx.SQLDigest() + s.txn.onStmtStart(digest.String()) + defer s.txn.onStmtEnd() // Transform abstract syntax tree to a physical plan(stored in executor.ExecStmt). compiler := executor.Compiler{Ctx: s} @@ -1873,10 +1875,15 @@ func (s *session) ExecutePreparedStmt(ctx context.Context, stmtID uint32, args [ if err != nil { return nil, err } + s.txn.onStmtStart(preparedStmt.SQLDigest.String()) + var rs sqlexec.RecordSet if ok { - return s.cachedPlanExec(ctx, stmtID, preparedStmt, args) + rs, err = s.cachedPlanExec(ctx, stmtID, preparedStmt, args) + } else { + rs, err = s.preparedStmtExec(ctx, stmtID, preparedStmt, args) } - return s.preparedStmtExec(ctx, stmtID, preparedStmt, args) + s.txn.onStmtEnd() + return rs, err } func (s *session) DropPreparedStmt(stmtID uint32) error { diff --git a/session/session_test.go b/session/session_test.go index 88f6a48b83a45..f68f75f133da9 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -19,6 +19,7 @@ import ( "fmt" "os" "path" + "strconv" "strings" "sync" "sync/atomic" @@ -42,7 +43,7 @@ import ( plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege/privileges" "github.com/pingcap/tidb/session" - txninfo "github.com/pingcap/tidb/session/txninfo" + "github.com/pingcap/tidb/session/txninfo" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/binloginfo" "github.com/pingcap/tidb/sessionctx/variable" @@ -82,7 +83,7 @@ var _ = SerialSuites(&testSessionSerialSuite{}) var _ = SerialSuites(&testBackupRestoreSuite{}) var _ = Suite(&testClusteredSuite{}) var _ = SerialSuites(&testClusteredSerialSuite{}) -var _ = SerialSuites(&testTxnStateSuite{}) +var _ = SerialSuites(&testTxnStateSerialSuite{}) type testSessionSuiteBase struct { cluster cluster.Cluster @@ -4178,33 +4179,94 @@ func (s *testSessionSuite3) TestGlobalTemporaryTable(c *C) { tk.MustQuery("select * from g_tmp").Check(testkit.Rows()) } -type testTxnStateSuite struct { +type testTxnStateSerialSuite struct { testSessionSuiteBase } -func (s *testTxnStateSuite) TestBasic(c *C) { +func (s *testTxnStateSerialSuite) TestBasic(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") tk.MustExec("insert into t(a) values (1);") info := tk.Se.TxnInfo() c.Assert(info, IsNil) + tk.MustExec("begin pessimistic;") - tk.MustExec("select * from t for update;") + startTSStr := tk.MustQuery("select @@tidb_current_ts;").Rows()[0][0].(string) + startTS, err := strconv.ParseUint(startTSStr, 10, 64) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + tk.MustExec("select * from t for update;") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) info = tk.Se.TxnInfo() _, expectedDigest := parser.NormalizeDigest("select * from t for update;") c.Assert(info.CurrentSQLDigest, Equals, expectedDigest.String()) + c.Assert(info.State, Equals, txninfo.TxnLockWaiting) + c.Assert((*time.Time)(info.BlockStartTime), NotNil) + c.Assert(info.StartTS, Equals, startTS) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + + info = tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, "") c.Assert(info.State, Equals, txninfo.TxnRunningNormal) - c.Assert(info.BlockStartTime, IsNil) + c.Assert((*time.Time)(info.BlockStartTime), IsNil) + c.Assert(info.StartTS, Equals, startTS) + _, beginDigest := parser.NormalizeDigest("begin pessimistic;") + _, selectTSDigest := parser.NormalizeDigest("select @@tidb_current_ts;") + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest.String(), selectTSDigest.String(), expectedDigest.String()}) + // len and size will be covered in TestLenAndSize c.Assert(info.ConnectionID, Equals, tk.Se.GetSessionVars().ConnectionID) c.Assert(info.Username, Equals, "") c.Assert(info.CurrentDB, Equals, "test") - tk.MustExec("commit;") + c.Assert(info.StartTS, Equals, startTS) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "pause"), IsNil) + go func() { + tk.MustExec("commit;") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + _, commitDigest := parser.NormalizeDigest("commit;") + info = tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, commitDigest.String()) + c.Assert(info.State, Equals, txninfo.TxnCommitting) + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest.String(), selectTSDigest.String(), expectedDigest.String(), commitDigest.String()}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) + <-ch + info = tk.Se.TxnInfo() + c.Assert(info, IsNil) + + // Test autocommit transaction + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "pause"), IsNil) + go func() { + tk.MustExec("insert into t values (2)") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + info = tk.Se.TxnInfo() + _, expectedDigest = parser.NormalizeDigest("insert into t values (2)") + c.Assert(info.CurrentSQLDigest, Equals, expectedDigest.String()) + c.Assert(info.State, Equals, txninfo.TxnCommitting) + c.Assert((*time.Time)(info.BlockStartTime), IsNil) + c.Assert(info.StartTS, Greater, startTS) + c.Assert(len(info.AllSQLDigests), Equals, 1) + c.Assert(info.AllSQLDigests[0], Equals, expectedDigest.String()) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) + <-ch info = tk.Se.TxnInfo() c.Assert(info, IsNil) } -func (s *testTxnStateSuite) TestEntriesCountAndSize(c *C) { +func (s *testTxnStateSerialSuite) TestEntriesCountAndSize(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") tk.MustExec("begin pessimistic;") @@ -4219,7 +4281,7 @@ func (s *testTxnStateSuite) TestEntriesCountAndSize(c *C) { tk.MustExec("commit;") } -func (s *testTxnStateSuite) TestBlocked(c *C) { +func (s *testTxnStateSerialSuite) TestBlocked(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") @@ -4237,7 +4299,7 @@ func (s *testTxnStateSuite) TestBlocked(c *C) { tk.MustExec("commit;") } -func (s *testTxnStateSuite) TestCommitting(c *C) { +func (s *testTxnStateSerialSuite) TestCommitting(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk2 := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") @@ -4249,8 +4311,10 @@ func (s *testTxnStateSuite) TestCommitting(c *C) { tk2.MustExec("begin pessimistic") c.Assert(tk2.Se.TxnInfo(), NotNil) tk2.MustExec("select * from t where a = 2 for update;") - failpoint.Enable("github.com/pingcap/tidb/session/mockSlowCommit", "sleep(200)") - defer failpoint.Disable("github.com/pingcap/tidb/session/mockSlowCommit") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/session/mockSlowCommit", "sleep(200)"), IsNil) + defer func() { + c.Assert(failpoint.Disable("github.com/pingcap/tidb/session/mockSlowCommit"), IsNil) + }() tk2.MustExec("commit;") ch <- struct{}{} }() @@ -4260,7 +4324,7 @@ func (s *testTxnStateSuite) TestCommitting(c *C) { <-ch } -func (s *testTxnStateSuite) TestRollbacking(c *C) { +func (s *testTxnStateSerialSuite) TestRollbacking(c *C) { tk := testkit.NewTestKitWithInit(c, s.store) tk.MustExec("create table t(a int);") tk.MustExec("insert into t(a) values (1), (2);") @@ -4278,6 +4342,125 @@ func (s *testTxnStateSuite) TestRollbacking(c *C) { <-ch } +func (s *testTxnStateSerialSuite) TestTxnInfoWithPreparedStmt(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t(a int)") + tk.MustExec("prepare s1 from 'insert into t values (?)'") + tk.MustExec("set @v = 1") + + tk.MustExec("begin pessimistic") + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + tk.MustExec("execute s1 using @v") + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + info := tk.Se.TxnInfo() + _, expectDigest := parser.NormalizeDigest("insert into t values (?)") + c.Assert(info.CurrentSQLDigest, Equals, expectDigest.String()) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + info = tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, "") + _, beginDigest := parser.NormalizeDigest("begin pessimistic") + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest.String(), expectDigest.String()}) + + tk.MustExec("rollback") +} + +func (s *testTxnStateSerialSuite) TestTxnInfoWithScalarSubquery(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t (a int, b int)") + tk.MustExec("insert into t values (1, 10), (2, 1)") + + tk.MustExec("begin pessimistic") + _, beginDigest := parser.NormalizeDigest("begin pessimistic") + tk.MustExec("select * from t where a = (select b from t where a = 2)") + _, s1Digest := parser.NormalizeDigest("select * from t where a = (select b from t where a = 2)") + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + tk.MustExec("update t set b = b + 1 where a = (select b from t where a = 2)") + ch <- nil + }() + _, s2Digest := parser.NormalizeDigest("update t set b = b + 1 where a = (select b from t where a = 1)") + time.Sleep(100 * time.Millisecond) + info := tk.Se.TxnInfo() + c.Assert(info.CurrentSQLDigest, Equals, s2Digest.String()) + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest.String(), s1Digest.String(), s2Digest.String()}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + tk.MustExec("rollback") +} + +func (s *testTxnStateSerialSuite) TestTxnInfoWithPSProtocol(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t (a int primary key)") + + // Test autocommit transaction + + idInsert, _, _, err := tk.Se.PrepareStmt("insert into t values (?)") + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePrewrite", "pause"), IsNil) + ch := make(chan interface{}) + go func() { + _, err := tk.Se.ExecutePreparedStmt(context.Background(), idInsert, types.MakeDatums(1)) + c.Assert(err, IsNil) + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + _, digest := parser.NormalizeDigest("insert into t values (1)") + info := tk.Se.TxnInfo() + c.Assert(info, NotNil) + c.Assert(info.StartTS, Greater, uint64(0)) + c.Assert(info.State, Equals, txninfo.TxnCommitting) + c.Assert(info.CurrentSQLDigest, Equals, digest.String()) + c.Assert(info.AllSQLDigests, DeepEquals, []string{digest.String()}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePrewrite"), IsNil) + <-ch + info = tk.Se.TxnInfo() + c.Assert(info, IsNil) + + // Test non-autocommit transaction + + id1, _, _, err := tk.Se.PrepareStmt("select * from t where a = ?") + c.Assert(err, IsNil) + _, digest1 := parser.NormalizeDigest("select * from t where a = ?") + id2, _, _, err := tk.Se.PrepareStmt("update t set a = a + 1 where a = ?") + c.Assert(err, IsNil) + _, digest2 := parser.NormalizeDigest("update t set a = a + 1 where a = ?") + + tk.MustExec("begin pessimistic") + + _, err = tk.Se.ExecutePreparedStmt(context.Background(), id1, types.MakeDatums(1)) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock", "pause"), IsNil) + go func() { + _, err := tk.Se.ExecutePreparedStmt(context.Background(), id2, types.MakeDatums(1)) + c.Assert(err, IsNil) + ch <- nil + }() + time.Sleep(100 * time.Millisecond) + info = tk.Se.TxnInfo() + c.Assert(info.StartTS, Greater, uint64(0)) + c.Assert(info.CurrentSQLDigest, Equals, digest2.String()) + c.Assert(info.State, Equals, txninfo.TxnLockWaiting) + c.Assert((*time.Time)(info.BlockStartTime), NotNil) + _, beginDigest := parser.NormalizeDigest("begin pessimistic") + c.Assert(info.AllSQLDigests, DeepEquals, []string{beginDigest.String(), digest1.String(), digest2.String()}) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/beforePessimisticLock"), IsNil) + <-ch + tk.MustExec("rollback") +} + func (s *testSessionSuite) TestReadDMLBatchSize(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("set global tidb_dml_batch_size=1000") diff --git a/session/txn.go b/session/txn.go index df4f2d7a62bed..bb00265044ddf 100644 --- a/session/txn.go +++ b/session/txn.go @@ -63,15 +63,9 @@ type LazyTxn struct { // we need these fields because kv.Transaction provides no thread safety promise // but we hope getting TxnInfo is a thread safe op - infoStartTS uint64 - // current executing state - State txninfo.TxnRunningState - // last trying to block start time - blockStartTime unsafe.Pointer // *time.Time, cannot use atomic.Value here because it is possible to be nil - // how many entries are there in the memBuffer, should be equal to self.(kv.Transaction).Len() - EntriesCount uint64 - // how many memory space do the entries in the memBuffer take, should be equal to self.(kv.Transaction).Size() - EntriesSize uint64 + // txnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct, + // it's stored as an unsafe.Pointer. + txnInfo unsafe.Pointer } // GetTableInfo returns the cached index name. @@ -86,9 +80,9 @@ func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) { func (txn *LazyTxn) init() { txn.mutations = make(map[int64]*binlog.TableMutation) - atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) - atomic.StoreUint64(&txn.EntriesCount, 0) - atomic.StoreUint64(&txn.EntriesSize, 0) + txn.storeTxnInfo(&txninfo.TxnInfo{ + State: txninfo.TxnRunningNormal, + }) } func (txn *LazyTxn) initStmtBuf() { @@ -124,8 +118,37 @@ func (txn *LazyTxn) cleanupStmtBuf() { buf := txn.Transaction.GetMemBuffer() buf.Cleanup(txn.stagingHandle) txn.initCnt = buf.Len() - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + + txnInfo := txn.getTxnInfo() + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) +} + +func (txn *LazyTxn) storeTxnInfo(info *txninfo.TxnInfo) { + atomic.StorePointer(&txn.txnInfo, unsafe.Pointer(info)) +} + +func (txn *LazyTxn) recreateTxnInfo( + startTS uint64, + state txninfo.TxnRunningState, + entriesCount, + entriesSize uint64, + currentSQLDigest string, + allSQLDigests []string, +) { + info := &txninfo.TxnInfo{ + StartTS: startTS, + State: state, + EntriesCount: entriesCount, + EntriesSize: entriesSize, + CurrentSQLDigest: currentSQLDigest, + AllSQLDigests: allSQLDigests, + } + txn.storeTxnInfo(info) +} + +func (txn *LazyTxn) getTxnInfo() *txninfo.TxnInfo { + return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.txnInfo)) } // Size implements the MemBuffer interface. @@ -181,20 +204,20 @@ func (txn *LazyTxn) GoString() string { func (txn *LazyTxn) changeInvalidToValid(kvTxn kv.Transaction) { txn.Transaction = kvTxn - atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) - atomic.StoreUint64(&txn.infoStartTS, kvTxn.StartTS()) txn.initStmtBuf() - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + txn.recreateTxnInfo( + kvTxn.StartTS(), + txninfo.TxnRunningNormal, + uint64(txn.Transaction.Len()), + uint64(txn.Transaction.Size()), + "", + nil) txn.txnFuture = nil } func (txn *LazyTxn) changeInvalidToPending(future *txnFuture) { txn.Transaction = nil txn.txnFuture = future - atomic.StoreUint64(&txn.infoStartTS, 0) - atomic.StoreUint64(&txn.EntriesCount, uint64(0)) - atomic.StoreUint64(&txn.EntriesSize, uint64(0)) } func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { @@ -212,11 +235,17 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { return err } txn.Transaction = t - atomic.StoreInt32(&txn.State, txninfo.TxnRunningNormal) - atomic.StoreUint64(&txn.infoStartTS, t.StartTS()) txn.initStmtBuf() - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + + // The txnInfo may already recorded the first statement (usually "begin") when it's pending, so keep them. + txnInfo := txn.getTxnInfo() + txn.recreateTxnInfo( + t.StartTS(), + txninfo.TxnRunningNormal, + uint64(txn.Transaction.Len()), + uint64(txn.Transaction.Size()), + txnInfo.CurrentSQLDigest, + txnInfo.AllSQLDigests) return nil } @@ -227,9 +256,36 @@ func (txn *LazyTxn) changeToInvalid() { txn.stagingHandle = kv.InvalidStagingHandle txn.Transaction = nil txn.txnFuture = nil - atomic.StoreUint64(&txn.infoStartTS, 0) - atomic.StoreUint64(&txn.EntriesCount, 0) - atomic.StoreUint64(&txn.EntriesSize, 0) + + txn.recreateTxnInfo( + 0, + txninfo.TxnRunningNormal, + 0, + 0, + "", + nil) +} + +func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { + if len(currentSQLDigest) == 0 { + return + } + + info := txn.getTxnInfo().ShallowClone() + info.CurrentSQLDigest = currentSQLDigest + // Keeps at most 50 history sqls to avoid consuming too much memory. + const maxTransactionStmtHistory int = 50 + if len(info.AllSQLDigests) < maxTransactionStmtHistory { + info.AllSQLDigests = append(info.AllSQLDigests, currentSQLDigest) + } + + txn.storeTxnInfo(info) +} + +func (txn *LazyTxn) onStmtEnd() { + info := txn.getTxnInfo().ShallowClone() + info.CurrentSQLDigest = "" + txn.storeTxnInfo(info) } var hasMockAutoIncIDRetry = int64(0) @@ -269,7 +325,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } - atomic.StoreInt32(&txn.State, txninfo.TxnCommitting) + atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnCommitting) failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {}) @@ -301,7 +357,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { // Rollback overrides the Transaction interface. func (txn *LazyTxn) Rollback() error { defer txn.reset() - atomic.StoreInt32(&txn.State, txninfo.TxnRollingBack) + atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnRollingBack) // mockSlowRollback is used to mock a rollback which takes a long time failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {}) return txn.Transaction.Rollback() @@ -309,15 +365,15 @@ func (txn *LazyTxn) Rollback() error { // LockKeys Wrap the inner transaction's `LockKeys` to record the status func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { - originState := atomic.LoadInt32(&txn.State) - atomic.StoreInt32(&txn.State, txninfo.TxnLockWaiting) + txnInfo := txn.getTxnInfo() + originState := atomic.SwapInt32(&txnInfo.State, txninfo.TxnLockWaiting) t := time.Now() - atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(&t)) + atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(&t)) err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) - atomic.StorePointer(&txn.blockStartTime, unsafe.Pointer(nil)) - atomic.StoreInt32(&txn.State, originState) - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(nil)) + atomic.StoreInt32(&txnInfo.State, originState) + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) return err } @@ -376,17 +432,11 @@ func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool { // Info dump the TxnState to Datum for displaying in `TIDB_TRX` // This function is supposed to be thread safe func (txn *LazyTxn) Info() *txninfo.TxnInfo { - startTs := atomic.LoadUint64(&txn.infoStartTS) - if startTs == 0 { + info := txn.getTxnInfo().ShallowClone() + if info.StartTS == 0 { return nil } - return &txninfo.TxnInfo{ - StartTS: startTs, - State: atomic.LoadInt32(&txn.State), - BlockStartTime: (*time.Time)(atomic.LoadPointer(&txn.blockStartTime)), - EntriesCount: atomic.LoadUint64(&txn.EntriesCount), - EntriesSize: atomic.LoadUint64(&txn.EntriesSize), - } + return info } // UpdateEntriesCountAndSize updates the EntriesCount and EntriesSize @@ -394,8 +444,9 @@ func (txn *LazyTxn) Info() *txninfo.TxnInfo { // txn.Transaction can be changed during this function's execution if running parallel. func (txn *LazyTxn) UpdateEntriesCountAndSize() { if txn.Valid() { - atomic.StoreUint64(&txn.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.EntriesSize, uint64(txn.Transaction.Size())) + txnInfo := txn.getTxnInfo() + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) } } diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index 77a2d8c90cd05..acc52e985f0f9 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -14,7 +14,10 @@ package txninfo import ( + "strings" + "sync/atomic" "time" + "unsafe" "github.com/pingcap/parser/mysql" "github.com/pingcap/tidb/store/tikv/oracle" @@ -43,19 +46,28 @@ var TxnRunningStateStrs = []string{ // TxnInfo is information about a running transaction // This is supposed to be the datasource of `TIDB_TRX` in infoschema type TxnInfo struct { + // The following fields are immutable and can be safely read across threads. + StartTS uint64 - // digest of SQL current running + // Digest of SQL currently running CurrentSQLDigest string - // current executing State + // Digests of all SQLs executed in the transaction. + AllSQLDigests []string + + // The following fields are mutable and needs to be read or written by atomic operations. But since only the + // transaction's thread can modify its value, it's ok for the transaction's thread to read it without atomic + // operations. + + // Current execution state of the transaction. State TxnRunningState - // last trying to block start time - BlockStartTime *time.Time + // Last trying to block start time. Invalid if State is not TxnLockWaiting. It's an unsafe pointer to time.Time or nil. + BlockStartTime unsafe.Pointer // How many entries are in MemDB EntriesCount uint64 // MemDB used memory EntriesSize uint64 - // the following fields will be filled in `session` instead of `LazyTxn` + // The following fields will be filled in `session` instead of `LazyTxn` // Which session this transaction belongs to ConnectionID uint64 @@ -65,24 +77,54 @@ type TxnInfo struct { CurrentDB string } -// ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table +// ShallowClone shallow clones the TxnInfo. It's safe to call concurrently with the transaction. +// Note that this function doesn't do deep copy and some fields of the result may be unsafe to write. Use it at your own +// risk. +func (info *TxnInfo) ShallowClone() *TxnInfo { + return &TxnInfo{ + StartTS: info.StartTS, + CurrentSQLDigest: info.CurrentSQLDigest, + AllSQLDigests: info.AllSQLDigests, + State: atomic.LoadInt32(&info.State), + BlockStartTime: atomic.LoadPointer(&info.BlockStartTime), + EntriesCount: atomic.LoadUint64(&info.EntriesCount), + EntriesSize: atomic.LoadUint64(&info.EntriesSize), + ConnectionID: info.ConnectionID, + Username: info.Username, + CurrentDB: info.CurrentDB, + } +} + +// ToDatum Converts the `TxnInfo` to `Datum` to show in the `TIDB_TRX` table. func (info *TxnInfo) ToDatum() []types.Datum { - humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6) + // TODO: The timezone represented to the user is not correct and it will be always UTC time. + humanReadableStartTime := time.Unix(0, oracle.ExtractPhysical(info.StartTS)*1e6).UTC() + + var currentDigest interface{} + if len(info.CurrentSQLDigest) != 0 { + currentDigest = info.CurrentSQLDigest + } + var blockStartTime interface{} - if info.BlockStartTime == nil { + if t := (*time.Time)(atomic.LoadPointer(&info.BlockStartTime)); t == nil { blockStartTime = nil } else { - blockStartTime = types.NewTime(types.FromGoTime(*info.BlockStartTime), mysql.TypeTimestamp, 0) + blockStartTime = types.NewTime(types.FromGoTime(*t), mysql.TypeTimestamp, types.MaxFsp) } + e, err := types.ParseEnumValue(TxnRunningStateStrs, uint64(info.State+1)) if err != nil { panic("this should never happen") } + + allSQLs := "[" + strings.Join(info.AllSQLDigests, ", ") + "]" + state := types.NewMysqlEnumDatum(e) + datums := types.MakeDatums( info.StartTS, - types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, 0), - info.CurrentSQLDigest, + types.NewTime(types.FromGoTime(humanReadableStartTime), mysql.TypeTimestamp, types.MaxFsp), + currentDigest, ) datums = append(datums, state) datums = append(datums, types.MakeDatums( @@ -91,6 +133,7 @@ func (info *TxnInfo) ToDatum() []types.Datum { info.EntriesSize, info.ConnectionID, info.Username, - info.CurrentDB)...) + info.CurrentDB, + allSQLs)...) return datums } diff --git a/util/deadlockhistory/deadlock_history.go b/util/deadlockhistory/deadlock_history.go index c219442cf5bf1..ee2fb496a2a58 100644 --- a/util/deadlockhistory/deadlock_history.go +++ b/util/deadlockhistory/deadlock_history.go @@ -32,7 +32,7 @@ type WaitChainItem struct { TryLockTxn uint64 SQLDigest string Key []byte - AllSQLs []string + AllSQLDigests []string TxnHoldingLock uint64 } @@ -149,8 +149,8 @@ func (d *DeadlockHistory) GetAllDatum() [][]types.Datum { } row[6] = nil - if item.AllSQLs != nil { - row[6] = "[" + strings.Join(item.AllSQLs, ", ") + "]" + if item.AllSQLDigests != nil { + row[6] = "[" + strings.Join(item.AllSQLDigests, ", ") + "]" } row[7] = item.TxnHoldingLock @@ -185,7 +185,7 @@ func ErrDeadlockToDeadlockRecord(dl *tikverr.ErrDeadlock) *DeadlockRecord { TryLockTxn: rawItem.Txn, SQLDigest: hex.EncodeToString(sqlDigest), Key: rawItem.Key, - AllSQLs: nil, + AllSQLDigests: nil, TxnHoldingLock: rawItem.WaitForTxn, }) } diff --git a/util/deadlockhistory/deadlock_history_test.go b/util/deadlockhistory/deadlock_history_test.go index dd9428a9f550a..398d2943ab996 100644 --- a/util/deadlockhistory/deadlock_history_test.go +++ b/util/deadlockhistory/deadlock_history_test.go @@ -148,7 +148,7 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { TryLockTxn: 101, SQLDigest: "sql1", Key: []byte("k1"), - AllSQLs: []string{"sql1", "sql2"}, + AllSQLDigests: []string{"sql1", "sql2"}, TxnHoldingLock: 102, }, // It should work even some information are missing. @@ -164,12 +164,12 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { WaitChain: []WaitChainItem{ { TryLockTxn: 201, - AllSQLs: []string{}, + AllSQLDigests: []string{}, TxnHoldingLock: 202, }, { TryLockTxn: 202, - AllSQLs: []string{"sql1"}, + AllSQLDigests: []string{"sql1"}, TxnHoldingLock: 201, }, }, @@ -201,7 +201,7 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { c.Assert(res[0][3].GetValue(), Equals, uint64(101)) // TRY_LOCK_TRX_ID c.Assert(res[0][4].GetValue(), Equals, "sql1") // SQL_DIGEST c.Assert(res[0][5].GetValue(), Equals, "6B31") // KEY - c.Assert(res[0][6].GetValue(), Equals, "[sql1, sql2]") // ALL_SQLS + c.Assert(res[0][6].GetValue(), Equals, "[sql1, sql2]") // ALL_SQL_DIGESTS c.Assert(res[0][7].GetValue(), Equals, uint64(102)) // TRX_HOLDING_LOCK c.Assert(res[1][0].GetValue(), Equals, uint64(1)) // ID @@ -210,21 +210,21 @@ func (s *testDeadlockHistorySuite) TestGetDatum(c *C) { c.Assert(res[1][3].GetValue(), Equals, uint64(102)) // TRY_LOCK_TRX_ID c.Assert(res[1][4].GetValue(), Equals, nil) // SQL_DIGEST c.Assert(res[1][5].GetValue(), Equals, nil) // KEY - c.Assert(res[1][6].GetValue(), Equals, nil) // ALL_SQLS + c.Assert(res[1][6].GetValue(), Equals, nil) // ALL_SQL_DIGESTS c.Assert(res[1][7].GetValue(), Equals, uint64(101)) // TRX_HOLDING_LOCK c.Assert(res[2][0].GetValue(), Equals, uint64(2)) // ID c.Assert(toGoTime(res[2][1]), Equals, time2) // OCCUR_TIME c.Assert(res[2][2].GetValue(), Equals, int64(1)) // RETRYABLE c.Assert(res[2][3].GetValue(), Equals, uint64(201)) // TRY_LOCK_TRX_ID - c.Assert(res[2][6].GetValue(), Equals, "[]") // ALL_SQLS + c.Assert(res[2][6].GetValue(), Equals, "[]") // ALL_SQL_DIGESTS c.Assert(res[2][7].GetValue(), Equals, uint64(202)) // TRX_HOLDING_LOCK c.Assert(res[3][0].GetValue(), Equals, uint64(2)) // ID c.Assert(toGoTime(res[3][1]), Equals, time2) // OCCUR_TIME c.Assert(res[3][2].GetValue(), Equals, int64(1)) // RETRYABLE c.Assert(res[3][3].GetValue(), Equals, uint64(202)) // TRY_LOCK_TRX_ID - c.Assert(res[3][6].GetValue(), Equals, "[sql1]") // ALL_SQLS + c.Assert(res[3][6].GetValue(), Equals, "[sql1]") // ALL_SQL_DIGESTS c.Assert(res[3][7].GetValue(), Equals, uint64(201)) // TRX_HOLDING_LOCK }