Skip to content

Commit

Permalink
chore: unit test for duplicate message push (#2852)
Browse files Browse the repository at this point in the history
* chore: add unit test for testing duplicate message push with timedcache

* chore: update according to better naming convention

---------

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
  • Loading branch information
darshankabariya and NagyZoltanPeter authored Jun 28, 2024
1 parent 9bd8c33 commit 31c632e
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 4 deletions.
148 changes: 147 additions & 1 deletion tests/wakunode_rest/test_rest_filter.nim
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{.used.}

import
std/os,
chronos/timer,
stew/byteutils,
stew/shims/net,
testutils/unittests,
Expand Down Expand Up @@ -52,7 +54,7 @@ proc init(T: type RestFilterTest): Future[T] {.async.} =
await allFutures(testSetup.serviceNode.start(), testSetup.subscriberNode.start())

await testSetup.serviceNode.mountRelay()
await testSetup.serviceNode.mountFilter()
await testSetup.serviceNode.mountFilter(messageCacheTTL = 1.seconds)
await testSetup.subscriberNode.mountFilterClient()

testSetup.subscriberNode.peerManager.addServicePeer(
Expand Down Expand Up @@ -315,3 +317,147 @@ suite "Waku v2 Rest API - Filter V2":
messages == @[testMessage]

await restFilterTest.shutdown()

asyncTest "duplicate message push to filter subscriber":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))

let requestBody = FilterSubscribeRequest(
requestId: "1001",
contentFilters: @[DefaultContentTopic],
pubsubTopic: some(DefaultPubsubTopic),
)
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)

# subscribe fiter service
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(
DefaultPubsubTopic, DefaultContentTopic
)

check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "1001"
subscribedPeer.len() == 1

# ping subscriber node
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)

let pingResponse = await restFilterTest.client.filterSubscriberPing("1002")

check:
pingResponse.status == 200
pingResponse.data.requestId == "1002"
pingResponse.data.statusDesc == "OK"

# first - message push from service node to subscriber client
let testMessage = WakuMessage(
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
contentTopic: DefaultContentTopic,
timestamp: int64(2022),
meta: "test-meta".toBytes(),
)

let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)

# check messages received client side or not
let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)

check:
postMsgResponse1.status == 200
$postMsgResponse1.contentType == $MIMETYPE_TEXT
postMsgResponse1.data == "OK"
len(messages1.data) == 1

# second - message push from service node to subscriber client
let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)

# check message received client side or not
let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)

check:
postMsgResponse2.status == 200
$postMsgResponse2.contentType == $MIMETYPE_TEXT
postMsgResponse2.data == "OK"
len(messages2.data) == 0

await restFilterTest.shutdown()

asyncTest "duplicate message push to filter subscriber ( sleep in between )":
# setup filter service and client node
let restFilterTest = await RestFilterTest.init()
let subPeerId = restFilterTest.subscriberNode.peerInfo.toRemotePeerInfo().peerId
restFilterTest.serviceNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic))

let requestBody = FilterSubscribeRequest(
requestId: "1001",
contentFilters: @[DefaultContentTopic],
pubsubTopic: some(DefaultPubsubTopic),
)
let response = await restFilterTest.client.filterPostSubscriptions(requestBody)

# subscribe fiter service
let subscribedPeer = restFilterTest.serviceNode.wakuFilter.subscriptions.findSubscribedPeers(
DefaultPubsubTopic, DefaultContentTopic
)

check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.requestId == "1001"
subscribedPeer.len() == 1

# ping subscriber node
restFilterTest.messageCache.pubsubSubscribe(DefaultPubsubTopic)

let pingResponse = await restFilterTest.client.filterSubscriberPing("1002")

check:
pingResponse.status == 200
pingResponse.data.requestId == "1002"
pingResponse.data.statusDesc == "OK"

# first - message push from service node to subscriber client
let testMessage = WakuMessage(
payload: "TEST-PAYLOAD-MUST-RECEIVE".toBytes(),
contentTopic: DefaultContentTopic,
timestamp: int64(2022),
meta: "test-meta".toBytes(),
)

let postMsgResponse1 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)

# check messages received client side or not
let messages1 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)

check:
postMsgResponse1.status == 200
$postMsgResponse1.contentType == $MIMETYPE_TEXT
postMsgResponse1.data == "OK"
len(messages1.data) == 1

# Pause execution for 1 seconds to test TimeCache functionality of service node
await sleepAsync(1.seconds)

# second - message push from service node to subscriber client
let postMsgResponse2 = await restFilterTest.clientTwdServiceNode.relayPostMessagesV1(
DefaultPubsubTopic, toRelayWakuMessage(testMessage)
)

# check message received client side or not
let messages2 = await restFilterTest.client.filterGetMessagesV1(DefaultContentTopic)

check:
postMsgResponse2.status == 200
$postMsgResponse2.contentType == $MIMETYPE_TEXT
postMsgResponse2.data == "OK"
len(messages2.data) == 1
await restFilterTest.shutdown()
4 changes: 3 additions & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -434,12 +434,14 @@ proc mountFilter*(
filter_subscriptions.DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = filter_subscriptions.MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = filter_subscriptions.MaxFilterCriteriaPerPeer,
messageCacheTTL: Duration = filter_subscriptions.MessageCacheTTL,
) {.async: (raises: []).} =
## Mounting filter v2 protocol

info "mounting filter protocol"
node.wakuFilter = WakuFilter.new(
node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
node.peerManager, subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer,
messageCacheTTL,
)

if node.started:
Expand Down
4 changes: 2 additions & 2 deletions waku/waku_filter_v2/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,14 @@ proc new*(
subscriptionTimeout: Duration = DefaultSubscriptionTimeToLiveSec,
maxFilterPeers: uint32 = MaxFilterPeers,
maxFilterCriteriaPerPeer: uint32 = MaxFilterCriteriaPerPeer,
timeout: Duration = 2.minutes,
messageCacheTTL: Duration = MessageCacheTTL,
): T =
let wf = WakuFilter(
subscriptions: FilterSubscriptions.init(
subscriptionTimeout, maxFilterPeers, maxFilterCriteriaPerPeer
),
peerManager: peerManager,
messageCache: init(TimedCache[string], timeout),
messageCache: init(TimedCache[string], messageCacheTTL),
)

wf.initProtocolHandler()
Expand Down
1 change: 1 addition & 0 deletions waku/waku_filter_v2/subscriptions.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const
MaxFilterPeers* = 1000
MaxFilterCriteriaPerPeer* = 1000
DefaultSubscriptionTimeToLiveSec* = 5.minutes
MessageCacheTTL* = 2.minutes

type
# a single filter criterion is fully defined by a pubsub topic and content topic
Expand Down

0 comments on commit 31c632e

Please sign in to comment.