From d357d7971dda5aa59bce066e55568f25bf2696a7 Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Sat, 10 Jun 2023 12:11:55 -0700 Subject: [PATCH] Handle feedback - fix grpc client connection pool (server side) resource leak problem. Signed-off-by: Bin Shi --- pkg/utils/etcdutil/etcdutil.go | 8 +++--- server/grpc_service.go | 36 ++++++++++++++++-------- tests/integrations/mcs/go.mod | 2 +- tests/integrations/mcs/tso/proxy_test.go | 19 ++++--------- 4 files changed, 34 insertions(+), 31 deletions(-) diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 67395768fe2e..4b1f1d7fa151 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -608,10 +608,10 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error) // ForceLoad forces to load the key. func (lw *LoopWatcher) ForceLoad() { - // When NotLeader error happened, a large volume of force load requests will be received here, - // so the minimal force load interval is used to avoid the congestion and two-phase locking is - // used to let most of the requests return directly without acquiring the write lock and causing - // the system to choke. + // When NotLeader error happens, a large volume of force load requests will be received here, + // so the minimal interval between two force loads (from etcd) is used to avoid the congestion. + // Two-phase locking is also used to let most of the requests return directly without acquiring + // the write lock and causing the system to choke. lw.forceLoadMu.RLock() if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval { lw.forceLoadMu.RUnlock() diff --git a/server/grpc_service.go b/server/grpc_service.go index 185971baa06b..2d3103142ae3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -56,6 +56,7 @@ const ( getMinTSFromTSOServerTimeout = 1 * time.Second defaultMaxConcurrentTSOProxyStreamings = 5000 defaultTSOProxyClientRecvTimeout = 1 * time.Hour + defaultGRPCDialTimeout = 3 * time.Second ) // gRPC errors @@ -2056,19 +2057,30 @@ func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAll func (s *GrpcServer) getDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) { client, ok := s.clientConns.Load(forwardedHost) - if !ok { - tlsConfig, err := s.GetTLSConfig().ToTLSConfig() - if err != nil { - return nil, err - } - cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig) - if err != nil { - return nil, err - } - client = cc - s.clientConns.Store(forwardedHost, cc) + if ok { + // Mostly, the connection is already established, and return it directly. + return client.(*grpc.ClientConn), nil + } + + tlsConfig, err := s.GetTLSConfig().ToTLSConfig() + if err != nil { + return nil, err + } + ctxTimeout, cancel := context.WithTimeout(ctx, defaultGRPCDialTimeout) + defer cancel() + newConn, err := grpcutil.GetClientConn(ctxTimeout, forwardedHost, tlsConfig) + if err != nil { + return nil, err + } + conn, loaded := s.clientConns.LoadOrStore(forwardedHost, newConn) + if !loaded { + // Successfully stored the connection we created. + return newConn, nil } - return client.(*grpc.ClientConn), nil + // Loaded a connection created/stored by another goroutine, so close the one we created + // and return the one we loaded. 
+ newConn.Close() + return conn.(*grpc.ClientConn), nil } func (s *GrpcServer) isLocalRequest(forwardedHost string) bool { diff --git a/tests/integrations/mcs/go.mod b/tests/integrations/mcs/go.mod index 554b35cb45ae..2b225f0ced2e 100644 --- a/tests/integrations/mcs/go.mod +++ b/tests/integrations/mcs/go.mod @@ -19,6 +19,7 @@ require ( github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793 go.uber.org/goleak v1.1.12 + go.uber.org/zap v1.24.0 google.golang.org/grpc v1.54.0 ) @@ -158,7 +159,6 @@ require ( go.uber.org/dig v1.9.0 // indirect go.uber.org/fx v1.12.0 // indirect go.uber.org/multierr v1.11.0 // indirect - go.uber.org/zap v1.24.0 // indirect golang.org/x/crypto v0.1.0 // indirect golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a // indirect golang.org/x/image v0.5.0 // indirect diff --git a/tests/integrations/mcs/tso/proxy_test.go b/tests/integrations/mcs/tso/proxy_test.go index a3c1927763f1..8ec437b4c03e 100644 --- a/tests/integrations/mcs/tso/proxy_test.go +++ b/tests/integrations/mcs/tso/proxy_test.go @@ -31,6 +31,7 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/tests" "github.com/tikv/pd/tests/integrations/mcs" + "go.uber.org/zap" "google.golang.org/grpc" ) @@ -81,28 +82,22 @@ func (s *tsoProxyTestSuite) SetupSuite() { } func (s *tsoProxyTestSuite) TearDownSuite() { - log.Info("exiting tsoProxyTestSuite") s.cleanupGRPCStreams(s.cleanupFuncs) s.tsoCluster.Destroy() s.apiCluster.Destroy() s.cancel() - log.Info("exited tsoProxyTestSuite") } // TestTSOProxyBasic tests the TSO Proxy's basic function to forward TSO requests to TSO microservice. // It also verifies the correctness of the TSO Proxy's TSO response, such as the count of timestamps // to retrieve in one TSO request and the monotonicity of the returned timestamps. func (s *tsoProxyTestSuite) TestTSOProxyBasic() { - log.Info("entering tsoProxyTestSuite/TestTSOProxyBasic") - defer log.Info("exited tsoProxyTestSuite/TestTSOProxyBasic") s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 100, true) } // TestTSOProxyWithLargeCount tests while some grpc streams being cancelled and the others are still // working, the TSO Proxy can still work correctly. func (s *tsoProxyTestSuite) TestTSOProxyWorksWithCancellation() { - log.Info("entering tsoProxyTestSuite/TestTSOProxyWorksWithCancellation") - defer log.Info("exited tsoProxyTestSuite/TestTSOProxyWorksWithCancellation") re := s.Require() wg := &sync.WaitGroup{} wg.Add(2) @@ -129,8 +124,6 @@ func (s *tsoProxyTestSuite) TestTSOProxyWorksWithCancellation() { // but the TSO Proxy should not panic, blocked or deadlocked, and if it returns a timestamp, it should be a valid // timestamp monotonic increasing. After the stress, the TSO Proxy should still work correctly. func TestTSOProxyStress(t *testing.T) { - log.Info("entering tsoProxyTestSuite/TestTSOProxyStress") - defer log.Info("exited tsoProxyTestSuite/TestTSOProxyStress") s := new(tsoProxyTestSuite) s.SetT(&testing.T{}) s.SetupSuite() @@ -151,8 +144,8 @@ func TestTSOProxyStress(t *testing.T) { // Push load from many concurrent clients in multiple rounds and increase the #client each round. 
for i := 0; i < totalRounds; i++ { - fmt.Printf("start the %dth round of stress test with %d concurrent clients.\n", - i, len(streams)+clientsIncr) + log.Info("start a new round of stress test", + zap.Int("round-id", i), zap.Int("clients-count", len(streams)+clientsIncr)) streamsTemp, cleanupFuncsTemp := createTSOStreams(re, s.ctx, s.backendEndpoints, clientsIncr) streams = append(streams, streamsTemp...) @@ -160,7 +153,7 @@ func TestTSOProxyStress(t *testing.T) { s.verifyTSOProxy(ctxTimeout, streams, cleanupFuncs, 50, false) } s.cleanupGRPCStreams(cleanupFuncs) - fmt.Println("the stress test completed.") + log.Info("the stress test completed.") // Wait for the TSO Proxy to recover from the stress. time.Sleep(recoverySLA) @@ -168,14 +161,12 @@ func TestTSOProxyStress(t *testing.T) { // Verify the TSO Proxy can still work correctly after the stress. s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 100, true) s.TearDownSuite() - fmt.Println("verified that the TSO Proxy can still work correctly after the stress.") + log.Info("verified that the TSO Proxy can still work correctly after the stress.") } // TestTSOProxyClientsWithSameContext tests the TSO Proxy can work correctly while the grpc streams // are created with the same context. func (s *tsoProxyTestSuite) TestTSOProxyClientsWithSameContext() { - log.Info("entering tsoProxyTestSuite/TestTSOProxyClientsWithSameContext") - defer log.Info("exited tsoProxyTestSuite/TestTSOProxyClientsWithSameContext") re := s.Require() const clientCount = 1000 cleanupFuncs := make([]testutil.CleanupFunc, clientCount)
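
For context, the getDelegateClient change above amounts to a dial-then-LoadOrStore pattern over a sync.Map. Below is a minimal standalone sketch of that pattern; the connpool package, the getConn helper, the dialTimeout constant, and the use of plain insecure credentials are illustrative stand-ins for PD's grpcutil.GetClientConn and TLS configuration, not the PD implementation itself.

package connpool

import (
	"context"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// dialTimeout plays the role of defaultGRPCDialTimeout in the patch; the value is illustrative.
const dialTimeout = 3 * time.Second

// connPool caches one *grpc.ClientConn per target address, like s.clientConns in the patch.
var connPool sync.Map // map[string]*grpc.ClientConn

// getConn returns the cached connection for addr, dialing a new one if needed.
// When two goroutines race to dial the same addr, LoadOrStore keeps exactly one
// connection in the pool and the loser closes its own dial, so no connection leaks.
func getConn(ctx context.Context, addr string) (*grpc.ClientConn, error) {
	// Fast path: most calls find an existing connection.
	if c, ok := connPool.Load(addr); ok {
		return c.(*grpc.ClientConn), nil
	}

	// Bound the dial so a stuck target cannot block the caller indefinitely.
	dialCtx, cancel := context.WithTimeout(ctx, dialTimeout)
	defer cancel()
	newConn, err := grpc.DialContext(dialCtx, addr,
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, err
	}

	// Publish the connection; if another goroutine stored one first, keep theirs.
	actual, loaded := connPool.LoadOrStore(addr, newConn)
	if loaded {
		// Lost the race: close the connection we just created instead of leaking it.
		newConn.Close()
		return actual.(*grpc.ClientConn), nil
	}
	return newConn, nil
}

Closing the losing connection is the part that removes the leak: with the previous Load-then-Store sequence, two goroutines racing on the same forwardedHost could each dial a connection, and whichever one was overwritten in the map was never closed.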
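
The reworded ForceLoad comment describes a two-phase (double-checked) locking throttle: a cheap read-locked check turns away most callers, and only the survivors take the write lock, re-check, and update the timestamp. A minimal sketch of that pattern, with an illustrative Throttler type and a minInterval constant standing in for defaultForceLoadMinimalInterval, could look as follows.

package throttle

import (
	"sync"
	"time"
)

// minInterval is the smallest allowed gap between two force loads; the value is illustrative.
const minInterval = 100 * time.Millisecond

// Throttler lets at most one force load proceed per minInterval. Most callers only
// take the read lock and return, so a burst of NotLeader-triggered requests does not
// pile up on the write lock.
type Throttler struct {
	mu       sync.RWMutex
	lastTime time.Time
}

// Allow reports whether the caller should actually perform the load now.
func (t *Throttler) Allow() bool {
	// Phase 1: cheap check under the read lock; the common case returns here.
	t.mu.RLock()
	if time.Since(t.lastTime) < minInterval {
		t.mu.RUnlock()
		return false
	}
	t.mu.RUnlock()

	// Phase 2: re-check under the write lock, since several goroutines may have
	// passed the read-locked check before any of them updated lastTime.
	t.mu.Lock()
	defer t.mu.Unlock()
	if time.Since(t.lastTime) < minInterval {
		return false
	}
	t.lastTime = time.Now()
	return true
}

A caller would simply guard the expensive load with the Allow result, for example: if t.Allow() { load() }.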