Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unclosed tickers/timers #7190

Merged
merged 10 commits into from
Sep 8, 2020
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var prepareForkChoiceAttsPeriod = slotutil.DivideSlotBy(3 /* times-per-slot */)
// every prepareForkChoiceAttsPeriod.
func (s *Service) prepareForkChoiceAtts() {
ticker := time.NewTicker(prepareForkChoiceAttsPeriod)
defer ticker.Stop()
for {
ctx := context.Background()
select {
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/operations/attestations/prune_expired.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@ import (
// pruneAttsPool prunes attestations pool on every slot interval.
func (s *Service) pruneAttsPool() {
ticker := time.NewTicker(s.pruneInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.pruneExpiredAtts()
s.updateMetrics()
case <-s.ctx.Done():
log.Debug("Context closed, exiting routine")
ticker.Stop()
return
}
}
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/p2p/peers/scorer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ func TestPeerScorer_PeerScorerManager_loop(t *testing.T) {
done <- struct{}{}
}()
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand Down
3 changes: 1 addition & 2 deletions beacon-chain/powchain/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ func (s *Service) waitForConnection() {
logCounter++
}
ticker := time.NewTicker(backOffPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand All @@ -460,12 +461,10 @@ func (s *Service) waitForConnection() {
log.WithFields(logrus.Fields{
"endpoint": s.httpEndpoint,
}).Info("Connected to eth1 proof-of-work chain")
ticker.Stop()
return
}
log.Debug("Eth1 node is currently syncing")
case <-s.ctx.Done():
ticker.Stop()
log.Debug("Received cancelled context,closing existing powchain service")
return
}
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,12 +179,12 @@ func (f *blocksFetcher) loop() {
// Periodically remove stale peer locks.
go func() {
ticker := time.NewTicker(peerLocksPollingInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
f.removeStalePeerLocks(peerLockMaxAge)
case <-f.ctx.Done():
ticker.Stop()
return
}
}
Expand Down Expand Up @@ -338,9 +338,9 @@ func (f *blocksFetcher) requestBlocks(
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
timer := time.NewTimer(f.rateLimiter.TillEmpty(pid.String()))
defer timer.Stop()
select {
case <-f.ctx.Done():
timer.Stop()
return nil, errFetcherCtxIsDone
case <-timer.C:
// Peer has gathered enough capacity to be polled again.
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func (q *blocksQueue) loop() {
}

ticker := time.NewTicker(pollingInterval)
defer ticker.Stop()
for {
// Check highest expected slot when we approach chain's head slot.
if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
Expand Down Expand Up @@ -248,7 +249,6 @@ func (q *blocksQueue) loop() {
}
case <-q.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks queue)")
ticker.Stop()
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion shared/fileutil/fileutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"path/filepath"
"strings"

log "github.com/sirupsen/logrus"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

// ExpandPath given a string which may be a relative path.
Expand Down
1 change: 1 addition & 0 deletions slasher/beaconclient/chain_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (bs *Service) querySyncStatus(ctx context.Context) {
return
}
ticker := time.NewTicker(syncStatusPollingInterval)
defer ticker.Stop()
log.Info("Waiting for beacon node to be fully synced...")
for {
select {
Expand Down
4 changes: 2 additions & 2 deletions slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
var atts []*ethpb.IndexedAttestation
halfSlot := slotutil.DivideSlotBy(2 /* 1/2 slot duration */)
ticker := time.NewTicker(halfSlot)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -185,6 +186,7 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {

func (bs *Service) restartBeaconConnection(ctx context.Context) error {
ticker := time.NewTicker(reconnectPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand All @@ -203,12 +205,10 @@ func (bs *Service) restartBeaconConnection(ctx context.Context) error {
continue
}
log.Info("Beacon node is fully synced")

return nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return errors.New("context closed, no longer attempting to restart stream")
}
}

}
1 change: 1 addition & 0 deletions validator/client/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ func (v *validator) SlasherReady(ctx context.Context) error {
return nil
}
ticker := time.NewTicker(reconnectPeriod)
defer ticker.Stop()
for {
select {
case <-ticker.C:
Expand Down