Skip to content

Commit

Permalink
Refine the retryable error check
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Jan 9, 2024
1 parent 47826a8 commit cd119aa
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 11 deletions.
12 changes: 9 additions & 3 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,12 @@ func (ci *clientInner) requestWithRetry(
reqInfo *requestInfo,
headerOpts ...HeaderOption,
) error {
var (
statusCode int
err error
)
execFunc := func() error {
var (
statusCode int
err error
addr string
pdAddrs, leaderAddrIdx = ci.getPDAddrs()
)
Expand All @@ -169,7 +171,7 @@ func (ci *clientInner) requestWithRetry(
continue
}
addr = ci.pdAddrs[idx]
_, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
break
}
Expand All @@ -181,6 +183,10 @@ func (ci *clientInner) requestWithRetry(
if reqInfo.bo == nil {
return execFunc()
}
// Backoffer also needs to check the status code to determine whether to retry.
reqInfo.bo.SetRetryableChecker(func(err error) bool {
return err != nil && !noNeedRetry(statusCode)
})
return reqInfo.bo.Exec(ctx, execFunc)
}

Expand Down
38 changes: 32 additions & 6 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Backoffer struct {
max time.Duration
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
retryableChecker func(err error) bool

next time.Duration
currentTotal time.Duration
Expand All @@ -39,15 +42,23 @@ type Backoffer struct {
func (bo *Backoffer) Exec(
ctx context.Context,
fn func() error,
) (err error) {
) error {
defer bo.resetBackoff()
var (
err error
after *time.Timer
)
for {
err = fn()
if err == nil {
if !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
after := time.NewTimer(currentInterval)
if after == nil {
after = time.NewTimer(currentInterval)
} else {
after.Reset(currentInterval)
}
select {
case <-ctx.Done():
after.Stop()
Expand Down Expand Up @@ -83,14 +94,29 @@ func InitialBackoffer(base, max, total time.Duration) *Backoffer {
total = base
}
return &Backoffer{
base: base,
max: max,
total: total,
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
next: base,
currentTotal: 0,
}
}

// SetRetryableChecker installs fn as the predicate that decides whether a
// given error should trigger another retry attempt, replacing the
// backoffer's current checker (by default, every non-nil error is retried).
func (bo *Backoffer) SetRetryableChecker(fn func(err error) bool) {
	bo.retryableChecker = fn
}

func (bo *Backoffer) isRetryable(err error) bool {
if bo.retryableChecker == nil {
return true

Check warning on line 115 in client/retry/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/retry/backoff.go#L115

Added line #L115 was not covered by tests
}
return bo.retryableChecker(err)
}

// nextInterval for now use the `exponentialInterval`.
func (bo *Backoffer) nextInterval() time.Duration {
return bo.exponentialInterval()
Expand Down
20 changes: 18 additions & 2 deletions client/retry/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (

func TestBackoffer(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

base := time.Second
max := 100 * time.Millisecond
Expand All @@ -49,7 +51,7 @@ func TestBackoffer(t *testing.T) {
execCount int
expectedErr = errors.New("test")
)
err := bo.Exec(context.Background(), func() error {
err := bo.Exec(ctx, func() error {
execCount++
return expectedErr
})
Expand All @@ -74,7 +76,7 @@ func TestBackoffer(t *testing.T) {
// Test the total time cost.
execCount = 0
var start time.Time
err = bo.Exec(context.Background(), func() error {
err = bo.Exec(ctx, func() error {
execCount++
if start.IsZero() {
start = time.Now()
Expand All @@ -85,6 +87,20 @@ func TestBackoffer(t *testing.T) {
re.ErrorIs(err, expectedErr)
re.Equal(4, execCount)
re.True(isBackofferReset(bo))

// Test the retryable checker.
execCount = 0
bo = InitialBackoffer(base, max, total)
bo.SetRetryableChecker(func(err error) bool {
return execCount < 2
})
err = bo.Exec(ctx, func() error {
execCount++
return nil
})
re.NoError(err)
re.Equal(2, execCount)
re.True(isBackofferReset(bo))
}

func isBackofferReset(bo *Backoffer) bool {
Expand Down
15 changes: 15 additions & 0 deletions tests/integrations/client/http_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client/http"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/placement"
Expand Down Expand Up @@ -490,3 +491,17 @@ func (suite *httpClientTestSuite) TestVersion() {
re.NoError(err)
re.Equal(versioninfo.PDReleaseVersion, ver)
}

// TestWithBackoffer checks that a 404 response is surfaced immediately as a
// non-retryable error, both without a backoffer and with one whose total
// budget is 0 (i.e. it would otherwise retry forever).
func (suite *httpClientTestSuite) TestWithBackoffer() {
	re := suite.Require()

	// Without a backoffer, the 404 should be returned directly.
	rule, err := suite.client.GetPlacementRule(suite.ctx, "non-exist-group", "non-exist-rule")
	re.ErrorContains(err, http.StatusText(http.StatusNotFound))
	re.Nil(rule)

	// Even an infinite backoffer must not retry a 404: the status-code check
	// marks it non-retryable, so the error comes back right away.
	bo := retry.InitialBackoffer(100*time.Millisecond, time.Second, 0)
	rule, err = suite.client.WithBackoffer(bo).GetPlacementRule(suite.ctx, "non-exist-group", "non-exist-rule")
	re.ErrorContains(err, http.StatusText(http.StatusNotFound))
	re.Nil(rule)
}

0 comments on commit cd119aa

Please sign in to comment.