From 0c474bb42c8dd7f0c16fff9e37e9ccdb0e7481a8 Mon Sep 17 00:00:00 2001 From: kaichao Date: Fri, 12 Jan 2024 15:09:35 +0800 Subject: [PATCH] feat: use automatic peer selection for filter. (#4531) * feat: use automatic peer selection for filter. * fix: remove sucess peers too. * chore: remove filter manager state of peer candidates --- wakuv2/filter_manager.go | 72 ++++++---------------------------------- 1 file changed, 11 insertions(+), 61 deletions(-) diff --git a/wakuv2/filter_manager.go b/wakuv2/filter_manager.go index de8f253e548..425bd790dd3 100644 --- a/wakuv2/filter_manager.go +++ b/wakuv2/filter_manager.go @@ -2,9 +2,6 @@ package wakuv2 import ( "context" - "crypto/rand" - "errors" - "math/big" "sync" "time" @@ -19,7 +16,6 @@ import ( node "github.com/waku-org/go-waku/waku/v2/node" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" - "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" ) @@ -63,7 +59,6 @@ type FilterManager struct { isFilterSubAlive func(sub *subscription.SubscriptionDetails) error getFilter func(string) *common.Filter onNewEnvelopes func(env *protocol.Envelope) error - peers []peer.ID logger *zap.Logger settings settings node *node.WakuNode @@ -78,7 +73,6 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func( mgr.onNewEnvelopes = onNewEnvelopes mgr.filterSubs = make(FilterSubs) mgr.eventChan = make(chan FilterEvent, 100) - mgr.peers = make([]peer.ID, 0) mgr.settings = settings mgr.node = node mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error { @@ -96,15 +90,11 @@ func (mgr *FilterManager) runFilterLoop(wg *sync.WaitGroup) { ticker := time.NewTicker(5 * time.Second) defer ticker.Stop() - // Populate filter peers initially - mgr.peers = mgr.findFilterPeers() // ordered list of peers to select from - for { select { case <-mgr.ctx.Done(): return case <-ticker.C: - mgr.peers = mgr.findFilterPeers() mgr.pingPeers() case ev := <-mgr.eventChan: mgr.processEvents(&ev) @@ -141,13 +131,7 @@ func (mgr *FilterManager) processEvents(ev *FilterEvent) { mgr.resubscribe(ev.filterID) break } - // Remove peer from list - for i, p := range mgr.peers { - if ev.peerID == p { - mgr.peers = append(mgr.peers[:i], mgr.peers[i+1:]...) - break - } - } + // Delete subs for removed peer for filterID, subs := range mgr.filterSubs { for _, sub := range subs { @@ -200,7 +184,7 @@ func (mgr *FilterManager) processEvents(ev *FilterEvent) { } } -func (mgr *FilterManager) subscribeToFilter(filterID string, peer peer.ID, tempID string) { +func (mgr *FilterManager) subscribeToFilter(filterID string, tempID string) { logger := mgr.logger.With(zap.String("filterId", filterID)) f := mgr.getFilter(filterID) @@ -210,17 +194,17 @@ func (mgr *FilterManager) subscribeToFilter(filterID string, peer peer.ID, tempI return } contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics) - logger.Debug("filter subscribe to filter node", zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList())) + logger.Debug("filter subscribe to filter node", zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList())) ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout) defer cancel() - subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithPeer(peer)) + subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithAutomaticPeerSelection()) var sub *subscription.SubscriptionDetails if err != nil { - logger.Warn("filter could not add wakuv2 filter for peer", zap.Stringer("peer", peer), zap.Error(err)) + logger.Warn("filter could not add wakuv2 filter for peers", zap.Error(err)) } else { - logger.Debug("filter subscription success", zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList())) sub = subDetails[0] + logger.Debug("filter subscription success", zap.Stringer("peer", sub.PeerID), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList())) } success := err == nil @@ -299,35 +283,6 @@ func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet return protocol.NewContentFilter(pubsubTopic, contentTopics...) } -// Find suitable peer(s) -func (mgr *FilterManager) findFilterPeers() []peer.ID { - allPeers := mgr.node.Host().Peerstore().Peers() - - peers := make([]peer.ID, 0) - for _, peer := range allPeers { - protocols, err := mgr.node.Host().Peerstore().SupportsProtocols(peer, filter.FilterSubscribeID_v20beta1, relay.WakuRelayID_v200) - if err != nil { - mgr.logger.Debug("SupportsProtocols error", zap.Error(err)) - continue - } - - if len(protocols) == 2 { - peers = append(peers, peer) - } - } - - mgr.logger.Debug("Filtered peers", zap.Int("cnt", len(peers))) - return peers -} - -func (mgr *FilterManager) findPeerCandidate() (peer.ID, error) { - if len(mgr.peers) == 0 { - return "", errors.New("filter could not select a suitable peer") - } - n, _ := rand.Int(rand.Reader, big.NewInt(int64(len(mgr.peers)))) - return mgr.peers[n.Int64()], nil -} - func (mgr *FilterManager) resubscribe(filterID string) { subs, found := mgr.filterSubs[filterID] if !found { @@ -344,16 +299,11 @@ func (mgr *FilterManager) resubscribe(filterID string) { mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs))) for i := len(subs); i < mgr.settings.MinPeersForFilter; i++ { mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID)) - peer, err := mgr.findPeerCandidate() - - if err == nil { - // Create sub placeholder in order to avoid potentially too many subs - tempID := uuid.NewString() - subs[tempID] = nil - go mgr.subscribeToFilter(filterID, peer, tempID) - } else { - mgr.logger.Error("filter resubscribe findPeer error", zap.Error(err)) - } + + // Create sub placeholder in order to avoid potentially too many subs + tempID := uuid.NewString() + subs[tempID] = nil + go mgr.subscribeToFilter(filterID, tempID) } }