Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Nov 27, 2024
1 parent 741f0e8 commit 861c9fc
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 23 deletions.
57 changes: 38 additions & 19 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,8 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetMembers(ctx, req)
bo := retry.FromContext(ctx)
resp, err := protoClient.GetMembers(ctx, req, grpcutil.WithBackoffer(bo))
if err = c.respForErr(metrics.CmdFailedDurationGetAllMembers, start, err, resp.GetHeader()); err != nil {
return nil, err
}
Expand Down Expand Up @@ -613,9 +614,10 @@ func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, e
return 0, 0, errs.ErrClientGetProtoClient
}

bo := retry.FromContext(ctx)
resp, err := protoClient.GetMinTS(ctx, &pdpb.GetMinTSRequest{
Header: c.requestHeader(),
})
}, grpcutil.WithBackoffer(bo))
if err != nil {
if strings.Contains(err.Error(), "Unimplemented") {
// If the method is not supported, we fallback to GetTS.
Expand Down Expand Up @@ -758,7 +760,8 @@ func (c *client) GetPrevRegion(ctx context.Context, key []byte, opts ...opt.GetR
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req)
bo := retry.FromContext(ctx)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetPrevRegion(cctx, req, grpcutil.WithBackoffer(bo))
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -798,7 +801,8 @@ func (c *client) GetRegionByID(ctx context.Context, regionID uint64, opts ...opt
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req)
bo := retry.FromContext(ctx)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).GetRegionByID(cctx, req, grpcutil.WithBackoffer(bo))
if serviceClient.NeedRetry(resp.GetHeader().GetError(), err) {
protoClient, cctx := c.getClientAndContext(ctx)
if protoClient == nil {
Expand Down Expand Up @@ -843,8 +847,9 @@ func (c *client) ScanRegions(ctx context.Context, key, endKey []byte, limit int,
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
//nolint:staticcheck
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).ScanRegions(cctx, req, grpcutil.WithBackoffer(bo))
failpoint.Inject("responseNil", func() {
resp = nil
})
Expand Down Expand Up @@ -899,7 +904,8 @@ func (c *client) BatchScanRegions(ctx context.Context, ranges []KeyRange, limit
if serviceClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req)
bo := retry.FromContext(ctx)
resp, err := pdpb.NewPDClient(serviceClient.GetClientConn()).BatchScanRegions(cctx, req, grpcutil.WithBackoffer(bo))
failpoint.Inject("responseNil", func() {
resp = nil
})
Expand Down Expand Up @@ -983,7 +989,8 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetStore(ctx, req)
bo := retry.FromContext(ctx)
resp, err := protoClient.GetStore(ctx, req, grpcutil.WithBackoffer(bo))

if err = c.respForErr(metrics.CmdFailedDurationGetStore, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand Down Expand Up @@ -1027,7 +1034,8 @@ func (c *client) GetAllStores(ctx context.Context, opts ...opt.GetStoreOption) (
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.GetAllStores(ctx, req)
bo := retry.FromContext(ctx)
resp, err := protoClient.GetAllStores(ctx, req, grpcutil.WithBackoffer(bo))

if err = c.respForErr(metrics.CmdFailedDurationGetAllStores, start, err, resp.GetHeader()); err != nil {
return nil, err
Expand All @@ -1054,7 +1062,8 @@ func (c *client) UpdateGCSafePoint(ctx context.Context, safePoint uint64) (uint6
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateGCSafePoint(ctx, req)
bo := retry.FromContext(ctx)
resp, err := protoClient.UpdateGCSafePoint(ctx, req, grpcutil.WithBackoffer(bo))

if err = c.respForErr(metrics.CmdFailedDurationUpdateGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
Expand Down Expand Up @@ -1087,7 +1096,8 @@ func (c *client) UpdateServiceGCSafePoint(ctx context.Context, serviceID string,
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req)
bo := retry.FromContext(ctx)
resp, err := protoClient.UpdateServiceGCSafePoint(ctx, req, grpcutil.WithBackoffer(bo))

if err = c.respForErr(metrics.CmdFailedDurationUpdateServiceGCSafePoint, start, err, resp.GetHeader()); err != nil {
return 0, err
Expand Down Expand Up @@ -1119,7 +1129,8 @@ func (c *client) scatterRegionsWithGroup(ctx context.Context, regionID uint64, g
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
resp, err := protoClient.ScatterRegion(ctx, req)
bo := retry.FromContext(ctx)
resp, err := protoClient.ScatterRegion(ctx, req, grpcutil.WithBackoffer(bo))
if err != nil {
return err
}
Expand Down Expand Up @@ -1163,7 +1174,8 @@ func (c *client) SplitAndScatterRegions(ctx context.Context, splitKeys [][]byte,
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
return protoClient.SplitAndScatterRegions(ctx, req)
bo := retry.FromContext(ctx)
return protoClient.SplitAndScatterRegions(ctx, req, grpcutil.WithBackoffer(bo))
}

// GetOperator implements the RPCClient interface.
Expand All @@ -1185,7 +1197,8 @@ func (c *client) GetOperator(ctx context.Context, regionID uint64) (*pdpb.GetOpe
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
return protoClient.GetOperator(ctx, req)
bo := retry.FromContext(ctx)
return protoClient.GetOperator(ctx, req, grpcutil.WithBackoffer(bo))
}

// SplitRegions split regions by given split keys
Expand All @@ -1211,7 +1224,8 @@ func (c *client) SplitRegions(ctx context.Context, splitKeys [][]byte, opts ...o
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
return protoClient.SplitRegions(ctx, req)
bo := retry.FromContext(ctx)
return protoClient.SplitRegions(ctx, req, grpcutil.WithBackoffer(bo))
}

func (c *client) requestHeader() *pdpb.RequestHeader {
Expand Down Expand Up @@ -1243,7 +1257,8 @@ func (c *client) scatterRegionsWithOptions(ctx context.Context, regionsID []uint
if protoClient == nil {
return nil, errs.ErrClientGetProtoClient
}
resp, err := protoClient.ScatterRegion(ctx, req)
bo := retry.FromContext(ctx)
resp, err := protoClient.ScatterRegion(ctx, req, grpcutil.WithBackoffer(bo))

if err != nil {
return nil, err
Expand All @@ -1262,7 +1277,8 @@ func (c *client) LoadGlobalConfig(ctx context.Context, names []string, configPat
if protoClient == nil {
return nil, 0, errs.ErrClientGetProtoClient
}
resp, err := protoClient.LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath})
bo := retry.FromContext(ctx)
resp, err := protoClient.LoadGlobalConfig(ctx, &pdpb.LoadGlobalConfigRequest{Names: names, ConfigPath: configPath}, grpcutil.WithBackoffer(bo))
if err != nil {
return nil, 0, err
}
Expand Down Expand Up @@ -1294,7 +1310,8 @@ func (c *client) StoreGlobalConfig(ctx context.Context, configPath string, items
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
_, err := protoClient.StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath})
bo := retry.FromContext(ctx)
_, err := protoClient.StoreGlobalConfig(ctx, &pdpb.StoreGlobalConfigRequest{Changes: resArr, ConfigPath: configPath}, grpcutil.WithBackoffer(bo))
if err != nil {
return err
}
Expand Down Expand Up @@ -1361,9 +1378,10 @@ func (c *client) GetExternalTimestamp(ctx context.Context) (uint64, error) {
if protoClient == nil {
return 0, errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.GetExternalTimestamp(ctx, &pdpb.GetExternalTimestampRequest{
Header: c.requestHeader(),
})
}, grpcutil.WithBackoffer(bo))
if err != nil {
return 0, err
}
Expand All @@ -1382,10 +1400,11 @@ func (c *client) SetExternalTimestamp(ctx context.Context, timestamp uint64) err
if protoClient == nil {
return errs.ErrClientGetProtoClient
}
bo := retry.FromContext(ctx)
resp, err := protoClient.SetExternalTimestamp(ctx, &pdpb.SetExternalTimestampRequest{
Header: c.requestHeader(),
Timestamp: timestamp,
})
}, grpcutil.WithBackoffer(bo))
if err != nil {
return err
}
Expand Down
7 changes: 5 additions & 2 deletions client/meta_storage_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/tikv/pd/client/errs"
"github.com/tikv/pd/client/metrics"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/retry"
"github.com/tikv/pd/client/pkg/utils/grpcutil"
)

Expand Down Expand Up @@ -78,7 +79,8 @@ func (c *innerClient) Put(ctx context.Context, key, value []byte, opts ...opt.Me
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Put(ctx, req)
bo := retry.FromContext(ctx)
resp, err := cli.Put(ctx, req, grpcutil.WithBackoffer(bo))
cancel()

if err = c.respForMetaStorageErr(metrics.CmdFailedDurationPut, start, err, resp.GetHeader()); err != nil {
Expand Down Expand Up @@ -117,7 +119,8 @@ func (c *innerClient) Get(ctx context.Context, key []byte, opts ...opt.MetaStora
cancel()
return nil, errs.ErrClientGetMetaStorageClient
}
resp, err := cli.Get(ctx, req)
bo := retry.FromContext(ctx)
resp, err := cli.Get(ctx, req, grpcutil.WithBackoffer(bo))
cancel()

if err = c.respForMetaStorageErr(metrics.CmdFailedDurationGet, start, err, resp.GetHeader()); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions client/pkg/utils/grpcutil/grpcutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type retryCallOption struct {
bo *retry.Backoffer
}

// WithBackoffer returns a CallOption that adds a backoffer to the call.
func WithBackoffer(bo *retry.Backoffer) grpc.CallOption {
return &retryCallOption{bo: bo}
}
Expand All @@ -61,9 +62,9 @@ func getBackofferFromCallOptions(opts []grpc.CallOption) *retry.Backoffer {
return nil
}

// Add retry interceptor
// UnaryBackofferInterceptor is a gRPC interceptor that adds a backoffer to the call.
func UnaryBackofferInterceptor() grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
bo := getBackofferFromCallOptions(opts)
if bo == nil {
return invoker(ctx, method, req, reply, cc, opts...)
Expand Down

0 comments on commit 861c9fc

Please sign in to comment.