Skip to content

Commit

Permalink
feat(share/discovery): Add callback on peers set updates. (#1609)
Browse files Browse the repository at this point in the history
notify Discovery clients about peers set changes in async manner by
calling provided callback.

Co-authored-by: Ryan <ryan@celestia.org>
  • Loading branch information
walldiss and distractedm1nd authored Jan 20, 2023
1 parent 8a101d9 commit 7305099
Showing 1 changed file with 16 additions and 0 deletions.
16 changes: 16 additions & 0 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ type Discovery struct {
discoveryInterval time.Duration
// advertiseInterval is an interval between advertising sessions.
advertiseInterval time.Duration
// onUpdatedPeers will be called on peer set changes
onUpdatedPeers OnUpdatedPeers
}

type OnUpdatedPeers func(peerID peer.ID, isAdded bool)

// NewDiscovery constructs a new discovery.
func NewDiscovery(
h host.Host,
Expand All @@ -57,6 +61,15 @@ func NewDiscovery(
peersLimit,
discInterval,
advertiseInterval,
func(peer.ID, bool) {},
}
}

// WithOnPeersUpdate adds OnPeersUpdate callback call on every update of discovered peers list.
func (d *Discovery) WithOnPeersUpdate(f OnUpdatedPeers) {
d.onUpdatedPeers = func(peerID peer.ID, isAdded bool) {
d.onUpdatedPeers(peerID, isAdded)
f(peerID, isAdded)
}
}

Expand All @@ -78,6 +91,8 @@ func (d *Discovery) handlePeerFound(ctx context.Context, topic string, peer peer
d.set.Remove(peer.ID)
return
}

d.onUpdatedPeers(peer.ID, true)
log.Debugw("added peer to set", "id", peer.ID)
// add tag to protect peer of being killed by ConnManager
d.host.ConnManager().TagPeer(peer.ID, topic, peerWeight)
Expand Down Expand Up @@ -132,6 +147,7 @@ func (d *Discovery) EnsurePeers(ctx context.Context) {
if d.set.Contains(connStatus.Peer) {
d.connector.RestartBackoff(connStatus.Peer)
d.set.Remove(connStatus.Peer)
d.onUpdatedPeers(connStatus.Peer, false)
d.host.ConnManager().UntagPeer(connStatus.Peer, topic)
t.Reset(d.discoveryInterval)
}
Expand Down

0 comments on commit 7305099

Please sign in to comment.