Skip to content

Commit

Permalink
Thread-safe comms between main thread & Waku Thread - ChannelSPSCSingle
Browse files Browse the repository at this point in the history
  • Loading branch information
Ivansete-status committed Sep 1, 2023
1 parent 68e8d9a commit ae48b46
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 104 deletions.
5 changes: 5 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,8 @@
url = https://github.com/nitely/nim-unicodedb.git
ignore = untracked
branch = master
[submodule "vendor/nim-taskpools"]
path = vendor/nim-taskpools
url = https://github.com/status-im/nim-taskpools.git
ignore = untracked
branch = stable
44 changes: 27 additions & 17 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import
../../../waku/waku_relay/protocol,
./events/json_message_event,
./waku_thread/waku_thread,
./waku_thread/inter_thread_communication/node_lifecycle_request,
./waku_thread/inter_thread_communication/peer_manager_request,
./waku_thread/inter_thread_communication/protocols/relay_request,
./waku_thread/inter_thread_communication/requests/node_lifecycle_request,
./waku_thread/inter_thread_communication/requests/peer_manager_request,
./waku_thread/inter_thread_communication/requests/protocols/relay_request,
./waku_thread/inter_thread_communication/waku_thread_request,
./alloc

################################################################################
Expand Down Expand Up @@ -79,9 +80,10 @@ proc waku_new(configJson: cstring,
return RET_ERR

let sendReqRes = waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
RequestType.LIFECYCLE,
NodeLifecycleRequest.new(
NodeLifecycleMsgType.CREATE_NODE,
configJson))
if sendReqRes.isErr():
let msg = $sendReqRes.error
onErrCb(unsafeAddr msg[0], cast[csize_t](len(msg)))
Expand Down Expand Up @@ -199,10 +201,11 @@ proc waku_relay_publish(pubSubTopic: cstring,
$pst

let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback),
wakuMessage))
RequestType.RELAY,
RelayRequest.new(RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback),
wakuMessage))
deallocShared(pst)

if sendReqRes.isErr():
Expand All @@ -214,22 +217,26 @@ proc waku_relay_publish(pubSubTopic: cstring,

proc waku_start() {.dynlib, exportc.} =
discard waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.START_NODE))
RequestType.LIFECYCLE,
NodeLifecycleRequest.new(
NodeLifecycleMsgType.START_NODE))

proc waku_stop() {.dynlib, exportc.} =
discard waku_thread.sendRequestToWakuThread(
NodeLifecycleRequest.new(
NodeLifecycleMsgType.STOP_NODE))
RequestType.LIFECYCLE,
NodeLifecycleRequest.new(
NodeLifecycleMsgType.STOP_NODE))

proc waku_relay_subscribe(
pubSubTopic: cstring,
onErrCb: WakuCallBack): cint
{.dynlib, exportc.} =

let pst = pubSubTopic.alloc()

let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.SUBSCRIBE,
RequestType.RELAY,
RelayRequest.new(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)
Expand All @@ -247,8 +254,10 @@ proc waku_relay_unsubscribe(
{.dynlib, exportc.} =

let pst = pubSubTopic.alloc()

let sendReqRes = waku_thread.sendRequestToWakuThread(
RelayRequest.new(RelayMsgType.UNSUBSCRIBE,
RequestType.RELAY,
RelayRequest.new(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
deallocShared(pst)
Expand All @@ -266,7 +275,8 @@ proc waku_connect(peerMultiAddr: cstring,
{.dynlib, exportc.} =

let connRes = waku_thread.sendRequestToWakuThread(
PeerManagementRequest.new(
RequestType.PEER_MANAGER,
PeerManagementRequest.new(
PeerManagementMsgType.CONNECT_TO,
$peerMultiAddr,
chronos.milliseconds(timeoutMs)))
Expand Down
21 changes: 0 additions & 21 deletions library/waku_thread/inter_thread_communication/request.nim

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,20 @@ import
stew/results,
stew/shims/net
import
../../../waku/common/enr/builder,
../../../waku/waku_enr/capabilities,
../../../waku/waku_enr/multiaddr,
../../../waku/waku_enr/sharding,
../../../waku/waku_core/message/message,
../../../waku/waku_core/topics/pubsub_topic,
../../../waku/node/peer_manager/peer_manager,
../../../waku/waku_core,
../../../waku/node/waku_node,
../../../waku/node/builder,
../../../waku/node/config,
../../../waku/waku_relay/protocol,
../../events/[json_error_event,json_message_event,json_base_event],
../config,
./request
../../../../waku/common/enr/builder,
../../../../waku/waku_enr/capabilities,
../../../../waku/waku_enr/multiaddr,
../../../../waku/waku_enr/sharding,
../../../../waku/waku_core/message/message,
../../../../waku/waku_core/topics/pubsub_topic,
../../../../waku/node/peer_manager/peer_manager,
../../../../waku/waku_core,
../../../../waku/node/waku_node,
../../../../waku/node/builder,
../../../../waku/node/config,
../../../../waku/waku_relay/protocol,
../../../events/[json_error_event,json_message_event,json_base_event],
../../config

type
NodeLifecycleMsgType* = enum
Expand All @@ -29,18 +28,22 @@ type
STOP_NODE

type
NodeLifecycleRequest* = ref object of InterThreadRequest
NodeLifecycleRequest* = object
operation: NodeLifecycleMsgType
configJson: cstring ## Only used in 'CREATE_NODE' operation

proc new*(T: type NodeLifecycleRequest,
op: NodeLifecycleMsgType,
configJson: cstring = ""): T =
configJson: cstring = ""): ptr NodeLifecycleRequest =

return NodeLifecycleRequest(operation: op, configJson: configJson)
var ret = cast[ptr NodeLifecycleRequest](
allocShared0(sizeof(NodeLifecycleRequest)))
ret[].operation = op
ret[].configJson = configJson
return ret

proc createNode(configJson: cstring):
Future[Result[WakuNode, string]] {.async.} =
Future[Result[WakuNode, string]] {.async.} =

var privateKey: PrivateKey
var netConfig = NetConfig.init(ValidIpAddress.init("127.0.0.1"),
Expand Down Expand Up @@ -108,8 +111,10 @@ proc createNode(configJson: cstring):

return ok(newNode)

method process*(self: NodeLifecycleRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
proc process*(self: ptr NodeLifecycleRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =

defer: deallocShared(self)

case self.operation:
of CREATE_NODE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,29 @@ import
stew/results,
stew/shims/net
import
../../../waku/node/waku_node,
./request
../../../../waku/node/waku_node

type
PeerManagementMsgType* = enum
CONNECT_TO

type
PeerManagementRequest* = ref object of InterThreadRequest
PeerManagementRequest* = object
operation: PeerManagementMsgType
peerMultiAddr: string
dialTimeout: Duration

proc new*(T: type PeerManagementRequest,
op: PeerManagementMsgType,
peerMultiAddr: string,
dialTimeout: Duration): T =
dialTimeout: Duration): ptr PeerManagementRequest =

return PeerManagementRequest(operation: op,
peerMultiAddr: peerMultiAddr,
dialTimeout: dialTimeout)
var ret = cast[ptr PeerManagementRequest](
allocShared0(sizeof(PeerManagementRequest)))
ret[].operation = op
ret[].peerMultiAddr = peerMultiAddr
ret[].dialTimeout = dialTimeout
return ret

proc connectTo(node: WakuNode,
peerMultiAddr: string,
Expand All @@ -46,13 +48,15 @@ proc connectTo(node: WakuNode,

return ok()

method process*(self: PeerManagementRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
proc process*(self: ptr PeerManagementRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =

defer: deallocShared(self)

case self.operation:

of CONNECT_TO:
let ret = node[].connectTo(self.peerMultiAddr, self.dialTimeout)
let ret = node[].connectTo(self[].peerMultiAddr, self[].dialTimeout)
if ret.isErr():
return err(ret.error)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,10 @@ import
stew/results,
stew/shims/net
import
../../../../waku/waku_core/message/message,
../../../../waku/node/waku_node,
../../../../waku/waku_core/topics/pubsub_topic,
../../../../waku/waku_relay/protocol,
../request
../../../../../waku/waku_core/message/message,
../../../../../waku/node/waku_node,
../../../../../waku/waku_core/topics/pubsub_topic,
../../../../../waku/waku_relay/protocol

type
RelayMsgType* = enum
Expand All @@ -20,7 +19,7 @@ type
PUBLISH

type
RelayRequest* = ref object of InterThreadRequest
RelayRequest* = object
operation: RelayMsgType
pubsubTopic: PubsubTopic
relayEventCallback: WakuRelayHandler # not used in 'PUBLISH' requests
Expand All @@ -30,15 +29,19 @@ proc new*(T: type RelayRequest,
op: RelayMsgType,
pubsubTopic: PubsubTopic,
relayEventCallback: WakuRelayHandler = nil,
message = WakuMessage()): T =
message = WakuMessage()): ptr RelayRequest =

return RelayRequest(operation: op,
pubsubTopic: pubsubTopic,
relayEventCallback: relayEventCallback,
message: message)
var ret = cast[ptr RelayRequest](allocShared0(sizeof(RelayRequest)))
ret[].operation = op
ret[].pubsubTopic = pubsubTopic
ret[].relayEventCallback = relayEventCallback
ret[].message = message
return ret

method process*(self: RelayRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =
proc process*(self: ptr RelayRequest,
node: ptr WakuNode): Future[Result[string, string]] {.async.} =

defer: deallocShared(self)

if node.wakuRelay.isNil():
return err("Operation not supported without Waku Relay enabled.")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

## This file contains the base message request type that will be handled.
## The requests are created by the main thread and processed by
## the Waku Thread.

import
std/json,
stew/results
import
chronos
import
../../../waku/node/waku_node,
./requests/node_lifecycle_request,
./requests/peer_manager_request,
./requests/protocols/relay_request

type
RequestType* {.pure.} = enum
LIFECYCLE,
PEER_MANAGER,
RELAY,

type
InterThreadRequest* = object
reqType: RequestType
reqContent: pointer

proc new*(T: type InterThreadRequest,
reqType: RequestType,
reqContent: pointer): ptr InterThreadRequest =
var ret = cast[ptr InterThreadRequest](
allocShared0(sizeof(InterThreadRequest)))
ret[].reqType = reqType
ret[].reqContent = reqContent
return ret

proc process*(T: type InterThreadRequest,
request: ptr InterThreadRequest,
node: ptr WakuNode):
Result[string, string] =
## Processes the request and deallocates its memory
defer: deallocShared(request)

echo "Request received: " & $request[].reqType

case request[].reqType
of LIFECYCLE:
waitFor cast[ptr NodeLifecycleRequest](request[].reqContent).process(node)
of PEER_MANAGER:
waitFor cast[ptr PeerManagementRequest](request[].reqContent).process(node)
of RELAY:
waitFor cast[ptr RelayRequest](request[].reqContent).process(node)

proc `$`*(self: InterThreadRequest): string =
return $self.reqType
Loading

0 comments on commit ae48b46

Please sign in to comment.