Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract contentFilter and subscriptions out of filter to reuse in relay #779

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/chat2/chat.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node.
}

if options.Filter.Enable {
cf := filter.ContentFilter{
cf := protocol.ContentFilter{
PubsubTopic: relay.DefaultWakuTopic,
ContentTopics: filter.NewContentTopicSet(options.ContentTopic),
ContentTopics: protocol.NewContentTopicSet(options.ContentTopic),
}
var filterOpt filter.FilterSubscribeOption
peerID, err := options.Filter.NodePeerID()
Expand Down Expand Up @@ -269,7 +269,7 @@ func (c *Chat) SendMessage(line string) {
err := c.publish(tCtx, line)
if err != nil {
if err.Error() == "validation failed" {
err = errors.New("message rate violation!")
err = errors.New("message rate violation")
}
c.ui.ErrorMessage(err)
}
Expand Down Expand Up @@ -524,7 +524,7 @@ func (c *Chat) discoverNodes(connectionWg *sync.WaitGroup) {

ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second)
defer cancel()
err = c.node.DialPeerWithInfo(ctx, n)
err = c.node.DialPeerWithInfo(ctx, info)
if err != nil {

c.ui.ErrorMessage(fmt.Errorf("co!!uld not connect to %s: %w", info.ID.Pretty(), err))
Expand Down
4 changes: 2 additions & 2 deletions examples/filter2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ func main() {
}

// Send FilterRequest from light node to full node
cf := filter.ContentFilter{
ContentTopics: filter.NewContentTopicSet(contentTopic),
cf := protocol.ContentFilter{
ContentTopics: protocol.NewContentTopicSet(contentTopic),
}

theFilter, err := lightNode.FilterLightnode().Subscribe(ctx, cf)
Expand Down
16 changes: 9 additions & 7 deletions library/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,30 +7,32 @@ import (
"time"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
)

type filterArgument struct {
PubsubTopic string `json:"pubsubTopic,omitempty"`
ContentTopics []string `json:"contentTopics,omitempty"`
}

func toContentFilter(filterJSON string) (filter.ContentFilter, error) {
func toContentFilter(filterJSON string) (protocol.ContentFilter, error) {
var f filterArgument
err := json.Unmarshal([]byte(filterJSON), &f)
if err != nil {
return filter.ContentFilter{}, err
return protocol.ContentFilter{}, err
}

return filter.ContentFilter{
return protocol.ContentFilter{
PubsubTopic: f.PubsubTopic,
ContentTopics: filter.NewContentTopicSet(f.ContentTopics...),
ContentTopics: protocol.NewContentTopicSet(f.ContentTopics...),
}, nil
}

type subscribeResult struct {
Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
Subscriptions []*subscription.SubscriptionDetails `json:"subscriptions"`
Error string `json:"error,omitempty"`
}

// FilterSubscribe is used to create a subscription to a filter node to receive messages
Expand Down Expand Up @@ -71,7 +73,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) {
}

for _, subscriptionDetails := range subscriptions {
go func(subscriptionDetails *filter.SubscriptionDetails) {
go func(subscriptionDetails *subscription.SubscriptionDetails) {
for envelope := range subscriptionDetails.C {
send("message", toSubscriptionMessage(envelope))
}
Expand Down
30 changes: 30 additions & 0 deletions waku/v2/protocol/content_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package protocol

import "golang.org/x/exp/maps"

type ContentTopicSet map[string]struct{}

func NewContentTopicSet(contentTopics ...string) ContentTopicSet {
s := make(ContentTopicSet, len(contentTopics))
for _, ct := range contentTopics {
s[ct] = struct{}{}
}
return s
}

// ContentFilter is used to specify the filter to be applied for a FilterNode.
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
type ContentFilter struct {
PubsubTopic string
ContentTopics ContentTopicSet
}

func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}

func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}
88 changes: 36 additions & 52 deletions waku/v2/protocol/filter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/filter/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/subscription"
"github.com/waku-org/go-waku/waku/v2/timesource"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

// FilterPushID_v20beta1 is the current Waku Filter protocol identifier used to allow
Expand All @@ -41,27 +41,10 @@ type WakuFilterLightNode struct {
timesource timesource.Timesource
metrics Metrics
log *zap.Logger
subscriptions *SubscriptionsMap
subscriptions *subscription.SubscriptionsMap
pm *peermanager.PeerManager
}

// ContentFilter is used to specify the filter to be applied for a FilterNode.
// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding.
// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding)
// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm
type ContentFilter struct {
PubsubTopic string
ContentTopics ContentTopicSet
}

func (cf ContentFilter) ContentTopicsList() []string {
return maps.Keys(cf.ContentTopics)
}

func NewContentFilter(pubsubTopic string, contentTopics ...string) ContentFilter {
return ContentFilter{pubsubTopic, NewContentTopicSet(contentTopics...)}
}

type WakuFilterPushResult struct {
Err error
PeerID peer.ID
Expand Down Expand Up @@ -95,7 +78,7 @@ func (wf *WakuFilterLightNode) Start(ctx context.Context) error {
}

func (wf *WakuFilterLightNode) start() error {
wf.subscriptions = NewSubscriptionMap(wf.log)
wf.subscriptions = subscription.NewSubscriptionMap(wf.log)
wf.h.SetStreamHandlerMatch(FilterPushID_v20beta1, protocol.PrefixTextMatch(string(FilterPushID_v20beta1)), wf.onRequest(wf.Context()))

wf.log.Info("filter-push protocol started")
Expand Down Expand Up @@ -153,7 +136,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str
} else {
pubSubTopic = *messagePush.PubsubTopic
}
if !wf.subscriptions.Has(s.Conn().RemotePeer(), NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
if !wf.subscriptions.Has(s.Conn().RemotePeer(), protocol.NewContentFilter(pubSubTopic, messagePush.WakuMessage.ContentTopic)) {
logger.Warn("received messagepush with invalid subscription parameters",
logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic),
zap.String("contentTopic", messagePush.WakuMessage.ContentTopic))
Expand Down Expand Up @@ -181,7 +164,7 @@ func (wf *WakuFilterLightNode) notify(remotePeerID peer.ID, pubsubTopic string,
}

func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscribeParameters,
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter ContentFilter) error {
reqType pb.FilterSubscribeRequest_FilterSubscribeType, contentFilter protocol.ContentFilter) error {
conn, err := wf.h.NewStream(ctx, params.selectedPeer, FilterSubscribeID_v20beta1)
if err != nil {
wf.metrics.RecordError(dialFailure)
Expand Down Expand Up @@ -231,7 +214,7 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr
}

// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics
func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) {
func contentFilterToPubSubTopicMap(contentFilter protocol.ContentFilter) (map[string][]string, error) {
pubSubTopicMap := make(map[string][]string)

if contentFilter.PubsubTopic != "" {
Expand All @@ -257,7 +240,7 @@ func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]st
// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer.
// This may change if Filterv2 protocol is updated to handle such a scenario in a single request.
// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics.
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) ([]*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -293,11 +276,11 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
return nil, err
}
failedContentTopics := []string{}
subscriptions := make([]*SubscriptionDetails, 0)
subscriptions := make([]*subscription.SubscriptionDetails, 0)
for pubSubTopic, cTopics := range pubSubTopicMap {
var cFilter ContentFilter
var cFilter protocol.ContentFilter
cFilter.PubsubTopic = pubSubTopic
cFilter.ContentTopics = NewContentTopicSet(cTopics...)
cFilter.ContentTopics = protocol.NewContentTopicSet(cTopics...)
err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter)
if err != nil {
wf.log.Error("Failed to subscribe", zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics),
Expand All @@ -315,7 +298,7 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont
}

// FilterSubscription is used to obtain an object from which you could receive messages received via filter protocol
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter ContentFilter) (*SubscriptionDetails, error) {
func (wf *WakuFilterLightNode) FilterSubscription(peerID peer.ID, contentFilter protocol.ContentFilter) (*subscription.SubscriptionDetails, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -351,10 +334,10 @@ func (wf *WakuFilterLightNode) Ping(ctx context.Context, peerID peer.ID) error {
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: protocol.GenerateRequestID()},
pb.FilterSubscribeRequest_SUBSCRIBER_PING,
ContentFilter{})
protocol.ContentFilter{})
}

func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *SubscriptionDetails) error {
func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscription *subscription.SubscriptionDetails) error {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand All @@ -364,7 +347,7 @@ func (wf *WakuFilterLightNode) IsSubscriptionAlive(ctx context.Context, subscrip
return wf.Ping(ctx, subscription.PeerID)
}

func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
func (wf *WakuFilterLightNode) Subscriptions() []*subscription.SubscriptionDetails {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand All @@ -374,10 +357,10 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
wf.subscriptions.RLock()
defer wf.subscriptions.RUnlock()

var output []*SubscriptionDetails
var output []*subscription.SubscriptionDetails

for _, peerSubscription := range wf.subscriptions.items {
for _, subscriptions := range peerSubscription.subsPerPubsubTopic {
for _, peerSubscription := range wf.subscriptions.Items {
for _, subscriptions := range peerSubscription.SubsPerPubsubTopic {
for _, subscriptionDetail := range subscriptions {
output = append(output, subscriptionDetail)
}
Expand All @@ -387,16 +370,16 @@ func (wf *WakuFilterLightNode) Subscriptions() []*SubscriptionDetails {
return output
}

func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter ContentFilter) {
func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilter protocol.ContentFilter) {
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()

peerSubscription, ok := wf.subscriptions.items[peerID]
peerSubscription, ok := wf.subscriptions.Items[peerID]
if !ok {
return
}

subscriptionDetailList, ok := peerSubscription.subsPerPubsubTopic[contentFilter.PubsubTopic]
subscriptionDetailList, ok := peerSubscription.SubsPerPubsubTopic[contentFilter.PubsubTopic]
if !ok {
return
}
Expand All @@ -405,18 +388,18 @@ func (wf *WakuFilterLightNode) cleanupSubscriptions(peerID peer.ID, contentFilte
subscriptionDetail.Remove(contentFilter.ContentTopicsList()...)
if len(subscriptionDetail.ContentFilter.ContentTopics) == 0 {
delete(subscriptionDetailList, subscriptionDetailID)
subscriptionDetail.closeC()
subscriptionDetail.CloseC()
}
}

if len(subscriptionDetailList) == 0 {
delete(wf.subscriptions.items[peerID].subsPerPubsubTopic, contentFilter.PubsubTopic)
delete(wf.subscriptions.Items[peerID].SubsPerPubsubTopic, contentFilter.PubsubTopic)
}

}

// Unsubscribe is used to stop receiving messages from a peer that match a content filter
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter protocol.ContentFilter, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand All @@ -440,22 +423,22 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
if err != nil {
return nil, err
}
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))
for pTopic, cTopics := range pubSubTopicMap {
cFilter := NewContentFilter(pTopic, cTopics...)
for peerID := range wf.subscriptions.items {
cFilter := protocol.NewContentFilter(pTopic, cTopics...)
for peerID := range wf.subscriptions.Items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
}

subscriptions, ok := wf.subscriptions.items[peerID]
subscriptions, ok := wf.subscriptions.Items[peerID]
if !ok || subscriptions == nil {
continue
}

wf.cleanupSubscriptions(peerID, cFilter)
if len(subscriptions.subsPerPubsubTopic) == 0 {
delete(wf.subscriptions.items, peerID)
if len(subscriptions.SubsPerPubsubTopic) == 0 {
delete(wf.subscriptions.Items, peerID)
}

if params.wg != nil {
Expand Down Expand Up @@ -491,7 +474,8 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co
// UnsubscribeWithSubscription is used to close a particular subscription
// If there are no more subscriptions matching the passed [peer, contentFilter] pair,
// server unsubscribe is also performed
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *SubscriptionDetails, opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context, sub *subscription.SubscriptionDetails,
opts ...FilterSubscribeOption) (<-chan WakuFilterPushResult, error) {
wf.RLock()
defer wf.RUnlock()
if err := wf.ErrOnNotRunning(); err != nil {
Expand Down Expand Up @@ -521,7 +505,7 @@ func (wf *WakuFilterLightNode) UnsubscribeWithSubscription(ctx context.Context,

}

func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter ContentFilter) error {
func (wf *WakuFilterLightNode) unsubscribeFromServer(ctx context.Context, params *FilterSubscribeParameters, cFilter protocol.ContentFilter) error {
err := wf.request(ctx, params, pb.FilterSubscribeRequest_UNSUBSCRIBE, cFilter)
if err != nil {
ferr, ok := err.(*FilterError)
Expand All @@ -544,14 +528,14 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
wf.subscriptions.Lock()
defer wf.subscriptions.Unlock()

resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items))
resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.Items))

for peerID := range wf.subscriptions.items {
for peerID := range wf.subscriptions.Items {
if params.selectedPeer != "" && peerID != params.selectedPeer {
continue
}

delete(wf.subscriptions.items, peerID)
delete(wf.subscriptions.Items, peerID)

if params.wg != nil {
params.wg.Add(1)
Expand All @@ -568,7 +552,7 @@ func (wf *WakuFilterLightNode) unsubscribeAll(ctx context.Context, opts ...Filte
ctx,
&FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID},
pb.FilterSubscribeRequest_UNSUBSCRIBE_ALL,
ContentFilter{})
protocol.ContentFilter{})
if err != nil {
wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err))
}
Expand Down
Loading