Skip to content

Commit

Permalink
fix: remove sucess peers too.
Browse files Browse the repository at this point in the history
  • Loading branch information
kaichaosun committed Jan 8, 2024
1 parent 48d9a1e commit a306492
Showing 1 changed file with 20 additions and 9 deletions.
29 changes: 20 additions & 9 deletions wakuv2/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,7 @@ func (mgr *FilterManager) processEvents(ev *FilterEvent) {
mgr.resubscribe(ev.filterID)
break
}
// Remove peer from list
for i, p := range mgr.peers {
if ev.peerID == p {
mgr.peers = append(mgr.peers[:i], mgr.peers[i+1:]...)
break
}
}

// Delete subs for removed peer
for filterID, subs := range mgr.filterSubs {
for _, sub := range subs {
Expand Down Expand Up @@ -216,8 +210,9 @@ func (mgr *FilterManager) subscribeToFilter(filterID string, tempID string) {
if err != nil {
logger.Warn("filter could not add wakuv2 filter for peers", zap.Any("peers", mgr.peers), zap.Error(err))
} else {
logger.Debug("filter subscription success", zap.Any("peers", mgr.peers), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
sub = subDetails[0]
logger.Debug("filter subscription success", zap.Stringer("peer", sub.PeerID), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
mgr.removePeer(sub.PeerID)
}

success := err == nil
Expand Down Expand Up @@ -281,6 +276,7 @@ func (mgr *FilterManager) pingPeers() {
} else {
logger.Debug("filter aliveness check failed", zap.Stringer("peerId", sub.PeerID), zap.Error(err))
}
mgr.removePeer(sub.PeerID)
mgr.eventChan <- FilterEvent{eventType: FilterEventPingResult, peerID: sub.PeerID, success: alive}
}(sub)
}
Expand Down Expand Up @@ -330,14 +326,20 @@ func (mgr *FilterManager) resubscribe(filterID string) {
// do nothing
return
}
for _, sub := range subs {
if sub == nil {
continue
}
mgr.removePeer(sub.PeerID)
}
mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs)))
for i := len(subs); i < mgr.settings.MinPeersForFilter; i++ {
mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID))

// Create sub placeholder in order to avoid potentially too many subs
tempID := uuid.NewString()
subs[tempID] = nil
go mgr.subscribeToFilter(filterID, tempID)
mgr.subscribeToFilter(filterID, tempID)
}
}

Expand All @@ -359,3 +361,12 @@ func (mgr *FilterManager) runFilterSubscriptionLoop(sub *subscription.Subscripti
}
}
}

func (mgr *FilterManager) removePeer(peerID peer.ID) {
for i, p := range mgr.peers {
if peerID == p {
mgr.peers = append(mgr.peers[:i], mgr.peers[i+1:]...)
break
}
}
}

0 comments on commit a306492

Please sign in to comment.