diff --git a/config/retry/backoff.go b/config/retry/backoff.go
index c18577ad0c..a874bb71ae 100644
--- a/config/retry/backoff.go
+++ b/config/retry/backoff.go
@@ -233,9 +233,22 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err e
 		zap.Int("maxSleep", b.maxSleep),
 		zap.Stringer("type", cfg),
 		zap.Reflect("txnStartTS", startTs))
+
+	// fail fast if we don't have enough retry tokens
+	if cfg.retryRateLimiter != nil && !cfg.retryRateLimiter.takeRetryToken() {
+		logutil.Logger(b.ctx).Warn(fmt.Sprintf("Retry limit for %s is exhausted", cfg.name))
+		return errors.WithStack(err)
+	}
+
 	return nil
 }
 
+func (b *Backoffer) OnSuccess(cfg *Config) {
+	if cfg.retryRateLimiter != nil {
+		cfg.retryRateLimiter.addRetryToken()
+	}
+}
+
 func (b *Backoffer) String() string {
 	if b.totalSleep == 0 {
 		return ""
diff --git a/config/retry/backoff_test.go b/config/retry/backoff_test.go
index 63af9d4ceb..d5bef27740 100644
--- a/config/retry/backoff_test.go
+++ b/config/retry/backoff_test.go
@@ -83,6 +83,24 @@ func TestBackoffErrorType(t *testing.T) {
 	assert.Fail(t, "should not be here")
 }
 
+func TestRetryLimit(t *testing.T) {
+	globalConfig := NewConfigWithRetryLimit("TestConfig", nil, NewBackoffFnCfg(1, 1000, NoJitter), NewRetryRateLimiter(1, 1), errors.New("test error"))
+	b := NewBackofferWithVars(context.TODO(), 100, nil)
+	// test that we start with the retry limit at its cap (1 in this test)
+	assert.Nil(t, b.Backoff(globalConfig, errors.New("test error")))
+	// test that the retry limit is hit once the bucket is empty
+	assert.NotNil(t, b.Backoff(globalConfig, errors.New("test error")))
+	// test that the limit is global across different Backoffer instances
+	b2 := NewBackofferWithVars(context.TODO(), 100, nil)
+	assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
+	// test that the bucket is refilled by reporting 2 successes (the ratio is 1, so every success adds a token)
+	b.OnSuccess(globalConfig)
+	b.OnSuccess(globalConfig)
+	// test that only one of the two tokens was kept, due to the cap
+	assert.Nil(t, b2.Backoff(globalConfig, errors.New("test error")))
+	assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
+}
+
 func TestBackoffDeepCopy(t *testing.T) {
 	var err error
 	b := NewBackofferWithVars(context.TODO(), 4, nil)
diff --git a/config/retry/config.go b/config/retry/config.go
index 19ac34b688..01331bb4b2 100644
--- a/config/retry/config.go
+++ b/config/retry/config.go
@@ -39,6 +39,7 @@ import (
 	"math"
 	"math/rand"
 	"strings"
+	"sync/atomic"
 	"time"
 
 	"github.com/prometheus/client_golang/prometheus"
@@ -52,10 +53,11 @@ import (
 
 // Config is the configuration of the Backoff function.
 type Config struct {
-	name   string
-	metric *prometheus.Observer
-	fnCfg  *BackoffFnCfg
-	err    error
+	name             string
+	metric           *prometheus.Observer
+	fnCfg            *BackoffFnCfg
+	retryRateLimiter *RetryRateLimiter
+	err              error
 }
 
 // backoffFn is the backoff function which compute the sleep time and do sleep.
@@ -96,6 +98,50 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
 	}
 }
 
+type RetryRateLimiter struct {
+	tokenCount                 int32
+	allowedRetryToSuccessRatio float32
+	cap                        int32
+}
+
+func NewRetryRateLimiter(cap int32, ratio float32) *RetryRateLimiter {
+	return &RetryRateLimiter{
+		cap, // always init with a full bucket to allow retries on startup
+		ratio,
+		cap,
+	}
+}
+
+// addRetryToken returns a token to the bucket, subject to the configured retry-to-success ratio and the cap
+func (r *RetryRateLimiter) addRetryToken() {
+	if rand.Float32() < r.allowedRetryToSuccessRatio {
+		if atomic.LoadInt32(&r.tokenCount) < r.cap {
+			// a concurrent add may overshoot the cap slightly; that is fine, the cap is a soft limit
+			atomic.AddInt32(&r.tokenCount, 1)
+		}
+	}
+}
+
+// takeRetryToken consumes a token if one is available; it returns true if the retry is allowed, false otherwise
+func (r *RetryRateLimiter) takeRetryToken() bool {
+	if atomic.LoadInt32(&r.tokenCount) > 0 {
+		// a concurrent take may drive the count slightly below 0; that is fine, every consumed token still matches an added one
+		atomic.AddInt32(&r.tokenCount, -1)
+		return true
+	}
+	return false
+}
+
+func NewConfigWithRetryLimit(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, retryRateLimiter *RetryRateLimiter, err error) *Config {
+	return &Config{
+		name:             name,
+		metric:           metric,
+		fnCfg:            backoffFnCfg,
+		retryRateLimiter: retryRateLimiter,
+		err:              err,
+	}
+}
+
 // Base returns the base time of the backoff function.
 func (c *Config) Base() int {
 	return c.fnCfg.base
@@ -119,10 +165,11 @@ const txnLockFastName = "txnLockFast"
 // Backoff Config variables.
 var (
 	// TODO: distinguish tikv and tiflash in metrics
-	BoTiKVRPC    = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
-	BoTiFlashRPC = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
-	BoTxnLock    = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
-	BoPDRPC      = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
+	BoTiKVRPC          = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
+	BoTiFlashRPC       = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
+	BoTxnLock          = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
+	BoPDRPC            = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
+	BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 0.1), tikverr.NewErrPDServerTimeout(""))
 	// change base time to 2ms, because it may recover soon.
 	BoRegionMiss = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
 	BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
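Usage sketch (not part of the patch). The new API splits budget accounting between two call sites: Backoff spends a shared token on every rate-limited retry and fails fast when the bucket is empty, while OnSuccess probabilistically refunds one. The sketch below shows the intended call pattern, assuming the package is importable as github.com/tikv/client-go/v2/config/retry; fetchWithRetry and loadRegionMetadata are hypothetical placeholders, not part of the patch.

```go
package main

import (
	"context"

	// import path assumed from the repo layout shown in the diff
	"github.com/tikv/client-go/v2/config/retry"
)

// loadRegionMetadata is a made-up stand-in for the real PD metadata call.
func loadRegionMetadata(ctx context.Context) error { return nil }

// fetchWithRetry shows the intended call pattern: spend a token on every
// backoff, refund one (probabilistically) on every success.
func fetchWithRetry(ctx context.Context, maxSleepMs int) error {
	bo := retry.NewBackofferWithVars(ctx, maxSleepMs, nil)
	for {
		err := loadRegionMetadata(ctx)
		if err == nil {
			// With ratio 0.1, roughly one token is returned per ten
			// successes, bounding the sustained retry rate.
			bo.OnSuccess(retry.BoPDRegionMetadata)
			return nil
		}
		// Once the shared bucket is empty, Backoff returns the error
		// immediately instead of sleeping, which caps retry load
		// across all Backoffer instances using this Config.
		if berr := bo.Backoff(retry.BoPDRegionMetadata, err); berr != nil {
			return berr
		}
	}
}

func main() {
	_ = fetchWithRetry(context.Background(), 5000)
}
```

With BoPDRegionMetadata's NewRetryRateLimiter(10, 0.1), at most one token is refunded per ten successes on average, so under a persistent PD outage the steady-state retry rate stays near 10% of the success rate, with a burst allowance of 10 tokens after startup or recovery.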