Skip to content

Commit

Permalink
feat: Autosharding API for RELAY subscriptions (#1983)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Sep 26, 2023
1 parent d178105 commit 1763b1e
Show file tree
Hide file tree
Showing 29 changed files with 906 additions and 328 deletions.
5 changes: 2 additions & 3 deletions apps/chat2/chat2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ proc publish(c: Chat, line: string) =
# Attempt lightpush
asyncSpawn c.node.lightpushPublish(some(DefaultPubsubTopic), message)
else:
asyncSpawn c.node.publish(DefaultPubsubTopic, message)
asyncSpawn c.node.publish(some(DefaultPubsubTopic), message)

# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =
Expand Down Expand Up @@ -490,8 +490,7 @@ proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
if msg.contentTopic == chat.contentTopic:
chat.printReceivedMessage(msg)

let topic = DefaultPubsubTopic
node.subscribe(topic, handler)
node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))

if conf.rlnRelay:
info "WakuRLNRelay is enabled"
Expand Down
8 changes: 4 additions & 4 deletions apps/chat2bridge/chat2bridge.nim
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =

chat2_mb_transfers.inc(labelValues = ["mb_to_chat2"])

await cmb.nodev2.publish(DefaultPubsubTopic, msg)
await cmb.nodev2.publish(some(DefaultPubsubTopic), msg)

proc toMatterbridge(cmb: Chat2MatterBridge, msg: WakuMessage) {.gcsafe, raises: [Exception].} =
if cmb.seen.containsOrAdd(msg.payload.hash()):
Expand Down Expand Up @@ -204,7 +204,7 @@ proc start*(cmb: Chat2MatterBridge) {.async.} =
trace "Bridging message from Chat2 to Matterbridge", msg=msg
cmb.toMatterbridge(msg)

cmb.nodev2.subscribe(DefaultPubsubTopic, relayHandler)
cmb.nodev2.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))

proc stop*(cmb: Chat2MatterBridge) {.async.} =
info "Stopping Chat2MatterBridge"
Expand All @@ -229,8 +229,8 @@ when isMainModule:

# Install enabled API handlers:
if conf.relay:
let topicCache = relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(node, rpcServer, topicCache)
let cache = MessageCache[string].init(capacity=30)
installRelayApiHandlers(node, rpcServer, cache)

if conf.filter:
let messageCache = filter_api.MessageCache.init(capacity=30)
Expand Down
2 changes: 1 addition & 1 deletion apps/networkmonitor/networkmonitor.nim
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ proc subscribeAndHandleMessages(node: WakuNode,
else:
msgPerContentTopic[msg.contentTopic] = 1

node.subscribe(pubsubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

when isMainModule:
# known issue: confutils.nim(775, 17) Error: can raise an unlisted exception: ref IOError
Expand Down
63 changes: 43 additions & 20 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,11 @@ import
../../waku/node/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/node/peer_manager/peer_store/migrations as peer_store_sqlite_migrations,
../../waku/waku_archive,
../../waku/waku_dnsdisc,
../../waku/waku_enr,
../../waku/waku_discv5,
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
./internal_config,
./external_config
import
../../waku/waku_api/message_cache,
../../waku/waku_api/cache_handlers,
../../waku/waku_api/rest/server,
../../waku/waku_api/rest/debug/handlers as rest_debug_api,
../../waku/waku_api/rest/relay/handlers as rest_relay_api,
../../waku/waku_api/rest/relay/topic_cache,
../../waku/waku_api/rest/filter/legacy_handlers as rest_legacy_filter_api,
../../waku/waku_api/rest/filter/handlers as rest_filter_api,
../../waku/waku_api/rest/lightpush/handlers as rest_lightpush_api,
Expand All @@ -56,7 +42,20 @@ import
../../waku/waku_api/jsonrpc/debug/handlers as rpc_debug_api,
../../waku/waku_api/jsonrpc/filter/handlers as rpc_filter_api,
../../waku/waku_api/jsonrpc/relay/handlers as rpc_relay_api,
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api
../../waku/waku_api/jsonrpc/store/handlers as rpc_store_api,
../../waku/waku_archive,
../../waku/waku_dnsdisc,
../../waku/waku_enr,
../../waku/waku_discv5,
../../waku/waku_peer_exchange,
../../waku/waku_rln_relay,
../../waku/waku_store,
../../waku/waku_lightpush,
../../waku/waku_filter,
../../waku/waku_filter_v2,
./wakunode2_validator_signed,
./internal_config,
./external_config

logScope:
topics = "wakunode app"
Expand Down Expand Up @@ -576,8 +575,20 @@ proc startRestServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNo

## Relay REST API
if conf.relay:
let relayCache = TopicCache.init(capacity=conf.restRelayCacheCapacity)
installRelayApiHandlers(server.router, app.node, relayCache)
let cache = MessageCache[string].init(capacity=conf.restRelayCacheCapacity)

let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)

for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))

installRelayApiHandlers(server.router, app.node, cache)

## Filter REST API
if conf.filter:
Expand Down Expand Up @@ -610,8 +621,20 @@ proc startRpcServer(app: App, address: ValidIpAddress, port: Port, conf: WakuNod
installDebugApiHandlers(app.node, server)

if conf.relay:
let relayMessageCache = rpc_relay_api.MessageCache.init(capacity=30)
installRelayApiHandlers(app.node, server, relayMessageCache)
let cache = MessageCache[string].init(capacity=30)

let handler = messageCacheHandler(cache)
let autoHandler = autoMessageCacheHandler(cache)

for pubsubTopic in conf.pubsubTopics:
cache.subscribe(pubsubTopic)
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

for contentTopic in conf.contentTopics:
cache.subscribe(contentTopic)
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(autoHandler))

installRelayApiHandlers(app.node, server, cache)

if conf.filternode != "":
let filterMessageCache = rpc_filter_api.MessageCache.init(capacity=30)
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
contentTopic: contentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: now()) # current timestamp
await node.publish(pubSubTopic, message)
await node.publish(some(pubSubTopic), message)
notice "published message", text = text, timestamp = message.timestamp, psTopic = pubSubTopic, contentTopic = contentTopic
await sleepAsync(5000)

Expand Down
2 changes: 1 addition & 1 deletion examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
pubsubTopic=pubsubTopic,
contentTopic=msg.contentTopic,
timestamp=msg.timestamp
node.subscribe(pubSubTopic, handler)
node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))

when isMainModule:
let rng = crypto.newRng()
Expand Down
6 changes: 3 additions & 3 deletions tests/test_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ procSuite "Peer Exchange":
peerExchangeHandler = handlePeerExchange
emptyHandler = ignorePeerExchange

await node1.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node2.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(emptyHandler))
await node3.mountRelay(topics = @[DefaultPubsubTopic], peerExchangeHandler = some(peerExchangeHandler))
await node1.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node2.mountRelay(@[DefaultPubsubTopic], some(emptyHandler))
await node3.mountRelay(@[DefaultPubsubTopic], some(peerExchangeHandler))

# Ensure that node1 prunes all peers after the first connection
node1.wakuRelay.parameters.dHigh = 1
Expand Down
18 changes: 9 additions & 9 deletions tests/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,9 @@ procSuite "Waku Discovery v5":
asyncSpawn node.subscriptionsListener(queue)

## Then
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
queue.emit((kind: PubsubSub, topic: shard3))

await sleepAsync(1.seconds)

Expand All @@ -426,9 +426,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))
queue.emit((kind: PubsubSub, topic: shard1))
queue.emit((kind: PubsubSub, topic: shard2))
queue.emit((kind: PubsubSub, topic: shard3))

await sleepAsync(1.seconds)

Expand All @@ -437,9 +437,9 @@ procSuite "Waku Discovery v5":
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard3))
queue.emit((kind: PubsubUnsub, topic: shard1))
queue.emit((kind: PubsubUnsub, topic: shard2))
queue.emit((kind: PubsubUnsub, topic: shard3))

await sleepAsync(1.seconds)

Expand Down
4 changes: 2 additions & 2 deletions tests/test_wakunode.nim
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ suite "WakuNode":
msg.payload == payload
completionFut.complete(true)

node2.subscribe(pubSubTopic, relayHandler)
node2.subscribe((kind: PubsubSub, topic: pubsubTopic), some(relayHandler))
await sleepAsync(2000.millis)

await node1.publish(pubSubTopic, message)
await node1.publish(some(pubSubTopic), message)
await sleepAsync(2000.millis)

check:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_wakunode_lightpush.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ suite "WakuNode - Lightpush":
topic == DefaultPubsubTopic
msg == message
completionFutRelay.complete(true)
destNode.subscribe(DefaultPubsubTopic, relayHandler)
destNode.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(relayHandler))

# Wait for subscription to take effect
await sleepAsync(100.millis)
Expand Down
18 changes: 9 additions & 9 deletions tests/waku_relay/test_waku_relay.nim
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ suite "Waku Relay":
networkB = "test-network2"

## when
nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
discard nodeA.subscribe(networkA, noopRawHandler())
discard nodeA.subscribe(networkB, noopRawHandler())

## Then
check:
Expand All @@ -73,9 +73,9 @@ suite "Waku Relay":
networkB = "test-network2"
networkC = "test-network3"

nodeA.subscribe(networkA, noopRawHandler())
nodeA.subscribe(networkB, noopRawHandler())
nodeA.subscribe(networkC, noopRawHandler())
discard nodeA.subscribe(networkA, noopRawHandler())
discard nodeA.subscribe(networkB, noopRawHandler())
discard nodeA.subscribe(networkC, noopRawHandler())

let topics = toSeq(nodeA.subscribedTopics)
require:
Expand All @@ -85,7 +85,7 @@ suite "Waku Relay":
topics.contains(networkC)

## When
nodeA.unsubscribe(networkA)
nodeA.unsubscribeAll(networkA)

## Then
check:
Expand Down Expand Up @@ -129,14 +129,14 @@ suite "Waku Relay":
proc srcSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
srcSubsFut.complete((topic, message))

srcNode.subscribe(networkTopic, srcSubsHandler)
discard srcNode.subscribe(networkTopic, srcSubsHandler)

# Subscription
let dstSubsFut = newFuture[(PubsubTopic, WakuMessage)]()
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))

dstNode.subscribe(networkTopic, dstSubsHandler)
discard dstNode.subscribe(networkTopic, dstSubsHandler)

await sleepAsync(500.millis)

Expand Down Expand Up @@ -196,7 +196,7 @@ suite "Waku Relay":
proc dstSubsHandler(topic: PubsubTopic, message: WakuMessage) {.async, gcsafe.} =
dstSubsFut.complete((topic, message))

dstNode.subscribe(networkTopic, dstSubsHandler)
discard dstNode.subscribe(networkTopic, dstSubsHandler)

await sleepAsync(500.millis)

Expand Down
Loading

0 comments on commit 1763b1e

Please sign in to comment.