From a4da87bb8c768e28cef8d0656c2dc308c7d660ee Mon Sep 17 00:00:00 2001 From: Simon-Pierre Vivier Date: Tue, 20 Jun 2023 12:08:10 -0400 Subject: [PATCH] feat: discovery peer filtering for relay shard (#1804) Add discv6 predicate that filter peer by static shard. Co-authored-by: Hanno Cornelius <68783915+jm-clius@users.noreply.github.com> --- tests/v2/test_waku_discv5.nim | 79 ++++++++++++++++++++++++++++++++++- waku/v2/node/waku_node.nim | 4 +- waku/v2/waku_discv5.nim | 30 +++++++++++-- 3 files changed, 107 insertions(+), 6 deletions(-) diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index 7820fee0c5..d868949e71 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -251,7 +251,7 @@ procSuite "Waku Discovery v5": await allFutures(node1.start(), node2.start(), node3.start(), node4.start()) ## Given - let recordPredicate = proc(record: waku_enr.Record): bool = + let recordPredicate: WakuDiscv5Predicate = proc(record: waku_enr.Record): bool = let typedRecord = record.toTyped() if typedRecord.isErr(): return false @@ -270,7 +270,7 @@ procSuite "Waku Discovery v5": # for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate): # peers.incl(peer) await sleepAsync(5.seconds) # Wait for discv5 discvery loop to run - let peers = await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate) + let peers = await node1.wakuDiscv5.findRandomPeers(some(recordPredicate)) ## Then check: @@ -309,3 +309,78 @@ procSuite "Waku Discovery v5": assert emptyRes.isOk(), emptyRes.error assert emptyRes.value.isNone(), $emptyRes.value + asyncTest "filter peer per static shard": + ## Given + let recordCluster21 = block: + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + let + shardCluster: uint16 = 21 + shardIndices: seq[uint16] = @[1u16, 2u16, 5u16, 7u16, 9u16, 11u16] + + let shards = RelayShards.init(shardCluster, shardIndices) + + var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) + require builder.withWakuRelaySharding(shards).isOk() + + let recordRes = builder.build() + require recordRes.isOk() + recordRes.tryGet() + + let recordCluster22Indices1 = block: + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + let + shardCluster: uint16 = 22 + shardIndices: seq[uint16] = @[2u16, 4u16, 5u16, 8u16, 10u16, 12u16] + + let shards = RelayShards.init(shardCluster, shardIndices) + + var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) + require builder.withWakuRelaySharding(shards).isOk() + + let recordRes = builder.build() + require recordRes.isOk() + recordRes.tryGet() + + let recordCluster22Indices2 = block: + let + enrSeqNum = 1u64 + enrPrivKey = generatesecp256k1key() + + let + shardCluster: uint16 = 22 + shardIndices: seq[uint16] = @[1u16, 3u16, 6u16, 7u16, 9u16, 11u16] + + let shards = RelayShards.init(shardCluster, shardIndices) + + var builder = EnrBuilder.init(enrPrivKey, seqNum = enrSeqNum) + require builder.withWakuRelaySharding(shards).isOk() + + let recordRes = builder.build() + require recordRes.isOk() + recordRes.tryGet() + + ## When + let predicateCluster21Op = shardingPredicate(recordCluster21) + require predicateCluster21Op.isSome() + let predicateCluster21 = predicateCluster21Op.get() + + let predicateCluster22Op = shardingPredicate(recordCluster22Indices1) + require predicateCluster22Op.isSome() + let predicateCluster22 = predicateCluster22Op.get() + + ## Then + check: + predicateCluster21(recordCluster21) == true + predicateCluster21(recordCluster22Indices1) == false + predicateCluster21(recordCluster22Indices2) == false + predicateCluster22(recordCluster21) == false + predicateCluster22(recordCluster22Indices1) == true + predicateCluster22(recordCluster22Indices2) == false + + diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index 2dde32f7b1..8a663ca14b 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -844,9 +844,11 @@ proc runDiscv5Loop(node: WakuNode) {.async.} = info "starting discv5 discovery loop" + let shardPredOp = shardingPredicate(node.enr) + while node.wakuDiscv5.listening: trace "running discv5 discovery loop" - let discoveredRecords = await node.wakuDiscv5.findRandomPeers() + let discoveredRecords = await node.wakuDiscv5.findRandomPeers(shardPredOp) let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value) for peer in discoveredPeers: diff --git a/waku/v2/waku_discv5.nim b/waku/v2/waku_discv5.nim index 41eb979137..7f5637f125 100644 --- a/waku/v2/waku_discv5.nim +++ b/waku/v2/waku_discv5.nim @@ -170,15 +170,39 @@ proc closeWait*(wd: WakuDiscoveryV5) {.async.} = wd.listening = false await wd.protocol.closeWait() -proc findRandomPeers*(wd: WakuDiscoveryV5, pred: WakuDiscv5Predicate = nil): Future[seq[waku_enr.Record]] {.async.} = +proc shardingPredicate*(record: Record): Option[WakuDiscv5Predicate] = + ## Filter peers based on relay sharding information + + let typeRecordRes = record.toTyped() + let typedRecord = + if typeRecordRes.isErr(): + debug "peer filtering failed", reason= $typeRecordRes.error + return none(WakuDiscv5Predicate) + else: typeRecordRes.get() + + let nodeShardOp = typedRecord.relaySharding() + let nodeShard = + if nodeShardOp.isNone(): + debug "no relay sharding information, peer filtering disabled" + return none(WakuDiscv5Predicate) + else: nodeShardOp.get() + + debug "peer filtering enabled" + + let predicate = proc(record: waku_enr.Record): bool = + nodeShard.indices.anyIt(record.containsShard(nodeShard.cluster, it)) + + return some(predicate) + +proc findRandomPeers*(wd: WakuDiscoveryV5, pred = none(WakuDiscv5Predicate)): Future[seq[waku_enr.Record]] {.async.} = ## Find random peers to connect to using Discovery v5 let discoveredNodes = await wd.protocol.queryRandom() var discoveredRecords = discoveredNodes.mapIt(it.record) # Filter out nodes that do not match the predicate - if not pred.isNil(): - discoveredRecords = discoveredRecords.filter(pred) + if pred.isSome(): + discoveredRecords = discoveredRecords.filter(pred.get()) return discoveredRecords