Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http, api/middleware: enhance the retry logic of the HTTP client (#8229) #8464

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 0 additions & 11 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,17 +1424,6 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
return resp, nil
}

// IsLeaderChange will determine whether there is a leader change.
func IsLeaderChange(err error) bool {
if err == errs.ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, errs.NotLeaderErr) ||
strings.Contains(errMsg, errs.MismatchLeaderErr) ||
strings.Contains(errMsg, errs.NotServedErr)
}

const (
httpSchemePrefix = "http://"
httpsSchemePrefix = "https://"
Expand Down
13 changes: 6 additions & 7 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,20 @@ import (
"github.com/pingcap/errors"
)

// Note: keep the same as the ones defined on the server side to ensure the client can use them correctly.
const (
// NoLeaderErr indicates there is no leader in the cluster currently.
NoLeaderErr = "no leader"
// NotLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotLeaderErr = "is not leader"
NotLeaderErr = "not leader"
// MismatchLeaderErr indicates the non-leader member received the requests which should be received by leader.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
MismatchLeaderErr = "mismatch leader id"
// NotServedErr indicates an tso node/pod received the requests for the keyspace groups which are not served by it.
// Note: keep the same as the ones defined on the server side, because the client side checks if an error message
// contains this string to judge whether the leader is changed.
NotServedErr = "is not served"
// RetryTimeoutErr indicates the server is busy.
RetryTimeoutErr = "retry timeout"
// NotPrimaryErr indicates the non-primary member received the requests which should be received by primary.
NotPrimaryErr = "not primary"
)

// client errors
Expand Down
18 changes: 18 additions & 0 deletions client/errs/errs.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,29 @@
package errs

import (
"strings"

"github.com/pingcap/errors"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

// IsLeaderChange will determine whether there is a leader/primary change.
func IsLeaderChange(err error) bool {
if err == nil {
return false
}
if err == ErrClientTSOStreamClosed {
return true
}
errMsg := err.Error()
return strings.Contains(errMsg, NoLeaderErr) ||
strings.Contains(errMsg, NotLeaderErr) ||
strings.Contains(errMsg, MismatchLeaderErr) ||
strings.Contains(errMsg, NotServedErr) ||
strings.Contains(errMsg, NotPrimaryErr)
}

// ZapError is used to make the log output easier.
func ZapError(err error, causeError ...error) zap.Field {
if err == nil {
Expand Down
49 changes: 33 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,28 +120,47 @@ func (ci *clientInner) requestWithRetry(
headerOpts ...HeaderOption,
) error {
var (
serverURL string
isLeader bool
statusCode int
err error
logFields = append(reqInfo.logFields(), zap.String("source", ci.source))
)
execFunc := func() error {
defer func() {
// If the status code is 503, it indicates that there may be PD leader/follower changes.
// If the error message contains the leader/primary change information, it indicates that there may be PD leader/primary change.
if statusCode == http.StatusServiceUnavailable || errs.IsLeaderChange(err) {
ci.sd.ScheduleCheckMemberChanged()
}
log.Debug("[pd] http request finished", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}()
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
clients := ci.sd.GetAllServiceClients()
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
skipNum := 0
for _, cli := range clients {
url := cli.GetURL()
if reqInfo.targetURL != "" && reqInfo.targetURL != url {
serverURL = cli.GetURL()
isLeader = cli.IsConnectedToLeader()
if len(reqInfo.targetURL) > 0 && reqInfo.targetURL != serverURL {
skipNum++
continue
}
statusCode, err = ci.doRequest(ctx, url, reqInfo, headerOpts...)
statusCode, err = ci.doRequest(ctx, serverURL, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request url failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("url", url), zap.Error(err))
log.Debug("[pd] http request url failed", append(logFields,
zap.String("server-url", serverURL),
zap.Bool("is-leader", isLeader),
zap.Int("status-code", statusCode),
zap.Error(err))...)
}
if skipNum == len(clients) {
return errs.ErrClientNoTargetMember
Expand All @@ -168,26 +187,21 @@ func noNeedRetry(statusCode int) bool {

func (ci *clientInner) doRequest(
ctx context.Context,
url string, reqInfo *requestInfo,
serverURL string, reqInfo *requestInfo,
headerOpts ...HeaderOption,
) (int, error) {
var (
source = ci.source
callerID = reqInfo.callerID
name = reqInfo.name
method = reqInfo.method
body = reqInfo.body
res = reqInfo.res
respHandler = reqInfo.respHandler
url = reqInfo.getURL(serverURL)
logFields = append(reqInfo.logFields(),
zap.String("source", ci.source),
zap.String("url", url))
)
url = reqInfo.getURL(url)
logFields := []zap.Field{
zap.String("source", source),
zap.String("name", name),
zap.String("url", url),
zap.String("method", method),
zap.String("caller-id", callerID),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewBuffer(body))
if err != nil {
Expand Down Expand Up @@ -228,11 +242,14 @@ func (ci *clientInner) doRequest(
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
// API server will return a JSON body containing the detailed error message
// when the status code is not `http.StatusOK` 200.
bs = bytes.TrimSpace(bs)
logFields = append(logFields, zap.ByteString("body", bs))
}

log.Error("[pd] request failed with a non-200 status", logFields...)
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
return resp.StatusCode, errors.Errorf("request pd http api failed with status: '%s', body: '%s'", resp.Status, bs)
}

if res == nil {
Expand Down
11 changes: 11 additions & 0 deletions client/http/request_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"

"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)

// The following constants are the names of the requests.
Expand Down Expand Up @@ -156,3 +157,13 @@ func (ri *requestInfo) WithTargetURL(targetURL string) *requestInfo {
func (ri *requestInfo) getURL(addr string) string {
return fmt.Sprintf("%s%s", addr, ri.uri)
}

func (ri *requestInfo) logFields() []zap.Field {
return []zap.Field{
zap.String("caller-id", ri.callerID),
zap.String("name", ri.name),
zap.String("uri", ri.uri),
zap.String("method", ri.method),
zap.String("target-url", ri.targetURL),
}
}
3 changes: 2 additions & 1 deletion client/pd_service_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/grpcutil"
"github.com/tikv/pd/client/testutil"
"google.golang.org/grpc"
Expand Down Expand Up @@ -205,7 +206,7 @@ func (suite *serviceClientTestSuite) TestServiceClient() {
re.NotNil(leaderConn)

_, err := pb.NewGreeterClient(followerConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.ErrorContains(err, "not leader")
re.ErrorContains(err, errs.NotLeaderErr)
resp, err := pb.NewGreeterClient(leaderConn).SayHello(suite.ctx, &pb.HelloRequest{Name: "pd"})
re.NoError(err)
re.Equal("Hello pd", resp.GetMessage())
Expand Down
7 changes: 1 addition & 6 deletions client/resource_manager_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pd

import (
"context"
"strings"
"time"

"github.com/gogo/protobuf/proto"
Expand All @@ -35,10 +34,6 @@ const (
modify actionType = 1
groupSettingsPathPrefix = "resource_group/settings"
controllerConfigPathPrefix = "resource_group/controller"
// errNotPrimary is returned when the requested server is not primary.
errNotPrimary = "not primary"
// errNotLeader is returned when the requested server is not pd leader.
errNotLeader = "not leader"
)

// GroupSettingsPathPrefixBytes is used to watch or get resource groups.
Expand Down Expand Up @@ -83,7 +78,7 @@ func (c *client) resourceManagerClient() (rmpb.ResourceManagerClient, error) {

// gRPCErrorHandler is used to handle the gRPC error returned by the resource manager service.
func (c *client) gRPCErrorHandler(err error) {
if strings.Contains(err.Error(), errNotPrimary) || strings.Contains(err.Error(), errNotLeader) {
if errs.IsLeaderChange(err) {
c.pdSvcDiscovery.ScheduleCheckMemberChanged()
}
}
Expand Down
2 changes: 1 addition & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ tsoBatchLoop:
cancel()
stream = nil
// Because ScheduleCheckMemberChanged is asynchronous, if the leader changes, we better call `updateMember` ASAP.
if IsLeaderChange(err) {
if errs.IsLeaderChange(err) {
if err := bo.Exec(dispatcherCtx, c.svcDiscovery.CheckMemberChanged); err != nil {
select {
case <-dispatcherCtx.Done():
Expand Down
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,21 @@ error = '''
redirect failed
'''

["PD:apiutil:ErrRedirectNoLeader"]
error = '''
redirect finds no leader
'''

["PD:apiutil:ErrRedirectToNotLeader"]
error = '''
redirect to not leader
'''

["PD:apiutil:ErrRedirectToNotPrimary"]
error = '''
redirect to not primary
'''

["PD:autoscaling:ErrEmptyMetricsResponse"]
error = '''
metrics response from Prometheus is empty
Expand Down
9 changes: 5 additions & 4 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,11 @@ var (

// apiutil errors
var (
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
// ErrRedirectToNotLeader is the error message for redirect to not leader.
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirect = errors.Normalize("redirect failed", errors.RFCCodeText("PD:apiutil:ErrRedirect"))
ErrOptionNotExist = errors.Normalize("the option %s does not exist", errors.RFCCodeText("PD:apiutil:ErrOptionNotExist"))
ErrRedirectNoLeader = errors.Normalize("redirect finds no leader", errors.RFCCodeText("PD:apiutil:ErrRedirectNoLeader"))
ErrRedirectToNotLeader = errors.Normalize("redirect to not leader", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotLeader"))
ErrRedirectToNotPrimary = errors.Normalize("redirect to not primary", errors.RFCCodeText("PD:apiutil:ErrRedirectToNotPrimary"))
)

// grpcutil errors
Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/multiservicesapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func ServiceRedirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(ServiceRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not primary", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotPrimary))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotPrimary.FastGenByArgs().Error())
return
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/utils/apiutil/serverapi/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,14 +209,14 @@ func (h *redirector) ServeHTTP(w http.ResponseWriter, r *http.Request, next http
} else if name := r.Header.Get(apiutil.PDRedirectorHeader); len(name) == 0 {
leader := h.waitForLeader(r)
if leader == nil {
http.Error(w, "no leader", http.StatusServiceUnavailable)
http.Error(w, errs.ErrRedirectNoLeader.FastGenByArgs().Error(), http.StatusServiceUnavailable)
return
}
clientUrls = leader.GetClientUrls()
r.Header.Set(apiutil.PDRedirectorHeader, h.s.Name())
} else {
// Prevent more than one redirection among PD/API servers.
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirect))
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", h.s.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
http.Error(w, errs.ErrRedirectToNotLeader.FastGenByArgs().Error(), http.StatusInternalServerError)
return
}
Expand Down
4 changes: 2 additions & 2 deletions server/apiv2/middlewares/redirector.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func Redirector() gin.HandlerFunc {

// Prevent more than one redirection.
if name := c.Request.Header.Get(apiutil.PDRedirectorHeader); len(name) != 0 {
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirect))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirect.FastGenByArgs().Error())
log.Error("redirect but server is not leader", zap.String("from", name), zap.String("server", svr.Name()), errs.ZapError(errs.ErrRedirectToNotLeader))
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrRedirectToNotLeader.FastGenByArgs().Error())
return
}

Expand Down
3 changes: 2 additions & 1 deletion tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
clierrs "github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
Expand Down Expand Up @@ -493,7 +494,7 @@ func TestGlobalAndLocalTSO(t *testing.T) {
re.NotEmpty(cluster.WaitLeader())
_, _, err = cli.GetTS(ctx)
re.Error(err)
re.True(pd.IsLeaderChange(err))
re.True(clierrs.IsLeaderChange(err))
_, _, err = cli.GetTS(ctx)
re.NoError(err)
re.NoError(failpoint.Disable("github.com/tikv/pd/client/skipUpdateMember"))
Expand Down
5 changes: 3 additions & 2 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
pd "github.com/tikv/pd/client"
clierrs "github.com/tikv/pd/client/errs"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
Expand Down Expand Up @@ -448,8 +449,8 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) dispatchClient(
errMsg := err.Error()
// Ignore the errors caused by the split and context cancellation.
if strings.Contains(errMsg, "context canceled") ||
strings.Contains(errMsg, "not leader") ||
strings.Contains(errMsg, "not served") ||
strings.Contains(errMsg, clierrs.NotLeaderErr) ||
strings.Contains(errMsg, clierrs.NotServedErr) ||
strings.Contains(errMsg, "ErrKeyspaceNotAssigned") ||
strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") {
continue
Expand Down
2 changes: 1 addition & 1 deletion tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -662,7 +662,7 @@ func TestNotLeader(t *testing.T) {
grpcStatus, ok := status.FromError(err)
re.True(ok)
re.Equal(codes.Unavailable, grpcStatus.Code())
re.Equal("not leader", grpcStatus.Message())
re.ErrorContains(server.ErrNotLeader, grpcStatus.Message())
}

func TestStoreVersionChange(t *testing.T) {
Expand Down
Loading