Skip to content

Commit

Permalink
feat: topic subscriptions updates discv5 ENR (#1875)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS authored Aug 23, 2023
1 parent c07d63d commit c369b32
Show file tree
Hide file tree
Showing 9 changed files with 244 additions and 45 deletions.
6 changes: 4 additions & 2 deletions apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -541,12 +541,14 @@ proc startNode(node: WakuNode, conf: WakuNodeConf,

proc startApp*(app: App): Future[AppResult[void]] {.async.} =
if app.wakuDiscv5.isSome():
let res = app.wakuDiscv5.get().start()
let wakuDiscv5 = app.wakuDiscv5.get()

let res = wakuDiscv5.start()
if res.isErr():
return err("failed to start waku discovery v5: " & $res.error)

asyncSpawn app.wakuDiscv5.get().searchLoop(app.node.peerManager, some(app.record))
asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager, some(app.record))
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)

return await startNode(
app.node,
Expand Down
82 changes: 78 additions & 4 deletions tests/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import
stew/results,
stew/shims/net,
chronos,
chronicles,
testutils/unittests,
libp2p/crypto/crypto as libp2p_keys,
eth/keys as eth_keys
import
../../waku/waku_core/topics,
../../waku/waku_enr,
../../waku/waku_discv5,
./testlib/common,
Expand Down Expand Up @@ -282,7 +284,7 @@ procSuite "Waku Discovery v5":
let gibberish = @["aedyttydcb/uioasduyio", "jhdfsjhlsdfjhk/sadjhk", "khfsd/hjfdsgjh/dfs"]
let empty: seq[string] = @[]

let relayShards = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)])
let relayShards = RelayShards.init(0, @[uint16(2), uint16(4), uint16(8)]).expect("Valid Shards")

## When

Expand Down Expand Up @@ -314,7 +316,7 @@ procSuite "Waku Discovery v5":
shardCluster: uint16 = 21
shardIndices: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand All @@ -332,7 +334,7 @@ procSuite "Waku Discovery v5":
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand All @@ -350,7 +352,7 @@ procSuite "Waku Discovery v5":
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand All @@ -377,4 +379,76 @@ procSuite "Waku Discovery v5":
predicateCluster22(recordCluster22Indices1) == true
predicateCluster22(recordCluster22Indices2) == false

asyncTest "update ENR from subscriptions":
## Given
let
shard1 = "/waku/2/rs/0/1"
shard2 = "/waku/2/rs/0/2"
shard3 = "/waku/2/rs/0/3"
privKey = generateSecp256k1Key()
bindIp = "0.0.0.0"
extIp = "127.0.0.1"
tcpPort = 61500u16
udpPort = 9000u16

let record = newTestEnrRecord(
privKey = privKey,
extIp = extIp,
tcpPort = tcpPort,
udpPort = udpPort,
)

let node = newTestDiscv5(
privKey = privKey,
bindIp = bindIp,
tcpPort = tcpPort,
udpPort = udpPort,
record = record
)

let res = node.start()
assert res.isOk(), res.error

let queue = newAsyncEventQueue[SubscriptionEvent](0)

## When
asyncSpawn node.subscriptionsListener(queue)

## Then
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))

await sleepAsync(1.seconds)

check:
node.protocol.localNode.record.containsShard(shard1) == true
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: shard3))

await sleepAsync(1.seconds)

check:
node.protocol.localNode.record.containsShard(shard1) == true
node.protocol.localNode.record.containsShard(shard2) == true
node.protocol.localNode.record.containsShard(shard3) == true

queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard1))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard2))
queue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: shard3))

await sleepAsync(1.seconds)

check:
node.protocol.localNode.record.containsShard(shard1) == false
node.protocol.localNode.record.containsShard(shard2) == false
node.protocol.localNode.record.containsShard(shard3) == false

## Cleanup
await node.stop()


24 changes: 14 additions & 10 deletions tests/test_waku_enr.nim
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,10 @@ suite "Waku ENR - Relay static sharding":
shardIndex: uint16 = 1024

## When
expect Defect:
discard RelayShards.init(shardCluster, shardIndex)
let res = RelayShards.init(shardCluster, shardIndex)

## Then
assert res.isErr(), $res.get()

test "new relay shards field with single invalid index in list":
## Given
Expand All @@ -272,8 +274,10 @@ suite "Waku ENR - Relay static sharding":
shardIndices: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16, 1024u16]

## When
expect Defect:
discard RelayShards.init(shardCluster, shardIndices)
let res = RelayShards.init(shardCluster, shardIndices)

## Then
assert res.isErr(), $res.get()

test "new relay shards field with single valid index":
## Given
Expand All @@ -284,7 +288,7 @@ suite "Waku ENR - Relay static sharding":
let topic = NsPubsubTopic.staticSharding(shardCluster, shardIndex)

## When
let shards = RelayShards.init(shardCluster, shardIndex)
let shards = RelayShards.init(shardCluster, shardIndex).expect("Valid Shards")

## Then
check:
Expand All @@ -310,7 +314,7 @@ suite "Waku ENR - Relay static sharding":
shardIndices: seq[uint16] = @[1u16, 2u16, 2u16, 3u16, 3u16, 3u16]

## When
let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

## Then
check:
Expand Down Expand Up @@ -344,7 +348,7 @@ suite "Waku ENR - Relay static sharding":
shardCluster: uint16 = 22
shardIndices: seq[uint16] = @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16]

let shards = RelayShards.init(shardCluster, shardIndices)
let shards = RelayShards.init(shardCluster, shardIndices).expect("Valid Shards")

## When
var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
Expand All @@ -370,7 +374,7 @@ suite "Waku ENR - Relay static sharding":
enrSeqNum = 1u64
enrPrivKey = generatesecp256k1key()

let shards = RelayShards.init(33, toSeq(0u16 ..< 64u16))
let shards = RelayShards.init(33, toSeq(0u16 ..< 64u16)).expect("Valid Shards")

var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
require builder.withWakuRelaySharding(shards).isOk()
Expand Down Expand Up @@ -398,8 +402,8 @@ suite "Waku ENR - Relay static sharding":
enrPrivKey = generatesecp256k1key()

let
shardsIndicesList = RelayShards.init(22, @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16])
shardsBitVector = RelayShards.init(33, @[13u16, 24u16, 37u16, 61u16, 98u16, 159u16])
shardsIndicesList = RelayShards.init(22, @[1u16, 1u16, 2u16, 3u16, 5u16, 8u16]).expect("Valid Shards")
shardsBitVector = RelayShards.init(33, @[13u16, 24u16, 37u16, 61u16, 98u16, 159u16]).expect("Valid Shards")


var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum)
Expand Down
9 changes: 5 additions & 4 deletions waku/node/jsonrpc/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,9 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
debug "post_waku_v2_relay_v1_subscriptions"

# Subscribe to all requested topics
for topic in topics:
if cache.isSubscribed(topic):
continue
let newTopics = topics.filterIt(not cache.isSubscribed(it))

for topic in newTopics:
cache.subscribe(topic)
node.subscribe(topic, topicHandler)

Expand All @@ -70,7 +69,9 @@ proc installRelayApiHandlers*(node: WakuNode, server: RpcServer, cache: MessageC
debug "delete_waku_v2_relay_v1_subscriptions"

# Unsubscribe all handlers from requested topics
for topic in topics:
let subscribedTopics = topics.filterIt(cache.isSubscribed(it))

for topic in subscribedTopics:
node.unsubscribe(topic)
cache.unsubscribe(topic)

Expand Down
15 changes: 7 additions & 8 deletions waku/node/rest/relay/handlers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ proc installRelayPostSubscriptionsV1Handler*(router: var RestRouter, node: WakuN

let req: RelayPostSubscriptionsRequest = reqResult.get()

for topic in req:
if cache.isSubscribed(string(topic)):
# Only subscribe to topics for which we have no subscribed topic handlers yet
continue
# Only subscribe to topics for which we have no subscribed topic handlers yet
let newTopics = req.filterIt(not cache.isSubscribed(it))

cache.subscribe(string(topic))
node.subscribe(string(topic), cache.messageHandler())
for topic in newTopics:
cache.subscribe(topic)
node.subscribe(topic, cache.messageHandler())

return RestApiResponse.ok()

Expand All @@ -88,8 +87,8 @@ proc installRelayDeleteSubscriptionsV1Handler*(router: var RestRouter, node: Wak

# Unsubscribe all handlers from requested topics
for topic in req:
node.unsubscribe(string(topic))
cache.unsubscribe(string(topic))
node.unsubscribe(topic)
cache.unsubscribe(topic)

# Successfully unsubscribed from all requested topics
return RestApiResponse.ok()
Expand Down
7 changes: 7 additions & 0 deletions waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ type
rendezvous*: RendezVous
announcedAddresses* : seq[MultiAddress]
started*: bool # Indicates that node has started listening
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]

proc getAutonatService*(rng: ref HmacDrbgContext): AutonatService =
## AutonatService request other peers to dial us back
Expand Down Expand Up @@ -141,12 +142,15 @@ proc new*(T: type WakuNode,

info "Initializing networking", addrs= $netConfig.announcedAddresses

let queue = newAsyncEventQueue[SubscriptionEvent](30)

return WakuNode(
peerManager: peerManager,
switch: switch,
rng: rng,
enr: enr,
announcedAddresses: netConfig.announcedAddresses,
topicSubscriptionQueue: queue
)

proc peerInfo*(node: WakuNode): PeerInfo =
Expand Down Expand Up @@ -229,6 +233,7 @@ proc subscribe*(node: WakuNode, topic: PubsubTopic) =

debug "subscribe", pubsubTopic= topic

node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic))
node.registerRelayDefaultHandler(topic)

proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =
Expand All @@ -240,6 +245,7 @@ proc subscribe*(node: WakuNode, topic: PubsubTopic, handler: WakuRelayHandler) =

debug "subscribe", pubsubTopic= topic

node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubSub, pubsubSub: topic))
node.registerRelayDefaultHandler(topic)
node.wakuRelay.subscribe(topic, handler)

Expand All @@ -252,6 +258,7 @@ proc unsubscribe*(node: WakuNode, topic: PubsubTopic) =

info "unsubscribe", pubsubTopic=topic

node.topicSubscriptionQueue.emit(SubscriptionEvent(kind: PubsubUnsub, pubsubUnsub: topic))
node.wakuRelay.unsubscribe(topic)


Expand Down
9 changes: 9 additions & 0 deletions waku/waku_core/topics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,12 @@ export
content_topic,
pubsub_topic,
sharding

type
SubscriptionKind* = enum ContentSub, ContentUnsub, PubsubSub, PubsubUnsub
SubscriptionEvent* = object
case kind*: SubscriptionKind
of PubsubSub: pubsubSub*: string
of ContentSub: contentSub*: string
of PubsubUnsub: pubsubUnsub*: string
of ContentUnsub: contentUnsub*: string
Loading

0 comments on commit c369b32

Please sign in to comment.