Skip to content

Commit

Permalink
Introduce ClientOption to configure the PD client (ref #3149)
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Oct 28, 2021
1 parent 400344b commit 216c0fa
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 86 deletions.
19 changes: 7 additions & 12 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ type baseClient struct {

security SecurityOption

// BaseClient options.
gRPCDialOptions []grpc.DialOption
timeout time.Duration
maxRetryTimes int
enableTSOFollowerProxy bool
// Client option.
clientOption *ClientOption
}

// SecurityOption records options about tls
Expand All @@ -89,9 +86,7 @@ func newBaseClient(ctx context.Context, urls []string, security SecurityOption)
ctx: clientCtx,
cancel: clientCancel,
security: security,
timeout: defaultPDTimeout,
maxRetryTimes: maxInitClusterRetries,
enableTSOFollowerProxy: false,
clientOption: NewClientOption(),
}
bc.urls.Store(urls)
return bc
Expand All @@ -115,7 +110,7 @@ func (c *baseClient) init() error {

func (c *baseClient) initRetry(f func() error) error {
var err error
for i := 0; i < c.maxRetryTimes; i++ {
for i := 0; i < c.clientOption.maxRetryTimes; i++ {
if err = f(); err == nil {
return nil
}
Expand Down Expand Up @@ -250,7 +245,7 @@ func (c *baseClient) initClusterID() error {
ctx, cancel := context.WithCancel(c.ctx)
defer cancel()
for _, u := range c.GetURLs() {
members, err := c.getMembers(ctx, u, c.timeout)
members, err := c.getMembers(ctx, u, c.clientOption.timeout)
if err != nil || members.GetHeader() == nil {
log.Warn("[pd] failed to get cluster id", zap.String("url", u), errs.ZapError(err))
continue
Expand Down Expand Up @@ -330,7 +325,7 @@ func (c *baseClient) updateURLs(members []*pdpb.Member) {
}
c.urls.Store(urls)
// Update the connection contexts when member changes if TSO Follower Proxy is enabled.
if c.enableTSOFollowerProxy {
if c.clientOption.GetEnableTSOFollowerProxy() {
c.scheduleUpdateConnectionCtxs()
}
log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls))
Expand Down Expand Up @@ -418,7 +413,7 @@ func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error)
}
dCtx, cancel := context.WithTimeout(c.ctx, dialTimeout)
defer cancel()
cc, err := grpcutil.GetClientConn(dCtx, addr, tlsCfg, c.gRPCDialOptions...)
cc, err := grpcutil.GetClientConn(dCtx, addr, tlsCfg, c.clientOption.gRPCDialOptions...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 216c0fa

Please sign in to comment.