Skip to content

Commit

Permalink
test(lightpush): Lightpush functional tests (#2269)
Browse files Browse the repository at this point in the history
* Add ligthpush payload tests.
* Add end to end lightpush tests.
* updating vendor/nim-unittest2 to protect against core dump issue
* Enable "Valid Payload Sizes" test again
---------
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
  • Loading branch information
AlejandroCabeza authored Feb 6, 2024
1 parent e4e147b commit 817b2e0
Show file tree
Hide file tree
Showing 18 changed files with 1,081 additions and 639 deletions.
23 changes: 7 additions & 16 deletions tests/all_tests_waku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import
./waku_core/test_peers,
./waku_core/test_published_address


# Waku archive test suite
import
./waku_archive/test_driver_queue_index,
Expand All @@ -22,15 +21,15 @@ import

const os* {.strdefine.} = ""
when os == "Linux" and
# GitHub only supports container actions on Linux
# and we need to start a postgress database in a docker container
defined(postgres):
# GitHub only supports container actions on Linux
# and we need to start a postgress database in a docker container
defined(postgres):
import
./waku_archive/test_driver_postgres_query,
./waku_archive/test_driver_postgres
./waku_archive/test_driver_postgres_query, ./waku_archive/test_driver_postgres

# Waku store test suite
import
./waku_store/test_client,
./waku_store/test_rpc_codec,
./waku_store/test_waku_store,
./waku_store/test_wakunode_store
Expand All @@ -39,17 +38,11 @@ when defined(waku_exp_store_resume):
# TODO: Review store resume test cases (#1282)
import ./waku_store/test_resume


import
./waku_relay/test_all,
./waku_filter_v2/test_all

import ./waku_relay/test_all, ./waku_filter_v2/test_all, ./waku_lightpush/test_all

import
# Waku v2 tests
./test_wakunode,
# Waku LightPush
./test_waku_lightpush,
./test_wakunode_lightpush,
# Waku Filter
./test_waku_filter_legacy,
Expand All @@ -71,9 +64,7 @@ import
./test_waku_rendezvous

# Waku Keystore test suite
import
./test_waku_keystore_keyfile,
./test_waku_keystore
import ./test_waku_keystore_keyfile, ./test_waku_keystore

## Wakunode JSON-RPC API test suite
import
Expand Down
4 changes: 4 additions & 0 deletions tests/node/test_all.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
import
./test_wakunode_filter,
./test_wakunode_lightpush,
./test_wakunode_store
108 changes: 55 additions & 53 deletions tests/node/test_wakunode_filter.nim
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
{.used.}

import
std/[
options,
tables,
sequtils
],
std/[options, tables, sequtils],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
chronicles,
os,
libp2p/[
peerstore,
crypto/crypto
]
libp2p/[peerstore, crypto/crypto]

import
../../../waku/[
Expand All @@ -25,14 +18,7 @@ import
waku_filter_v2/client,
waku_filter_v2/subscriptions
],
../testlib/[
common,
wakucore,
wakunode,
testasync,
futures,
testutils
]
../testlib/[common, wakucore, wakunode, testasync, futures, testutils]

suite "Waku Filter - End to End":
var client {.threadvar.}: WakuNode
Expand All @@ -48,10 +34,11 @@ suite "Waku Filter - End to End":

asyncSetup:
pushHandlerFuture = newFuture[(string, WakuMessage)]()
messagePushHandler = proc(
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[void] {.async, closure, gcsafe.} =
pushHandlerFuture.complete((pubsubTopic, message))
messagePushHandler =
proc(pubsubTopic: PubsubTopic, message: WakuMessage): Future[void] {.
async, closure, gcsafe
.} =
pushHandlerFuture.complete((pubsubTopic, message))

pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
Expand All @@ -63,7 +50,8 @@ suite "Waku Filter - End to End":

server = newTestWakuNode(serverKey, parseIpAddress("0.0.0.0"), Port(23450))
client = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
clientClone = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451)) # Used for testing client restarts
clientClone = newTestWakuNode(clientKey, parseIpAddress("0.0.0.0"), Port(23451))
# Used for testing client restarts

await allFutures(server.start(), client.start())

Expand All @@ -83,9 +71,11 @@ suite "Waku Filter - End to End":

asyncTest "Client Node receives Push from Server Node, via Filter":
# When a client node subscribes to a filter node
let subscribeResponse = await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
let
subscribeResponse =
await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)

# Then the subscription is successful
check:
Expand All @@ -94,7 +84,7 @@ suite "Waku Filter - End to End":
server.wakuFilter.subscriptions.isSubscribed(clientPeerId)

# When sending a message to the subscribed content topic
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
let msg1 = fakeWakuMessage(contentTopic = contentTopic)
await server.filterHandleMessage(pubsubTopic, msg1)

# Then the message is pushed to the client
Expand All @@ -105,9 +95,11 @@ suite "Waku Filter - End to End":
pushedMsg1 == msg1

# When unsubscribing from the subscription
let unsubscribeResponse = await client.filterUnsubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
let
unsubscribeResponse =
await client.filterUnsubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)

# Then the unsubscription is successful
check:
Expand All @@ -116,7 +108,7 @@ suite "Waku Filter - End to End":

# When sending a message to the previously subscribed content topic
pushHandlerFuture = newPushHandlerFuture() # Clear previous future
let msg2 = fakeWakuMessage(contentTopic=contentTopic)
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
await server.filterHandleMessage(pubsubTopic, msg2)

# Then the message is not pushed to the client
Expand All @@ -128,16 +120,18 @@ suite "Waku Filter - End to End":
await server.mountRelay()

# And valid filter subscription
let subscribeResponse = await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
let
subscribeResponse =
await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
require:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.subscribedPeerCount() == 1

# When a server node gets a Relay message
let msg1 = fakeWakuMessage(contentTopic=contentTopic)
await server.publish(some(pubsubTopic), msg1)
let msg1 = fakeWakuMessage(contentTopic = contentTopic)
discard await server.publish(some(pubsubTopic), msg1)

# Then the message is not sent to the client's filter push handler
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
Expand All @@ -154,18 +148,22 @@ suite "Waku Filter - End to End":
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()

# When a client node subscribes to the server node
let subscribeResponse = await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
let
subscribeResponse =
await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)

# Then the subscription is successful
check (not subscribeResponse.isOk())

asyncTest "Filter Client Node can receive messages after subscribing and restarting, via Filter":
# Given a valid filter subscription
let subscribeResponse = await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
let
subscribeResponse =
await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
require:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
Expand All @@ -175,7 +173,7 @@ suite "Waku Filter - End to End":
await clientClone.start() # Mimic restart by starting the clone

# When a message is sent to the subscribed content topic, via Filter; without refreshing the subscription
let msg = fakeWakuMessage(contentTopic=contentTopic)
let msg = fakeWakuMessage(contentTopic = contentTopic)
await server.filterHandleMessage(pubsubTopic, msg)

# Then the message is pushed to the client
Expand All @@ -185,13 +183,15 @@ suite "Waku Filter - End to End":
pushedMsgPubsubTopic == pubsubTopic
pushedMsg == msg

asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay": # Given the server node has Relay enabled
asyncTest "Filter Client Node can't receive messages after subscribing and restarting, via Relay":
await server.mountRelay()

# Given a valid filter subscription
let subscribeResponse = await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
let
subscribeResponse =
await client.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
require:
subscribeResponse.isOk()
server.wakuFilter.subscriptions.subscribedPeerCount() == 1
Expand All @@ -201,24 +201,26 @@ suite "Waku Filter - End to End":
await clientClone.start() # Mimic restart by starting the clone

# When a message is sent to the subscribed content topic, via Relay
let msg = fakeWakuMessage(contentTopic=contentTopic)
await server.publish(some(pubsubTopic), msg)
let msg = fakeWakuMessage(contentTopic = contentTopic)
discard await server.publish(some(pubsubTopic), msg)

# Then the message is not sent to the client's filter push handler
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))

# Given the client refreshes the subscription
let subscribeResponse2 = await clientClone.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
let
subscribeResponse2 =
await clientClone.filterSubscribe(
some(pubsubTopic), contentTopicSeq, serverRemotePeerInfo
)
check:
subscribeResponse2.isOk()
server.wakuFilter.subscriptions.subscribedPeerCount() == 1

# When a message is sent to the subscribed content topic, via Relay
pushHandlerFuture = newPushHandlerFuture()
let msg2 = fakeWakuMessage(contentTopic=contentTopic)
await server.publish(some(pubsubTopic), msg2)
let msg2 = fakeWakuMessage(contentTopic = contentTopic)
discard await server.publish(some(pubsubTopic), msg2)

# Then the message is not sent to the client's filter push handler
check (not await pushHandlerFuture.withTimeout(FUTURE_TIMEOUT))
93 changes: 93 additions & 0 deletions tests/node/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
{.used.}

import
std/[options, tables, sequtils],
stew/shims/net as stewNet,
testutils/unittests,
chronos,
chronicles,
os,
libp2p/[peerstore, crypto/crypto]

import
../../../waku/[
waku_core,
node/peer_manager,
node/waku_node,
waku_filter_v2,
waku_filter_v2/client,
waku_filter_v2/subscriptions,
waku_lightpush,
waku_lightpush/common,
waku_lightpush/client,
waku_lightpush/protocol_metrics,
waku_lightpush/rpc
],
../testlib/[assertions, common, wakucore, wakunode, testasync, futures, testutils]

suite "Waku Lightpush - End To End":
var
handlerFuture {.threadvar.}: Future[(PubsubTopic, WakuMessage)]
handler {.threadvar.}: PushMessageHandler

server {.threadvar.}: WakuNode
client {.threadvar.}: WakuNode

serverRemotePeerInfo {.threadvar.}: RemotePeerInfo
pubsubTopic {.threadvar.}: PubsubTopic
contentTopic {.threadvar.}: ContentTopic
message {.threadvar.}: WakuMessage

asyncSetup:
handlerFuture = newPushHandlerFuture()
handler =
proc(
peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult[void]] {.async.} =
handlerFuture.complete((pubsubTopic, message))
return ok()

let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()

server = newTestWakuNode(serverKey, ValidIpAddress.init("0.0.0.0"), Port(0))
client = newTestWakuNode(clientKey, ValidIpAddress.init("0.0.0.0"), Port(0))

await allFutures(server.start(), client.start())
await server.start()

waitFor server.mountRelay()
waitFor server.mountLightpush()
client.mountLightpushClient()

serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
pubsubTopic = DefaultPubsubTopic
contentTopic = DefaultContentTopic
message = fakeWakuMessage()

asyncTeardown:
await server.stop()

suite "Assessment of Message Relaying Mechanisms":
asyncTest "Via 11/WAKU2-RELAY from Relay/Full Node":
# Given a light lightpush client
let
lightpushClient =
newTestWakuNode(
generateSecp256k1Key(), ValidIpAddress.init("0.0.0.0"), Port(0)
)
lightpushClient.mountLightpushClient()

# When the client publishes a message
let
publishResponse =
await lightpushClient.lightpushPublish(
some(pubsubTopic), message, serverRemotePeerInfo
)

if not publishResponse.isOk():
echo "Publish failed: ", publishResponse.error()

# Then the message is relayed to the server
assertResultOk publishResponse
Loading

0 comments on commit 817b2e0

Please sign in to comment.