From 1349aacc1d990a031b3db8b9544a25a348bea8b4 Mon Sep 17 00:00:00 2001 From: nisdas Date: Thu, 28 Apr 2022 18:08:12 +0800 Subject: [PATCH] cleanup discarded connections --- beacon-chain/powchain/rpc_connection.go | 12 ++++++++++++ beacon-chain/powchain/service.go | 9 +++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/beacon-chain/powchain/rpc_connection.go b/beacon-chain/powchain/rpc_connection.go index 2918fe73e9a2..be739951be7d 100644 --- a/beacon-chain/powchain/rpc_connection.go +++ b/beacon-chain/powchain/rpc_connection.go @@ -62,11 +62,17 @@ func (s *Service) pollConnectionStatus(ctx context.Context) { select { case <-ticker.C: log.Debugf("Trying to dial endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url)) + currClient := s.rpcClient if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil { errorLogger(err, "Could not connect to execution client endpoint") s.runError = err s.fallbackToNextEndpoint() + continue } + // Close previous client, if connection was successful. + currClient.Close() + log.Infof("Connected to new endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url)) + return case <-s.ctx.Done(): log.Debug("Received cancelled context,closing existing powchain service") return @@ -80,10 +86,13 @@ func (s *Service) retryExecutionClientConnection(ctx context.Context, err error) s.updateConnectedETH1(false) // Back off for a while before redialing. time.Sleep(backOffPeriod) + currClient := s.rpcClient if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil { s.runError = err return } + // Close previous client, if connection was successful. + currClient.Close() // Reset run error in the event of a successful connection. s.runError = nil } @@ -99,10 +108,13 @@ func (s *Service) checkDefaultEndpoint(ctx context.Context) { return } + currClient := s.rpcClient if err := s.setupExecutionClientConnections(ctx, primaryEndpoint); err != nil { log.Debugf("Primary endpoint not ready: %v", err) return } + // Close previous client, if connection was successful. + currClient.Close() s.updateCurrHttpEndpoint(primaryEndpoint) } diff --git a/beacon-chain/powchain/service.go b/beacon-chain/powchain/service.go index 56129103bdb2..42b8c2f5aaf4 100644 --- a/beacon-chain/powchain/service.go +++ b/beacon-chain/powchain/service.go @@ -247,7 +247,7 @@ func (s *Service) Start() { s.isRunning = true // Poll the execution client connection and fallback if errors occur. - go s.pollConnectionStatus(s.ctx) + s.pollConnectionStatus(s.ctx) // Check transition configuration for the engine API client in the background. go s.checkTransitionConfiguration(s.ctx, make(chan *feed.Event, 1)) @@ -356,6 +356,7 @@ func (s *Service) ETH1ConnectionErrors() []error { for _, ep := range s.cfg.httpEndpoints { client, err := s.newRPCClientWithAuth(s.ctx, ep) if err != nil { + client.Close() errs = append(errs, err) continue } @@ -607,7 +608,9 @@ func (s *Service) initPOWService() { } s.chainStartData.GenesisBlock = genBlock if err := s.savePowchainData(ctx); err != nil { + s.retryExecutionClientConnection(ctx, err) errorLogger(err, "Unable to save execution client data") + continue } } return @@ -629,17 +632,19 @@ func (s *Service) run(done <-chan struct{}) { case <-done: s.isRunning = false s.runError = nil + s.rpcClient.Close() s.updateConnectedETH1(false) log.Debug("Context closed, exiting goroutine") return case <-s.headTicker.C: head, err := s.eth1DataFetcher.HeaderByNumber(s.ctx, nil) if err != nil { + s.pollConnectionStatus(s.ctx) log.WithError(err).Debug("Could not fetch latest eth1 header") continue } if eth1HeadIsBehind(head.Time) { - s.retryExecutionClientConnection(s.ctx, err) + s.pollConnectionStatus(s.ctx) log.WithError(errFarBehind).Debug("Could not get an up to date eth1 header") continue }