Skip to content

Commit

Permalink
feat: use automatic peer selection for filter. (#4531)
Browse files Browse the repository at this point in the history
* feat: use automatic peer selection for filter.

* fix: remove sucess peers too.

* chore: remove filter manager state of peer candidates
  • Loading branch information
kaichaosun authored Jan 12, 2024
1 parent 250b8ee commit 0c474bb
Showing 1 changed file with 11 additions and 61 deletions.
72 changes: 11 additions & 61 deletions wakuv2/filter_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ package wakuv2

import (
"context"
"crypto/rand"
"errors"
"math/big"
"sync"
"time"

Expand All @@ -19,7 +16,6 @@ import (
node "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

Expand Down Expand Up @@ -63,7 +59,6 @@ type FilterManager struct {
isFilterSubAlive func(sub *subscription.SubscriptionDetails) error
getFilter func(string) *common.Filter
onNewEnvelopes func(env *protocol.Envelope) error
peers []peer.ID
logger *zap.Logger
settings settings
node *node.WakuNode
Expand All @@ -78,7 +73,6 @@ func newFilterManager(ctx context.Context, logger *zap.Logger, getFilterFn func(
mgr.onNewEnvelopes = onNewEnvelopes
mgr.filterSubs = make(FilterSubs)
mgr.eventChan = make(chan FilterEvent, 100)
mgr.peers = make([]peer.ID, 0)
mgr.settings = settings
mgr.node = node
mgr.isFilterSubAlive = func(sub *subscription.SubscriptionDetails) error {
Expand All @@ -96,15 +90,11 @@ func (mgr *FilterManager) runFilterLoop(wg *sync.WaitGroup) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

// Populate filter peers initially
mgr.peers = mgr.findFilterPeers() // ordered list of peers to select from

for {
select {
case <-mgr.ctx.Done():
return
case <-ticker.C:
mgr.peers = mgr.findFilterPeers()
mgr.pingPeers()
case ev := <-mgr.eventChan:
mgr.processEvents(&ev)
Expand Down Expand Up @@ -141,13 +131,7 @@ func (mgr *FilterManager) processEvents(ev *FilterEvent) {
mgr.resubscribe(ev.filterID)
break
}
// Remove peer from list
for i, p := range mgr.peers {
if ev.peerID == p {
mgr.peers = append(mgr.peers[:i], mgr.peers[i+1:]...)
break
}
}

// Delete subs for removed peer
for filterID, subs := range mgr.filterSubs {
for _, sub := range subs {
Expand Down Expand Up @@ -200,7 +184,7 @@ func (mgr *FilterManager) processEvents(ev *FilterEvent) {
}
}

func (mgr *FilterManager) subscribeToFilter(filterID string, peer peer.ID, tempID string) {
func (mgr *FilterManager) subscribeToFilter(filterID string, tempID string) {

logger := mgr.logger.With(zap.String("filterId", filterID))
f := mgr.getFilter(filterID)
Expand All @@ -210,17 +194,17 @@ func (mgr *FilterManager) subscribeToFilter(filterID string, peer peer.ID, tempI
return
}
contentFilter := mgr.buildContentFilter(f.PubsubTopic, f.ContentTopics)
logger.Debug("filter subscribe to filter node", zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
logger.Debug("filter subscribe to filter node", zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
ctx, cancel := context.WithTimeout(mgr.ctx, requestTimeout)
defer cancel()

subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithPeer(peer))
subDetails, err := mgr.node.FilterLightnode().Subscribe(ctx, contentFilter, filter.WithAutomaticPeerSelection())
var sub *subscription.SubscriptionDetails
if err != nil {
logger.Warn("filter could not add wakuv2 filter for peer", zap.Stringer("peer", peer), zap.Error(err))
logger.Warn("filter could not add wakuv2 filter for peers", zap.Error(err))
} else {
logger.Debug("filter subscription success", zap.Stringer("peer", peer), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
sub = subDetails[0]
logger.Debug("filter subscription success", zap.Stringer("peer", sub.PeerID), zap.String("pubsubTopic", contentFilter.PubsubTopic), zap.Strings("contentTopics", contentFilter.ContentTopicsList()))
}

success := err == nil
Expand Down Expand Up @@ -299,35 +283,6 @@ func (mgr *FilterManager) buildContentFilter(pubsubTopic string, contentTopicSet
return protocol.NewContentFilter(pubsubTopic, contentTopics...)
}

// Find suitable peer(s)
func (mgr *FilterManager) findFilterPeers() []peer.ID {
allPeers := mgr.node.Host().Peerstore().Peers()

peers := make([]peer.ID, 0)
for _, peer := range allPeers {
protocols, err := mgr.node.Host().Peerstore().SupportsProtocols(peer, filter.FilterSubscribeID_v20beta1, relay.WakuRelayID_v200)
if err != nil {
mgr.logger.Debug("SupportsProtocols error", zap.Error(err))
continue
}

if len(protocols) == 2 {
peers = append(peers, peer)
}
}

mgr.logger.Debug("Filtered peers", zap.Int("cnt", len(peers)))
return peers
}

func (mgr *FilterManager) findPeerCandidate() (peer.ID, error) {
if len(mgr.peers) == 0 {
return "", errors.New("filter could not select a suitable peer")
}
n, _ := rand.Int(rand.Reader, big.NewInt(int64(len(mgr.peers))))
return mgr.peers[n.Int64()], nil
}

func (mgr *FilterManager) resubscribe(filterID string) {
subs, found := mgr.filterSubs[filterID]
if !found {
Expand All @@ -344,16 +299,11 @@ func (mgr *FilterManager) resubscribe(filterID string) {
mgr.logger.Debug("filter resubscribe subs count:", zap.String("filterId", filterID), zap.Int("len", len(subs)))
for i := len(subs); i < mgr.settings.MinPeersForFilter; i++ {
mgr.logger.Debug("filter check not passed, try subscribing to peers", zap.String("filterId", filterID))
peer, err := mgr.findPeerCandidate()

if err == nil {
// Create sub placeholder in order to avoid potentially too many subs
tempID := uuid.NewString()
subs[tempID] = nil
go mgr.subscribeToFilter(filterID, peer, tempID)
} else {
mgr.logger.Error("filter resubscribe findPeer error", zap.Error(err))
}

// Create sub placeholder in order to avoid potentially too many subs
tempID := uuid.NewString()
subs[tempID] = nil
go mgr.subscribeToFilter(filterID, tempID)
}
}

Expand Down

0 comments on commit 0c474bb

Please sign in to comment.