-
Notifications
You must be signed in to change notification settings - Fork 726
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
client: use ServiceDiscovery to update member in HTTP client #7668
Changes from all commits
3917a61
f09af97
120728c
5ba9e63
0606fbf
12d600c
449bb8f
f392d75
2b7b72e
faa9261
13945a6
3ddabcb
a26d1c6
8b65ba7
3757435
9d21255
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,17 +19,16 @@ import ( | |
"context" | ||
"crypto/tls" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"os" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/log" | ||
"github.com/prometheus/client_golang/prometheus" | ||
pd "github.com/tikv/pd/client" | ||
"github.com/tikv/pd/client/errs" | ||
"github.com/tikv/pd/client/retry" | ||
"go.uber.org/zap" | ||
) | ||
|
@@ -58,9 +57,7 @@ type clientInner struct { | |
ctx context.Context | ||
cancel context.CancelFunc | ||
|
||
sync.RWMutex | ||
pdAddrs []string | ||
leaderAddrIdx int | ||
sd pd.ServiceDiscovery | ||
|
||
// source is used to mark the source of the client creation, | ||
// it will also be used in the caller ID of the inner client. | ||
|
@@ -72,12 +69,11 @@ type clientInner struct { | |
executionDuration *prometheus.HistogramVec | ||
} | ||
|
||
func newClientInner(source string) *clientInner { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
return &clientInner{ctx: ctx, cancel: cancel, leaderAddrIdx: -1, source: source} | ||
func newClientInner(ctx context.Context, cancel context.CancelFunc, source string) *clientInner { | ||
return &clientInner{ctx: ctx, cancel: cancel, source: source} | ||
} | ||
|
||
func (ci *clientInner) init() { | ||
func (ci *clientInner) init(sd pd.ServiceDiscovery) { | ||
// Init the HTTP client if it's not configured. | ||
if ci.cli == nil { | ||
ci.cli = &http.Client{Timeout: defaultTimeout} | ||
|
@@ -87,8 +83,7 @@ func (ci *clientInner) init() { | |
ci.cli.Transport = transport | ||
} | ||
} | ||
// Start the members info updater daemon. | ||
go ci.membersInfoUpdater(ci.ctx) | ||
ci.sd = sd | ||
} | ||
|
||
func (ci *clientInner) close() { | ||
|
@@ -98,33 +93,6 @@ func (ci *clientInner) close() { | |
} | ||
} | ||
|
||
// getPDAddrs returns the current PD addresses and the index of the leader address. | ||
func (ci *clientInner) getPDAddrs() ([]string, int) { | ||
ci.RLock() | ||
defer ci.RUnlock() | ||
return ci.pdAddrs, ci.leaderAddrIdx | ||
} | ||
|
||
func (ci *clientInner) setPDAddrs(pdAddrs []string, leaderAddrIdx int) { | ||
ci.Lock() | ||
defer ci.Unlock() | ||
// Normalize the addresses with correct scheme prefix. | ||
var scheme string | ||
if ci.tlsConf == nil { | ||
scheme = httpScheme | ||
} else { | ||
scheme = httpsScheme | ||
} | ||
for i, addr := range pdAddrs { | ||
if strings.HasPrefix(addr, httpScheme) { | ||
continue | ||
} | ||
pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr) | ||
} | ||
ci.pdAddrs = pdAddrs | ||
ci.leaderAddrIdx = leaderAddrIdx | ||
} | ||
|
||
func (ci *clientInner) reqCounter(name, status string) { | ||
if ci.requestCounter == nil { | ||
return | ||
|
@@ -151,32 +119,19 @@ func (ci *clientInner) requestWithRetry( | |
err error | ||
) | ||
execFunc := func() error { | ||
var ( | ||
addr string | ||
pdAddrs, leaderAddrIdx = ci.getPDAddrs() | ||
) | ||
// Try to send the request to the PD leader first. | ||
if leaderAddrIdx != -1 { | ||
addr = pdAddrs[leaderAddrIdx] | ||
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...) | ||
if err == nil || noNeedRetry(statusCode) { | ||
return err | ||
} | ||
log.Debug("[pd] request leader addr failed", | ||
zap.String("source", ci.source), zap.Int("leader-idx", leaderAddrIdx), zap.String("addr", addr), zap.Error(err)) | ||
// It will try to send the request to the PD leader first and then try to send the request to the other PD followers. | ||
clients := ci.sd.GetAllServiceClients() | ||
if len(clients) == 0 { | ||
return errs.ErrClientNoAvailableMember | ||
} | ||
// Try to send the request to the other PD followers. | ||
for idx := 0; idx < len(pdAddrs); idx++ { | ||
if idx == leaderAddrIdx { | ||
continue | ||
} | ||
addr = ci.pdAddrs[idx] | ||
for _, cli := range clients { | ||
addr := cli.GetHTTPAddress() | ||
statusCode, err = ci.doRequest(ctx, addr, reqInfo, headerOpts...) | ||
if err == nil || noNeedRetry(statusCode) { | ||
break | ||
return err | ||
} | ||
log.Debug("[pd] request follower addr failed", | ||
zap.String("source", ci.source), zap.Int("idx", idx), zap.String("addr", addr), zap.Error(err)) | ||
log.Debug("[pd] request addr failed", | ||
zap.String("source", ci.source), zap.Bool("is-leader", cli.IsConnectedToLeader()), zap.String("addr", addr), zap.Error(err)) | ||
} | ||
return err | ||
} | ||
|
@@ -278,73 +233,6 @@ func (ci *clientInner) doRequest( | |
return resp.StatusCode, nil | ||
} | ||
|
||
func (ci *clientInner) membersInfoUpdater(ctx context.Context) { | ||
ci.updateMembersInfo(ctx) | ||
log.Info("[pd] http client member info updater started", zap.String("source", ci.source)) | ||
ticker := time.NewTicker(defaultMembersInfoUpdateInterval) | ||
defer ticker.Stop() | ||
for { | ||
select { | ||
case <-ctx.Done(): | ||
log.Info("[pd] http client member info updater stopped", zap.String("source", ci.source)) | ||
return | ||
case <-ticker.C: | ||
ci.updateMembersInfo(ctx) | ||
} | ||
} | ||
} | ||
|
||
func (ci *clientInner) updateMembersInfo(ctx context.Context) { | ||
var membersInfo MembersInfo | ||
err := ci.requestWithRetry(ctx, newRequestInfo(). | ||
WithCallerID(fmt.Sprintf("%s-%s", ci.source, defaultInnerCallerID)). | ||
WithName(getMembersName). | ||
WithURI(membersPrefix). | ||
WithMethod(http.MethodGet). | ||
WithResp(&membersInfo)) | ||
if err != nil { | ||
log.Error("[pd] http client get members info failed", zap.String("source", ci.source), zap.Error(err)) | ||
return | ||
} | ||
if len(membersInfo.Members) == 0 { | ||
log.Error("[pd] http client get empty members info", zap.String("source", ci.source)) | ||
return | ||
} | ||
var ( | ||
newPDAddrs []string | ||
newLeaderAddrIdx int = -1 | ||
) | ||
for _, member := range membersInfo.Members { | ||
if membersInfo.Leader != nil && member.GetMemberId() == membersInfo.Leader.GetMemberId() { | ||
newLeaderAddrIdx = len(newPDAddrs) | ||
} | ||
newPDAddrs = append(newPDAddrs, member.GetClientUrls()...) | ||
} | ||
// Prevent setting empty addresses. | ||
if len(newPDAddrs) == 0 { | ||
log.Error("[pd] http client get empty member addresses", zap.String("source", ci.source)) | ||
return | ||
} | ||
oldPDAddrs, oldLeaderAddrIdx := ci.getPDAddrs() | ||
ci.setPDAddrs(newPDAddrs, newLeaderAddrIdx) | ||
// Log the member info change if it happens. | ||
var oldPDLeaderAddr, newPDLeaderAddr string | ||
if oldLeaderAddrIdx != -1 { | ||
oldPDLeaderAddr = oldPDAddrs[oldLeaderAddrIdx] | ||
} | ||
if newLeaderAddrIdx != -1 { | ||
newPDLeaderAddr = newPDAddrs[newLeaderAddrIdx] | ||
} | ||
oldMemberNum, newMemberNum := len(oldPDAddrs), len(newPDAddrs) | ||
if oldPDLeaderAddr != newPDLeaderAddr || oldMemberNum != newMemberNum { | ||
log.Info("[pd] http client members info changed", zap.String("source", ci.source), | ||
zap.Int("old-member-num", oldMemberNum), zap.Int("new-member-num", newMemberNum), | ||
zap.Strings("old-addrs", oldPDAddrs), zap.Strings("new-addrs", newPDAddrs), | ||
zap.Int("old-leader-addr-idx", oldLeaderAddrIdx), zap.Int("new-leader-addr-idx", newLeaderAddrIdx), | ||
zap.String("old-leader-addr", oldPDLeaderAddr), zap.String("new-leader-addr", newPDLeaderAddr)) | ||
} | ||
} | ||
|
||
type client struct { | ||
inner *clientInner | ||
|
||
|
@@ -397,19 +285,36 @@ func WithLoggerRedirection(logLevel, fileName string) ClientOption { | |
return func(c *client) {} | ||
} | ||
|
||
// NewClientWithServiceDiscovery creates a PD HTTP client with the given PD service discovery. | ||
func NewClientWithServiceDiscovery( | ||
source string, | ||
sd pd.ServiceDiscovery, | ||
opts ...ClientOption, | ||
) Client { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID} | ||
// Apply the options first. | ||
for _, opt := range opts { | ||
opt(c) | ||
} | ||
c.inner.init(sd) | ||
return c | ||
} | ||
|
||
// NewClient creates a PD HTTP client with the given PD addresses and TLS config. | ||
func NewClient( | ||
source string, | ||
pdAddrs []string, | ||
opts ...ClientOption, | ||
) Client { | ||
c := &client{inner: newClientInner(source), callerID: defaultCallerID} | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
c := &client{inner: newClientInner(ctx, cancel, source), callerID: defaultCallerID} | ||
// Apply the options first. | ||
for _, opt := range opts { | ||
opt(c) | ||
} | ||
c.inner.setPDAddrs(pdAddrs, -1) | ||
c.inner.init() | ||
sd := pd.NewDefaultPDServiceDiscovery(ctx, cancel, pdAddrs, c.inner.tlsConf) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why not just use newPDServiceDiscovery? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK. I didn't think it was a good idea to pass in a lot of nil params before. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I forgot that |
||
c.inner.init(sd) | ||
return c | ||
} | ||
|
||
|
@@ -466,16 +371,17 @@ func (c *client) request(ctx context.Context, reqInfo *requestInfo, headerOpts . | |
headerOpts...) | ||
} | ||
|
||
// UpdateMembersInfo updates the members info of the PD cluster in the inner client. | ||
// Exported for testing. | ||
func (c *client) UpdateMembersInfo() { | ||
c.inner.updateMembersInfo(c.inner.ctx) | ||
// requestChecker is used to check the HTTP request sent by the client. | ||
type requestChecker func(req *http.Request) error | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. cc @HuSharp There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is it just used in the test? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe better to add some comments for |
||
|
||
// RoundTrip implements the `http.RoundTripper` interface. | ||
func (rc requestChecker) RoundTrip(req *http.Request) (resp *http.Response, err error) { | ||
return &http.Response{StatusCode: http.StatusOK}, rc(req) | ||
} | ||
|
||
// setLeaderAddrIdx sets the index of the leader address in the inner client. | ||
// only used for testing. | ||
func (c *client) setLeaderAddrIdx(idx int) { | ||
c.inner.Lock() | ||
defer c.inner.Unlock() | ||
c.inner.leaderAddrIdx = idx | ||
// NewHTTPClientWithRequestChecker returns a http client with checker. | ||
func NewHTTPClientWithRequestChecker(checker requestChecker) *http.Client { | ||
return &http.Client{ | ||
Transport: checker, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should use a context passed through.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found out that when we created innerClient, it created a context itself and cancel it with the
Close
function, so I kept it. Do we need to change it?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot about the logic. Then let's keep it the same.