Skip to content

Commit

Permalink
chore(cbindings): Thread-safe libwaku. WakuNode instance created dire…
Browse files Browse the repository at this point in the history
…ctly from the Waku Thread (#1957)

* libwaku: create the WakuNode in the Waku thread. Not in the main thread
* node_lifecycle_request.nim: moving hard-coded value to a const item
* libwaku.nim: start using 'isOkOr' instead of 'isErr()'-approach
* node_lifecycle_request.nim: better 'async' & 'await' usage. Not block the runtime
  • Loading branch information
Ivansete-status authored Sep 1, 2023
1 parent 0c40588 commit 68e8d9a
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 105 deletions.
1 change: 0 additions & 1 deletion examples/cbindings/waku_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ void publish_message(char* pubsubTopic, const char* msg) {
WAKU_CALL( waku_relay_publish(pubsubTopic,
jsonWakuMsg,
10000 /*timeout ms*/,
handle_publish_ok,
handle_error) );

printf("waku relay response [%s]\n", publishResponse);
Expand Down
15 changes: 12 additions & 3 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,18 @@ proc waku_new(configJson: cstring,
if isNil(onErrCb):
return RET_MISSING_CALLBACK

let createThRes = waku_thread.createWakuThread(configJson)
if createThRes.isErr():
let msg = "Error in createWakuThread: " & $createThRes.error
## Create the Waku thread that will keep waiting for req from the main thread.
waku_thread.createWakuThread().isOkOr:
let msg = "Error in createWakuThread: " & $error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR

let sendReqRes = waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
return RET_ERR

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,123 @@ import
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/waku_enr/capabilities,
../../../waku/waku_enr/multiaddr,
../../../waku/waku_enr/sharding,
../../../waku/waku_core/message/message,
../../../waku/waku_core/topics/pubsub_topic,
../../../waku/node/peer_manager/peer_manager,
../../../waku/waku_core,
../../../waku/node/waku_node,
../../../waku/node/builder,
../../../waku/node/config,
../../../waku/waku_relay/protocol,
../../events/[json_error_event,json_message_event,json_base_event],
../config,
./request

type
NodeLifecycleMsgType* = enum
CREATE_NODE
START_NODE
STOP_NODE

type
NodeLifecycleRequest* = ref object of InterThreadRequest
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation

proc new*(T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType): T =
op: NodeLifecycleMsgType,
configJson: cstring = ""): T =

return NodeLifecycleRequest(operation: op)
return NodeLifecycleRequest(operation: op, configJson: configJson)

proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} =

var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value
var relay: bool
var topics = @[""]
var jsonResp: JsonEvent

if not parseConfig($configJson,
privateKey,
netConfig,
relay,
topics,
jsonResp):
return err($jsonResp)

var enrBuilder = EnrBuilder.init(privateKey)

enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)

if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())

enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)

let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
let msg = "Error setting shared topics: " & $addShardedTopics.error
return err($JsonErrorEvent.new(msg))

let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
let msg = "Error building enr record: " & $recordRes.error
return err($JsonErrorEvent.new(msg))

else: recordRes.get()

## TODO: make the next const configurable from 'configJson'.
const MAX_CONNECTIONS = 50.int

var builder = WakuNodeBuilder.init()
builder.withRng(crypto.newRng())
builder.withNodeKey(privateKey)
builder.withRecord(record)
builder.withNetworkConfiguration(netConfig)
builder.withSwitchConfiguration(
maxConnections = some(MAX_CONNECTIONS)
)

let wakuNodeRes = builder.build()
if wakuNodeRes.isErr():
let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error
return err($JsonErrorEvent.new(errorMsg))

var newNode = wakuNodeRes.get()

if relay:
await newNode.mountRelay()
newNode.peerManager.start()

return ok(newNode)

method process*(self: NodeLifecycleRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
node: ptr WakuNode): Future[Result[string, string]] {.async.} =

case self.operation:
of CREATE_NODE:
let newNodeRes = await createNode(self.configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)

node[] = newNodeRes.get()

of START_NODE:
waitFor node.start()
await node[].start()

of STOP_NODE:
waitFor node.stop()
await node[].stop()

return ok("")
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ proc connectTo(node: WakuNode,
return ok()

method process*(self: PeerManagementRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
node: ptr WakuNode): Future[Result[string, string]] {.async.} =

case self.operation:

of CONNECT_TO:
let ret = node.connectTo(self.peerMultiAddr, self.dialTimeout)
let ret = node[].connectTo(self.peerMultiAddr, self.dialTimeout)
if ret.isErr():
return err(ret.error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ proc new*(T: type RelayRequest,
message: message)

method process*(self: RelayRequest,
node: WakuNode): Future[Result[string, string]] {.async.} =
node: ptr WakuNode): Future[Result[string, string]] {.async.} =

if node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")
Expand Down
2 changes: 1 addition & 1 deletion library/waku_thread/inter_thread_communication/request.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import
type
InterThreadRequest* = ref object of RootObj

method process*(self: InterThreadRequest, node: WakuNode):
method process*(self: InterThreadRequest, node: ptr WakuNode):
Future[Result[string, string]] {.base.} = discard

proc `$`*(self: InterThreadRequest): string =
Expand Down
98 changes: 6 additions & 92 deletions library/waku_thread/waku_thread.nim
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,8 @@ import
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/waku_enr/capabilities,
../../../waku/waku_enr/multiaddr,
../../../waku/waku_enr/sharding,
../../../waku/waku_core/message/message,
../../../waku/waku_core/topics/pubsub_topic,
../../../waku/node/peer_manager/peer_manager,
../../../waku/waku_core,
../../../waku/node/waku_node,
../../../waku/node/builder,
../../../waku/node/config,
../../../waku/waku_relay/protocol,
../events/[json_error_event,json_message_event,json_base_event],
../alloc,
./config,
./inter_thread_communication/request

type
Expand Down Expand Up @@ -54,91 +41,24 @@ proc waku_init() =
locals = addr(locals)
nimGC_setStackBottom(locals)

proc createNode(configJson: cstring): Result[WakuNode, string] =
var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Port(60000'u16)).value
var relay: bool
var topics = @[""]
var jsonResp: JsonEvent

let cj = configJson.alloc()

if not parseConfig($cj,
privateKey,
netConfig,
relay,
topics,
jsonResp):
deallocShared(cj)
return err($jsonResp)

deallocShared(cj)

var enrBuilder = EnrBuilder.init(privateKey)

enrBuilder.withIpAddressAndPorts(
netConfig.enrIp,
netConfig.enrPort,
netConfig.discv5UdpPort
)

if netConfig.wakuFlags.isSome():
enrBuilder.withWakuCapabilities(netConfig.wakuFlags.get())

enrBuilder.withMultiaddrs(netConfig.enrMultiaddrs)

let addShardedTopics = enrBuilder.withShardedTopics(topics)
if addShardedTopics.isErr():
let msg = "Error setting shared topics: " & $addShardedTopics.error
return err($JsonErrorEvent.new(msg))

let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
let msg = "Error building enr record: " & $recordRes.error
return err($JsonErrorEvent.new(msg))

else: recordRes.get()

var builder = WakuNodeBuilder.init()
builder.withRng(crypto.newRng())
builder.withNodeKey(privateKey)
builder.withRecord(record)
builder.withNetworkConfiguration(netConfig)
builder.withSwitchConfiguration(
maxConnections = some(50.int)
)

let wakuNodeRes = builder.build()
if wakuNodeRes.isErr():
let errorMsg = "failed to create waku node instance: " & wakuNodeRes.error
return err($JsonErrorEvent.new(errorMsg))

var newNode = wakuNodeRes.get()

if relay:
waitFor newNode.mountRelay()
newNode.peerManager.start()

return ok(newNode)

proc run(ctx: ptr Context) {.thread.} =
## This is the worker thread body. This thread runs the Waku node
## and attends library user requests (stop, connect_to, etc.)

var node: WakuNode

while running.load == true:
## Trying to get a request from the libwaku main thread
let req = ctx.reqChannel.tryRecv()
if req[0] == true:
let response = waitFor req[1].process(ctx.node)
let response = waitFor req[1].process(addr node)
ctx.respChannel.send( response )

poll()

tearDownForeignThreadGc()

proc createWakuThread*(configJson: cstring): Result[void, string] =
proc createWakuThread*(): Result[void, string] =
## This proc is called from the main thread and it creates
## the Waku working thread.

Expand All @@ -148,21 +68,15 @@ proc createWakuThread*(configJson: cstring): Result[void, string] =
ctx.reqChannel.open()
ctx.respChannel.open()

let newNodeRes = createNode(configJson)
if newNodeRes.isErr():
return err(newNodeRes.error)

ctx.node = newNodeRes.get()

running.store(true)

try:
createThread(ctx.thread, run, ctx)
except ResourceExhaustedError:
except ValueError, ResourceExhaustedError:
# and freeShared for typed allocations!
freeShared(ctx)

return err("failed to create a thread: " & getCurrentExceptionMsg())
return err("failed to create the Waku thread: " & getCurrentExceptionMsg())

return ok()

Expand Down

0 comments on commit 68e8d9a

Please sign in to comment.