Skip to content

Commit

Permalink
refactor: simplify app.nim and move discovery items to appropriate mo…
Browse files Browse the repository at this point in the history
…dules (#2657)
  • Loading branch information
Ivansete-status authored May 1, 2024
1 parent e03d116 commit 404810a
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 127 deletions.
33 changes: 0 additions & 33 deletions waku/discovery/discovery_manager.nim

This file was deleted.

47 changes: 45 additions & 2 deletions waku/discovery/waku_discv5.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@ import
eth/keys as eth_keys,
eth/p2p/discoveryv5/node,
eth/p2p/discoveryv5/protocol
import ../node/peer_manager/peer_manager, ../waku_core, ../waku_enr
import
../node/peer_manager/peer_manager,
../waku_core,
../waku_enr,
../factory/external_config

export protocol, waku_enr

Expand Down Expand Up @@ -244,7 +248,7 @@ proc subscriptionsListener(wd: WakuDiscoveryV5) {.async.} =

wd.topicSubscriptionQueue.unregister(key)

proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async.} =
proc start*(wd: WakuDiscoveryV5): Future[Result[void, string]] {.async: (raises: []).} =
if wd.listening:
return err("already listening")

Expand Down Expand Up @@ -313,3 +317,42 @@ proc addBootstrapNode*(bootstrapAddr: string, bootstrapEnrs: var seq[enr.Record]
return

bootstrapEnrs.add(enrRes.value)

proc setupDiscoveryV5*(
myENR: enr.Record,
nodePeerManager: PeerManager,
nodeTopicSubscriptionQueue: AsyncEventQueue[SubscriptionEvent],
conf: WakuNodeConf,
dynamicBootstrapNodes: seq[RemotePeerInfo],
rng: ref HmacDrbgContext,
key: crypto.PrivateKey,
): WakuDiscoveryV5 =
let dynamicBootstrapEnrs =
dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())

var discv5BootstrapEnrs: seq[enr.Record]

# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)

discv5BootstrapEnrs.add(dynamicBootstrapEnrs)

let discv5Config = DiscoveryConfig.init(
conf.discv5TableIpLimit, conf.discv5BucketIpLimit, conf.discv5BitsPerHop
)

let discv5UdpPort = Port(uint16(conf.discv5UdpPort) + conf.portsShift)

let discv5Conf = WakuDiscoveryV5Config(
discv5Config: some(discv5Config),
address: conf.listenAddress,
port: discv5UdpPort,
privateKey: eth_keys.PrivateKey(key.skkey),
bootstrapRecords: discv5BootstrapEnrs,
autoupdateRecord: conf.discv5EnrAutoUpdate,
)

WakuDiscoveryV5.new(
rng, discv5Conf, some(myENR), some(nodePeerManager), nodeTopicSubscriptionQueue
)
33 changes: 33 additions & 0 deletions waku/discovery/waku_dnsdisc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import
libp2p/multiaddress,
libp2p/peerid,
dnsdisc/client
import libp2p/nameresolving/dnsresolver
import ../waku_core

export client
Expand Down Expand Up @@ -97,3 +98,35 @@ proc init*(
debug "init success"

return ok(wakuDnsDisc)

proc retrieveDynamicBootstrapNodes*(
dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress]
): Result[seq[RemotePeerInfo], string] =
## Retrieve dynamic bootstrap nodes (DNS discovery)

if dnsDiscovery and dnsDiscoveryUrl != "":
# DNS discovery
debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl

var nameServers: seq[TransportAddress]
for ip in dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53

let dnsResolver = DnsResolver.new(nameServers)

proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain = domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer

var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
if wakuDnsDiscovery.isOk():
return wakuDnsDiscovery.get().findPeers().mapErr(
proc(e: cstring): string =
$e
)
else:
warn "Failed to init Waku DNS discovery"

debug "No method for retrieving dynamic bootstrap nodes specified."
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default
94 changes: 10 additions & 84 deletions waku/factory/app.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import
libp2p/wire,
libp2p/multicodec,
libp2p/crypto/crypto,
libp2p/nameresolving/dnsresolver,
libp2p/protocols/pubsub/gossipsub,
libp2p/peerid,
eth/keys,
Expand Down Expand Up @@ -59,7 +58,7 @@ type
rng: ref HmacDrbgContext
key: crypto.PrivateKey

wakuDiscv5*: Option[WakuDiscoveryV5]
wakuDiscv5*: WakuDiscoveryV5
dynamicBootstrapNodes: seq[RemotePeerInfo]

node: WakuNode
Expand All @@ -75,38 +74,6 @@ func node*(app: App): WakuNode =
func version*(app: App): string =
app.version

## Retrieve dynamic bootstrap nodes (DNS discovery)

proc retrieveDynamicBootstrapNodes*(
dnsDiscovery: bool, dnsDiscoveryUrl: string, dnsDiscoveryNameServers: seq[IpAddress]
): Result[seq[RemotePeerInfo], string] =
if dnsDiscovery and dnsDiscoveryUrl != "":
# DNS discovery
debug "Discovering nodes using Waku DNS discovery", url = dnsDiscoveryUrl

var nameServers: seq[TransportAddress]
for ip in dnsDiscoveryNameServers:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53

let dnsResolver = DnsResolver.new(nameServers)

proc resolver(domain: string): Future[string] {.async, gcsafe.} =
trace "resolving", domain = domain
let resolved = await dnsResolver.resolveTxt(domain)
return resolved[0] # Use only first answer

var wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl, resolver)
if wakuDnsDiscovery.isOk():
return wakuDnsDiscovery.get().findPeers().mapErr(
proc(e: cstring): string =
$e
)
else:
warn "Failed to init Waku DNS discovery"

debug "No method for retrieving dynamic bootstrap nodes specified."
ok(newSeq[RemotePeerInfo]()) # Return an empty seq by default

## Initialisation

proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
Expand All @@ -121,7 +88,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
confCopy.nodekey = some(keyRes.get())

debug "Retrieve dynamic bootstrap nodes"
let dynamicBootstrapNodesRes = retrieveDynamicBootstrapNodes(
let dynamicBootstrapNodesRes = waku_dnsdisc.retrieveDynamicBootstrapNodes(
confCopy.dnsDiscovery, confCopy.dnsDiscoveryUrl, confCopy.dnsDiscoveryNameServers
)
if dynamicBootstrapNodesRes.isErr():
Expand All @@ -147,43 +114,6 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =

ok(app)

## Setup DiscoveryV5

proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
let dynamicBootstrapEnrs =
app.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())

var discv5BootstrapEnrs: seq[enr.Record]

# parse enrURIs from the configuration and add the resulting ENRs to the discv5BootstrapEnrs seq
for enrUri in app.conf.discv5BootstrapNodes:
addBootstrapNode(enrUri, discv5BootstrapEnrs)

discv5BootstrapEnrs.add(dynamicBootstrapEnrs)

let discv5Config = DiscoveryConfig.init(
app.conf.discv5TableIpLimit, app.conf.discv5BucketIpLimit, app.conf.discv5BitsPerHop
)

let discv5UdpPort = Port(uint16(app.conf.discv5UdpPort) + app.conf.portsShift)

let discv5Conf = WakuDiscoveryV5Config(
discv5Config: some(discv5Config),
address: app.conf.listenAddress,
port: discv5UdpPort,
privateKey: keys.PrivateKey(app.key.skkey),
bootstrapRecords: discv5BootstrapEnrs,
autoupdateRecord: app.conf.discv5EnrAutoUpdate,
)

WakuDiscoveryV5.new(
app.rng,
discv5Conf,
some(app.node.enr),
some(app.node.peerManager),
app.node.topicSubscriptionQueue,
)

proc getPorts(
listenAddrs: seq[MultiAddress]
): AppResult[tuple[tcpPort, websocketPort: Option[Port]]] =
Expand Down Expand Up @@ -259,17 +189,13 @@ proc startApp*(app: var App): AppResult[void] =

## Discv5
if app.conf.discv5Discovery:
app.wakuDiscV5 = some(app.setupDiscoveryV5())

if app.wakuDiscv5.isSome():
let wakuDiscv5 = app.wakuDiscv5.get()
let catchRes = catch:
(waitFor wakuDiscv5.start())
let startRes = catchRes.valueOr:
return err("failed to start waku discovery v5: " & catchRes.error.msg)
app.wakuDiscV5 = waku_discv5.setupDiscoveryV5(
app.node.enr, app.node.peerManager, app.node.topicSubscriptionQueue, app.conf,
app.dynamicBootstrapNodes, app.rng, app.key,
)

startRes.isOkOr:
return err("failed to start waku discovery v5: " & error)
(waitFor app.wakuDiscV5.start()).isOkOr:
return err("failed to start waku discovery v5: " & $error)

return ok()

Expand All @@ -282,8 +208,8 @@ proc stop*(app: App): Future[void] {.async: (raises: [Exception]).} =
if not app.metricsServer.isNil():
await app.metricsServer.stop()

if app.wakuDiscv5.isSome():
await app.wakuDiscv5.get().stop()
if not app.wakuDiscv5.isNil():
await app.wakuDiscv5.stop()

if not app.node.isNil():
await app.node.stop()
1 change: 0 additions & 1 deletion waku/node/waku_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ import
../waku_rln_relay,
./config,
./peer_manager,
../discovery/waku_dnsdisc,
../common/ratelimit

declarePublicCounter waku_node_messages, "number of messages received", ["type"]
Expand Down
14 changes: 7 additions & 7 deletions waku/waku_api/rest/builder.nim
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ proc startRestServerEsentials*(
proc startRestServerProtocolSupport*(
restServer: WakuRestServerRef,
node: WakuNode,
wakuDiscv5: Option[WakuDiscoveryV5],
wakuDiscv5: WakuDiscoveryV5,
conf: WakuNodeConf,
): Result[void, string] =
if not conf.rest:
Expand Down Expand Up @@ -154,8 +154,8 @@ proc startRestServerProtocolSupport*(
let filterCache = MessageCache.init()

let filterDiscoHandler =
if wakuDiscv5.isSome():
some(defaultDiscoveryHandler(wakuDiscv5.get(), Filter))
if not wakuDiscv5.isNil():
some(defaultDiscoveryHandler(wakuDiscv5, Filter))
else:
none(DiscoveryHandler)

Expand All @@ -168,8 +168,8 @@ proc startRestServerProtocolSupport*(

## Store REST API
let storeDiscoHandler =
if wakuDiscv5.isSome():
some(defaultDiscoveryHandler(wakuDiscv5.get(), Store))
if not wakuDiscv5.isNil():
some(defaultDiscoveryHandler(wakuDiscv5, Store))
else:
none(DiscoveryHandler)

Expand All @@ -182,8 +182,8 @@ proc startRestServerProtocolSupport*(
if (conf.lightpushnode != "" and node.wakuLightpushClient != nil) or
(conf.lightpush and node.wakuLightPush != nil and node.wakuRelay != nil):
let lightDiscoHandler =
if wakuDiscv5.isSome():
some(defaultDiscoveryHandler(wakuDiscv5.get(), Lightpush))
if not wakuDiscv5.isNil():
some(defaultDiscoveryHandler(wakuDiscv5, Lightpush))
else:
none(DiscoveryHandler)

Expand Down

0 comments on commit 404810a

Please sign in to comment.