
Commit

Handle feedback - fix grpc client connection pool (server side) resource leak problem.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
binshi-bing committed Jun 12, 2023

1 parent 891c9d1 commit d357d79
Showing 4 changed files with 34 additions and 31 deletions.
8 changes: 4 additions & 4 deletions pkg/utils/etcdutil/etcdutil.go
@@ -608,10 +608,10 @@ func (lw *LoopWatcher) load(ctx context.Context) (nextRevision int64, err error)
 
 // ForceLoad forces to load the key.
 func (lw *LoopWatcher) ForceLoad() {
-	// When NotLeader error happened, a large volume of force load requests will be received here,
-	// so the minimal force load interval is used to avoid the congestion and two-phase locking is
-	// used to let most of the requests return directly without acquiring the write lock and causing
-	// the system to choke.
+	// When NotLeader error happens, a large volume of force load requests will be received here,
+	// so the minimal interval between two force loads (from etcd) is used to avoid the congestion.
+	// Two-phase locking is also used to let most of the requests return directly without acquiring
+	// the write lock and causing the system to choke.
 	lw.forceLoadMu.RLock()
 	if time.Since(lw.lastTimeForceLoad) < defaultForceLoadMinimalInterval {
 		lw.forceLoadMu.RUnlock()
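
For context, the pattern the revised comment describes is double-checked ("two-phase") locking: a cheap read-locked check turns away the flood of force-load requests, and only a caller that passes it takes the write lock and re-checks before updating the last-load timestamp. The sketch below illustrates that idea only; the throttler type, its field names, and minInterval are hypothetical stand-ins, not LoopWatcher's actual fields.

package forceloadsketch

import (
	"sync"
	"time"
)

// throttler is a hypothetical stand-in for LoopWatcher's force-load
// throttling state: it admits at most one force load per minInterval.
type throttler struct {
	mu          sync.RWMutex
	lastAllowed time.Time
	minInterval time.Duration
}

// allow implements the two-phase check: most callers are rejected under
// the cheap read lock; only the rare survivor takes the write lock and
// re-checks before claiming the slot.
func (t *throttler) allow() bool {
	t.mu.RLock()
	if time.Since(t.lastAllowed) < t.minInterval {
		t.mu.RUnlock()
		return false // fast path: rejected without touching the write lock
	}
	t.mu.RUnlock()

	t.mu.Lock()
	defer t.mu.Unlock()
	// Another goroutine may have been admitted between RUnlock and Lock,
	// so the interval must be checked again under the write lock.
	if time.Since(t.lastAllowed) < t.minInterval {
		return false
	}
	t.lastAllowed = time.Now()
	return true
}

Only a caller for which allow() returns true would go on to perform the actual load from etcd, which is what keeps a burst of NotLeader-triggered requests from choking the system.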
36 changes: 24 additions & 12 deletions server/grpc_service.go
@@ -56,6 +56,7 @@ const (
 	getMinTSFromTSOServerTimeout           = 1 * time.Second
 	defaultMaxConcurrentTSOProxyStreamings = 5000
 	defaultTSOProxyClientRecvTimeout       = 1 * time.Hour
+	defaultGRPCDialTimeout                 = 3 * time.Second
 )
 
 // gRPC errors
@@ -2056,19 +2057,30 @@ func (s *GrpcServer) validateInternalRequest(header *pdpb.RequestHeader, onlyAll
 
 func (s *GrpcServer) getDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) {
 	client, ok := s.clientConns.Load(forwardedHost)
-	if !ok {
-		tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
-		if err != nil {
-			return nil, err
-		}
-		cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig)
-		if err != nil {
-			return nil, err
-		}
-		client = cc
-		s.clientConns.Store(forwardedHost, cc)
+	if ok {
+		// Mostly, the connection is already established, and return it directly.
+		return client.(*grpc.ClientConn), nil
 	}
+
+	tlsConfig, err := s.GetTLSConfig().ToTLSConfig()
+	if err != nil {
+		return nil, err
+	}
+	ctxTimeout, cancel := context.WithTimeout(ctx, defaultGRPCDialTimeout)
+	defer cancel()
+	newConn, err := grpcutil.GetClientConn(ctxTimeout, forwardedHost, tlsConfig)
+	if err != nil {
+		return nil, err
+	}
+	conn, loaded := s.clientConns.LoadOrStore(forwardedHost, newConn)
+	if !loaded {
+		// Successfully stored the connection we created.
+		return newConn, nil
+	}
-	return client.(*grpc.ClientConn), nil
+	// Loaded a connection created/stored by another goroutine, so close the one we created
+	// and return the one we loaded.
+	newConn.Close()
+	return conn.(*grpc.ClientConn), nil
 }
 
 func (s *GrpcServer) isLocalRequest(forwardedHost string) bool {
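
The leak this hunk fixes is a check-then-act race on the sync.Map pool: two goroutines can both miss on Load, both dial the forwarded host, and the later Store silently overwrites the earlier connection, which is then never closed. Below is a minimal, hypothetical sketch of the same LoadOrStore/close-the-loser pattern with a bounded dial; the pool type, the plain grpc.DialContext call, and the 3-second timeout are illustrative stand-ins for PD's grpcutil helpers, not the server's actual code.

package connpoolsketch

import (
	"context"
	"sync"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

// pool caches one *grpc.ClientConn per target address.
type pool struct {
	conns sync.Map // target -> *grpc.ClientConn
}

// get returns a cached connection or dials a new one. When two goroutines
// race to dial the same target, LoadOrStore keeps exactly one connection
// and the loser closes its own dial, so no connection is leaked.
func (p *pool) get(ctx context.Context, target string) (*grpc.ClientConn, error) {
	if c, ok := p.conns.Load(target); ok {
		return c.(*grpc.ClientConn), nil
	}

	// Bound the blocking dial so an unreachable target cannot stall the caller.
	dialCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
	defer cancel()
	newConn, err := grpc.DialContext(dialCtx, target,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock())
	if err != nil {
		return nil, err
	}

	actual, loaded := p.conns.LoadOrStore(target, newConn)
	if loaded {
		// Another goroutine stored a connection first; close ours and use theirs.
		newConn.Close()
		return actual.(*grpc.ClientConn), nil
	}
	return newConn, nil
}

The bounded dial also explains the new defaultGRPCDialTimeout constant in the hunk above: without it, a dial to a dead forwarded host could hold the caller indefinitely.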
2 changes: 1 addition & 1 deletion tests/integrations/mcs/go.mod
@@ -19,6 +19,7 @@ require (
 	github.com/tikv/pd/client v0.0.0-00010101000000-000000000000
 	go.etcd.io/etcd v0.5.0-alpha.5.0.20220915004622-85b640cee793
 	go.uber.org/goleak v1.1.12
+	go.uber.org/zap v1.24.0
 	google.golang.org/grpc v1.54.0
 )

@@ -158,7 +159,6 @@ require (
 	go.uber.org/dig v1.9.0 // indirect
 	go.uber.org/fx v1.12.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
-	go.uber.org/zap v1.24.0 // indirect
 	golang.org/x/crypto v0.1.0 // indirect
 	golang.org/x/exp v0.0.0-20230108222341-4b8118a2686a // indirect
 	golang.org/x/image v0.5.0 // indirect
19 changes: 5 additions & 14 deletions tests/integrations/mcs/tso/proxy_test.go
@@ -31,6 +31,7 @@ import (
 	"github.com/tikv/pd/pkg/utils/testutil"
 	"github.com/tikv/pd/tests"
 	"github.com/tikv/pd/tests/integrations/mcs"
+	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )

@@ -81,28 +82,22 @@ func (s *tsoProxyTestSuite) SetupSuite() {
 }
 
 func (s *tsoProxyTestSuite) TearDownSuite() {
-	log.Info("exiting tsoProxyTestSuite")
 	s.cleanupGRPCStreams(s.cleanupFuncs)
 	s.tsoCluster.Destroy()
 	s.apiCluster.Destroy()
 	s.cancel()
-	log.Info("exited tsoProxyTestSuite")
 }
 
 // TestTSOProxyBasic tests the TSO Proxy's basic function to forward TSO requests to the TSO microservice.
 // It also verifies the correctness of the TSO Proxy's TSO response, such as the count of timestamps
 // to retrieve in one TSO request and the monotonicity of the returned timestamps.
 func (s *tsoProxyTestSuite) TestTSOProxyBasic() {
-	log.Info("entering tsoProxyTestSuite/TestTSOProxyBasic")
-	defer log.Info("exited tsoProxyTestSuite/TestTSOProxyBasic")
 	s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 100, true)
 }
 
 // TestTSOProxyWorksWithCancellation tests that while some grpc streams are being cancelled and the
 // others are still working, the TSO Proxy can still work correctly.
 func (s *tsoProxyTestSuite) TestTSOProxyWorksWithCancellation() {
-	log.Info("entering tsoProxyTestSuite/TestTSOProxyWorksWithCancellation")
-	defer log.Info("exited tsoProxyTestSuite/TestTSOProxyWorksWithCancellation")
 	re := s.Require()
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
@@ -129,8 +124,6 @@ func (s *tsoProxyTestSuite) TestTSOProxyWorksWithCancellation() {
 // but the TSO Proxy should not panic, block, or deadlock, and if it returns a timestamp, it should be a valid,
 // monotonically increasing timestamp. After the stress, the TSO Proxy should still work correctly.
 func TestTSOProxyStress(t *testing.T) {
-	log.Info("entering tsoProxyTestSuite/TestTSOProxyStress")
-	defer log.Info("exited tsoProxyTestSuite/TestTSOProxyStress")
 	s := new(tsoProxyTestSuite)
 	s.SetT(&testing.T{})
 	s.SetupSuite()
@@ -151,31 +144,29 @@ func TestTSOProxyStress(t *testing.T) {
 
 	// Push load from many concurrent clients in multiple rounds and increase the number of clients each round.
 	for i := 0; i < totalRounds; i++ {
-		fmt.Printf("start the %dth round of stress test with %d concurrent clients.\n",
-			i, len(streams)+clientsIncr)
+		log.Info("start a new round of stress test",
+			zap.Int("round-id", i), zap.Int("clients-count", len(streams)+clientsIncr))
 		streamsTemp, cleanupFuncsTemp :=
 			createTSOStreams(re, s.ctx, s.backendEndpoints, clientsIncr)
 		streams = append(streams, streamsTemp...)
 		cleanupFuncs = append(cleanupFuncs, cleanupFuncsTemp...)
 		s.verifyTSOProxy(ctxTimeout, streams, cleanupFuncs, 50, false)
 	}
 	s.cleanupGRPCStreams(cleanupFuncs)
-	fmt.Println("the stress test completed.")
+	log.Info("the stress test completed.")
 
 	// Wait for the TSO Proxy to recover from the stress.
 	time.Sleep(recoverySLA)
 
 	// Verify the TSO Proxy can still work correctly after the stress.
 	s.verifyTSOProxy(s.ctx, s.streams, s.cleanupFuncs, 100, true)
 	s.TearDownSuite()
-	fmt.Println("verified that the TSO Proxy can still work correctly after the stress.")
+	log.Info("verified that the TSO Proxy can still work correctly after the stress.")
 }
 
 // TestTSOProxyClientsWithSameContext tests that the TSO Proxy can work correctly when the grpc streams
 // are created with the same context.
 func (s *tsoProxyTestSuite) TestTSOProxyClientsWithSameContext() {
-	log.Info("entering tsoProxyTestSuite/TestTSOProxyClientsWithSameContext")
-	defer log.Info("exited tsoProxyTestSuite/TestTSOProxyClientsWithSameContext")
 	re := s.Require()
 	const clientCount = 1000
	cleanupFuncs := make([]testutil.CleanupFunc, clientCount)

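The test changes above also swap fmt.Printf/Println for structured logging. Assuming the log import here is the pingcap/log wrapper used elsewhere in PD, the call shape is simply a message plus typed zap fields, as in this small illustrative sketch (the reportRound helper is hypothetical):

package logsketch

import (
	"github.com/pingcap/log"
	"go.uber.org/zap"
)

// reportRound logs one stress-test round with structured fields instead of
// a formatted string; the logger adds timestamp, level, and caller itself.
func reportRound(round, clients int) {
	log.Info("start a new round of stress test",
		zap.Int("round-id", round), zap.Int("clients-count", clients))
}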