Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store/tikv/retry: define Config instead of BackoffType #24692

Merged
merged 16 commits into from
May 20, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -431,9 +431,8 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
resp.detail.CalleeAddress = task.storeAddr

Expand Down
9 changes: 4 additions & 5 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *ti
if task.storeType == kv.TiFlash {
err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
} else {
err1 = bo.BackoffTiKVRPC(err1)
err1 = bo.Backoff(tikv.BoTiKVRPC, err1)
}

if err1 != nil {
Expand Down Expand Up @@ -883,7 +883,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
return nil, errors.Trace(err1)
}
if msBeforeExpired > 0 {
if err := bo.BackoffWithMaxSleep(tikv.BoTxnLockFast, int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
if err := bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
return nil, errors.Trace(err)
}
}
Expand Down Expand Up @@ -914,9 +914,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
Expand Down
7 changes: 3 additions & 4 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
return
}

if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
Expand Down Expand Up @@ -383,9 +383,8 @@ func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDa
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
resp.detail.CalleeAddress = req.Meta.GetAddress()

Expand Down
23 changes: 8 additions & 15 deletions store/driver/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,22 @@ func (b *Backoffer) TiKVBackoffer() *tikv.Backoffer {
return b.b
}

// Backoff sleeps a while base on the backoffType and records the error message.
// Backoff sleeps a while base on the BackoffConfig and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ tikv.BackoffType, err error) error {
e := b.b.Backoff(typ, err)
func (b *Backoffer) Backoff(cfg *tikv.BackoffConfig, err error) error {
e := b.b.Backoff(cfg, err)
return derr.ToTiDBErr(e)
}

// BackoffTiKVRPC sleeps a while base on the TiKVRPC and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) BackoffTiKVRPC(err error) error {
e := b.b.BackoffTiKVRPC(err)
return derr.ToTiDBErr(e)
}

// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
// BackoffWithMaxSleepTxnLockFast sleeps a while for the operation TxnLockFast and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error {
e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err)
func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) error {
e := b.b.BackoffWithMaxSleepTxnLockFast(maxSleepMs, err)
return derr.ToTiDBErr(e)
}

// GetBackoffTimes returns a map contains backoff time count by type.
func (b *Backoffer) GetBackoffTimes() map[tikv.BackoffType]int {
func (b *Backoffer) GetBackoffTimes() map[string]int {
return b.b.GetBackoffTimes()
}

Expand All @@ -80,7 +73,7 @@ func (b *Backoffer) GetVars() *tikv.Variables {
}

// GetBackoffSleepMS returns a map contains backoff sleep time by type.
func (b *Backoffer) GetBackoffSleepMS() map[tikv.BackoffType]int {
func (b *Backoffer) GetBackoffSleepMS() map[string]int {
return b.b.GetBackoffSleepMS()
}

Expand Down
18 changes: 9 additions & 9 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ import (
// Backoffer is a utility for retrying queries.
type Backoffer = retry.Backoffer

// BackoffType defines the backoff type.
type BackoffType = retry.BackoffType
// BackoffConfig defines the backoff configuration.
type BackoffConfig = retry.Config

// Back off types.
const (
BoRegionMiss = retry.BoRegionMiss
BoTiFlashRPC = retry.BoTiFlashRPC
BoTxnLockFast = retry.BoTxnLockFast
BoTxnLock = retry.BoTxnLock
BoPDRPC = retry.BoPDRPC
// Back off configurations.
var (
BoRegionMiss = retry.BoRegionMiss
BoTiFlashRPC = retry.BoTiFlashRPC
BoTxnLock = retry.BoTxnLock
BoPDRPC = retry.BoPDRPC
BoTiKVRPC = retry.BoTiKVRPC
)

// Maximum total sleep time(in ms) for kv/cop commands.
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b
break
}

err2 := b.BackoffTiKVRPC(err1)
err2 := b.Backoff(BoTiKVRPC, err1)
// As timeout is set to math.MaxUint32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
}
atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start)))
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think BackoffWithMaxSleep is ok. Now all Backoff*s need config. We do no need to mention it specially.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, we need to keep the origin BackoffWithMaxSleep since br is using it.

if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash {
err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx))
} else {
err = bo.BackoffTiKVRPC(errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
}
return errors.Trace(err)
}
Expand Down
Loading