From 975924dd1b6c1e7c67aa49a6ef7a14448571af5d Mon Sep 17 00:00:00 2001 From: Lorenzo Delgado Date: Fri, 28 Apr 2023 00:45:12 +0200 Subject: [PATCH] feat(discv5): added find random nodes with predicate --- apps/wakunode2/app.nim | 10 +- examples/v2/publisher.nim | 6 +- examples/v2/subscriber.nim | 4 +- tests/v2/test_waku_discv5.nim | 442 ++++++++++++++++----------- tests/v2/test_waku_peer_exchange.nim | 4 +- waku/v2/node/waku_node.nim | 60 ++-- waku/v2/waku_discv5.nim | 197 ++++++------ 7 files changed, 413 insertions(+), 310 deletions(-) diff --git a/apps/wakunode2/app.nim b/apps/wakunode2/app.nim index 9a74b4cc1c..fb183923d1 100644 --- a/apps/wakunode2/app.nim +++ b/apps/wakunode2/app.nim @@ -682,13 +682,11 @@ proc startNode(node: WakuNode, conf: WakuNodeConf, except CatchableError: return err("failed to start waku node: " & getCurrentExceptionMsg()) - # Start discv5 and connect to discovered nodes + # Start discv5 based discovery service (discovery loop) if conf.discv5Discovery: - try: - if not await node.startDiscv5(): - error "could not start Discovery v5" - except CatchableError: - return err("failed to start waku discovery v5: " & getCurrentExceptionMsg()) + let startDiscv5Res = await node.startDiscv5() + if startDiscv5Res.isErr(): + return err("failed to start waku discovery v5: " & startDiscv5Res.error) # Connect to configured static nodes if conf.staticnodes.len > 0: diff --git a/examples/v2/publisher.nim b/examples/v2/publisher.nim index 4aca434b50..99f9a81011 100644 --- a/examples/v2/publisher.nim +++ b/examples/v2/publisher.nim @@ -66,8 +66,10 @@ proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} = await node.start() await node.mountRelay() node.peerManager.start() - if not await node.startDiscv5(): - error "failed to start discv5" + + let discv5Res = await node.startDiscv5() + if discv5Res.isErr(): + error "failed to start discv5", error= discv5Res.error quit(1) # wait for a minimum of peers to be connected, otherwise messages wont be gossiped diff --git a/examples/v2/subscriber.nim b/examples/v2/subscriber.nim index 4d87df3756..21b91b6a8c 100644 --- a/examples/v2/subscriber.nim +++ b/examples/v2/subscriber.nim @@ -61,7 +61,9 @@ proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} = await node.start() await node.mountRelay() node.peerManager.start() - if not await node.startDiscv5(): + + let discv5Res = await node.startDiscv5() + if discv5Res.isErr(): error "failed to start discv5" quit(1) diff --git a/tests/v2/test_waku_discv5.nim b/tests/v2/test_waku_discv5.nim index 9d7c53d3af..623e37feac 100644 --- a/tests/v2/test_waku_discv5.nim +++ b/tests/v2/test_waku_discv5.nim @@ -1,201 +1,295 @@ {.used.} import - stew/[results, byteutils], + std/sets, + stew/results, stew/shims/net, chronos, chronicles, testutils/unittests, - libp2p/crypto/crypto, - eth/keys, - eth/p2p/discoveryv5/enr + libp2p/crypto/crypto as libp2p_keys, + eth/keys as eth_keys import ../../waku/v2/waku_node, - ../../waku/v2/waku_core, + ../../waku/v2/waku_enr, ../../waku/v2/waku_discv5, ./testlib/common, ./testlib/wakucore, ./testlib/wakunode + +proc newTestEnrRecord(privKey: libp2p_keys.PrivateKey, + extIp: string, tcpPort: uint16, udpPort: uint16, + flags = none(CapabilitiesBitfield)): waku_enr.Record = + var builder = EnrBuilder.init(privKey) + builder.withIpAddressAndPorts( + ipAddr = some(ValidIpAddress.init(extIp)), + tcpPort = some(Port(tcpPort)), + udpPort = some(Port(udpPort)), + ) + + if flags.isSome(): + builder.withWakuCapabilities(flags.get()) + + builder.build().tryGet() + + +proc newTestDiscv5Node(privKey: libp2p_keys.PrivateKey, + bindIp: string, tcpPort: uint16, udpPort: uint16, + record: waku_enr.Record, + bootstrapRecords = newSeq[waku_enr.Record]()): WakuNode = + let config = WakuDiscoveryV5Config( + privateKey: eth_keys.PrivateKey(privKey.skkey), + address: ValidIpAddress.init(bindIp), + port: Port(udpPort), + bootstrapRecords: bootstrapRecords, + ) + + let protocol = WakuDiscoveryV5.new(rng(), config, some(record)) + let node = newTestWakuNode( + nodeKey = privKey, + bindIp = ValidIpAddress.init(bindIp), + bindPort = Port(tcpPort), + wakuDiscv5 = some(protocol) + ) + + return node + + + procSuite "Waku Discovery v5": - asyncTest "Waku Discovery v5 end-to-end": - ## Tests integrated discovery v5 + asyncTest "find random peers": + ## Given + # Node 1 let - bindIp = ValidIpAddress.init("0.0.0.0") - extIp = ValidIpAddress.init("127.0.0.1") - - nodeKey1 = generateSecp256k1Key() - nodeTcpPort1 = Port(61500) - nodeUdpPort1 = Port(9000) - node1 = newTestWakuNode(nodeKey1, bindIp, nodeTcpPort1) - - nodeKey2 = generateSecp256k1Key() - nodeTcpPort2 = Port(61502) - nodeUdpPort2 = Port(9002) - node2 = newTestWakuNode(nodeKey2, bindIp, nodeTcpPort2) - - nodeKey3 = generateSecp256k1Key() - nodeTcpPort3 = Port(61504) - nodeUdpPort3 = Port(9004) - node3 = newTestWakuNode(nodeKey3, bindIp, nodeTcpPort3) - - flags = CapabilitiesBitfield.init( - lightpush = false, - filter = false, - store = false, - relay = true - ) - - # E2E relay test paramaters - pubSubTopic = "/waku/2/default-waku/proto" - contentTopic = ContentTopic("/waku/2/default-content/proto") - payload = "Can you see me?".toBytes() - message = WakuMessage(payload: payload, contentTopic: contentTopic) - - # Mount discv5 - node1.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort1), some(nodeUdpPort1), - bindIp, - nodeUdpPort1, - newSeq[enr.Record](), - false, - keys.PrivateKey(nodeKey1.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now - node1.rng - ) - - node2.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort2), some(nodeUdpPort2), - bindIp, - nodeUdpPort2, - @[node1.wakuDiscv5.protocol.localNode.record], # Bootstrap with node1 - false, - keys.PrivateKey(nodeKey2.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now - node2.rng - ) - - node3.wakuDiscv5 = WakuDiscoveryV5.new( - some(extIp), some(nodeTcpPort3), some(nodeUdpPort3), - bindIp, - nodeUdpPort3, - @[node2.wakuDiscv5.protocol.localNode.record], # Bootstrap with node2 - false, - keys.PrivateKey(nodeKey3.skkey), - flags, - newSeq[MultiAddress](), # Empty multiaddr fields, for now - node3.rng - ) - - await node1.mountRelay() - await node2.mountRelay() - await node3.mountRelay() - - await allFutures([node1.start(), node2.start(), node3.start()]) - - await allFutures([node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5()]) - - await sleepAsync(3000.millis) # Give the algorithm some time to work its magic + privKey1 = generateSecp256k1Key() + bindIp1 = "0.0.0.0" + extIp1 = "127.0.0.1" + tcpPort1 = 61500u16 + udpPort1 = 9000u16 + + let record1 = newTestEnrRecord( + privKey = privKey1, + extIp = extIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + ) + let node1 = newTestDiscv5Node( + privKey = privKey1, + bindIp = bindIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + record = record1 + ) + + # Node 2 + let + privKey2 = generateSecp256k1Key() + bindIp2 = "0.0.0.0" + extIp2 = "127.0.0.1" + tcpPort2 = 61502u16 + udpPort2 = 9002u16 + + let record2 = newTestEnrRecord( + privKey = privKey2, + extIp = extIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + ) + + let node2 = newTestDiscv5Node( + privKey = privKey2, + bindIp = bindIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + record = record2, + ) + + # Node 3 + let + privKey3 = generateSecp256k1Key() + bindIp3 = "0.0.0.0" + extIp3 = "127.0.0.1" + tcpPort3 = 61504u16 + udpPort3 = 9004u16 + + let record3 = newTestEnrRecord( + privKey = privKey3, + extIp = extIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + ) + + let node3 = newTestDiscv5Node( + privKey = privKey3, + bindIp = bindIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + record = record3, + bootstrapRecords = @[record1, record2] + ) + + await allFutures(node1.start(), node2.start(), node3.start()) + + ## When + # Starting discv5 via `WakuNode.startDiscV5()` starts the discv5 background task. + await allFutures(node1.startDiscv5(), node2.startDiscv5(), node3.startDiscv5()) + + await sleepAsync(1500.millis) # Wait for discv5 to bootstrap + + let res = await node1.wakuDiscv5.findRandomPeers() + + ## Then + check: + node1.wakuDiscv5.protocol.nodesDiscovered == 2 + node2.wakuDiscv5.protocol.nodesDiscovered == 2 + node3.wakuDiscv5.protocol.nodesDiscovered == 2 + check: - node1.wakuDiscv5.protocol.nodesDiscovered > 0 - node2.wakuDiscv5.protocol.nodesDiscovered > 0 - node3.wakuDiscv5.protocol.nodesDiscovered > 0 + res.len == 2 + res.contains(record2) + res.contains(record3) - # Let's see if we can deliver a message end-to-end - # var completionFut = newFuture[bool]() - # proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = - # let msg = WakuMessage.decode(data) - # if msg.isOk(): - # let val = msg.value() - # check: - # topic == pubSubTopic - # val.contentTopic == contentTopic - # val.payload == payload - # completionFut.complete(true) + ## Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop()) - # node3.subscribe(pubSubTopic, relayHandler) - # await sleepAsync(2000.millis) + asyncTest "find random peers with predicate": + ## Setup + # Records + let + privKey1 = generateSecp256k1Key() + bindIp1 = "0.0.0.0" + extIp1 = "127.0.0.1" + tcpPort1 = 61500u16 + udpPort1 = 9000u16 - # await node1.publish(pubSubTopic, message) + let record1 = newTestEnrRecord( + privKey = privKey1, + extIp = extIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay)) + ) - # check: - # (await completionFut.withTimeout(6.seconds)) == true + let + privKey2 = generateSecp256k1Key() + bindIp2 = "0.0.0.0" + extIp2 = "127.0.0.1" + tcpPort2 = 61502u16 + udpPort2 = 9002u16 - await allFutures([node1.stop(), node2.stop(), node3.stop()]) + let record2 = newTestEnrRecord( + privKey = privKey2, + extIp = extIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)) + ) - asyncTest "Custom multiaddresses are advertised correctly": let - bindIp = ValidIpAddress.init("0.0.0.0") - extIp = ValidIpAddress.init("127.0.0.1") - expectedMultiAddr = MultiAddress.init("/ip4/200.200.200.200/tcp/9000/wss").tryGet() - - flags = CapabilitiesBitfield.init( - lightpush = false, - filter = false, - store = false, - relay = true - ) - - nodeTcpPort1 = Port(9010) - nodeUdpPort1 = Port(9012) - node1Key = generateSecp256k1Key() - node1NetConfig = NetConfig.init(bindIp = bindIp, - extIp = some(extIp), - extPort = some(nodeTcpPort1), - bindPort = nodeTcpPort1, - extmultiAddrs = @[expectedMultiAddr], - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort1)).get() - node1discV5 = WakuDiscoveryV5.new(extIp = node1NetConfig.extIp, - extTcpPort = node1NetConfig.extPort, - extUdpPort = node1NetConfig.discv5UdpPort, - bindIp = node1NetConfig.bindIp, - discv5UdpPort = node1NetConfig.discv5UdpPort.get(), - privateKey = keys.PrivateKey(node1Key.skkey), - multiaddrs = node1NetConfig.enrMultiaddrs, - flags = node1NetConfig.wakuFlags.get(), - rng = rng) - node1 = WakuNode.new(nodekey = node1Key, - netConfig = node1NetConfig, - wakuDiscv5 = some(node1discV5), - rng = rng) - - - nodeTcpPort2 = Port(9014) - nodeUdpPort2 = Port(9016) - node2Key = generateSecp256k1Key() - node2NetConfig = NetConfig.init(bindIp = bindIp, - extIp = some(extIp), - extPort = some(nodeTcpPort2), - bindPort = nodeTcpPort2, - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort2)).get() - node2discV5 = WakuDiscoveryV5.new(extIp = node2NetConfig.extIp, - extTcpPort = node2NetConfig.extPort, - extUdpPort = node2NetConfig.discv5UdpPort, - bindIp = node2NetConfig.bindIp, - discv5UdpPort = node2NetConfig.discv5UdpPort.get(), - bootstrapEnrs = @[node1.wakuDiscv5.protocol.localNode.record], - privateKey = keys.PrivateKey(node2Key.skkey), - flags = node2NetConfig.wakuFlags.get(), - rng = rng) - node2 = WakuNode.new(nodeKey = node2Key, - netConfig = node2NetConfig, - wakuDiscv5 = some(node2discV5)) - - await allFutures([node1.start(), node2.start()]) - - await allFutures([node1.startDiscv5(), node2.startDiscv5()]) - - await sleepAsync(3000.millis) # Give the algorithm some time to work its magic - - let node1Enr = node2.wakuDiscv5.protocol.routingTable.buckets[0].nodes[0].record - let multiaddrs = node1Enr.toTyped().get().multiaddrs.get() + privKey3 = generateSecp256k1Key() + bindIp3 = "0.0.0.0" + extIp3 = "127.0.0.1" + tcpPort3 = 61504u16 + udpPort3 = 9004u16 + + let record3 = newTestEnrRecord( + privKey = privKey3, + extIp = extIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Filter)) + ) + + let + privKey4 = generateSecp256k1Key() + bindIp4 = "0.0.0.0" + extIp4 = "127.0.0.1" + tcpPort4 = 61506u16 + udpPort4 = 9006u16 + + let record4 = newTestEnrRecord( + privKey = privKey4, + extIp = extIp4, + tcpPort = tcpPort4, + udpPort = udpPort4, + flags = some(CapabilitiesBitfield.init(Capabilities.Relay, Capabilities.Store)) + ) + + + # Nodes + let node1 = newTestDiscv5Node( + privKey = privKey1, + bindIp = bindIp1, + tcpPort = tcpPort1, + udpPort = udpPort1, + record = record1, + bootstrapRecords = @[record2] + ) + let node2 = newTestDiscv5Node( + privKey = privKey2, + bindIp = bindIp2, + tcpPort = tcpPort2, + udpPort = udpPort2, + record = record2, + bootstrapRecords = @[record3, record4] + ) + + let node3 = newTestDiscv5Node( + privKey = privKey3, + bindIp = bindIp3, + tcpPort = tcpPort3, + udpPort = udpPort3, + record = record3 + ) + + let node4 = newTestDiscv5Node( + privKey = privKey4, + bindIp = bindIp4, + tcpPort = tcpPort4, + udpPort = udpPort4, + record = record4 + ) + + # Start nodes' discoveryV5 protocols + require node1.wakuDiscV5.start().isOk() + require node2.wakuDiscV5.start().isOk() + require node3.wakuDiscV5.start().isOk() + require node4.wakuDiscV5.start().isOk() + + await allFutures(node1.start(), node2.start(), node3.start(), node4.start()) + + ## Given + let recordPredicate = proc(record: waku_enr.Record): bool = + let typedRecord = record.toTyped() + if typedRecord.isErr(): + return false + + let capabilities = typedRecord.value.waku2 + if capabilities.isNone(): + return false + + return capabilities.get().supportsCapability(Capabilities.Store) + + + ## When + # Do a random peer search with a predicate multiple times + var peers = initHashSet[waku_enr.Record]() + for i in 0..<10: + for peer in await node1.wakuDiscv5.findRandomPeers(pred=recordPredicate): + peers.incl(peer) + + ## Then + check: + peers.len == 2 + peers.contains(record2) + peers.contains(record4) check: - node1.wakuDiscv5.protocol.nodesDiscovered > 0 - node2.wakuDiscv5.protocol.nodesDiscovered > 0 - multiaddrs.contains(expectedMultiAddr) + node1.wakuDiscv5.protocol.nodesDiscovered == 2 + node2.wakuDiscv5.protocol.nodesDiscovered == 2 + node3.wakuDiscv5.protocol.nodesDiscovered == 2 + node4.wakuDiscv5.protocol.nodesDiscovered == 2 + # Cleanup + await allFutures(node1.stop(), node2.stop(), node3.stop(), node4.stop()) diff --git a/tests/v2/test_waku_peer_exchange.nim b/tests/v2/test_waku_peer_exchange.nim index 0e87239a95..24d928520c 100644 --- a/tests/v2/test_waku_peer_exchange.nim +++ b/tests/v2/test_waku_peer_exchange.nim @@ -123,8 +123,8 @@ procSuite "Waku Peer Exchange": ) ## Given - await allFutures([node1.start(), node2.start(), node3.start()]) - await allFutures([node1.startDiscv5(), node2.startDiscv5()]) + await allFutures(node1.start(), node2.start(), node3.start()) + await allFutures(node1.startDiscv5(), node2.startDiscv5()) var attempts = 10 while (node1.wakuDiscv5.protocol.nodesDiscovered < 1 or diff --git a/waku/v2/node/waku_node.nim b/waku/v2/node/waku_node.nim index ffb0ef663f..bd448c4e7a 100644 --- a/waku/v2/node/waku_node.nim +++ b/waku/v2/node/waku_node.nim @@ -853,59 +853,47 @@ proc startKeepalive*(node: WakuNode) = asyncSpawn node.keepaliveLoop(defaultKeepalive) proc runDiscv5Loop(node: WakuNode) {.async.} = - ## Continuously add newly discovered nodes - ## using Node Discovery v5 - if (node.wakuDiscv5.isNil): + ## Continuously add newly discovered nodes using Node Discovery v5 + if node.wakuDiscv5.isNil(): warn "Trying to run discovery v5 while it's disabled" return - info "Starting discovery loop" + info "starting discv5 discovery loop" while node.wakuDiscv5.listening: - trace "Running discovery loop" - let discoveredPeersRes = await node.wakuDiscv5.findRandomPeers() + trace "running discv5 discovery loop" + let discoveredRecords = await node.wakuDiscv5.findRandomPeers() + let discoveredPeers = discoveredRecords.mapIt(it.toRemotePeerInfo()).filterIt(it.isOk()).mapIt(it.value) - if discoveredPeersRes.isOk: - let discoveredPeers = discoveredPeersRes.get - let newSeen = discoveredPeers.countIt(not node.peerManager.peerStore[AddressBook].contains(it.peerId)) - info "Discovered peers", discovered=discoveredPeers.len, new=newSeen + for peer in discoveredPeers: + let isNew = not node.peerManager.peerStore[AddressBook].contains(peer.peerId) + debug "peer discovered", peer= $peer, origin= "discv5", is_new= $isNew - # Add all peers, new ones and already seen (in case their addresses changed) - for peer in discoveredPeers: - node.peerManager.addPeer(peer, Discv5) + node.peerManager.addPeer(peer, PeerOrigin.Discv5) # Discovery `queryRandom` can have a synchronous fast path for example # when no peers are in the routing table. Don't run it in continuous loop. # - # Also, give some time to dial the discovered nodes and update stats etc + # Also, give some time to dial the discovered nodes and update stats, etc. await sleepAsync(5.seconds) -proc startDiscv5*(node: WakuNode): Future[bool] {.async.} = +proc startDiscv5*(node: WakuNode): Future[Result[void, string]] {.async.} = ## Start Discovery v5 service + if node.wakuDiscv5.isNil(): + return err("discovery v5 is disabled") info "Starting discovery v5 service" + let res = node.wakuDiscv5.start() + if res.isErr(): + return err(res.error) + + trace "Start discovering new peers using discv5" + asyncSpawn node.runDiscv5Loop() + + debug "Successfully started discovery v5 service" + info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri() + return ok() - if not node.wakuDiscv5.isNil(): - ## First start listening on configured port - try: - trace "Start listening on discv5 port" - node.wakuDiscv5.open() - except CatchableError: - error "Failed to start discovery service. UDP port may be already in use" - return false - - ## Start Discovery v5 - trace "Start discv5 service" - node.wakuDiscv5.start() - trace "Start discovering new peers using discv5" - - asyncSpawn node.runDiscv5Loop() - - debug "Successfully started discovery v5 service" - info "Discv5: discoverable ENR ", enr = node.wakuDiscV5.protocol.localNode.record.toUri() - return true - - return false proc stopDiscv5*(node: WakuNode): Future[bool] {.async.} = ## Stop Discovery v5 service diff --git a/waku/v2/waku_discv5.nim b/waku/v2/waku_discv5.nim index 259dd16335..9e3fd829f7 100644 --- a/waku/v2/waku_discv5.nim +++ b/waku/v2/waku_discv5.nim @@ -4,15 +4,14 @@ else: {.push raises: [].} import - std/[strutils, options], + std/[sequtils, strutils, options], stew/results, stew/shims/net, chronos, chronicles, metrics, libp2p/multiaddress, - eth/keys, - eth/p2p/discoveryv5/enr, + eth/keys as eth_keys, eth/p2p/discoveryv5/node, eth/p2p/discoveryv5/protocol import @@ -29,53 +28,42 @@ logScope: topics = "waku discv5" -type WakuDiscoveryV5* = ref object - protocol*: protocol.Protocol - listening*: bool +## Config +type WakuDiscoveryV5Config* = object + discv5Config*: Option[DiscoveryConfig] + address*: ValidIpAddress + port*: Port + privateKey*: eth_keys.PrivateKey + bootstrapRecords*: seq[waku_enr.Record] + autoupdateRecord*: bool -#################### -# Helper functions # -#################### -proc parseBootstrapAddress(address: string): Result[enr.Record, cstring] = - logScope: - address = address +## Protocol - if address[0] == '/': - return err("MultiAddress bootstrap addresses are not supported") +type WakuDiscv5Predicate* = proc(record: waku_enr.Record): bool {.closure, gcsafe.} - let lowerCaseAddress = toLowerAscii(address) - if lowerCaseAddress.startsWith("enr:"): - var enrRec: enr.Record - if not enrRec.fromURI(address): - return err("Invalid ENR bootstrap record") - - return ok(enrRec) - - elif lowerCaseAddress.startsWith("enode:"): - return err("ENode bootstrap addresses are not supported") - - else: - return err("Ignoring unrecognized bootstrap address type") - -proc addBootstrapNode*(bootstrapAddr: string, - bootstrapEnrs: var seq[enr.Record]) = - # Ignore empty lines or lines starting with # - if bootstrapAddr.len == 0 or bootstrapAddr[0] == '#': - return - - let enrRes = parseBootstrapAddress(bootstrapAddr) - if enrRes.isErr(): - debug "ignoring invalid bootstrap address", reason = enrRes.error - return - - bootstrapEnrs.add(enrRes.value) +type WakuDiscoveryV5* = ref object + conf: WakuDiscoveryV5Config + protocol*: protocol.Protocol + listening*: bool +proc new*(T: type WakuDiscoveryV5, rng: ref HmacDrbgContext, conf: WakuDiscoveryV5Config, record: Option[waku_enr.Record]): T = + let protocol = newProtocol( + rng = rng, + config = conf.discv5Config.get(protocol.defaultDiscoveryConfig), + bindPort = conf.port, + bindIp = conf.address, + privKey = conf.privateKey, + bootstrapRecords = conf.bootstrapRecords, + enrAutoUpdate = conf.autoupdateRecord, + previousRecord = record, + enrIp = none(ValidIpAddress), + enrTcpPort = none(Port), + enrUdpPort = none(Port), + ) -#################### -# Discovery v5 API # -#################### + WakuDiscoveryV5(conf: conf, protocol: protocol, listening: false) proc new*(T: type WakuDiscoveryV5, extIp: Option[ValidIpAddress], @@ -85,49 +73,55 @@ proc new*(T: type WakuDiscoveryV5, discv5UdpPort: Port, bootstrapEnrs = newSeq[enr.Record](), enrAutoUpdate = false, - privateKey: keys.PrivateKey, + privateKey: eth_keys.PrivateKey, flags: CapabilitiesBitfield, multiaddrs = newSeq[MultiAddress](), rng: ref HmacDrbgContext, - discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T = - - # Add the waku capabilities field - var enrInitFields = @[(CapabilitiesEnrField, @[flags.byte])] - - # Add the waku multiaddrs field - if multiaddrs.len > 0: - let value = waku_enr.encodeMultiaddrs(multiaddrs) - enrInitFields.add((MultiaddrEnrField, value)) - - let protocol = newProtocol( - privateKey, - enrIp = extIp, - enrTcpPort = extTcpPort, - enrUdpPort = extUdpPort, - enrInitFields, - bootstrapEnrs, - bindPort = discv5UdpPort, - bindIp = bindIP, - enrAutoUpdate = enrAutoUpdate, - config = discv5Config, - rng = rng + discv5Config: protocol.DiscoveryConfig = protocol.defaultDiscoveryConfig): T {. + deprecated: "use the config and record proc variant instead".}= + + let record = block: + var builder = EnrBuilder.init(privateKey) + builder.withIpAddressAndPorts( + ipAddr = extIp, + tcpPort = extTcpPort, + udpPort = extUdpPort, + ) + builder.withWakuCapabilities(flags) + builder.withMultiaddrs(multiaddrs) + builder.build().expect("Record within size limits") + + let conf = WakuDiscoveryV5Config( + discv5Config: some(discv5Config), + address: bindIP, + port: discv5UdpPort, + privateKey: privateKey, + bootstrapRecords: bootstrapEnrs, + autoupdateRecord: enrAutoUpdate, ) - WakuDiscoveryV5(protocol: protocol, listening: false) + WakuDiscoveryV5.new(rng, conf, some(record)) + -# TODO: Do not raise an exception, return a result -proc open*(wd: WakuDiscoveryV5) {.raises: [CatchableError].} = - debug "Opening Waku discovery v5 ports" +proc start*(wd: WakuDiscoveryV5): Result[void, string] = if wd.listening: - return + return err("already listening") + + # Start listening on configured port + debug "start listening on udp port", address = wd.conf.address, port = wd.conf.port + try: + wd.protocol.open() + except CatchableError: + return err("failed to open udp port: " & getCurrentExceptionMsg()) - wd.protocol.open() wd.listening = true -proc start*(wd: WakuDiscoveryV5) = - debug "starting Waku discovery v5 service" + # Start Discovery v5 + trace "start discv5 service" wd.protocol.start() + ok() + proc closeWait*(wd: WakuDiscoveryV5) {.async.} = debug "closing Waku discovery v5 node" if not wd.listening: @@ -136,26 +130,51 @@ proc closeWait*(wd: WakuDiscoveryV5) {.async.} = wd.listening = false await wd.protocol.closeWait() -proc findRandomPeers*(wd: WakuDiscoveryV5): Future[Result[seq[RemotePeerInfo], cstring]] {.async.} = +proc findRandomPeers*(wd: WakuDiscoveryV5, pred: WakuDiscv5Predicate = nil): Future[seq[waku_enr.Record]] {.async.} = ## Find random peers to connect to using Discovery v5 - - # Query for a random target and collect all discovered nodes let discoveredNodes = await wd.protocol.queryRandom() - ## Filter based on our needs - # let filteredNodes = discoveredNodes.filter(isWakuNode) # Currently only a single predicate - # TODO: consider node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage + var discoveredRecords = discoveredNodes.mapIt(it.record) + + # Filter out nodes that do not match the predicate + if not pred.isNil(): + discoveredRecords = discoveredRecords.filter(pred) + + return discoveredRecords + + +## Helper functions + +proc parseBootstrapAddress(address: string): Result[enr.Record, cstring] = + logScope: + address = address + + if address[0] == '/': + return err("MultiAddress bootstrap addresses are not supported") + + let lowerCaseAddress = toLowerAscii(address) + if lowerCaseAddress.startsWith("enr:"): + var enrRec: enr.Record + if not enrRec.fromURI(address): + return err("Invalid ENR bootstrap record") + + return ok(enrRec) - var discoveredPeers: seq[RemotePeerInfo] + elif lowerCaseAddress.startsWith("enode:"): + return err("ENode bootstrap addresses are not supported") - for node in discoveredNodes: - let res = node.record.toRemotePeerInfo() - if res.isErr(): - error "failed to convert ENR to peer info", enr= $node.record, err=res.error - waku_discv5_errors.inc(labelValues = ["peer_info_failure"]) - continue + else: + return err("Ignoring unrecognized bootstrap address type") - discoveredPeers.add(res.value) +proc addBootstrapNode*(bootstrapAddr: string, + bootstrapEnrs: var seq[enr.Record]) = + # Ignore empty lines or lines starting with # + if bootstrapAddr.len == 0 or bootstrapAddr[0] == '#': + return + let enrRes = parseBootstrapAddress(bootstrapAddr) + if enrRes.isErr(): + debug "ignoring invalid bootstrap address", reason = enrRes.error + return - return ok(discoveredPeers) + bootstrapEnrs.add(enrRes.value)