From 216c0fafab12af7ab89306bcc4c1f2afe3f8f3ac Mon Sep 17 00:00:00 2001
From: JmPotato
Date: Thu, 28 Oct 2021 21:49:30 +0800
Subject: [PATCH] Introduce ClientOption to configure the PD client (ref #3149)

Signed-off-by: JmPotato
---
 client/base_client.go        |  19 ++--
 client/client.go             | 165 ++++++++++++++++++++---------------
 client/client_option.go      |  96 ++++++++++++++++++++
 client/client_option_test.go |  62 +++++++++++++
 client/client_test.go        |   6 +-
 tests/client/client_test.go  |   2 +-
 6 files changed, 264 insertions(+), 86 deletions(-)
 create mode 100644 client/client_option.go
 create mode 100644 client/client_option_test.go

diff --git a/client/base_client.go b/client/base_client.go
index b7d47bbaac0..8124b775ce0 100644
--- a/client/base_client.go
+++ b/client/base_client.go
@@ -61,11 +61,8 @@ type baseClient struct {
 
 	security SecurityOption
 
-	// BaseClient options.
-	gRPCDialOptions        []grpc.DialOption
-	timeout                time.Duration
-	maxRetryTimes          int
-	enableTSOFollowerProxy bool
+	// Client option.
+	clientOption *ClientOption
 }
 
 // SecurityOption records options about tls
@@ -89,9 +86,7 @@ func newBaseClient(ctx context.Context, urls []string, security SecurityOption)
 		ctx:      clientCtx,
 		cancel:   clientCancel,
 		security: security,
-		timeout:                defaultPDTimeout,
-		maxRetryTimes:          maxInitClusterRetries,
-		enableTSOFollowerProxy: false,
+		clientOption: NewClientOption(),
 	}
 	bc.urls.Store(urls)
 	return bc
@@ -115,7 +110,7 @@ func (c *baseClient) init() error {
 
 func (c *baseClient) initRetry(f func() error) error {
 	var err error
-	for i := 0; i < c.maxRetryTimes; i++ {
+	for i := 0; i < c.clientOption.maxRetryTimes; i++ {
 		if err = f(); err == nil {
 			return nil
 		}
@@ -250,7 +245,7 @@ func (c *baseClient) initClusterID() error {
 	ctx, cancel := context.WithCancel(c.ctx)
 	defer cancel()
 	for _, u := range c.GetURLs() {
-		members, err := c.getMembers(ctx, u, c.timeout)
+		members, err := c.getMembers(ctx, u, c.clientOption.timeout)
 		if err != nil || members.GetHeader() == nil {
 			log.Warn("[pd] failed to get cluster id", zap.String("url", u), errs.ZapError(err))
 			continue
@@ -330,7 +325,7 @@ func (c *baseClient) updateURLs(members []*pdpb.Member) {
 	}
 	c.urls.Store(urls)
 	// Update the connection contexts when member changes if TSO Follower Proxy is enabled.
-	if c.enableTSOFollowerProxy {
+	if c.clientOption.GetEnableTSOFollowerProxy() {
 		c.scheduleUpdateConnectionCtxs()
 	}
 	log.Info("[pd] update member urls", zap.Strings("old-urls", oldURLs), zap.Strings("new-urls", urls))
@@ -418,7 +413,7 @@ func (c *baseClient) getOrCreateGRPCConn(addr string) (*grpc.ClientConn, error)
 	}
 	dCtx, cancel := context.WithTimeout(c.ctx, dialTimeout)
 	defer cancel()
-	cc, err := grpcutil.GetClientConn(dCtx, addr, tlsCfg, c.gRPCDialOptions...)
+	cc, err := grpcutil.GetClientConn(dCtx, addr, tlsCfg, c.clientOption.gRPCDialOptions...)
 	if err != nil {
 		return nil, err
 	}
diff --git a/client/client.go b/client/client.go
index 160257d0440..692f8604faa 100644
--- a/client/client.go
+++ b/client/client.go
@@ -157,8 +157,7 @@ type tsoRequest struct {
 }
 
 type tsoBatchController struct {
-	maxBatchSize         int
-	maxBatchWaitInterval time.Duration
+	maxBatchSize int
 	// bestBatchSize is a dynamic size that changed based on the current batch effect.
 	bestBatchSize int
 
@@ -169,10 +168,9 @@ type tsoBatchController struct {
 	batchStartTime time.Time
 }
 
-func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int, maxBatchWaitInterval time.Duration) *tsoBatchController {
+func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int) *tsoBatchController {
 	return &tsoBatchController{
 		maxBatchSize:         maxBatchSize,
-		maxBatchWaitInterval: maxBatchWaitInterval,
 		bestBatchSize:        8, /* Starting from a low value is necessary because we need to make sure it will be converged to (current_batch_size - 4) */
 		tsoRequestCh:         tsoRequestCh,
 		collectedRequests:    make([]*tsoRequest, maxBatchSize+1),
@@ -182,7 +180,10 @@ func newTSOBatchController(tsoRequestCh chan *tsoRequest, maxBatchSize int, maxB
 
 // fetchPendingRequests will start a new round of the batch collecting from the channel.
 // It returns true if everything goes well, otherwise false which means we should stop the service.
-func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context) error {
+func (tbc *tsoBatchController) fetchPendingRequests(
+	ctx context.Context,
+	maxTSOBatchWaitInterval time.Duration,
+) error {
 	var firstTSORequest *tsoRequest
 	select {
 	case <-ctx.Done():
@@ -209,7 +210,7 @@ fetchPendingRequestsLoop:
 
 	// Check whether we should fetch more pending TSO requests from the channel.
 	// TODO: maybe consider the actual load that returns through a TSO response from PD server.
-	if tbc.collectedRequestCount >= tbc.maxBatchSize || tbc.maxBatchWaitInterval <= 0 {
+	if tbc.collectedRequestCount >= tbc.maxBatchSize || maxTSOBatchWaitInterval <= 0 {
 		return nil
 	}
 
@@ -217,7 +218,7 @@ fetchPendingRequestsLoop:
 	// Try to collect `tbc.bestBatchSize` requests, or wait `tbc.maxBatchWaitInterval`
 	// when `tbc.collectedRequestCount` is less than the `tbc.bestBatchSize`.
 	if tbc.collectedRequestCount < tbc.bestBatchSize {
-		after := time.NewTimer(tbc.maxBatchWaitInterval)
+		after := time.NewTimer(maxTSOBatchWaitInterval)
 		defer after.Stop()
 		for tbc.collectedRequestCount < tbc.bestBatchSize {
 			select {
@@ -277,12 +278,10 @@ type lastTSO struct {
 }
 
 const (
-	defaultPDTimeout       = 3 * time.Second
 	dialTimeout            = 3 * time.Second
 	updateMemberTimeout    = time.Second // Use a shorter timeout to recover faster from network isolation.
 	tsLoopDCCheckInterval  = time.Minute
 	defaultMaxTSOBatchSize = 10000 // should be higher if client is sending requests in burst
-	maxInitClusterRetries  = 100
 	retryInterval          = 1 * time.Second
 	maxRetryTimes          = 5
 )
@@ -299,48 +298,48 @@ var (
 	errTSOLength = errors.New("[pd] tso length in rpc response is incorrect")
 )
 
-// ClientOption configures client.
-type ClientOption func(c *client)
+// ClientOptionFunc configures ClientOption.
+type ClientOptionFunc func(co *ClientOption)
 
 // WithGRPCDialOptions configures the client with gRPC dial options.
-func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption {
-	return func(c *client) {
-		c.gRPCDialOptions = append(c.gRPCDialOptions, opts...)
+func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOptionFunc {
+	return func(co *ClientOption) {
+		co.gRPCDialOptions = append(co.gRPCDialOptions, opts...)
 	}
 }
 
 // WithCustomTimeoutOption configures the client with timeout option.
-func WithCustomTimeoutOption(timeout time.Duration) ClientOption {
-	return func(c *client) {
-		c.timeout = timeout
+func WithCustomTimeoutOption(timeout time.Duration) ClientOptionFunc {
+	return func(co *ClientOption) {
+		co.timeout = timeout
 	}
 }
 
 // WithForwardingOption configures the client with forwarding option.
-func WithForwardingOption(enableForwarding bool) ClientOption {
-	return func(c *client) {
-		c.enableForwarding = enableForwarding
+func WithForwardingOption(enableForwarding bool) ClientOptionFunc {
+	return func(co *ClientOption) {
+		co.enableForwarding = enableForwarding
 	}
 }
 
 // WithMaxErrorRetry configures the client max retry times when connect meets error.
-func WithMaxErrorRetry(count int) ClientOption {
-	return func(c *client) {
-		c.maxRetryTimes = count
+func WithMaxErrorRetry(count int) ClientOptionFunc {
+	return func(co *ClientOption) {
+		co.maxRetryTimes = count
 	}
 }
 
-// WithTSOFollowerProxy configures the client with TSO Follower Proxy option.
-func WithTSOFollowerProxy(enable bool) ClientOption {
-	return func(c *client) {
-		c.enableTSOFollowerProxy = enable
+// WithMaxTSOBatchWaitInterval configures the client max TSO batch wait interval.
+func WithMaxTSOBatchWaitInterval(interval time.Duration) ClientOptionFunc {
+	return func(co *ClientOption) {
+		co.SetMaxTSOBatchWaitInterval(interval)
 	}
 }
 
-// WithMaxTSOBatchWaitInterval configures the client max TSO batch wait interval.
-func WithMaxTSOBatchWaitInterval(maxTSOBatchWaitInterval time.Duration) ClientOption {
-	return func(c *client) {
-		c.maxTSOBatchWaitInterval = maxTSOBatchWaitInterval
+// WithTSOFollowerProxy configures the client with TSO Follower Proxy option.
+func WithTSOFollowerProxy(enable bool) ClientOptionFunc {
+	return func(co *ClientOption) {
+		co.SetEnableTSOFollowerProxy(enable)
 	}
 }
 
@@ -357,28 +356,41 @@ type client struct {
 	// For internal usage.
 	checkTSDeadlineCh    chan struct{}
 	leaderNetworkFailure int32
-
-	// Client options.
-	enableForwarding bool
-	// TODO: make `maxTSOBatchWaitInterval` can be changed manually online.
-	maxTSOBatchWaitInterval time.Duration
 }
 
 // NewClient creates a PD client.
-func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
-	return NewClientWithContext(context.Background(), pdAddrs, security, opts...)
+func NewClient(pdAddrs []string, security SecurityOption, opts ...ClientOptionFunc) (Client, error) {
+	return createClient(context.Background(), pdAddrs, security, nil, opts...)
 }
 
-// NewClientWithContext creates a PD client with context.
-func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
+// NewClientWithContext creates a PD client with the given context.
+func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOptionFunc) (Client, error) {
+	return createClient(ctx, pdAddrs, security, nil, opts...)
+}
+
+// NewClientWithClientOption creates a PD client with the given ClientOption.
+func NewClientWithClientOption(pdAddrs []string, security SecurityOption, clientOption *ClientOption, opts ...ClientOptionFunc) (Client, error) {
+	return createClient(context.Background(), pdAddrs, security, clientOption, opts...)
+}
+
+func createClient(
+	ctx context.Context,
+	pdAddrs []string,
+	security SecurityOption,
+	clientOption *ClientOption,
+	opts ...ClientOptionFunc,
+) (Client, error) {
 	log.Info("[pd] create pd client with endpoints", zap.Strings("pd-address", pdAddrs))
 	c := &client{
 		baseClient:        newBaseClient(ctx, addrsToUrls(pdAddrs), security),
 		checkTSDeadlineCh: make(chan struct{}),
 	}
+	if clientOption != nil {
+		c.clientOption = clientOption
+	}
 	// Inject the client options.
 	for _, opt := range opts {
-		opt(c)
+		opt(c.clientOption)
 	}
 	// Init the client base.
 	if err := c.init(); err != nil {
@@ -439,7 +451,7 @@ func (c *client) leaderCheckLoop() {
 }
 
 func (c *client) checkLeaderHealth(ctx context.Context) {
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	defer cancel()
 	if cc, ok := c.clientConns.Load(c.GetLeaderAddr()); ok {
 		healthCli := healthpb.NewHealthClient(cc.(*grpc.ClientConn))
@@ -524,7 +536,7 @@ func (c *client) checkStreamTimeout(streamCtx context.Context, cancel context.Ca
 	select {
 	case <-done:
 		return
-	case <-time.After(c.timeout):
+	case <-time.After(c.clientOption.timeout):
 		cancel()
 	case <-streamCtx.Done():
 	}
@@ -535,7 +547,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
 	start := time.Now()
 	defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.GetMembersRequest{Header: c.requestHeader()}
 	ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
 	resp, err := c.getClient().GetMembers(ctx, req)
@@ -594,7 +606,7 @@ func (c *client) checkAllocator(
 			log.Info("[pd] the leader of the allocator leader is changed", zap.String("dc", dc), zap.String("origin", url), zap.String("new", u))
 			return
 		}
-		healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.timeout)
+		healthCtx, healthCancel := context.WithTimeout(dispatcherCtx, c.clientOption.timeout)
 		resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
 		failpoint.Inject("unreachableNetwork", func() {
 			resp.Status = healthpb.HealthCheckResponse_UNKNOWN
@@ -632,9 +644,11 @@ func (c *client) checkTSODispatcher(dcLocation string) bool {
 func (c *client) createTSODispatcher(dcLocation string) {
 	dispatcherCtx, dispatcherCancel := context.WithCancel(c.ctx)
 	dispatcher := &tsoDispatcher{
-		dispatcherCtx:      dispatcherCtx,
-		dispatcherCancel:   dispatcherCancel,
-		tsoBatchController: newTSOBatchController(make(chan *tsoRequest, defaultMaxTSOBatchSize*2), defaultMaxTSOBatchSize, c.maxTSOBatchWaitInterval),
+		dispatcherCtx:    dispatcherCtx,
+		dispatcherCancel: dispatcherCancel,
+		tsoBatchController: newTSOBatchController(
+			make(chan *tsoRequest, defaultMaxTSOBatchSize*2),
+			defaultMaxTSOBatchSize),
 	}
 	// Each goroutine is responsible for handling the tso stream request for its dc-location.
 	// The only case that will make the dispatcher goroutine exit
@@ -675,7 +689,7 @@ func (c *client) handleDispatcher(
 	if dc == globalDCLocation {
 		go func() {
 			var updateTicker = &time.Ticker{}
-			if c.enableTSOFollowerProxy {
+			if c.clientOption.GetEnableTSOFollowerProxy() {
 				updateTicker = time.NewTicker(memberUpdateInterval)
 				defer updateTicker.Stop()
 			}
@@ -683,6 +697,16 @@ func (c *client) handleDispatcher(
 			select {
 			case <-dispatcherCtx.Done():
 				return
+			case <-c.clientOption.enableTSOFollowerProxyCh:
+				if c.clientOption.GetEnableTSOFollowerProxy() && updateTicker.C == nil {
+					updateTicker = time.NewTicker(memberUpdateInterval)
+					defer updateTicker.Stop()
+				} else if !c.clientOption.GetEnableTSOFollowerProxy() {
+					if updateTicker != nil {
+						updateTicker.Stop()
+						updateTicker = &time.Ticker{}
+					}
+				}
 			case <-updateTicker.C:
 			case <-c.updateConnectionCtxsCh:
 			}
@@ -707,7 +731,7 @@ func (c *client) handleDispatcher(
 		if stream == nil {
 			log.Info("[pd] tso stream is not ready", zap.String("dc", dc))
 			c.updateConnectionCtxs(dispatcherCtx, dc, &connectionCtxs)
-			if retryTimeConsuming >= c.timeout {
+			if retryTimeConsuming >= c.clientOption.timeout {
 				err = errs.ErrClientCreateTSOStream.FastGenByArgs()
 				log.Error("[pd] create tso stream error", zap.String("dc-location", dc), errs.ZapError(err))
 				c.ScheduleCheckLeader()
@@ -725,16 +749,17 @@ func (c *client) handleDispatcher(
 		}
 		retryTimeConsuming = 0
 		// Start to collect the TSO requests.
-		if err = tbc.fetchPendingRequests(dispatcherCtx); err != nil {
+		maxTSOBatchWaitInterval := c.clientOption.GetMaxTSOBatchWaitInterval()
+		if err = tbc.fetchPendingRequests(dispatcherCtx, maxTSOBatchWaitInterval); err != nil {
 			log.Error("[pd] fetch pending tso requests error", zap.String("dc-location", dc), errs.ZapError(errs.ErrClientGetTSO, err))
 			return
 		}
-		if tbc.maxBatchWaitInterval >= 0 {
+		if maxTSOBatchWaitInterval >= 0 {
 			tbc.adjustBestBatchSize()
 		}
 		done := make(chan struct{})
 		dl := deadline{
-			timer:  time.After(c.timeout),
+			timer:  time.After(c.clientOption.timeout),
 			done:   done,
 			cancel: cancel,
 		}
@@ -782,7 +807,7 @@ func (c *client) handleDispatcher(
 
 // TSO Follower Proxy only supports the Global TSO proxy now.
 func (c *client) allowTSOFollowerProxy(dc string) bool {
-	return dc == globalDCLocation && c.enableTSOFollowerProxy
+	return dc == globalDCLocation && c.clientOption.GetEnableTSOFollowerProxy()
 }
 
 // chooseStream uses the reservoir sampling algorithm to randomly choose a connection.
@@ -860,7 +885,7 @@ func (c *client) tryConnect(
 		return nil
 	}
 
-	if err != nil && c.enableForwarding {
+	if err != nil && c.clientOption.enableForwarding {
 		// The reason we need to judge if the error code is equal to "Canceled" here is that
 		// when we create a stream we use a goroutine to manually control the timeout of the connection.
 		// There is no need to wait for the transport layer timeout which can reduce the time of unavailability.
@@ -1105,7 +1130,7 @@ func (c *client) followerClient() (pdpb.PDClient, string) {
 		if cc, err = c.getOrCreateGRPCConn(addr); err != nil {
 			continue
 		}
-		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.timeout)
+		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.clientOption.timeout)
 		resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
 		healthCancel()
 		if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
@@ -1116,7 +1141,7 @@ func (c *client) followerClient() (pdpb.PDClient, string) {
 }
 
 func (c *client) getClient() pdpb.PDClient {
-	if c.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
+	if c.clientOption.enableForwarding && atomic.LoadInt32(&c.leaderNetworkFailure) == 1 {
 		followerClient, addr := c.followerClient()
 		if followerClient != nil {
 			log.Debug("[pd] use follower client", zap.String("addr", addr))
@@ -1140,7 +1165,7 @@ func (c *client) getAllClients() map[string]pdpb.PDClient {
 		if cc, err = c.getOrCreateGRPCConn(addr); err != nil {
 			continue
 		}
-		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.timeout)
+		healthCtx, healthCancel := context.WithTimeout(c.ctx, c.clientOption.timeout)
 		resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""})
 		healthCancel()
 		if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING {
@@ -1261,7 +1286,7 @@ func (c *client) GetRegion(ctx context.Context, key []byte) (*Region, error) {
 	start := time.Now()
 	defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.GetRegionRequest{
 		Header:    c.requestHeader(),
 		RegionKey: key,
@@ -1328,7 +1353,7 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte) (*Region, error)
 	start := time.Now()
 	defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.GetRegionRequest{
 		Header:    c.requestHeader(),
 		RegionKey: key,
@@ -1353,7 +1378,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64) (*Region, e
 	start := time.Now()
 	defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.GetRegionByIDRequest{
 		Header:   c.requestHeader(),
 		RegionId: regionID,
@@ -1381,7 +1406,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int)
 	var cancel context.CancelFunc
 	scanCtx := ctx
 	if _, ok := ctx.Deadline(); !ok {
-		scanCtx, cancel = context.WithTimeout(ctx, c.timeout)
+		scanCtx, cancel = context.WithTimeout(ctx, c.clientOption.timeout)
 		defer cancel()
 	}
 	req := &pdpb.ScanRegionsRequest{
@@ -1438,7 +1463,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
 	start := time.Now()
 	defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.GetStoreRequest{
 		Header:  c.requestHeader(),
 		StoreId: storeID,
@@ -1480,7 +1505,7 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m
 	start := time.Now()
 	defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.GetAllStoresRequest{
 		Header:                 c.requestHeader(),
 		ExcludeTombstoneStores: options.excludeTombstone,
@@ -1505,7 +1530,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
 	start := time.Now()
 	defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.UpdateGCSafePointRequest{
 		Header:    c.requestHeader(),
 		SafePoint: safePoint,
@@ -1535,7 +1560,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
 	start := time.Now()
 	defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.UpdateServiceGCSafePointRequest{
 		Header:    c.requestHeader(),
 		ServiceId: []byte(serviceID),
@@ -1566,7 +1591,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
 	start := time.Now()
 	defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.ScatterRegionRequest{
 		Header:   c.requestHeader(),
 		RegionId: regionID,
@@ -1600,7 +1625,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
 	start := time.Now()
 	defer func() { cmdDurationGetOperator.Observe(time.Since(start).Seconds()) }()
 
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	defer cancel()
 	req := &pdpb.GetOperatorRequest{
 		Header:   c.requestHeader(),
@@ -1618,7 +1643,7 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...R
 	}
 	start := time.Now()
 	defer func() { cmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }()
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	defer cancel()
 	options := &RegionsOp{}
 	for _, opt := range opts {
@@ -1646,7 +1671,7 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
 	for _, opt := range opts {
 		opt(options)
 	}
-	ctx, cancel := context.WithTimeout(ctx, c.timeout)
+	ctx, cancel := context.WithTimeout(ctx, c.clientOption.timeout)
 	req := &pdpb.ScatterRegionRequest{
 		Header: c.requestHeader(),
 		Group:  options.group,
diff --git a/client/client_option.go b/client/client_option.go
new file mode 100644
index 00000000000..730e03fdabb
--- /dev/null
+++ b/client/client_option.go
@@ -0,0 +1,96 @@
+// Copyright 2021 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pd
+
+import (
+	"sync/atomic"
+	"time"
+
+	"google.golang.org/grpc"
+)
+
+const (
+	defaultPDTimeout               = 3 * time.Second
+	maxInitClusterRetries          = 100
+	defaultMaxTSOBatchWaitInterval = 0
+	defaultEnableTSOFollowerProxy  = false
+)
+
+// ClientOption is the set of configurable options for the PD client.
+type ClientOption struct {
+	// Static options.
+	gRPCDialOptions  []grpc.DialOption
+	timeout          time.Duration
+	maxRetryTimes    int
+	enableForwarding bool
+	// Dynamic options.
+	// TODO: consider a more flexible way to register new dynamic options.
+	maxTSOBatchWaitInterval   atomic.Value // Stored as time.Duration.
+	maxTSOBatchWaitIntervalCh chan struct{}
+	enableTSOFollowerProxy    atomic.Value // Stored as bool.
+	enableTSOFollowerProxyCh  chan struct{}
+}
+
+// NewClientOption creates a new ClientOption with the default values set.
+func NewClientOption() *ClientOption {
+	co := &ClientOption{
+		timeout:                   defaultPDTimeout,
+		maxRetryTimes:             maxInitClusterRetries,
+		maxTSOBatchWaitIntervalCh: make(chan struct{}, 1),
+		enableTSOFollowerProxyCh:  make(chan struct{}, 1),
+	}
+	co.maxTSOBatchWaitInterval.Store(time.Duration(defaultMaxTSOBatchWaitInterval))
+	co.enableTSOFollowerProxy.Store(defaultEnableTSOFollowerProxy)
+	return co
+}
+
+// SetMaxTSOBatchWaitInterval sets the max TSO batch wait interval option.
+// It only accepts an interval between 0 and 10ms; values outside that range are ignored.
+func (co *ClientOption) SetMaxTSOBatchWaitInterval(interval time.Duration) {
+	if interval < 0 || interval > 10*time.Millisecond {
+		return
+	}
+	old := co.GetMaxTSOBatchWaitInterval()
+	if interval != old {
+		co.maxTSOBatchWaitInterval.Store(interval)
+		select {
+		case co.maxTSOBatchWaitIntervalCh <- struct{}{}:
+		default:
+		}
+	}
+}
+
+// GetMaxTSOBatchWaitInterval gets the max TSO batch wait interval option.
+func (co *ClientOption) GetMaxTSOBatchWaitInterval() time.Duration {
+	return co.maxTSOBatchWaitInterval.Load().(time.Duration)
+}
+
+// SetEnableTSOFollowerProxy sets the TSO Follower Proxy option.
+func (co *ClientOption) SetEnableTSOFollowerProxy(enable bool) {
+	old := co.GetEnableTSOFollowerProxy()
+	if enable != old {
+		co.enableTSOFollowerProxy.Store(enable)
+		select {
+		case co.enableTSOFollowerProxyCh <- struct{}{}:
+		default:
+		}
+	}
+}
+
+// GetEnableTSOFollowerProxy gets the TSO Follower Proxy option.
+func (co *ClientOption) GetEnableTSOFollowerProxy() bool {
+	value, ok := co.enableTSOFollowerProxy.Load().(bool)
+	return ok && value
+}
diff --git a/client/client_option_test.go b/client/client_option_test.go
new file mode 100644
index 00000000000..fbd9bcd9efc
--- /dev/null
+++ b/client/client_option_test.go
@@ -0,0 +1,62 @@
+// Copyright 2021 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package pd
+
+import (
+	"time"
+
+	. "github.com/pingcap/check"
"github.com/pingcap/check" + "github.com/tikv/pd/pkg/testutil" +) + +var _ = Suite(&testClientOptionSuite{}) + +type testClientOptionSuite struct{} + +func (s *testClientSuite) TestDynamicOptionChange(c *C) { + co := NewClientOption() + // Check the default value setting. + c.Assert(co.GetMaxTSOBatchWaitInterval(), Equals, time.Duration(defaultMaxTSOBatchWaitInterval)) + c.Assert(co.GetEnableTSOFollowerProxy(), Equals, defaultEnableTSOFollowerProxy) + + // Check the invalid value setting. + co.SetMaxTSOBatchWaitInterval(time.Second) + c.Assert(co.GetMaxTSOBatchWaitInterval(), Equals, time.Duration(defaultMaxTSOBatchWaitInterval)) + expectInterval := time.Millisecond + co.SetMaxTSOBatchWaitInterval(expectInterval) + // Check the value changing notification. + testutil.WaitUntil(c, func(c *C) bool { + <-co.maxTSOBatchWaitIntervalCh + return true + }) + c.Assert(co.GetMaxTSOBatchWaitInterval(), Equals, expectInterval) + // Check whether any data will be sent to the channel. + // It will panic if the test fails. + close(co.maxTSOBatchWaitIntervalCh) + co.SetMaxTSOBatchWaitInterval(expectInterval) + + expectBool := true + co.SetEnableTSOFollowerProxy(expectBool) + // Check the value changing notification. + testutil.WaitUntil(c, func(c *C) bool { + <-co.enableTSOFollowerProxyCh + return true + }) + c.Assert(co.GetEnableTSOFollowerProxy(), Equals, expectBool) + // Check whether any data will be sent to the channel. + // It will panic if the test fails. + close(co.enableTSOFollowerProxyCh) + co.SetEnableTSOFollowerProxy(expectBool) +} diff --git a/client/client_test.go b/client/client_test.go index 9f0e6bdeceb..e196467135c 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -60,7 +60,7 @@ func (s *testClientSuite) TestUpdateURLs(c *C) { } return } - cli := &baseClient{} + cli := &baseClient{clientOption: NewClientOption()} cli.urls.Store([]string{}) cli.updateURLs(members[1:]) c.Assert(cli.GetURLs(), DeepEquals, getURLs([]*pdpb.Member{members[1], members[3], members[2]})) @@ -107,10 +107,10 @@ func (s *testClientDialOptionSuite) TestGRPCDialOption(c *C) { ctx: ctx, cancel: cancel, security: SecurityOption{}, - gRPCDialOptions: []grpc.DialOption{grpc.WithBlock()}, + clientOption: NewClientOption(), } + cli.clientOption.gRPCDialOptions = []grpc.DialOption{grpc.WithBlock()} cli.urls.Store([]string{testClientURL}) - err := cli.updateMember() c.Assert(err, NotNil) c.Assert(time.Since(start), Greater, 500*time.Millisecond) diff --git a/tests/client/client_test.go b/tests/client/client_test.go index de6724cee9b..bbacf90225c 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -492,7 +492,7 @@ func (s *clientTestSuite) runServer(c *C, cluster *tests.TestCluster) []string { return endpoints } -func setupCli(c *C, ctx context.Context, endpoints []string, opts ...pd.ClientOption) pd.Client { +func setupCli(c *C, ctx context.Context, endpoints []string, opts ...pd.ClientOptionFunc) pd.Client { cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...) c.Assert(err, IsNil) return cli