Skip to content

Commit

Permalink
Address comment
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta committed May 27, 2021
1 parent e5921e5 commit 43cd7b4
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 31 deletions.
58 changes: 29 additions & 29 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,9 @@ type LazyTxn struct {
// we need these fields because kv.Transaction provides no thread safety promise
// but we hope getting TxnInfo is a thread safe op

// atomicTxnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct,
// txnInfo provides information about the transaction in a thread-safe way. To atomically replace the struct,
// it's stored as an unsafe.Pointer.
atomicTxnInfo unsafe.Pointer
// txnInfo points to the same thing as atomicTxnInfo. It's just used internally by LazyTxn to avoid casting
// atomicTxnInfo from the untyped unsafe.Pointer every time using it.
txnInfo *txninfo.TxnInfo
txnInfo unsafe.Pointer
}

// GetTableInfo returns the cached index name.
Expand All @@ -83,10 +80,9 @@ func (txn *LazyTxn) CacheTableInfo(id int64, info *model.TableInfo) {

func (txn *LazyTxn) init() {
txn.mutations = make(map[int64]*binlog.TableMutation)
txn.txnInfo = &txninfo.TxnInfo{
txn.storeTxnInfo(&txninfo.TxnInfo{
State: txninfo.TxnRunningNormal,
}
txn.storeTxnInfo(txn.txnInfo)
})
}

func (txn *LazyTxn) initStmtBuf() {
Expand Down Expand Up @@ -122,13 +118,14 @@ func (txn *LazyTxn) cleanupStmtBuf() {
buf := txn.Transaction.GetMemBuffer()
buf.Cleanup(txn.stagingHandle)
txn.initCnt = buf.Len()
atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len()))
atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size()))

txnInfo := txn.getTxnInfo()
atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len()))
atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size()))
}

func (txn *LazyTxn) storeTxnInfo(info *txninfo.TxnInfo) {
txn.txnInfo = info
atomic.StorePointer(&txn.atomicTxnInfo, unsafe.Pointer(info))
atomic.StorePointer(&txn.txnInfo, unsafe.Pointer(info))
}

func (txn *LazyTxn) recreateTxnInfo(
Expand All @@ -150,8 +147,8 @@ func (txn *LazyTxn) recreateTxnInfo(
txn.storeTxnInfo(info)
}

func (txn *LazyTxn) loadTxnInfo() *txninfo.TxnInfo {
return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.atomicTxnInfo))
func (txn *LazyTxn) getTxnInfo() *txninfo.TxnInfo {
return (*txninfo.TxnInfo)(atomic.LoadPointer(&txn.txnInfo))
}

// Size implements the MemBuffer interface.
Expand Down Expand Up @@ -241,13 +238,14 @@ func (txn *LazyTxn) changePendingToValid(ctx context.Context) error {
txn.initStmtBuf()

// The txnInfo may already recorded the first statement (usually "begin") when it's pending, so keep them.
txnInfo := txn.getTxnInfo()
txn.recreateTxnInfo(
t.StartTS(),
txninfo.TxnRunningNormal,
uint64(txn.Transaction.Len()),
uint64(txn.Transaction.Size()),
txn.txnInfo.CurrentSQLDigest,
txn.txnInfo.AllSQLDigests)
txnInfo.CurrentSQLDigest,
txnInfo.AllSQLDigests)
return nil
}

Expand All @@ -273,7 +271,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) {
return
}

info := txn.txnInfo.GetSnapInfo()
info := txn.getTxnInfo().ShallowClone()
info.CurrentSQLDigest = currentSQLDigest
// Keeps at most 50 history sqls to avoid consuming too much memory.
const maxTransactionStmtHistory int = 50
Expand All @@ -285,7 +283,7 @@ func (txn *LazyTxn) onStmtStart(currentSQLDigest string) {
}

func (txn *LazyTxn) onStmtEnd() {
info := txn.txnInfo.GetSnapInfo()
info := txn.getTxnInfo().ShallowClone()
info.CurrentSQLDigest = ""
txn.storeTxnInfo(info)
}
Expand Down Expand Up @@ -327,7 +325,7 @@ func (txn *LazyTxn) Commit(ctx context.Context) error {
return errors.Trace(kv.ErrInvalidTxn)
}

atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnCommitting)
atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnCommitting)

failpoint.Inject("mockSlowCommit", func(_ failpoint.Value) {})

Expand Down Expand Up @@ -359,22 +357,23 @@ func (txn *LazyTxn) Commit(ctx context.Context) error {
// Rollback overrides the Transaction interface.
func (txn *LazyTxn) Rollback() error {
defer txn.reset()
atomic.StoreInt32(&txn.txnInfo.State, txninfo.TxnRollingBack)
atomic.StoreInt32(&txn.getTxnInfo().State, txninfo.TxnRollingBack)
// mockSlowRollback is used to mock a rollback which takes a long time
failpoint.Inject("mockSlowRollback", func(_ failpoint.Value) {})
return txn.Transaction.Rollback()
}

// LockKeys Wrap the inner transaction's `LockKeys` to record the status
func (txn *LazyTxn) LockKeys(ctx context.Context, lockCtx *kv.LockCtx, keys ...kv.Key) error {
originState := atomic.SwapInt32(&txn.txnInfo.State, txninfo.TxnLockWaiting)
txnInfo := txn.getTxnInfo()
originState := atomic.SwapInt32(&txnInfo.State, txninfo.TxnLockWaiting)
t := time.Now()
atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(&t))
atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(&t))
err := txn.Transaction.LockKeys(ctx, lockCtx, keys...)
atomic.StorePointer(&txn.txnInfo.BlockStartTime, unsafe.Pointer(nil))
atomic.StoreInt32(&txn.txnInfo.State, originState)
atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len()))
atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size()))
atomic.StorePointer(&txnInfo.BlockStartTime, unsafe.Pointer(nil))
atomic.StoreInt32(&txnInfo.State, originState)
atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len()))
atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size()))
return err
}

Expand Down Expand Up @@ -433,7 +432,7 @@ func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool {
// Info dump the TxnState to Datum for displaying in `TIDB_TRX`
// This function is supposed to be thread safe
func (txn *LazyTxn) Info() *txninfo.TxnInfo {
info := txn.loadTxnInfo().GetSnapInfo()
info := txn.getTxnInfo().ShallowClone()
if info.StartTS == 0 {
return nil
}
Expand All @@ -445,8 +444,9 @@ func (txn *LazyTxn) Info() *txninfo.TxnInfo {
// txn.Transaction can be changed during this function's execution if running parallel.
func (txn *LazyTxn) UpdateEntriesCountAndSize() {
if txn.Valid() {
atomic.StoreUint64(&txn.txnInfo.EntriesCount, uint64(txn.Transaction.Len()))
atomic.StoreUint64(&txn.txnInfo.EntriesSize, uint64(txn.Transaction.Size()))
txnInfo := txn.getTxnInfo()
atomic.StoreUint64(&txnInfo.EntriesCount, uint64(txn.Transaction.Len()))
atomic.StoreUint64(&txnInfo.EntriesSize, uint64(txn.Transaction.Size()))
}
}

Expand Down
4 changes: 2 additions & 2 deletions session/txninfo/txn_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ type TxnInfo struct {
CurrentDB string
}

// GetSnapInfo gets a snapshot of the TxnInfo for read. It's safe to call concurrently with the transaction.
// ShallowClone shallow clones the TxnInfo. It's safe to call concurrently with the transaction.
// Note that this function doesn't do deep copy and some fields of the result may be unsafe to write. Use it at your own
// risk.
func (info *TxnInfo) GetSnapInfo() *TxnInfo {
func (info *TxnInfo) ShallowClone() *TxnInfo {
return &TxnInfo{
StartTS: info.StartTS,
CurrentSQLDigest: info.CurrentSQLDigest,
Expand Down

0 comments on commit 43cd7b4

Please sign in to comment.