diff --git a/error/error.go b/error/error.go index 62a75ff65..3f10c211f 100644 --- a/error/error.go +++ b/error/error.go @@ -64,13 +64,13 @@ var ( // ErrTiFlashServerTimeout is the error when tiflash server is timeout. ErrTiFlashServerTimeout = errors.New("tiflash server timeout") // ErrQueryInterrupted is the error when the query is interrupted. - ErrQueryInterrupted = errors.New("query interruppted") + ErrQueryInterrupted = errors.New("query interrupted") // ErrTiKVStaleCommand is the error that the command is stale in tikv. ErrTiKVStaleCommand = errors.New("tikv stale command") // ErrTiKVMaxTimestampNotSynced is the error that tikv's max timestamp is not synced. ErrTiKVMaxTimestampNotSynced = errors.New("tikv max timestamp not synced") // ErrLockAcquireFailAndNoWaitSet is the error that acquire the lock failed while no wait is setted. - ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is setted") + ErrLockAcquireFailAndNoWaitSet = errors.New("lock acquired failed and no wait is set") // ErrResolveLockTimeout is the error that resolve lock timeout. ErrResolveLockTimeout = errors.New("resolve lock timeout") // ErrLockWaitTimeout is the error that wait for the lock is timeout. diff --git a/txnkv/transaction/2pc.go b/txnkv/transaction/2pc.go index c7d99333b..49a580c16 100644 --- a/txnkv/transaction/2pc.go +++ b/txnkv/transaction/2pc.go @@ -1048,7 +1048,27 @@ func (c *twoPhaseCommitter) doActionOnGroupMutations(bo *retry.Backoffer, action } // doActionOnBatches does action to batches in parallel. -func (c *twoPhaseCommitter) doActionOnBatches(bo *retry.Backoffer, action twoPhaseCommitAction, batches []batchMutations) error { +func (c *twoPhaseCommitter) doActionOnBatches( + bo *retry.Backoffer, action twoPhaseCommitAction, + batches []batchMutations, +) error { + // killSignal should never be nil for TiDB + if c.txn != nil && c.txn.vars != nil && c.txn.vars.Killed != nil { + // Do not reset the killed flag here. Let the upper layer reset the flag. + // Before it resets, any request is considered valid to be killed. + status := atomic.LoadUint32(c.txn.vars.Killed) + if status != 0 { + logutil.BgLogger().Info( + "query is killed", zap.Uint32( + "signal", + status, + ), + ) + // TODO: There might be various signals besides a query interruption, + // but we are unable to differentiate them, because the definition is in TiDB. + return errors.WithStack(tikverr.ErrQueryInterrupted) + } + } if len(batches) == 0 { return nil } diff --git a/txnkv/transaction/pessimistic.go b/txnkv/transaction/pessimistic.go index 28835baeb..3db567028 100644 --- a/txnkv/transaction/pessimistic.go +++ b/txnkv/transaction/pessimistic.go @@ -214,18 +214,6 @@ func (action actionPessimisticLock) handleSingleBatch( return nil } } - - // Handle the killed flag when waiting for the pessimistic lock. - // When a txn runs into LockKeys() and backoff here, it has no chance to call - // executor.Next() and check the killed flag. - if action.Killed != nil { - // Do not reset the killed flag here! - // actionPessimisticLock runs on each region parallelly, we have to consider that - // the error may be dropped. - if atomic.LoadUint32(action.Killed) == 1 { - return errors.WithStack(tikverr.ErrQueryInterrupted) - } - } } } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 2d5407a39..94ef35964 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -1111,12 +1111,6 @@ func (txn *KVTxn) lockKeys(ctx context.Context, lockCtx *tikv.LockCtx, fn func() lockCtx.Stats.Mu.BackoffTypes = append(lockCtx.Stats.Mu.BackoffTypes, bo.GetTypes()...) lockCtx.Stats.Mu.Unlock() } - if lockCtx.Killed != nil { - // If the kill signal is received during waiting for pessimisticLock, - // pessimisticLockKeys would handle the error but it doesn't reset the flag. - // We need to reset the killed flag here. - atomic.CompareAndSwapUint32(lockCtx.Killed, 1, 0) - } if txn.IsInAggressiveLockingMode() { if txn.aggressiveLockingContext.maxLockedWithConflictTS < lockCtx.MaxLockedWithConflictTS { txn.aggressiveLockingContext.maxLockedWithConflictTS = lockCtx.MaxLockedWithConflictTS