Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: big refactor to add waku component in libwaku instead of only waku node #2658

Merged
merged 4 commits into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import
../../waku/common/logging,
../../waku/factory/external_config,
../../waku/factory/networks_config,
../../waku/factory/app,
../../waku/factory/waku,
../../waku/node/health_monitor,
../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder
Expand Down Expand Up @@ -63,7 +63,7 @@ when isMainModule:
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks

const versionString = "version / git commit hash: " & app.git_version
const versionString = "version / git commit hash: " & waku.git_version

let confRes = WakuNodeConf.load(version = versionString)
if confRes.isErr():
Expand Down Expand Up @@ -119,7 +119,7 @@ when isMainModule:
else:
discard

info "Running nwaku node", version = app.git_version
info "Running nwaku node", version = waku.git_version
logConfig(conf)

# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
Expand All @@ -135,25 +135,25 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error
quit(QuitFailure)

var wakunode2 = App.init(conf).valueOr:
error "App initialization failed", error = error
var waku = Waku.init(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

wakunode2.restServer = restServer
waku.restServer = restServer

nodeHealthMonitor.setNode(wakunode2.node)
nodeHealthMonitor.setNode(waku.node)

wakunode2.startApp().isOkOr:
error "Starting app failed", error = error
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)

rest_server_builder.startRestServerProtocolSupport(
restServer, wakunode2.node, wakunode2.wakuDiscv5, conf
restServer, waku.node, waku.wakuDiscv5, conf
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)

wakunode2.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
waku.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)

Expand All @@ -163,7 +163,7 @@ when isMainModule:
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.

proc asyncStopper(node: App) {.async: (raises: [Exception]).} =
proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await node.stop()
quit(QuitSuccess)
Expand All @@ -174,15 +174,15 @@ when isMainModule:
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
notice "Shutting down after receiving SIGINT"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)

setControlCHook(handleCtrlC)

# Handle SIGTERM
when defined(posix):
proc handleSigterm(signal: cint) {.noconv.} =
notice "Shutting down after receiving SIGTERM"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)

c_signal(ansi_c.SIGTERM, handleSigterm)

Expand All @@ -195,7 +195,7 @@ when isMainModule:
# Not available in -d:release mode
writeStackTrace()

waitFor wakunode2.stop()
waitFor waku.stop()
quit(QuitFailure)

c_signal(ansi_c.SIGSEGV, handleSigsegv)
Expand Down
55 changes: 12 additions & 43 deletions examples/wakustealthcommitments/node_spec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import ../../apps/wakunode2/[networks_config, app, external_config]
import ../../waku/common/logging
import
../../waku/common/logging, ../../waku/factory/[waku, networks_config, external_config]
import
std/[options, strutils, os, sequtils],
stew/shims/net as stewNet,
Expand All @@ -15,11 +15,11 @@ import
libp2p/crypto/crypto

export
networks_config, app, logging, options, strutils, os, sequtils, stewNet, chronicles,
networks_config, waku, logging, options, strutils, os, sequtils, stewNet, chronicles,
chronos, metrics, libbacktrace, crypto

proc setup*(): App =
const versionString = "version / git commit hash: " & app.git_version
proc setup*(): Waku =
const versionString = "version / git commit hash: " & waku.git_version
let rng = crypto.newRng()

let confRes = WakuNodeConf.load(version = versionString)
Expand Down Expand Up @@ -48,48 +48,17 @@ proc setup*(): App =
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit

var wakunode2 = App.init(rng, conf)
## Peer persistence
let res1 = wakunode2.setupPeerPersistence()
if res1.isErr():
error "1/5 Setting up storage failed", error = $res1.error
quit(QuitFailure)

debug "2/5 Retrieve dynamic bootstrap nodes"

let res3 = wakunode2.setupDyamicBootstrapNodes()
if res3.isErr():
error "2/5 Retrieving dynamic bootstrap nodes failed", error = $res3.error
debug "Starting node"
var waku = Waku.init(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

debug "3/5 Initializing node"

let res4 = wakunode2.setupWakuApp()
if res4.isErr():
error "3/5 Initializing node failed", error = $res4.error
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)

debug "4/5 Mounting protocols"

var res5: Result[void, string]
try:
res5 = waitFor wakunode2.setupAndMountProtocols()
if res5.isErr():
error "4/5 Mounting protocols failed", error = $res5.error
quit(QuitFailure)
except Exception:
error "4/5 Mounting protocols failed", error = getCurrentExceptionMsg()
quit(QuitFailure)

debug "5/5 Starting node and mounted protocols"

# set triggerSelf to false, we don't want to process our own stealthCommitments
wakunode2.node.wakuRelay.triggerSelf = false

let res6 = wakunode2.startApp()
if res6.isErr():
error "5/5 Starting node and protocols failed", error = $res6.error
quit(QuitFailure)
waku.node.wakuRelay.triggerSelf = false

info "Node setup complete"
return wakunode2
return waku
12 changes: 6 additions & 6 deletions examples/wakustealthcommitments/stealth_commitment_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import
export wire_spec, logging

type StealthCommitmentProtocol* = object
wakuApp: App
waku: Waku
contentTopic: string
spendingKeyPair: StealthCommitmentFFI.KeyPair
viewingKeyPair: StealthCommitmentFFI.KeyPair
Expand Down Expand Up @@ -51,10 +51,10 @@ proc sendThruWaku*(
timestamp: getNanosecondTime(time),
)

(self.wakuApp.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr:
(self.waku.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr:
return err("could not append rate limit proof to the message: " & $error)

(await self.wakuApp.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
(await self.waku.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
return err("failed to publish message: " & $error)

debug "rate limit proof is appended to the message"
Expand Down Expand Up @@ -167,7 +167,7 @@ proc getSCPHandler(self: StealthCommitmentProtocol): SCPHandler =
return handler

proc new*(
wakuApp: App, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto")
waku: Waku, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto")
): Result[StealthCommitmentProtocol, string] =
let spendingKeyPair = StealthCommitmentFFI.generateKeyPair().valueOr:
return err("could not generate spending key pair: " & $error)
Expand All @@ -178,7 +178,7 @@ proc new*(
info "viewing public key", publicKey = viewingKeyPair.publicKey

let SCP = StealthCommitmentProtocol(
wakuApp: wakuApp,
waku: waku,
contentTopic: contentTopic,
spendingKeyPair: spendingKeyPair,
viewingKeyPair: viewingKeyPair,
Expand All @@ -192,5 +192,5 @@ proc new*(
except CatchableError:
error "could not handle SCP message: ", err = getCurrentExceptionMsg()

wakuApp.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
return ok(SCP)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, sequtils, strutils, json]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc

type DebugNodeMsgType* = enum
RETRIEVE_LISTENING_ADDRESSES
Expand All @@ -20,13 +20,13 @@ proc getMultiaddresses(node: WakuNode): seq[string] =
return node.info().listenAddresses

proc process*(
self: ptr DebugNodeRequest, node: WakuNode
self: ptr DebugNodeRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of RETRIEVE_LISTENING_ADDRESSES:
return ok($(%*node.getMultiaddresses()))
return ok($(%*waku.node.getMultiaddresses()))

return err("unsupported operation in DebugNodeRequest")
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/waku_core,
../../../../waku/factory/external_config,
../../../../waku/node/waku_node,
../../../../waku/factory/waku,
../../../../waku/node/config,
../../../../waku/waku_archive/driver/builder,
../../../../waku/waku_archive/driver,
Expand Down Expand Up @@ -48,16 +48,12 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} =
proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
var conf: WakuNodeConf
var errorResp: string

try:
if not parseConfig(
$configJson,
conf,
errorResp,
):
if not parseConfig($configJson, conf, errorResp):
return err(errorResp)
except Exception:
return err("exception calling parseConfig: " & getCurrentExceptionMsg())
Expand All @@ -69,6 +65,7 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}

# The Waku Network config (cluster-id=1)
if conf.clusterId == 1:
## TODO: This section is duplicated in wakunode2.nim. We need to move this to a common module
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
if len(conf.shards) != 0:
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
Expand All @@ -88,31 +85,28 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit

let wakuRes = Waku.init(conf).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)

let nodeRes = setupNode(conf).valueOr():
error "Failed setting up node", error = error
return err("Failed setting up node: " & $error)

return ok(nodeRes)
return ok(wakuRes)

proc process*(
self: ptr NodeLifecycleRequest, node: ptr WakuNode
self: ptr NodeLifecycleRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of CREATE_NODE:
let newNodeRes = await createNode(self.configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)

node[] = newNodeRes.get()
waku[] = (await createWaku(self.configJson)).valueOr:
return err("error processing createWaku request: " & $error)
of START_NODE:
await node[].start()
(await waku.startWaku()).isOkOr:
return err("problem starting waku: " & $error)
of STOP_NODE:
try:
await node[].stop()
await waku[].stop()
except Exception:
return err("exception stopping node: " & getCurrentExceptionMsg())

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, sequtils, strutils]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc

type PeerManagementMsgType* = enum
CONNECT_TO
Expand Down Expand Up @@ -43,14 +43,14 @@ proc connectTo(
return ok()

proc process*(
self: ptr PeerManagementRequest, node: WakuNode
self: ptr PeerManagementRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of CONNECT_TO:
let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import std/[options, sequtils, strutils]
import chronicles, chronos, stew/byteutils, stew/results, stew/shims/net
import
../../../../../waku/waku_core/message/message,
../../../../../waku/node/waku_node,
../../../../../waku/factory/waku,
../../../../../waku/waku_core/message,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
Expand Down Expand Up @@ -79,26 +79,26 @@ proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage =
return wakuMessage

proc process*(
self: ptr RelayRequest, node: ptr WakuNode
self: ptr RelayRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

if node.wakuRelay.isNil():
if waku.node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")

case self.operation
of SUBSCRIBE:
# TO DO: properly perform 'subscribe'
discard node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
discard waku.node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
of UNSUBSCRIBE:
# TODO: properly perform 'unsubscribe'
node.wakuRelay.unsubscribeAll($self.pubsubTopic)
waku.node.wakuRelay.unsubscribeAll($self.pubsubTopic)
of PUBLISH:
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic

let numPeers = await node.wakuRelay.publish(pubsubTopic, msg)
let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if numPeers == 0:
return err("Message not sent because no peers found.")
elif numPeers > 0:
Expand Down
Loading
Loading