Skip to content

Commit

Permalink
client/http, api/middleware: enhance the retry logic of the HTTP clie…
Browse files Browse the repository at this point in the history
…nt (#8229)

ref #7300

Schedule a member change check when the HTTP status code is 503 or receives a leader/primary change error.

Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato authored Jun 3, 2024
1 parent 199b017 commit a929a54
Show file tree
Hide file tree
Showing 16 changed files with 99 additions and 56 deletions.
11 changes: 0 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1431,17 +1431,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == errs.ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
Expand Down
13 changes: 6 additions & 7 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ import (
"github.com/pingcap/errors"
)

// Note: keep the same as the ones defined on the server side to ensure the client can use them correctly.
const (
// NoLeaderErr indicates there is no leader in the cluster currently.
NoLeaderErr = "no leader"
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
NotLeaderErr = "not leader"
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
NotPrimaryErr = "not primary"
)

// client errors
Expand Down
18 changes: 18 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,29 @@
package errs

import (
"strings"

"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// IsLeaderChange will determine whether there is a leader/primary change.
func IsLeaderChange(err error) bool {
if err == nil {
return false
}
if err == ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, NoLeaderErr) ||
strings.Contains(errMsg, NotLeaderErr) ||
strings.Contains(errMsg, MismatchLeaderErr) ||
strings.Contains(errMsg, NotServedErr) ||
strings.Contains(errMsg, NotPrimaryErr)
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
49 changes: 33 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,47 @@ func (ci *clientInner) requestWithRetry(
headerOpts ...HeaderOption,
) error {
var (
serverURL string
isLeader bool
statusCode int
err error
logFields = append(reqInfo.logFields(), zap.String("source", ci.source))
)
execFunc := func() error {
defer func() {
// If the status code is 503, it indicates that there may be PD leader/follower changes.
// If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
ci.sd.ScheduleCheckMemberChanged()
}
log.Debug("[pd] http request finished", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}()
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
clients := ci.sd.GetAllServiceClients()
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
serverURL = cli.GetURL()
isLeader = cli.IsConnectedToLeader()
if len(reqInfo.targetURL) > 0 && reqInfo.targetURL != serverURL {
skipNum++
continue
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
statusCode, err = ci.doRequest(ctx, serverURL, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
log.Debug("[pd] http request url failed", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
Expand Down Expand Up @@ -169,26 +188,21 @@ func noNeedRetry(statusCode int) bool {

func (ci *clientInner) doRequest(
ctx context.Context,
url string, reqInfo *requestInfo,
serverURL string, reqInfo *requestInfo,
headerOpts ...HeaderOption,
) (int, error) {
var (
source = ci.source
callerID = reqInfo.callerID
name = reqInfo.name
method = reqInfo.method
body = reqInfo.body
res = reqInfo.res
respHandler = reqInfo.respHandler
url = reqInfo.getURL(serverURL)
logFields = append(reqInfo.logFields(),
zap.String("source", ci.source),
zap.String("url", url))
)
url = reqInfo.getURL(url)
logFields := []zap.Field{
zap.String("source", source),
zap.String("name", name),
zap.String("url", url),
zap.String("method", method),
zap.String("caller-id", callerID),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body))
if err != nil {
Expand Down Expand Up @@ -229,11 +243,14 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// API server will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
}

log.Error("[pd] request failed with a non-200 status", logFields...)
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s', body: '%s'", resp.Status, bs)
}

if res == nil {
Expand Down
11 changes: 11 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)

// The following constants are the names of the requests.
Expand Down Expand Up @@ -157,3 +158,13 @@ func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}

func (ri *requestInfo) logFields() []zap.Field {
return []zap.Field{
zap.String("caller-id", ri.callerID),
zap.String("name", ri.name),
zap.String("uri", ri.uri),
zap.String("method", ri.method),
zap.String("target-url", ri.targetURL),
}
}
3 changes: 2 additions & 1 deletion client/pd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/testutil"
"google.golang.org/grpc"
Expand Down Expand Up @@ -205,7 +206,7 @@ func (suite *serviceClientTestSuite) TestServiceClient() {
re.NotNil(leaderConn)

_, err := pb.NewGreeterClient(followerConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.ErrorContains(err, "not leader")
re.ErrorContains(err, errs.NotLeaderErr)
resp, err := pb.NewGreeterClient(leaderConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.NoError(err)
re.Equal("Hello pd", resp.GetMessage())
Expand Down
7 changes: 1 addition & 6 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pd

import (
"context"
"strings"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -35,10 +34,6 @@ const (
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
controllerConfigPathPrefix = "resource_group/controller"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
// errNotLeader is returned when the requested server is not pd leader.
errNotLeader = "not leader"
)

// GroupSettingsPathPrefixBytes is used to watch or get resource groups.
Expand Down Expand Up @@ -83,7 +78,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) {

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) {
if errs.IsLeaderChange(err) {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ tsoBatchLoop:
cancel()
stream = nil
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if errs.IsLeaderChange(err) {
if err := bo.Exec(ctx, svcDiscovery.CheckMemberChanged); err != nil {
select {
case <-ctx.Done():
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,21 @@ error = '''
redirect failed
'''

["PD:apiutil:ErrRedirectNoLeader"]
error = '''
redirect finds no leader
'''

["PD:apiutil:ErrRedirectToNotLeader"]
error = '''
redirect to not leader
'''

["PD:apiutil:ErrRedirectToNotPrimary"]
error = '''
redirect to not primary
'''

["PD:autoscaling:ErrEmptyMetricsResponse"]
error = '''
metrics response from Prometheus is empty
Expand Down
9 changes: 5 additions & 4 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,11 @@ var (

// apiutil errors
var (
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
ErrRedirectNoLeader = errors.Normalize("redirect finds no leader", errors.RFCCodeText("PD:apiutil:ErrRedirectNoLeader"))
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirectToNotPrimary = errors.Normalize("redirect to not primary", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotPrimary"))
)

// grpcutil errors
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/multiservicesapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func ServiceRedirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotPrimary))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotPrimary.FastGenByArgs().Error())
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
leader := h.waitForLeader(r)
// The leader has not been elected yet.
if leader == nil {
http.Error(w, "no leader", http.StatusServiceUnavailable)
http.Error(w, errs.ErrRedirectNoLeader.FastGenByArgs().Error(), http.StatusServiceUnavailable)
return
}
// If the leader is the current server now, we can handle the request directly.
Expand All @@ -222,7 +222,7 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/API servers.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/middlewares/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func Redirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotLeader.FastGenByArgs().Error())
return
}

Expand Down
3 changes: 2 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
clierrs "github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -528,7 +529,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())
_, _, err = cli.GetTS(ctx)
re.Error(err)
re.True(pd.IsLeaderChange(err))
re.True(clierrs.IsLeaderChange(err))
_, _, err = cli.GetTS(ctx)
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember"))
Expand Down
5 changes: 3 additions & 2 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
clierrs "github.com/tikv/pd/client/errs"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
Expand Down Expand Up @@ -467,8 +468,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient(
errMsg := err.Error()
// Ignore the errors caused by the split and context cancellation.
if strings.Contains(errMsg, "context canceled") ||
strings.Contains(errMsg, "not leader") ||
strings.Contains(errMsg, "not served") ||
strings.Contains(errMsg, clierrs.NotLeaderErr) ||
strings.Contains(errMsg, clierrs.NotServedErr) ||
strings.Contains(errMsg, "ErrKeyspaceNotAssigned") ||
strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") {
continue
Expand Down
2 changes: 1 addition & 1 deletion tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func TestNotLeader(t *testing.T) {
grpcStatus, ok := status.FromError(err)
re.True(ok)
re.Equal(codes.Unavailable, grpcStatus.Code())
re.Equal("not leader", grpcStatus.Message())
re.ErrorContains(server.ErrNotLeader, grpcStatus.Message())
}

func TestStoreVersionChange(t *testing.T) {
Expand Down

0 comments on commit a929a54

Please sign in to comment.