From 0f30ea73fe717f3bddd59295951ddc1223b53dca Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Thu, 21 Mar 2024 23:29:49 -0400 Subject: [PATCH] refactor(c-bindings): node initialization --- library/waku_thread/config.nim | 115 ++++---------- .../requests/node_lifecycle_request.nim | 145 ++---------------- 2 files changed, 49 insertions(+), 211 deletions(-) diff --git a/library/waku_thread/config.nim b/library/waku_thread/config.nim index 4cd2746243..8cd7a79810 100644 --- a/library/waku_thread/config.nim +++ b/library/waku_thread/config.nim @@ -5,15 +5,16 @@ import stew/shims/net, ../../waku/waku_enr/capabilities, ../../waku/common/utils/nat, + ../../waku/factory/external_config, ../../waku/node/waku_node, ../../waku/node/config, ../events/json_base_event proc parsePrivateKey( - jsonNode: JsonNode, privateKey: var PrivateKey, errorResp: var string + jsonNode: JsonNode, conf: var WakuNodeConf, errorResp: var string ): bool = if not jsonNode.contains("key") or jsonNode["key"].kind == JsonNodeKind.JNull: - privateKey = PrivateKey.random(Secp256k1, newRng()[]).tryGet() + conf.nodekey = some(PrivateKey.random(Secp256k1, newRng()[]).tryGet()) return true if jsonNode["key"].kind != JsonNodeKind.JString: @@ -24,7 +25,7 @@ proc parsePrivateKey( try: let skPrivKey = SkPrivateKey.init(crypto.fromHex(key)).tryGet() - privateKey = crypto.PrivateKey(scheme: Secp256k1, skkey: skPrivKey) + conf.nodekey = some(crypto.PrivateKey(scheme: Secp256k1, skkey: skPrivKey)) except CatchableError: let msg = "Invalid node key: " & getCurrentExceptionMsg() errorResp = msg @@ -33,11 +34,12 @@ proc parsePrivateKey( return true proc parseListenAddr( - jsonNode: JsonNode, listenAddr: var IpAddress, errorResp: var string + jsonNode: JsonNode, conf: var WakuNodeConf, errorResp: var string ): bool = + var listenAddr: IpAddress if not jsonNode.contains("host"): - errorResp = "host attribute is required" - return false + conf.listenAddress = defaultListenAddress() + return true if jsonNode["host"].kind != JsonNodeKind.JString: errorResp = "The node host should be a string." @@ -54,20 +56,20 @@ proc parseListenAddr( return true -proc parsePort(jsonNode: JsonNode, port: var int, errorResp: var string): bool = +proc parsePort(jsonNode: JsonNode, conf: var WakuNodeConf, errorResp: var string): bool = if not jsonNode.contains("port"): - errorResp = "port attribute is required" - return false + conf.tcpPort = Port(60000) + return true if jsonNode["port"].kind != JsonNodeKind.JInt: errorResp = "The node port should be an integer." return false - port = jsonNode["port"].getInt() + conf.tcpPort = Port(jsonNode["port"].getInt()) return true -proc parseRelay(jsonNode: JsonNode, relay: var bool, errorResp: var string): bool = +proc parseRelay(jsonNode: JsonNode, conf: var WakuNodeConf, errorResp: var string): bool = if not jsonNode.contains("relay"): errorResp = "relay attribute is required" return false @@ -76,96 +78,80 @@ proc parseRelay(jsonNode: JsonNode, relay: var bool, errorResp: var string): boo errorResp = "The relay config param should be a boolean" return false - relay = jsonNode["relay"].getBool() + conf.relay = jsonNode["relay"].getBool() return true proc parseStore( jsonNode: JsonNode, - store: var bool, - storeNode: var string, - storeRetentionPolicy: var string, - storeDbUrl: var string, - storeVacuum: var bool, - storeDbMigration: var bool, - storeMaxNumDbConnections: var int, + conf: var WakuNodeConf, errorResp: var string, ): bool = if not jsonNode.contains("store"): ## the store parameter is not required. By default is is disabled - store = false + conf.store = false return true if jsonNode["store"].kind != JsonNodeKind.JBool: errorResp = "The store config param should be a boolean" return false - store = jsonNode["store"].getBool() + conf.store = jsonNode["store"].getBool() if jsonNode.contains("storeNode"): if jsonNode["storeNode"].kind != JsonNodeKind.JString: errorResp = "The storeNode config param should be a string" return false - storeNode = jsonNode["storeNode"].getStr() + conf.storeNode = jsonNode["storeNode"].getStr() if jsonNode.contains("storeRetentionPolicy"): if jsonNode["storeRetentionPolicy"].kind != JsonNodeKind.JString: errorResp = "The storeRetentionPolicy config param should be a string" return false - storeRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr() + conf.storeMessageRetentionPolicy = jsonNode["storeRetentionPolicy"].getStr() if jsonNode.contains("storeDbUrl"): if jsonNode["storeDbUrl"].kind != JsonNodeKind.JString: errorResp = "The storeDbUrl config param should be a string" return false - storeDbUrl = jsonNode["storeDbUrl"].getStr() + conf.storeMessageDbUrl = jsonNode["storeDbUrl"].getStr() if jsonNode.contains("storeVacuum"): if jsonNode["storeVacuum"].kind != JsonNodeKind.JBool: errorResp = "The storeVacuum config param should be a bool" return false - storeVacuum = jsonNode["storeVacuum"].getBool() + conf.storeMessageDbVacuum = jsonNode["storeVacuum"].getBool() if jsonNode.contains("storeDbMigration"): if jsonNode["storeDbMigration"].kind != JsonNodeKind.JBool: errorResp = "The storeDbMigration config param should be a bool" return false - storeDbMigration = jsonNode["storeDbMigration"].getBool() + conf.storeMessageDbMigration = jsonNode["storeDbMigration"].getBool() if jsonNode.contains("storeMaxNumDbConnections"): if jsonNode["storeMaxNumDbConnections"].kind != JsonNodeKind.JInt: errorResp = "The storeMaxNumDbConnections config param should be an int" return false - storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt() + conf.storeMaxNumDbConnections = jsonNode["storeMaxNumDbConnections"].getInt() return true -proc parseTopics(jsonNode: JsonNode, topics: var seq[string]) = +proc parseTopics(jsonNode: JsonNode, conf: var WakuNodeConf) = if jsonNode.contains("topics"): for topic in jsonNode["topics"].items: - topics.add(topic.getStr()) + conf.topics.add(topic.getStr()) else: - topics = @["/waku/2/default-waku/proto"] + conf.topics = @["/waku/2/default-waku/proto"] proc parseConfig*( configNodeJson: string, - privateKey: var PrivateKey, - netConfig: var NetConfig, - relay: var bool, - topics: var seq[string], - store: var bool, - storeNode: var string, - storeRetentionPolicy: var string, - storeDbUrl: var string, - storeVacuum: var bool, - storeDbMigration: var bool, - storeMaxNumDbConnections: var int, + conf: var WakuNodeConf, errorResp: var string, ): bool {.raises: [].} = if configNodeJson.len == 0: @@ -181,7 +167,7 @@ proc parseConfig*( # key try: - if not parsePrivateKey(jsonNode, privateKey, errorResp): + if not parsePrivateKey(jsonNode, conf, errorResp): return false except Exception, KeyError: errorResp = "Exception calling parsePrivateKey: " & getCurrentExceptionMsg() @@ -191,37 +177,23 @@ proc parseConfig*( var listenAddr: IpAddress try: listenAddr = parseIpAddress("127.0.0.1") - if not parseListenAddr(jsonNode, listenAddr, errorResp): + if not parseListenAddr(jsonNode, conf, errorResp): return false except Exception, ValueError: errorResp = "Exception calling parseIpAddress: " & getCurrentExceptionMsg() return false # port - var port = 0 try: - if not parsePort(jsonNode, port, errorResp): + if not parsePort(jsonNode, conf, errorResp): return false except Exception, ValueError: errorResp = "Exception calling parsePort: " & getCurrentExceptionMsg() return false - let natRes = setupNat("any", clientId, Port(uint16(port)), Port(uint16(port))) - if natRes.isErr(): - errorResp = "failed to setup NAT: " & $natRes.error - return false - - let (extIp, extTcpPort, _) = natRes.get() - - let extPort = - if extIp.isSome() and extTcpPort.isNone(): - some(Port(uint16(port))) - else: - extTcpPort - # relay try: - if not parseRelay(jsonNode, relay, errorResp): + if not parseRelay(jsonNode, conf, errorResp): return false except Exception, KeyError: errorResp = "Exception calling parseRelay: " & getCurrentExceptionMsg() @@ -229,38 +201,17 @@ proc parseConfig*( # topics try: - parseTopics(jsonNode, topics) + parseTopics(jsonNode, conf) except Exception, KeyError: errorResp = "Exception calling parseTopics: " & getCurrentExceptionMsg() return false # store try: - if not parseStore( - jsonNode, store, storeNode, storeRetentionPolicy, storeDbUrl, storeVacuum, - storeDbMigration, storeMaxNumDbConnections, errorResp, - ): + if not parseStore(jsonNode, conf, errorResp): return false except Exception, KeyError: errorResp = "Exception calling parseStore: " & getCurrentExceptionMsg() return false - let wakuFlags = CapabilitiesBitfield.init( - lightpush = false, filter = false, store = false, relay = relay - ) - - let netConfigRes = NetConfig.init( - bindIp = listenAddr, - bindPort = Port(uint16(port)), - extIp = extIp, - extPort = extPort, - wakuFlags = some(wakuFlags), - ) - - if netConfigRes.isErr(): - errorResp = "Error creating NetConfig: " & $netConfigRes.error - return false - - netConfig = netConfigRes.value - return true diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 4fa7e4f394..b2db670c9a 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -1,4 +1,5 @@ import std/options +import std/sequtils import chronos, chronicles, stew/results, stew/shims/net import ../../../../waku/common/enr/builder, @@ -6,9 +7,11 @@ import ../../../../waku/waku_enr/multiaddr, ../../../../waku/waku_enr/sharding, ../../../../waku/waku_core/message/message, + ../../../../waku/waku_core/message/default_values, ../../../../waku/waku_core/topics/pubsub_topic, ../../../../waku/node/peer_manager/peer_manager, ../../../../waku/waku_core, + ../../../../waku/factory/external_config, ../../../../waku/node/waku_node, ../../../../waku/node/config, ../../../../waku/waku_archive/driver/builder, @@ -18,6 +21,8 @@ import ../../../../waku/waku_relay/protocol, ../../../../waku/waku_store, ../../../../waku/factory/builder, + ../../../../waku/factory/node_factory, + ../../../apps/wakunode2/networks_config, ../../../events/[json_message_event, json_base_event], ../../../alloc, ../../config @@ -43,149 +48,31 @@ proc destroyShared(self: ptr NodeLifecycleRequest) = deallocShared(self[].configJson) deallocShared(self) -proc configureStore( - node: WakuNode, - storeNode: string, - storeRetentionPolicy: string, - storeDbUrl: string, - storeVacuum: bool, - storeDbMigration: bool, - storeMaxNumDbConnections: int, -): Future[Result[void, string]] {.async.} = - ## This snippet is extracted/duplicated from the app.nim file - - var onFatalErrorAction = proc(msg: string) {.gcsafe, closure.} = - ## Action to be taken when an internal error occurs during the node run. - ## e.g. the connection with the database is lost and not recovered. - # error "Unrecoverable error occurred", error = msg - ## TODO: use a callback given as a parameter - discard - - # Archive setup - let archiveDriverRes = await ArchiveDriver.new( - storeDbUrl, storeVacuum, storeDbMigration, storeMaxNumDbConnections, - onFatalErrorAction, - ) - if archiveDriverRes.isErr(): - return err("failed to setup archive driver: " & archiveDriverRes.error) - - let retPolicyRes = RetentionPolicy.new(storeRetentionPolicy) - if retPolicyRes.isErr(): - return err("failed to create retention policy: " & retPolicyRes.error) - - let mountArcRes = node.mountArchive(archiveDriverRes.get(), retPolicyRes.get()) - if mountArcRes.isErr(): - return err("failed to mount waku archive protocol: " & mountArcRes.error) - - # Store setup - try: - await mountStore(node) - except CatchableError: - return err("failed to mount waku store protocol: " & getCurrentExceptionMsg()) - - mountStoreClient(node) - if storeNode != "": - let storeNodeInfo = parsePeerInfo(storeNode) - if storeNodeInfo.isOk(): - node.peerManager.addServicePeer(storeNodeInfo.value, WakuStoreCodec) - else: - return err("failed to set node waku store peer: " & storeNodeInfo.error) - - return ok() - proc createNode(configJson: cstring): Future[Result[WakuNode, string]] {.async.} = - var privateKey: PrivateKey - var netConfig = NetConfig.init(parseIpAddress("127.0.0.1"), Port(60000'u16)).value - ## relay - var relay: bool - var topics = @[""] - - ## store - var store: bool - var storeNode: string - var storeRetentionPolicy: string - var storeDbUrl: string - var storeVacuum: bool - var storeDbMigration: bool - var storeMaxNumDbConnections: int - + var conf: WakuNodeConf var errorResp: string try: if not parseConfig( $configJson, - privateKey, - netConfig, - relay, - topics, - store, - storeNode, - storeRetentionPolicy, - storeDbUrl, - storeVacuum, - storeDbMigration, - storeMaxNumDbConnections, + conf, errorResp, ): return err(errorResp) except Exception: return err("exception calling parseConfig: " & getCurrentExceptionMsg()) - var enrBuilder = EnrBuilder.init(privateKey) - - enrBuilder.withIpAddressAndPorts( - netConfig.enrIp, netConfig.enrPort, netConfig.discv5UdpPort - ) - - if netConfig.wakuFlags.isSome(): - enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) - - enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - - let addShardedTopics = enrBuilder.withShardedTopics(topics) - if addShardedTopics.isErr(): - let msg = "Error setting shared topics: " & $addShardedTopics.error - return err(msg) - - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - let msg = "Error building enr record: " & $recordRes.error - return err(msg) - else: - recordRes.get() - - ## TODO: make the next const configurable from 'configJson'. - const MAX_CONNECTIONS = 50.int - - var builder = WakuNodeBuilder.init() - builder.withRng(crypto.newRng()) - builder.withNodeKey(privateKey) - builder.withRecord(record) - builder.withNetworkConfiguration(netConfig) - builder.withSwitchConfiguration(maxConnections = some(MAX_CONNECTIONS)) - - let wakuNodeRes = builder.build() - if wakuNodeRes.isErr(): - let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error - return err(errorMsg) - - var newNode = wakuNodeRes.get() - - if relay: - await newNode.mountRelay() - newNode.peerManager.start() + # TODO: figure out how to extract default values from the config pragma + conf.clusterId = 0 + conf.nat = "any" + conf.maxConnections = 50.uint16 + conf.maxMessageSize = default_values.DefaultMaxWakuMessageSizeStr - if store: - ( - await newNode.configureStore( - storeNode, storeRetentionPolicy, storeDbUrl, storeVacuum, storeDbMigration, - storeMaxNumDbConnections, - ) - ).isOkOr: - return err("error configuring store: " & $error) + let nodeRes = setupNode(conf).valueOr(): + error "Failed setting up node", error = error + return err("Failed setting up node: " & $error) - return ok(newNode) + return ok(nodeRes) proc process*( self: ptr NodeLifecycleRequest, node: ptr WakuNode