From eb3da36cc29f61636eb9bb428699339c5957b0a0 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Tue, 28 Nov 2023 17:43:46 +0800 Subject: [PATCH 1/3] add follower option Signed-off-by: Cabinfever_B --- client/client.go | 22 +++++++++++++++++----- client/option.go | 17 +++++++++++++++++ client/option_test.go | 8 ++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/client/client.go b/client/client.go index 2d30d9fb6c4..4eb064dd269 100644 --- a/client/client.go +++ b/client/client.go @@ -91,7 +91,7 @@ type Client interface { // client should retry later. GetRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) // GetRegionFromMember gets a region from certain members. - GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) + GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) // GetPrevRegion gets the previous region and its leader Peer of the region where the key is located. GetPrevRegion(ctx context.Context, key []byte, opts ...GetRegionOption) (*Region, error) // GetRegionByID gets a region and its leader Peer from PD by id. @@ -100,7 +100,7 @@ type Client interface { // Limit limits the maximum number of regions returned. // If a region has no leader, corresponding leader will be placed by a peer // with empty value (PeerID is 0). - ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) + ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) // GetStore gets a store from PD by store id. // The store may expire later. Caller is responsible for caching and taking care // of store change. @@ -200,7 +200,8 @@ func WithSkipStoreLimit() RegionsOption { // GetRegionOp represents available options when getting regions. type GetRegionOp struct { - needBuckets bool + needBuckets bool + allowFollowerHandle bool } // GetRegionOption configures GetRegionOp. @@ -211,6 +212,11 @@ func WithBuckets() GetRegionOption { return func(op *GetRegionOp) { op.needBuckets = true } } +// AllowFollowerHandle means that client can send request to follower and let it handle this request. +func AllowFollowerHandle() GetRegionOption { + return func(op *GetRegionOp) { op.allowFollowerHandle = true } +} + // LeaderHealthCheckInterval might be changed in the unit to shorten the testing time. var LeaderHealthCheckInterval = time.Second @@ -701,6 +707,12 @@ func (c *client) UpdateOption(option DynamicOption, value interface{}) error { return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool") } c.option.setEnableTSOFollowerProxy(enable) + case EnableFollowerHandle: + enable, ok := value.(bool) + if !ok { + return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool") + } + c.option.setEnableFollowerHandle(enable) default: return errors.New("[pd] unsupported client option") } @@ -952,7 +964,7 @@ func isNetworkError(code codes.Code) bool { return code == codes.Unavailable || code == codes.DeadlineExceeded } -func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string) (*Region, error) { +func (c *client) GetRegionFromMember(ctx context.Context, key []byte, memberURLs []string, opts ...GetRegionOption) (*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.GetRegionFromMember", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -1056,7 +1068,7 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...Get return handleRegionResponse(resp), nil } -func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int) ([]*Region, error) { +func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int, opts ...GetRegionOption) ([]*Region, error) { if span := opentracing.SpanFromContext(ctx); span != nil { span = opentracing.StartSpan("pdclient.ScanRegions", opentracing.ChildOf(span.Context())) defer span.Finish() diff --git a/client/option.go b/client/option.go index d6a6d61d2f9..1b0b8413f82 100644 --- a/client/option.go +++ b/client/option.go @@ -28,6 +28,7 @@ const ( maxInitClusterRetries = 100 defaultMaxTSOBatchWaitInterval time.Duration = 0 defaultEnableTSOFollowerProxy = false + defaultEnableFollowerHandle = false ) // DynamicOption is used to distinguish the dynamic option type. @@ -40,6 +41,8 @@ const ( // EnableTSOFollowerProxy is the TSO Follower Proxy option. // It is stored as bool. EnableTSOFollowerProxy + // EnableFollowerHandle is the follower handle option. + EnableFollowerHandle dynamicOptionCount ) @@ -72,6 +75,7 @@ func newOption() *option { co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval) co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy) + co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle) return co } @@ -88,6 +92,19 @@ func (o *option) setMaxTSOBatchWaitInterval(interval time.Duration) error { return nil } +// setEnableFollowerHandle set the Follower Handle option. +func (o *option) setEnableFollowerHandle(enable bool) { + old := o.getEnableFollowerHandle() + if enable != old { + o.dynamicOptions[EnableFollowerHandle].Store(enable) + } +} + +// getMaxTSOBatchWaitIntervalgets the Follower Handle enable option. +func (o *option) getEnableFollowerHandle() bool { + return o.dynamicOptions[EnableFollowerHandle].Load().(bool) +} + // getMaxTSOBatchWaitInterval gets the max TSO batch wait interval option. func (o *option) getMaxTSOBatchWaitInterval() time.Duration { return o.dynamicOptions[MaxTSOBatchWaitInterval].Load().(time.Duration) diff --git a/client/option_test.go b/client/option_test.go index 1b5604f4d19..1a8faf8fcd9 100644 --- a/client/option_test.go +++ b/client/option_test.go @@ -28,6 +28,7 @@ func TestDynamicOptionChange(t *testing.T) { // Check the default value setting. re.Equal(defaultMaxTSOBatchWaitInterval, o.getMaxTSOBatchWaitInterval()) re.Equal(defaultEnableTSOFollowerProxy, o.getEnableTSOFollowerProxy()) + re.Equal(defaultEnableFollowerHandle, o.getEnableFollowerHandle()) // Check the invalid value setting. re.NotNil(o.setMaxTSOBatchWaitInterval(time.Second)) @@ -55,4 +56,11 @@ func TestDynamicOptionChange(t *testing.T) { close(o.enableTSOFollowerProxyCh) // Setting the same value should not notify the channel. o.setEnableTSOFollowerProxy(expectBool) + + expectBool = true + o.setEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.getEnableFollowerHandle()) + expectBool = false + o.setEnableFollowerHandle(expectBool) + re.Equal(expectBool, o.getEnableFollowerHandle()) } From 92ed72b2b5fd5bf4caba400dea35380d1b7414c3 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 29 Nov 2023 09:33:41 +0800 Subject: [PATCH 2/3] address comment Signed-off-by: Cabinfever_B --- client/client.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/client.go b/client/client.go index 4eb064dd269..70de1322488 100644 --- a/client/client.go +++ b/client/client.go @@ -212,8 +212,8 @@ func WithBuckets() GetRegionOption { return func(op *GetRegionOp) { op.needBuckets = true } } -// AllowFollowerHandle means that client can send request to follower and let it handle this request. -func AllowFollowerHandle() GetRegionOption { +// WithAllowFollowerHandle means that client can send request to follower and let it handle this request. +func WithAllowFollowerHandle() GetRegionOption { return func(op *GetRegionOp) { op.allowFollowerHandle = true } } From 79b0ba0437f90124deef1d0916afc30862310d13 Mon Sep 17 00:00:00 2001 From: Cabinfever_B Date: Wed, 29 Nov 2023 15:12:26 +0800 Subject: [PATCH 3/3] address comment Signed-off-by: Cabinfever_B --- client/option.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/option.go b/client/option.go index 1b0b8413f82..2a6c285cfb7 100644 --- a/client/option.go +++ b/client/option.go @@ -100,7 +100,7 @@ func (o *option) setEnableFollowerHandle(enable bool) { } } -// getMaxTSOBatchWaitIntervalgets the Follower Handle enable option. +// getMaxTSOBatchWaitInterval gets the Follower Handle enable option. func (o *option) getEnableFollowerHandle() bool { return o.dynamicOptions[EnableFollowerHandle].Load().(bool) }