Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(discv5): update filter predicate #1918

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ proc startApp*(app: App): Future[AppResult[void]] {.async.} =
if res.isErr():
return err("failed to start waku discovery v5: " & $res.error)

asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager, some(app.record))
asyncSpawn wakuDiscv5.searchLoop(app.node.peerManager)
asyncSpawn wakuDiscv5.subscriptionsListener(app.node.topicSubscriptionQueue)

return await startNode(
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
error "failed to start discv5", error= discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
Expand Down
2 changes: 1 addition & 1 deletion examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
error "failed to start discv5", error = discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ procSuite "Waku Peer Exchange":
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
asyncSpawn disc1.searchLoop(node1.peerManager, none(enr.Record))
asyncSpawn disc2.searchLoop(node2.peerManager, none(enr.Record))
asyncSpawn disc1.searchLoop(node1.peerManager)
asyncSpawn disc2.searchLoop(node2.peerManager)

## When
var attempts = 10
Expand Down
86 changes: 48 additions & 38 deletions waku/waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ type WakuDiscoveryV5* = ref object
conf: WakuDiscoveryV5Config
protocol*: protocol.Protocol
listening*: bool
predicate: Option[WakuDiscv5Predicate]

proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information

let typeRecordRes = record.toTyped()
let typedRecord =
if typeRecordRes.isErr():
debug "peer filtering failed", reason= $typeRecordRes.error
return none(WakuDiscv5Predicate)
else: typeRecordRes.get()

let nodeShardOp = typedRecord.relaySharding()
let nodeShard =
if nodeShardOp.isNone():
debug "no relay sharding information, peer filtering disabled"
return none(WakuDiscv5Predicate)
else: nodeShardOp.get()

debug "peer filtering updated"

let predicate = proc(record: waku_enr.Record): bool =
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))

return some(predicate)

proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T =
let protocol = newProtocol(
Expand All @@ -64,7 +89,13 @@ proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscovery
enrUdpPort = none(Port),
)

WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false)
let shardPredOp =
if record.isSome():
shardingPredicate(record.get())
else:
none(WakuDiscv5Predicate)

WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false, predicate: shardPredOp)

proc new*(T: type WakuDiscoveryV5,
extIp: Option[ValidIpAddress],
Expand Down Expand Up @@ -195,57 +226,29 @@ proc updateENRShards(wd: WakuDiscoveryV5,

return ok()

proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information

let typeRecordRes = record.toTyped()
let typedRecord =
if typeRecordRes.isErr():
debug "peer filtering failed", reason= $typeRecordRes.error
return none(WakuDiscv5Predicate)
else: typeRecordRes.get()

let nodeShardOp = typedRecord.relaySharding()
let nodeShard =
if nodeShardOp.isNone():
debug "no relay sharding information, peer filtering disabled"
return none(WakuDiscv5Predicate)
else: nodeShardOp.get()

debug "peer filtering enabled"

let predicate = proc(record: waku_enr.Record): bool =
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))

return some(predicate)

proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} =
proc findRandomPeers*(wd: WakuDiscoveryV5, overridePred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} =
## Find random peers to connect to using Discovery v5
let discoveredNodes = await wd.protocol.queryRandom()

var discoveredRecords = discoveredNodes.mapIt(it.record)

# Filter out nodes that do not match the predicate
if pred.isSome():
discoveredRecords = discoveredRecords.filter(pred.get())
if overridePred.isSome():
discoveredRecords = discoveredRecords.filter(overridePred.get())
elif wd.predicate.isSome():
discoveredRecords = discoveredRecords.filter(wd.predicate.get())

return discoveredRecords

#TODO abstract away PeerManager
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager, record: Option[enr.Record]) {.async.} =
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} =
## Continuously add newly discovered nodes

info "Starting discovery v5 search"

let shardPredOp =
if record.isSome():
shardingPredicate(record.get())
else:
none(WakuDiscv5Predicate)

while wd.listening:
trace "running discv5 discovery loop"
let discoveredRecords = await wd.findRandomPeers(shardPredOp)
let discoveredRecords = await wd.findRandomPeers()
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)

for peer in discoveredPeers:
Expand Down Expand Up @@ -305,6 +308,9 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv
let subs = events.filterIt(it.kind == SubscriptionKind.PubsubSub).mapIt(it.pubsubSub)
let unsubs = events.filterIt(it.kind == SubscriptionKind.PubsubUnsub).mapIt(it.pubsubUnsub)

if subs.len == 0 and unsubs.len == 0:
continue

let unsubRes = wd.updateENRShards(unsubs, false)
let subRes = wd.updateENRShards(subs, true)

Expand All @@ -314,8 +320,12 @@ proc subscriptionsListener*(wd: WakuDiscoveryV5, topicSubscriptionQueue: AsyncEv
if unsubRes.isErr():
debug "ENR shard removal failed", reason= $unsubRes.error

if subRes.isOk() and unsubRes.isOk():
debug "ENR updated successfully"
if subRes.isErr() and unsubRes.isErr():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this better be an or ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if either one succeed then add the log message and update the predicate.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Failing is not that bad in this case.

continue

debug "ENR updated successfully"
SionoiS marked this conversation as resolved.
Show resolved Hide resolved

wd.predicate = shardingPredicate(wd.protocol.localNode.record)

topicSubscriptionQueue.unregister(key)

Expand Down