diff --git a/config/retry/backoff.go b/config/retry/backoff.go
index c18577ad0..a874bb71a 100644
--- a/config/retry/backoff.go
+++ b/config/retry/backoff.go
@@ -233,9 +233,22 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 		zap.Int("maxSleep", b.maxSleep),
 		zap.Stringer("type", cfg),
 		zap.Reflect("txnStartTS", startTs))
+
+	// fail fast if we don't have enough retry tokens
+	if cfg.retryRateLimiter != nil && !cfg.retryRateLimiter.takeRetryToken() {
+		logutil.Logger(b.ctx).Warn(fmt.Sprintf("Retry limit for %s is exhausted", cfg.name))
+		return errors.WithStack(err)
+	}
+
 	return nil
 }
 
+func (b *Backoffer) OnSuccess(cfg *Config) {
+	if cfg.retryRateLimiter != nil {
+		cfg.retryRateLimiter.addRetryToken()
+	}
+}
+
 func (b *Backoffer) String() string {
 	if b.totalSleep == 0 {
 		return ""
diff --git a/config/retry/backoff_test.go b/config/retry/backoff_test.go
index 63af9d4ce..d5bef2774 100644
--- a/config/retry/backoff_test.go
+++ b/config/retry/backoff_test.go
@@ -83,6 +83,24 @@ func TestBackoffErrorType(t *testing.T) {
 	assert.Fail(t, "should not be here")
 }
 
+func TestRetryLimit(t *testing.T) {
+	globalConfig := NewConfigWithRetryLimit("TestConfig", nil, NewBackoffFnCfg(1, 1000, NoJitter), NewRetryRateLimiter(1, 1), errors.New("test error"))
+	b := NewBackofferWithVars(context.TODO(), 100, nil)
+	// test we start with the retry limit at cap (1 in this test)
+	assert.Nil(t, b.Backoff(globalConfig, errors.New("test error")))
+	// test the retry limit is hit
+	assert.NotNil(t, b.Backoff(globalConfig, errors.New("test error")))
+	// test the limit is global across different Backoffer instances
+	b2 := NewBackofferWithVars(context.TODO(), 100, nil)
+	assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
+	// test the bucket is refilled only up to the cap, even though 2 successes are reported
+	b.OnSuccess(globalConfig)
+	b.OnSuccess(globalConfig)
+	// test only one token is available due to the cap
+	assert.Nil(t, b2.Backoff(globalConfig, errors.New("test error")))
+	assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
+}
+
 func TestBackoffDeepCopy(t *testing.T) {
 	var err error
 	b := NewBackofferWithVars(context.TODO(), 4, nil)
diff --git a/config/retry/config.go b/config/retry/config.go
index 19ac34b68..7431ad8f9 100644
--- a/config/retry/config.go
+++ b/config/retry/config.go
@@ -39,6 +39,7 @@ import (
 	"math"
 	"math/rand"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -52,10 +53,11 @@ import (
 
 // Config is the configuration of the Backoff function.
 type Config struct {
-	name   string
-	metric *prometheus.Observer
-	fnCfg  *BackoffFnCfg
-	err    error
+	name             string
+	metric           *prometheus.Observer
+	fnCfg            *BackoffFnCfg
+	retryRateLimiter *RetryRateLimiter
+	err              error
 }
 
 // backoffFn is the backoff function which compute the sleep time and do sleep.
@@ -96,6 +98,58 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
 	}
 }
 
+// RetryRateLimiter is used to limit the number of retries
+type RetryRateLimiter struct {
+	// tokenCount represents the number of tokens currently available for retries
+	tokenCount int32
+	// successForRetryCount represents how many successful requests are needed to allow a single retry
+	successForRetryCount int
+	// cap limits the number of retry tokens which can be accumulated over time
+	cap int32
+}
+
+// NewRetryRateLimiter creates a new RetryRateLimiter.
+// cap: the maximum number of retry tokens that can be accumulated over time. The bucket starts full.
+// successForRetryCount: how many successful requests are needed to allow a single retry. E.g. to allow at most one retry per 10 successful calls, set it to 10.
+func NewRetryRateLimiter(cap int32, successForRetryCount int) *RetryRateLimiter {
+	return &RetryRateLimiter{
+		cap,
+		successForRetryCount,
+		cap,
+	}
+}
+
+// addRetryToken adds a token to the rate limiter bucket according to the configured retry-to-success ratio and the cap
+func (r *RetryRateLimiter) addRetryToken() {
+	if rand.Intn(r.successForRetryCount) == 0 {
+		if atomic.LoadInt32(&r.tokenCount) < r.cap {
+			// it is ok to occasionally exceed the cap under concurrency, because the cap is a soft limit
+			atomic.AddInt32(&r.tokenCount, 1)
+		}
+	}
+}
+
+// takeRetryToken returns true if there is a token to retry, false otherwise
+func (r *RetryRateLimiter) takeRetryToken() bool {
+	if atomic.LoadInt32(&r.tokenCount) > 0 {
+		// it is ok to briefly go below 0 under concurrency, because every consumed token still matches an added one in the end
+		atomic.AddInt32(&r.tokenCount, -1)
+		return true
+	}
+	return false
+}
+
+// NewConfigWithRetryLimit creates a new Config for the Backoff operation with a retry limit.
+func NewConfigWithRetryLimit(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, retryRateLimiter *RetryRateLimiter, err error) *Config {
+	return &Config{
+		name:             name,
+		metric:           metric,
+		fnCfg:            backoffFnCfg,
+		retryRateLimiter: retryRateLimiter,
+		err:              err,
+	}
+}
+
 // Base returns the base time of the backoff function.
 func (c *Config) Base() int {
 	return c.fnCfg.base
@@ -119,10 +173,11 @@ const txnLockFastName = "txnLockFast"
 // Backoff Config variables.
 var (
 	// TODO: distinguish tikv and tiflash in metrics
-	BoTiKVRPC    = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
-	BoTiFlashRPC = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
-	BoTxnLock    = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
-	BoPDRPC      = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
+	BoTiKVRPC          = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
+	BoTiFlashRPC       = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
+	BoTxnLock          = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
+	BoPDRPC            = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
+	BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(100, 1), tikverr.NewErrPDServerTimeout(""))
 	// change base time to 2ms, because it may recover soon.
 	BoRegionMiss       = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
 	BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 01dfa385c..cc9d54e24 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -2059,7 +2059,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
 	opts = append(opts, pd.WithBuckets())
 	for {
 		if backoffErr != nil {
-			err := bo.Backoff(retry.BoPDRPC, backoffErr)
+			err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr)
 			if err != nil {
 				return nil, errors.WithStack(err)
 			}
@@ -2077,6 +2077,10 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool,
 			metrics.RegionCacheCounterWithGetCacheMissError.Inc()
 		} else {
 			metrics.RegionCacheCounterWithGetCacheMissOK.Inc()
+			if backoffErr == nil {
+				// refill retry allowance only for the original call
+				bo.OnSuccess(retry.BoPDRegionMetadata)
+			}
 		}
 		if err != nil {
 			if apicodec.IsDecodeError(err) {
@@ -2112,7 +2116,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
 	var backoffErr error
 	for {
 		if backoffErr != nil {
-			err := bo.Backoff(retry.BoPDRPC, backoffErr)
+			err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr)
 			if err != nil {
 				return nil, errors.WithStack(err)
 			}
@@ -2124,6 +2128,10 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Reg
 			metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
 		} else {
 			metrics.RegionCacheCounterWithGetRegionByIDOK.Inc()
+			if backoffErr == nil {
+				// refill retry allowance only for the original call
+				bo.OnSuccess(retry.BoPDRegionMetadata)
+			}
 		}
 		if err != nil {
 			if apicodec.IsDecodeError(err) {
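
For reference, below is a minimal, self-contained sketch of the token-bucket behavior that the new `RetryRateLimiter` implements: the bucket starts full, each retry spends one token, and on average one token is refunded per `successForRetryCount` successful calls, never accumulating past the cap. The names `retryBudget`, `newRetryBudget`, `takeToken`, and `onSuccess` are illustrative only and are not part of the client-go API; the actual symbols added by this patch are `RetryRateLimiter`, `NewRetryRateLimiter`, and `Backoffer.OnSuccess`.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// retryBudget is an illustrative re-implementation of the token-bucket logic
// in this patch: the bucket starts full, takeToken spends one token per retry,
// and onSuccess refunds roughly one token per successForRetry successful calls,
// capped at cap.
type retryBudget struct {
	tokens          int32
	successForRetry int
	cap             int32
}

func newRetryBudget(cap int32, successForRetry int) *retryBudget {
	return &retryBudget{tokens: cap, successForRetry: successForRetry, cap: cap}
}

func (b *retryBudget) takeToken() bool {
	if atomic.LoadInt32(&b.tokens) > 0 {
		// may briefly dip below 0 under concurrent callers; treated as a soft limit
		atomic.AddInt32(&b.tokens, -1)
		return true
	}
	return false
}

func (b *retryBudget) onSuccess() {
	// on average, refund one retry token per successForRetry successes
	if rand.Intn(b.successForRetry) == 0 && atomic.LoadInt32(&b.tokens) < b.cap {
		atomic.AddInt32(&b.tokens, 1)
	}
}

func main() {
	budget := newRetryBudget(2, 1)  // cap of 2 tokens, 1 success refunds 1 token
	fmt.Println(budget.takeToken()) // true  (1 token left)
	fmt.Println(budget.takeToken()) // true  (0 tokens left)
	fmt.Println(budget.takeToken()) // false (budget exhausted, caller fails fast)
	budget.onSuccess()              // a successful call refunds a token
	fmt.Println(budget.takeToken()) // true again
}
```

With `NewRetryRateLimiter(100, 1)` as used for `BoPDRegionMetadata`, every successful PD region-metadata call refunds one token (up to 100), so backoff retries are refused only when failures persistently outpace successes.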