Skip to content

Commit

Permalink
put the unknown error into the header instead of directly returning a gRPC error.
Browse files Browse the repository at this point in the history

Signed-off-by: husharp <jinhao.hu@pingcap.com>
  • Loading branch information
HuSharp committed Jul 18, 2022
1 parent f04b89e commit bcaf870
Show file tree
Hide file tree
Showing 15 changed files with 215 additions and 90 deletions.
7 changes: 4 additions & 3 deletions client/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,10 @@ func (c *baseClient) getMembers(ctx context.Context, url string, timeout time.Du
if err != nil {
return nil, err
}
members, err := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{})
if err != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s", err, cc.Target(), cc.GetState().String())
members, _ := pdpb.NewPDClient(cc).GetMembers(ctx, &pdpb.GetMembersRequest{})
if members != nil && members.Header.GetError() != nil {
attachErr := errors.Errorf("error:%s target:%s status:%s",
members.Header.GetError().String(), cc.Target(), cc.GetState().String())
return nil, errs.ErrClientGetMember.Wrap(attachErr).GenWithStackByCause()
}
return members, nil
Expand Down
11 changes: 7 additions & 4 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,12 +583,12 @@ func (c *client) GetAllMembers(ctx context.Context) ([]*pdpb.Member, error) {
ctx, cancel := context.WithTimeout(ctx, c.option.timeout)
req := &pdpb.GetMembersRequest{Header: c.requestHeader()}
ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr())
resp, err := c.getClient().GetMembers(ctx, req)
resp, _ := c.getClient().GetMembers(ctx, req)
cancel()
if err != nil {
if resp.Header.GetError() != nil {
cmdFailDurationGetAllMembers.Observe(time.Since(start).Seconds())
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
log.Error("[pd] can't get members.", errs.ZapError(errors.New(resp.Header.GetError().String())))
return nil, errors.WithStack(errors.New(resp.Header.GetError().String()))
}
return resp.GetMembers(), nil
}
Expand Down Expand Up @@ -1557,6 +1557,9 @@ func (c *client) GetStore(ctx context.Context, storeID uint64) (*metapb.Store, e
c.ScheduleCheckLeader()
return nil, errors.WithStack(err)
}
if resp.Header.GetError() != nil {
return nil, errors.Errorf("get store failed: %s", resp.Header.GetError().String())
}
return handleStoreResponse(resp)
}

Expand Down
10 changes: 6 additions & 4 deletions server/api/label_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
}

for _, testCase := range testCases {
_, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
resp, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: &metapb.Store{
Id: testCase.store.Id,
Expand All @@ -289,8 +289,9 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
})
if testCase.valid {
suite.NoError(err)
suite.Nil(resp.Header.GetError())
} else {
suite.Contains(err.Error(), testCase.expectError)
suite.Contains(resp.Header.GetError().String(), testCase.expectError)
}
}

Expand All @@ -301,7 +302,7 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
[]byte(`{"enable-placement-rules":"true"}`),
tu.StatusOK(suite.Require())))
for _, testCase := range testCases {
_, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
resp, err := suite.grpcSvr.PutStore(context.Background(), &pdpb.PutStoreRequest{
Header: &pdpb.RequestHeader{ClusterId: suite.svr.ClusterID()},
Store: &metapb.Store{
Id: testCase.store.Id,
Expand All @@ -313,8 +314,9 @@ func (suite *strictlyLabelsStoreTestSuite) TestStoreMatch() {
})
if testCase.valid {
suite.NoError(err)
suite.Nil(resp.Header.GetError())
} else {
suite.Contains(err.Error(), testCase.expectError)
suite.Contains(resp.Header.GetError().String(), testCase.expectError)
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions server/api/member.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func (h *memberHandler) GetMembers(w http.ResponseWriter, r *http.Request) {
func getMembers(svr *server.Server) (*pdpb.GetMembersResponse, error) {
req := &pdpb.GetMembersRequest{Header: &pdpb.RequestHeader{ClusterId: svr.ClusterID()}}
grpcServer := &server.GrpcServer{Server: svr}
members, err := grpcServer.GetMembers(context.Background(), req)
if err != nil {
return nil, errors.WithStack(err)
members, _ := grpcServer.GetMembers(context.Background(), req)
if members.Header.GetError() != nil {
return nil, errors.WithStack(errors.New(members.Header.GetError().String()))
}
dclocationDistribution, err := svr.GetTSOAllocatorManager().GetClusterDCLocationsFromEtcd()
if err != nil {
Expand Down
122 changes: 91 additions & 31 deletions server/grpc_service.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,22 @@ func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHe
return nil, nil
}

func (s *GrpcServer) wrapErrorToHeader(errorType pdpb.ErrorType, message string) *pdpb.ResponseHeader {
return s.errorHeader(&pdpb.Error{
Type: errorType,
Message: message,
})
}

// GetMembers implements gRPC PDServer.
func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb.GetMembersResponse, error) {
// Here we purposely do not check the cluster ID because the client does not know the correct cluster ID
// at startup and needs to get the cluster ID with the first request (i.e. GetMembers).
members, err := s.Server.GetMembers()
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.GetMembersResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

var etcdLeader, pdLeader *pdpb.Member
Expand All @@ -113,7 +122,9 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb
tsoAllocatorManager := s.GetTSOAllocatorManager()
tsoAllocatorLeaders, err := tsoAllocatorManager.GetLocalAllocatorLeaders()
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.GetMembersResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

leader := s.member.GetLeader()
Expand Down Expand Up @@ -399,7 +410,9 @@ func (s *GrpcServer) Bootstrap(ctx context.Context, request *pdpb.BootstrapReque

res, err := s.bootstrapCluster(request)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.BootstrapResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

res.Header = s.header()
Expand Down Expand Up @@ -438,7 +451,9 @@ func (s *GrpcServer) AllocID(ctx context.Context, request *pdpb.AllocIDRequest)
// We can use an allocator for all types ID allocation.
id, err := s.idAllocator.Alloc()
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.AllocIDResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

return &pdpb.AllocIDResponse{
Expand Down Expand Up @@ -466,7 +481,10 @@ func (s *GrpcServer) GetStore(ctx context.Context, request *pdpb.GetStoreRequest
storeID := request.GetStoreId()
store := rc.GetStore(storeID)
if store == nil {
return nil, status.Errorf(codes.Unknown, "invalid store ID %d, not found", storeID)
return &pdpb.GetStoreResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
fmt.Sprintf("invalid store ID %d, not found", storeID)),
}, nil
}
return &pdpb.GetStoreResponse{
Header: s.header(),
Expand Down Expand Up @@ -515,11 +533,16 @@ func (s *GrpcServer) PutStore(ctx context.Context, request *pdpb.PutStoreRequest

// NOTE: can be removed when placement rules feature is enabled by default.
if !s.GetConfig().Replication.EnablePlacementRules && core.IsStoreContainLabel(store, core.EngineKey, core.EngineTiFlash) {
return nil, status.Errorf(codes.FailedPrecondition, "placement rules is disabled")
return &pdpb.PutStoreResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
"placement rules is disabled"),
}, nil
}

if err := rc.PutStore(store); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.PutStoreResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

log.Info("put store ok", zap.Stringer("store", store))
Expand Down Expand Up @@ -592,7 +615,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
storeID := request.GetStats().GetStoreId()
store := rc.GetStore(storeID)
if store == nil {
return nil, errors.Errorf("store %v not found", storeID)
return &pdpb.StoreHeartbeatResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
fmt.Sprintf("store %v not found", storeID)),
}, nil
}

// Bypass stats handling if the store report for unsafe recover is not empty.
Expand All @@ -603,7 +629,10 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear

err := rc.HandleStoreHeartbeat(request.GetStats())
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.StoreHeartbeatResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
err.Error()),
}, nil
}

s.handleDamagedStore(request.GetStats())
Expand Down Expand Up @@ -1063,18 +1092,18 @@ func (s *GrpcServer) AskSplit(ctx context.Context, request *pdpb.AskSplitRequest
return &pdpb.AskSplitResponse{Header: s.notBootstrappedHeader()}, nil
}
if request.GetRegion() == nil {
return nil, errors.New("missing region for split")
return &pdpb.AskSplitResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND,
"missing region for split"),
}, nil
}
req := &pdpb.AskSplitRequest{
Region: request.Region,
}
split, err := rc.HandleAskSplit(req)
if err != nil {
return &pdpb.AskSplitResponse{
Header: s.errorHeader(&pdpb.Error{
Type: pdpb.ErrorType_UNKNOWN,
Message: err.Error(),
}),
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

Expand Down Expand Up @@ -1105,7 +1134,10 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp
return &pdpb.AskBatchSplitResponse{Header: s.incompatibleVersion("batch_split")}, nil
}
if request.GetRegion() == nil {
return nil, errors.New("missing region for split")
return &pdpb.AskBatchSplitResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND,
"missing region for split"),
}, nil
}
req := &pdpb.AskBatchSplitRequest{
Region: request.Region,
Expand All @@ -1114,10 +1146,7 @@ func (s *GrpcServer) AskBatchSplit(ctx context.Context, request *pdpb.AskBatchSp
split, err := rc.HandleAskBatchSplit(req)
if err != nil {
return &pdpb.AskBatchSplitResponse{
Header: s.errorHeader(&pdpb.Error{
Type: pdpb.ErrorType_UNKNOWN,
Message: err.Error(),
}),
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

Expand All @@ -1144,7 +1173,9 @@ func (s *GrpcServer) ReportSplit(ctx context.Context, request *pdpb.ReportSplitR
}
_, err := rc.HandleReportSplit(request)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.ReportSplitResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}

return &pdpb.ReportSplitResponse{
Expand All @@ -1170,7 +1201,10 @@ func (s *GrpcServer) ReportBatchSplit(ctx context.Context, request *pdpb.ReportB

_, err := rc.HandleBatchReportSplit(request)
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.ReportBatchSplitResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
err.Error()),
}, nil
}

return &pdpb.ReportBatchSplitResponse{
Expand Down Expand Up @@ -1216,7 +1250,10 @@ func (s *GrpcServer) PutClusterConfig(ctx context.Context, request *pdpb.PutClus
}
conf := request.GetCluster()
if err := rc.PutMetaCluster(conf); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.PutClusterConfigResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
err.Error()),
}, nil
}

log.Info("put cluster config ok", zap.Reflect("config", conf))
Expand Down Expand Up @@ -1258,7 +1295,10 @@ func (s *GrpcServer) ScatterRegion(ctx context.Context, request *pdpb.ScatterReg
if region == nil {
if request.GetRegion() == nil {
//nolint
return nil, errors.Errorf("region %d not found", request.GetRegionId())
return &pdpb.ScatterRegionResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_REGION_NOT_FOUND,
"region %d not found"),
}, nil
}
region = core.NewRegionInfo(request.GetRegion(), request.GetLeader())
}
Expand Down Expand Up @@ -1487,12 +1527,17 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
tsoAllocatorManager := s.GetTSOAllocatorManager()
// There is no dc-location found in this server, return err.
if tsoAllocatorManager.GetClusterDCLocationsNumber() == 0 {
return nil, status.Errorf(codes.Unknown, "empty cluster dc-location found, checker may not work properly")
return &pdpb.SyncMaxTSResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
"empty cluster dc-location found, checker may not work properly"),
}, nil
}
// Get all Local TSO Allocator leaders
allocatorLeaders, err := tsoAllocatorManager.GetHoldingLocalAllocatorLeaders()
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.SyncMaxTSResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
if !request.GetSkipCheck() {
var maxLocalTS *pdpb.Timestamp
Expand All @@ -1505,7 +1550,9 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
}
currentLocalTSO, err := allocator.GetCurrentTSO()
if err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.SyncMaxTSResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
if tsoutil.CompareTimestamp(currentLocalTSO, maxLocalTS) > 0 {
maxLocalTS = currentLocalTSO
Expand All @@ -1522,10 +1569,16 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
})

if maxLocalTS == nil {
return nil, status.Errorf(codes.Unknown, "local tso allocator leaders have changed during the sync, should retry")
return &pdpb.SyncMaxTSResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
"local tso allocator leaders have changed during the sync, should retry"),
}, nil
}
if request.GetMaxTs() == nil {
return nil, status.Errorf(codes.Unknown, "empty maxTS in the request, should retry")
return &pdpb.SyncMaxTSResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
"empty maxTS in the request, should retry"),
}, nil
}
// Found a bigger or equal maxLocalTS, return it directly.
cmpResult := tsoutil.CompareTimestamp(maxLocalTS, request.GetMaxTs())
Expand All @@ -1551,7 +1604,9 @@ func (s *GrpcServer) SyncMaxTS(_ context.Context, request *pdpb.SyncMaxTSRequest
continue
}
if err := allocator.WriteTSO(request.GetMaxTs()); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.SyncMaxTSResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
syncedDCs = append(syncedDCs, allocator.GetDCLocation())
}
Expand Down Expand Up @@ -1649,7 +1704,10 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
info, ok := am.GetDCLocationInfo(request.GetDcLocation())
if !ok {
am.ClusterDCLocationChecker()
return nil, status.Errorf(codes.Unknown, "dc-location %s is not found", request.GetDcLocation())
return &pdpb.GetDCLocationInfoResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN,
fmt.Sprintf("dc-location %s is not found", request.GetDcLocation())),
}, nil
}
resp := &pdpb.GetDCLocationInfoResponse{
Header: s.header(),
Expand All @@ -1664,7 +1722,9 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
// when it becomes the Local TSO Allocator leader.
// Please take a look at https://github.com/tikv/pd/issues/3260 for more details.
if resp.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil {
return nil, status.Errorf(codes.Unknown, err.Error())
return &pdpb.GetDCLocationInfoResponse{
Header: s.wrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
return resp, nil
}
Expand Down
Loading

0 comments on commit bcaf870

Please sign in to comment.