Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http, retry: support backoffer in HTTP client #7680

Merged
merged 6 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 48 additions & 27 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -140,42 +141,53 @@

// requestWithRetry will first try to send the request to the PD leader, if it fails, it will try to send
// the request to the other PD followers to gain a better availability.
// TODO: support custom retry logic, e.g. retry with customizable backoffer.
func (ci *clientInner) requestWithRetry(
ctx context.Context,
reqInfo *requestInfo,
headerOpts ...HeaderOption,
) error {
var (
statusCode int
err error
addr string
pdAddrs, leaderAddrIdx = ci.getPDAddrs()
statusCode int
err error
)
// Try to send the request to the PD leader first.
if leaderAddrIdx != -1 {
addr = pdAddrs[leaderAddrIdx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
execFunc := func() error {
var (
addr string
pdAddrs, leaderAddrIdx = ci.getPDAddrs()
)
// Try to send the request to the PD leader first.
if leaderAddrIdx != -1 {
addr = pdAddrs[leaderAddrIdx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request leader addr failed",
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
}
log.Debug("[pd] request leader addr failed",
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
}
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
}
addr = ci.pdAddrs[idx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
break
}
log.Debug("[pd] request follower addr failed",
zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err))
}
addr = ci.pdAddrs[idx]
_, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
break
}
log.Debug("[pd] request follower addr failed",
zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err))
return err
}
if reqInfo.bo == nil {
return execFunc()
}
return err
// Backoffer also needs to check the status code to determine whether to retry.
reqInfo.bo.SetRetryableChecker(func(err error) bool {
return err != nil && !noNeedRetry(statusCode)
})
return reqInfo.bo.Exec(ctx, execFunc)
}

func noNeedRetry(statusCode int) bool {
Expand Down Expand Up @@ -210,7 +222,7 @@
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body))
if err != nil {
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return -1, errors.Trace(err)

Check warning on line 225 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L225

Added line #L225 was not covered by tests
}
for _, opt := range headerOpts {
opt(req.Header)
Expand All @@ -229,7 +241,7 @@

// Give away the response handling to the caller if the handler is set.
if respHandler != nil {
return resp.StatusCode, respHandler(resp, res)

Check warning on line 244 in client/http/client.go

View check run for this annotation

Codecov / codecov/patch

client/http/client.go#L244

Added line #L244 was not covered by tests
}

defer func() {
Expand Down Expand Up @@ -336,6 +348,7 @@

callerID string
respHandler respHandleFunc
bo *retry.Backoffer
}

// ClientOption configures the HTTP client.
Expand Down Expand Up @@ -420,6 +433,13 @@
return &newClient
}

// WithBackoffer sets and returns a new client with the given backoffer.
func (c *client) WithBackoffer(bo *retry.Backoffer) Client {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
newClient := *c
newClient.bo = bo
return &newClient
}

// Header key definition constants.
const (
pdAllowFollowerHandleKey = "PD-Allow-Follower-Handle"
Expand All @@ -439,7 +459,8 @@
func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts ...HeaderOption) error {
return c.inner.requestWithRetry(ctx, reqInfo.
WithCallerID(c.callerID).
WithRespHandler(c.respHandler),
WithRespHandler(c.respHandler).
WithBackoffer(c.bo),
headerOpts...)
}

Expand Down
28 changes: 28 additions & 0 deletions client/http/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ import (
"net/http"
"strings"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/retry"
"go.uber.org/atomic"
)

Expand Down Expand Up @@ -169,3 +171,29 @@ func TestRedirectWithMetrics(t *testing.T) {
re.Equal(float64(4), out.Counter.GetValue())
c.Close()
}

// TestWithBackoffer checks that a client configured through WithBackoffer keeps
// retrying a failing request according to the backoffer's time budget.
//
// NOTE(review): the test assumes nothing answers on http://127.0.0.1, so every
// GetPDVersion call is expected to fail — confirm for the CI environment.
func TestWithBackoffer(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	c := NewClient("test-with-backoffer", []string{"http://127.0.0.1"})
	// Deferred so the client is still closed when a require assertion aborts the test.
	defer c.Close()

	base := 100 * time.Millisecond
	// Named maxInterval rather than max to avoid shadowing the builtin.
	maxInterval := 500 * time.Millisecond
	total := time.Second
	tolerance := float64(250 * time.Millisecond)

	// Test the time cost of the backoff: with a finite total budget the call
	// should give up after roughly `total`.
	bo := retry.InitialBackoffer(base, maxInterval, total)
	start := time.Now()
	_, err := c.WithBackoffer(bo).GetPDVersion(ctx)
	re.InDelta(total, time.Since(start), tolerance)
	re.Error(err)

	// Test if the infinite retry works: a total of 0 means retry until success,
	// so only the context deadline can stop the request.
	timeout := 3 * time.Second
	bo = retry.InitialBackoffer(base, maxInterval, 0)
	timeoutCtx, timeoutCancel := context.WithTimeout(ctx, timeout)
	defer timeoutCancel()
	start = time.Now()
	_, err = c.WithBackoffer(bo).GetPDVersion(timeoutCtx)
	re.InDelta(timeout, time.Since(start), tolerance)
	re.ErrorIs(err, context.DeadlineExceeded)
}
3 changes: 3 additions & 0 deletions client/http/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/tikv/pd/client/retry"
)

// Client is a PD (Placement Driver) HTTP client.
Expand Down Expand Up @@ -92,6 +93,8 @@ type Client interface {
// Additionally, it is important for the caller to handle the content of the response body properly
// in order to ensure that it can be read and marshaled correctly into `res`.
WithRespHandler(func(resp *http.Response, res interface{}) error) Client
// WithBackoffer sets and returns a new client with the given backoffer.
WithBackoffer(*retry.Backoffer) Client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer caller use ClientOption in NewClient to set it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using WithBackoffer here allows us to flexibly use different Backoffers for different scenarios and interfaces like:

cli := NewClient()
cli.WithBackoffer(bo1).GetRegion()
cli.WithBackoffer(bo2).GetStore()

If we use ClientOption for initialization, we may need to create a new Client every time we want to use a different Backoffer.

// Close gracefully closes the HTTP client.
Close()
}
Expand Down
13 changes: 12 additions & 1 deletion client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@

package http

import "fmt"
import (
"fmt"

"github.com/tikv/pd/client/retry"
)

// The following constants are the names of the requests.
const (
Expand Down Expand Up @@ -75,6 +79,7 @@ type requestInfo struct {
body []byte
res interface{}
respHandler respHandleFunc
bo *retry.Backoffer
}

// newRequestInfo creates a new request info.
Expand Down Expand Up @@ -124,6 +129,12 @@ func (ri *requestInfo) WithRespHandler(respHandler respHandleFunc) *requestInfo
return ri
}

// WithBackoffer sets the backoffer of the request and returns the same
// requestInfo so that calls can be chained.
// The given backoffer is stored as-is, including when it is nil.
func (ri *requestInfo) WithBackoffer(bo *retry.Backoffer) *requestInfo {
ri.bo = bo
return ri
}

// getURL returns the request URL for the given address, built by appending
// the request's uri directly to addr with no separator in between.
func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}
2 changes: 1 addition & 1 deletion client/pd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ func (c *pdServiceDiscovery) updateMemberLoop() {
ticker := time.NewTicker(memberUpdateInterval)
defer ticker.Stop()

bo := retry.InitialBackOffer(updateMemberBackOffBaseTime, updateMemberTimeout)
bo := retry.InitialBackoffer(updateMemberBackOffBaseTime, updateMemberTimeout, updateMemberBackOffBaseTime)
for {
select {
case <-ctx.Done():
Expand Down
106 changes: 85 additions & 21 deletions client/retry/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,129 @@
"context"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
)

// BackOffer is a backoff policy for retrying operations.
type BackOffer struct {
max time.Duration
next time.Duration
// Backoffer is a backoff policy for retrying operations.
type Backoffer struct {
// base defines the initial time interval to wait before each retry.
base time.Duration
// max defines the max time interval to wait before each retry.
max time.Duration
// total defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
total time.Duration
// retryableChecker is used to check if the error is retryable.
// By default, all errors are retryable.
retryableChecker func(err error) bool

next time.Duration
currentTotal time.Duration
}

// Exec is a helper function to exec backoff.
func (bo *BackOffer) Exec(
func (bo *Backoffer) Exec(
ctx context.Context,
fn func() error,
) error {
if err := fn(); err != nil {
after := time.NewTimer(bo.nextInterval())
defer after.Stop()
defer bo.resetBackoff()
var (
err error
after *time.Timer
)
for {
err = fn()
if !bo.isRetryable(err) {
break
}
currentInterval := bo.nextInterval()
if after == nil {
after = time.NewTimer(currentInterval)
} else {
after.Reset(currentInterval)
rleungx marked this conversation as resolved.
Show resolved Hide resolved
}
select {
case <-ctx.Done():
after.Stop()
return errors.Trace(ctx.Err())
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
case <-after.C:
failpoint.Inject("backOffExecute", func() {
testBackOffExecuteFlag = true
})
}
return err
after.Stop()
// If the current total time exceeds the maximum total time, return the last error.
if bo.total > 0 {
bo.currentTotal += currentInterval
if bo.currentTotal >= bo.total {
break
}
}
}
return err
}

// InitialBackoffer makes the initial state for retrying.
// - `base` defines the initial time interval to wait before each retry.
// - `max` defines the max time interval to wait before each retry.
// - `total` defines the max total time duration cost in retrying. If it's 0, it means infinite retry until success.
func InitialBackoffer(base, max, total time.Duration) *Backoffer {
// Make sure the base is less than or equal to the max.
if base > max {
base = max
}
// Make sure the total is not less than the base.
if total > 0 && total < base {
total = base
}
return &Backoffer{
base: base,
max: max,
total: total,
retryableChecker: func(err error) bool {
return err != nil
},
next: base,
currentTotal: 0,
}
// reset backoff when fn() succeed.
bo.resetBackoff()
return nil
}

// InitialBackOffer make the initial state for retrying.
func InitialBackOffer(base, max time.Duration) BackOffer {
return BackOffer{
max: max,
base: base,
next: base,
// SetRetryableChecker sets the retryable checker, replacing any checker that
// was set before. The checker is handed an error and reports whether the
// operation should be retried.
// NOTE(review): this mutates the Backoffer in place without locking — confirm
// that a single Backoffer is not shared across goroutines.
func (bo *Backoffer) SetRetryableChecker(checker func(err error) bool) {
bo.retryableChecker = checker
}

func (bo *Backoffer) isRetryable(err error) bool {
if bo.retryableChecker == nil {
return true

Check warning on line 115 in client/retry/backoff.go

View check run for this annotation

Codecov / codecov/patch

client/retry/backoff.go#L115

Added line #L115 was not covered by tests
}
return bo.retryableChecker(err)
}

// nextInterval for now use the `exponentialInterval`.
func (bo *BackOffer) nextInterval() time.Duration {
func (bo *Backoffer) nextInterval() time.Duration {
return bo.exponentialInterval()
}

// exponentialInterval returns the exponential backoff duration.
func (bo *BackOffer) exponentialInterval() time.Duration {
func (bo *Backoffer) exponentialInterval() time.Duration {
backoffInterval := bo.next
// Make sure the total backoff time is less than the total.
if bo.total > 0 && bo.currentTotal+backoffInterval > bo.total {
backoffInterval = bo.total - bo.currentTotal
}
bo.next *= 2
// Make sure the next backoff time is less than the max.
if bo.next > bo.max {
bo.next = bo.max
}
return backoffInterval
}

// resetBackoff resets the backoff to initial state.
func (bo *BackOffer) resetBackoff() {
func (bo *Backoffer) resetBackoff() {
bo.next = bo.base
bo.currentTotal = 0
}

// Only used for test.
Expand Down
Loading
Loading