add retry limiter to backoff function #1478

Open · wants to merge 3 commits into master
13 changes: 13 additions & 0 deletions config/retry/backoff.go
@@ -233,9 +233,22 @@ func (b *Backoffer) BackoffWithCfgAndMaxSleep(cfg *Config, maxSleepMs int, err error) error {
zap.Int("maxSleep", b.maxSleep),
zap.Stringer("type", cfg),
zap.Reflect("txnStartTS", startTs))

// fail fast if we don't have enough retry tokens
if cfg.retryRateLimiter != nil && !cfg.retryRateLimiter.takeRetryToken() {
logutil.Logger(b.ctx).Warn(fmt.Sprintf("Retry limit for %s is exhausted", cfg.name))
return errors.WithStack(err)
}

return nil
}

// OnSuccess reports a successful request to the config's rate limiter, if any, so the retry budget can be refilled.
func (b *Backoffer) OnSuccess(cfg *Config) {
if cfg.retryRateLimiter != nil {
cfg.retryRateLimiter.addRetryToken()
}
}

func (b *Backoffer) String() string {
if b.totalSleep == 0 {
return ""
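Taken together, Backoff and OnSuccess form a fail-fast retry budget: each retry consumes a token, and each success (at the configured ratio) earns one back. A minimal sketch of the intended caller pattern, assuming the usual imports; it mirrors the region_cache.go changes later in this PR, and fetchRegionFromPD is a hypothetical stand-in, not part of the change:

	// loadWithRetryBudget sketches the Backoff/OnSuccess pairing; it is not part of this PR.
	// pd here is the PD client package; fetchRegionFromPD is a hypothetical callback.
	func loadWithRetryBudget(ctx context.Context, fetchRegionFromPD func(context.Context) (*pd.Region, error)) (*pd.Region, error) {
		bo := retry.NewBackofferWithVars(ctx, 20000, nil)
		var backoffErr error
		for {
			if backoffErr != nil {
				// Consumes a retry token; fails fast with the wrapped error
				// once the shared budget for this Config is exhausted.
				if err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr); err != nil {
					return nil, err
				}
			}
			reg, err := fetchRegionFromPD(ctx)
			if err == nil {
				if backoffErr == nil {
					// Refill the retry budget only for the original (non-retry) call.
					bo.OnSuccess(retry.BoPDRegionMetadata)
				}
				return reg, nil
			}
			backoffErr = err
		}
	}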
18 changes: 18 additions & 0 deletions config/retry/backoff_test.go
@@ -83,6 +83,24 @@ func TestBackoffErrorType(t *testing.T) {
assert.Fail(t, "should not be here")
}

func TestRetryLimit(t *testing.T) {
globalConfig := NewConfigWithRetryLimit("TestConfig", nil, NewBackoffFnCfg(1, 1000, NoJitter), NewRetryRateLimiter(1, 1), errors.New("test error"))
b := NewBackofferWithVars(context.TODO(), 100, nil)
// test we start with retry limit at cap (1 in this test)
assert.Nil(t, b.Backoff(globalConfig, errors.New("test error")))
// test retry limit hit
assert.NotNil(t, b.Backoff(globalConfig, errors.New("test error")))
// test the limit is global across different Backoffer instances
b2 := NewBackofferWithVars(context.TODO(), 100, nil)
assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
// test the bucket is refilled via OnSuccess: add 2 tokens, of which only 1 fits under the cap
b.OnSuccess(globalConfig)
b.OnSuccess(globalConfig)
// test we have only one token due to cap
assert.Nil(t, b2.Backoff(globalConfig, errors.New("test error")))
assert.NotNil(t, b2.Backoff(globalConfig, errors.New("test error")))
}

func TestBackoffDeepCopy(t *testing.T) {
var err error
b := NewBackofferWithVars(context.TODO(), 4, nil)
71 changes: 63 additions & 8 deletions config/retry/config.go
@@ -39,6 +39,7 @@ import (
"math"
"math/rand"
"strings"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
@@ -52,10 +53,11 @@ import (

// Config is the configuration of the Backoff function.
type Config struct {
name string
metric *prometheus.Observer
fnCfg *BackoffFnCfg
err error
name string
metric *prometheus.Observer
fnCfg *BackoffFnCfg
retryRateLimiter *RetryRateLimiter
err error
}

// backoffFn is the backoff function which compute the sleep time and do sleep.
@@ -96,6 +98,58 @@ func NewConfig(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, err error) *Config {
}
}

// RetryRateLimiter is used to limit the number of retries
type RetryRateLimiter struct {
	// tokenCount is the number of retry tokens currently available
	tokenCount int32
	// successForRetryCount is how many successful requests are needed to earn a single retry token
	successForRetryCount int
	// cap limits the number of retry tokens that can be accumulated over time
	cap int32
}

Contributor:
Need comments like

// RetryRateLimiter is used to limit retry times for PD requests.

for exported functions and type definitions, or the lint check would fail.

Contributor Author:
Somehow golangci-lint run didn't complain. Fixed each comment.

// NewRetryRateLimiter creates a new RetryRateLimiter
// cap: the maximum number of retry tokens that can be accumulated over time. The bucket starts full.
// successForRetryCount: how many successful requests are needed to earn a single retry token. E.g. to allow one retry per 10 calls, set it to 10.
func NewRetryRateLimiter(cap int32, successForRetryCount int) *RetryRateLimiter {
	return &RetryRateLimiter{
		tokenCount:           cap,
		successForRetryCount: successForRetryCount,
		cap:                  cap,
	}
}
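For illustration only (the name and numbers below are hypothetical, not part of this PR): a limiter that stores at most 100 retry tokens and earns one back, on average, per 10 successful calls, attached to a config via NewConfigWithRetryLimit below. Within package retry this needs no import prefix:

	limiter := NewRetryRateLimiter(100, 10)
	cfg := NewConfigWithRetryLimit("myBackoff", nil, NewBackoffFnCfg(500, 3000, EqualJitter), limiter, errors.New("myBackoff retry budget exhausted"))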

// addRetryToken adds a token to the rate limiter's bucket, subject to the configured success-to-retry ratio and the cap
func (r *RetryRateLimiter) addRetryToken() {
if rand.Intn(r.successForRetryCount) == 0 {
if atomic.LoadInt32(&r.tokenCount) < r.cap {
// a concurrent add may briefly exceed the cap; that is acceptable, since the cap is a soft limit
atomic.AddInt32(&r.tokenCount, 1)
}
}
}

// takeRetryToken returns true if there is a token to retry, false otherwise
func (r *RetryRateLimiter) takeRetryToken() bool {
if atomic.LoadInt32(&r.tokenCount) > 0 {
// the count may briefly dip below 0 under concurrency; every consumed token is still matched by an added one
atomic.AddInt32(&r.tokenCount, -1)
return true
}
return false
}
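A hypothetical in-package test pinning down these semantics (not part of this PR); with successForRetryCount == 1 the refill is deterministic, because rand.Intn(1) always returns 0:

	func TestRetryRateLimiterBucket(t *testing.T) {
		r := NewRetryRateLimiter(2, 1)
		assert.True(t, r.takeRetryToken())  // bucket starts full at the cap
		assert.True(t, r.takeRetryToken())
		assert.False(t, r.takeRetryToken()) // empty: callers fail fast
		r.addRetryToken()                   // deterministic refill
		assert.True(t, r.takeRetryToken())
	}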

// NewConfigWithRetryLimit creates a new Config for the Backoff operation with a retry limit.
func NewConfigWithRetryLimit(name string, metric *prometheus.Observer, backoffFnCfg *BackoffFnCfg, retryRateLimiter *RetryRateLimiter, err error) *Config {
	return &Config{
		name:             name,
		metric:           metric,
		fnCfg:            backoffFnCfg,
		retryRateLimiter: retryRateLimiter,
		err:              err,
	}
}

Contributor:
Ditto for the comments of exported functions.

Contributor Author:
addressed

// Base returns the base time of the backoff function.
func (c *Config) Base() int {
return c.fnCfg.base
@@ -119,10 +173,11 @@ const txnLockFastName = "txnLockFast"
// Backoff Config variables.
var (
// TODO: distinguish tikv and tiflash in metrics
BoTiKVRPC = NewConfig("tikvRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiKVServerTimeout)
BoTiFlashRPC = NewConfig("tiflashRPC", &metrics.BackoffHistogramRPC, NewBackoffFnCfg(100, 2000, EqualJitter), tikverr.ErrTiFlashServerTimeout)
BoTxnLock = NewConfig("txnLock", &metrics.BackoffHistogramLock, NewBackoffFnCfg(100, 3000, EqualJitter), tikverr.ErrResolveLockTimeout)
BoPDRPC = NewConfig("pdRPC", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), tikverr.NewErrPDServerTimeout(""))
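// BoPDRegionMetadata caps retries of PD region-metadata calls with a limiter shared by all Backoffer instances: at most 100 stored tokens, one earned back per successful call.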
BoPDRegionMetadata = NewConfigWithRetryLimit("pdRegionMetadata", &metrics.BackoffHistogramPD, NewBackoffFnCfg(500, 3000, EqualJitter), NewRetryRateLimiter(100, 1), tikverr.NewErrPDServerTimeout(""))
// change base time to 2ms, because it may recover soon.
BoRegionMiss = NewConfig("regionMiss", &metrics.BackoffHistogramRegionMiss, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
BoRegionScheduling = NewConfig("regionScheduling", &metrics.BackoffHistogramRegionScheduling, NewBackoffFnCfg(2, 500, NoJitter), tikverr.ErrRegionUnavailable)
12 changes: 10 additions & 2 deletions internal/locate/region_cache.go
@@ -2059,7 +2059,7 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...pd.GetRegionOption) (*Region, error) {
opts = append(opts, pd.WithBuckets())
for {
if backoffErr != nil {
-			err := bo.Backoff(retry.BoPDRPC, backoffErr)
+			err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr)
if err != nil {
return nil, errors.WithStack(err)
}
@@ -2077,6 +2077,10 @@ func (c *RegionCache) loadRegion(bo *retry.Backoffer, key []byte, isEndKey bool, opts ...pd.GetRegionOption) (*Region, error) {
metrics.RegionCacheCounterWithGetCacheMissError.Inc()
} else {
metrics.RegionCacheCounterWithGetCacheMissOK.Inc()
if backoffErr == nil {
// refill retry allowance only for the original call
bo.OnSuccess(retry.BoPDRegionMetadata)
}
}
if err != nil {
if apicodec.IsDecodeError(err) {
@@ -2112,7 +2116,7 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Region, error) {
var backoffErr error
for {
if backoffErr != nil {
-			err := bo.Backoff(retry.BoPDRPC, backoffErr)
+			err := bo.Backoff(retry.BoPDRegionMetadata, backoffErr)
if err != nil {
return nil, errors.WithStack(err)
}
@@ -2124,6 +2128,10 @@ func (c *RegionCache) loadRegionByID(bo *retry.Backoffer, regionID uint64) (*Region, error) {
metrics.RegionCacheCounterWithGetRegionByIDError.Inc()
} else {
metrics.RegionCacheCounterWithGetRegionByIDOK.Inc()
if backoffErr == nil {
// refill retry allowance only for the original call
bo.OnSuccess(retry.BoPDRegionMetadata)
}
}
if err != nil {
if apicodec.IsDecodeError(err) {
Expand Down
Loading