Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Wenqi Mou <wenqimou@gmail.com>
  • Loading branch information
Tristan1900 committed Nov 25, 2024
1 parent 4c3308a commit 1cc3695
Showing 1 changed file with 122 additions and 38 deletions.
160 changes: 122 additions & 38 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,41 +147,92 @@ type backoffStrategyImpl struct {
isNonRetryErr func(error) bool
}

func NewBackoffRetryAllErrorStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration) BackoffStrategy {
errContext := NewZeroRetryContext("retry all errors")
return &backoffStrategyImpl{
remainingAttempts: remainingAttempts,
delayTime: delayTime,
maxDelayTime: maxDelayTime,
errContext: errContext,
isRetryErr: alwaysTrueFunc(),
isNonRetryErr: alwaysFalseFunc(),
// BackoffOption defines a function type for configuring backoffStrategyImpl
type BackoffOption func(*backoffStrategyImpl)

// WithRemainingAttempts sets the remaining attempts
func WithRemainingAttempts(attempts int) BackoffOption {
return func(b *backoffStrategyImpl) {
b.remainingAttempts = attempts
}
}

func NewBackoffRetryAllExceptStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration, isNonRetryFunc func(error) bool) BackoffStrategy {
errContext := NewZeroRetryContext("retry all except")
return &backoffStrategyImpl{
remainingAttempts: remainingAttempts,
delayTime: delayTime,
maxDelayTime: maxDelayTime,
errContext: errContext,
isRetryErr: alwaysTrueFunc(),
isNonRetryErr: isNonRetryFunc,
// WithDelayTime sets the initial delay time
func WithDelayTime(delay time.Duration) BackoffOption {
return func(b *backoffStrategyImpl) {
b.delayTime = delay
}
}

// WithMaxDelayTime sets the maximum delay time
func WithMaxDelayTime(maxDelay time.Duration) BackoffOption {
return func(b *backoffStrategyImpl) {
b.maxDelayTime = maxDelay
}
}

// WithErrorContext sets the error context
func WithErrorContext(errContext *ErrorContext) BackoffOption {
return func(b *backoffStrategyImpl) {
b.errContext = errContext
}
}

// WithRetryErrorFunc sets the retry error checking function
func WithRetryErrorFunc(isRetryErr func(error) bool) BackoffOption {
return func(b *backoffStrategyImpl) {
b.isRetryErr = isRetryErr
}
}

// WithNonRetryErrorFunc sets the non-retry error checking function
func WithNonRetryErrorFunc(isNonRetryErr func(error) bool) BackoffOption {
return func(b *backoffStrategyImpl) {
b.isNonRetryErr = isNonRetryErr
}
}

// NewBackoffStrategy creates a new backoff strategy with custom retry logic
func newBackoffStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration, errContext *ErrorContext,
isRetryErr func(error) bool, isNonRetryErr func(error) bool) BackoffStrategy {
return &backoffStrategyImpl{
remainingAttempts: remainingAttempts,
delayTime: delayTime,
maxDelayTime: maxDelayTime,
errContext: errContext,
isRetryErr: isRetryErr,
isNonRetryErr: isNonRetryErr,
func NewBackoffStrategy(opts ...BackoffOption) BackoffStrategy {
// Default values
bs := &backoffStrategyImpl{
remainingAttempts: 1,
delayTime: time.Second,
maxDelayTime: 10 * time.Second,
errContext: NewZeroRetryContext("default"),
isRetryErr: alwaysTrueFunc(),
isNonRetryErr: alwaysFalseFunc(),
}

for _, opt := range opts {
opt(bs)
}

return bs
}

func NewBackoffRetryAllErrorStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration) BackoffStrategy {
errContext := NewZeroRetryContext("retry all errors")
return NewBackoffStrategy(
WithRemainingAttempts(remainingAttempts),
WithDelayTime(delayTime),
WithMaxDelayTime(maxDelayTime),
WithErrorContext(errContext),
WithRetryErrorFunc(alwaysTrueFunc()),
WithNonRetryErrorFunc(alwaysFalseFunc()),
)
}

func NewBackoffRetryAllExceptStrategy(remainingAttempts int, delayTime, maxDelayTime time.Duration, isNonRetryFunc func(error) bool) BackoffStrategy {
errContext := NewZeroRetryContext("retry all except")
return NewBackoffStrategy(
WithRemainingAttempts(remainingAttempts),
WithDelayTime(delayTime),
WithMaxDelayTime(maxDelayTime),
WithErrorContext(errContext),
WithRetryErrorFunc(alwaysTrueFunc()),
WithNonRetryErrorFunc(isNonRetryFunc),
)
}

func NewTiKVStoreBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration,
Expand Down Expand Up @@ -209,7 +260,14 @@ func NewTiKVStoreBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Dura
isRetryErrFunc := buildIsRetryErrFunc(retryErrs, grpcRetryCodes)
isNonRetryErrFunc := buildIsNonRetryErrFunc(nonRetryErrs)

return newBackoffStrategy(maxRetry, delayTime, maxDelayTime, errContext, isRetryErrFunc, isNonRetryErrFunc)
return NewBackoffStrategy(
WithRemainingAttempts(maxRetry),
WithDelayTime(delayTime),
WithMaxDelayTime(maxDelayTime),
WithErrorContext(errContext),
WithRetryErrorFunc(isRetryErrFunc),
WithNonRetryErrorFunc(isNonRetryErrFunc),
)
}

func NewImportSSTBackoffStrategy() BackoffStrategy {
Expand Down Expand Up @@ -255,8 +313,14 @@ func NewPDBackoffStrategy(maxRetry int, delayTime, maxDelayTime time.Duration) B
isRetryErrFunc := buildIsRetryErrFunc(retryErrs, grpcRetryCodes)
isNonRetryErrFunc := buildIsNonRetryErrFunc(nonRetryErrs)

return newBackoffStrategy(maxRetry, delayTime, maxDelayTime, NewZeroRetryContext("connect PD"),
isRetryErrFunc, isNonRetryErrFunc)
return NewBackoffStrategy(
WithRemainingAttempts(maxRetry),
WithDelayTime(delayTime),
WithMaxDelayTime(maxDelayTime),
WithErrorContext(NewZeroRetryContext("connect PD")),
WithRetryErrorFunc(isRetryErrFunc),
WithNonRetryErrorFunc(isNonRetryErrFunc),
)
}

func NewAggressivePDBackoffStrategy() BackoffStrategy {
Expand All @@ -276,23 +340,43 @@ func NewDiskCheckBackoffStrategy() BackoffStrategy {

isRetryErrFunc := buildIsRetryErrFunc(retryErrs, grpcRetryCodes)

return newBackoffStrategy(resetTSRetryTime, resetTSWaitInterval, resetTSMaxWaitInterval, NewZeroRetryContext("disk check"),
isRetryErrFunc, alwaysFalseFunc())
return NewBackoffStrategy(
WithRemainingAttempts(resetTSRetryTime),
WithDelayTime(resetTSWaitInterval),
WithErrorContext(NewZeroRetryContext("disk check")),
WithRetryErrorFunc(isRetryErrFunc),
WithNonRetryErrorFunc(alwaysFalseFunc()),
)
}

func NewRecoveryBackoffStrategy(isRetryErrFunc func(error) bool) BackoffStrategy {
return newBackoffStrategy(recoveryMaxAttempts, recoveryDelayTime, recoveryMaxDelayTime, NewZeroRetryContext("recovery"),
isRetryErrFunc, alwaysFalseFunc())
return NewBackoffStrategy(
WithRemainingAttempts(recoveryMaxAttempts),
WithDelayTime(recoveryDelayTime),
WithErrorContext(NewZeroRetryContext("recovery")),
WithRetryErrorFunc(isRetryErrFunc),
WithNonRetryErrorFunc(alwaysFalseFunc()),
)
}

func NewFlashBackBackoffStrategy() BackoffStrategy {
return newBackoffStrategy(FlashbackRetryTime, FlashbackWaitInterval, FlashbackMaxWaitInterval, NewZeroRetryContext("flashback"),
alwaysTrueFunc(), alwaysFalseFunc())
return NewBackoffStrategy(
WithRemainingAttempts(FlashbackRetryTime),
WithDelayTime(FlashbackWaitInterval),
WithErrorContext(NewZeroRetryContext("flashback")),
WithRetryErrorFunc(alwaysTrueFunc()),
WithNonRetryErrorFunc(alwaysFalseFunc()),
)
}

func NewChecksumBackoffStrategy() BackoffStrategy {
return newBackoffStrategy(ChecksumRetryTime, ChecksumWaitInterval, ChecksumMaxWaitInterval, NewZeroRetryContext("checksum"),
alwaysTrueFunc(), alwaysFalseFunc())
return NewBackoffStrategy(
WithRemainingAttempts(ChecksumRetryTime),
WithDelayTime(ChecksumWaitInterval),
WithErrorContext(NewZeroRetryContext("checksum")),
WithRetryErrorFunc(alwaysTrueFunc()),
WithNonRetryErrorFunc(alwaysFalseFunc()),
)
}

func (bo *backoffStrategyImpl) NextBackoff(err error) time.Duration {
Expand Down

0 comments on commit 1cc3695

Please sign in to comment.