From e949ddf17d9e9d964e416982dd9679e12b737d31 Mon Sep 17 00:00:00 2001 From: buffer <1045931706@qq.com> Date: Fri, 29 Jul 2022 04:21:11 +0800 Subject: [PATCH] grpc: fix the wrong error handler. (#5374) close tikv/pd#5373 Signed-off-by: bufferflies <1045931706@qq.com> Signed-off-by: nolouch Co-authored-by: nolouch --- server/grpc_service.go | 13 +++++++++++++ tests/client/client_test.go | 7 +++++++ 2 files changed, 20 insertions(+) diff --git a/server/grpc_service.go b/server/grpc_service.go index 29aa5e63c25..377a5e2f629 100755 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -745,10 +745,20 @@ func (s *GrpcServer) ReportBuckets(stream pdpb.PD_ReportBucketsServer) error { }() for { request, err := server.Recv() + failpoint.Inject("grpcClientClosed", func() { + err = status.Error(codes.Canceled, "grpc client closed") + request = nil + }) if err == io.EOF { return nil } + if err != nil { + return errors.WithStack(err) + } forwardedHost := getForwardedHost(stream.Context()) + failpoint.Inject("grpcClientClosed", func() { + forwardedHost = s.GetMember().Member().GetClientUrls()[0] + }) if !s.isLocalRequest(forwardedHost) { if forwardStream == nil || lastForwardedHost != forwardedHost { if cancel != nil { @@ -1774,6 +1784,9 @@ func getForwardedHost(ctx context.Context) string { } func (s *GrpcServer) isLocalRequest(forwardedHost string) bool { + failpoint.Inject("useForwardRequest", func() { + failpoint.Return(false) + }) if forwardedHost == "" { return true } diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 84c76508b2a..9f7e565aa70 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -894,6 +894,13 @@ func (suite *clientTestSuite) TestGetRegion() { return r.Buckets == nil }) config.EnableRegionBucket = true + + suite.NoError(failpoint.Enable("github.com/tikv/pd/server/grpcClientClosed", `return(true)`)) + suite.NoError(failpoint.Enable("github.com/tikv/pd/server/useForwardRequest", `return(true)`)) + suite.NoError(suite.reportBucket.Send(breq)) + suite.Error(suite.reportBucket.RecvMsg(breq)) + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/grpcClientClosed")) + suite.NoError(failpoint.Disable("github.com/tikv/pd/server/useForwardRequest")) } func (suite *clientTestSuite) TestGetPrevRegion() {