Skip to content

Commit

Permalink
feat: example using filter and lightpush (#1720)
Browse files Browse the repository at this point in the history
* feat: add filter-lightpush example

* chore: examples/v2/filter_subscriber.nim

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

* chore: update examples/v2/filter_subscriber.nim

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>

---------

Co-authored-by: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com>
  • Loading branch information
jm-clius and Ivansete-status authored May 12, 2023
1 parent b277ce1 commit 8987d4a
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 2 deletions.
34 changes: 32 additions & 2 deletions examples/v2/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ TODO

# publisher/subscriber

Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publises messages to the default pubsub topic to a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.
Within `examples/v2` you can find a `publisher` and a `subscriber`. The first one publishes messages to the default pubsub topic on a given content topic, and the second one runs forever listening to that pubsub topic and printing the content it receives.

**Some notes:**
* These examples are meant to work even in if you are behind a firewall and you can't be discovered by discv5.
Expand All @@ -31,4 +31,34 @@ And run a publisher
./build/publisher
```

See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.
See how the subscriber received the messages published by the publisher. Feel free to experiment from different machines in different locations.

# resource-restricted publisher/subscriber (lightpush/filter)

To illustrate publishing and receiving messages on a resource-restricted client,
`examples/v2` also provides a `lightpush_publisher` and a `filter_subscriber`.
The `lightpush_publisher` continually publishes messages via a lightpush service node
to the default pubsub topic on a given content topic.
The `filter_subscriber` subscribes via a filter service node
to the same pubsub and content topic.
It runs forever, maintaining this subscription
and printing the content it receives.

**compile and run:**

Wait until the filter subscriber is ready.
```console
./env.sh bash
nim c -r examples/v2/filter_subscriber.nim
```

And run a lightpush publisher
```console
./env.sh bash
nim c -r examples/v2/lightpush_publisher.nim
```

See how the filter subscriber receives messages published by the lightpush publisher.
Neither the publisher nor the subscriber participates in `relay`,
but instead make use of service nodes to save resources.
Feel free to experiment from different machines in different locations.
82 changes: 82 additions & 0 deletions examples/v2/filter_subscriber.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
## Example showing how a resource restricted client may
## subscribe to messages without relay

import
chronicles,
chronos,
stew/byteutils,
stew/results
import
../../../waku/common/logging,
../../../waku/v2/node/peer_manager,
../../../waku/v2/waku_core,
../../../waku/v2/waku_filter_v2/client

const
FilterPeer = "/ip4/104.154.239.128/tcp/30303/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS" # node-01.gc-us-central1-a.wakuv2.test.statusim.net on wakuv2.test
FilterPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")

proc unsubscribe(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic) {.async.} =
notice "unsubscribing from filter"
let unsubscribeRes = await wfc.unsubscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
if unsubscribeRes.isErr:
notice "unsubscribe request failed", err=unsubscribeRes.error
else:
notice "unsubscribe request successful"

proc messagePushHandler(pubsubTopic: PubsubTopic, message: WakuMessage) =
let payloadStr = string.fromBytes(message.payload)
notice "message received", payload=payloadStr,
pubsubTopic=pubsubTopic,
contentTopic=message.contentTopic,
timestamp=message.timestamp

proc maintainSubscription(wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic) {.async.} =
while true:
notice "maintaining subscription"
# First use filter-ping to check if we have an active subscription
let pingRes = await wfc.ping(filterPeer)
if pingRes.isErr():
# No subscription found. Let's subscribe.
notice "no subscription found. Sending subscribe request"

let subscribeRes = await wfc.subscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])

if subscribeRes.isErr():
notice "subscribe request failed. Quitting.", err=subscribeRes.error
break
else:
notice "subscribe request successful."
else:
notice "subscription found."

await sleepAsync(60.seconds) # Subscription maintenance interval

proc setupAndSubscribe(rng: ref HmacDrbgContext) =
let filterPeer = parsePeerInfo(FilterPeer).get()

setupLogLevel(logging.LogLevel.NOTICE)
notice "starting filter subscriber"

var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(rng, messagePushHandler, pm)

# Mount filter client protocol
switch.mount(wfc)

# Start maintaining subscription
asyncSpawn maintainSubscription(wfc, filterPeer, FilterPubsubTopic, FilterContentTopic)

when isMainModule:
let rng = newRng()
setupAndSubscribe(rng)
runForever()
57 changes: 57 additions & 0 deletions examples/v2/lightpush_publisher.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
## Example showing how a resource restricted client may
## use lightpush to publish messages without relay

import
chronicles,
chronos,
stew/byteutils,
stew/results
import
../../../waku/common/logging,
../../../waku/v2/node/peer_manager,
../../../waku/v2/waku_core,
../../../waku/v2/waku_lightpush/client

const
LightpushPeer = "/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ" # node-01.do-ams3.wakuv2.test.statusim.net on wakuv2.test
LightpushPubsubTopic = PubsubTopic("/waku/2/default-waku/proto")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")

proc publishMessages(wlc: WakuLightpushClient,
lightpushPeer: RemotePeerInfo,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic) {.async.} =
while true:
let text = "hi there i'm a lightpush publisher"
let message = WakuMessage(payload: toBytes(text), # content of the message
contentTopic: lightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: getNowInNanosecondTime()) # current timestamp

let wlpRes = await wlc.publish(lightpushPubsubTopic, message, lightpushPeer)

if wlpRes.isOk():
notice "published message using lightpush", message=message
else:
notice "failed to publish message using lightpush", err=wlpRes.error()

await sleepAsync(5000) # Publish every 5 seconds

proc setupAndPublish(rng: ref HmacDrbgContext) =
let lightpushPeer = parsePeerInfo(LightpushPeer).get()

setupLogLevel(logging.LogLevel.NOTICE)
notice "starting lightpush publisher"

var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wlc = WakuLightpushClient.new(pm, rng)

# Start maintaining subscription
asyncSpawn publishMessages(wlc, lightpushPeer, LightpushPubsubTopic, LightpushContentTopic)

when isMainModule:
let rng = newRng()
setupAndPublish(rng)
runForever()

0 comments on commit 8987d4a

Please sign in to comment.