Skip to content

Commit

Permalink
feat: eventCallback per wakunode and userData (#2418)
Browse files Browse the repository at this point in the history
* feat: store event callback in `Context`
* feat: add userData to callbacks
  • Loading branch information
richard-ramos authored Feb 13, 2024
1 parent d58aca0 commit 707f3e8
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 38 deletions.
2 changes: 1 addition & 1 deletion examples/cbindings/waku_example.c
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ int main(int argc, char** argv) {
printf("Bind addr: %s:%u\n", cfgNode.host, cfgNode.port);
printf("Waku Relay enabled: %s\n", cfgNode.relay == 1 ? "YES": "NO");

waku_set_event_callback(event_handler, userData);
waku_set_event_callback(ctx, event_handler, userData);
waku_start(ctx, event_handler, userData);

printf("Establishing connection with: %s\n", cfgNode.peers);
Expand Down
3 changes: 2 additions & 1 deletion library/callback.nim
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
type
WakuCallBack* = proc(callerRet: cint,
msg: ptr cchar,
len: csize_t) {.cdecl, gcsafe, raises: [].}
len: csize_t,
userData: pointer) {.cdecl, gcsafe, raises: [].}
5 changes: 3 additions & 2 deletions library/libwaku.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
extern "C" {
#endif

typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len);
typedef void (*WakuCallBack) (int callerRet, const char* msg, size_t len, void* userData);

// Creates a new instance of the waku node.
// Sets up the waku node from the given configuration.
Expand All @@ -39,7 +39,8 @@ int waku_version(void* ctx,
WakuCallBack callback,
void* userData);

void waku_set_event_callback(WakuCallBack callback,
void waku_set_event_callback(void* ctx,
WakuCallBack callback,
void* userData);

int waku_content_topic(void* ctx,
Expand Down
66 changes: 32 additions & 34 deletions library/libwaku.nim
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,19 @@ const RET_MISSING_CALLBACK: cint = 2
################################################################################
### Not-exported components

# May keep a reference to a callback defined externally
var extEventCallback*: WakuCallBack = nil

proc relayEventCallback(pubsubTopic: PubsubTopic,
msg: WakuMessage): Future[void] {.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if not isNil(extEventCallback):
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
extEventCallback(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)))
except Exception,CatchableError:
let msg = "Exception when calling 'eventCallBack': " &
getCurrentExceptionMsg()
extEventCallback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
else:
error "extEventCallback is nil"
proc relayEventCallback(ctx: ptr Context): WakuRelayHandler =
return proc (pubsubTopic: PubsubTopic, msg: WakuMessage): Future[system.void]{.async.} =
# Callback that hadles the Waku Relay events. i.e. messages or errors.
if not isNil(ctx[].eventCallback):
try:
let event = $JsonMessageEvent.new(pubsubTopic, msg)
cast[WakuCallBack](ctx[].eventCallback)(RET_OK, unsafeAddr event[0], cast[csize_t](len(event)), nil)
except Exception,CatchableError:
let msg = "Exception when calling 'eventCallBack': " &
getCurrentExceptionMsg()
cast[WakuCallBack](ctx[].eventCallback)(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), nil)
else:
error "eventCallback is nil"

### End of not-exported components
################################################################################
Expand All @@ -76,7 +73,7 @@ proc waku_new(configJson: cstring,
## Create the Waku thread that will keep waiting for req from the main thread.
var ctx = waku_thread.createWakuThread().valueOr:
let msg = "Error in createWakuThread: " & $error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil

ctx.userData = userData
Expand All @@ -89,7 +86,7 @@ proc waku_new(configJson: cstring,
configJson))
if sendReqRes.isErr():
let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return nil

return ctx
Expand All @@ -104,12 +101,13 @@ proc waku_version(ctx: ptr Context,
return RET_MISSING_CALLBACK

callback(RET_OK, cast[ptr cchar](WakuNodeVersionString),
cast[csize_t](len(WakuNodeVersionString)))
cast[csize_t](len(WakuNodeVersionString)), userData)

return RET_OK

proc waku_set_event_callback(callback: WakuCallBack) {.dynlib, exportc.} =
extEventCallback = callback
proc waku_set_event_callback(ctx: ptr Context,
callback: WakuCallBack) {.dynlib, exportc.} =
ctx[].eventCallback = cast[pointer](callback)

proc waku_content_topic(ctx: ptr Context,
appName: cstring,
Expand All @@ -130,7 +128,7 @@ proc waku_content_topic(ctx: ptr Context,
let encodingStr = encoding.alloc()

let contentTopic = fmt"/{$appStr}/{appVersion}/{$ctnStr}/{$encodingStr}"
callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic)))
callback(RET_OK, unsafeAddr contentTopic[0], cast[csize_t](len(contentTopic)), userData)

deallocShared(appStr)
deallocShared(ctnStr)
Expand All @@ -152,7 +150,7 @@ proc waku_pubsub_topic(ctx: ptr Context,
let topicNameStr = topicName.alloc()

let outPubsubTopic = fmt"/waku/2/{$topicNameStr}"
callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic)))
callback(RET_OK, unsafeAddr outPubsubTopic[0], cast[csize_t](len(outPubsubTopic)), userData)

deallocShared(topicNameStr)

Expand All @@ -168,7 +166,7 @@ proc waku_default_pubsub_topic(ctx: ptr Context,
if isNil(callback):
return RET_MISSING_CALLBACK

callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic)))
callback(RET_OK, cast[ptr cchar](DefaultPubsubTopic), cast[csize_t](len(DefaultPubsubTopic)), userData)

return RET_OK

Expand All @@ -194,7 +192,7 @@ proc waku_relay_publish(ctx: ptr Context,
except JsonParsingError:
deallocShared(jwm)
let msg = fmt"Error parsing json message: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

deallocShared(jwm)
Expand All @@ -215,7 +213,7 @@ proc waku_relay_publish(ctx: ptr Context,
)
except KeyError:
let msg = fmt"Problem building the WakuMessage: {getCurrentExceptionMsg()}"
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

let pst = pubSubTopic.alloc()
Expand All @@ -230,13 +228,13 @@ proc waku_relay_publish(ctx: ptr Context,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.PUBLISH,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback),
WakuRelayHandler(relayEventCallback(ctx)),
wakuMessage))
deallocShared(pst)

if sendReqRes.isErr():
let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

return RET_OK
Expand Down Expand Up @@ -274,18 +272,18 @@ proc waku_relay_subscribe(
ctx[].userData = userData

let pst = pubSubTopic.alloc()

var cb = relayEventCallback(ctx)
let sendReqRes = waku_thread.sendRequestToWakuThread(
ctx,
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
WakuRelayHandler(cb)))
deallocShared(pst)

if sendReqRes.isErr():
let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

return RET_OK
Expand All @@ -306,12 +304,12 @@ proc waku_relay_unsubscribe(
RequestType.RELAY,
RelayRequest.createShared(RelayMsgType.SUBSCRIBE,
PubsubTopic($pst),
WakuRelayHandler(relayEventCallback)))
WakuRelayHandler(relayEventCallback(ctx))))
deallocShared(pst)

if sendReqRes.isErr():
let msg = $sendReqRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

return RET_OK
Expand All @@ -334,7 +332,7 @@ proc waku_connect(ctx: ptr Context,
chronos.milliseconds(timeoutMs)))
if connRes.isErr():
let msg = $connRes.error
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)))
callback(RET_ERR, unsafeAddr msg[0], cast[csize_t](len(msg)), userData)
return RET_ERR

return RET_OK
Expand Down
1 change: 1 addition & 0 deletions library/waku_thread/waku_thread.nim
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type
respChannel: ChannelSPSCSingle[ptr InterThreadResponse]
respSignal: ThreadSignalPtr
userData*: pointer
eventCallback*: pointer

# To control when the thread is running
var running: Atomic[bool]
Expand Down

0 comments on commit 707f3e8

Please sign in to comment.