Skip to content

Commit

Permalink
update filter predicate
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Aug 18, 2023
1 parent b3bb7a1 commit 94559eb
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 39 deletions.
2 changes: 1 addition & 1 deletion apps/wakunode2/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ proc startApp*(app: App): Future[AppResult[void]] {.async.} =
if res.isErr():
return err("failed to start waku discovery v5: " & $res.error)

asyncSpawn app.wakuDiscv5.get().searchLoop(app.node.peerManager, some(app.record))
asyncSpawn app.wakuDiscv5.get().searchLoop(app.node.peerManager)

return await startNode(
app.node,
Expand Down
2 changes: 1 addition & 1 deletion examples/publisher.nim
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
error "failed to start discv5", error= discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
Expand Down
2 changes: 1 addition & 1 deletion examples/subscriber.nim
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
error "failed to start discv5", error = discv5Res.error
quit(1)

asyncSpawn wakuDiscv5.searchLoop(node.peerManager, some(node.enr))
asyncSpawn wakuDiscv5.searchLoop(node.peerManager)

# wait for a minimum of peers to be connected, otherwise messages wont be gossiped
while true:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ procSuite "Waku Discovery v5":


## When
let peers = await node1.findRandomPeers(some(recordPredicate))
let peers = await node1.findRandomPeers()

## Then
check:
Expand Down
4 changes: 2 additions & 2 deletions tests/test_waku_peer_exchange.nim
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ procSuite "Waku Peer Exchange":
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
asyncSpawn disc1.searchLoop(node1.peerManager, none(enr.Record))
asyncSpawn disc2.searchLoop(node2.peerManager, none(enr.Record))
asyncSpawn disc1.searchLoop(node1.peerManager)
asyncSpawn disc2.searchLoop(node2.peerManager)

## When
var attempts = 10
Expand Down
137 changes: 106 additions & 31 deletions waku/waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ else:
{.push raises: [].}

import
std/[sequtils, strutils, options],
std/[sequtils, strutils, options, sugar, sets],
stew/results,
stew/shims/net,
chronos,
Expand Down Expand Up @@ -48,6 +48,31 @@ type WakuDiscoveryV5* = ref object
conf: WakuDiscoveryV5Config
protocol*: protocol.Protocol
listening*: bool
predicate: Option[WakuDiscv5Predicate]

proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information

let typeRecordRes = record.toTyped()
let typedRecord =
if typeRecordRes.isErr():
debug "peer filtering failed", reason= $typeRecordRes.error
return none(WakuDiscv5Predicate)
else: typeRecordRes.get()

let nodeShardOp = typedRecord.relaySharding()
let nodeShard =
if nodeShardOp.isNone():
debug "no relay sharding information, peer filtering disabled"
return none(WakuDiscv5Predicate)
else: nodeShardOp.get()

debug "peer filtering updated"

let predicate = proc(record: waku_enr.Record): bool =
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))

return some(predicate)

proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T =
let protocol = newProtocol(
Expand All @@ -64,7 +89,13 @@ proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscovery
enrUdpPort = none(Port),
)

WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false)
let shardPredOp =
if record.isSome():
shardingPredicate(record.get())
else:
none(WakuDiscv5Predicate)

WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false, predicate: shardPredOp)

proc new*(T: type WakuDiscoveryV5,
extIp: Option[ValidIpAddress],
Expand Down Expand Up @@ -122,57 +153,101 @@ proc new*(T: type WakuDiscoveryV5,

WakuDiscoveryV5.new(rng, conf, some(record))

proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] =
## Filter peers based on relay sharding information
proc updateENR(wd: WakuDiscoveryV5, shards: RelayShards): Result[void, string] =
let (field, value) =
if shards.indices.len >= 64:
(ShardingBitVectorEnrField, shards.toBitVector())
else:
let listRes = shards.toIndicesList()
let list =
if listRes.isErr():
return err($listRes)
else: listRes.get()

let typeRecordRes = record.toTyped()
let typedRecord =
if typeRecordRes.isErr():
debug "peer filtering failed", reason= $typeRecordRes.error
return none(WakuDiscv5Predicate)
else: typeRecordRes.get()
(ShardingIndicesListEnrField, list)

let nodeShardOp = typedRecord.relaySharding()
let nodeShard =
if nodeShardOp.isNone():
debug "no relay sharding information, peer filtering disabled"
return none(WakuDiscv5Predicate)
else: nodeShardOp.get()
let updateRes = wd.protocol.updateRecord([(field, value)])
if updateRes.isErr():
return err($updateRes)

debug "peer filtering enabled"
proc enrSharding(wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic], add: bool): Result[void, string] =
let newShardOp = ?topicsToRelayShards(newTopics)

let predicate = proc(record: waku_enr.Record): bool =
nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it))
#[ let newShardOp =
if newShardRes.isErr():
return err($newShardRes.error)
else: newShardRes.get() ]#

return some(predicate)
let newShard =
if newShardOp.isSome():
newShardOp.get()
else:
return ok()

let res = wd.protocol.localNode.record.toTyped()
let req =
if res.isErr():
return err($res.error)
else: res.get()

let oldShardsOp = req.relaySharding()

proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} =
let resultShard =
if add and oldShardsOp.isSome():
let oldShard = oldShardsOp.get()

if oldShard.cluster != newShard.cluster:
return err("ENR are limited to one shard cluster")

RelayShards.init(oldShard.cluster, oldShard.indices & newShard.indices)
elif not add and oldShardsOp.isSome():
let oldShard = oldShardsOp.get()

if oldShard.cluster != newShard.cluster:
return err("ENR are limited to one shard cluster")

let oldSet = toHashSet(oldShard.indices)
let newSet = toHashSet(newShard.indices)

let indices = toSeq(oldset - newSet)

RelayShards.init(oldShard.cluster, indices)
elif add and oldShardsOp.isNone(): newShard
else: return ok()

?wd.updateENR(resultShard)

wd.predicate = shardingPredicate(wd.protocol.localNode.record)

return ok()

proc addENRShards*(wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic]): Result[void, string] =
wd.enrSharding(newTopics, true)

proc removeENRShards*(wd: WakuDiscoveryV5, newTopics: seq[PubsubTopic]): Result[void, string] =
wd.enrSharding(newTopics, false)

proc findRandomPeers*(wd: WakuDiscoveryV5): Future[seq[waku_enr.Record]] {.async.} =
## Find random peers to connect to using Discovery v5
let discoveredNodes = await wd.protocol.queryRandom()

var discoveredRecords = discoveredNodes.mapIt(it.record)

# Filter out nodes that do not match the predicate
if pred.isSome():
discoveredRecords = discoveredRecords.filter(pred.get())
if wd.predicate.isSome():
discoveredRecords = discoveredRecords.filter(wd.predicate.get())

return discoveredRecords

#TODO abstract away PeerManager
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager, record: Option[enr.Record]) {.async.} =
proc searchLoop*(wd: WakuDiscoveryV5, peerManager: PeerManager) {.async.} =
## Continuously add newly discovered nodes

info "Starting discovery v5 search"

let shardPredOp =
if record.isSome():
shardingPredicate(record.get())
else:
none(WakuDiscv5Predicate)

while wd.listening:
trace "running discv5 discovery loop"
let discoveredRecords = await wd.findRandomPeers(shardPredOp)
let discoveredRecords = await wd.findRandomPeers()
let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value)

for peer in discoveredPeers:
Expand Down
4 changes: 2 additions & 2 deletions waku/waku_enr/sharding.nim
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func contains*(rs: RelayShards, topic: PubsubTopic|string): bool =

# ENR builder extension

func toIndicesList(rs: RelayShards): EnrResult[seq[byte]] =
func toIndicesList*(rs: RelayShards): EnrResult[seq[byte]] =
if rs.indices.len > high(uint8).int:
return err("indices list too long")

Expand Down Expand Up @@ -137,7 +137,7 @@ func fromIndicesList(buf: seq[byte]): Result[RelayShards, string] =

ok(RelayShards(cluster: cluster, indices: indices))

func toBitVector(rs: RelayShards): seq[byte] =
func toBitVector*(rs: RelayShards): seq[byte] =
## The value is comprised of a two-byte shard cluster index in network byte
## order concatenated with a 128-byte wide bit vector. The bit vector
## indicates which shards of the respective shard cluster the node is part
Expand Down

0 comments on commit 94559eb

Please sign in to comment.