Skip to content

Commit

Permalink
refactor: big refactor to add waku component in libwaku instead of on…
Browse files Browse the repository at this point in the history
…lu waku node (#2658)
  • Loading branch information
Ivansete-status authored May 3, 2024
1 parent 853ec18 commit 2463527
Show file tree
Hide file tree
Showing 16 changed files with 152 additions and 206 deletions.
30 changes: 15 additions & 15 deletions apps/wakunode2/wakunode2.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import
../../waku/common/logging,
../../waku/factory/external_config,
../../waku/factory/networks_config,
../../waku/factory/app,
../../waku/factory/waku,
../../waku/node/health_monitor,
../../waku/node/waku_metrics,
../../waku/waku_api/rest/builder as rest_server_builder
Expand Down Expand Up @@ -63,7 +63,7 @@ when isMainModule:
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks

const versionString = "version / git commit hash: " & app.git_version
const versionString = "version / git commit hash: " & waku.git_version

let confRes = WakuNodeConf.load(version = versionString)
if confRes.isErr():
Expand Down Expand Up @@ -119,7 +119,7 @@ when isMainModule:
else:
discard

info "Running nwaku node", version = app.git_version
info "Running nwaku node", version = waku.git_version
logConfig(conf)

# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
Expand All @@ -135,25 +135,25 @@ when isMainModule:
error "Starting esential REST server failed.", error = $error
quit(QuitFailure)

var wakunode2 = App.init(conf).valueOr:
error "App initialization failed", error = error
var waku = Waku.init(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

wakunode2.restServer = restServer
waku.restServer = restServer

nodeHealthMonitor.setNode(wakunode2.node)
nodeHealthMonitor.setNode(waku.node)

wakunode2.startApp().isOkOr:
error "Starting app failed", error = error
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)

rest_server_builder.startRestServerProtocolSupport(
restServer, wakunode2.node, wakunode2.wakuDiscv5, conf
restServer, waku.node, waku.wakuDiscv5, conf
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)

wakunode2.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
waku.metricsServer = waku_metrics.startMetricsServerAndLogging(conf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)

Expand All @@ -163,7 +163,7 @@ when isMainModule:
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.

proc asyncStopper(node: App) {.async: (raises: [Exception]).} =
proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await node.stop()
quit(QuitSuccess)
Expand All @@ -174,15 +174,15 @@ when isMainModule:
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
notice "Shutting down after receiving SIGINT"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)

setControlCHook(handleCtrlC)

# Handle SIGTERM
when defined(posix):
proc handleSigterm(signal: cint) {.noconv.} =
notice "Shutting down after receiving SIGTERM"
asyncSpawn asyncStopper(wakunode2)
asyncSpawn asyncStopper(waku)

c_signal(ansi_c.SIGTERM, handleSigterm)

Expand All @@ -195,7 +195,7 @@ when isMainModule:
# Not available in -d:release mode
writeStackTrace()

waitFor wakunode2.stop()
waitFor waku.stop()
quit(QuitFailure)

c_signal(ansi_c.SIGSEGV, handleSigsegv)
Expand Down
55 changes: 12 additions & 43 deletions examples/wakustealthcommitments/node_spec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import ../../apps/wakunode2/[networks_config, app, external_config]
import ../../waku/common/logging
import
../../waku/common/logging, ../../waku/factory/[waku, networks_config, external_config]
import
std/[options, strutils, os, sequtils],
stew/shims/net as stewNet,
Expand All @@ -15,11 +15,11 @@ import
libp2p/crypto/crypto

export
networks_config, app, logging, options, strutils, os, sequtils, stewNet, chronicles,
networks_config, waku, logging, options, strutils, os, sequtils, stewNet, chronicles,
chronos, metrics, libbacktrace, crypto

proc setup*(): App =
const versionString = "version / git commit hash: " & app.git_version
proc setup*(): Waku =
const versionString = "version / git commit hash: " & waku.git_version
let rng = crypto.newRng()

let confRes = WakuNodeConf.load(version = versionString)
Expand Down Expand Up @@ -48,48 +48,17 @@ proc setup*(): App =
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit

var wakunode2 = App.init(rng, conf)
## Peer persistence
let res1 = wakunode2.setupPeerPersistence()
if res1.isErr():
error "1/5 Setting up storage failed", error = $res1.error
quit(QuitFailure)

debug "2/5 Retrieve dynamic bootstrap nodes"

let res3 = wakunode2.setupDyamicBootstrapNodes()
if res3.isErr():
error "2/5 Retrieving dynamic bootstrap nodes failed", error = $res3.error
debug "Starting node"
var waku = Waku.init(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)

debug "3/5 Initializing node"

let res4 = wakunode2.setupWakuApp()
if res4.isErr():
error "3/5 Initializing node failed", error = $res4.error
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)

debug "4/5 Mounting protocols"

var res5: Result[void, string]
try:
res5 = waitFor wakunode2.setupAndMountProtocols()
if res5.isErr():
error "4/5 Mounting protocols failed", error = $res5.error
quit(QuitFailure)
except Exception:
error "4/5 Mounting protocols failed", error = getCurrentExceptionMsg()
quit(QuitFailure)

debug "5/5 Starting node and mounted protocols"

# set triggerSelf to false, we don't want to process our own stealthCommitments
wakunode2.node.wakuRelay.triggerSelf = false

let res6 = wakunode2.startApp()
if res6.isErr():
error "5/5 Starting node and protocols failed", error = $res6.error
quit(QuitFailure)
waku.node.wakuRelay.triggerSelf = false

info "Node setup complete"
return wakunode2
return waku
12 changes: 6 additions & 6 deletions examples/wakustealthcommitments/stealth_commitment_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import
export wire_spec, logging

type StealthCommitmentProtocol* = object
wakuApp: App
waku: Waku
contentTopic: string
spendingKeyPair: StealthCommitmentFFI.KeyPair
viewingKeyPair: StealthCommitmentFFI.KeyPair
Expand Down Expand Up @@ -51,10 +51,10 @@ proc sendThruWaku*(
timestamp: getNanosecondTime(time),
)

(self.wakuApp.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr:
(self.waku.node.wakuRlnRelay.appendRLNProof(message, float64(time))).isOkOr:
return err("could not append rate limit proof to the message: " & $error)

(await self.wakuApp.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
(await self.waku.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
return err("failed to publish message: " & $error)

debug "rate limit proof is appended to the message"
Expand Down Expand Up @@ -167,7 +167,7 @@ proc getSCPHandler(self: StealthCommitmentProtocol): SCPHandler =
return handler

proc new*(
wakuApp: App, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto")
waku: Waku, contentTopic = ContentTopic("/wakustealthcommitments/1/app/proto")
): Result[StealthCommitmentProtocol, string] =
let spendingKeyPair = StealthCommitmentFFI.generateKeyPair().valueOr:
return err("could not generate spending key pair: " & $error)
Expand All @@ -178,7 +178,7 @@ proc new*(
info "viewing public key", publicKey = viewingKeyPair.publicKey

let SCP = StealthCommitmentProtocol(
wakuApp: wakuApp,
waku: waku,
contentTopic: contentTopic,
spendingKeyPair: spendingKeyPair,
viewingKeyPair: viewingKeyPair,
Expand All @@ -192,5 +192,5 @@ proc new*(
except CatchableError:
error "could not handle SCP message: ", err = getCurrentExceptionMsg()

wakuApp.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
waku.node.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), some(handler))
return ok(SCP)
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, sequtils, strutils, json]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc

type DebugNodeMsgType* = enum
RETRIEVE_LISTENING_ADDRESSES
Expand All @@ -20,13 +20,13 @@ proc getMultiaddresses(node: WakuNode): seq[string] =
return node.info().listenAddresses

proc process*(
self: ptr DebugNodeRequest, node: WakuNode
self: ptr DebugNodeRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of RETRIEVE_LISTENING_ADDRESSES:
return ok($(%*node.getMultiaddresses()))
return ok($(%*waku.node.getMultiaddresses()))

return err("unsupported operation in DebugNodeRequest")
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/waku_core,
../../../../waku/factory/external_config,
../../../../waku/node/waku_node,
../../../../waku/factory/waku,
../../../../waku/node/config,
../../../../waku/waku_archive/driver/builder,
../../../../waku/waku_archive/driver,
Expand Down Expand Up @@ -48,16 +48,12 @@ proc destroyShared(self: ptr NodeLifecycleRequest) =
deallocShared(self[].configJson)
deallocShared(self)

proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} =
proc createWaku(configJson: cstring): Future[Result[Waku, string]] {.async.} =
var conf: WakuNodeConf
var errorResp: string

try:
if not parseConfig(
$configJson,
conf,
errorResp,
):
if not parseConfig($configJson, conf, errorResp):
return err(errorResp)
except Exception:
return err("exception calling parseConfig: " & getCurrentExceptionMsg())
Expand All @@ -69,6 +65,7 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}

# The Waku Network config (cluster-id=1)
if conf.clusterId == 1:
## TODO: This section is duplicated in wakunode2.nim. We need to move this to a common module
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
if len(conf.shards) != 0:
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
Expand All @@ -88,31 +85,28 @@ proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.}
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit

let wakuRes = Waku.init(conf).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)

let nodeRes = setupNode(conf).valueOr():
error "Failed setting up node", error = error
return err("Failed setting up node: " & $error)

return ok(nodeRes)
return ok(wakuRes)

proc process*(
self: ptr NodeLifecycleRequest, node: ptr WakuNode
self: ptr NodeLifecycleRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of CREATE_NODE:
let newNodeRes = await createNode(self.configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)

node[] = newNodeRes.get()
waku[] = (await createWaku(self.configJson)).valueOr:
return err("error processing createWaku request: " & $error)
of START_NODE:
await node[].start()
(await waku.startWaku()).isOkOr:
return err("problem starting waku: " & $error)
of STOP_NODE:
try:
await node[].stop()
await waku[].stop()
except Exception:
return err("exception stopping node: " & getCurrentExceptionMsg())

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import std/[options, sequtils, strutils]
import chronicles, chronos, stew/results, stew/shims/net
import ../../../../waku/node/waku_node, ../../../alloc
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node, ../../../alloc

type PeerManagementMsgType* = enum
CONNECT_TO
Expand Down Expand Up @@ -43,14 +43,14 @@ proc connectTo(
return ok()

proc process*(
self: ptr PeerManagementRequest, node: WakuNode
self: ptr PeerManagementRequest, waku: Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

case self.operation
of CONNECT_TO:
let ret = node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
let ret = waku.node.connectTo($self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import std/[options, sequtils, strutils]
import chronicles, chronos, stew/byteutils, stew/results, stew/shims/net
import
../../../../../waku/waku_core/message/message,
../../../../../waku/node/waku_node,
../../../../../waku/factory/waku,
../../../../../waku/waku_core/message,
../../../../../waku/waku_core/time, # Timestamp
../../../../../waku/waku_core/topics/pubsub_topic,
Expand Down Expand Up @@ -79,26 +79,26 @@ proc toWakuMessage(m: ThreadSafeWakuMessage): WakuMessage =
return wakuMessage

proc process*(
self: ptr RelayRequest, node: ptr WakuNode
self: ptr RelayRequest, waku: ptr Waku
): Future[Result[string, string]] {.async.} =
defer:
destroyShared(self)

if node.wakuRelay.isNil():
if waku.node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")

case self.operation
of SUBSCRIBE:
# TO DO: properly perform 'subscribe'
discard node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
discard waku.node.wakuRelay.subscribe($self.pubsubTopic, self.relayEventCallback)
of UNSUBSCRIBE:
# TODO: properly perform 'unsubscribe'
node.wakuRelay.unsubscribeAll($self.pubsubTopic)
waku.node.wakuRelay.unsubscribeAll($self.pubsubTopic)
of PUBLISH:
let msg = self.message.toWakuMessage()
let pubsubTopic = $self.pubsubTopic

let numPeers = await node.wakuRelay.publish(pubsubTopic, msg)
let numPeers = await waku.node.wakuRelay.publish(pubsubTopic, msg)
if numPeers == 0:
return err("Message not sent because no peers found.")
elif numPeers > 0:
Expand Down
Loading

0 comments on commit 2463527

Please sign in to comment.