From 10a15dac29d6c67b3218363b20f300854e469132 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 19 Apr 2023 15:40:54 +0800 Subject: [PATCH 1/3] mcs: update client when meet transport is closing Signed-off-by: lhy1024 --- server/grpc_service.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 48ac9875415..8d83aa3c324 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -20,6 +20,7 @@ import ( "io" "path" "strconv" + "strings" "sync/atomic" "time" @@ -1787,7 +1788,12 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest }) ts, err := forwardStream.Recv() if err != nil { - log.Error("get global tso from tso server failed", zap.Error(err)) + log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso", forwardedHost)) + if strings.Contains(err.Error(), "transport is closing") { + s.tsoClientPool.Lock() + delete(s.tsoClientPool.clients, forwardedHost) + s.tsoClientPool.Unlock() + } return pdpb.Timestamp{}, err } return *ts.GetTimestamp(), nil From c8820e5f16b589586203c8f2d2d571ee9f55776b Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 19 Apr 2023 17:21:05 +0800 Subject: [PATCH 2/3] address comments Signed-off-by: lhy1024 --- server/grpc_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index 8d83aa3c324..f0939a45043 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1789,7 +1789,7 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest ts, err := forwardStream.Recv() if err != nil { log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso", forwardedHost)) - if strings.Contains(err.Error(), "transport is closing") { + if strings.Contains(err.Error(), codes.Unavailable.String()) { s.tsoClientPool.Lock() delete(s.tsoClientPool.clients, forwardedHost) s.tsoClientPool.Unlock() From aa4caf21687cb15190774c65438bb87b99e80388 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 19 Apr 2023 17:40:36 +0800 Subject: [PATCH 3/3] add retry Signed-off-by: lhy1024 --- server/grpc_service.go | 44 +++++++++++++++++++++++++++--------------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/server/grpc_service.go b/server/grpc_service.go index f0939a45043..d9883e4453f 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -49,7 +49,8 @@ import ( ) const ( - heartbeatSendTimeout = 5 * time.Second + heartbeatSendTimeout = 5 * time.Second + maxRetryTimesGetGlobalTSOFromTSOServer = 3 ) // gRPC errors @@ -1774,29 +1775,40 @@ func (s *GrpcServer) getGlobalTSOFromTSOServer(ctx context.Context) (pdpb.Timest if !ok || forwardedHost == "" { return pdpb.Timestamp{}, ErrNotFoundTSOAddr } - forwardStream, err := s.getTSOForwardStream(forwardedHost) - if err != nil { - return pdpb.Timestamp{}, err - } - forwardStream.Send(&tsopb.TsoRequest{ + request := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: s.clusterID, KeyspaceId: utils.DefaultKeyspaceID, KeyspaceGroupId: utils.DefaultKeyspaceGroupID, }, Count: 1, - }) - ts, err := forwardStream.Recv() - if err != nil { - log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso", forwardedHost)) - if strings.Contains(err.Error(), codes.Unavailable.String()) { - s.tsoClientPool.Lock() - delete(s.tsoClientPool.clients, forwardedHost) - s.tsoClientPool.Unlock() + } + var ( + forwardStream tsopb.TSO_TsoClient + ts *tsopb.TsoResponse + err error + ) + for i := 0; i < maxRetryTimesGetGlobalTSOFromTSOServer; i++ { + forwardStream, err = s.getTSOForwardStream(forwardedHost) + if err != nil { + return pdpb.Timestamp{}, err + } + forwardStream.Send(request) + ts, err = forwardStream.Recv() + if err != nil { + if strings.Contains(err.Error(), codes.Unavailable.String()) { + s.tsoClientPool.Lock() + delete(s.tsoClientPool.clients, forwardedHost) + s.tsoClientPool.Unlock() + continue + } + log.Error("get global tso from tso service primary addr failed", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err } - return pdpb.Timestamp{}, err + return *ts.GetTimestamp(), nil } - return *ts.GetTimestamp(), nil + log.Error("get global tso from tso service primary addr failed after retry", zap.Error(err), zap.String("tso-addr", forwardedHost)) + return pdpb.Timestamp{}, err } func (s *GrpcServer) getTSOForwardStream(forwardedHost string) (tsopb.TSO_TsoClient, error) {