From 692685388788a243f546d1f6055ac1c9eb0feabc Mon Sep 17 00:00:00 2001 From: JmPotato Date: Thu, 14 Nov 2024 15:35:30 +0800 Subject: [PATCH] Independent the client option Signed-off-by: JmPotato --- client/client.go | 185 +++++++-------- client/client_test.go | 9 +- client/gc_client.go | 6 +- client/keyspace_client.go | 6 +- client/meta_storage_client.go | 4 +- client/{ => opt}/option.go | 214 +++++++++--------- client/{ => opt}/option_test.go | 46 ++-- client/pd_service_discovery.go | 23 +- client/tso_client.go | 25 +- client/tso_dispatcher.go | 19 +- client/tso_dispatcher_test.go | 23 +- client/tso_service_discovery.go | 11 +- tests/integrations/client/client_test.go | 53 ++--- tests/integrations/client/client_tls_test.go | 9 +- tests/integrations/mcs/testutil.go | 7 +- tests/integrations/mcs/tso/server_test.go | 3 +- .../realcluster/cluster_id_test.go | 3 +- tests/integrations/tso/client_test.go | 5 +- tools/pd-api-bench/cases/cases.go | 3 +- tools/pd-api-bench/main.go | 5 +- tools/pd-tso-bench/main.go | 11 +- 21 files changed, 344 insertions(+), 326 deletions(-) rename client/{ => opt}/option.go (65%) rename client/{ => opt}/option_test.go (55%) diff --git a/client/client.go b/client/client.go index d919e3665ae..020dbbc700c 100644 --- a/client/client.go +++ b/client/client.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/tlsutil" "go.uber.org/zap" ) @@ -78,25 +79,25 @@ type RPCClient interface { // taking care of region change. // Also, it may return nil if PD finds no Region for the key temporarily, // client should retry later. - GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) + GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) // GetRegionFromMember gets a region from certain members. - GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) + GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...opt.GetRegionOption) (*Region, error) // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. - GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) + GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) // GetRegionByID gets a region and its leader Peer from PD by id. - GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) + GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) // Deprecated: use BatchScanRegions instead. // ScanRegions gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. It returns all the regions in the given range if limit <= 0. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). - ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) + ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) // BatchScanRegions gets a list of regions, starts from the region that contains key. // Limit limits the maximum number of regions returned. It returns all the regions in the given ranges if limit <= 0. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). // The returned regions are flattened, even there are key ranges located in the same region, only one region will be returned. - BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) + BatchScanRegions(ctx context.Context, keyRanges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) // GetStore gets a store from PD by store id. // The store may expire later. Caller is responsible for caching and taking care // of store change. @@ -104,7 +105,7 @@ type RPCClient interface { // GetAllStores gets all stores from pd. // The store may expire later. Caller is responsible for caching and taking care // of store change. - GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) + GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) // UpdateGCSafePoint TiKV will check it and do GC themselves if necessary. // If the given safePoint is less than the current one, it will not be updated. // Returns the new safePoint after updating. @@ -120,11 +121,11 @@ type RPCClient interface { ScatterRegion(ctx context.Context, regionID uint64) error // ScatterRegions scatters the specified regions. Should use it for a batch of regions, // and the distribution of these regions will be dispersed. - ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) + ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) // SplitRegions split regions by given split keys - SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) + SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) // SplitAndScatterRegions split regions by given split keys and scatter new regions - SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) + SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) // GetOperator gets the status of operator of the specified region. GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOperatorResponse, error) @@ -166,7 +167,7 @@ type Client interface { GetServiceDiscovery() ServiceDiscovery // UpdateOption updates the client option. - UpdateOption(option DynamicOption, value any) error + UpdateOption(option opt.DynamicOption, value any) error // Close closes the client. Close() @@ -226,7 +227,7 @@ type client struct { cancel context.CancelFunc wg sync.WaitGroup tlsCfg *tls.Config - option *option + option *opt.Option } // SecurityOption records options about tls @@ -274,7 +275,7 @@ func (r *KeyRange) EscapeAsHexStr() (startKeyStr, endKeyStr string) { // NewClient creates a PD client. func NewClient( - svrAddrs []string, security SecurityOption, opts ...ClientOption, + svrAddrs []string, security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { return NewClientWithContext(context.Background(), svrAddrs, security, opts...) } @@ -282,7 +283,7 @@ func NewClient( // NewClientWithContext creates a PD client with context. This API uses the default keyspace id 0. func NewClientWithContext( ctx context.Context, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { return createClientWithKeyspace(ctx, nullKeyspaceID, svrAddrs, security, opts...) } @@ -291,7 +292,7 @@ func NewClientWithContext( // And now, it's only for test purpose. func NewClientWithKeyspace( ctx context.Context, keyspaceID uint32, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { if keyspaceID < defaultKeyspaceID || keyspaceID > maxKeyspaceID { return nil, errors.Errorf("invalid keyspace id %d. It must be in the range of [%d, %d]", @@ -303,7 +304,7 @@ func NewClientWithKeyspace( // createClientWithKeyspace creates a client with context and the specified keyspace id. func createClientWithKeyspace( ctx context.Context, keyspaceID uint32, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { tlsCfg, err := tlsutil.TLSConfig{ CAPath: security.CAPath, @@ -325,12 +326,12 @@ func createClientWithKeyspace( keyspaceID: keyspaceID, svrUrls: svrAddrs, tlsCfg: tlsCfg, - option: newOption(), + option: opt.NewOption(), } // Inject the client options. for _, opt := range opts { - opt(c) + opt(c.option) } c.pdSvcDiscovery = newPDServiceDiscovery( @@ -407,7 +408,7 @@ func (apiCtx *apiContextV2) GetKeyspaceName() (keyspaceName string) { // NewClientWithAPIContext creates a client according to the API context. func NewClientWithAPIContext( ctx context.Context, apiCtx APIContext, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { apiVersion, keyspaceName := apiCtx.GetAPIVersion(), apiCtx.GetKeyspaceName() switch apiVersion { @@ -423,7 +424,7 @@ func NewClientWithAPIContext( // newClientWithKeyspaceName creates a client with context and the specified keyspace name. func newClientWithKeyspaceName( ctx context.Context, keyspaceName string, svrAddrs []string, - security SecurityOption, opts ...ClientOption, + security SecurityOption, opts ...opt.ClientOption, ) (Client, error) { tlsCfg, err := tlsutil.TLSConfig{ CAPath: security.CAPath, @@ -445,12 +446,12 @@ func newClientWithKeyspaceName( cancel: clientCancel, svrUrls: svrAddrs, tlsCfg: tlsCfg, - option: newOption(), + option: opt.NewOption(), } // Inject the client options. for _, opt := range opts { - opt(c) + opt(c.option) } updateKeyspaceIDFunc := func() error { @@ -484,8 +485,8 @@ func newClientWithKeyspaceName( func (c *client) setup() error { // Init the metrics. - if c.option.initMetrics { - initAndRegisterMetrics(c.option.metricsLabels) + if c.option.InitMetrics { + initAndRegisterMetrics(c.option.MetricsLabels) } // Init the client base. @@ -520,7 +521,7 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) { c.Lock() defer c.Unlock() - if c.option.useTSOServerProxy { + if c.option.UseTSOServerProxy { // If we are using TSO server proxy, we always use PD_SVC_MODE. newMode = pdpb.ServiceMode_PD_SVC_MODE } @@ -628,17 +629,17 @@ func (c *client) GetServiceDiscovery() ServiceDiscovery { } // UpdateOption updates the client option. -func (c *client) UpdateOption(option DynamicOption, value any) error { +func (c *client) UpdateOption(option opt.DynamicOption, value any) error { switch option { - case MaxTSOBatchWaitInterval: + case opt.MaxTSOBatchWaitInterval: interval, ok := value.(time.Duration) if !ok { return errors.New("[pd] invalid value type for MaxTSOBatchWaitInterval option, it should be time.Duration") } - if err := c.option.setMaxTSOBatchWaitInterval(interval); err != nil { + if err := c.option.SetMaxTSOBatchWaitInterval(interval); err != nil { return err } - case EnableTSOFollowerProxy: + case opt.EnableTSOFollowerProxy: if c.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE { return errors.New("[pd] tso follower proxy is only supported in PD service mode") } @@ -646,19 +647,19 @@ func (c *client) UpdateOption(option DynamicOption, value any) error { if !ok { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") } - c.option.setEnableTSOFollowerProxy(enable) - case EnableFollowerHandle: + c.option.SetEnableTSOFollowerProxy(enable) + case opt.EnableFollowerHandle: enable, ok := value.(bool) if !ok { return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool") } - c.option.setEnableFollowerHandle(enable) - case TSOClientRPCConcurrency: + c.option.SetEnableFollowerHandle(enable) + case opt.TSOClientRPCConcurrency: value, ok := value.(int) if !ok { return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int") } - c.option.setTSOClientRPCConcurrency(value) + c.option.SetTSOClientRPCConcurrency(value) default: return errors.New("[pd] unsupported client option") } @@ -670,7 +671,7 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) { start := time.Now() defer func() { cmdDurationGetAllMembers.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.GetMembersRequest{Header: c.requestHeader()} protoClient, ctx := c.getClientAndContext(ctx) @@ -798,7 +799,7 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e default: return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode") } - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() // Call GetMinTS API to get the minimal TS from the API leader. protoClient, ctx := c.getClientAndContext(ctx) @@ -847,7 +848,7 @@ func handleRegionResponse(res *pdpb.GetRegionResponse) *Region { } // GetRegionFromMember implements the RPCClient interface. -func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...GetRegionOption) (*Region, error) { +func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, _ ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -886,26 +887,26 @@ func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs } // GetRegion implements the RPCClient interface. -func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { +func (c *client) GetRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegion", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationGetRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } req := &pdpb.GetRegionRequest{ Header: c.requestHeader(), RegionKey: key, - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, } - serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -925,26 +926,26 @@ func (c *client) GetRegion(ctx context.Context, key []byte, opts ...GetRegionOpt } // GetPrevRegion implements the RPCClient interface. -func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) { +func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetPrevRegion", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationGetPrevRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } req := &pdpb.GetRegionRequest{ Header: c.requestHeader(), RegionKey: key, - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, } - serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -964,26 +965,26 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegio } // GetRegionByID implements the RPCClient interface. -func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...GetRegionOption) (*Region, error) { +func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt.GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetRegionByID", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationGetRegionByID.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } req := &pdpb.GetRegionByIDRequest{ Header: c.requestHeader(), RegionId: regionID, - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, } - serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + serviceClient, cctx := c.getRegionAPIClientAndContext(ctx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1003,7 +1004,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get } // ScanRegions implements the RPCClient interface. -func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) { +func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...opt.GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -1014,10 +1015,10 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, var cancel context.CancelFunc scanCtx := ctx if _, ok := ctx.Deadline(); !ok { - scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout) + scanCtx, cancel = context.WithTimeout(ctx, c.option.Timeout) defer cancel() } - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } @@ -1027,7 +1028,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, EndKey: endKey, Limit: int32(limit), } - serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1053,7 +1054,7 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, } // BatchScanRegions implements the RPCClient interface. -func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...GetRegionOption) ([]*Region, error) { +func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit int, opts ...opt.GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.BatchScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -1064,10 +1065,10 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit var cancel context.CancelFunc scanCtx := ctx if _, ok := ctx.Deadline(); !ok { - scanCtx, cancel = context.WithTimeout(ctx, c.option.timeout) + scanCtx, cancel = context.WithTimeout(ctx, c.option.Timeout) defer cancel() } - options := &GetRegionOp{} + options := &opt.GetRegionOp{} for _, opt := range opts { opt(options) } @@ -1077,12 +1078,12 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit } req := &pdpb.BatchScanRegionsRequest{ Header: c.requestHeader(), - NeedBuckets: options.needBuckets, + NeedBuckets: options.NeedBuckets, Ranges: pbRanges, Limit: int32(limit), - ContainAllKeyRange: options.outputMustContainAllKeyRange, + ContainAllKeyRange: options.OutputMustContainAllKeyRange, } - serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.allowFollowerHandle && c.option.getEnableFollowerHandle()) + serviceClient, cctx := c.getRegionAPIClientAndContext(scanCtx, options.AllowFollowerHandle && c.option.GetEnableFollowerHandle()) if serviceClient == nil { return nil, errs.ErrClientGetProtoClient } @@ -1160,7 +1161,7 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e start := time.Now() defer func() { cmdDurationGetStore.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.GetStoreRequest{ Header: c.requestHeader(), @@ -1190,9 +1191,9 @@ func handleStoreResponse(resp *pdpb.GetStoreResponse) (*metapb.Store, error) { } // GetAllStores implements the RPCClient interface. -func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*metapb.Store, error) { +func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) ([]*metapb.Store, error) { // Applies options - options := &GetStoreOp{} + options := &opt.GetStoreOp{} for _, opt := range opts { opt(options) } @@ -1204,11 +1205,11 @@ func (c *client) GetAllStores(ctx context.Context, opts ...GetStoreOption) ([]*m start := time.Now() defer func() { cmdDurationGetAllStores.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.GetAllStoresRequest{ Header: c.requestHeader(), - ExcludeTombstoneStores: options.excludeTombstone, + ExcludeTombstoneStores: options.ExcludeTombstone, } protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1231,7 +1232,7 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6 start := time.Now() defer func() { cmdDurationUpdateGCSafePoint.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.UpdateGCSafePointRequest{ Header: c.requestHeader(), @@ -1262,7 +1263,7 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, start := time.Now() defer func() { cmdDurationUpdateServiceGCSafePoint.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.UpdateServiceGCSafePointRequest{ Header: c.requestHeader(), @@ -1295,7 +1296,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g start := time.Now() defer func() { cmdDurationScatterRegion.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.ScatterRegionRequest{ Header: c.requestHeader(), @@ -1317,7 +1318,7 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g } // ScatterRegions implements the RPCClient interface. -func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) { +func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.ScatterRegions", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -1326,24 +1327,24 @@ func (c *client) ScatterRegions(ctx context.Context, regionsID []uint64, opts .. } // SplitAndScatterRegions implements the RPCClient interface. -func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { +func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitAndScatterRegionsResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.SplitAndScatterRegions", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationSplitAndScatterRegions.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() - options := &RegionsOp{} + options := &opt.RegionsOp{} for _, opt := range opts { opt(options) } req := &pdpb.SplitAndScatterRegionsRequest{ Header: c.requestHeader(), SplitKeys: splitKeys, - Group: options.group, - RetryLimit: options.retryLimit, + Group: options.Group, + RetryLimit: options.RetryLimit, } protoClient, ctx := c.getClientAndContext(ctx) @@ -1362,7 +1363,7 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe start := time.Now() defer func() { cmdDurationGetOperator.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.GetOperatorRequest{ Header: c.requestHeader(), @@ -1376,23 +1377,23 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe } // SplitRegions split regions by given split keys -func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...RegionsOption) (*pdpb.SplitRegionsResponse, error) { +func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...opt.RegionsOption) (*pdpb.SplitRegionsResponse, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.SplitRegions", opentracing.ChildOf(span.Context())) defer span.Finish() } start := time.Now() defer func() { cmdDurationSplitRegions.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() - options := &RegionsOp{} + options := &opt.RegionsOp{} for _, opt := range opts { opt(options) } req := &pdpb.SplitRegionsRequest{ Header: c.requestHeader(), SplitKeys: splitKeys, - RetryLimit: options.retryLimit, + RetryLimit: options.RetryLimit, } protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1407,21 +1408,21 @@ func (c *client) requestHeader() *pdpb.RequestHeader { } } -func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint64, opts ...RegionsOption) (*pdpb.ScatterRegionResponse, error) { +func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint64, opts ...opt.RegionsOption) (*pdpb.ScatterRegionResponse, error) { start := time.Now() defer func() { cmdDurationScatterRegions.Observe(time.Since(start).Seconds()) }() - options := &RegionsOp{} + options := &opt.RegionsOp{} for _, opt := range opts { opt(options) } - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() req := &pdpb.ScatterRegionRequest{ Header: c.requestHeader(), - Group: options.group, + Group: options.Group, RegionsId: regionsID, - RetryLimit: options.retryLimit, - SkipStoreLimit: options.skipStoreLimit, + RetryLimit: options.RetryLimit, + SkipStoreLimit: options.SkipStoreLimit, } protoClient, ctx := c.getClientAndContext(ctx) @@ -1452,7 +1453,7 @@ func trimHTTPPrefix(str string) string { // LoadGlobalConfig implements the RPCClient interface. func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPath string) ([]GlobalConfigItem, int64, error) { - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1484,7 +1485,7 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items for i, it := range items { resArr[i] = &pdpb.GlobalConfigItem{Name: it.Name, Value: it.Value, Kind: it.EventType, Payload: it.PayLoad} } - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1502,7 +1503,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis // TODO: Add retry mechanism // register watch components there globalConfigWatcherCh := make(chan []GlobalConfigItem, 16) - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1551,7 +1552,7 @@ func (c *client) WatchGlobalConfig(ctx context.Context, configPath string, revis // GetExternalTimestamp implements the RPCClient interface. func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { @@ -1572,7 +1573,7 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) { // SetExternalTimestamp implements the RPCClient interface. func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) error { - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { diff --git a/client/client_test.go b/client/client_test.go index 2a26c24e1e2..61c538bb48e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/stretchr/testify/require" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/testutil" "github.com/tikv/pd/client/utils/tsoutil" "go.uber.org/goleak" @@ -55,7 +56,7 @@ func TestUpdateURLs(t *testing.T) { } return } - cli := &pdServiceDiscovery{option: newOption()} + cli := &pdServiceDiscovery{option: opt.NewOption()} cli.urls.Store([]string{}) cli.updateURLs(members[1:]) re.Equal(getURLs([]*pdpb.Member{members[1], members[3], members[2]}), cli.GetServiceURLs()) @@ -86,7 +87,7 @@ func TestClientCtx(t *testing.T) { func TestClientWithRetry(t *testing.T) { re := require.New(t) start := time.Now() - _, err := NewClientWithContext(context.TODO(), []string{testClientURL}, SecurityOption{}, WithMaxErrorRetry(5)) + _, err := NewClientWithContext(context.TODO(), []string{testClientURL}, SecurityOption{}, opt.WithMaxErrorRetry(5)) re.Error(err) re.Less(time.Since(start), time.Second*10) } @@ -101,10 +102,10 @@ func TestGRPCDialOption(t *testing.T) { ctx: ctx, cancel: cancel, tlsCfg: nil, - option: newOption(), + option: opt.NewOption(), } cli.urls.Store([]string{testClientURL}) - cli.option.gRPCDialOptions = []grpc.DialOption{grpc.WithBlock()} + cli.option.GRPCDialOptions = []grpc.DialOption{grpc.WithBlock()} err := cli.updateMember() re.Error(err) re.Greater(time.Since(start), 500*time.Millisecond) diff --git a/client/gc_client.go b/client/gc_client.go index 21eb0051499..538538ec50c 100644 --- a/client/gc_client.go +++ b/client/gc_client.go @@ -41,7 +41,7 @@ func (c *client) UpdateGCSafePointV2(ctx context.Context, keyspaceID uint32, saf start := time.Now() defer func() { cmdDurationUpdateGCSafePointV2.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &pdpb.UpdateGCSafePointV2Request{ Header: c.requestHeader(), KeyspaceId: keyspaceID, @@ -70,7 +70,7 @@ func (c *client) UpdateServiceSafePointV2(ctx context.Context, keyspaceID uint32 start := time.Now() defer func() { cmdDurationUpdateServiceSafePointV2.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &pdpb.UpdateServiceSafePointV2Request{ Header: c.requestHeader(), KeyspaceId: keyspaceID, @@ -99,7 +99,7 @@ func (c *client) WatchGCSafePointV2(ctx context.Context, revision int64) (chan [ Revision: revision, } - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() protoClient, ctx := c.getClientAndContext(ctx) if protoClient == nil { diff --git a/client/keyspace_client.go b/client/keyspace_client.go index e52a4f85f05..bdef1edec11 100644 --- a/client/keyspace_client.go +++ b/client/keyspace_client.go @@ -52,7 +52,7 @@ func (c *client) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.Key } start := time.Now() defer func() { cmdDurationLoadKeyspace.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &keyspacepb.LoadKeyspaceRequest{ Header: c.requestHeader(), Name: name, @@ -96,7 +96,7 @@ func (c *client) UpdateKeyspaceState(ctx context.Context, id uint32, state keysp } start := time.Now() defer func() { cmdDurationUpdateKeyspaceState.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &keyspacepb.UpdateKeyspaceStateRequest{ Header: c.requestHeader(), Id: id, @@ -140,7 +140,7 @@ func (c *client) GetAllKeyspaces(ctx context.Context, startID uint32, limit uint } start := time.Now() defer func() { cmdDurationGetAllKeyspaces.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &keyspacepb.GetAllKeyspacesRequest{ Header: c.requestHeader(), StartId: startID, diff --git a/client/meta_storage_client.go b/client/meta_storage_client.go index a53d495241f..fd0e326f500 100644 --- a/client/meta_storage_client.go +++ b/client/meta_storage_client.go @@ -118,7 +118,7 @@ func (c *client) Put(ctx context.Context, key, value []byte, opts ...OpOption) ( start := time.Now() defer func() { cmdDurationPut.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &meta_storagepb.PutRequest{ Key: key, Value: value, @@ -157,7 +157,7 @@ func (c *client) Get(ctx context.Context, key []byte, opts ...OpOption) (*meta_s start := time.Now() defer func() { cmdDurationGet.Observe(time.Since(start).Seconds()) }() - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) req := &meta_storagepb.GetRequest{ Key: key, RangeEnd: options.rangeEnd, diff --git a/client/option.go b/client/opt/option.go similarity index 65% rename from client/option.go rename to client/opt/option.go index ca21dcfefbf..b90ff3a905c 100644 --- a/client/option.go +++ b/client/opt/option.go @@ -1,4 +1,4 @@ -// Copyright 2021 TiKV Project Authors. +// Copyright 2024 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pd +package opt import ( "sync/atomic" @@ -50,31 +50,31 @@ const ( dynamicOptionCount ) -// option is the configurable option for the PD client. +// Option is the configurable option for the PD client. // It provides the ability to change some PD client's options online from the outside. -type option struct { +type Option struct { // Static options. - gRPCDialOptions []grpc.DialOption - timeout time.Duration - maxRetryTimes int - enableForwarding bool - useTSOServerProxy bool - metricsLabels prometheus.Labels - initMetrics bool + GRPCDialOptions []grpc.DialOption + Timeout time.Duration + MaxRetryTimes int + EnableForwarding bool + UseTSOServerProxy bool + MetricsLabels prometheus.Labels + InitMetrics bool // Dynamic options. dynamicOptions [dynamicOptionCount]atomic.Value - enableTSOFollowerProxyCh chan struct{} + EnableTSOFollowerProxyCh chan struct{} } -// newOption creates a new PD client option with the default values set. -func newOption() *option { - co := &option{ - timeout: defaultPDTimeout, - maxRetryTimes: maxInitClusterRetries, - enableTSOFollowerProxyCh: make(chan struct{}, 1), - initMetrics: true, +// NewOption creates a new PD client option with the default values set. +func NewOption() *Option { + co := &Option{ + Timeout: defaultPDTimeout, + MaxRetryTimes: maxInitClusterRetries, + EnableTSOFollowerProxyCh: make(chan struct{}, 1), + InitMetrics: true, } co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval) @@ -84,68 +84,124 @@ func newOption() *option { return co } -// setMaxTSOBatchWaitInterval sets the max TSO batch wait interval option. +// SetMaxTSOBatchWaitInterval sets the max TSO batch wait interval option. // It only accepts the interval value between 0 and 10ms. -func (o *option) setMaxTSOBatchWaitInterval(interval time.Duration) error { +func (o *Option) SetMaxTSOBatchWaitInterval(interval time.Duration) error { if interval < 0 || interval > 10*time.Millisecond { return errors.New("[pd] invalid max TSO batch wait interval, should be between 0 and 10ms") } - old := o.getMaxTSOBatchWaitInterval() + old := o.GetMaxTSOBatchWaitInterval() if interval != old { o.dynamicOptions[MaxTSOBatchWaitInterval].Store(interval) } return nil } -// setEnableFollowerHandle set the Follower Handle option. -func (o *option) setEnableFollowerHandle(enable bool) { - old := o.getEnableFollowerHandle() +// SetEnableFollowerHandle set the Follower Handle option. +func (o *Option) SetEnableFollowerHandle(enable bool) { + old := o.GetEnableFollowerHandle() if enable != old { o.dynamicOptions[EnableFollowerHandle].Store(enable) } } -// getMaxTSOBatchWaitInterval gets the Follower Handle enable option. -func (o *option) getEnableFollowerHandle() bool { +// GetEnableFollowerHandle gets the Follower Handle enable option. +func (o *Option) GetEnableFollowerHandle() bool { return o.dynamicOptions[EnableFollowerHandle].Load().(bool) } -// getMaxTSOBatchWaitInterval gets the max TSO batch wait interval option. -func (o *option) getMaxTSOBatchWaitInterval() time.Duration { +// GetMaxTSOBatchWaitInterval gets the max TSO batch wait interval option. +func (o *Option) GetMaxTSOBatchWaitInterval() time.Duration { return o.dynamicOptions[MaxTSOBatchWaitInterval].Load().(time.Duration) } -// setEnableTSOFollowerProxy sets the TSO Follower Proxy option. -func (o *option) setEnableTSOFollowerProxy(enable bool) { - old := o.getEnableTSOFollowerProxy() +// SetEnableTSOFollowerProxy sets the TSO Follower Proxy option. +func (o *Option) SetEnableTSOFollowerProxy(enable bool) { + old := o.GetEnableTSOFollowerProxy() if enable != old { o.dynamicOptions[EnableTSOFollowerProxy].Store(enable) select { - case o.enableTSOFollowerProxyCh <- struct{}{}: + case o.EnableTSOFollowerProxyCh <- struct{}{}: default: } } } -// getEnableTSOFollowerProxy gets the TSO Follower Proxy option. -func (o *option) getEnableTSOFollowerProxy() bool { +// GetEnableTSOFollowerProxy gets the TSO Follower Proxy option. +func (o *Option) GetEnableTSOFollowerProxy() bool { return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool) } -func (o *option) setTSOClientRPCConcurrency(value int) { - old := o.getTSOClientRPCConcurrency() +// SetTSOClientRPCConcurrency sets the TSO client RPC concurrency option. +func (o *Option) SetTSOClientRPCConcurrency(value int) { + old := o.GetTSOClientRPCConcurrency() if value != old { o.dynamicOptions[TSOClientRPCConcurrency].Store(value) } } -func (o *option) getTSOClientRPCConcurrency() int { +// GetTSOClientRPCConcurrency gets the TSO client RPC concurrency option. +func (o *Option) GetTSOClientRPCConcurrency() int { return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int) } +// ClientOption configures client. +type ClientOption func(*Option) + +// WithGRPCDialOptions configures the client with gRPC dial options. +func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { + return func(op *Option) { + op.GRPCDialOptions = append(op.GRPCDialOptions, opts...) + } +} + +// WithCustomTimeoutOption configures the client with timeout option. +func WithCustomTimeoutOption(timeout time.Duration) ClientOption { + return func(op *Option) { + op.Timeout = timeout + } +} + +// WithForwardingOption configures the client with forwarding option. +func WithForwardingOption(enableForwarding bool) ClientOption { + return func(op *Option) { + op.EnableForwarding = enableForwarding + } +} + +// WithTSOServerProxyOption configures the client to use TSO server proxy, +// i.e., the client will send TSO requests to the API leader (the TSO server +// proxy) which will forward the requests to the TSO servers. +func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { + return func(op *Option) { + op.UseTSOServerProxy = useTSOServerProxy + } +} + +// WithMaxErrorRetry configures the client max retry times when connect meets error. +func WithMaxErrorRetry(count int) ClientOption { + return func(op *Option) { + op.MaxRetryTimes = count + } +} + +// WithMetricsLabels configures the client with metrics labels. +func WithMetricsLabels(labels prometheus.Labels) ClientOption { + return func(op *Option) { + op.MetricsLabels = labels + } +} + +// WithInitMetricsOption configures the client with metrics labels. +func WithInitMetricsOption(initMetrics bool) ClientOption { + return func(op *Option) { + op.InitMetrics = initMetrics + } +} + // GetStoreOp represents available options when getting stores. type GetStoreOp struct { - excludeTombstone bool + ExcludeTombstone bool } // GetStoreOption configures GetStoreOp. @@ -153,14 +209,14 @@ type GetStoreOption func(*GetStoreOp) // WithExcludeTombstone excludes tombstone stores from the result. func WithExcludeTombstone() GetStoreOption { - return func(op *GetStoreOp) { op.excludeTombstone = true } + return func(op *GetStoreOp) { op.ExcludeTombstone = true } } // RegionsOp represents available options when operate regions type RegionsOp struct { - group string - retryLimit uint64 - skipStoreLimit bool + Group string + RetryLimit uint64 + SkipStoreLimit bool } // RegionsOption configures RegionsOp @@ -168,24 +224,24 @@ type RegionsOption func(op *RegionsOp) // WithGroup specify the group during Scatter/Split Regions func WithGroup(group string) RegionsOption { - return func(op *RegionsOp) { op.group = group } + return func(op *RegionsOp) { op.Group = group } } // WithRetry specify the retry limit during Scatter/Split Regions func WithRetry(retry uint64) RegionsOption { - return func(op *RegionsOp) { op.retryLimit = retry } + return func(op *RegionsOp) { op.RetryLimit = retry } } // WithSkipStoreLimit specify if skip the store limit check during Scatter/Split Regions func WithSkipStoreLimit() RegionsOption { - return func(op *RegionsOp) { op.skipStoreLimit = true } + return func(op *RegionsOp) { op.SkipStoreLimit = true } } // GetRegionOp represents available options when getting regions. type GetRegionOp struct { - needBuckets bool - allowFollowerHandle bool - outputMustContainAllKeyRange bool + NeedBuckets bool + AllowFollowerHandle bool + OutputMustContainAllKeyRange bool } // GetRegionOption configures GetRegionOp. @@ -193,69 +249,15 @@ type GetRegionOption func(op *GetRegionOp) // WithBuckets means getting region and its buckets. func WithBuckets() GetRegionOption { - return func(op *GetRegionOp) { op.needBuckets = true } + return func(op *GetRegionOp) { op.NeedBuckets = true } } // WithAllowFollowerHandle means that client can send request to follower and let it handle this request. func WithAllowFollowerHandle() GetRegionOption { - return func(op *GetRegionOp) { op.allowFollowerHandle = true } + return func(op *GetRegionOp) { op.AllowFollowerHandle = true } } // WithOutputMustContainAllKeyRange means the output must contain all key ranges. func WithOutputMustContainAllKeyRange() GetRegionOption { - return func(op *GetRegionOp) { op.outputMustContainAllKeyRange = true } -} - -// ClientOption configures client. -type ClientOption func(c *client) - -// WithGRPCDialOptions configures the client with gRPC dial options. -func WithGRPCDialOptions(opts ...grpc.DialOption) ClientOption { - return func(c *client) { - c.option.gRPCDialOptions = append(c.option.gRPCDialOptions, opts...) - } -} - -// WithCustomTimeoutOption configures the client with timeout option. -func WithCustomTimeoutOption(timeout time.Duration) ClientOption { - return func(c *client) { - c.option.timeout = timeout - } -} - -// WithForwardingOption configures the client with forwarding option. -func WithForwardingOption(enableForwarding bool) ClientOption { - return func(c *client) { - c.option.enableForwarding = enableForwarding - } -} - -// WithTSOServerProxyOption configures the client to use TSO server proxy, -// i.e., the client will send TSO requests to the API leader (the TSO server -// proxy) which will forward the requests to the TSO servers. -func WithTSOServerProxyOption(useTSOServerProxy bool) ClientOption { - return func(c *client) { - c.option.useTSOServerProxy = useTSOServerProxy - } -} - -// WithMaxErrorRetry configures the client max retry times when connect meets error. -func WithMaxErrorRetry(count int) ClientOption { - return func(c *client) { - c.option.maxRetryTimes = count - } -} - -// WithMetricsLabels configures the client with metrics labels. -func WithMetricsLabels(labels prometheus.Labels) ClientOption { - return func(c *client) { - c.option.metricsLabels = labels - } -} - -// WithInitMetricsOption configures the client with metrics labels. -func WithInitMetricsOption(initMetrics bool) ClientOption { - return func(c *client) { - c.option.initMetrics = initMetrics - } + return func(op *GetRegionOp) { op.OutputMustContainAllKeyRange = true } } diff --git a/client/option_test.go b/client/opt/option_test.go similarity index 55% rename from client/option_test.go rename to client/opt/option_test.go index 84e5dd3abce..4a0b8d16fcf 100644 --- a/client/option_test.go +++ b/client/opt/option_test.go @@ -1,4 +1,4 @@ -// Copyright 2021 TiKV Project Authors. +// Copyright 2024 TiKV Project Authors. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package pd +package opt import ( "testing" @@ -24,43 +24,43 @@ import ( func TestDynamicOptionChange(t *testing.T) { re := require.New(t) - o := newOption() + o := NewOption() // Check the default value setting. - re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval()) - re.Equal(defaultEnableTSOFollowerProxy, o.getEnableTSOFollowerProxy()) - re.Equal(defaultEnableFollowerHandle, o.getEnableFollowerHandle()) + re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval()) + re.Equal(defaultEnableTSOFollowerProxy, o.GetEnableTSOFollowerProxy()) + re.Equal(defaultEnableFollowerHandle, o.GetEnableFollowerHandle()) // Check the invalid value setting. - re.Error(o.setMaxTSOBatchWaitInterval(time.Second)) - re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval()) + re.Error(o.SetMaxTSOBatchWaitInterval(time.Second)) + re.Equal(defaultMaxTSOBatchWaitInterval, o.GetMaxTSOBatchWaitInterval()) expectInterval := time.Millisecond - o.setMaxTSOBatchWaitInterval(expectInterval) - re.Equal(expectInterval, o.getMaxTSOBatchWaitInterval()) + o.SetMaxTSOBatchWaitInterval(expectInterval) + re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval()) expectInterval = time.Duration(float64(time.Millisecond) * 0.5) - o.setMaxTSOBatchWaitInterval(expectInterval) - re.Equal(expectInterval, o.getMaxTSOBatchWaitInterval()) + o.SetMaxTSOBatchWaitInterval(expectInterval) + re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval()) expectInterval = time.Duration(float64(time.Millisecond) * 1.5) - o.setMaxTSOBatchWaitInterval(expectInterval) - re.Equal(expectInterval, o.getMaxTSOBatchWaitInterval()) + o.SetMaxTSOBatchWaitInterval(expectInterval) + re.Equal(expectInterval, o.GetMaxTSOBatchWaitInterval()) expectBool := true - o.setEnableTSOFollowerProxy(expectBool) + o.SetEnableTSOFollowerProxy(expectBool) // Check the value changing notification. testutil.Eventually(re, func() bool { - <-o.enableTSOFollowerProxyCh + <-o.EnableTSOFollowerProxyCh return true }) - re.Equal(expectBool, o.getEnableTSOFollowerProxy()) + re.Equal(expectBool, o.GetEnableTSOFollowerProxy()) // Check whether any data will be sent to the channel. // It will panic if the test fails. - close(o.enableTSOFollowerProxyCh) + close(o.EnableTSOFollowerProxyCh) // Setting the same value should not notify the channel. - o.setEnableTSOFollowerProxy(expectBool) + o.SetEnableTSOFollowerProxy(expectBool) expectBool = true - o.setEnableFollowerHandle(expectBool) - re.Equal(expectBool, o.getEnableFollowerHandle()) + o.SetEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.GetEnableFollowerHandle()) expectBool = false - o.setEnableFollowerHandle(expectBool) - re.Equal(expectBool, o.getEnableFollowerHandle()) + o.SetEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.GetEnableFollowerHandle()) } diff --git a/client/pd_service_discovery.go b/client/pd_service_discovery.go index c961e9e42fd..0bdc6868c65 100644 --- a/client/pd_service_discovery.go +++ b/client/pd_service_discovery.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/utils/grpcutil" "go.uber.org/zap" @@ -436,7 +437,7 @@ type pdServiceDiscovery struct { keyspaceID uint32 tlsCfg *tls.Config // Client option. - option *option + option *opt.Option } // NewDefaultPDServiceDiscovery returns a new default PD service discovery-based client. @@ -445,7 +446,7 @@ func NewDefaultPDServiceDiscovery( urls []string, tlsCfg *tls.Config, ) *pdServiceDiscovery { var wg sync.WaitGroup - return newPDServiceDiscovery(ctx, cancel, &wg, nil, nil, defaultKeyspaceID, urls, tlsCfg, newOption()) + return newPDServiceDiscovery(ctx, cancel, &wg, nil, nil, defaultKeyspaceID, urls, tlsCfg, opt.NewOption()) } // newPDServiceDiscovery returns a new PD service discovery-based client. @@ -455,7 +456,7 @@ func newPDServiceDiscovery( serviceModeUpdateCb func(pdpb.ServiceMode), updateKeyspaceIDFunc updateKeyspaceIDFunc, keyspaceID uint32, - urls []string, tlsCfg *tls.Config, option *option, + urls []string, tlsCfg *tls.Config, option *opt.Option, ) *pdServiceDiscovery { pdsd := &pdServiceDiscovery{ checkMembershipCh: make(chan struct{}, 1), @@ -515,7 +516,7 @@ func (c *pdServiceDiscovery) initRetry(f func() error) error { var err error ticker := time.NewTicker(time.Second) defer ticker.Stop() - for range c.option.maxRetryTimes { + for range c.option.MaxRetryTimes { if err = f(); err == nil { return nil } @@ -607,7 +608,7 @@ func (c *pdServiceDiscovery) memberHealthCheckLoop() { } func (c *pdServiceDiscovery) checkLeaderHealth(ctx context.Context) { - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + ctx, cancel := context.WithTimeout(ctx, c.option.Timeout) defer cancel() leader := c.getLeaderServiceClient() leader.checkNetworkAvailable(ctx) @@ -673,7 +674,7 @@ func (c *pdServiceDiscovery) discoverMicroservice(svcType serviceType) (urls []s case tsoService: leaderURL := c.getLeaderURL() if len(leaderURL) > 0 { - clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.timeout) + clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.Timeout) if err != nil { log.Error("[pd] failed to get cluster info", zap.String("leader-url", leaderURL), errs.ZapError(err)) @@ -744,7 +745,7 @@ func (c *pdServiceDiscovery) getServiceClientByKind(kind apiKind) ServiceClient // GetServiceClient returns the leader/primary ServiceClient if it is healthy. func (c *pdServiceDiscovery) GetServiceClient() ServiceClient { leaderClient := c.getLeaderServiceClient() - if c.option.enableForwarding && !leaderClient.Available() { + if c.option.EnableForwarding && !leaderClient.Available() { if followerClient := c.getServiceClientByKind(forwardAPIKind); followerClient != nil { log.Debug("[pd] use follower client", zap.String("url", followerClient.GetURL())) return followerClient @@ -823,7 +824,7 @@ func (c *pdServiceDiscovery) initClusterID() error { defer cancel() clusterID := uint64(0) for _, url := range c.GetServiceURLs() { - members, err := c.getMembers(ctx, url, c.option.timeout) + members, err := c.getMembers(ctx, url, c.option.Timeout) if err != nil || members.GetHeader() == nil { log.Warn("[pd] failed to get cluster id", zap.String("url", url), errs.ZapError(err)) continue @@ -854,7 +855,7 @@ func (c *pdServiceDiscovery) checkServiceModeChanged() error { return errors.New("no leader found") } - clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.timeout) + clusterInfo, err := c.getClusterInfo(c.ctx, leaderURL, c.option.Timeout) if err != nil { if strings.Contains(err.Error(), "Unimplemented") { // If the method is not supported, we set it to pd mode. @@ -967,7 +968,7 @@ func (c *pdServiceDiscovery) updateURLs(members []*pdpb.Member) { } c.urls.Store(urls) // Update the connection contexts when member changes if TSO Follower Proxy is enabled. - if c.option.getEnableTSOFollowerProxy() { + if c.option.GetEnableTSOFollowerProxy() { // Run callbacks to reflect the membership changes in the leader and followers. for _, cb := range c.membersChangedCbs { cb() @@ -1079,7 +1080,7 @@ func (c *pdServiceDiscovery) updateServiceClient(members []*pdpb.Member, leader // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL. func (c *pdServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { - return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...) + return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.GRPCDialOptions...) } func addrsToURLs(addrs []string, tlsCfg *tls.Config) []string { diff --git a/client/tso_client.go b/client/tso_client.go index 584c5df6134..18e39dffd14 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/grpcutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -68,7 +69,7 @@ type tsoClient struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - option *option + option *opt.Option svcDiscovery ServiceDiscovery tsoStreamBuilderFactory @@ -83,7 +84,7 @@ type tsoClient struct { // newTSOClient returns a new TSO client. func newTSOClient( - ctx context.Context, option *option, + ctx context.Context, option *opt.Option, svcDiscovery ServiceDiscovery, factory tsoStreamBuilderFactory, ) *tsoClient { ctx, cancel := context.WithCancel(ctx) @@ -111,7 +112,7 @@ func newTSOClient( return c } -func (c *tsoClient) getOption() *option { return c.option } +func (c *tsoClient) getOption() *opt.Option { return c.option } func (c *tsoClient) getServiceDiscovery() ServiceDiscovery { return c.svcDiscovery } @@ -207,7 +208,7 @@ func (c *tsoClient) backupClientConn() (*grpc.ClientConn, string) { if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(url); err != nil { continue } - healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.Timeout) resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { @@ -232,7 +233,7 @@ type tsoConnectionContext struct { func (c *tsoClient) updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool { // Normal connection creating, it will be affected by the `enableForwarding`. createTSOConnection := c.tryConnectToTSO - if c.option.getEnableTSOFollowerProxy() { + if c.option.GetEnableTSOFollowerProxy() { createTSOConnection = c.tryConnectToTSOWithProxy } if err := createTSOConnection(ctx, connectionCtxs); err != nil { @@ -285,7 +286,7 @@ func (c *tsoClient) tryConnectToTSO( } if cc != nil { cctx, cancel := context.WithCancel(ctx) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout) failpoint.Inject("unreachableNetwork", func() { stream = nil err = status.New(codes.Unavailable, "unavailable").Err() @@ -295,7 +296,7 @@ func (c *tsoClient) tryConnectToTSO( return nil } - if err != nil && c.option.enableForwarding { + if err != nil && c.option.EnableForwarding { // The reason we need to judge if the error code is equal to "Canceled" here is that // when we create a stream we use a goroutine to manually control the timeout of the connection. // There is no need to wait for the transport layer timeout which can reduce the time of unavailability. @@ -329,7 +330,7 @@ func (c *tsoClient) tryConnectToTSO( // create the follower stream cctx, cancel := context.WithCancel(ctx) cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) - stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.timeout) + stream, err = c.tsoStreamBuilderFactory.makeBuilder(backupClientConn).build(cctx, cancel, c.option.Timeout) if err == nil { forwardedHostTrim := trimHTTPPrefix(forwardedHost) addr := trimHTTPPrefix(backupURL) @@ -370,7 +371,7 @@ func (c *tsoClient) checkLeader( healthCli = healthpb.NewHealthClient(cc) } if healthCli != nil { - healthCtx, healthCancel := context.WithTimeout(ctx, c.option.timeout) + healthCtx, healthCancel := context.WithTimeout(ctx, c.option.Timeout) resp, err := healthCli.Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) failpoint.Inject("unreachableNetwork", func() { resp.Status = healthpb.HealthCheckResponse_UNKNOWN @@ -379,7 +380,7 @@ func (c *tsoClient) checkLeader( if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { // create a stream of the original tso leader cctx, cancel := context.WithCancel(ctx) - stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.timeout) + stream, err := c.tsoStreamBuilderFactory.makeBuilder(cc).build(cctx, cancel, c.option.Timeout) if err == nil && stream != nil { log.Info("[tso] recover the original tso stream since the network has become normal", zap.String("url", url)) updateAndClear(url, &tsoConnectionContext{cctx, cancel, url, stream}) @@ -435,7 +436,7 @@ func (c *tsoClient) tryConnectToTSOWithProxy( cctx = grpcutil.BuildForwardContext(cctx, forwardedHost) } // Create the TSO stream. - stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.timeout) + stream, err := tsoStreamBuilder.build(cctx, cancel, c.option.Timeout) if err == nil { if addr != leaderAddr { forwardedHostTrim := trimHTTPPrefix(forwardedHost) @@ -468,7 +469,7 @@ func (c *tsoClient) getAllTSOStreamBuilders() map[string]tsoStreamBuilder { if cc, err = c.svcDiscovery.GetOrCreateGRPCConn(addr); err != nil { continue } - healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.timeout) + healthCtx, healthCancel := context.WithTimeout(c.ctx, c.option.Timeout) resp, err := healthpb.NewHealthClient(cc).Check(healthCtx, &healthpb.HealthCheckRequest{Service: ""}) healthCancel() if err == nil && resp.GetStatus() == healthpb.HealthCheckResponse_SERVING { diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 3d77610179d..7d19a11c2d0 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/client/utils/timerutil" "github.com/tikv/pd/client/utils/tsoutil" @@ -67,7 +68,7 @@ type tsoInfo struct { } type tsoServiceProvider interface { - getOption() *option + getOption() *opt.Option getServiceDiscovery() ServiceDiscovery updateConnectionCtxs(ctx context.Context, connectionCtxs *sync.Map) bool } @@ -223,7 +224,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { stream *tsoStream ) // Loop through each batch of TSO requests and send them for processing. - streamLoopTimer := time.NewTimer(option.timeout) + streamLoopTimer := time.NewTimer(option.Timeout) defer streamLoopTimer.Stop() // Create a not-started-timer to be used for collecting batches for concurrent RPC. @@ -245,7 +246,7 @@ tsoBatchLoop: tsoBatchController = td.batchBufferPool.Get().(*batchController[*tsoRequest]) } - maxBatchWaitInterval := option.getMaxTSOBatchWaitInterval() + maxBatchWaitInterval := option.GetMaxTSOBatchWaitInterval() currentBatchStartTime := time.Now() // Update concurrency settings if needed. @@ -280,7 +281,7 @@ tsoBatchLoop: } // We need be careful here, see more details in the comments of Timer.Reset. // https://pkg.go.dev/time@master#Timer.Reset - streamLoopTimer.Reset(option.timeout) + streamLoopTimer.Reset(option.Timeout) // Choose a stream to send the TSO gRPC request. streamChoosingLoop: for { @@ -390,7 +391,7 @@ tsoBatchLoop: } done := make(chan struct{}) - dl := newTSDeadline(option.timeout, done, cancel) + dl := newTSDeadline(option.Timeout, done, cancel) select { case <-ctx.Done(): // Finish the collected requests if the context is canceled. @@ -485,8 +486,8 @@ func (td *tsoDispatcher) connectionCtxsUpdater() { case <-ctx.Done(): log.Info("[tso] exit tso connection contexts updater") return - case <-option.enableTSOFollowerProxyCh: - enableTSOFollowerProxy := option.getEnableTSOFollowerProxy() + case <-option.EnableTSOFollowerProxyCh: + enableTSOFollowerProxy := option.GetEnableTSOFollowerProxy() log.Info("[tso] tso follower proxy status changed", zap.Bool("enable", enableTSOFollowerProxy)) if enableTSOFollowerProxy && updateTicker.C == nil { @@ -711,8 +712,8 @@ func (td *tsoDispatcher) checkTSORPCConcurrency(ctx context.Context, maxBatchWai } td.lastCheckConcurrencyTime = now - newConcurrency := td.provider.getOption().getTSOClientRPCConcurrency() - if maxBatchWaitInterval > 0 || td.provider.getOption().getEnableTSOFollowerProxy() { + newConcurrency := td.provider.getOption().GetTSOClientRPCConcurrency() + if maxBatchWaitInterval > 0 || td.provider.getOption().GetEnableTSOFollowerProxy() { newConcurrency = 1 } diff --git a/client/tso_dispatcher_test.go b/client/tso_dispatcher_test.go index 2eb30066532..84bc6a4dc99 100644 --- a/client/tso_dispatcher_test.go +++ b/client/tso_dispatcher_test.go @@ -26,23 +26,24 @@ import ( "github.com/pingcap/log" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/client/opt" "go.uber.org/zap/zapcore" ) type mockTSOServiceProvider struct { - option *option + option *opt.Option createStream func(ctx context.Context) *tsoStream updateConnMu sync.Mutex } -func newMockTSOServiceProvider(option *option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider { +func newMockTSOServiceProvider(option *opt.Option, createStream func(ctx context.Context) *tsoStream) *mockTSOServiceProvider { return &mockTSOServiceProvider{ option: option, createStream: createStream, } } -func (m *mockTSOServiceProvider) getOption() *option { +func (m *mockTSOServiceProvider) getOption() *opt.Option { return m.option } @@ -79,15 +80,15 @@ type testTSODispatcherSuite struct { stream *tsoStream dispatcher *tsoDispatcher dispatcherWg sync.WaitGroup - option *option + option *opt.Option reqPool *sync.Pool } func (s *testTSODispatcherSuite) SetupTest() { s.re = require.New(s.T()) - s.option = newOption() - s.option.timeout = time.Hour + s.option = opt.NewOption() + s.option.Timeout = time.Hour // As the internal logic of the tsoDispatcher allows it to create streams multiple times, but our tests needs // single stable access to the inner stream, we do not allow it to create it more than once in these tests. creating := new(atomic.Bool) @@ -203,7 +204,7 @@ func (s *testTSODispatcherSuite) checkIdleTokenCount(expectedTotal int) { func (s *testTSODispatcherSuite) testStaticConcurrencyImpl(concurrency int) { ctx := context.Background() - s.option.setTSOClientRPCConcurrency(concurrency) + s.option.SetTSOClientRPCConcurrency(concurrency) // Make sure the state of the mock stream is clear. Unexpected batching may make the requests sent to the stream // less than expected, causing there are more `generateNext` signals or generated results. @@ -285,7 +286,7 @@ func (s *testTSODispatcherSuite) TestConcurrentRPC() { func (s *testTSODispatcherSuite) TestBatchDelaying() { ctx := context.Background() - s.option.setTSOClientRPCConcurrency(2) + s.option.SetTSOClientRPCConcurrency(2) s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeNoDelay", "return")) s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoStreamSimulateEstimatedRPCLatency", `return("12ms")`)) @@ -309,13 +310,13 @@ func (s *testTSODispatcherSuite) TestBatchDelaying() { s.reqMustReady(req) // Try other concurrency. - s.option.setTSOClientRPCConcurrency(3) + s.option.SetTSOClientRPCConcurrency(3) s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("4ms")`)) req = s.sendReq(ctx) s.streamInner.generateNext() s.reqMustReady(req) - s.option.setTSOClientRPCConcurrency(4) + s.option.SetTSOClientRPCConcurrency(4) s.re.NoError(failpoint.Enable("github.com/tikv/pd/client/tsoDispatcherConcurrentModeAssertDelayDuration", `return("3ms")`)) req = s.sendReq(ctx) s.streamInner.generateNext() @@ -347,7 +348,7 @@ func BenchmarkTSODispatcherHandleRequests(b *testing.B) { return req } - dispatcher := newTSODispatcher(ctx, defaultMaxTSOBatchSize, newMockTSOServiceProvider(newOption(), nil)) + dispatcher := newTSODispatcher(ctx, defaultMaxTSOBatchSize, newMockTSOServiceProvider(opt.NewOption(), nil)) var wg sync.WaitGroup wg.Add(1) diff --git a/client/tso_service_discovery.go b/client/tso_service_discovery.go index b8debf05efe..d4c8d22e3fe 100644 --- a/client/tso_service_discovery.go +++ b/client/tso_service_discovery.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/kvproto/pkg/tsopb" "github.com/pingcap/log" "github.com/tikv/pd/client/errs" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/grpcutil" "go.uber.org/zap" "google.golang.org/grpc" @@ -149,13 +150,13 @@ type tsoServiceDiscovery struct { tlsCfg *tls.Config // Client option. - option *option + option *opt.Option } // newTSOServiceDiscovery returns a new client-side service discovery for the independent TSO service. func newTSOServiceDiscovery( ctx context.Context, metacli MetaStorageClient, apiSvcDiscovery ServiceDiscovery, - keyspaceID uint32, tlsCfg *tls.Config, option *option, + keyspaceID uint32, tlsCfg *tls.Config, option *opt.Option, ) ServiceDiscovery { ctx, cancel := context.WithCancel(ctx) c := &tsoServiceDiscovery{ @@ -190,9 +191,9 @@ func newTSOServiceDiscovery( // Init initialize the concrete client underlying func (c *tsoServiceDiscovery) Init() error { log.Info("initializing tso service discovery", - zap.Int("max-retry-times", c.option.maxRetryTimes), + zap.Int("max-retry-times", c.option.MaxRetryTimes), zap.Duration("retry-interval", initRetryInterval)) - if err := c.retry(c.option.maxRetryTimes, initRetryInterval, c.updateMember); err != nil { + if err := c.retry(c.option.MaxRetryTimes, initRetryInterval, c.updateMember); err != nil { log.Error("failed to update member. initialization failed.", zap.Error(err)) c.cancel() return err @@ -325,7 +326,7 @@ func (c *tsoServiceDiscovery) GetBackupURLs() []string { // GetOrCreateGRPCConn returns the corresponding grpc client connection of the given URL. func (c *tsoServiceDiscovery) GetOrCreateGRPCConn(url string) (*grpc.ClientConn, error) { - return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.gRPCDialOptions...) + return grpcutil.GetOrCreateGRPCConn(c.ctx, &c.clientConns, url, c.tlsCfg, c.option.GRPCDialOptions...) } // ScheduleCheckMemberChanged is used to trigger a check to see if there is any change in service endpoints. diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index b52fe389a9c..53eed7bdc84 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -38,6 +38,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/retry" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/utils/constant" @@ -224,7 +225,7 @@ func TestGetTSAfterTransferLeader(t *testing.T) { re.NotEmpty(leader) defer cluster.Destroy() - cli := setupCli(ctx, re, endpoints, pd.WithCustomTimeoutOption(10*time.Second)) + cli := setupCli(ctx, re, endpoints, opt.WithCustomTimeoutOption(10*time.Second)) defer cli.Close() var leaderSwitched atomic.Bool @@ -260,7 +261,7 @@ func TestTSOFollowerProxy(t *testing.T) { defer cli1.Close() cli2 := setupCli(ctx, re, endpoints) defer cli2.Close() - err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, true) + err = cli2.UpdateOption(opt.EnableTSOFollowerProxy, true) re.NoError(err) var wg sync.WaitGroup @@ -287,7 +288,7 @@ func TestTSOFollowerProxy(t *testing.T) { wg.Wait() // Disable the follower proxy and check if the stream is updated. - err = cli2.UpdateOption(pd.EnableTSOFollowerProxy, false) + err = cli2.UpdateOption(opt.EnableTSOFollowerProxy, false) re.NoError(err) wg.Add(tsoRequestConcurrencyNumber) @@ -342,7 +343,7 @@ func TestTSOFollowerProxyWithTSOService(t *testing.T) { re.NotNil(cli) defer cli.Close() // TSO service does not support the follower proxy, so enabling it should fail. - err = cli.UpdateOption(pd.EnableTSOFollowerProxy, true) + err = cli.UpdateOption(opt.EnableTSOFollowerProxy, true) re.Error(err) re.NoError(failpoint.Disable("github.com/tikv/pd/client/fastUpdateServiceMode")) } @@ -420,7 +421,7 @@ func TestCustomTimeout(t *testing.T) { defer cluster.Destroy() endpoints := runServer(re, cluster) - cli := setupCli(ctx, re, endpoints, pd.WithCustomTimeoutOption(time.Second)) + cli := setupCli(ctx, re, endpoints, opt.WithCustomTimeoutOption(time.Second)) defer cli.Close() start := time.Now() @@ -493,7 +494,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionByFollowerForwardin ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true)) + cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true)) defer cli.Close() re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork1", "return(true)")) time.Sleep(200 * time.Millisecond) @@ -513,7 +514,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding1( re := suite.Require() ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true)) + cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true)) defer cli.Close() re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)")) @@ -548,7 +549,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoByFollowerForwarding2( re := suite.Require() ctx, cancel := context.WithCancel(suite.ctx) defer cancel() - cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true)) + cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true)) defer cli.Close() re.NoError(failpoint.Enable("github.com/tikv/pd/client/unreachableNetwork", "return(true)")) @@ -585,7 +586,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetTsoAndRegionByFollowerFor follower := cluster.GetServer(cluster.GetFollower()) re.NoError(failpoint.Enable("github.com/tikv/pd/client/utils/grpcutil/unreachableNetwork2", fmt.Sprintf("return(\"%s\")", follower.GetAddr()))) - cli := setupCli(ctx, re, suite.endpoints, pd.WithForwardingOption(true)) + cli := setupCli(ctx, re, suite.endpoints, opt.WithForwardingOption(true)) defer cli.Close() var lastTS uint64 testutil.Eventually(re, func() bool { @@ -689,7 +690,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { cluster := suite.cluster cli := setupCli(ctx, re, suite.endpoints) defer cli.Close() - cli.UpdateOption(pd.EnableFollowerHandle, true) + cli.UpdateOption(opt.EnableFollowerHandle, true) re.NotEmpty(cluster.WaitLeader()) leader := cluster.GetLeaderServer() testutil.Eventually(re, func() bool { @@ -707,7 +708,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { // follower have no region cnt := 0 for range 100 { - resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle()) if err == nil && resp != nil { cnt++ } @@ -721,7 +722,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { time.Sleep(150 * time.Millisecond) cnt = 0 for range 100 { - resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle()) if err == nil && resp != nil { cnt++ } @@ -736,7 +737,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { time.Sleep(100 * time.Millisecond) cnt = 0 for range 100 { - resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle()) if err == nil && resp != nil { cnt++ } @@ -749,7 +750,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { re.NoError(failpoint.Enable("github.com/tikv/pd/server/followerHandleError", "return(true)")) cnt = 0 for range 100 { - resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle()) if err == nil && resp != nil { cnt++ } @@ -764,7 +765,7 @@ func (suite *followerForwardAndHandleTestSuite) TestGetRegionFromFollower() { time.Sleep(100 * time.Millisecond) cnt = 0 for range 100 { - resp, err := cli.GetRegion(ctx, []byte("a"), pd.WithAllowFollowerHandle()) + resp, err := cli.GetRegion(ctx, []byte("a"), opt.WithAllowFollowerHandle()) if err == nil && resp != nil { cnt++ } @@ -848,7 +849,7 @@ func runServer(re *require.Assertions, cluster *tests.TestCluster) []string { return endpoints } -func setupCli(ctx context.Context, re *require.Assertions, endpoints []string, opts ...pd.ClientOption) pd.Client { +func setupCli(ctx context.Context, re *require.Assertions, endpoints []string, opts ...opt.ClientOption) pd.Client { cli, err := pd.NewClientWithContext(ctx, endpoints, pd.SecurityOption{}, opts...) re.NoError(err) return cli @@ -1116,7 +1117,7 @@ func (suite *clientTestSuite) TestGetRegion() { } re.NoError(suite.reportBucket.Send(breq)) testutil.Eventually(re, func() bool { - r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets()) + r, err := suite.client.GetRegion(context.Background(), []byte("a"), opt.WithBuckets()) re.NoError(err) if r == nil { return false @@ -1126,7 +1127,7 @@ func (suite *clientTestSuite) TestGetRegion() { suite.srv.GetRaftCluster().GetOpts().(*config.PersistOptions).SetRegionBucketEnabled(false) testutil.Eventually(re, func() bool { - r, err := suite.client.GetRegion(context.Background(), []byte("a"), pd.WithBuckets()) + r, err := suite.client.GetRegion(context.Background(), []byte("a"), opt.WithBuckets()) re.NoError(err) if r == nil { return false @@ -1356,7 +1357,7 @@ func (suite *clientTestSuite) TestGetStore() { re.True(contains) // Should not return tombstone stores. - stores, err = suite.client.GetAllStores(context.Background(), pd.WithExcludeTombstone()) + stores, err = suite.client.GetAllStores(context.Background(), opt.WithExcludeTombstone()) re.NoError(err) for _, store := range stores { if store.GetId() == physicallyDestroyedStoreID { @@ -1570,7 +1571,7 @@ func (suite *clientTestSuite) TestScatterRegion() { regionsID := []uint64{regionID} // Test interface `ScatterRegions`. testutil.Eventually(re, func() bool { - scatterResp, err := suite.client.ScatterRegions(context.Background(), regionsID, pd.WithGroup("test"), pd.WithRetry(1)) + scatterResp, err := suite.client.ScatterRegions(context.Background(), regionsID, opt.WithGroup("test"), opt.WithRetry(1)) if err != nil { return false } @@ -1855,12 +1856,12 @@ func (suite *clientTestSuite) TestBatchScanRegions() { check := func(ranges []pd.KeyRange, limit int, expect []*metapb.Region) { for _, bucket := range []bool{false, true} { for _, outputMustContainAllKeyRange := range outputMustContainAllKeyRangeOptions { - var opts []pd.GetRegionOption + var opts []opt.GetRegionOption if bucket { - opts = append(opts, pd.WithBuckets()) + opts = append(opts, opt.WithBuckets()) } if outputMustContainAllKeyRange { - opts = append(opts, pd.WithOutputMustContainAllKeyRange()) + opts = append(opts, opt.WithOutputMustContainAllKeyRange()) } scanRegions, err := suite.client.BatchScanRegions(ctx, ranges, limit, opts...) re.NoError(err) @@ -1934,7 +1935,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { ctx, []pd.KeyRange{{StartKey: []byte{1}, EndKey: []byte{0}}}, 10, - pd.WithOutputMustContainAllKeyRange(), + opt.WithOutputMustContainAllKeyRange(), ) re.ErrorContains(err, "invalid key range, start key > end key") _, err = suite.client.BatchScanRegions(ctx, []pd.KeyRange{ @@ -1946,7 +1947,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { ctx, []pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{10, 1}}}, 10, - pd.WithOutputMustContainAllKeyRange(), + opt.WithOutputMustContainAllKeyRange(), ) re.ErrorContains(err, "found a hole region in the last") req := &pdpb.RegionHeartbeatRequest{ @@ -1971,7 +1972,7 @@ func (suite *clientTestSuite) TestBatchScanRegions() { ctx, []pd.KeyRange{{StartKey: []byte{9}, EndKey: []byte{101}}}, 10, - pd.WithOutputMustContainAllKeyRange(), + opt.WithOutputMustContainAllKeyRange(), ) return err != nil && strings.Contains(err.Error(), "found a hole region between") }) diff --git a/tests/integrations/client/client_tls_test.go b/tests/integrations/client/client_tls_test.go index 75706c3d902..92c25afc1cf 100644 --- a/tests/integrations/client/client_tls_test.go +++ b/tests/integrations/client/client_tls_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/netutil" "github.com/tikv/pd/server/config" @@ -165,7 +166,7 @@ func testTLSReload( CAPath: testClientTLSInfo.TrustedCAFile, CertPath: testClientTLSInfo.CertFile, KeyPath: testClientTLSInfo.KeyFile, - }, pd.WithGRPCDialOptions(grpc.WithBlock())) + }, opt.WithGRPCDialOptions(grpc.WithBlock())) if err != nil { errc <- err dcancel() @@ -196,7 +197,7 @@ func testTLSReload( CAPath: testClientTLSInfo.TrustedCAFile, CertPath: testClientTLSInfo.CertFile, KeyPath: testClientTLSInfo.KeyFile, - }, pd.WithGRPCDialOptions(grpc.WithBlock())) + }, opt.WithGRPCDialOptions(grpc.WithBlock())) re.NoError(err) dcancel() cli.Close() @@ -209,7 +210,7 @@ func testTLSReload( SSLCABytes: caData, SSLCertBytes: certData, SSLKEYBytes: keyData, - }, pd.WithGRPCDialOptions(grpc.WithBlock())) + }, opt.WithGRPCDialOptions(grpc.WithBlock())) re.NoError(err) defer cli.Close() cancel1() @@ -321,7 +322,7 @@ func testAllowedCN(ctx context.Context, endpoints []string, tls transport.TLSInf CAPath: tls.TrustedCAFile, CertPath: tls.CertFile, KeyPath: tls.KeyFile, - }, pd.WithGRPCDialOptions(grpc.WithBlock())) + }, opt.WithGRPCDialOptions(grpc.WithBlock())) if err != nil { return err } diff --git a/tests/integrations/mcs/testutil.go b/tests/integrations/mcs/testutil.go index d23da905f78..eb1e002f656 100644 --- a/tests/integrations/mcs/testutil.go +++ b/tests/integrations/mcs/testutil.go @@ -20,13 +20,14 @@ import ( "github.com/stretchr/testify/require" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" ) // SetupClientWithAPIContext creates a TSO client with api context name for test. func SetupClientWithAPIContext( - ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...pd.ClientOption, + ctx context.Context, re *require.Assertions, apiCtx pd.APIContext, endpoints []string, opts ...opt.ClientOption, ) pd.Client { cli, err := pd.NewClientWithAPIContext(ctx, apiCtx, endpoints, pd.SecurityOption{}, opts...) re.NoError(err) @@ -36,7 +37,7 @@ func SetupClientWithAPIContext( // SetupClientWithKeyspaceID creates a TSO client with the given keyspace id for test. func SetupClientWithKeyspaceID( ctx context.Context, re *require.Assertions, - keyspaceID uint32, endpoints []string, opts ...pd.ClientOption, + keyspaceID uint32, endpoints []string, opts ...opt.ClientOption, ) pd.Client { cli, err := pd.NewClientWithKeyspace(ctx, keyspaceID, endpoints, pd.SecurityOption{}, opts...) re.NoError(err) @@ -106,7 +107,7 @@ func WaitForMultiKeyspacesTSOAvailable( clients := make([]pd.Client, 0, len(keyspaceIDs)) for _, keyspaceID := range keyspaceIDs { - cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints, pd.WithForwardingOption(true)) + cli := SetupClientWithKeyspaceID(ctx, re, keyspaceID, backendEndpoints, opt.WithForwardingOption(true)) re.NotNil(cli) clients = append(clients, cli) diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index db07b72635f..77ec760d1c8 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mcs/discovery" tso "github.com/tikv/pd/pkg/mcs/tso/server" @@ -245,7 +246,7 @@ func NewAPIServerForward(re *require.Assertions) APIServerForward { re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)")) suite.pdClient, err = pd.NewClientWithContext(context.Background(), - []string{suite.backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1)) + []string{suite.backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1)) re.NoError(err) return suite } diff --git a/tests/integrations/realcluster/cluster_id_test.go b/tests/integrations/realcluster/cluster_id_test.go index e327f472bbb..59a56896693 100644 --- a/tests/integrations/realcluster/cluster_id_test.go +++ b/tests/integrations/realcluster/cluster_id_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" ) type clusterIDSuite struct { @@ -48,7 +49,7 @@ func (s *clusterIDSuite) TestClientClusterID() { // Try to create a client with the mixed endpoints. _, err := pd.NewClientWithContext( ctx, pdEndpoints, - pd.SecurityOption{}, pd.WithMaxErrorRetry(1), + pd.SecurityOption{}, opt.WithMaxErrorRetry(1), ) re.Error(err) re.Contains(err.Error(), "unmatched cluster id") diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index 41564a4b3de..19f8f4cd549 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/testutil" bs "github.com/tikv/pd/pkg/basicserver" "github.com/tikv/pd/pkg/mcs/utils/constant" @@ -148,7 +149,7 @@ func (suite *tsoClientTestSuite) SetupSuite() { func (suite *tsoClientTestSuite) SetupTest() { re := suite.Require() if suite.legacy { - client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, pd.WithForwardingOption(true)) + client, err := pd.NewClientWithContext(suite.ctx, suite.getBackendEndpoints(), pd.SecurityOption{}, opt.WithForwardingOption(true)) re.NoError(err) innerClient, ok := client.(interface{ GetServiceDiscovery() pd.ServiceDiscovery }) re.True(ok) @@ -548,7 +549,7 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { // Create a pd client in PD mode to let the API leader to forward requests to the TSO cluster. re.NoError(failpoint.Enable("github.com/tikv/pd/client/usePDServiceMode", "return(true)")) pdClient, err := pd.NewClientWithContext(context.Background(), - []string{backendEndpoints}, pd.SecurityOption{}, pd.WithMaxErrorRetry(1)) + []string{backendEndpoints}, pd.SecurityOption{}, opt.WithMaxErrorRetry(1)) re.NoError(err) defer pdClient.Close() diff --git a/tools/pd-api-bench/cases/cases.go b/tools/pd-api-bench/cases/cases.go index 54c1247c208..118b8aaed5e 100644 --- a/tools/pd-api-bench/cases/cases.go +++ b/tools/pd-api-bench/cases/cases.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/log" pd "github.com/tikv/pd/client" pdHttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -323,7 +324,7 @@ func newGetRegionEnableFollower() func() GRPCCase { func (*getRegionEnableFollower) unary(ctx context.Context, cli pd.Client) error { id := rand.Intn(totalRegion)*4 + 1 - _, err := cli.GetRegion(ctx, generateKeyForSimulator(id), pd.WithAllowFollowerHandle()) + _, err := cli.GetRegion(ctx, generateKeyForSimulator(id), opt.WithAllowFollowerHandle()) if err != nil { return err } diff --git a/tools/pd-api-bench/main.go b/tools/pd-api-bench/main.go index a61eff39cb1..fbe52aa470b 100644 --- a/tools/pd-api-bench/main.go +++ b/tools/pd-api-bench/main.go @@ -35,6 +35,7 @@ import ( flag "github.com/spf13/pflag" pd "github.com/tikv/pd/client" pdHttp "github.com/tikv/pd/client/http" + "github.com/tikv/pd/client/opt" "github.com/tikv/pd/client/utils/tlsutil" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/utils/logutil" @@ -118,7 +119,7 @@ func main() { pdClis := make([]pd.Client, cfg.Client) for i := range cfg.Client { pdClis[i] = newPDClient(ctx, cfg) - pdClis[i].UpdateOption(pd.EnableFollowerHandle, true) + pdClis[i].UpdateOption(opt.EnableFollowerHandle, true) } etcdClis := make([]*clientv3.Client, cfg.Client) for i := range cfg.Client { @@ -380,7 +381,7 @@ func newPDClient(ctx context.Context, cfg *config.Config) pd.Client { CertPath: cfg.CertPath, KeyPath: cfg.KeyPath, }, - pd.WithGRPCDialOptions( + opt.WithGRPCDialOptions( grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: keepaliveTime, Timeout: keepaliveTimeout, diff --git a/tools/pd-tso-bench/main.go b/tools/pd-tso-bench/main.go index bcf0ff6eb40..393ebb97026 100644 --- a/tools/pd-tso-bench/main.go +++ b/tools/pd-tso-bench/main.go @@ -33,6 +33,7 @@ import ( "github.com/pingcap/log" "github.com/prometheus/client_golang/prometheus/promhttp" pd "github.com/tikv/pd/client" + "github.com/tikv/pd/client/opt" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -430,11 +431,11 @@ func createPDClient(ctx context.Context) (pd.Client, error) { err error ) - opts := make([]pd.ClientOption, 0) + opts := make([]opt.ClientOption, 0) if *useTSOServerProxy { - opts = append(opts, pd.WithTSOServerProxyOption(true)) + opts = append(opts, opt.WithTSOServerProxyOption(true)) } - opts = append(opts, pd.WithGRPCDialOptions( + opts = append(opts, opt.WithGRPCDialOptions( grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: keepaliveTime, Timeout: keepaliveTimeout, @@ -459,8 +460,8 @@ func createPDClient(ctx context.Context) (pd.Client, error) { return nil, err } - pdCli.UpdateOption(pd.MaxTSOBatchWaitInterval, *maxBatchWaitInterval) - pdCli.UpdateOption(pd.EnableTSOFollowerProxy, *enableTSOFollowerProxy) + pdCli.UpdateOption(opt.MaxTSOBatchWaitInterval, *maxBatchWaitInterval) + pdCli.UpdateOption(opt.EnableTSOFollowerProxy, *enableTSOFollowerProxy) return pdCli, err }