diff --git a/waku/discovery/discovery_manager.nim b/waku/discovery/discovery_manager.nim deleted file mode 100644 index 9fd239598a..0000000000 --- a/waku/discovery/discovery_manager.nim +++ /dev/null @@ -1,33 +0,0 @@ -when (NimMajor, NimMinor) < (1, 4): - {.push raises: [Defect].} -else: - {.push raises: [].} - -import waku_discv5, ../../waku_core - -## This module contains the logic needed to discover other peers and -## also to make the "self" node discoverable by other peers. - -type DiscoveryManager* = object - wakuDiscv5*: Option[WakuDiscoveryV5] - dynamicBootstrapNodes*: seq[RemotePeerInfo] - -#[ - TODO: in future PRs we will have: - - App* = object - version: string - conf: WakuNodeConf - rng: ref HmacDrbgContext - key: crypto.PrivateKey - - ## in future PRs, the following two items will be encapsulated by 'DiscoveryManager' - wakuDiscv5: Option[WakuDiscoveryV5] <-- this will get removed - dynamicBootstrapNodes: seq[RemotePeerInfo] <-- this will get removed - - node: WakuNode <-- this will contain a discManager instance - - restServer: Option[WakuRestServerRef] - metricsServer: Option[MetricsHttpServerRef] - - ]# diff --git a/waku/discovery/waku_discv5.nim b/waku/discovery/waku_discv5.nim index 1637c35049..85f3658294 100644 --- a/waku/discovery/waku_discv5.nim +++ b/waku/discovery/waku_discv5.nim @@ -14,7 +14,11 @@ import eth/keys as eth_keys, eth/p2p/discoveryv5/node, eth/p2p/discoveryv5/protocol -import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr +import + ../node/peer_manager/peer_manager, + ../waku_core, + ../waku_enr, + ../factory/external_config export protocol, waku_enr @@ -244,7 +248,7 @@ proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} = wd.topicSubscriptionQueue.unregister(key) -proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} = +proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async: (raises: []).} = if wd.listening: return err("already listening") @@ -313,3 +317,42 @@ proc addBootstrapNode*(bootstrapAddr: string, bootstrapEnrs: var seq[enr.Record] return bootstrapEnrs.add(enrRes.value) + +proc setupDiscoveryV5*( + myENR: enr.Record, + nodePeerManager: PeerManager, + nodeTopicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent], + conf: WakuNodeConf, + dynamicBootstrapNodes: seq[RemotePeerInfo], + rng: ref HmacDrbgContext, + key: crypto.PrivateKey, +): WakuDiscoveryV5 = + let dynamicBootstrapEnrs = + dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get()) + + var discv5BootstrapEnrs: seq[enr.Record] + + # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq + for enrUri in conf.discv5BootstrapNodes: + addBootstrapNode(enrUri, discv5BootstrapEnrs) + + discv5BootstrapEnrs.add(dynamicBootstrapEnrs) + + let discv5Config = DiscoveryConfig.init( + conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop + ) + + let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift) + + let discv5Conf = WakuDiscoveryV5Config( + discv5Config: some(discv5Config), + address: conf.listenAddress, + port: discv5UdpPort, + privateKey: eth_keys.PrivateKey(key.skkey), + bootstrapRecords: discv5BootstrapEnrs, + autoupdateRecord: conf.discv5EnrAutoUpdate, + ) + + WakuDiscoveryV5.new( + rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue + ) diff --git a/waku/discovery/waku_dnsdisc.nim b/waku/discovery/waku_dnsdisc.nim index 921daff499..b635679a12 100644 --- a/waku/discovery/waku_dnsdisc.nim +++ b/waku/discovery/waku_dnsdisc.nim @@ -21,6 +21,7 @@ import libp2p/multiaddress, libp2p/peerid, dnsdisc/client +import libp2p/nameresolving/dnsresolver import ../waku_core export client @@ -97,3 +98,35 @@ proc init*( debug "init success" return ok(wakuDnsDisc) + +proc retrieveDynamicBootstrapNodes*( + dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress] +): Result[seq[RemotePeerInfo], string] = + ## Retrieve dynamic bootstrap nodes (DNS discovery) + + if dnsDiscovery and dnsDiscoveryUrl != "": + # DNS discovery + debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl + + var nameServers: seq[TransportAddress] + for ip in dnsDiscoveryNameServers: + nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 + + let dnsResolver = DnsResolver.new(nameServers) + + proc resolver(domain: string): Future[string] {.async, gcsafe.} = + trace "resolving", domain = domain + let resolved = await dnsResolver.resolveTxt(domain) + return resolved[0] # Use only first answer + + var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) + if wakuDnsDiscovery.isOk(): + return wakuDnsDiscovery.get().findPeers().mapErr( + proc(e: cstring): string = + $e + ) + else: + warn "Failed to init Waku DNS discovery" + + debug "No method for retrieving dynamic bootstrap nodes specified." + ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default diff --git a/waku/factory/app.nim b/waku/factory/app.nim index 1ef3b0393a..4b72a23deb 100644 --- a/waku/factory/app.nim +++ b/waku/factory/app.nim @@ -11,7 +11,6 @@ import libp2p/wire, libp2p/multicodec, libp2p/crypto/crypto, - libp2p/nameresolving/dnsresolver, libp2p/protocols/pubsub/gossipsub, libp2p/peerid, eth/keys, @@ -59,7 +58,7 @@ type rng: ref HmacDrbgContext key: crypto.PrivateKey - wakuDiscv5*: Option[WakuDiscoveryV5] + wakuDiscv5*: WakuDiscoveryV5 dynamicBootstrapNodes: seq[RemotePeerInfo] node: WakuNode @@ -75,38 +74,6 @@ func node*(app: App): WakuNode = func version*(app: App): string = app.version -## Retrieve dynamic bootstrap nodes (DNS discovery) - -proc retrieveDynamicBootstrapNodes*( - dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress] -): Result[seq[RemotePeerInfo], string] = - if dnsDiscovery and dnsDiscoveryUrl != "": - # DNS discovery - debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl - - var nameServers: seq[TransportAddress] - for ip in dnsDiscoveryNameServers: - nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - - let dnsResolver = DnsResolver.new(nameServers) - - proc resolver(domain: string): Future[string] {.async, gcsafe.} = - trace "resolving", domain = domain - let resolved = await dnsResolver.resolveTxt(domain) - return resolved[0] # Use only first answer - - var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver) - if wakuDnsDiscovery.isOk(): - return wakuDnsDiscovery.get().findPeers().mapErr( - proc(e: cstring): string = - $e - ) - else: - warn "Failed to init Waku DNS discovery" - - debug "No method for retrieving dynamic bootstrap nodes specified." - ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default - ## Initialisation proc init*(T: type App, conf: WakuNodeConf): Result[App, string] = @@ -121,7 +88,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] = confCopy.nodekey = some(keyRes.get()) debug "Retrieve dynamic bootstrap nodes" - let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes( + let dynamicBootstrapNodesRes = waku_dnsdisc.retrieveDynamicBootstrapNodes( confCopy.dnsDiscovery, confCopy.dnsDiscoveryUrl, confCopy.dnsDiscoveryNameServers ) if dynamicBootstrapNodesRes.isErr(): @@ -147,43 +114,6 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] = ok(app) -## Setup DiscoveryV5 - -proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 = - let dynamicBootstrapEnrs = - app.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get()) - - var discv5BootstrapEnrs: seq[enr.Record] - - # parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq - for enrUri in app.conf.discv5BootstrapNodes: - addBootstrapNode(enrUri, discv5BootstrapEnrs) - - discv5BootstrapEnrs.add(dynamicBootstrapEnrs) - - let discv5Config = DiscoveryConfig.init( - app.conf.discv5TableIpLimit, app.conf.discv5BucketIpLimit, app.conf.discv5BitsPerHop - ) - - let discv5UdpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift) - - let discv5Conf = WakuDiscoveryV5Config( - discv5Config: some(discv5Config), - address: app.conf.listenAddress, - port: discv5UdpPort, - privateKey: keys.PrivateKey(app.key.skkey), - bootstrapRecords: discv5BootstrapEnrs, - autoupdateRecord: app.conf.discv5EnrAutoUpdate, - ) - - WakuDiscoveryV5.new( - app.rng, - discv5Conf, - some(app.node.enr), - some(app.node.peerManager), - app.node.topicSubscriptionQueue, - ) - proc getPorts( listenAddrs: seq[MultiAddress] ): AppResult[tuple[tcpPort, websocketPort: Option[Port]]] = @@ -259,17 +189,13 @@ proc startApp*(app: var App): AppResult[void] = ## Discv5 if app.conf.discv5Discovery: - app.wakuDiscV5 = some(app.setupDiscoveryV5()) - - if app.wakuDiscv5.isSome(): - let wakuDiscv5 = app.wakuDiscv5.get() - let catchRes = catch: - (waitFor wakuDiscv5.start()) - let startRes = catchRes.valueOr: - return err("failed to start waku discovery v5: " & catchRes.error.msg) + app.wakuDiscV5 = waku_discv5.setupDiscoveryV5( + app.node.enr, app.node.peerManager, app.node.topicSubscriptionQueue, app.conf, + app.dynamicBootstrapNodes, app.rng, app.key, + ) - startRes.isOkOr: - return err("failed to start waku discovery v5: " & error) + (waitFor app.wakuDiscV5.start()).isOkOr: + return err("failed to start waku discovery v5: " & $error) return ok() @@ -282,8 +208,8 @@ proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} = if not app.metricsServer.isNil(): await app.metricsServer.stop() - if app.wakuDiscv5.isSome(): - await app.wakuDiscv5.get().stop() + if not app.wakuDiscv5.isNil(): + await app.wakuDiscv5.stop() if not app.node.isNil(): await app.node.stop() diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 3dd514f6c4..e8284592ac 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -50,7 +50,6 @@ import ../waku_rln_relay, ./config, ./peer_manager, - ../discovery/waku_dnsdisc, ../common/ratelimit declarePublicCounter waku_node_messages, "number of messages received", ["type"] diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 2c7466f969..3b139fd9a0 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -113,7 +113,7 @@ proc startRestServerEsentials*( proc startRestServerProtocolSupport*( restServer: WakuRestServerRef, node: WakuNode, - wakuDiscv5: Option[WakuDiscoveryV5], + wakuDiscv5: WakuDiscoveryV5, conf: WakuNodeConf, ): Result[void, string] = if not conf.rest: @@ -154,8 +154,8 @@ proc startRestServerProtocolSupport*( let filterCache = MessageCache.init() let filterDiscoHandler = - if wakuDiscv5.isSome(): - some(defaultDiscoveryHandler(wakuDiscv5.get(), Filter)) + if not wakuDiscv5.isNil(): + some(defaultDiscoveryHandler(wakuDiscv5, Filter)) else: none(DiscoveryHandler) @@ -168,8 +168,8 @@ proc startRestServerProtocolSupport*( ## Store REST API let storeDiscoHandler = - if wakuDiscv5.isSome(): - some(defaultDiscoveryHandler(wakuDiscv5.get(), Store)) + if not wakuDiscv5.isNil(): + some(defaultDiscoveryHandler(wakuDiscv5, Store)) else: none(DiscoveryHandler) @@ -182,8 +182,8 @@ proc startRestServerProtocolSupport*( if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or (conf.lightpush and node.wakuLightPush != nil and node.wakuRelay != nil): let lightDiscoHandler = - if wakuDiscv5.isSome(): - some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush)) + if not wakuDiscv5.isNil(): + some(defaultDiscoveryHandler(wakuDiscv5, Lightpush)) else: none(DiscoveryHandler)