
change ratio to int, document and more tests
Signed-off-by: artem_danilov <artem_danilov@airbnb.com>
artem_danilov committed Oct 24, 2024
1 parent f780e41 commit 2de173c
Showing 2 changed files with 40 additions and 9 deletions.
config/retry/backoff_test.go (27 additions, 0 deletions)
@@ -101,6 +101,33 @@ func TestRetryLimit(t *testing.T) {
assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
}

func TestBoPDRegionMetadataRetryLimit(t *testing.T) {
BoPDRegionMetadataTestConfig := NewConfigWithRetryLimit("TestConfig", nil, NewBackoffFnCfg(1, 1000, NoJitter), BoPDRegionMetadata.retryRateLimiter, errors.New("test error"))
assertRetryLimit(t, BoPDRegionMetadataTestConfig, 10)

// the token refill is probabilistic, so to make sure we hit the limit we call OnSuccess 10x more than required on average
b := NewBackofferWithVars(context.TODO(), 100, nil)
for i := 0; i < 1000; i++ {
b.OnSuccess(BoPDRegionMetadataTestConfig)
}

// verify the token bucket is refilled
assertRetryLimit(t, BoPDRegionMetadataTestConfig, 10)
}

func assertRetryLimit(t *testing.T, BoPDRegionMetadataTestConfig *Config, retryLimit int) {
// test that we start with a full bucket of retryLimit tokens
for i := 0; i < retryLimit; i++ {
// create a new Backoff instance to reset exponential backoff to reduce test duration
b := NewBackofferWithVars(context.TODO(), 100, nil)
assert.Nil(t, b.Backoff(BoPDRegionMetadataTestConfig, errors.New("test error")))
}
// create a new Backoff instance to reset exponential backoff to reduce test duration
b := NewBackofferWithVars(context.TODO(), 100, nil)
// test retry limit hit
assert.NotNil(t, b.Backoff(BoPDRegionMetadataTestConfig, errors.New("test error")))
}

func TestBackoffDeepCopy(t *testing.T) {
var err error
b := NewBackofferWithVars(context.TODO(), 4, nil)
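For context, the refill that this test relies on is probabilistic: each OnSuccess call earns a retry token with probability 1/successForRetryCount, which is why the test drives OnSuccess far more often than the expected minimum. A minimal standalone sketch of that expectation (not part of the commit; the constants and the package main wrapper are illustrative):

package main

import (
	"fmt"
	"math/rand"
)

func main() {
	const successForRetryCount = 10 // one retry token per ~10 successes on average
	const successes = 10000

	tokens := 0
	for i := 0; i < successes; i++ {
		// mirrors addRetryToken: refill with probability 1/successForRetryCount
		if rand.Intn(successForRetryCount) == 0 {
			tokens++
		}
	}
	// expected token count is successes/successForRetryCount, i.e. about 1000
	fmt.Printf("earned %d retry tokens from %d successes\n", tokens, successes)
}

In the test above the bucket cap is 10 and successForRetryCount is 10, so roughly 100 OnSuccess calls refill a drained bucket on average; calling it 1000 times makes a flaky refill extremely unlikely.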
config/retry/config.go (13 additions, 9 deletions)
@@ -102,31 +102,34 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFn
type RetryRateLimiter struct {
// tokenCount represents number of available tokens for retry
tokenCount int32
// allowedRetryToSuccessRatio is a ratio representing number of retries allowed for each success
allowedRetryToSuccessRatio float32
// successForRetryCount represents how many successful requests are needed to allow a single retry
successForRetryCount int
// cap limits the number of retry tokens which can be accumulated over time
cap int32
}

func NewRetryRateLimiter(cap int32, ratio float32) *RetryRateLimiter {
// NewRetryRateLimiter creates a new RetryRateLimiter
// cap: the maximum number of retry tokens that can be accumulated over time. The bucket starts full.
// successForRetryCount: how many successful requests are needed to allow a single retry. E.g. to allow a single retry per 10 calls, set it to 10.
func NewRetryRateLimiter(cap int32, successForRetryCount int) *RetryRateLimiter {
return &RetryRateLimiter{
cap, // always init with full bucket to allow retries on startup
ratio,
cap,
successForRetryCount,
cap,
}
}

// add a token to the rate limiter bucket according to configured retry to success ratio and cap
// addRetryToken adds a token to the rate limiter bucket according to configured retry to success ratio and the cap
func (r *RetryRateLimiter) addRetryToken() {
if rand.Float32() < r.allowedRetryToSuccessRatio {
if rand.Intn(r.successForRetryCount) == 0 {
if atomic.LoadInt32(&r.tokenCount) < r.cap {
// it is ok to occasionally exceed the cap under concurrency, because the cap is a soft limit
atomic.AddInt32(&r.tokenCount, 1)
}
}
}

// return true if there is a token to retry, false otherwise
// takeRetryToken returns true if there is a token to retry, false otherwise
func (r *RetryRateLimiter) takeRetryToken() bool {
if atomic.LoadInt32(&r.tokenCount) > 0 {
// it is ok to go below 0, because each consumed token will still match an added one in the end
@@ -136,6 +139,7 @@ func (r *RetryRateLimiter) takeRetryToken() bool {
return false
}

// NewConfigWithRetryLimit creates a new Config for the Backoff operation with a retry limit.
func NewConfigWithRetryLimit(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, retryRateLimiter *RetryRateLimiter, err error) *Config {
return &Config{
name: name,
@@ -173,7 +177,7 @@ var (
BoTiFlashRPC = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
BoTxnLock = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
BoPDRPC = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 0.1), tikverr.NewErrPDServerTimeout(""))
BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(10, 10), tikverr.NewErrPDServerTimeout(""))
// change base time to 2ms, because it may recover soon.
BoRegionMiss = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
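For readers of this diff, a minimal usage sketch of the new constructor signature, assuming it sits in the config/retry package where these identifiers are in scope (the config name, limits, error text, and the context/errors/fmt imports are illustrative, not part of the commit):

// Hypothetical example, not in the commit: wiring a rate-limited backoff
// config and exercising it inside the config/retry package.
func ExampleRetryRateLimiterUsage() {
	cfg := NewConfigWithRetryLimit(
		"exampleRateLimited",              // hypothetical config name
		nil,                               // no Prometheus observer
		NewBackoffFnCfg(2, 500, NoJitter), // base sleep 2ms, max sleep 500ms, no jitter
		NewRetryRateLimiter(10, 10),       // bucket cap of 10, ~1 token per 10 successes
		errors.New("example error"),
	)

	b := NewBackofferWithVars(context.TODO(), 100, nil)
	// Backoff consumes one retry token per call; once the bucket is drained
	// it returns a non-nil error instead of sleeping and retrying.
	if err := b.Backoff(cfg, errors.New("transient failure")); err != nil {
		fmt.Println("retry budget exhausted:", err)
	}
	// Successful calls report back so the limiter can probabilistically refill
	// the bucket, on average one token per successForRetryCount successes.
	b.OnSuccess(cfg)
}

The key behavioral change in this commit is that the refill rate is now expressed as an integer count of successes per token (NewRetryRateLimiter(10, 10)) instead of a float ratio (NewRetryRateLimiter(10, 0.1)).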
