Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract contentFilter and subscriptions out of filter to reuse in relay #779

Merged
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
in progress extract contentFilter outside filter package
chaitanyaprem committed Sep 27, 2023
commit e50789edb3be33b79ea8f876be916db7f300131b
9 changes: 5 additions & 4 deletions library/filter.go
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
)

@@ -15,16 +16,16 @@ type filterArgument struct {
ContentTopics []string `json:"contentTopics,omitempty"`
}

func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
func toContentFilter(filterJSON string) (protocol.ContentFilter, error) {
var f filterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return filter.ContentFilter{}, err
return protocol.ContentFilter{}, err
}

return filter.ContentFilter{
return protocol.ContentFilter{
PubsubTopic: f.PubsubTopic,
ContentTopics: filter.NewContentTopicSet(f.ContentTopics...),
ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...),
}, nil
}

30 changes: 30 additions & 0 deletions waku/v2/protocol/content_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package protocol

import "golang.org/x/exp/maps"

type ContentTopicSet map[string]struct{}

func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
for _, ct := range contentTopics {
s[ct] = struct{}{}
}
return s
}

// ContentFilter is used to specify the filter to be applied for a FilterNode.
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
type ContentFilter struct {
PubsubTopic string
ContentTopics ContentTopicSet
}

func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}

func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}
44 changes: 13 additions & 31 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
@@ -23,7 +23,6 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow
@@ -45,23 +44,6 @@ type WakuFilterLightNode struct {
pm *peermanager.PeerManager
}

// ContentFilter is used to specify the filter to be applied for a FilterNode.
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
type ContentFilter struct {
PubsubTopic string
ContentTopics ContentTopicSet
}

func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}

func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}

type WakuFilterPushResult struct {
Err error
PeerID peer.ID
@@ -153,7 +135,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
} else {
pubSubTopic = *messagePush.PubsubTopic
}
if !wf.subscriptions.Has(s.Conn().RemotePeer(), NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
if !wf.subscriptions.Has(s.Conn().RemotePeer(), protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
logger.Warn("received messagepush with invalid subscription parameters",
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
@@ -181,7 +163,7 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,
}

func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
@@ -231,7 +213,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
}

// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) {
func contentFilterToPubSubTopicMap(contentFilter protocol.ContentFilter) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string)

if contentFilter.PubsubTopic != "" {
@@ -257,7 +239,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
@@ -295,9 +277,9 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
failedContentTopics := []string{}
subscriptions := make([]*SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var cFilter ContentFilter
var cFilter protocol.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
@@ -315,7 +297,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
}

// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) (*SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
@@ -351,7 +333,7 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()},
pb.FilterSubscribeRequest_SUBSCRIBER_PING,
ContentFilter{})
protocol.ContentFilter{})
}

func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
@@ -387,7 +369,7 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
return output
}

func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) {
func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter protocol.ContentFilter) {
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()

@@ -416,7 +398,7 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte
}

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
@@ -442,7 +424,7 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
for pTopic, cTopics := range pubSubTopicMap {
cFilter := NewContentFilter(pTopic, cTopics...)
cFilter := protocol.NewContentFilter(pTopic, cTopics...)
for peerID := range wf.subscriptions.items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
@@ -521,7 +503,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,

}

func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter ContentFilter) error {
func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter protocol.ContentFilter) error {
err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter)
if err != nil {
ferr, ok := err.(*FilterError)
@@ -568,7 +550,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
ContentFilter{})
protocol.ContentFilter{})
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
12 changes: 6 additions & 6 deletions waku/v2/protocol/filter/filter_test.go
Original file line number Diff line number Diff line change
@@ -40,7 +40,7 @@ type FilterTestSuite struct {
fullNode *WakuFilterFullNode
fullNodeHost host.Host
wg *sync.WaitGroup
contentFilter ContentFilter
contentFilter protocol.ContentFilter
subDetails []*SubscriptionDetails
log *zap.Logger
}
@@ -146,7 +146,7 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope)
}

func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*SubscriptionDetails {
s.contentFilter = ContentFilter{pubsubTopic, NewContentTopicSet(contentTopic)}
s.contentFilter = protocol.ContentFilter{PubsubTopic: pubsubTopic, ContentTopics: protocol.NewContentTopicSet(contentTopic)}

subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer))
s.Require().NoError(err)
@@ -378,7 +378,7 @@ func (s *FilterTestSuite) TestRunningGuard() {

s.lightNode.Stop()

contentFilter := ContentFilter{"test", NewContentTopicSet("test")}
contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")}

_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))

@@ -394,7 +394,7 @@ func (s *FilterTestSuite) TestRunningGuard() {

func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() {

contentFilter := ContentFilter{"test", NewContentTopicSet("test")}
contentFilter := protocol.ContentFilter{PubsubTopic: "test", ContentTopics: protocol.NewContentTopicSet("test")}

_, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID()))
s.Require().NoError(err)
@@ -508,9 +508,9 @@ func (s *FilterTestSuite) TestAutoShard() {
s.Require().NoError(err)

}, s.subDetails[0].C)
_, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{
_, err = s.lightNode.Unsubscribe(s.ctx, protocol.ContentFilter{
PubsubTopic: s.testTopic,
ContentTopics: NewContentTopicSet(newContentTopic),
ContentTopics: protocol.NewContentTopicSet(newContentTopic),
})
s.Require().NoError(err)

15 changes: 3 additions & 12 deletions waku/v2/protocol/filter/subscribers_map.go
Original file line number Diff line number Diff line change
@@ -8,23 +8,14 @@ import (

"github.com/ethereum/go-ethereum/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
)

var ErrNotFound = errors.New("not found")

type ContentTopicSet map[string]struct{}

func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
for _, ct := range contentTopics {
s[ct] = struct{}{}
}
return s
}

type PeerSet map[peer.ID]struct{}

type PubsubTopics map[string]ContentTopicSet // pubsubTopic => contentTopics
type PubsubTopics map[string]protocol.ContentTopicSet // pubsubTopic => contentTopics

type SubscribersMap struct {
sync.RWMutex
@@ -65,7 +56,7 @@ func (sub *SubscribersMap) Set(peerID peer.ID, pubsubTopic string, contentTopics

contentTopicsMap, ok := pubsubTopicMap[pubsubTopic]
if !ok {
contentTopicsMap = make(ContentTopicSet)
contentTopicsMap = make(protocol.ContentTopicSet)
}

for _, c := range contentTopics {
10 changes: 5 additions & 5 deletions waku/v2/protocol/filter/subscriptions_map.go
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ type SubscriptionDetails struct {
once sync.Once

PeerID peer.ID
ContentFilter ContentFilter
ContentFilter protocol.ContentFilter
C chan *protocol.Envelope
}

@@ -45,7 +45,7 @@ func NewSubscriptionMap(logger *zap.Logger) *SubscriptionsMap {
}
}

func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf ContentFilter) *SubscriptionDetails {
func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf protocol.ContentFilter) *SubscriptionDetails {
sub.Lock()
defer sub.Unlock()

@@ -68,7 +68,7 @@ func (sub *SubscriptionsMap) NewSubscription(peerID peer.ID, cf ContentFilter) *
mapRef: sub,
PeerID: peerID,
C: make(chan *protocol.Envelope, 1024),
ContentFilter: ContentFilter{cf.PubsubTopic, maps.Clone(cf.ContentTopics)},
ContentFilter: protocol.ContentFilter{PubsubTopic: cf.PubsubTopic, ContentTopics: maps.Clone(cf.ContentTopics)},
}

sub.items[peerID].subsPerPubsubTopic[cf.PubsubTopic][details.ID] = details
@@ -85,7 +85,7 @@ func (sub *SubscriptionsMap) IsSubscribedTo(peerID peer.ID) bool {
}

// Check if we have subscriptions for all (pubsubTopic, contentTopics[i]) pairs provided
func (sub *SubscriptionsMap) Has(peerID peer.ID, cf ContentFilter) bool {
func (sub *SubscriptionsMap) Has(peerID peer.ID, cf protocol.ContentFilter) bool {
sub.RLock()
defer sub.RUnlock()

@@ -174,7 +174,7 @@ func (s *SubscriptionDetails) Clone() *SubscriptionDetails {
mapRef: s.mapRef,
Closed: false,
PeerID: s.PeerID,
ContentFilter: ContentFilter{s.ContentFilter.PubsubTopic, maps.Clone(s.ContentFilter.ContentTopics)},
ContentFilter: protocol.ContentFilter{PubsubTopic: s.ContentFilter.PubsubTopic, ContentTopics: maps.Clone(s.ContentFilter.ContentTopics)},
C: make(chan *protocol.Envelope),
}

18 changes: 9 additions & 9 deletions waku/v2/protocol/filter/subscriptions_map_test.go
Original file line number Diff line number Diff line change
@@ -15,9 +15,9 @@ import (
func TestSubscriptionMapAppend(t *testing.T) {
fmap := NewSubscriptionMap(utils.Logger())
peerID := createPeerID(t)
contentTopics := NewContentTopicSet("ct1", "ct2")
contentTopics := protocol.NewContentTopicSet("ct1", "ct2")

sub := fmap.NewSubscription(peerID, ContentFilter{PUBSUB_TOPIC, contentTopics})
sub := fmap.NewSubscription(peerID, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC, ContentTopics: contentTopics})
_, found := sub.ContentFilter.ContentTopics["ct1"]
require.True(t, found)
_, found = sub.ContentFilter.ContentTopics["ct2"]
@@ -44,12 +44,12 @@ func TestSubscriptionMapAppend(t *testing.T) {

func TestSubscriptionClear(t *testing.T) {
fmap := NewSubscriptionMap(utils.Logger())
contentTopics := NewContentTopicSet("ct1", "ct2")
contentTopics := protocol.NewContentTopicSet("ct1", "ct2")

var subscriptions = []*SubscriptionDetails{
fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "1", contentTopics}),
fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "2", contentTopics}),
fmap.NewSubscription(createPeerID(t), ContentFilter{PUBSUB_TOPIC + "3", contentTopics}),
fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: contentTopics}),
fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: contentTopics}),
fmap.NewSubscription(createPeerID(t), protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "3", ContentTopics: contentTopics}),
}

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
@@ -84,9 +84,9 @@ func TestSubscriptionsNotify(t *testing.T) {
p1 := createPeerID(t)
p2 := createPeerID(t)
var subscriptions = []*SubscriptionDetails{
fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1", "ct2")}),
fmap.NewSubscription(p2, ContentFilter{PUBSUB_TOPIC + "1", NewContentTopicSet("ct1")}),
fmap.NewSubscription(p1, ContentFilter{PUBSUB_TOPIC + "2", NewContentTopicSet("ct1", "ct2")}),
fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}),
fmap.NewSubscription(p2, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "1", ContentTopics: protocol.NewContentTopicSet("ct1")}),
fmap.NewSubscription(p1, protocol.ContentFilter{PubsubTopic: PUBSUB_TOPIC + "2", ContentTopics: protocol.NewContentTopicSet("ct1", "ct2")}),
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
Loading