Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: use ServiceDiscovery to update member in HTTP client #7668

Merged
merged 16 commits into from
Jan 12, 2024
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ type Client interface {
// SetExternalTimestamp sets external timestamp
SetExternalTimestamp(ctx context.Context, timestamp uint64) error

// GetServiceDiscovery returns ServiceDiscovery
GetServiceDiscovery() ServiceDiscovery

// TSOClient is the TSO client.
TSOClient
// MetaStorageClient is the meta storage client.
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var (
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
ErrClientUpdateMember = errors.Normalize("update member failed, %v", errors.RFCCodeText("PD:client:ErrUpdateMember"))
ErrClientNoAvailableMember = errors.Normalize("no available member", errors.RFCCodeText("PD:client:ErrClientNoAvailableMember"))
ErrClientProtoUnmarshal = errors.Normalize("failed to unmarshal proto", errors.RFCCodeText("PD:proto:ErrClientProtoUnmarshal"))
ErrClientGetMultiResponse = errors.Normalize("get invalid value response %v, must only one", errors.RFCCodeText("PD:client:ErrClientGetMultiResponse"))
ErrClientGetServingEndpoint = errors.Normalize("get serving endpoint failed", errors.RFCCodeText("PD:client:ErrClientGetServingEndpoint"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/pingcap/kvproto v0.0.0-20231222062942-c0c73f41d0b2
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/prometheus/client_model v0.2.0
github.com/stretchr/testify v1.8.2
go.uber.org/atomic v1.10.0
go.uber.org/goleak v1.1.11
Expand All @@ -32,6 +31,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.26.0 // indirect
github.com/prometheus/procfs v0.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
188 changes: 48 additions & 140 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ import (
"io"
"net/http"
"os"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/prometheus/client_golang/prometheus"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/retry"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -58,9 +58,7 @@ type clientInner struct {
ctx context.Context
cancel context.CancelFunc

sync.RWMutex
pdAddrs []string
leaderAddrIdx int
sd pd.ServiceDiscovery

// source is used to mark the source of the client creation,
// it will also be used in the caller ID of the inner client.
Expand All @@ -72,12 +70,11 @@ type clientInner struct {
executionDuration *prometheus.HistogramVec
}

func newClientInner(source string) *clientInner {
ctx, cancel := context.WithCancel(context.Background())
return &clientInner{ctx: ctx, cancel: cancel, leaderAddrIdx: -1, source: source}
func newClientInner(ctx context.Context, cancel context.CancelFunc, source string) *clientInner {
return &clientInner{ctx: ctx, cancel: cancel, source: source}
}

func (ci *clientInner) init() {
func (ci *clientInner) init(sd pd.ServiceDiscovery) {
// Init the HTTP client if it's not configured.
if ci.cli == nil {
ci.cli = &http.Client{Timeout: defaultTimeout}
Expand All @@ -87,8 +84,7 @@ func (ci *clientInner) init() {
ci.cli.Transport = transport
}
}
// Start the members info updater daemon.
go ci.membersInfoUpdater(ci.ctx)
ci.sd = sd
}

func (ci *clientInner) close() {
Expand All @@ -98,33 +94,6 @@ func (ci *clientInner) close() {
}
}

// getPDAddrs returns the current PD addresses and the index of the leader address.
func (ci *clientInner) getPDAddrs() ([]string, int) {
ci.RLock()
defer ci.RUnlock()
return ci.pdAddrs, ci.leaderAddrIdx
}

func (ci *clientInner) setPDAddrs(pdAddrs []string, leaderAddrIdx int) {
ci.Lock()
defer ci.Unlock()
// Normalize the addresses with correct scheme prefix.
var scheme string
if ci.tlsConf == nil {
scheme = httpScheme
} else {
scheme = httpsScheme
}
for i, addr := range pdAddrs {
if strings.HasPrefix(addr, httpScheme) {
continue
}
pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr)
}
ci.pdAddrs = pdAddrs
ci.leaderAddrIdx = leaderAddrIdx
}

func (ci *clientInner) reqCounter(name, status string) {
if ci.requestCounter == nil {
return
Expand All @@ -151,35 +120,23 @@ func (ci *clientInner) requestWithRetry(
err error
)
execFunc := func() error {
var (
addr string
pdAddrs, leaderAddrIdx = ci.getPDAddrs()
)
// Try to send the request to the PD leader first.
if leaderAddrIdx != -1 {
addr = pdAddrs[leaderAddrIdx]
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
return err
}
log.Debug("[pd] request leader addr failed",
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err))
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers.
clients := ci.sd.GetAllServiceClients()
if len(clients) == 0 {
return errs.ErrClientNoAvailableMember
}
// Try to send the request to the other PD followers.
for idx := 0; idx < len(pdAddrs); idx++ {
if idx == leaderAddrIdx {
continue
}
addr = ci.pdAddrs[idx]
for _, cli := range clients {
addr := cli.GetHTTPAddress()
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...)
if err == nil || noNeedRetry(statusCode) {
break
return err
}
log.Debug("[pd] request follower addr failed",
zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err))
log.Debug("[pd] request addr failed",
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("addr", addr), zap.Error(err))
}
return err
}
fmt.Println("reqInfo.bo", reqInfo.bo == nil)
CabinfeverB marked this conversation as resolved.
Show resolved Hide resolved
if reqInfo.bo == nil {
return execFunc()
}
Expand Down Expand Up @@ -276,73 +233,6 @@ func (ci *clientInner) doRequest(
return resp.StatusCode, nil
}

func (ci *clientInner) membersInfoUpdater(ctx context.Context) {
ci.updateMembersInfo(ctx)
log.Info("[pd] http client member info updater started", zap.String("source", ci.source))
ticker := time.NewTicker(defaultMembersInfoUpdateInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("[pd] http client member info updater stopped", zap.String("source", ci.source))
return
case <-ticker.C:
ci.updateMembersInfo(ctx)
}
}
}

func (ci *clientInner) updateMembersInfo(ctx context.Context) {
var membersInfo MembersInfo
err := ci.requestWithRetry(ctx, newRequestInfo().
WithCallerID(fmt.Sprintf("%s-%s", ci.source, defaultInnerCallerID)).
WithName(getMembersName).
WithURI(membersPrefix).
WithMethod(http.MethodGet).
WithResp(&membersInfo))
if err != nil {
log.Error("[pd] http client get members info failed", zap.String("source", ci.source), zap.Error(err))
return
}
if len(membersInfo.Members) == 0 {
log.Error("[pd] http client get empty members info", zap.String("source", ci.source))
return
}
var (
newPDAddrs []string
newLeaderAddrIdx int = -1
)
for _, member := range membersInfo.Members {
if membersInfo.Leader != nil && member.GetMemberId() == membersInfo.Leader.GetMemberId() {
newLeaderAddrIdx = len(newPDAddrs)
}
newPDAddrs = append(newPDAddrs, member.GetClientUrls()...)
}
// Prevent setting empty addresses.
if len(newPDAddrs) == 0 {
log.Error("[pd] http client get empty member addresses", zap.String("source", ci.source))
return
}
oldPDAddrs, oldLeaderAddrIdx := ci.getPDAddrs()
ci.setPDAddrs(newPDAddrs, newLeaderAddrIdx)
// Log the member info change if it happens.
var oldPDLeaderAddr, newPDLeaderAddr string
if oldLeaderAddrIdx != -1 {
oldPDLeaderAddr = oldPDAddrs[oldLeaderAddrIdx]
}
if newLeaderAddrIdx != -1 {
newPDLeaderAddr = newPDAddrs[newLeaderAddrIdx]
}
oldMemberNum, newMemberNum := len(oldPDAddrs), len(newPDAddrs)
if oldPDLeaderAddr != newPDLeaderAddr || oldMemberNum != newMemberNum {
log.Info("[pd] http client members info changed", zap.String("source", ci.source),
zap.Int("old-member-num", oldMemberNum), zap.Int("new-member-num", newMemberNum),
zap.Strings("old-addrs", oldPDAddrs), zap.Strings("new-addrs", newPDAddrs),
zap.Int("old-leader-addr-idx", oldLeaderAddrIdx), zap.Int("new-leader-addr-idx", newLeaderAddrIdx),
zap.String("old-leader-addr", oldPDLeaderAddr), zap.String("new-leader-addr", newPDLeaderAddr))
}
}

type client struct {
inner *clientInner

Expand Down Expand Up @@ -395,19 +285,36 @@ func WithLoggerRedirection(logLevel, fileName string) ClientOption {
return func(c *client) {}
}

// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery.
func NewClientWithServiceDiscovery(
source string,
sd pd.ServiceDiscovery,
opts ...ClientOption,
) Client {
ctx, cancel := context.WithCancel(context.Background())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should use a context passed through.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found out that when we created innerClient, it created a context itself and cancel it with the Close function, so I kept it. Do we need to change it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot about the logic. Then let's keep it the same.

c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID}
// Apply the options first.
for _, opt := range opts {
opt(c)
}
c.inner.init(sd)
return c
}

// NewClient creates a PD HTTP client with the given PD addresses and TLS config.
func NewClient(
source string,
pdAddrs []string,
opts ...ClientOption,
) Client {
c := &client{inner: newClientInner(source), callerID: defaultCallerID}
ctx, cancel := context.WithCancel(context.Background())
c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID}
// Apply the options first.
for _, opt := range opts {
opt(c)
}
c.inner.setPDAddrs(pdAddrs, -1)
c.inner.init()
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just use newPDServiceDiscovery?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I didn't think it was a good idea to pass in a lot of nil params before.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I forgot that option is unexported, so I think we can use NewDefaultPDServiceDiscovery to avoid to input some unconcerned params.

c.inner.init(sd)
return c
}

Expand Down Expand Up @@ -464,16 +371,17 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts .
headerOpts...)
}

// UpdateMembersInfo updates the members info of the PD cluster in the inner client.
// Exported for testing.
func (c *client) UpdateMembersInfo() {
c.inner.updateMembersInfo(c.inner.ctx)
// requestChecker is used to check the HTTP request sent by the client.
type requestChecker func(req *http.Request) error
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it just used in the test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe better to add some comments for NewHTTPClientWithRequestChecker


// RoundTrip implements the `http.RoundTripper` interface.
func (rc requestChecker) RoundTrip(req *http.Request) (resp *http.Response, err error) {
return &http.Response{StatusCode: http.StatusOK}, rc(req)
}

// setLeaderAddrIdx sets the index of the leader address in the inner client.
// only used for testing.
func (c *client) setLeaderAddrIdx(idx int) {
c.inner.Lock()
defer c.inner.Unlock()
c.inner.leaderAddrIdx = idx
// NewHTTPClientWithRequestChecker returns a http client with checker.
func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client {
return &http.Client{
Transport: checker,
}
}
Loading
Loading