diff --git a/examples/chat2/chat.go b/examples/chat2/chat.go index c2cad29ca..23a57f344 100644 --- a/examples/chat2/chat.go +++ b/examples/chat2/chat.go @@ -75,11 +75,11 @@ func NewChat(ctx context.Context, node *node.WakuNode, connNotifier <-chan node. filterOpt = filter.WithPeer(peerID) chat.ui.InfoMessage(fmt.Sprintf("Subscribing to filter node %s", peerID)) } - theFilter, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt) + theFilters, err := node.FilterLightnode().Subscribe(ctx, cf, filterOpt) if err != nil { chat.ui.ErrorMessage(err) } else { - chat.C = theFilter.C + chat.C = theFilters[0].C //Picking first subscription since there is only 1 contentTopic specified. } } else { diff --git a/examples/filter2/README.md b/examples/filter2/README.md index c78c67785..46a3626b7 100644 --- a/examples/filter2/README.md +++ b/examples/filter2/README.md @@ -21,28 +21,20 @@ The app will run 2 nodes ("full" node and "light" node), with light node subscri ## Flow description -1. Light node submits a FilterRequest through WakuNode.SubscribeFilter. This request is submitted to a particular peer. -Filter is stored in WakuNode.filters map. That's it. -DONE -2. Full node: we read incoming messages in WakuFilter.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId. -3. In WakuFilter.onRequest(): - 3.1. We check whether it's a MessagePush or FilterRequest. - 3.2. If it's a MessagePush, then we're on a light node. Invoke pushHandler coming from WakuNode.mountFilter() - 3.3. If it's a FilterRequest, add a subscriber. -4. WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object. -It denotes a pubsub topic subscription. -All envelopes are then submitted to node.broadcaster. - - -## Nim code flow -1. Light node: WakuFilter.subscribe(). Find a peer, wrileLP(FilterRequest). Store requestId in WakuNode.filters along with a ContentFilterHandler proc. -2. Full node: WakuFilter inherits LPProtocol. LPProtocol.handler invokes readLP() to read FilterRPC messages -3. this handler function has a signature (conn: Connection, proto: string). - 3.1. it checks whether a MessagePush or FilterRequest is received. - 3.2. (light node) if it's a MessagePush, then we're on a light node. Invoke pushHandler of MessagePushHandler type. This pushHandler comes from WakuNode.mountFilter(). It iterates through all registered WakuNode.filters (stored in step 1) and invokes their ContentFilterHandler proc. - 3.3. (full node) if it's a FilterRequest, create a Subscriber and add to WakuFilter.subscribers seq -4. (full node) Each time a message is received through GossipSub in wakunode.subscribe.defaultHandler(), we iterate through subscriptions. -5. (full node) One of these subscriptions is a filter subscription added by WakuNode.mountFilter(), which in turn is returned from WakuFilter.subscription() -6. (full node) This subscription iterates through subscribers added by WakuFilter.handler() fn (subscribers being light nodes) -7. (full node) Once subscriber peer is found, a message is pushed directly to the peer (go to step 3.2) - +### Light Node +1. A light node is created with option WithWakuFilterLightNode. +2. Starting this node sets stream handler on wakunode.Host for WakuFilterProtocolId. +3. Light node submits a FilterSubscribeRequest through WakuFilterLightNode.Subscribe. This request is submitted to a particular peer. +Filter is stored in WakuFilterLightNode.subscriptions map. That's it. +4. Now we wait on WakuFilterLightNode.onRequest to process any further messages. +5. On receiving a message check and notify all subscribers on relevant channel (which is part of subscription obbject). +6. If a broadcaster is specified, + WakuNode.Subscribe has a message loop extracting WakuMessages from a wakurelay.Subscription object.It denotes a pubsub topic subscription.All envelopes are then submitted to node.broadcaster. +### Full Node +1. Full node is created with option WithWakuFilterFullNode. +2. We read incoming messages in WithWakuFilterFullNode.onRequest(). It is set as a stream handler on wakunode.Host for WakuFilterProtocolId. +3. In WakuFilter.onRequest + * We check the type of FilterRequest and handle accordingly. + * If it's a FilterRequest for subscribe, add a subscriber. + * If it is a SubscriberPing request, check if subscriptions exists or not and respond accordingly. + * If it is an unsubscribe/unsubscribeAll request, check and remove relevant subscriptions. diff --git a/examples/filter2/main.go b/examples/filter2/main.go index fd31b77ef..c35584ea9 100644 --- a/examples/filter2/main.go +++ b/examples/filter2/main.go @@ -26,7 +26,7 @@ var log = logging.Logger("filter2") var pubSubTopic = protocol.DefaultPubsubTopic() -const contentTopic = "test" +const contentTopic = "/filter2test/1/testTopic/proto" func main() { hostAddr1, _ := net.ResolveTCPAddr("tcp", "0.0.0.0:60000") @@ -98,7 +98,6 @@ func main() { // Send FilterRequest from light node to full node cf := filter.ContentFilter{ - PubsubTopic: pubSubTopic.String(), ContentTopics: filter.NewContentTopicSet(contentTopic), } @@ -108,7 +107,7 @@ func main() { } go func() { - for env := range theFilter.C { + for env := range theFilter[0].C { //Safely picking first subscriptions since only 1 contentTopic is subscribed log.Info("Light node received msg, ", string(env.Message().Payload)) } log.Info("Message channel closed!") diff --git a/library/c/README.md b/library/c/README.md index 6b837afdd..227bb54e3 100644 --- a/library/c/README.md +++ b/library/c/README.md @@ -88,7 +88,7 @@ The criteria to create subscription to a filter full node in JSON Format: Fields: - `contentTopics`: Array of content topics. -- `topic`: pubsub topic. +- `topic`: Optional pubsub topic when using contentTopics as per Autosharding. In case of named or static-sharding, pubSub topic is mandatory. ### `LegacyFilterSubscription` type @@ -884,15 +884,21 @@ Creates a subscription to a filter full node matching a content filter.. A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -If the function is executed succesfully, `onOkCb` will receive the subscription details. +If the function is executed succesfully, `onOkCb` will receive the following subscription details along with any partial errors. For example: ```json { - "peerID": "....", - "pubsubTopic": "...", - "contentTopics": [...] + "subscriptions" : [ + { + "ID": "", + "peerID": "....", + "pubsubTopic": "...", + "contentTopics": [...] + } + ], + "error" : "subscriptions failed for contentTopics:,.." // Empty if all subscriptions are succesful } ``` diff --git a/library/c/api_filter.go b/library/c/api_filter.go index f4279c382..4a7496990 100644 --- a/library/c/api_filter.go +++ b/library/c/api_filter.go @@ -10,14 +10,14 @@ import "github.com/waku-org/go-waku/library" // filterJSON must contain a JSON with this format: // // { -// "pubsubTopic": "the pubsub topic" // mandatory +// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding. // "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 // } // // peerID should contain the ID of a peer supporting the filter protocol. Use NULL to automatically select a node // If ms is greater than 0, the subscription must happen before the timeout // (in milliseconds) is reached, or an error will be returned -// It returns a json object containing the peerID to which we are subscribed to and the details of the subscription +// It returns a json object containing the details of the subscriptions along with any errors in case of partial failures // //export waku_filter_subscribe func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, onOkCb C.WakuCallBack, onErrCb C.WakuCallBack) C.int { @@ -42,7 +42,7 @@ func waku_filter_ping(peerID *C.char, ms C.int, onErrCb C.WakuCallBack) C.int { // criteria // // { -// "pubsubTopic": "the pubsub topic" // mandatory +// "pubsubTopic": "the pubsub topic" // optional if using autosharding, mandatory if using static or named sharding. // "contentTopics": ["the content topic"] // mandatory, at least one required, with a max of 10 // } // diff --git a/library/filter.go b/library/filter.go index 5c6cadc2c..a66a6fbef 100644 --- a/library/filter.go +++ b/library/filter.go @@ -28,6 +28,11 @@ func toContentFilter(filterJSON string) (filter.ContentFilter, error) { }, nil } +type subscribeResult struct { + Subscriptions []*filter.SubscriptionDetails `json:"subscriptions"` + Error string `json:"error,omitempty"` +} + // FilterSubscribe is used to create a subscription to a filter node to receive messages func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { cf, err := toContentFilter(filterJSON) @@ -60,18 +65,22 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { fOptions = append(fOptions, filter.WithAutomaticPeerSelection()) } - subscriptionDetails, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) - if err != nil { + subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) + if err != nil && subscriptions == nil { return "", err } - go func(subscriptionDetails *filter.SubscriptionDetails) { - for envelope := range subscriptionDetails.C { - send("message", toSubscriptionMessage(envelope)) - } - }(subscriptionDetails) - - return marshalJSON(subscriptionDetails) + for _, subscriptionDetails := range subscriptions { + go func(subscriptionDetails *filter.SubscriptionDetails) { + for envelope := range subscriptionDetails.C { + send("message", toSubscriptionMessage(envelope)) + } + }(subscriptionDetails) + } + var subResult subscribeResult + subResult.Subscriptions = subscriptions + subResult.Error = err.Error() + return marshalJSON(subResult) } // FilterPing is used to determine if a peer has an active subscription diff --git a/waku/v2/protocol/filter/client.go b/waku/v2/protocol/filter/client.go index 5a074d260..a07469a3a 100644 --- a/waku/v2/protocol/filter/client.go +++ b/waku/v2/protocol/filter/client.go @@ -7,6 +7,7 @@ import ( "fmt" "math" "net/http" + "strings" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -44,6 +45,10 @@ type WakuFilterLightNode struct { pm *peermanager.PeerManager } +// ContentFilter is used to specify the filter to be applied for a FilterNode. +// Topic means pubSubTopic - optional in case of using contentTopics that following Auto sharding, mandatory in case of named or static sharding. +// ContentTopics - Specify list of content topics to be filtered under a pubSubTopic (for named and static sharding), or a list of contentTopics (in case ofAuto sharding) +// If pubSub topic is not specified, then content-topics are used to derive the shard and corresponding pubSubTopic using autosharding algorithm type ContentFilter struct { PubsubTopic string ContentTopics ContentTopicSet @@ -117,7 +122,6 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str return func(s network.Stream) { defer s.Close() logger := wf.log.With(logging.HostID("peer", s.Conn().RemotePeer())) - if !wf.subscriptions.IsSubscribedTo(s.Conn().RemotePeer()) { logger.Warn("received message push from unknown peer", logging.HostID("peerID", s.Conn().RemotePeer())) wf.metrics.RecordError(unknownPeerMessagePush) @@ -133,15 +137,21 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordError(decodeRPCFailure) return } + pubSubTopic := "" //For now returning failure, this will get addressed with autosharding changes for filter. if messagePush.PubsubTopic == nil { - logger.Error("empty pubsub topic") - wf.metrics.RecordError(decodeRPCFailure) - return + pubSubTopic, err = getPubSubTopicFromContentTopic(messagePush.WakuMessage.ContentTopic) + if err != nil { + logger.Error("could not derive pubSubTopic from contentTopic", zap.Error(err)) + wf.metrics.RecordError(decodeRPCFailure) + return + } + } else { + pubSubTopic = *messagePush.PubsubTopic } - if !wf.subscriptions.Has(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage.ContentTopic) { + if !wf.subscriptions.Has(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage.ContentTopic) { logger.Warn("received messagepush with invalid subscription parameters", - logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", *messagePush.PubsubTopic), + logging.HostID("peerID", s.Conn().RemotePeer()), zap.String("topic", pubSubTopic), zap.String("contentTopic", messagePush.WakuMessage.ContentTopic)) wf.metrics.RecordError(invalidSubscriptionMessage) return @@ -149,7 +159,7 @@ func (wf *WakuFilterLightNode) onRequest(ctx context.Context) func(s network.Str wf.metrics.RecordMessage() - wf.notify(s.Conn().RemotePeer(), *messagePush.PubsubTopic, messagePush.WakuMessage) + wf.notify(s.Conn().RemotePeer(), pubSubTopic, messagePush.WakuMessage) logger.Info("received message push") } @@ -200,7 +210,6 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr wf.metrics.RecordError(decodeRPCFailure) return err } - if filterSubscribeResponse.RequestId != request.RequestId { wf.log.Error("requestID mismatch", zap.String("expected", request.RequestId), zap.String("received", filterSubscribeResponse.RequestId)) wf.metrics.RecordError(requestIDMismatch) @@ -217,18 +226,50 @@ func (wf *WakuFilterLightNode) request(ctx context.Context, params *FilterSubscr return nil } +func getPubSubTopicFromContentTopic(cTopicString string) (string, error) { + cTopic, err := protocol.StringToContentTopic(cTopicString) + if err != nil { + return "", fmt.Errorf("%s : %s", err.Error(), cTopicString) + } + pTopic := protocol.GetShardFromContentTopic(cTopic, protocol.GenerationZeroShardsCount) + + return pTopic.String(), nil +} + +// This function converts a contentFilter into a map of pubSubTopics and corresponding contentTopics +func contentFilterToPubSubTopicMap(contentFilter ContentFilter) (map[string][]string, error) { + pubSubTopicMap := make(map[string][]string) + + if contentFilter.PubsubTopic != "" { + pubSubTopicMap[contentFilter.PubsubTopic] = contentFilter.ContentTopicsList() + } else { + //Parse the content-Topics to figure out shards. + for _, cTopicString := range contentFilter.ContentTopicsList() { + pTopicStr, err := getPubSubTopicFromContentTopic(cTopicString) + if err != nil { + return nil, err + } + _, ok := pubSubTopicMap[pTopicStr] + if !ok { + pubSubTopicMap[pTopicStr] = []string{} + } + pubSubTopicMap[pTopicStr] = append(pubSubTopicMap[pTopicStr], cTopicString) + } + } + return pubSubTopicMap, nil +} + // Subscribe setups a subscription to receive messages that match a specific content filter -func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) (*SubscriptionDetails, error) { +// If contentTopics passed result in different pubSub topics (due to Auto/Static sharding), then multiple subscription requests are sent to the peer. +// This may change if Filterv2 protocol is updated to handle such a scenario in a single request. +// Note: In case of partial failure, results are returned for successful subscriptions along with error indicating failed contentTopics. +func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter ContentFilter, opts ...FilterSubscribeOption) ([]*SubscriptionDetails, error) { wf.RLock() defer wf.RUnlock() if err := wf.ErrOnNotRunning(); err != nil { return nil, err } - if contentFilter.PubsubTopic == "" { - return nil, errors.New("pubsub topic is required") - } - if len(contentFilter.ContentTopics) == 0 { return nil, errors.New("at least one content topic is required") } @@ -253,15 +294,36 @@ func (wf *WakuFilterLightNode) Subscribe(ctx context.Context, contentFilter Cont return nil, ErrNoPeersAvailable } - existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter) - if existingSub != nil { - return existingSub, nil - } else { - err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, contentFilter) - if err != nil { - return nil, err + pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return nil, err + } + failedContentTopics := []string{} + subscriptions := make([]*SubscriptionDetails, 0) + for pubSubTopic, cTopics := range pubSubTopicMap { + var cFilter ContentFilter + cFilter.PubsubTopic = pubSubTopic + cFilter.ContentTopics = NewContentTopicSet(cTopics...) + existingSub := wf.subscriptions.Get(params.selectedPeer, contentFilter) + if existingSub != nil { + subscriptions = append(subscriptions, existingSub) + } else { + //TO OPTIMIZE: Should we parallelize these, if so till how many batches? + err := wf.request(ctx, params, pb.FilterSubscribeRequest_SUBSCRIBE, cFilter) + if err != nil { + wf.log.Error("Failed to subscribe for conentTopics ", + zap.String("pubSubTopic", pubSubTopic), zap.Strings("contentTopics", cTopics), + zap.Error(err)) + failedContentTopics = append(failedContentTopics, cTopics...) + } + subscriptions = append(subscriptions, wf.subscriptions.NewSubscription(params.selectedPeer, cFilter)) } - return wf.subscriptions.NewSubscription(params.selectedPeer, contentFilter), nil + } + + if len(failedContentTopics) > 0 { + return subscriptions, fmt.Errorf("subscriptions failed for contentTopics: %s", strings.Join(failedContentTopics, ",")) + } else { + return subscriptions, nil } } @@ -377,10 +439,6 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co return nil, err } - if contentFilter.PubsubTopic == "" { - return nil, errors.New("pubsub topic is required") - } - if len(contentFilter.ContentTopics) == 0 { return nil, errors.New("at least one content topic is required") } @@ -394,57 +452,66 @@ func (wf *WakuFilterLightNode) Unsubscribe(ctx context.Context, contentFilter Co return nil, err } - resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) - for peerID := range wf.subscriptions.items { - if params.selectedPeer != "" && peerID != params.selectedPeer { - continue - } + pubSubTopicMap, err := contentFilterToPubSubTopicMap(contentFilter) + if err != nil { + return nil, err + } - subscriptions, ok := wf.subscriptions.items[peerID] - if !ok || subscriptions == nil { - continue - } + resultChan := make(chan WakuFilterPushResult, len(wf.subscriptions.items)) + for pTopic, cTopics := range pubSubTopicMap { + var cFilter ContentFilter + cFilter.PubsubTopic = pTopic + cFilter.ContentTopics = NewContentTopicSet(cTopics...) + for peerID := range wf.subscriptions.items { + if params.selectedPeer != "" && peerID != params.selectedPeer { + continue + } - wf.cleanupSubscriptions(peerID, contentFilter) - if len(subscriptions.subsPerPubsubTopic) == 0 { - delete(wf.subscriptions.items, peerID) - } + subscriptions, ok := wf.subscriptions.items[peerID] + if !ok || subscriptions == nil { + continue + } - if params.wg != nil { - params.wg.Add(1) - } + wf.cleanupSubscriptions(peerID, cFilter) + if len(subscriptions.subsPerPubsubTopic) == 0 { + delete(wf.subscriptions.items, peerID) + } - go func(peerID peer.ID) { - defer func() { - if params.wg != nil { - params.wg.Done() - } - }() + if params.wg != nil { + params.wg.Add(1) + } - err := wf.request( - ctx, - &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, - pb.FilterSubscribeRequest_UNSUBSCRIBE, - contentFilter) - if err != nil { - ferr, ok := err.(*FilterError) - if ok && ferr.Code == http.StatusNotFound { - wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err)) - } else { - wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) - return + go func(peerID peer.ID) { + defer func() { + if params.wg != nil { + params.wg.Done() + } + }() + + err := wf.request( + ctx, + &FilterSubscribeParameters{selectedPeer: peerID, requestID: params.requestID}, + pb.FilterSubscribeRequest_UNSUBSCRIBE, + cFilter) + if err != nil { + ferr, ok := err.(*FilterError) + if ok && ferr.Code == http.StatusNotFound { + wf.log.Warn("peer does not have a subscription", logging.HostID("peerID", peerID), zap.Error(err)) + } else { + wf.log.Error("could not unsubscribe from peer", logging.HostID("peerID", peerID), zap.Error(err)) + return + } } - } - if params.wg != nil { - resultChan <- WakuFilterPushResult{ - Err: err, - PeerID: peerID, + if params.wg != nil { + resultChan <- WakuFilterPushResult{ + Err: err, + PeerID: peerID, + } } - } - }(peerID) + }(peerID) + } } - if params.wg != nil { params.wg.Wait() } diff --git a/waku/v2/protocol/filter/filter_test.go b/waku/v2/protocol/filter/filter_test.go index de48d214d..fb774cda0 100644 --- a/waku/v2/protocol/filter/filter_test.go +++ b/waku/v2/protocol/filter/filter_test.go @@ -42,7 +42,7 @@ type FilterTestSuite struct { fullNodeHost host.Host wg *sync.WaitGroup contentFilter ContentFilter - subDetails *SubscriptionDetails + subDetails []*SubscriptionDetails log *zap.Logger } @@ -142,7 +142,7 @@ func (s *FilterTestSuite) waitForTimeout(fn func(), ch chan *protocol.Envelope) s.wg.Wait() } -func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) *SubscriptionDetails { +func (s *FilterTestSuite) subscribe(pubsubTopic string, contentTopic string, peer peer.ID) []*SubscriptionDetails { s.contentFilter = ContentFilter{pubsubTopic, NewContentTopicSet(contentTopic)} subDetails, err := s.lightNode.Subscribe(s.ctx, s.contentFilter, WithPeer(peer)) @@ -181,7 +181,8 @@ func (s *FilterTestSuite) SetupTest() { s.testTopic = "/waku/2/go/filter/test" s.testContentTopic = "TopicA" - s.lightNode = s.makeWakuFilterLightNode(true, false) + s.lightNode = s.makeWakuFilterLightNode(true, true) + //TODO: Add tests to verify broadcaster. s.relayNode, s.fullNode = s.makeWakuFilterFullNode(s.testTopic) @@ -202,19 +203,18 @@ func (s *FilterTestSuite) TearDownTest() { } func (s *FilterTestSuite) TestWakuFilter() { - // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) // Should be received s.waitForMsg(func() { s.publishMsg(s.testTopic, s.testContentTopic, "first") - }, s.subDetails.C) + }, s.subDetails[0].C) // Wrong content topic s.waitForTimeout(func() { s.publishMsg(s.testTopic, "TopicB", "second") - }, s.subDetails.C) + }, s.subDetails[0].C) _, err := s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID())) s.Require().NoError(err) @@ -224,11 +224,10 @@ func (s *FilterTestSuite) TestWakuFilter() { // Should not receive after unsubscribe s.waitForTimeout(func() { s.publishMsg(s.testTopic, s.testContentTopic, "third") - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestSubscriptionPing() { - err := s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) s.Require().Error(err) filterErr, ok := err.(*FilterError) @@ -243,7 +242,6 @@ func (s *FilterTestSuite) TestSubscriptionPing() { } func (s *FilterTestSuite) TestPeerFailure() { - broadcaster2 := relay.NewBroadcaster(10) s.Require().NoError(broadcaster2.Start(context.Background())) @@ -260,7 +258,7 @@ func (s *FilterTestSuite) TestPeerFailure() { s.waitForMsg(func() { s.publishMsg(s.testTopic, s.testContentTopic) - }, s.subDetails.C) + }, s.subDetails[0].C) // Failure is removed s.Require().False(s.fullNode.subscriptions.IsFailedPeer(s.lightNodeHost.ID())) @@ -287,15 +285,13 @@ func (s *FilterTestSuite) TestPeerFailure() { } func (s *FilterTestSuite) TestCreateSubscription() { - // Initial subscribe s.subDetails = s.subscribe(s.testTopic, s.testContentTopic, s.fullNodeHost.ID()) - s.waitForMsg(func() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestModifySubscription() { @@ -307,7 +303,7 @@ func (s *FilterTestSuite) TestModifySubscription() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) // Subscribe to another content_topic newContentTopic := "Topic_modified" @@ -317,7 +313,7 @@ func (s *FilterTestSuite) TestModifySubscription() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestMultipleMessages() { @@ -329,16 +325,17 @@ func (s *FilterTestSuite) TestMultipleMessages() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "first"), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) s.waitForMsg(func() { _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch(), "second"), s.testTopic) s.Require().NoError(err) - }, s.subDetails.C) + }, s.subDetails[0].C) } func (s *FilterTestSuite) TestRunningGuard() { + s.lightNode.Stop() contentFilter := ContentFilter{"test", NewContentTopicSet("test")} @@ -356,6 +353,7 @@ func (s *FilterTestSuite) TestRunningGuard() { } func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { + contentFilter := ContentFilter{"test", NewContentTopicSet("test")} _, err := s.lightNode.Subscribe(s.ctx, contentFilter, WithPeer(s.fullNodeHost.ID())) @@ -376,6 +374,7 @@ func (s *FilterTestSuite) TestFireAndForgetAndCustomWg() { } func (s *FilterTestSuite) TestStartStop() { + var wg sync.WaitGroup wg.Add(2) s.lightNode = s.makeWakuFilterLightNode(false, false) @@ -403,3 +402,87 @@ func (s *FilterTestSuite) TestStartStop() { wg.Wait() } + +func (s *FilterTestSuite) TestAutoShard() { + + //Workaround as could not find a way to reuse setup test with params + // Stop what is run in setup + s.fullNode.Stop() + s.lightNode.Stop() + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) // Test can't exceed 10 seconds + s.ctx = ctx + s.ctxCancel = cancel + + cTopic1Str := "0/test/1/testTopic/proto" + cTopic1, err := protocol.StringToContentTopic(cTopic1Str) + s.Require().NoError(err) + //Computing pubSubTopic only for filterFullNode. + pubSubTopic := protocol.GetShardFromContentTopic(cTopic1, protocol.GenerationZeroShardsCount) + s.testContentTopic = cTopic1Str + s.testTopic = pubSubTopic.String() + + s.lightNode = s.makeWakuFilterLightNode(true, false) + s.relayNode, s.fullNode = s.makeWakuFilterFullNode(pubSubTopic.String()) + + s.lightNodeHost.Peerstore().AddAddr(s.fullNodeHost.ID(), tests.GetHostAddress(s.fullNodeHost), peerstore.PermanentAddrTTL) + err = s.lightNodeHost.Peerstore().AddProtocols(s.fullNodeHost.ID(), FilterSubscribeID_v20beta1) + s.Require().NoError(err) + + s.log.Info("Testing Autoshard:CreateSubscription") + s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(s.testContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) + + // Wrong content topic + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, "TopicB", "second") + }, s.subDetails[0].C) + + _, err = s.lightNode.Unsubscribe(s.ctx, s.contentFilter, Peer(s.fullNodeHost.ID())) + s.Require().NoError(err) + + time.Sleep(1 * time.Second) + + // Should not receive after unsubscribe + s.waitForTimeout(func() { + s.publishMsg(s.testTopic, s.testContentTopic, "third") + }, s.subDetails[0].C) + + s.subDetails = s.subscribe("", s.testContentTopic, s.fullNodeHost.ID()) + + s.log.Info("Testing Autoshard:SubscriptionPing") + err = s.lightNode.Ping(context.Background(), s.fullNodeHost.ID()) + s.Require().NoError(err) + + // Test ModifySubscription Subscribe to another content_topic + s.log.Info("Testing Autoshard:ModifySubscription") + + newContentTopic := "0/test/1/testTopic1/proto" + s.subDetails = s.subscribe("", newContentTopic, s.fullNodeHost.ID()) + + s.waitForMsg(func() { + _, err := s.relayNode.PublishToTopic(s.ctx, tests.CreateWakuMessage(newContentTopic, utils.GetUnixEpoch()), s.testTopic) + s.Require().NoError(err) + + }, s.subDetails[0].C) + _, err = s.lightNode.Unsubscribe(s.ctx, ContentFilter{ + PubsubTopic: s.testTopic, + ContentTopics: NewContentTopicSet(newContentTopic), + }) + s.Require().NoError(err) + + _, err = s.lightNode.UnsubscribeAll(s.ctx) + s.Require().NoError(err) + +} + +func (s *FilterTestSuite) BeforeTest(suiteName, testName string) { + s.log.Info("Executing ", zap.String("testName", testName)) +} + +func (s *FilterTestSuite) AfterTest(suiteName, testName string) { + s.log.Info("Finished executing ", zap.String("testName", testName)) +}