Skip to content

Commit

Permalink
Fix unclosed tickers/timers (#7190)
Browse files Browse the repository at this point in the history
* fix resource leak
* fixes leak in blocks fetcher
* client/validator release ticker resorces
* powchain, more straightforward ticker closing
* adds missing ticker.stop() calls
* more straightforward ticker closing
* Merge refs/heads/master into fix-unclosed-tickers-timers
* Merge refs/heads/master into fix-unclosed-tickers-timers
* Merge refs/heads/master into fix-unclosed-tickers-timers
* gofmt issues introduced in #7176
  • Loading branch information
farazdagi authored Sep 8, 2020
1 parent f4848e4 commit 8baa22f
Show file tree
Hide file tree
Showing 10 changed files with 12 additions and 9 deletions.
1 change: 1 addition & 0 deletions beacon-chain/operations/attestations/prepare_forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var prepareForkChoiceAttsPeriod = slotutil.DivideSlotBy(3 /* times-per-slot */)
// every prepareForkChoiceAttsPeriod.
func (s *Service) prepareForkChoiceAtts() {
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
defer ticker.Stop()
for {
ctx := context.Background()
select {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/operations/attestations/prune_expired.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
// pruneAttsPool prunes attestations pool on every slot interval.
func (s *Service) pruneAttsPool() {
ticker := time.NewTicker(s.pruneInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.pruneExpiredAtts()
s.updateMetrics()
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
ticker.Stop()
return
}
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/peers/scorer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
done <- struct{}{}
}()
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/powchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func (s *Service) waitForConnection() {
logCounter++
}
ticker := time.NewTicker(backOffPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand All @@ -460,12 +461,10 @@ func (s *Service) waitForConnection() {
log.WithFields(logrus.Fields{
"endpoint": s.httpEndpoint,
}).Info("Connected to eth1 proof-of-work chain")
ticker.Stop()
return
}
log.Debug("Eth1 node is currently syncing")
case <-s.ctx.Done():
ticker.Stop()
log.Debug("Received cancelled context,closing existing powchain service")
return
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ func (f *blocksFetcher) loop() {
// Periodically remove stale peer locks.
go func() {
ticker := time.NewTicker(peerLocksPollingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
f.removeStalePeerLocks(peerLockMaxAge)
case <-f.ctx.Done():
ticker.Stop()
return
}
}
Expand Down Expand Up @@ -338,9 +338,9 @@ func (f *blocksFetcher) requestBlocks(
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String()))
defer timer.Stop()
select {
case <-f.ctx.Done():
timer.Stop()
return nil, errFetcherCtxIsDone
case <-timer.C:
// Peer has gathered enough capacity to be polled again.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (q *blocksQueue) loop() {
}

ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for {
// Check highest expected slot when we approach chain's head slot.
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
Expand Down Expand Up @@ -248,7 +249,6 @@ func (q *blocksQueue) loop() {
}
case <-q.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks queue)")
ticker.Stop()
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion shared/fileutil/fileutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"path/filepath"
"strings"

log "github.com/sirupsen/logrus"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

// ExpandPath given a string which may be a relative path.
Expand Down
1 change: 1 addition & 0 deletions slasher/beaconclient/chain_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (bs *Service) querySyncStatus(ctx context.Context) {
return
}
ticker := time.NewTicker(syncStatusPollingInterval)
defer ticker.Stop()
log.Info("Waiting for beacon node to be fully synced...")
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
var atts []*ethpb.IndexedAttestation
halfSlot := slotutil.DivideSlotBy(2 /* 1/2 slot duration */)
ticker := time.NewTicker(halfSlot)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -185,6 +186,7 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {

func (bs *Service) restartBeaconConnection(ctx context.Context) error {
ticker := time.NewTicker(reconnectPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand All @@ -203,12 +205,10 @@ func (bs *Service) restartBeaconConnection(ctx context.Context) error {
continue
}
log.Info("Beacon node is fully synced")

return nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return errors.New("context closed, no longer attempting to restart stream")
}
}

}
1 change: 1 addition & 0 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (v *validator) SlasherReady(ctx context.Context) error {
return nil
}
ticker := time.NewTicker(reconnectPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand Down

0 comments on commit 8baa22f

Please sign in to comment.