From 05c467de0fe549ff9e05a5e9aa361660c9a5dfa3 Mon Sep 17 00:00:00 2001 From: Vlad Date: Thu, 26 Jan 2023 14:12:12 +0300 Subject: [PATCH] ensure only one instance of EnsurePeers is running --- share/availability/discovery/discovery.go | 10 ++++++++++ share/availability/full/availability.go | 2 +- share/availability/light/availability.go | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/share/availability/discovery/discovery.go b/share/availability/discovery/discovery.go index 95e8e3b7a8..09a824218f 100644 --- a/share/availability/discovery/discovery.go +++ b/share/availability/discovery/discovery.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "sync/atomic" "time" logging "github.com/ipfs/go-log/v2" @@ -41,6 +42,8 @@ type Discovery struct { advertiseInterval time.Duration // onUpdatedPeers will be called on peer set changes onUpdatedPeers OnUpdatedPeers + // ensureIsRunning indicates if ensure process has been started already + ensureIsRunning atomic.Bool } type OnUpdatedPeers func(peerID peer.ID, isAdded bool) @@ -62,6 +65,7 @@ func NewDiscovery( discInterval, advertiseInterval, func(peer.ID, bool) {}, + atomic.Bool{}, } } @@ -102,6 +106,12 @@ func (d *Discovery) handlePeerFound(ctx context.Context, topic string, peer peer // It starts peer discovery every 30 seconds until peer cache reaches peersLimit. // Discovery is restarted if any previously connected peers disconnect. func (d *Discovery) EnsurePeers(ctx context.Context) { + if d.ensureIsRunning.CompareAndSwap(false, true) { + go d.ensurePeers(ctx) + } +} + +func (d *Discovery) ensurePeers(ctx context.Context) { if d.peersLimit == 0 { log.Warn("peers limit is set to 0. Skipping discovery...") return diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index f6ef1ff05f..9a0ce67f70 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -37,7 +37,7 @@ func (fa *ShareAvailability) Start(context.Context) error { fa.cancel = cancel go fa.disc.Advertise(ctx) - go fa.disc.EnsurePeers(ctx) + fa.disc.EnsurePeers(ctx) return nil } diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index f804a91e32..3519ae1c8a 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -44,7 +44,7 @@ func (la *ShareAvailability) Start(context.Context) error { ctx, cancel := context.WithCancel(context.Background()) la.cancel = cancel - go la.disc.EnsurePeers(ctx) + la.disc.EnsurePeers(ctx) return nil }