Skip to content

Commit

Permalink
store/tikv/retry: define Config instead of BackoffType
Browse files Browse the repository at this point in the history
Signed-off-by: shirly <AndreMouche@126.com>
  • Loading branch information
AndreMouche committed May 18, 2021
1 parent e92df20 commit 038ee7b
Show file tree
Hide file tree
Showing 14 changed files with 255 additions and 263 deletions.
7 changes: 3 additions & 4 deletions store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b
return nil
}

if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil {
return errors.Trace(err)
}

Expand Down Expand Up @@ -431,9 +431,8 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
resp.detail.CalleeAddress = task.storeAddr

Expand Down
9 changes: 4 additions & 5 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *ti
if task.storeType == kv.TiFlash {
err1 = bo.Backoff(tikv.BoTiFlashRPC, err1)
} else {
err1 = bo.BackoffTiKVRPC(err1)
err1 = bo.Backoff(tikv.BoTiKVRPC, err1)
}

if err1 != nil {
Expand Down Expand Up @@ -883,7 +883,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
return nil, errors.Trace(err1)
}
if msBeforeExpired > 0 {
if err := bo.BackoffWithMaxSleep(tikv.BoTxnLockFast, int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
if err := bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(lockErr.String())); err != nil {
return nil, errors.Trace(err)
}
}
Expand Down Expand Up @@ -914,9 +914,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
if rpcCtx != nil {
resp.detail.CalleeAddress = rpcCtx.Addr
Expand Down
7 changes: 3 additions & 4 deletions store/copr/mpp.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques
return
}

if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if err1 := bo.Backoff(tikv.BoTiKVRPC, errors.Errorf("recv stream response error: %v", err)); err1 != nil {
if errors.Cause(err) == context.Canceled {
logutil.BgLogger().Info("stream recv timeout", zap.Error(err))
} else {
Expand Down Expand Up @@ -383,9 +383,8 @@ func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDa
resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes))
resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes))
for backoff := range backoffTimes {
backoffName := backoff.String()
resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
resp.detail.BackoffTimes[backoff] = backoffTimes[backoff]
resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond
}
resp.detail.CalleeAddress = req.Meta.GetAddress()

Expand Down
23 changes: 8 additions & 15 deletions store/driver/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,29 +43,22 @@ func (b *Backoffer) TiKVBackoffer() *tikv.Backoffer {
return b.b
}

// Backoff sleeps a while base on the backoffType and records the error message.
// Backoff sleeps a while base on the BackoffConfig and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) Backoff(typ tikv.BackoffType, err error) error {
e := b.b.Backoff(typ, err)
func (b *Backoffer) Backoff(cfg *tikv.BackoffConfig, err error) error {
e := b.b.Backoff(cfg, err)
return derr.ToTiDBErr(e)
}

// BackoffTiKVRPC sleeps a while base on the TiKVRPC and records the error message.
// It returns a retryable error if total sleep time exceeds maxSleep.
func (b *Backoffer) BackoffTiKVRPC(err error) error {
e := b.b.BackoffTiKVRPC(err)
return derr.ToTiDBErr(e)
}

// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message
// BackoffWithMaxSleepTxnLockFast sleeps a while for the operation TxnLockFast and records the error message
// and never sleep more than maxSleepMs for each sleep.
func (b *Backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error {
e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err)
func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) error {
e := b.b.BackoffWithMaxSleepTxnLockFast(maxSleepMs, err)
return derr.ToTiDBErr(e)
}

// GetBackoffTimes returns a map contains backoff time count by type.
func (b *Backoffer) GetBackoffTimes() map[tikv.BackoffType]int {
func (b *Backoffer) GetBackoffTimes() map[string]int {
return b.b.GetBackoffTimes()
}

Expand All @@ -80,7 +73,7 @@ func (b *Backoffer) GetVars() *tikv.Variables {
}

// GetBackoffSleepMS returns a map contains backoff sleep time by type.
func (b *Backoffer) GetBackoffSleepMS() map[tikv.BackoffType]int {
func (b *Backoffer) GetBackoffSleepMS() map[string]int {
return b.b.GetBackoffSleepMS()
}

Expand Down
18 changes: 9 additions & 9 deletions store/tikv/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ import (
// Backoffer is a utility for retrying queries.
type Backoffer = retry.Backoffer

// BackoffType defines the backoff type.
type BackoffType = retry.BackoffType
// BackoffConfig defines the backoff configuration.
type BackoffConfig = retry.Config

// Back off types.
const (
BoRegionMiss = retry.BoRegionMiss
BoTiFlashRPC = retry.BoTiFlashRPC
BoTxnLockFast = retry.BoTxnLockFast
BoTxnLock = retry.BoTxnLock
BoPDRPC = retry.BoPDRPC
// Back off configurations.
var (
BoRegionMiss = retry.BoRegionMiss
BoTiFlashRPC = retry.BoTiFlashRPC
BoTxnLock = retry.BoTxnLock
BoPDRPC = retry.BoPDRPC
BoTiKVRPC = retry.BoTiKVRPC
)

// Maximum total sleep time(in ms) for kv/cop commands.
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/client_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -673,7 +673,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b
break
}

err2 := b.BackoffTiKVRPC(err1)
err2 := b.Backoff(BoTiKVRPC, err1)
// As timeout is set to math.MaxUint32, err2 should always be nil.
// This line is added to make the 'make errcheck' pass.
terror.Log(err2)
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff
}
atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start)))
if msBeforeExpired > 0 {
err = bo.BackoffWithMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks)))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err
if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash {
err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx))
} else {
err = bo.BackoffTiKVRPC(errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx))
}
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit 038ee7b

Please sign in to comment.