diff --git a/tests/wakunode_rest/test_rest_filter.nim b/tests/wakunode_rest/test_rest_filter.nim index e3893ec081..1127a9cc98 100644 --- a/tests/wakunode_rest/test_rest_filter.nim +++ b/tests/wakunode_rest/test_rest_filter.nim @@ -1,6 +1,8 @@ {.used.} import + std/os, + chronos/timer, stew/byteutils, stew/shims/net, testutils/unittests, @@ -52,7 +54,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} = await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start()) await testSetup.serviceNode.mountRelay() - await testSetup.serviceNode.mountFilter() + await testSetup.serviceNode.mountFilter(messageCacheTTL = 1.seconds) await testSetup.subscriberNode.mountFilterClient() testSetup.subscriberNode.peerManager.addServicePeer( @@ -315,3 +317,147 @@ suite "Waku v2 Rest API - Filter V2": messages == @[testMessage] await restFilterTest.shutdown() + + asyncTest "duplicate message push to filter subscriber": + # setup filter service and client node + let restFilterTest = await RestFilterTest.init() + let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + + let requestBody = FilterSubscribeRequest( + requestId: "1001", + contentFilters: @[DefaultContentTopic], + pubsubTopic: some(DefaultPubsubTopic), + ) + let response = await restFilterTest.client.filterPostSubscriptions(requestBody) + + # subscribe fiter service + let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers( + DefaultPubsubTopic, DefaultContentTopic + ) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "1001" + subscribedPeer.len() == 1 + + # ping subscriber node + restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("1002") + + check: + pingResponse.status == 200 + pingResponse.data.requestId == "1002" + pingResponse.data.statusDesc == "OK" + + # first - message push from service node to subscriber client + let testMessage = WakuMessage( + payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(), + contentTopic: DefaultContentTopic, + timestamp: int64(2022), + meta: "test-meta".toBytes(), + ) + + let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check messages received client side or not + let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse1.status == 200 + $postMsgResponse1.contentType == $MIMETYPE_TEXT + postMsgResponse1.data == "OK" + len(messages1.data) == 1 + + # second - message push from service node to subscriber client + let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check message received client side or not + let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse2.status == 200 + $postMsgResponse2.contentType == $MIMETYPE_TEXT + postMsgResponse2.data == "OK" + len(messages2.data) == 0 + + await restFilterTest.shutdown() + + asyncTest "duplicate message push to filter subscriber ( sleep in between )": + # setup filter service and client node + let restFilterTest = await RestFilterTest.init() + let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId + restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic)) + + let requestBody = FilterSubscribeRequest( + requestId: "1001", + contentFilters: @[DefaultContentTopic], + pubsubTopic: some(DefaultPubsubTopic), + ) + let response = await restFilterTest.client.filterPostSubscriptions(requestBody) + + # subscribe fiter service + let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers( + DefaultPubsubTopic, DefaultContentTopic + ) + + check: + response.status == 200 + $response.contentType == $MIMETYPE_JSON + response.data.requestId == "1001" + subscribedPeer.len() == 1 + + # ping subscriber node + restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic) + + let pingResponse = await restFilterTest.client.filterSubscriberPing("1002") + + check: + pingResponse.status == 200 + pingResponse.data.requestId == "1002" + pingResponse.data.statusDesc == "OK" + + # first - message push from service node to subscriber client + let testMessage = WakuMessage( + payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(), + contentTopic: DefaultContentTopic, + timestamp: int64(2022), + meta: "test-meta".toBytes(), + ) + + let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check messages received client side or not + let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse1.status == 200 + $postMsgResponse1.contentType == $MIMETYPE_TEXT + postMsgResponse1.data == "OK" + len(messages1.data) == 1 + + # Pause execution for 1 seconds to test TimeCache functionality of service node + await sleepAsync(1.seconds) + + # second - message push from service node to subscriber client + let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1( + DefaultPubsubTopic, toRelayWakuMessage(testMessage) + ) + + # check message received client side or not + let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic) + + check: + postMsgResponse2.status == 200 + $postMsgResponse2.contentType == $MIMETYPE_TEXT + postMsgResponse2.data == "OK" + len(messages2.data) == 1 + await restFilterTest.shutdown() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 513da49728..51abfd3420 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -434,12 +434,14 @@ proc mountFilter*( filter_subscriptions.DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer, + messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL, ) {.async: (raises: []).} = ## Mounting filter v2 protocol info "mounting filter protocol" node.wakuFilter = WakuFilter.new( - node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer + node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer, + messageCacheTTL, ) if node.started: diff --git a/waku/waku_filter_v2/protocol.nim b/waku/waku_filter_v2/protocol.nim index b3e1516eae..5b87d61485 100644 --- a/waku/waku_filter_v2/protocol.nim +++ b/waku/waku_filter_v2/protocol.nim @@ -295,14 +295,14 @@ proc new*( subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec, maxFilterPeers: uint32 = MaxFilterPeers, maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer, - timeout: Duration = 2.minutes, + messageCacheTTL: Duration = MessageCacheTTL, ): T = let wf = WakuFilter( subscriptions: FilterSubscriptions.init( subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer ), peerManager: peerManager, - messageCache: init(TimedCache[string], timeout), + messageCache: init(TimedCache[string], messageCacheTTL), ) wf.initProtocolHandler() diff --git a/waku/waku_filter_v2/subscriptions.nim b/waku/waku_filter_v2/subscriptions.nim index 61d292eedb..8a5c5bc91a 100644 --- a/waku/waku_filter_v2/subscriptions.nim +++ b/waku/waku_filter_v2/subscriptions.nim @@ -10,6 +10,7 @@ const MaxFilterPeers* = 1000 MaxFilterCriteriaPerPeer* = 1000 DefaultSubscriptionTimeToLiveSec* = 5.minutes + MessageCacheTTL* = 2.minutes type # a single filter criterion is fully defined by a pubsub topic and content topic