From 98571f52f6f1d0694ee0ab8e6635616cd1bcca5e Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 26 Nov 2024 11:21:50 +0800 Subject: [PATCH] use ResourceExhausted Signed-off-by: Ryan Leung --- server/grpc_service.go | 151 +++++++++++------------------------------ 1 file changed, 40 insertions(+), 111 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index d5fd8ae3e32..33198086240 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -301,9 +301,7 @@ func (s *GrpcServer) GetMinTS( if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetMinTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -458,9 +456,7 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetMembersResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } // Here we purposely do not check the cluster ID because the client does not know the correct cluster ID @@ -522,7 +518,7 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } if s.IsServiceIndependent(constant.TSOServiceName) { @@ -640,9 +636,7 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.BootstrapResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -684,9 +678,7 @@ func (s *GrpcServer) IsBootstrapped(ctx context.Context, request *pdpb.IsBootstr if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.IsBootstrappedResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -713,9 +705,7 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AllocIDResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -749,9 +739,7 @@ func (s *GrpcServer) IsSnapshotRecovering(ctx context.Context, _ *pdpb.IsSnapsho if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.IsSnapshotRecoveringResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } // recovering mark is stored in etcd directly, there's no need to forward. @@ -775,9 +763,7 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetStoreResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -831,9 +817,7 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.PutStoreResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -888,9 +872,7 @@ func (s *GrpcServer) GetAllStores(ctx context.Context, request *pdpb.GetAllStore if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetAllStoresResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -933,9 +915,7 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.StoreHeartbeatResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, errs.ErrRateLimitExceeded.FastGenByArgs().Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1114,7 +1094,7 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } for { @@ -1230,7 +1210,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } for { @@ -1435,9 +1415,7 @@ func (s *GrpcServer) GetRegion(ctx context.Context, request *pdpb.GetRegionReque if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1499,9 +1477,7 @@ func (s *GrpcServer) GetPrevRegion(ctx context.Context, request *pdpb.GetRegionR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1558,9 +1534,7 @@ func (s *GrpcServer) GetRegionByID(ctx context.Context, request *pdpb.GetRegionB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1620,9 +1594,7 @@ func (s *GrpcServer) ScanRegions(ctx context.Context, request *pdpb.ScanRegionsR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ScanRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1678,9 +1650,7 @@ func (s *GrpcServer) BatchScanRegions(ctx context.Context, request *pdpb.BatchSc if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.BatchScanRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1770,9 +1740,7 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AskSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1816,9 +1784,7 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.AskBatchSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -1892,9 +1858,7 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1930,9 +1894,7 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportBatchSplitResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -1969,9 +1931,7 @@ func (s *GrpcServer) GetClusterConfig(ctx context.Context, request *pdpb.GetClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetClusterConfigResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2001,9 +1961,7 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.PutClusterConfigResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2042,9 +2000,7 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ScatterRegionResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2156,9 +2112,7 @@ func (s *GrpcServer) GetGCSafePoint(ctx context.Context, request *pdpb.GetGCSafe if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetGCSafePointResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2197,7 +2151,7 @@ func (s *GrpcServer) SyncRegions(stream pdpb.PD_SyncRegionsServer) error { if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } ctx := s.cluster.Context() @@ -2215,9 +2169,7 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.UpdateGCSafePointResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2264,9 +2216,7 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.UpdateServiceGCSafePointResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2320,9 +2270,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetOperatorResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2535,9 +2483,7 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SyncMaxTSResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } tsoAllocatorManager := s.GetTSOAllocatorManager() @@ -2640,9 +2586,7 @@ func (s *GrpcServer) SplitRegions(ctx context.Context, request *pdpb.SplitRegion if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SplitRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } @@ -2706,9 +2650,7 @@ func (s *GrpcServer) SplitAndScatterRegions(ctx context.Context, request *pdpb.S if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SplitAndScatterRegionsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -2772,9 +2714,7 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetDCLocationInfoResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } am := s.GetTSOAllocatorManager() @@ -2838,12 +2778,7 @@ func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.StoreGlobalConfigResponse{ - Error: &pdpb.Error{ - Type: pdpb.ErrorType_UNKNOWN, - Message: err.Error(), - }, - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } configPath := request.GetConfigPath() @@ -2889,7 +2824,7 @@ func (s *GrpcServer) LoadGlobalConfig(ctx context.Context, request *pdpb.LoadGlo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return nil, err + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } configPath := request.GetConfigPath() @@ -2937,7 +2872,7 @@ func (s *GrpcServer) WatchGlobalConfig(req *pdpb.WatchGlobalConfigRequest, serve if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return err + return status.Error(codes.ResourceExhausted, err.Error()) } } ctx, cancel := context.WithCancel(server.Context()) @@ -3034,9 +2969,7 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.ReportMinResolvedTsResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -3074,9 +3007,7 @@ func (s *GrpcServer) SetExternalTimestamp(ctx context.Context, request *pdpb.Set if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.SetExternalTimestampResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) { @@ -3112,9 +3043,7 @@ func (s *GrpcServer) GetExternalTimestamp(ctx context.Context, request *pdpb.Get if done, err := limiter.Allow(fName); err == nil { defer done() } else { - return &pdpb.GetExternalTimestampResponse{ - Header: wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil + return nil, status.Error(codes.ResourceExhausted, err.Error()) } } fn := func(ctx context.Context, client *grpc.ClientConn) (any, error) {