Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check kill signal in 2pc committer #1084

Merged
merged 3 commits into from
Dec 26, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ var (
// ErrTiFlashServerTimeout is the error when tiflash server is timeout.
ErrTiFlashServerTimeout = errors.New("tiflash server timeout")
// ErrQueryInterrupted is the error when the query is interrupted.
ErrQueryInterrupted = errors.New("query interruppted")
ErrQueryInterrupted = errors.New("query interrupted")
// ErrTiKVStaleCommand is the error that the command is stale in tikv.
ErrTiKVStaleCommand = errors.New("tikv stale command")
// ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced.
ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced")
// ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted.
ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is setted")
ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is set")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😆

// ErrResolveLockTimeout is the error that resolve lock timeout.
ErrResolveLockTimeout = errors.New("resolve lock timeout")
// ErrLockWaitTimeout is the error that wait for the lock is timeout.
Expand Down
32 changes: 29 additions & 3 deletions txnkv/transaction/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,10 @@ func (c *PlainMutations) AppendMutation(mutation PlainMutation) {
}

// newTwoPhaseCommitter creates a twoPhaseCommitter.
func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (*twoPhaseCommitter, error) {
func newTwoPhaseCommitter(txn *KVTxn, sessionID uint64) (
*twoPhaseCommitter,
error,
) {
return &twoPhaseCommitter{
store: txn.store,
txn: txn,
Expand Down Expand Up @@ -919,7 +922,10 @@ const CommitSecondaryMaxBackoff = 41000

// doActionOnGroupedMutations splits groups into batches (there is one group per region, and potentially many batches per group, but all mutations
// in a batch will belong to the same region).
func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action twoPhaseCommitAction, groups []groupedMutations) error {
func (c *twoPhaseCommitter) doActionOnGroupMutations(
bo *retry.Backoffer,
action twoPhaseCommitAction, groups []groupedMutations,
ekexium marked this conversation as resolved.
Show resolved Hide resolved
) error {
action.tiKVTxnRegionsNumHistogram().Observe(float64(len(groups)))

var sizeFunc = c.keySize
Expand Down Expand Up @@ -1048,7 +1054,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action
}

// doActionOnBatches does action to batches in parallel.
func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error {
func (c *twoPhaseCommitter) doActionOnBatches(
bo *retry.Backoffer, action twoPhaseCommitAction,
batches []batchMutations,
) error {
// killSignal should never be nil for TiDB
if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil {
// Do not reset the killed flag here. Let the upper layer reset the flag.
// Before it resets, any request is considered valid to be killed.
status := atomic.LoadUint32(c.txn.vars.Killed)
if status != 0 {
logutil.BgLogger().Info(
"query is killed", zap.Uint32(
"signal",
status,
),
)
// TODO: There might be various signals besides a query interruption,
// but we are unable to differentiate them, because the definition is in TiDB.
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
if len(batches) == 0 {
return nil
}
Expand Down
12 changes: 0 additions & 12 deletions txnkv/transaction/pessimistic.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,18 +214,6 @@ func (action actionPessimisticLock) handleSingleBatch(
return nil
}
}

// Handle the killed flag when waiting for the pessimistic lock.
// When a txn runs into LockKeys() and backoff here, it has no chance to call
// executor.Next() and check the killed flag.
if action.Killed != nil {
// Do not reset the killed flag here!
// actionPessimisticLock runs on each region parallelly, we have to consider that
// the error may be dropped.
if atomic.LoadUint32(action.Killed) == 1 {
return errors.WithStack(tikverr.ErrQueryInterrupted)
}
}
}
}

Expand Down
6 changes: 0 additions & 6 deletions txnkv/transaction/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1111,12 +1111,6 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func()
lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...)
lockCtx.Stats.Mu.Unlock()
}
if lockCtx.Killed != nil {
// If the kill signal is received during waiting for pessimisticLock,
// pessimisticLockKeys would handle the error but it doesn't reset the flag.
// We need to reset the killed flag here.
atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0)
}
if txn.IsInAggressiveLockingMode() {
if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS {
txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS
Expand Down
Loading