Skip to content

Commit

Permalink
server: partially fix grpc status (#4798)
Browse files Browse the repository at this point in the history
ref #4797

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
rleungx and ti-chi-bot authored Apr 9, 2022
1 parent bd1a18b commit e3fd147
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 22 deletions.
46 changes: 24 additions & 22 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,33 @@ import (
"google.golang.org/grpc/status"
)

// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
}
const (
heartbeatSendTimeout = 5 * time.Second
// store config
storeReadyWaitTime = 5 * time.Second

// tso
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second

// global config
globalConfigPath = "/global/config/"
)

// gRPC errors
var (
// ErrNotLeader is returned when current server is not the leader and not possible to process request.
// TODO: work as proxy.
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
storeReadyWaitTime = 5 * time.Second
ErrNotLeader = status.Errorf(codes.Unavailable, "not leader")
ErrNotStarted = status.Errorf(codes.Unavailable, "server not started")
ErrSendHeartbeatTimeout = status.Errorf(codes.DeadlineExceeded, "send heartbeat timeout")
)

// GrpcServer wraps Server to provide grpc service.
type GrpcServer struct {
*Server
}

type forwardFn func(ctx context.Context, client *grpc.ClientConn) (interface{}, error)

func (s *GrpcServer) unaryMiddleware(ctx context.Context, header *pdpb.RequestHeader, fn forwardFn) (rsp interface{}, err error) {
Expand Down Expand Up @@ -123,11 +136,6 @@ func (s *GrpcServer) GetMembers(context.Context, *pdpb.GetMembersRequest) (*pdpb
}, nil
}

const (
maxMergeTSORequests = 10000
defaultTSOProxyTimeout = 3 * time.Second
)

// Tso implements gRPC PDServer.
func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error {
var (
Expand Down Expand Up @@ -636,10 +644,6 @@ func (s *GrpcServer) StoreHeartbeat(ctx context.Context, request *pdpb.StoreHear
return resp, nil
}

const heartbeatSendTimeout = 5 * time.Second

var errSendHeartbeatTimeout = errors.New("send heartbeat timeout")

// bucketHeartbeatServer wraps PD_ReportBucketsServer to ensure when any error
// occurs on SendAndClose() or Recv(), both endpoints will be closed.
type bucketHeartbeatServer struct {
Expand All @@ -663,7 +667,7 @@ func (b *bucketHeartbeatServer) Send(bucket *pdpb.ReportBucketsResponse) error {
return err
case <-time.After(heartbeatSendTimeout):
atomic.StoreInt32(&b.closed, 1)
return errors.WithStack(errSendHeartbeatTimeout)
return ErrSendHeartbeatTimeout
}
}

Expand Down Expand Up @@ -700,7 +704,7 @@ func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
return errors.WithStack(err)
case <-time.After(heartbeatSendTimeout):
atomic.StoreInt32(&s.closed, 1)
return errors.WithStack(errSendHeartbeatTimeout)
return ErrSendHeartbeatTimeout
}
}

Expand Down Expand Up @@ -1489,7 +1493,7 @@ func (s *GrpcServer) GetOperator(ctx context.Context, request *pdpb.GetOperatorR
// TODO: Call it in gRPC interceptor.
func (s *GrpcServer) validateRequest(header *pdpb.RequestHeader) error {
if s.IsClosed() || !s.member.IsLeader() {
return errors.WithStack(ErrNotLeader)
return ErrNotLeader
}
if header.GetClusterId() != s.clusterID {
return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId())
Expand Down Expand Up @@ -1670,7 +1674,7 @@ func (s *GrpcServer) GetDCLocationInfo(ctx context.Context, request *pdpb.GetDCL
// the gRPC communication between PD servers internally.
func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAllowLeader bool) error {
if s.IsClosed() {
return errors.WithStack(ErrNotStarted)
return ErrNotStarted
}
// If onlyAllowLeader is true, check whether the sender is PD leader.
if onlyAllowLeader {
Expand Down Expand Up @@ -1792,8 +1796,6 @@ func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan
<-done
}

const globalConfigPath = "/global/config/"

// StoreGlobalConfig store global config into etcd by transaction
func (s *GrpcServer) StoreGlobalConfig(_ context.Context, request *pdpb.StoreGlobalConfigRequest) (*pdpb.StoreGlobalConfigResponse, error) {
ops := make([]clientv3.Op, len(request.Changes))
Expand Down
21 changes: 21 additions & 0 deletions tests/server/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ import (
"github.com/tikv/pd/server/schedulers"
"github.com/tikv/pd/server/storage"
"github.com/tikv/pd/tests"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func Test(t *testing.T) {
Expand Down Expand Up @@ -424,6 +426,25 @@ func (s *clusterTestSuite) TestGetPDMembers(c *C) {
c.Assert(resp.GetMembers(), Not(HasLen), 0)
}

func (s *clusterTestSuite) TestNotLeader(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 2)
defer tc.Destroy()
c.Assert(err, IsNil)
c.Assert(tc.RunInitialServers(), IsNil)

tc.WaitLeader()
followerServer := tc.GetServer(tc.GetFollower())
grpcPDClient := testutil.MustNewGrpcClient(c, followerServer.GetAddr())
clusterID := followerServer.GetClusterID()
req := &pdpb.AllocIDRequest{Header: testutil.NewRequestHeader(clusterID)}
resp, err := grpcPDClient.AllocID(context.Background(), req)
c.Assert(resp, IsNil)
grpcStatus, ok := status.FromError(err)
c.Assert(ok, IsTrue)
c.Assert(grpcStatus.Code(), Equals, codes.Unavailable)
c.Assert(grpcStatus.Message(), Equals, "not leader")
}

func (s *clusterTestSuite) TestStoreVersionChange(c *C) {
tc, err := tests.NewTestCluster(s.ctx, 1)
defer tc.Destroy()
Expand Down

0 comments on commit e3fd147

Please sign in to comment.