From 43cd7b40453d5c7e7458c3b992442ec898a0b61e Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 27 May 2021 12:04:34 +0800 Subject: [PATCH] Address comment --- session/txn.go | 58 ++++++++++++++++++------------------- session/txninfo/txn_info.go | 4 +-- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/session/txn.go b/session/txn.go index 9615a84857fd4..bb00265044ddf 100644 --- a/session/txn.go +++ b/session/txn.go @@ -63,12 +63,9 @@ type LazyTxn struct { // we need these fields because kv.Transaction provides no thread safety promise // but we hope getting TxnInfo is a thread safe op - // atomicTxnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct, + // txnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct, // it's stored as an unsafe.Pointer. - atomicTxnInfo unsafe.Pointer - // txnInfo points to the same thing as atomicTxnInfo. It's just used internally by LazyTxn to avoid casting - // atomicTxnInfo from the untyped unsafe.Pointer every time using it. - txnInfo *txninfo.TxnInfo + txnInfo unsafe.Pointer } // GetTableInfo returns the cached index name. @@ -83,10 +80,9 @@ func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) { func (txn *LazyTxn) init() { txn.mutations = make(map[int64]*binlog.TableMutation) - txn.txnInfo = &txninfo.TxnInfo{ + txn.storeTxnInfo(&txninfo.TxnInfo{ State: txninfo.TxnRunningNormal, - } - txn.storeTxnInfo(txn.txnInfo) + }) } func (txn *LazyTxn) initStmtBuf() { @@ -122,13 +118,14 @@ func (txn *LazyTxn) cleanupStmtBuf() { buf := txn.Transaction.GetMemBuffer() buf.Cleanup(txn.stagingHandle) txn.initCnt = buf.Len() - atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) + + txnInfo := txn.getTxnInfo() + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) } func (txn *LazyTxn) storeTxnInfo(info *txninfo.TxnInfo) { - txn.txnInfo = info - atomic.StorePointer(&txn.atomicTxnInfo, unsafe.Pointer(info)) + atomic.StorePointer(&txn.txnInfo, unsafe.Pointer(info)) } func (txn *LazyTxn) recreateTxnInfo( @@ -150,8 +147,8 @@ func (txn *LazyTxn) recreateTxnInfo( txn.storeTxnInfo(info) } -func (txn *LazyTxn) loadTxnInfo() *txninfo.TxnInfo { - return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.atomicTxnInfo)) +func (txn *LazyTxn) getTxnInfo() *txninfo.TxnInfo { + return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.txnInfo)) } // Size implements the MemBuffer interface. @@ -241,13 +238,14 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error { txn.initStmtBuf() // The txnInfo may already recorded the first statement (usually "begin") when it's pending, so keep them. + txnInfo := txn.getTxnInfo() txn.recreateTxnInfo( t.StartTS(), txninfo.TxnRunningNormal, uint64(txn.Transaction.Len()), uint64(txn.Transaction.Size()), - txn.txnInfo.CurrentSQLDigest, - txn.txnInfo.AllSQLDigests) + txnInfo.CurrentSQLDigest, + txnInfo.AllSQLDigests) return nil } @@ -273,7 +271,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { return } - info := txn.txnInfo.GetSnapInfo() + info := txn.getTxnInfo().ShallowClone() info.CurrentSQLDigest = currentSQLDigest // Keeps at most 50 history sqls to avoid consuming too much memory. const maxTransactionStmtHistory int = 50 @@ -285,7 +283,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) { } func (txn *LazyTxn) onStmtEnd() { - info := txn.txnInfo.GetSnapInfo() + info := txn.getTxnInfo().ShallowClone() info.CurrentSQLDigest = "" txn.storeTxnInfo(info) } @@ -327,7 +325,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { return errors.Trace(kv.ErrInvalidTxn) } - atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnCommitting) + atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnCommitting) failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {}) @@ -359,7 +357,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error { // Rollback overrides the Transaction interface. func (txn *LazyTxn) Rollback() error { defer txn.reset() - atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnRollingBack) + atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnRollingBack) // mockSlowRollback is used to mock a rollback which takes a long time failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {}) return txn.Transaction.Rollback() @@ -367,14 +365,15 @@ func (txn *LazyTxn) Rollback() error { // LockKeys Wrap the inner transaction's `LockKeys` to record the status func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error { - originState := atomic.SwapInt32(&txn.txnInfo.State, txninfo.TxnLockWaiting) + txnInfo := txn.getTxnInfo() + originState := atomic.SwapInt32(&txnInfo.State, txninfo.TxnLockWaiting) t := time.Now() - atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(&t)) + atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(&t)) err := txn.Transaction.LockKeys(ctx, lockCtx, keys...) - atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(nil)) - atomic.StoreInt32(&txn.txnInfo.State, originState) - atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) + atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(nil)) + atomic.StoreInt32(&txnInfo.State, originState) + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) return err } @@ -433,7 +432,7 @@ func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool { // Info dump the TxnState to Datum for displaying in `TIDB_TRX` // This function is supposed to be thread safe func (txn *LazyTxn) Info() *txninfo.TxnInfo { - info := txn.loadTxnInfo().GetSnapInfo() + info := txn.getTxnInfo().ShallowClone() if info.StartTS == 0 { return nil } @@ -445,8 +444,9 @@ func (txn *LazyTxn) Info() *txninfo.TxnInfo { // txn.Transaction can be changed during this function's execution if running parallel. func (txn *LazyTxn) UpdateEntriesCountAndSize() { if txn.Valid() { - atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len())) - atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size())) + txnInfo := txn.getTxnInfo() + atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len())) + atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size())) } } diff --git a/session/txninfo/txn_info.go b/session/txninfo/txn_info.go index e75a9825d4bfb..acc52e985f0f9 100644 --- a/session/txninfo/txn_info.go +++ b/session/txninfo/txn_info.go @@ -77,10 +77,10 @@ type TxnInfo struct { CurrentDB string } -// GetSnapInfo gets a snapshot of the TxnInfo for read. It's safe to call concurrently with the transaction. +// ShallowClone shallow clones the TxnInfo. It's safe to call concurrently with the transaction. // Note that this function doesn't do deep copy and some fields of the result may be unsafe to write. Use it at your own // risk. -func (info *TxnInfo) GetSnapInfo() *TxnInfo { +func (info *TxnInfo) ShallowClone() *TxnInfo { return &TxnInfo{ StartTS: info.StartTS, CurrentSQLDigest: info.CurrentSQLDigest,