diff --git a/kv/variables.go b/kv/variables.go index 373421826dacb..ad589b280633a 100644 --- a/kv/variables.go +++ b/kv/variables.go @@ -23,18 +23,25 @@ type Variables struct { // Hook is used for test to verify the variable take effect. Hook func(name string, vars *Variables) + + // Pointer to SessionVars.Killed + // Killed is a flag to indicate that this query is killed. + Killed *uint32 } // NewVariables create a new Variables instance with default values. -func NewVariables() *Variables { +func NewVariables(killed *uint32) *Variables { return &Variables{ BackoffLockFast: DefBackoffLockFast, BackOffWeight: DefBackOffWeight, + Killed: killed, } } +var ignoreKill uint32 + // DefaultVars is the default variables instance. -var DefaultVars = NewVariables() +var DefaultVars = NewVariables(&ignoreKill) // Default values const ( diff --git a/session/session_fail_test.go b/session/session_fail_test.go index 44fa7be1173a0..abd99447d0fd9 100644 --- a/session/session_fail_test.go +++ b/session/session_fail_test.go @@ -15,9 +15,11 @@ package session_test import ( "context" + "sync/atomic" . "github.com/pingcap/check" "github.com/pingcap/failpoint" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/testkit" ) @@ -126,3 +128,21 @@ func (s *testSessionSuite) TestRetryPreparedSleep(c *C) { tk.MustQuery("select c1 from t").Check(testkit.Rows("21", "0")) } + +func (s *testSessionSuite) TestKillFlagInBackoff(c *C) { + // This test checks the `killed` flag is passed down to the backoffer through + // session.KVVars. It works by setting the `killed = 3` first, then using + // failpoint to run backoff() and check the vars.Killed using the Hook() function. + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table kill_backoff (id int)") + var killValue uint32 + tk.Se.GetSessionVars().KVVars.Hook = func(name string, vars *kv.Variables) { + killValue = atomic.LoadUint32(vars.Killed) + } + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult", `return("callBackofferHook")`), IsNil) + defer failpoint.Disable("github.com/pingcap/tidb/store/tikv/tikvStoreSendReqResult") + // Set kill flag and check its passed to backoffer. + tk.Se.GetSessionVars().Killed = 3 + tk.MustQuery("select * from kill_backoff") + c.Assert(killValue, Equals, uint32(3)) +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 96286e06626bf..45303102af20a 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -487,7 +487,6 @@ func NewSessionVars() *SessionVars { PreparedStmtNameToID: make(map[string]uint32), PreparedParams: make([]types.Datum, 0, 10), TxnCtx: &TransactionContext{}, - KVVars: kv.NewVariables(), RetryInfo: &RetryInfo{}, ActiveRoles: make([]*auth.RoleIdentity, 0, 10), StrictSQLMode: true, @@ -513,6 +512,7 @@ func NewSessionVars() *SessionVars { AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc, LockWaitTimeout: DefInnodbLockWaitTimeout * 1000, } + vars.KVVars = kv.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ IndexLookupConcurrency: DefIndexLookupConcurrency, IndexSerialScanConcurrency: DefIndexSerialScanConcurrency, diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index 0d31e814e275d..b890b06e19270 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -19,6 +19,7 @@ import ( "math" "math/rand" "strings" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -322,6 +323,12 @@ func (b *Backoffer) BackoffWithMaxSleep(typ backoffType, maxSleepMs int, err err backoffDuration.Observe(float64(realSleep) / 1000) b.totalSleep += realSleep + if b.vars != nil && b.vars.Killed != nil { + if atomic.CompareAndSwapUint32(b.vars.Killed, 1, 0) { + return ErrQueryInterrupted + } + } + var startTs interface{} if ts := b.ctx.Value(txnStartKey); ts != nil { startTs = ts diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index 9c50af4e0d2c6..2a5146bc6fd71 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -95,6 +95,10 @@ func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, re GC: &kvrpcpb.GCResponse{RegionError: &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}}}, }, nil, nil) } + case "callBackofferHook": + if bo.vars != nil && bo.vars.Hook != nil { + bo.vars.Hook("callBackofferHook", bo.vars) + } } })