From 68e8d9a79c6507fe7bd0889ed97c92e7182872ae Mon Sep 17 00:00:00 2001 From: Ivan Folgueira Bande <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 1 Sep 2023 08:37:02 +0200 Subject: [PATCH] chore(cbindings): Thread-safe libwaku. WakuNode instance created directly from the Waku Thread (#1957) * libwaku: create the WakuNode in the Waku thread. Not in the main thread * node_lifecycle_request.nim: moving hard-coded value to a const item * libwaku.nim: start using 'isOkOr' instead of 'isErr()'-approach * node_lifecycle_request.nim: better 'async' & 'await' usage. Not block the runtime --- examples/cbindings/waku_example.c | 1 - library/libwaku.nim | 15 ++- .../node_lifecycle_request.nim | 101 +++++++++++++++++- .../peer_manager_request.nim | 4 +- .../protocols/relay_request.nim | 2 +- .../inter_thread_communication/request.nim | 2 +- library/waku_thread/waku_thread.nim | 98 ++--------------- 7 files changed, 118 insertions(+), 105 deletions(-) diff --git a/examples/cbindings/waku_example.c b/examples/cbindings/waku_example.c index 3ffcd42535..f6d4a744ba 100644 --- a/examples/cbindings/waku_example.c +++ b/examples/cbindings/waku_example.c @@ -127,7 +127,6 @@ void publish_message(char* pubsubTopic, const char* msg) { WAKU_CALL( waku_relay_publish(pubsubTopic, jsonWakuMsg, 10000 /*timeout ms*/, - handle_publish_ok, handle_error) ); printf("waku relay response [%s]\n", publishResponse); diff --git a/library/libwaku.nim b/library/libwaku.nim index 538c069137..7efa368294 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -72,9 +72,18 @@ proc waku_new(configJson: cstring, if isNil(onErrCb): return RET_MISSING_CALLBACK - let createThRes = waku_thread.createWakuThread(configJson) - if createThRes.isErr(): - let msg = "Error in createWakuThread: " & $createThRes.error + ## Create the Waku thread that will keep waiting for req from the main thread. + waku_thread.createWakuThread().isOkOr: + let msg = "Error in createWakuThread: " & $error + onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) + return RET_ERR + + let sendReqRes = waku_thread.sendRequestToWakuThread( + NodeLifecycleRequest.new( + NodeLifecycleMsgType.CREATE_NODE, + configJson)) + if sendReqRes.isErr(): + let msg = $sendReqRes.error onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg))) return RET_ERR diff --git a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim index 2f7f92e89b..0ce14088d8 100644 --- a/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/node_lifecycle_request.nim @@ -6,32 +6,123 @@ import stew/results, stew/shims/net import + ../../../waku/common/enr/builder, + ../../../waku/waku_enr/capabilities, + ../../../waku/waku_enr/multiaddr, + ../../../waku/waku_enr/sharding, + ../../../waku/waku_core/message/message, + ../../../waku/waku_core/topics/pubsub_topic, + ../../../waku/node/peer_manager/peer_manager, + ../../../waku/waku_core, ../../../waku/node/waku_node, + ../../../waku/node/builder, + ../../../waku/node/config, + ../../../waku/waku_relay/protocol, + ../../events/[json_error_event,json_message_event,json_base_event], + ../config, ./request type NodeLifecycleMsgType* = enum + CREATE_NODE START_NODE STOP_NODE type NodeLifecycleRequest* = ref object of InterThreadRequest operation: NodeLifecycleMsgType + configJson: cstring ## Only used in 'CREATE_NODE' operation proc new*(T: type NodeLifecycleRequest, - op: NodeLifecycleMsgType): T = + op: NodeLifecycleMsgType, + configJson: cstring = ""): T = - return NodeLifecycleRequest(operation: op) + return NodeLifecycleRequest(operation: op, configJson: configJson) + +proc createNode(configJson: cstring): + Future[Result[WakuNode, string]] {.async.} = + + var privateKey: PrivateKey + var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), + Port(60000'u16)).value + var relay: bool + var topics = @[""] + var jsonResp: JsonEvent + + if not parseConfig($configJson, + privateKey, + netConfig, + relay, + topics, + jsonResp): + return err($jsonResp) + + var enrBuilder = EnrBuilder.init(privateKey) + + enrBuilder.withIpAddressAndPorts( + netConfig.enrIp, + netConfig.enrPort, + netConfig.discv5UdpPort + ) + + if netConfig.wakuFlags.isSome(): + enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) + + enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) + + let addShardedTopics = enrBuilder.withShardedTopics(topics) + if addShardedTopics.isErr(): + let msg = "Error setting shared topics: " & $addShardedTopics.error + return err($JsonErrorEvent.new(msg)) + + let recordRes = enrBuilder.build() + let record = + if recordRes.isErr(): + let msg = "Error building enr record: " & $recordRes.error + return err($JsonErrorEvent.new(msg)) + + else: recordRes.get() + + ## TODO: make the next const configurable from 'configJson'. + const MAX_CONNECTIONS = 50.int + + var builder = WakuNodeBuilder.init() + builder.withRng(crypto.newRng()) + builder.withNodeKey(privateKey) + builder.withRecord(record) + builder.withNetworkConfiguration(netConfig) + builder.withSwitchConfiguration( + maxConnections = some(MAX_CONNECTIONS) + ) + + let wakuNodeRes = builder.build() + if wakuNodeRes.isErr(): + let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error + return err($JsonErrorEvent.new(errorMsg)) + + var newNode = wakuNodeRes.get() + + if relay: + await newNode.mountRelay() + newNode.peerManager.start() + + return ok(newNode) method process*(self: NodeLifecycleRequest, - node: WakuNode): Future[Result[string, string]] {.async.} = + node: ptr WakuNode): Future[Result[string, string]] {.async.} = case self.operation: + of CREATE_NODE: + let newNodeRes = await createNode(self.configJson) + if newNodeRes.isErr(): + return err(newNodeRes.error) + + node[] = newNodeRes.get() of START_NODE: - waitFor node.start() + await node[].start() of STOP_NODE: - waitFor node.stop() + await node[].stop() return ok("") diff --git a/library/waku_thread/inter_thread_communication/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/peer_manager_request.nim index 1d8e4111c2..a76b1ddf82 100644 --- a/library/waku_thread/inter_thread_communication/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/peer_manager_request.nim @@ -47,12 +47,12 @@ proc connectTo(node: WakuNode, return ok() method process*(self: PeerManagementRequest, - node: WakuNode): Future[Result[string, string]] {.async.} = + node: ptr WakuNode): Future[Result[string, string]] {.async.} = case self.operation: of CONNECT_TO: - let ret = node.connectTo(self.peerMultiAddr, self.dialTimeout) + let ret = node[].connectTo(self.peerMultiAddr, self.dialTimeout) if ret.isErr(): return err(ret.error) diff --git a/library/waku_thread/inter_thread_communication/protocols/relay_request.nim b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim index 21a9010360..bc26ecc88a 100644 --- a/library/waku_thread/inter_thread_communication/protocols/relay_request.nim +++ b/library/waku_thread/inter_thread_communication/protocols/relay_request.nim @@ -38,7 +38,7 @@ proc new*(T: type RelayRequest, message: message) method process*(self: RelayRequest, - node: WakuNode): Future[Result[string, string]] {.async.} = + node: ptr WakuNode): Future[Result[string, string]] {.async.} = if node.wakuRelay.isNil(): return err("Operation not supported without Waku Relay enabled.") diff --git a/library/waku_thread/inter_thread_communication/request.nim b/library/waku_thread/inter_thread_communication/request.nim index 5adcb8eaf0..6d3060de55 100644 --- a/library/waku_thread/inter_thread_communication/request.nim +++ b/library/waku_thread/inter_thread_communication/request.nim @@ -14,7 +14,7 @@ import type InterThreadRequest* = ref object of RootObj -method process*(self: InterThreadRequest, node: WakuNode): +method process*(self: InterThreadRequest, node: ptr WakuNode): Future[Result[string, string]] {.base.} = discard proc `$`*(self: InterThreadRequest): string = diff --git a/library/waku_thread/waku_thread.nim b/library/waku_thread/waku_thread.nim index be029a7c35..8fd1253c8e 100644 --- a/library/waku_thread/waku_thread.nim +++ b/library/waku_thread/waku_thread.nim @@ -11,21 +11,8 @@ import stew/results, stew/shims/net import - ../../../waku/common/enr/builder, - ../../../waku/waku_enr/capabilities, - ../../../waku/waku_enr/multiaddr, - ../../../waku/waku_enr/sharding, - ../../../waku/waku_core/message/message, - ../../../waku/waku_core/topics/pubsub_topic, - ../../../waku/node/peer_manager/peer_manager, - ../../../waku/waku_core, ../../../waku/node/waku_node, - ../../../waku/node/builder, - ../../../waku/node/config, - ../../../waku/waku_relay/protocol, ../events/[json_error_event,json_message_event,json_base_event], - ../alloc, - ./config, ./inter_thread_communication/request type @@ -54,91 +41,24 @@ proc waku_init() = locals = addr(locals) nimGC_setStackBottom(locals) -proc createNode(configJson: cstring): Result[WakuNode, string] = - var privateKey: PrivateKey - var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"), - Port(60000'u16)).value - var relay: bool - var topics = @[""] - var jsonResp: JsonEvent - - let cj = configJson.alloc() - - if not parseConfig($cj, - privateKey, - netConfig, - relay, - topics, - jsonResp): - deallocShared(cj) - return err($jsonResp) - - deallocShared(cj) - - var enrBuilder = EnrBuilder.init(privateKey) - - enrBuilder.withIpAddressAndPorts( - netConfig.enrIp, - netConfig.enrPort, - netConfig.discv5UdpPort - ) - - if netConfig.wakuFlags.isSome(): - enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get()) - - enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs) - - let addShardedTopics = enrBuilder.withShardedTopics(topics) - if addShardedTopics.isErr(): - let msg = "Error setting shared topics: " & $addShardedTopics.error - return err($JsonErrorEvent.new(msg)) - - let recordRes = enrBuilder.build() - let record = - if recordRes.isErr(): - let msg = "Error building enr record: " & $recordRes.error - return err($JsonErrorEvent.new(msg)) - - else: recordRes.get() - - var builder = WakuNodeBuilder.init() - builder.withRng(crypto.newRng()) - builder.withNodeKey(privateKey) - builder.withRecord(record) - builder.withNetworkConfiguration(netConfig) - builder.withSwitchConfiguration( - maxConnections = some(50.int) - ) - - let wakuNodeRes = builder.build() - if wakuNodeRes.isErr(): - let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error - return err($JsonErrorEvent.new(errorMsg)) - - var newNode = wakuNodeRes.get() - - if relay: - waitFor newNode.mountRelay() - newNode.peerManager.start() - - return ok(newNode) - proc run(ctx: ptr Context) {.thread.} = ## This is the worker thread body. This thread runs the Waku node ## and attends library user requests (stop, connect_to, etc.) + var node: WakuNode + while running.load == true: ## Trying to get a request from the libwaku main thread let req = ctx.reqChannel.tryRecv() if req[0] == true: - let response = waitFor req[1].process(ctx.node) + let response = waitFor req[1].process(addr node) ctx.respChannel.send( response ) poll() tearDownForeignThreadGc() -proc createWakuThread*(configJson: cstring): Result[void, string] = +proc createWakuThread*(): Result[void, string] = ## This proc is called from the main thread and it creates ## the Waku working thread. @@ -148,21 +68,15 @@ proc createWakuThread*(configJson: cstring): Result[void, string] = ctx.reqChannel.open() ctx.respChannel.open() - let newNodeRes = createNode(configJson) - if newNodeRes.isErr(): - return err(newNodeRes.error) - - ctx.node = newNodeRes.get() - running.store(true) try: createThread(ctx.thread, run, ctx) - except ResourceExhaustedError: + except ValueError, ResourceExhaustedError: # and freeShared for typed allocations! freeShared(ctx) - return err("failed to create a thread: " & getCurrentExceptionMsg()) + return err("failed to create the Waku thread: " & getCurrentExceptionMsg()) return ok()