Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup Discarded Connections Correctly #10574

Merged
merged 3 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions beacon-chain/powchain/rpc_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,17 @@ func (s *Service) pollConnectionStatus(ctx context.Context) {
select {
case <-ticker.C:
log.Debugf("Trying to dial endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
currClient := s.rpcClient
if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil {
errorLogger(err, "Could not connect to execution client endpoint")
s.runError = err
s.fallbackToNextEndpoint()
continue
}
// Close previous client, if connection was successful.
currClient.Close()
log.Infof("Connected to new endpoint: %s", logs.MaskCredentialsLogging(s.cfg.currHttpEndpoint.Url))
return
case <-s.ctx.Done():
log.Debug("Received cancelled context,closing existing powchain service")
return
Expand All @@ -80,10 +86,13 @@ func (s *Service) retryExecutionClientConnection(ctx context.Context, err error)
s.updateConnectedETH1(false)
// Back off for a while before redialing.
time.Sleep(backOffPeriod)
currClient := s.rpcClient
if err := s.setupExecutionClientConnections(ctx, s.cfg.currHttpEndpoint); err != nil {
s.runError = err
return
}
// Close previous client, if connection was successful.
currClient.Close()
// Reset run error in the event of a successful connection.
s.runError = nil
}
Expand All @@ -99,10 +108,13 @@ func (s *Service) checkDefaultEndpoint(ctx context.Context) {
return
}

currClient := s.rpcClient
if err := s.setupExecutionClientConnections(ctx, primaryEndpoint); err != nil {
log.Debugf("Primary endpoint not ready: %v", err)
return
}
// Close previous client, if connection was successful.
currClient.Close()
s.updateCurrHttpEndpoint(primaryEndpoint)
}

Expand Down
9 changes: 7 additions & 2 deletions beacon-chain/powchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (s *Service) Start() {
s.isRunning = true

// Poll the execution client connection and fallback if errors occur.
go s.pollConnectionStatus(s.ctx)
s.pollConnectionStatus(s.ctx)

// Check transition configuration for the engine API client in the background.
go s.checkTransitionConfiguration(s.ctx, make(chan *feed.Event, 1))
Expand Down Expand Up @@ -356,6 +356,7 @@ func (s *Service) ETH1ConnectionErrors() []error {
for _, ep := range s.cfg.httpEndpoints {
client, err := s.newRPCClientWithAuth(s.ctx, ep)
if err != nil {
client.Close()
errs = append(errs, err)
continue
}
Expand Down Expand Up @@ -607,7 +608,9 @@ func (s *Service) initPOWService() {
}
s.chainStartData.GenesisBlock = genBlock
if err := s.savePowchainData(ctx); err != nil {
s.retryExecutionClientConnection(ctx, err)
errorLogger(err, "Unable to save execution client data")
continue
}
}
return
Expand All @@ -629,17 +632,19 @@ func (s *Service) run(done <-chan struct{}) {
case <-done:
s.isRunning = false
s.runError = nil
s.rpcClient.Close()
s.updateConnectedETH1(false)
log.Debug("Context closed, exiting goroutine")
return
case <-s.headTicker.C:
head, err := s.eth1DataFetcher.HeaderByNumber(s.ctx, nil)
if err != nil {
s.pollConnectionStatus(s.ctx)
log.WithError(err).Debug("Could not fetch latest eth1 header")
continue
}
if eth1HeadIsBehind(head.Time) {
s.retryExecutionClientConnection(s.ctx, err)
s.pollConnectionStatus(s.ctx)
log.WithError(errFarBehind).Debug("Could not get an up to date eth1 header")
continue
}
Expand Down