Skip to content

Commit

Permalink
feat: check kill signal in 2pc committer
Browse files Browse the repository at this point in the history
Signed-off-by: ekexium <eke@fastmail.com>
  • Loading branch information
ekexium committed Dec 21, 2023
1 parent 9932209 commit 3ab3d54
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 23 deletions.
4 changes: 2 additions & 2 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ var (
// ErrTiFlashServerTimeout is the error when tiflash server is timeout.
ErrTiFlashServerTimeout = errors.New("tiflash server timeout")
// ErrQueryInterrupted is the error when the query is interrupted.
ErrQueryInterrupted = errors.New("query interruppted")
ErrQueryInterrupted = errors.New("query interrupted")
// ErrTiKVStaleCommand is the error that the command is stale in tikv.
ErrTiKVStaleCommand = errors.New("tikv stale command")
// ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced.
ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced")
// ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted.
ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is setted")
ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is set")
// ErrResolveLockTimeout is the error that resolve lock timeout.
ErrResolveLockTimeout = errors.New("resolve lock timeout")
// ErrLockWaitTimeout is the error that wait for the lock is timeout.
Expand Down
32 changes: 29 additions & 3 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,10 @@ func (c *PlainMutations) AppendMutation(mutation PlainMutation) {
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (
*twoPhaseCommitter,
error,
) {
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
Expand Down Expand Up @@ -919,7 +922,10 @@ const CommitSecondaryMaxBackoff = 41000

// doActionOnGroupedMutations splits groups into batches (there is one group per region, and potentially many batches per group, but all mutations
// in a batch will belong to the same region).
func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
func (c *twoPhaseCommitter) doActionOnGroupMutations(
bo *retry.Backoffer,
action twoPhaseCommitAction, groups []groupedMutations,
) error {
action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups)))

var sizeFunc = c.keySize
Expand Down Expand Up @@ -1048,7 +1054,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
}

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error {
func (c *twoPhaseCommitter) doActionOnBatches(
bo *retry.Backoffer, action twoPhaseCommitAction,
batches []batchMutations,
) error {
// killSignal should never be nil for TiDB
if c.txn.vars != nil && c.txn.vars.Killed != nil {
// Do not reset the killed flag here. Let the upper layer reset the flag.
// Before it resets, any request is considered valid to be killed.
status := atomic.LoadUint32(c.txn.vars.Killed)
if status != 0 {
logutil.BgLogger().Info(
"pessimistic locking is killed", zap.Uint32(
"signal",
status,
),
)
// TODO: There might be various signals besides a query interruption,
// but we are unable to differentiate them, because the definition is in TiDB.
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
if len(batches) == 0 {
return nil
}
Expand Down
12 changes: 0 additions & 12 deletions txnkv/transaction/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,6 @@ func (action actionPessimisticLock) handleSingleBatch(
return nil
}
}

// Handle the killed flag when waiting for the pessimistic lock.
// When a txn runs into LockKeys() and backoff here, it has no chance to call
// executor.Next() and check the killed flag.
if action.Killed != nil {
// Do not reset the killed flag here!
// actionPessimisticLock runs on each region parallelly, we have to consider that
// the error may be dropped.
if atomic.LoadUint32(action.Killed) == 1 {
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
}
}

Expand Down
6 changes: 0 additions & 6 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,12 +1111,6 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...)
lockCtx.Stats.Mu.Unlock()
}
if lockCtx.Killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
// We need to reset the killed flag here.
atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
}
if txn.IsInAggressiveLockingMode() {
if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS {
txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS
Expand Down

0 comments on commit 3ab3d54

Please sign in to comment.