diff --git a/store/copr/batch_coprocessor.go b/store/copr/batch_coprocessor.go index 8c73f58fbf892..6efbf76775186 100644 --- a/store/copr/batch_coprocessor.go +++ b/store/copr/batch_coprocessor.go @@ -302,7 +302,7 @@ func buildBatchCopTasks(bo *Backoffer, cache *tikv.RegionCache, ranges *tikv.Key // As mentioned above, nil rpcCtx is always attributed to failed stores. // It's equal to long poll the store but get no response. Here we'd better use // TiFlash error to trigger the TiKV fallback mechanism. - err = bo.Backoff(tikv.BoTiFlashRPC, errors.New("Cannot find region with TiFlash peer")) + err = bo.Backoff(tikv.BoTiFlashRPC(), errors.New("Cannot find region with TiFlash peer")) if err != nil { return nil, errors.Trace(err) } @@ -548,7 +548,7 @@ func (b *batchCopIterator) handleStreamedBatchCopResponse(ctx context.Context, b return nil } - if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { + if err1 := bo.Backoff(tikv.BoTiKVRPC(), errors.Errorf("recv stream response error: %v, task store addr: %s", err, task.storeAddr)); err1 != nil { return errors.Trace(err) } @@ -597,9 +597,8 @@ func (b *batchCopIterator) handleBatchCopResponse(bo *Backoffer, response *copro resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes)) resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes)) for backoff := range backoffTimes { - backoffName := backoff.String() - resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff] - resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond + resp.detail.BackoffTimes[backoff] = backoffTimes[backoff] + resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond } resp.detail.CalleeAddress = task.storeAddr diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index 989a6d835ce0f..c66c3cda9af35 100644 --- a/store/copr/coprocessor.go 
+++ b/store/copr/coprocessor.go @@ -833,9 +833,9 @@ func (worker *copIteratorWorker) handleCopStreamResult(bo *Backoffer, rpcCtx *ti err1 := errors.Errorf("recv stream response error: %v, task: %s", err, task) if task.storeType == kv.TiFlash { - err1 = bo.Backoff(tikv.BoTiFlashRPC, err1) + err1 = bo.Backoff(tikv.BoTiFlashRPC(), err1) } else { - err1 = bo.BackoffTiKVRPC(err1) + err1 = bo.Backoff(tikv.BoTiKVRPC(), err1) } if err1 != nil { @@ -869,7 +869,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R } errStr := fmt.Sprintf("region_id:%v, region_ver:%v, store_type:%s, peer_addr:%s, error:%s", task.region.GetID(), task.region.GetVer(), task.storeType.Name(), task.storeAddr, regionErr.String()) - if err := bo.Backoff(tikv.BoRegionMiss, errors.New(errStr)); err != nil { + if err := bo.Backoff(tikv.BoRegionMiss(), errors.New(errStr)); err != nil { return nil, errors.Trace(err) } // We may meet RegionError at the first packet, but not during visiting the stream. 
@@ -884,7 +884,7 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R return nil, errors.Trace(err1) } if msBeforeExpired > 0 { - if err := bo.BackoffWithMaxSleep(tikv.BoTxnLockFast, int(msBeforeExpired), errors.New(lockErr.String())); err != nil { + if err := bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(lockErr.String())); err != nil { return nil, errors.Trace(err) } } @@ -915,9 +915,8 @@ func (worker *copIteratorWorker) handleCopResponse(bo *Backoffer, rpcCtx *tikv.R resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes)) resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes)) for backoff := range backoffTimes { - backoffName := backoff.String() - resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff] - resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond + resp.detail.BackoffTimes[backoff] = backoffTimes[backoff] + resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond } if rpcCtx != nil { resp.detail.CalleeAddress = rpcCtx.Addr diff --git a/store/copr/mpp.go b/store/copr/mpp.go index 1941f2b3fbfa4..6d58e4ef732fe 100644 --- a/store/copr/mpp.go +++ b/store/copr/mpp.go @@ -340,7 +340,7 @@ func (m *mppIterator) establishMPPConns(bo *Backoffer, req *kv.MPPDispatchReques return } - if err1 := bo.BackoffTiKVRPC(errors.Errorf("recv stream response error: %v", err)); err1 != nil { + if err1 := bo.Backoff(tikv.BoTiKVRPC(), errors.Errorf("recv stream response error: %v", err)); err1 != nil { if errors.Cause(err) == context.Canceled { logutil.BgLogger().Info("stream recv timeout", zap.Error(err)) } else { @@ -383,9 +383,8 @@ func (m *mppIterator) handleMPPStreamResponse(bo *Backoffer, response *mpp.MPPDa resp.detail.BackoffSleep = make(map[string]time.Duration, len(backoffTimes)) resp.detail.BackoffTimes = make(map[string]int, len(backoffTimes)) for backoff := range backoffTimes { - 
backoffName := backoff.String() - resp.detail.BackoffTimes[backoffName] = backoffTimes[backoff] - resp.detail.BackoffSleep[backoffName] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond + resp.detail.BackoffTimes[backoff] = backoffTimes[backoff] + resp.detail.BackoffSleep[backoff] = time.Duration(bo.GetBackoffSleepMS()[backoff]) * time.Millisecond } resp.detail.CalleeAddress = req.Meta.GetAddress() diff --git a/store/driver/backoff/backoff.go b/store/driver/backoff/backoff.go index f634366381d06..35979edc638b4 100644 --- a/store/driver/backoff/backoff.go +++ b/store/driver/backoff/backoff.go @@ -43,29 +43,22 @@ func (b *Backoffer) TiKVBackoffer() *tikv.Backoffer { return b.b } -// Backoff sleeps a while base on the backoffType and records the error message. +// Backoff sleeps a while base on the BackoffConfig and records the error message. // It returns a retryable error if total sleep time exceeds maxSleep. -func (b *Backoffer) Backoff(typ tikv.BackoffType, err error) error { - e := b.b.Backoff(typ, err) +func (b *Backoffer) Backoff(cfg *tikv.BackoffConfig, err error) error { + e := b.b.Backoff(cfg, err) return derr.ToTiDBErr(e) } -// BackoffTiKVRPC sleeps a while base on the TiKVRPC and records the error message. -// It returns a retryable error if total sleep time exceeds maxSleep. -func (b *Backoffer) BackoffTiKVRPC(err error) error { - e := b.b.BackoffTiKVRPC(err) - return derr.ToTiDBErr(e) -} - -// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message +// BackoffWithMaxSleepTxnLockFast sleeps a while for the operation TxnLockFast and records the error message // and never sleep more than maxSleepMs for each sleep. 
-func (b *Backoffer) BackoffWithMaxSleep(typ tikv.BackoffType, maxSleepMs int, err error) error { - e := b.b.BackoffWithMaxSleep(typ, maxSleepMs, err) +func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) error { + e := b.b.BackoffWithMaxSleepTxnLockFast(maxSleepMs, err) return derr.ToTiDBErr(e) } // GetBackoffTimes returns a map contains backoff time count by type. -func (b *Backoffer) GetBackoffTimes() map[tikv.BackoffType]int { +func (b *Backoffer) GetBackoffTimes() map[string]int { return b.b.GetBackoffTimes() } @@ -80,7 +73,7 @@ func (b *Backoffer) GetVars() *tikv.Variables { } // GetBackoffSleepMS returns a map contains backoff sleep time by type. -func (b *Backoffer) GetBackoffSleepMS() map[tikv.BackoffType]int { +func (b *Backoffer) GetBackoffSleepMS() map[string]int { return b.b.GetBackoffSleepMS() } diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index 2d93b7eda4abb..5f5471d8e7251 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -231,7 +231,7 @@ func (s *tikvStore) EtcdAddrs() ([]string, error) { for { members, err := pdClient.GetAllMembers(ctx) if err != nil { - err := bo.Backoff(tikv.BoRegionMiss, err) + err := bo.Backoff(tikv.BoRegionMiss(), err) if err != nil { return nil, err } diff --git a/store/gcworker/gc_worker.go b/store/gcworker/gc_worker.go index b5b42df1838b9..72ae8bc34f0fa 100644 --- a/store/gcworker/gc_worker.go +++ b/store/gcworker/gc_worker.go @@ -1089,7 +1089,7 @@ retryScanAndResolve: return stat, errors.Trace(err) } if regionErr != nil { - err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) if err != nil { return stat, errors.Trace(err) } @@ -1125,7 +1125,7 @@ retryScanAndResolve: return stat, errors.Trace(err1) } if !ok { - err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks))) + err = bo.Backoff(tikv.BoTxnLock(), errors.Errorf("remain locks: %d", 
len(locks))) if err != nil { return stat, errors.Trace(err) } @@ -1497,7 +1497,7 @@ func (w *GCWorker) resolveLocksAcrossRegions(ctx context.Context, locks []*tikv. return errors.Trace(err) } if !ok { - err = bo.Backoff(tikv.BoTxnLock, errors.Errorf("remain locks: %d", len(locks))) + err = bo.Backoff(tikv.BoTxnLock(), errors.Errorf("remain locks: %d", len(locks))) if err != nil { return errors.Trace(err) } @@ -1525,7 +1525,7 @@ func (w *GCWorker) uploadSafePointToPD(ctx context.Context, safePoint uint64) er if errors.Cause(err) == context.Canceled { return errors.Trace(err) } - err = bo.Backoff(tikv.BoPDRPC, errors.Errorf("failed to upload safe point to PD, err: %v", err)) + err = bo.Backoff(tikv.BoPDRPC(), errors.Errorf("failed to upload safe point to PD, err: %v", err)) if err != nil { return errors.Trace(err) } @@ -1567,7 +1567,7 @@ func (w *GCWorker) doGCForRange(ctx context.Context, startKey []byte, endKey []b // we check regionErr here first, because we know 'regionErr' and 'err' should not return together, to keep it to // make the process correct. if regionErr != nil { - err = bo.Backoff(tikv.BoRegionMiss, errors.New(regionErr.String())) + err = bo.Backoff(tikv.BoRegionMiss(), errors.New(regionErr.String())) if err == nil { continue } diff --git a/store/tikv/backoff.go b/store/tikv/backoff.go index c622e21d2ee5d..918acc9addcb8 100644 --- a/store/tikv/backoff.go +++ b/store/tikv/backoff.go @@ -23,17 +23,8 @@ import ( // Backoffer is a utility for retrying queries. type Backoffer = retry.Backoffer -// BackoffType defines the backoff type. -type BackoffType = retry.BackoffType - -// Back off types. -const ( - BoRegionMiss = retry.BoRegionMiss - BoTiFlashRPC = retry.BoTiFlashRPC - BoTxnLockFast = retry.BoTxnLockFast - BoTxnLock = retry.BoTxnLock - BoPDRPC = retry.BoPDRPC -) +// BackoffConfig defines the backoff configuration. +type BackoffConfig = retry.Config // Maximum total sleep time(in ms) for kv/cop commands. 
const ( @@ -62,6 +53,31 @@ func TxnStartKey() interface{} { return retry.TxnStartKey } +// BoRegionMiss returns the default backoff config for RegionMiss. +func BoRegionMiss() *BackoffConfig { + return retry.BoRegionMiss +} + +// BoTiFlashRPC returns the default backoff config for TiFlashRPC. +func BoTiFlashRPC() *BackoffConfig { + return retry.BoTiFlashRPC +} + +// BoTxnLock returns the default backoff config for TxnLock. +func BoTxnLock() *BackoffConfig { + return retry.BoTxnLock +} + +// BoPDRPC returns the default backoff config for PDRPC. +func BoPDRPC() *BackoffConfig { + return retry.BoPDRPC +} + +// BoTiKVRPC returns the default backoff config for TiKVRPC. +func BoTiKVRPC() *BackoffConfig { + return retry.BoTiKVRPC +} + // NewGcResolveLockMaxBackoffer creates a Backoffer for Gc to resolve lock. func NewGcResolveLockMaxBackoffer(ctx context.Context) *Backoffer { return retry.NewBackofferWithVars(ctx, gcResolveLockMaxBackoff, nil) diff --git a/store/tikv/batch_request_sender.go b/store/tikv/batch_request_sender.go index 9aad070b70306..74a62dcfd781c 100644 --- a/store/tikv/batch_request_sender.go +++ b/store/tikv/batch_request_sender.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/metapb" tikverr "github.com/pingcap/tidb/store/tikv/error" + "github.com/pingcap/tidb/store/tikv/retry" "github.com/pingcap/tidb/store/tikv/tikvrpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -93,6 +94,6 @@ func (ss *RegionBatchRequestSender) onSendFailForBatchRegions(bo *Backoffer, ctx // When a store is not available, the leader of related region should be elected quickly. // TODO: the number of retry time should be limited:since region may be unavailable // when some unrecoverable disaster happened. 
- err = bo.Backoff(BoTiFlashRPC, errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos)) + err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send request error: %v, ctx: %v, regionInfos: %v", err, ctx, regionInfos)) return errors.Trace(err) } diff --git a/store/tikv/client_batch.go b/store/tikv/client_batch.go index e5ec039fc6911..70f1cf27ccacc 100644 --- a/store/tikv/client_batch.go +++ b/store/tikv/client_batch.go @@ -673,7 +673,7 @@ func (c *batchCommandsClient) recreateStreamingClient(err error, streamClient *b break } - err2 := b.BackoffTiKVRPC(err1) + err2 := b.Backoff(retry.BoTiKVRPC, err1) // As timeout is set to math.MaxUint32, err2 should always be nil. // This line is added to make the 'make errcheck' pass. terror.Log(err2) diff --git a/store/tikv/prewrite.go b/store/tikv/prewrite.go index ffb47e1fb46fa..49ddc1525b748 100644 --- a/store/tikv/prewrite.go +++ b/store/tikv/prewrite.go @@ -269,7 +269,7 @@ func (action actionPrewrite) handleSingleBatch(c *twoPhaseCommitter, bo *Backoff } atomic.AddInt64(&c.getDetail().ResolveLockTime, int64(time.Since(start))) if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) + err = bo.BackoffWithCfgAndMaxSleep(retry.BoTxnLock, int(msBeforeExpired), errors.Errorf("2PC prewrite lockedKeys: %d", len(locks))) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go index f42f7add092db..36b297c580102 100644 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -575,7 +575,7 @@ func (s *RegionRequestSender) onSendFail(bo *Backoffer, ctx *RPCContext, err err if ctx.Store != nil && ctx.Store.storeType == tikvrpc.TiFlash { err = bo.Backoff(retry.BoTiFlashRPC, errors.Errorf("send tiflash request error: %v, ctx: %v, try next peer later", err, ctx)) } else { - err = bo.BackoffTiKVRPC(errors.Errorf("send tikv request error: 
%v, ctx: %v, try next peer later", err, ctx)) + err = bo.Backoff(retry.BoTiKVRPC, errors.Errorf("send tikv request error: %v, ctx: %v, try next peer later", err, ctx)) } return errors.Trace(err) } diff --git a/store/tikv/retry/backoff.go b/store/tikv/retry/backoff.go index 9e7a527c69caa..d07b9c4fdccae 100644 --- a/store/tikv/retry/backoff.go +++ b/store/tikv/retry/backoff.go @@ -17,7 +17,6 @@ import ( "context" "fmt" "math" - "math/rand" "strings" "sync/atomic" "time" @@ -30,202 +29,24 @@ import ( "github.com/pingcap/tidb/store/tikv/logutil" "github.com/pingcap/tidb/store/tikv/metrics" "github.com/pingcap/tidb/store/tikv/util" - "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) -const ( - // NoJitter makes the backoff sequence strict exponential. - NoJitter = 1 + iota - // FullJitter applies random factors to strict exponential. - FullJitter - // EqualJitter is also randomized, but prevents very short sleeps. - EqualJitter - // DecorrJitter increases the maximum jitter based on the last random value. - DecorrJitter -) - -func (t BackoffType) metric() prometheus.Observer { - switch t { - // TODO: distinguish tikv and tiflash in metrics - case boTiKVRPC, BoTiFlashRPC: - return metrics.BackoffHistogramRPC - case BoTxnLock: - return metrics.BackoffHistogramLock - case BoTxnLockFast: - return metrics.BackoffHistogramLockFast - case BoPDRPC: - return metrics.BackoffHistogramPD - case BoRegionMiss: - return metrics.BackoffHistogramRegionMiss - case BoTiKVServerBusy, BoTiFlashServerBusy: - return metrics.BackoffHistogramServerBusy - case BoStaleCmd: - return metrics.BackoffHistogramStaleCmd - } - return metrics.BackoffHistogramEmpty -} - -// NewBackoffFn creates a backoff func which implements exponential backoff with -// optional jitters. 
-// See http://www.awsarchitectureblog.com/2015/03/backoff.html -func NewBackoffFn(base, cap, jitter int) func(ctx context.Context, maxSleepMs int) int { - if base < 2 { - // Top prevent panic in 'rand.Intn'. - base = 2 - } - attempts := 0 - lastSleep := base - return func(ctx context.Context, maxSleepMs int) int { - var sleep int - switch jitter { - case NoJitter: - sleep = expo(base, cap, attempts) - case FullJitter: - v := expo(base, cap, attempts) - sleep = rand.Intn(v) - case EqualJitter: - v := expo(base, cap, attempts) - sleep = v/2 + rand.Intn(v/2) - case DecorrJitter: - sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base)))) - } - logutil.BgLogger().Debug("backoff", - zap.Int("base", base), - zap.Int("sleep", sleep), - zap.Int("attempts", attempts)) - - realSleep := sleep - // when set maxSleepMs >= 0 in `tikv.BackoffWithMaxSleep` will force sleep maxSleepMs milliseconds. - if maxSleepMs >= 0 && realSleep > maxSleepMs { - realSleep = maxSleepMs - } - select { - case <-time.After(time.Duration(realSleep) * time.Millisecond): - attempts++ - lastSleep = sleep - return realSleep - case <-ctx.Done(): - return 0 - } - } -} - -func expo(base, cap, n int) int { - return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n)))) -} - -// BackoffType defines the backoff type. -type BackoffType int - -// Back off types. 
-const ( - boTiKVRPC BackoffType = iota - BoTiFlashRPC - BoTxnLock - BoTxnLockFast - BoPDRPC - BoRegionMiss - BoTiKVServerBusy - BoTiFlashServerBusy - BoTxnNotFound - BoStaleCmd - BoMaxTsNotSynced -) - -func (t BackoffType) createFn(vars *kv.Variables) func(context.Context, int) int { - switch t { - case boTiKVRPC, BoTiFlashRPC: - return NewBackoffFn(100, 2000, EqualJitter) - case BoTxnLock: - return NewBackoffFn(200, 3000, EqualJitter) - case BoTxnLockFast: - return NewBackoffFn(vars.BackoffLockFast, 3000, EqualJitter) - case BoPDRPC: - return NewBackoffFn(500, 3000, EqualJitter) - case BoRegionMiss: - // change base time to 2ms, because it may recover soon. - return NewBackoffFn(2, 500, NoJitter) - case BoTxnNotFound: - return NewBackoffFn(2, 500, NoJitter) - case BoTiKVServerBusy, BoTiFlashServerBusy: - return NewBackoffFn(2000, 10000, EqualJitter) - case BoStaleCmd: - return NewBackoffFn(2, 1000, NoJitter) - case BoMaxTsNotSynced: - return NewBackoffFn(2, 500, NoJitter) - } - return nil -} - -func (t BackoffType) String() string { - switch t { - case boTiKVRPC: - return "tikvRPC" - case BoTiFlashRPC: - return "tiflashRPC" - case BoTxnLock: - return "txnLock" - case BoTxnLockFast: - return "txnLockFast" - case BoPDRPC: - return "pdRPC" - case BoRegionMiss: - return "regionMiss" - case BoTiKVServerBusy: - return "tikvServerBusy" - case BoTiFlashServerBusy: - return "tiflashServerBusy" - case BoStaleCmd: - return "staleCommand" - case BoTxnNotFound: - return "txnNotFound" - case BoMaxTsNotSynced: - return "maxTsNotSynced" - } - return "" -} - -// TError returns pingcap/error of the backoff type. 
-func (t BackoffType) TError() error { - switch t { - case boTiKVRPC: - return tikverr.ErrTiKVServerTimeout - case BoTiFlashRPC: - return tikverr.ErrTiFlashServerTimeout - case BoTxnLock, BoTxnLockFast, BoTxnNotFound: - return tikverr.ErrResolveLockTimeout - case BoPDRPC: - return tikverr.NewErrPDServerTimeout("") - case BoRegionMiss: - return tikverr.ErrRegionUnavailable - case BoTiKVServerBusy: - return tikverr.ErrTiKVServerBusy - case BoTiFlashServerBusy: - return tikverr.ErrTiFlashServerBusy - case BoStaleCmd: - return tikverr.ErrTiKVStaleCommand - case BoMaxTsNotSynced: - return tikverr.ErrTiKVMaxTimestampNotSynced - } - return tikverr.ErrUnknown -} - // Backoffer is a utility for retrying queries. type Backoffer struct { ctx context.Context - fn map[BackoffType]func(context.Context, int) int + fn map[string]backoffFn maxSleep int totalSleep int errors []error - types []fmt.Stringer + configs []fmt.Stringer vars *kv.Variables noop bool - backoffSleepMS map[BackoffType]int - backoffTimes map[BackoffType]int + backoffSleepMS map[string]int + backoffTimes map[string]int } type txnStartCtxKeyType struct{} @@ -265,26 +86,71 @@ func (b *Backoffer) withVars(vars *kv.Variables) *Backoffer { return b } -// Backoff sleeps a while base on the backoffType and records the error message. +// Backoff sleeps a while base on the Config and records the error message. // It returns a retryable error if total sleep time exceeds maxSleep. 
-func (b *Backoffer) Backoff(typ BackoffType, err error) error { +func (b *Backoffer) Backoff(cfg *Config, err error) error { if span := opentracing.SpanFromContext(b.ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", typ), opentracing.ChildOf(span.Context())) + span1 := span.Tracer().StartSpan(fmt.Sprintf("tikv.backoff.%s", cfg), opentracing.ChildOf(span.Context())) defer span1.Finish() opentracing.ContextWithSpan(b.ctx, span1) } - return b.BackoffWithMaxSleep(typ, -1, err) + return b.BackoffWithCfgAndMaxSleep(cfg, -1, err) } -// BackoffTiKVRPC sleeps a while base on the TiKVRPC and records the error message. -// It returns a retryable error if total sleep time exceeds maxSleep. -func (b *Backoffer) BackoffTiKVRPC(err error) error { - return b.Backoff(boTiKVRPC, err) +// BackoffWithMaxSleepTxnLockFast sleeps a while base on the MaxSleepTxnLock and records the error message +// and never sleep more than maxSleepMs for each sleep. +func (b *Backoffer) BackoffWithMaxSleepTxnLockFast(maxSleepMs int, err error) error { + cfg := BoTxnLockFast + return b.BackoffWithCfgAndMaxSleep(cfg, maxSleepMs, err) +} + +// BackoffWithMaxSleep is deprecated, please use BackoffWithCfgAndMaxSleep instead. TODO: remove it when br is ready. +func (b *Backoffer) BackoffWithMaxSleep(typ int, maxSleepMs int, err error) error { + // Back off types. 
+ const ( + boTiKVRPC int = iota + boTiFlashRPC + boTxnLock + boTxnLockFast + boPDRPC + boRegionMiss + boTiKVServerBusy + boTiFlashServerBusy + boTxnNotFound + boStaleCmd + boMaxTsNotSynced + ) + switch typ { + case boTiKVRPC: + return b.BackoffWithCfgAndMaxSleep(BoTiKVRPC, maxSleepMs, err) + case boTiFlashRPC: + return b.BackoffWithCfgAndMaxSleep(BoTiFlashRPC, maxSleepMs, err) + case boTxnLock: + return b.BackoffWithCfgAndMaxSleep(BoTxnLock, maxSleepMs, err) + case boTxnLockFast: + return b.BackoffWithCfgAndMaxSleep(BoTxnLockFast, maxSleepMs, err) + case boPDRPC: + return b.BackoffWithCfgAndMaxSleep(BoPDRPC, maxSleepMs, err) + case boRegionMiss: + return b.BackoffWithCfgAndMaxSleep(BoRegionMiss, maxSleepMs, err) + case boTiKVServerBusy: + return b.BackoffWithCfgAndMaxSleep(BoTiKVServerBusy, maxSleepMs, err) + case boTiFlashServerBusy: + return b.BackoffWithCfgAndMaxSleep(BoTiFlashServerBusy, maxSleepMs, err) + case boTxnNotFound: + return b.BackoffWithCfgAndMaxSleep(BoTxnNotFound, maxSleepMs, err) + case boStaleCmd: + return b.BackoffWithCfgAndMaxSleep(BoStaleCmd, maxSleepMs, err) + case boMaxTsNotSynced: + return b.BackoffWithCfgAndMaxSleep(BoMaxTsNotSynced, maxSleepMs, err) + } + cfg := NewConfig("", metrics.BackoffHistogramEmpty, nil, tikverr.ErrUnknown) + return b.BackoffWithCfgAndMaxSleep(cfg, maxSleepMs, err) } -// BackoffWithMaxSleep sleeps a while base on the backoffType and records the error message +// BackoffWithCfgAndMaxSleep sleeps a while base on the Config and records the error message // and never sleep more than maxSleepMs for each sleep. 
-func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err error) error { +func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error { if strings.Contains(err.Error(), tikverr.MismatchClusterID) { logutil.BgLogger().Fatal("critical error", zap.Error(err)) } @@ -295,9 +161,9 @@ func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err err } b.errors = append(b.errors, errors.Errorf("%s at %s", err.Error(), time.Now().Format(time.RFC3339Nano))) - b.types = append(b.types, typ) + b.configs = append(b.configs, cfg) if b.noop || (b.maxSleep > 0 && b.totalSleep >= b.maxSleep) { - errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", typ.String(), b.maxSleep) + errMsg := fmt.Sprintf("%s backoffer.maxSleep %dms is exceeded, errors:", cfg.String(), b.maxSleep) for i, err := range b.errors { // Print only last 3 errors for non-DEBUG log levels. if log.GetLevel() == zapcore.DebugLevel || i >= len(b.errors)-3 { @@ -306,30 +172,29 @@ func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err err } logutil.BgLogger().Warn(errMsg) // Use the first backoff type to generate a MySQL error. - return b.types[0].(BackoffType).TError() + return b.configs[0].(*Config).err } // Lazy initialize. 
if b.fn == nil { - b.fn = make(map[BackoffType]func(context.Context, int) int) + b.fn = make(map[string]backoffFn) } - f, ok := b.fn[typ] + f, ok := b.fn[cfg.name] if !ok { - f = typ.createFn(b.vars) - b.fn[typ] = f + f = cfg.createBackoffFn(b.vars) + b.fn[cfg.name] = f } - realSleep := f(b.ctx, maxSleepMs) - typ.metric().Observe(float64(realSleep) / 1000) + cfg.metric.Observe(float64(realSleep) / 1000) b.totalSleep += realSleep if b.backoffSleepMS == nil { - b.backoffSleepMS = make(map[BackoffType]int) + b.backoffSleepMS = make(map[string]int) } - b.backoffSleepMS[typ] += realSleep + b.backoffSleepMS[cfg.name] += realSleep if b.backoffTimes == nil { - b.backoffTimes = make(map[BackoffType]int) + b.backoffTimes = make(map[string]int) } - b.backoffTimes[typ]++ + b.backoffTimes[cfg.name]++ stmtExec := b.ctx.Value(util.ExecDetailsKey) if stmtExec != nil { @@ -352,7 +217,7 @@ func (b *Backoffer) BackoffWithMaxSleep(typ BackoffType, maxSleepMs int, err err zap.Error(err), zap.Int("totalSleep", b.totalSleep), zap.Int("maxSleep", b.maxSleep), - zap.Stringer("type", typ), + zap.Stringer("type", cfg), zap.Reflect("txnStartTS", startTs)) return nil } @@ -361,7 +226,7 @@ func (b *Backoffer) String() string { if b.totalSleep == 0 { return "" } - return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.types) + return fmt.Sprintf(" backoff(%dms %v)", b.totalSleep, b.configs) } // Clone creates a new Backoffer which keeps current Backoffer's sleep time and errors, and shares @@ -401,7 +266,7 @@ func (b *Backoffer) GetTotalSleep() int { // GetTypes returns type list. func (b *Backoffer) GetTypes() []fmt.Stringer { - return b.types + return b.configs } // GetCtx returns the binded context. @@ -415,12 +280,12 @@ func (b *Backoffer) SetCtx(ctx context.Context) { } // GetBackoffTimes returns a map contains backoff time count by type. 
-func (b *Backoffer) GetBackoffTimes() map[BackoffType]int { +func (b *Backoffer) GetBackoffTimes() map[string]int { return b.backoffTimes } // GetBackoffSleepMS returns a map contains backoff sleep time by type. -func (b *Backoffer) GetBackoffSleepMS() map[BackoffType]int { +func (b *Backoffer) GetBackoffSleepMS() map[string]int { return b.backoffSleepMS } diff --git a/store/tikv/retry/backoff_test.go b/store/tikv/retry/backoff_test.go index f8dfb9ed120f3..a0a566499b10f 100644 --- a/store/tikv/retry/backoff_test.go +++ b/store/tikv/retry/backoff_test.go @@ -27,7 +27,7 @@ var _ = Suite(&testBackoffSuite{}) func (s *testBackoffSuite) TestBackoffWithMax(c *C) { b := NewBackofferWithVars(context.TODO(), 2000, nil) - err := b.BackoffWithMaxSleep(BoTxnLockFast, 30, errors.New("test")) + err := b.BackoffWithMaxSleepTxnLockFast(30, errors.New("test")) c.Assert(err, IsNil) c.Assert(b.totalSleep, Equals, 30) } diff --git a/store/tikv/retry/config.go b/store/tikv/retry/config.go new file mode 100644 index 0000000000000..bd118cabd8028 --- /dev/null +++ b/store/tikv/retry/config.go @@ -0,0 +1,159 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package retry + +import ( + "context" + "math" + "math/rand" + "strings" + "time" + + tikverr "github.com/pingcap/tidb/store/tikv/error" + "github.com/pingcap/tidb/store/tikv/kv" + "github.com/pingcap/tidb/store/tikv/logutil" + "github.com/pingcap/tidb/store/tikv/metrics" + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" +) + +// Config is the configuration of the Backoff function. +type Config struct { + name string + metric prometheus.Observer + fnCfg *BackoffFnCfg + err error +} + +// backoffFn is the backoff function which compute the sleep time and do sleep. +type backoffFn func(ctx context.Context, maxSleepMs int) int + +func (c *Config) createBackoffFn(vars *kv.Variables) backoffFn { + if strings.EqualFold(c.name, txnLockFastName) { + return newBackoffFn(vars.BackoffLockFast, c.fnCfg.cap, c.fnCfg.jitter) + } + return newBackoffFn(c.fnCfg.base, c.fnCfg.cap, c.fnCfg.jitter) +} + +// BackoffFnCfg is the configuration for the backoff func which implements exponential backoff with +// optional jitters. +// See http://www.awsarchitectureblog.com/2015/03/backoff.html +type BackoffFnCfg struct { + base int + cap int + jitter int +} + +// NewBackoffFnCfg creates the config for BackoffFn. +func NewBackoffFnCfg(base, cap, jitter int) *BackoffFnCfg { + return &BackoffFnCfg{ + base, + cap, + jitter, + } +} + +// NewConfig creates a new Config for the Backoff operation. +func NewConfig(name string, metric prometheus.Observer, backoffFnCfg *BackoffFnCfg, err error) *Config { + return &Config{ + name: name, + metric: metric, + fnCfg: backoffFnCfg, + err: err, + } +} + +func (c *Config) String() string { + return c.name +} + +const txnLockFastName = "txnLockFast" + +// Backoff Config samples. 
+var ( + // TODO: distinguish tikv and tiflash in metrics + BoTiKVRPC = NewConfig("tikvRPC", metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout) + BoTiFlashRPC = NewConfig("tiflashRPC", metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout) + BoTxnLock = NewConfig("txnLock", metrics.BackoffHistogramLock, NewBackoffFnCfg(200, 3000, EqualJitter), tikverr.ErrResolveLockTimeout) + BoPDRPC = NewConfig("pdRPC", metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout("")) + // change base time to 2ms, because it may recover soon. + BoRegionMiss = NewConfig("regionMiss", metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable) + BoTiKVServerBusy = NewConfig("tikvServerBusy", metrics.BackoffHistogramServerBusy, NewBackoffFnCfg(2000, 10000, EqualJitter), tikverr.ErrTiKVServerBusy) + BoTiFlashServerBusy = NewConfig("tiflashServerBusy", metrics.BackoffHistogramServerBusy, NewBackoffFnCfg(2000, 10000, EqualJitter), tikverr.ErrTiFlashServerBusy) + BoTxnNotFound = NewConfig("txnNotFound", metrics.BackoffHistogramEmpty, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrResolveLockTimeout) + BoStaleCmd = NewConfig("staleCommand", metrics.BackoffHistogramStaleCmd, NewBackoffFnCfg(2, 1000, NoJitter), tikverr.ErrTiKVStaleCommand) + BoMaxTsNotSynced = NewConfig("maxTsNotSynced", metrics.BackoffHistogramEmpty, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrTiKVMaxTimestampNotSynced) + // TxnLockFast's `base` load from vars.BackoffLockFast when create BackoffFn. + BoTxnLockFast = NewConfig(txnLockFastName, metrics.BackoffHistogramLockFast, NewBackoffFnCfg(2, 3000, EqualJitter), tikverr.ErrResolveLockTimeout) +) + +const ( + // NoJitter makes the backoff sequence strict exponential. + NoJitter = 1 + iota + // FullJitter applies random factors to strict exponential. 
+	FullJitter
+	// EqualJitter is also randomized, but prevents very short sleeps.
+	EqualJitter
+	// DecorrJitter increases the maximum jitter based on the last random value.
+	DecorrJitter
+)
+
+// newBackoffFn creates a backoff func which implements exponential backoff with
+// optional jitters.
+// See http://www.awsarchitectureblog.com/2015/03/backoff.html
+func newBackoffFn(base, cap, jitter int) backoffFn {
+	if base < 2 {
+		// To prevent panic in 'rand.Intn'.
+		base = 2
+	}
+	attempts := 0
+	lastSleep := base
+	return func(ctx context.Context, maxSleepMs int) int {
+		var sleep int
+		switch jitter {
+		case NoJitter:
+			sleep = expo(base, cap, attempts)
+		case FullJitter:
+			v := expo(base, cap, attempts)
+			sleep = rand.Intn(v)
+		case EqualJitter:
+			v := expo(base, cap, attempts)
+			sleep = v/2 + rand.Intn(v/2)
+		case DecorrJitter:
+			sleep = int(math.Min(float64(cap), float64(base+rand.Intn(lastSleep*3-base))))
+		}
+		logutil.BgLogger().Debug("backoff",
+			zap.Int("base", base),
+			zap.Int("sleep", sleep),
+			zap.Int("attempts", attempts))
+
+		realSleep := sleep
+		// When maxSleepMs >= 0 is set via `tikv.BackoffWithMaxSleep`, the actual sleep is capped at maxSleepMs milliseconds.
+ if maxSleepMs >= 0 && realSleep > maxSleepMs { + realSleep = maxSleepMs + } + select { + case <-time.After(time.Duration(realSleep) * time.Millisecond): + attempts++ + lastSleep = sleep + return realSleep + case <-ctx.Done(): + return 0 + } + } +} + +func expo(base, cap, n int) int { + return int(math.Min(float64(cap), float64(base)*math.Pow(2.0, float64(n)))) +} diff --git a/store/tikv/scan.go b/store/tikv/scan.go index 035291a783aec..94ece80ff067f 100644 --- a/store/tikv/scan.go +++ b/store/tikv/scan.go @@ -254,7 +254,7 @@ func (s *Scanner) getData(bo *Backoffer) error { return errors.Trace(err) } if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(retry.BoTxnLockFast, int(msBeforeExpired), errors.Errorf("key is locked during scanning")) + err = bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.Errorf("key is locked during scanning")) if err != nil { return errors.Trace(err) } diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index ab3862fe4bf0b..180ac59369aca 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -381,7 +381,7 @@ func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collec return errors.Trace(err) } if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(retry.BoTxnLockFast, int(msBeforeExpired), errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys))) + err = bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.Errorf("batchGet lockedKeys: %d", len(lockedKeys))) if err != nil { return errors.Trace(err) } @@ -527,7 +527,7 @@ func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k []byte) ([]byte, return nil, errors.Trace(err) } if msBeforeExpired > 0 { - err = bo.BackoffWithMaxSleep(retry.BoTxnLockFast, int(msBeforeExpired), errors.New(keyErr.String())) + err = bo.BackoffWithMaxSleepTxnLockFast(int(msBeforeExpired), errors.New(keyErr.String())) if err != nil { return nil, errors.Trace(err) } @@ -730,8 +730,8 @@ func (s *KVSnapshot) mergeRegionRequestStats(stats 
map[tikvrpc.CmdType]*RPCRunti // SnapshotRuntimeStats records the runtime stats of snapshot. type SnapshotRuntimeStats struct { rpcStats RegionRequestRuntimeStats - backoffSleepMS map[retry.BackoffType]int - backoffTimes map[retry.BackoffType]int + backoffSleepMS map[string]int + backoffTimes map[string]int scanDetail *util.ScanDetail timeDetail *util.TimeDetail } @@ -745,8 +745,8 @@ func (rs *SnapshotRuntimeStats) Clone() *SnapshotRuntimeStats { } } if len(rs.backoffSleepMS) > 0 { - newRs.backoffSleepMS = make(map[retry.BackoffType]int) - newRs.backoffTimes = make(map[retry.BackoffType]int) + newRs.backoffSleepMS = make(map[string]int) + newRs.backoffTimes = make(map[string]int) for k, v := range rs.backoffSleepMS { newRs.backoffSleepMS[k] += v } @@ -767,10 +767,10 @@ func (rs *SnapshotRuntimeStats) Merge(other *SnapshotRuntimeStats) { } if len(other.backoffSleepMS) > 0 { if rs.backoffSleepMS == nil { - rs.backoffSleepMS = make(map[retry.BackoffType]int) + rs.backoffSleepMS = make(map[string]int) } if rs.backoffTimes == nil { - rs.backoffTimes = make(map[retry.BackoffType]int) + rs.backoffTimes = make(map[string]int) } for k, v := range other.backoffSleepMS { rs.backoffSleepMS[k] += v @@ -791,7 +791,7 @@ func (rs *SnapshotRuntimeStats) String() string { } ms := rs.backoffSleepMS[k] d := time.Duration(ms) * time.Millisecond - buf.WriteString(fmt.Sprintf("%s_backoff:{num:%d, total_time:%s}", k.String(), v, util.FormatDuration(d))) + buf.WriteString(fmt.Sprintf("%s_backoff:{num:%d, total_time:%s}", k, v, util.FormatDuration(d))) } timeDetail := rs.timeDetail.String() if timeDetail != "" { diff --git a/store/tikv/tests/snapshot_test.go b/store/tikv/tests/snapshot_test.go index a126decfc1c7d..ee2a71730b5bf 100644 --- a/store/tikv/tests/snapshot_test.go +++ b/store/tikv/tests/snapshot_test.go @@ -273,7 +273,7 @@ func (s *testSnapshotSuite) TestSnapshotRuntimeStats(c *C) { snapshot.MergeRegionRequestStats(reqStats.Stats) snapshot.MergeRegionRequestStats(reqStats.Stats) 
bo := tikv.NewBackofferWithVars(context.Background(), 2000, nil) - err := bo.BackoffWithMaxSleep(tikv.BoTxnLockFast, 30, errors.New("test")) + err := bo.BackoffWithMaxSleepTxnLockFast(30, errors.New("test")) c.Assert(err, IsNil) snapshot.RecordBackoffInfo(bo) snapshot.RecordBackoffInfo(bo) diff --git a/util/stmtsummary/statement_summary_test.go b/util/stmtsummary/statement_summary_test.go index 5971e83e7980f..66e9af8969edd 100644 --- a/util/stmtsummary/statement_summary_test.go +++ b/util/stmtsummary/statement_summary_test.go @@ -192,7 +192,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { sync.Mutex BackoffTypes []fmt.Stringer }{ - BackoffTypes: []fmt.Stringer{tikv.BoTxnLock}, + BackoffTypes: []fmt.Stringer{tikv.BoTxnLock()}, }, ResolveLockTime: 10000, WriteKeys: 100000, @@ -268,7 +268,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { expectedSummaryElement.sumTxnRetry += int64(stmtExecInfo2.ExecDetail.CommitDetail.TxnRetry) expectedSummaryElement.maxTxnRetry = stmtExecInfo2.ExecDetail.CommitDetail.TxnRetry expectedSummaryElement.sumBackoffTimes += 1 - expectedSummaryElement.backoffTypes[tikv.BoTxnLock] = 1 + expectedSummaryElement.backoffTypes[tikv.BoTxnLock()] = 1 expectedSummaryElement.sumMem += stmtExecInfo2.MemMax expectedSummaryElement.maxMem = stmtExecInfo2.MemMax expectedSummaryElement.sumDisk += stmtExecInfo2.DiskMax @@ -319,7 +319,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { sync.Mutex BackoffTypes []fmt.Stringer }{ - BackoffTypes: []fmt.Stringer{tikv.BoTxnLock}, + BackoffTypes: []fmt.Stringer{tikv.BoTxnLock()}, }, ResolveLockTime: 1000, WriteKeys: 10000, @@ -374,7 +374,7 @@ func (s *testStmtSummarySuite) TestAddStatement(c *C) { expectedSummaryElement.sumPrewriteRegionNum += int64(stmtExecInfo3.ExecDetail.CommitDetail.PrewriteRegionNum) expectedSummaryElement.sumTxnRetry += int64(stmtExecInfo3.ExecDetail.CommitDetail.TxnRetry) expectedSummaryElement.sumBackoffTimes += 1 - 
expectedSummaryElement.backoffTypes[tikv.BoTxnLock] = 2 + expectedSummaryElement.backoffTypes[tikv.BoTxnLock()] = 2 expectedSummaryElement.sumMem += stmtExecInfo3.MemMax expectedSummaryElement.sumDisk += stmtExecInfo3.DiskMax expectedSummaryElement.sumAffectedRows += stmtExecInfo3.StmtCtx.AffectedRows() @@ -575,7 +575,7 @@ func generateAnyExecInfo() *StmtExecInfo { sync.Mutex BackoffTypes []fmt.Stringer }{ - BackoffTypes: []fmt.Stringer{tikv.BoTxnLock}, + BackoffTypes: []fmt.Stringer{tikv.BoTxnLock()}, }, ResolveLockTime: 2000, WriteKeys: 20000, @@ -960,10 +960,10 @@ func (s *testStmtSummarySuite) TestFormatBackoffTypes(c *C) { backoffMap := make(map[fmt.Stringer]int) c.Assert(formatBackoffTypes(backoffMap), IsNil) - backoffMap[tikv.BoPDRPC] = 1 + backoffMap[tikv.BoPDRPC()] = 1 c.Assert(formatBackoffTypes(backoffMap), Equals, "pdRPC:1") - backoffMap[tikv.BoTxnLock] = 2 + backoffMap[tikv.BoTxnLock()] = 2 c.Assert(formatBackoffTypes(backoffMap), Equals, "txnLock:2,pdRPC:1") }