From 5e3c9fdfa15ff9d94527f6415cfc2275a91445d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Fri, 15 Dec 2023 10:46:21 -0400 Subject: [PATCH] feat(c-bindings): support creating multiple instances (#929) --- examples/c-bindings/main.c | 31 +-- library/c/README.md | 286 ++++++++++++++++------------ library/c/api.go | 163 ++++++++++++---- library/c/api_discovery.go | 15 +- library/c/api_encoding.go | 8 +- library/c/api_filter.go | 34 ++-- library/c/api_legacy_filter.go | 18 +- library/c/api_lightpush.go | 8 +- library/c/api_relay.go | 42 ++-- library/c/api_store.go | 16 +- library/c/cgo_utils.go | 11 ++ library/discovery.go | 33 ++-- library/filter.go | 50 ++--- library/legacy_filter.go | 26 +-- library/lightpush.go | 16 +- library/mobile/api.go | 124 +++++++++--- library/mobile/api_discovery.go | 27 ++- library/mobile/api_filter.go | 36 +++- library/mobile/api_legacy_filter.go | 18 +- library/mobile/api_lightpush.go | 9 +- library/mobile/api_relay.go | 45 ++++- library/mobile/api_store.go | 18 +- library/mobile/signals.go | 9 +- library/node.go | 228 +++++++++++++++------- library/relay.go | 54 +++--- library/signals.c | 11 +- library/signals.go | 27 ++- library/store.go | 24 +-- 28 files changed, 918 insertions(+), 469 deletions(-) diff --git a/examples/c-bindings/main.c b/examples/c-bindings/main.c index b46f4a7a4..049e1f6ea 100644 --- a/examples/c-bindings/main.c +++ b/examples/c-bindings/main.c @@ -25,7 +25,7 @@ void on_error(int ret, const char *result, void *user_data) return; } - printf("function execution failed. Returned code: %d\n", ret); + printf("function execution failed. Returned code: %d, %s\n", ret, result); exit(1); } @@ -33,7 +33,7 @@ void on_response(int ret, const char *result, void *user_data) { if (ret != 0) { - printf("function execution failed. Returned code: %d\n", ret); + printf("function execution failed. Returned code: %d, %s\n", ret, result); exit(1); } @@ -119,26 +119,26 @@ void callBack(int ret, const char *signal, void *user_data) int main(int argc, char *argv[]) { - // Set callback to be executed each time a message is received - waku_set_event_callback(callBack); - // configJSON can be NULL too to use defaults. Any value not defined will have // a default set char *configJSON = "{\"host\": \"0.0.0.0\", \"port\": 60000, " "\"logLevel\":\"error\", \"store\":true}"; - waku_new(configJSON, on_error, NULL); + void* ctx = waku_new(configJSON, on_error, NULL); + + // Set callback to be executed each time a message is received + waku_set_event_callback(ctx, callBack); // Start the node, enabling the waku protocols - waku_start(on_error, NULL); + waku_start(ctx, on_error, NULL); // Obtain the node's peerID char *peerID = NULL; - waku_peerid(on_response, (void *)&peerID); + waku_peerid(ctx, on_response, (void *)&peerID); printf("PeerID: %s\n", peerID); // Obtain the node's multiaddresses char *addresses = NULL; - waku_listen_addresses(on_response, (void *)&addresses); + waku_listen_addresses(ctx, on_response, (void *)&addresses); printf("Addresses: %s\n", addresses); // Build a content topic @@ -154,12 +154,12 @@ int main(int argc, char *argv[]) // To use dns discovery, and retrieve nodes from a enrtree url char *discoveredNodes = NULL; - waku_dns_discovery("enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im", + waku_dns_discovery(ctx, "enrtree://AO47IDOLBKH72HIZZOXQP6NMRESAN7CHYWIBNXDXWRJRZWLODKII6@test.wakuv2.nodes.status.im", "", 0, on_response, (void *)&discoveredNodes); printf("Discovered nodes: %s\n", discoveredNodes); // Connect to a node - waku_connect("/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/" + waku_connect(ctx, "/dns4/node-01.do-ams3.wakuv2.test.statusim.net/tcp/30303/" "p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ", 0, on_response, NULL); @@ -176,7 +176,7 @@ int main(int argc, char *argv[]) sprintf(contentFilter, "{\"pubsubTopic\":\"%s\",\"contentTopics\":[\"%s\"]}", defaultPubsubTopic, contentTopic); - waku_relay_subscribe(contentFilter, on_error, NULL); + waku_relay_subscribe(ctx, contentFilter, on_error, NULL); int i = 0; int version = 1; @@ -202,7 +202,7 @@ int main(int argc, char *argv[]) // Broadcast via waku relay char *messageID = NULL; - waku_relay_publish(encodedMessage, defaultPubsubTopic, 0, on_response, + waku_relay_publish(ctx, encodedMessage, defaultPubsubTopic, 0, on_response, (void *)&messageID); printf("MessageID: %s\n", messageID); @@ -221,7 +221,10 @@ int main(int argc, char *argv[]) // printf("%s\n", local_result); // Stop the node's execution - waku_stop(on_response, NULL); + waku_stop(ctx, on_response, NULL); + + // Release resources allocated to waku + waku_free(ctx, on_response, NULL); // TODO: free all char* diff --git a/library/c/README.md b/library/c/README.md index 83d3ead68..0720b8f5a 100644 --- a/library/c/README.md +++ b/library/c/README.md @@ -479,59 +479,84 @@ If a key is `undefined`, or `null`, a default value will be set. If using `secur - `certPath`: secure websocket certificate path - `keyPath`: secure websocket key path +### `extern void* waku_init()` -### `extern int waku_new(char* jsonConfig, WakuCallBack onErrCb void* userData)` +Allocate memory for a waku node. + +**Returns** + +The result of this function must be passed to all waku_* functions that require a `void* ctx` + +### `extern int waku_new(void* ctx, char* jsonConfig, WakuCallBack onErrCb void* userData)` Instantiates a Waku node. **Parameters** -1. `char* jsonConfig`: JSON string containing the options used to initialize a go-waku node. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* jsonConfig`: JSON string containing the options used to initialize a go-waku node. Type [`JsonConfig`](#jsonconfig-type). It can be `NULL` to use defaults. -2. `WakuCallBack onErrCb`: callback to be executed if the function fails -3. `void* userData`: used to pass custom information to the callback function +3. `WakuCallBack onErrCb`: callback to be executed if the function fails +4. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_start(WakuCallBack onErrCb void* userData)` +### `extern int waku_start(void* ctx, WakuCallBack onErrCb void* userData)` Start a Waku node mounting all the protocols that were enabled during the Waku node instantiation. **Parameters** -1. `WakuCallBack onErrCb`: callback to be executed if the function fails -2. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack onErrCb`: callback to be executed if the function fails +3. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_stop(WakuCallBack onErrCb void* userData)` +### `extern int waku_stop(void* ctx, WakuCallBack onErrCb void* userData)` Stops a Waku node. **Parameters** -1. `WakuCallBack onErrCb`: callback to be executed if the function fails -2. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack onErrCb`: callback to be executed if the function fails +3. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. +### `extern int waku_free(void* ctx, WakuCallBack onErrCb void* userData)` -### `extern int waku_peerid(WakuCallBack cb, void* userData)` +Release the resources allocated to a waku node (stopping the node first if necessary) + +**Parameters** + +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack onErrCb`: callback to be executed if the function fails +3. `void* userData`: used to pass custom information to the callback function + +**Returns** + +A status code. Refer to the [`Status codes`](#status-codes) section for possible values. + + +### `extern int waku_peerid(void* ctx, WakuCallBack cb, void* userData)` Get the peer ID of the waku node. **Parameters** -1. `WakuCallBack cb`: callback to be executed. -2. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack cb`: callback to be executed. +3. `void* userData`: used to pass custom information to the callback function **Returns** @@ -539,14 +564,15 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible If the function execution fails, `cb` will receive a string containing an error. If the function is executed succesfully, `cb` will receive the base58 encoded peer ID, for example `QmWjHKUrXDHPCwoWXpUZ77E8o6UbAoTTZwf1AD1tDC4KNP`. -### `extern int waku_listen_addresses(WakuCallBack cb, void* userData)` +### `extern int waku_listen_addresses(void* ctx, WakuCallBack cb, void* userData)` Get the multiaddresses the Waku node is listening to. **Parameters** -1. `WakuCallBack cb`: callback to be executed -2. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack cb`: callback to be executed +3. `void* userData`: used to pass custom information to the callback function **Returns** @@ -567,16 +593,17 @@ For example: ## Connecting to peers -### `extern int waku_add_peer(char* address, char* protocolId, WakuCallBack cb, void* userData)` +### `extern int waku_add_peer(void* ctx, char* address, char* protocolId, WakuCallBack cb, void* userData)` Add a node multiaddress and protocol to the waku node's peerstore. **Parameters** -1. `char* address`: A multiaddress (with peer id) to reach the peer being added. -2. `char* protocolId`: A protocol we expect the peer to support. -3. `WakuCallBack cb`: callback to be executed -4. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* address`: A multiaddress (with peer id) to reach the peer being added. +3. `char* protocolId`: A protocol we expect the peer to support. +4. `WakuCallBack cb`: callback to be executed +5. `void* userData`: used to pass custom information to the callback function **Returns** @@ -585,14 +612,15 @@ If the function execution fails, `cb` will receive a string containing an error. If the function is executed succesfully, `cb` will receive the base 58 peer ID of the peer that was added. For example: `QmWjHKUrXDHPCwoWXpUZ77E8o6UbAoTTZwf1AD1tDC4KNP` -### `extern int waku_connect(char* address, int timeoutMs, WakuCallBack onErrCb void* userData)` +### `extern int waku_connect(void* ctx, char* address, int timeoutMs, WakuCallBack onErrCb void* userData)` Dial peer using a multiaddress. **Parameters** -1. `char* address`: A multiaddress to reach the peer being dialed. -2. `int timeoutMs`: Timeout value in milliseconds to execute the call. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* address`: A multiaddress to reach the peer being dialed. +3. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. @@ -603,48 +631,52 @@ Dial peer using a multiaddress. A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_connect_peerid(char* peerId, int timeoutMs, WakuCallBack onErrCb void* userData)` +### `extern int waku_connect_peerid(void* ctx, char* peerId, int timeoutMs, WakuCallBack onErrCb void* userData)` Dial peer using its peer ID. **Parameters** -1`char* peerID`: Peer ID to dial. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* peerID`: Peer ID to dial. The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) or previously dialed with [`waku_connect`](#extern-char-waku_connectchar-address-int-timeoutms). -2. `int timeoutMs`: Timeout value in milliseconds to execute the call. +3. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -3. `WakuCallBack onErrCb`: callback to be executed if the function fails -4. `void* userData`: used to pass custom information to the callback function +4. `WakuCallBack onErrCb`: callback to be executed if the function fails +5. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_disconnect(char* peerId, WakuCallBack onErrCb void* userData)` +### `extern int waku_disconnect(void* ctx, char* peerId, WakuCallBack onErrCb void* userData)` Disconnect a peer using its peerID **Parameters** -1. `char* peerID`: Peer ID to disconnect. -2. `WakuCallBack onErrCb`: callback to be executed if the function fails -3. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* peerID`: Peer ID to disconnect. +3. `WakuCallBack onErrCb`: callback to be executed if the function fails +4. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_peer_cnt(WakuCallBack cb, void* userData)` +### `extern int waku_peer_cnt(void* ctx, WakuCallBack cb, void* userData)` Get number of connected peers. **Parameters** -1. `WakuCallBack cb`: callback to be executed -2. `void* userData`: used to pass custom information to the callback function + +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack cb`: callback to be executed +3. `void* userData`: used to pass custom information to the callback function **Returns** @@ -652,13 +684,15 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible If the function execution fails, `cb` will receive a string containing an error. If the function is executed succesfully, `cb` will receive the number of connected peers. -### `extern int waku_peers(WakuCallBack cb, void* userData)` +### `extern int waku_peers(void* ctx, WakuCallBack cb, void* userData)` Retrieve the list of peers known by the Waku node. **Parameters** -1. `WakuCallBack cb`: callback to be executed if the function is succesful -2. `void* userData`: used to pass custom information to the callback function + +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack cb`: callback to be executed if the function is succesful +3. `void* userData`: used to pass custom information to the callback function **Returns** @@ -748,21 +782,22 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible If the function is executed succesfully, `onOkCb` will receive the default pubsub topic: `/waku/2/default-waku/proto` -### `extern int waku_relay_publish(char* messageJson, char* pubsubTopic, int timeoutMs, WakuCallBack cb, void* userData)` +### `extern int waku_relay_publish(void* ctx, char* messageJson, char* pubsubTopic, int timeoutMs, WakuCallBack cb, void* userData)` Publish a message using Waku Relay. **Parameters** -1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type). -2. `char* pubsubTopic`: pubsub topic on which to publish the message. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type). +3. `char* pubsubTopic`: pubsub topic on which to publish the message. If `NULL`, it derives the pubsub topic from content-topic based on autosharding. -3. `int timeoutMs`: Timeout value in milliseconds to execute the call. +4. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -4. `WakuCallBack cb`: callback to be executed -5. `void* userData`: used to pass custom information to the callback function +5. `WakuCallBack cb`: callback to be executed +6. `void* userData`: used to pass custom information to the callback function **Returns** @@ -770,16 +805,17 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible If the function execution fails, `cb` will receive a string containing an error. If the function is executed succesfully, `onOkCb` will receive the message ID. -### `extern int waku_relay_enough_peers(char* pubsubTopic, WakuCallBack cb, void* userData)` +### `extern int waku_relay_enough_peers(void* ctx, char* pubsubTopic, WakuCallBack cb, void* userData)` Determine if there are enough peers to publish a message on a given pubsub topic. **Parameters** -1. `char* pubsubTopic`: Pubsub topic to verify. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* pubsubTopic`: Pubsub topic to verify. If `NULL`, it verifies the number of peers in the default pubsub topic. -2. `WakuCallBack cb`: callback to be executed -3. `void* userData`: used to pass custom information to the callback function +3. `WakuCallBack cb`: callback to be executed +4. `void* userData`: used to pass custom information to the callback function **Returns** @@ -787,27 +823,30 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible If the function execution fails, `cb` will receive a string containing an error. If the function is executed succesfully, `onOkCb` will receive a string `boolean` indicating whether there are enough peers, i.e. `true` or `false` -### `extern int waku_relay_subscribe(char* filterJSON, WakuCallBack onErrCb, void* userData)` +### `extern int waku_relay_subscribe(void* ctx, char* filterJSON, WakuCallBack onErrCb, void* userData)` Subscribe to a Waku Relay pubsub topic to receive messages. **Parameters** -1. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive -2. `WakuCallBack onErrCb`: callback to be executed if the function fails -3. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive +3. `WakuCallBack onErrCb`: callback to be executed if the function fails +4. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_relay_topics(WakuCallBack cb, void* userData)` +### `extern int waku_relay_topics(void* ctx, WakuCallBack cb, void* userData)` Get the list of subscribed pubsub topics in Waku Relay. **Parameters** -1. `WakuCallBack cb`: callback to be executed -2. `void* userData`: used to pass custom information to the callback function + +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `WakuCallBack cb`: callback to be executed +3. `void* userData`: used to pass custom information to the callback function **Returns** @@ -824,7 +863,7 @@ For example: **Events** -When a message is received, a ``"message"` event` is emitted containing the message, pubsub topic, and node ID in which +When a message is received, a `"message"` event` is emitted containing the message, pubsub topic, and node ID in which the message was received. The `event` type is [`JsonMessageEvent`](#jsonmessageevent-type). @@ -847,16 +886,17 @@ For Example: } ``` -### `extern int waku_relay_unsubscribe(char* filterJSON, WakuCallBack onErrCb, void* userData)` +### `extern int waku_relay_unsubscribe(void* ctx, char* filterJSON, WakuCallBack onErrCb, void* userData)` Closes the pubsub subscription to pubsub topic matching a criteria. No more messages will be received from this pubsub topic. **Parameters** -1. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to unsubscribe from -2. `WakuCallBack onErrCb`: callback to be executed if the function fails -3. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to unsubscribe from +3. `WakuCallBack onErrCb`: callback to be executed if the function fails +4. `void* userData`: used to pass custom information to the callback function **Returns** @@ -865,24 +905,25 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible ## Waku Filter -### `extern int waku_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` +### `extern int waku_filter_subscribe(void* ctx, char* filterJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` Creates a subscription to a filter full node matching a content filter.. **Parameters** -1. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive -2. `char* peerID`: Peer ID to subscribe to. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* filterJSON`: JSON string containing the [`ContentFilter`](#contentfilter-type) with the criteria of the messages to receive +3. `char* peerID`: Peer ID to subscribe to. The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). Use `NULL` to automatically select a node. -3. `int timeoutMs`: Timeout value in milliseconds to execute the call. +4. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -4. `WakuCallBack cb`: callback to be executed -5. `void* userData`: used to pass custom information to the callback function +5. `WakuCallBack cb`: callback to be executed +6. `void* userData`: used to pass custom information to the callback function **Returns** @@ -932,51 +973,53 @@ For Example: ``` -### `extern int waku_filter_ping(char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)` +### `extern int waku_filter_ping(void* ctx, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)` Used to know if a service node has an active subscription for this client **Parameters** -1. `char* peerID`: Peer ID to check for an active subscription +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* peerID`: Peer ID to check for an active subscription The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). -2. `int timeoutMs`: Timeout value in milliseconds to execute the call. +3. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -3. `WakuCallBack onErrCb`: callback to be executed if the function fails -4. `void* userData`: used to pass custom information to the callback function +4. `WakuCallBack onErrCb`: callback to be executed if the function fails +5. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_filter_unsubscribe(filterJSON *C.char, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)` +### `extern int waku_filter_unsubscribe(void* ctx, filterJSON *C.char, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)` Sends a requests to a service node to stop pushing messages matching this filter to this client. It might be used to modify an existing subscription by providing a subset of the original filter criteria **Parameters** -1. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type) criteria to unsubscribe from -2. `char* peerID`: Peer ID to unsubscribe from +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* filterJSON`: JSON string containing the [`FilterSubscription`](#filtersubscription-type) criteria to unsubscribe from +3. `char* peerID`: Peer ID to unsubscribe from The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). -3. `int timeoutMs`: Timeout value in milliseconds to execute the call. +4. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -4. `WakuCallBack onErrCb`: callback to be executed if the function fails -5. `void* userData`: used to pass custom information to the callback function +5. `WakuCallBack onErrCb`: callback to be executed if the function fails +6. `void* userData`: used to pass custom information to the callback function **Returns** A status code. Refer to the [`Status codes`](#status-codes) section for possible values. -### `extern int waku_filter_unsubscribe_all(char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` +### `extern int waku_filter_unsubscribe_all(void* ctx, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` Sends a requests to a service node (or all service nodes) to stop pushing messages @@ -1015,24 +1058,25 @@ For example: ## Waku Legacy Filter -### `extern int waku_legacy_filter_subscribe(char* filterJSON, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)` +### `extern int waku_legacy_filter_subscribe(void* ctx, char* filterJSON, char* peerID, int timeoutMs, WakuCallBack onErrCb, void* userData)` Creates a subscription in a lightnode for messages that matches a content filter and optionally a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor). **Parameters** -1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#legacyfiltersubscription-type) to subscribe to. -2. `char* peerID`: Peer ID to subscribe to. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#legacyfiltersubscription-type) to subscribe to. +3. `char* peerID`: Peer ID to subscribe to. The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). Use `NULL` to automatically select a node. -3. `int timeoutMs`: Timeout value in milliseconds to execute the call. +4. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -4. `WakuCallBack onErrCb`: callback to be executed if the function fails -5. `void* userData`: used to pass custom information to the callback function +5. `WakuCallBack onErrCb`: callback to be executed if the function fails +6. `void* userData`: used to pass custom information to the callback function **Returns** @@ -1063,19 +1107,20 @@ For Example: } ``` -### `extern int waku_legacy_filter_unsubscribe(char* filterJSON, int timeoutMs, WakuCallBack onErrCb, void* userData)` +### `extern int waku_legacy_filter_unsubscribe(void* ctx, char* filterJSON, int timeoutMs, WakuCallBack onErrCb, void* userData)` Removes subscriptions in a light node matching a content filter and, optionally, a [PubSub `topic`](https://github.com/libp2p/specs/blob/master/pubsub/README.md#the-topic-descriptor). **Parameters** -1. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#filtersubscription-type). -2. `int timeoutMs`: Timeout value in milliseconds to execute the call. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* filterJSON`: JSON string containing the [`LegacyFilterSubscription`](#filtersubscription-type). +3. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -3. `WakuCallBack onErrCb`: callback to be executed if the function fails -4. `void* userData`: used to pass custom information to the callback function +4. `WakuCallBack onErrCb`: callback to be executed if the function fails +5. `void* userData`: used to pass custom information to the callback function **Returns** @@ -1083,26 +1128,27 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible ## Waku Lightpush -### `extern int waku_lightpush_publish(char* messageJSON, char* topic, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` +### `extern int waku_lightpush_publish(void* ctx, char* messageJSON, char* topic, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` Publish a message using Waku Lightpush. **Parameters** -1. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type). -2. `char* pubsubTopic`: pubsub topic on which to publish the message. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* messageJson`: JSON string containing the [Waku Message](https://rfc.vac.dev/spec/14/) as [`JsonMessage`](#jsonmessage-type). +3. `char* pubsubTopic`: pubsub topic on which to publish the message. If `NULL`, it derives the pubsub topic from content-topic based on autosharding. -3. `char* peerID`: Peer ID supporting the lightpush protocol. +4. `char* peerID`: Peer ID supporting the lightpush protocol. The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). Use `NULL` to automatically select a node. -4. `int timeoutMs`: Timeout value in milliseconds to execute the call. +5. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -5. `WakuCallBack cb`: callback to be executed -6. `void* userData`: used to pass custom information to the callback function +6. `WakuCallBack cb`: callback to be executed +7. `void* userData`: used to pass custom information to the callback function **Returns** @@ -1113,7 +1159,7 @@ If the function is executed succesfully, `cb` will receive the message ID. ## Waku Store -### `extern int waku_store_query(char* queryJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` +### `extern int waku_store_query(void* ctx, char* queryJSON, char* peerID, int timeoutMs, WakuCallBack cb, void* userData)` Retrieves historical messages on specific content topics. This method may be called with [`PagingOptions`](#pagingoptions-type), to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](#pagingoptions-type), the node @@ -1122,17 +1168,18 @@ must contain a cursor pointing to the Index from which a new page can be request **Parameters** -1. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type). -2. `char* peerID`: Peer ID supporting the store protocol. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type). +3. `char* peerID`: Peer ID supporting the store protocol. The peer must be already known. It must have been added before with [`waku_add_peer`](#extern-char-waku_add_peerchar-address-char-protocolid) or previously dialed with [`waku_connect_peer`](#extern-char-waku_connect_peerchar-address-int-timeoutms). -3. `int timeoutMs`: Timeout value in milliseconds to execute the call. +4. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -4. `WakuCallBack cb`: callback to be executed -2. `void* userData`: used to pass custom information to the callback function +5. `WakuCallBack cb`: callback to be executed +6. `void* userData`: used to pass custom information to the callback function **Returns** @@ -1140,7 +1187,7 @@ A status code. Refer to the [`Status codes`](#status-codes) section for possible If the function execution fails, `cb` will receive a string containing an error. If the function is executed succesfully, `cb` will receive a [`StoreResponse`](#storeresponse-type). -### `extern int waku_store_local_query(char* queryJSON, WakuCallBack cb, void* userData)` +### `extern int waku_store_local_query(void* ctx, char* queryJSON, WakuCallBack cb, void* userData)` Retrieves locally stored historical messages on specific content topics. This method may be called with [`PagingOptions`](#pagingoptions-type), to retrieve historical messages on a per-page basis. If the request included [`PagingOptions`](#pagingoptions-type), the node @@ -1149,13 +1196,14 @@ must contain a cursor pointing to the Index from which a new page can be request **Parameters** -1. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type). -2. `int timeoutMs`: Timeout value in milliseconds to execute the call. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* queryJSON`: JSON string containing the [`StoreQuery`](#storequery-type). +3. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -3. `WakuCallBack cb`: callback to be executed -2. `void* userData`: used to pass custom information to the callback function +4. `WakuCallBack cb`: callback to be executed +5. `void* userData`: used to pass custom information to the callback function **Returns** @@ -1265,20 +1313,21 @@ If the function is executed succesfully, `cb` will receive the decoded payload a ## DNS Discovery -### `extern int waku_dns_discovery(char* url, char* nameserver, int timeoutMs, WakuCallBack cb, void* userData)` +### `extern int waku_dns_discovery(void* ctx, char* url, char* nameserver, int timeoutMs, WakuCallBack cb, void* userData)` Returns a list of multiaddress and enrs given a url to a DNS discoverable ENR tree **Parameters** -1. `char* url`: URL containing a discoverable ENR tree -2. `char* nameserver`: The nameserver to resolve the ENR tree url. +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* url`: URL containing a discoverable ENR tree +3. `char* nameserver`: The nameserver to resolve the ENR tree url. If `NULL` or empty, it will automatically use the default system dns. -3. `int timeoutMs`: Timeout value in milliseconds to execute the call. +4. `int timeoutMs`: Timeout value in milliseconds to execute the call. If the function execution takes longer than this value, the execution will be canceled and an error returned. Use `0` for no timeout. -4. `WakuCallBack cb`: callback to be executed -3. `void* userData`: used to pass custom information to the callback function +5. `WakuCallBack cb`: callback to be executed +6. `void* userData`: used to pass custom information to the callback function **Returns** @@ -1302,14 +1351,15 @@ If the function is executed succesfully, `onOkCb` will receive an array objects ## DiscoveryV5 -### `extern int waku_discv5_update_bootnodes(char* bootnodes, WakuCallBack onErrCb, void* userData)` +### `extern int waku_discv5_update_bootnodes(void* ctx, char* bootnodes, WakuCallBack onErrCb, void* userData)` Update the bootnode list used for discovering new peers via DiscoveryV5 **Parameters** -1. `char* bootnodes`: JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` -2. `WakuCallBack onErrCb`: callback to be executed if the function fails -3. `void* userData`: used to pass custom information to the callback function +1. `void* ctx`: waku node instance, returned by `waku_init`. +2. `char* bootnodes`: JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` +3. `WakuCallBack onErrCb`: callback to be executed if the function fails +4. `void* userData`: used to pass custom information to the callback function **Returns** diff --git a/library/c/api.go b/library/c/api.go index 7e0019ba5..3810a6ca3 100644 --- a/library/c/api.go +++ b/library/c/api.go @@ -88,41 +88,106 @@ func main() {} // - dns4DomainName: the domain name resolving to the node's public IPv4 address. // //export waku_new -func waku_new(configJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.NewNode(C.GoString(configJSON)) - return onError(err, cb, userData) +func waku_new(configJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) unsafe.Pointer { + if cb == nil { + panic("error: missing callback in waku_new") + } + + cid := C.malloc(C.size_t(unsafe.Sizeof(uintptr(0)))) + pid := (*uint)(cid) + instance := library.Init() + *pid = instance.ID + + err := library.NewNode(instance, C.GoString(configJSON)) + if err != nil { + onError(err, cb, userData) + return nil + } + + return cid } // Starts the waku node // //export waku_start -func waku_start(onErr C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.Start() +func waku_start(ctx unsafe.Pointer, onErr C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, onErr, userData) + } + + err = library.Start(instance) return onError(err, onErr, userData) } // Stops a waku node // //export waku_stop -func waku_stop(onErr C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.Stop() +func waku_stop(ctx unsafe.Pointer, onErr C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, onErr, userData) + } + + err = library.Stop(instance) + return onError(err, onErr, userData) +} + +// Release the resources allocated to a waku node (stopping the node first if necessary) +// +//export waku_free +func waku_free(ctx unsafe.Pointer, onErr C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + return onError(err, onErr, userData) + } + + err = library.Stop(instance) + if err != nil { + return onError(err, onErr, userData) + } + + err = library.Free(instance) + if err == nil { + C.free(ctx) + } + return onError(err, onErr, userData) } // Determine is a node is started or not // //export waku_is_started -func waku_is_started() C.int { - started := library.IsStarted() +func waku_is_started(ctx unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + return 0 + } + + started := library.IsStarted(instance) if started { return 1 } return 0 } -type fn func() (string, error) +type fn func(instance *library.WakuInstance) (string, error) +type fnNoInstance func() (string, error) + +func singleFnExec(f fn, ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + result, err := f(instance) + if err != nil { + return onError(err, cb, userData) + } + return onSuccesfulResponse(result, cb, userData) +} -func singleFnExec(f fn, cb C.WakuCallBack, userData unsafe.Pointer) C.int { +func singleFnExecNoCtx(f fnNoInstance, cb C.WakuCallBack, userData unsafe.Pointer) C.int { result, err := f() if err != nil { return onError(err, cb, userData) @@ -133,62 +198,77 @@ func singleFnExec(f fn, cb C.WakuCallBack, userData unsafe.Pointer) C.int { // Obtain the peer ID of the waku node // //export waku_peerid -func waku_peerid(cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.PeerID() - }, cb, userData) +func waku_peerid(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.PeerID(instance) + }, ctx, cb, userData) } // Obtain the multiaddresses the wakunode is listening to // //export waku_listen_addresses -func waku_listen_addresses(cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.ListenAddresses() - }, cb, userData) +func waku_listen_addresses(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.ListenAddresses(instance) + }, ctx, cb, userData) } // Add node multiaddress and protocol to the wakunode peerstore // //export waku_add_peer -func waku_add_peer(address *C.char, protocolID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.AddPeer(C.GoString(address), C.GoString(protocolID)) - }, cb, userData) +func waku_add_peer(ctx unsafe.Pointer, address *C.char, protocolID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.AddPeer(instance, C.GoString(address), C.GoString(protocolID)) + }, ctx, cb, userData) } // Connect to peer at multiaddress. if ms > 0, cancel the function execution if it takes longer than N milliseconds // //export waku_connect -func waku_connect(address *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.Connect(C.GoString(address), int(ms)) +func waku_connect(ctx unsafe.Pointer, address *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.Connect(instance, C.GoString(address), int(ms)) return onError(err, cb, userData) } // Connect to known peer by peerID. if ms > 0, cancel the function execution if it takes longer than N milliseconds // //export waku_connect_peerid -func waku_connect_peerid(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.ConnectPeerID(C.GoString(peerID), int(ms)) +func waku_connect_peerid(ctx unsafe.Pointer, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.ConnectPeerID(instance, C.GoString(peerID), int(ms)) return onError(err, cb, userData) } // Close connection to a known peer by peerID // //export waku_disconnect -func waku_disconnect(peerID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.Disconnect(C.GoString(peerID)) +func waku_disconnect(ctx unsafe.Pointer, peerID *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.Disconnect(instance, C.GoString(peerID)) return onError(err, cb, userData) } // Get number of connected peers // //export waku_peer_cnt -func waku_peer_cnt(cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - peerCnt, err := library.PeerCnt() +func waku_peer_cnt(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + peerCnt, err := library.PeerCnt(instance) return fmt.Sprintf("%d", peerCnt), err - }, cb, userData) + }, ctx, cb, userData) } // Create a content topic string according to RFC 23 @@ -211,15 +291,20 @@ func waku_default_pubsub_topic(cb C.WakuCallBack, userData unsafe.Pointer) C.int // signature for the callback should be `void myCallback(char* signalJSON)` // //export waku_set_event_callback -func waku_set_event_callback(cb C.WakuCallBack) { - library.SetEventCallback(unsafe.Pointer(cb)) +func waku_set_event_callback(ctx unsafe.Pointer, cb C.WakuCallBack) { + instance, err := getInstance(ctx) + if err != nil { + panic(err.Error()) // TODO: refactor to return an error instead of panic + } + + library.SetEventCallback(instance, unsafe.Pointer(cb)) } // Retrieve the list of peers known by the waku node // //export waku_peers -func waku_peers(cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.Peers() - }, cb, userData) +func waku_peers(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.Peers(instance) + }, ctx, cb, userData) } diff --git a/library/c/api_discovery.go b/library/c/api_discovery.go index c5d009eca..e4bee2ce9 100644 --- a/library/c/api_discovery.go +++ b/library/c/api_discovery.go @@ -21,17 +21,22 @@ import ( // (in milliseconds) is reached, or an error will be returned // //export waku_dns_discovery -func waku_dns_discovery(url *C.char, nameserver *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { +func waku_dns_discovery(ctx unsafe.Pointer, url *C.char, nameserver *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { return library.DNSDiscovery(C.GoString(url), C.GoString(nameserver), int(ms)) - }, cb, userData) + }, ctx, cb, userData) } // Update the bootnode list used for discovering new peers via DiscoveryV5 // The bootnodes param should contain a JSON array containing the bootnode ENRs i.e. `["enr:...", "enr:..."]` // //export waku_discv5_update_bootnodes -func waku_discv5_update_bootnodes(bootnodes *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.SetBootnodes(C.GoString(bootnodes)) +func waku_discv5_update_bootnodes(ctx unsafe.Pointer, bootnodes *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.SetBootnodes(instance, C.GoString(bootnodes)) return onError(err, cb, userData) } diff --git a/library/c/api_encoding.go b/library/c/api_encoding.go index a9c38ae96..37d7cb158 100644 --- a/library/c/api_encoding.go +++ b/library/c/api_encoding.go @@ -14,7 +14,7 @@ import ( // //export waku_decode_symmetric func waku_decode_symmetric(messageJSON *C.char, symmetricKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { + return singleFnExecNoCtx(func() (string, error) { return library.DecodeSymmetric(C.GoString(messageJSON), C.GoString(symmetricKey)) }, cb, userData) } @@ -23,7 +23,7 @@ func waku_decode_symmetric(messageJSON *C.char, symmetricKey *C.char, cb C.WakuC // //export waku_decode_asymmetric func waku_decode_asymmetric(messageJSON *C.char, privateKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { + return singleFnExecNoCtx(func() (string, error) { return library.DecodeAsymmetric(C.GoString(messageJSON), C.GoString(privateKey)) }, cb, userData) } @@ -35,7 +35,7 @@ func waku_decode_asymmetric(messageJSON *C.char, privateKey *C.char, cb C.WakuCa // //export waku_encode_asymmetric func waku_encode_asymmetric(messageJSON *C.char, publicKey *C.char, optionalSigningKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { + return singleFnExecNoCtx(func() (string, error) { return library.EncodeAsymmetric(C.GoString(messageJSON), C.GoString(publicKey), C.GoString(optionalSigningKey)) }, cb, userData) } @@ -47,7 +47,7 @@ func waku_encode_asymmetric(messageJSON *C.char, publicKey *C.char, optionalSign // //export waku_encode_symmetric func waku_encode_symmetric(messageJSON *C.char, symmetricKey *C.char, optionalSigningKey *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { + return singleFnExecNoCtx(func() (string, error) { return library.EncodeSymmetric(C.GoString(messageJSON), C.GoString(symmetricKey), C.GoString(optionalSigningKey)) }, cb, userData) } diff --git a/library/c/api_filter.go b/library/c/api_filter.go index caa26a454..5baeab687 100644 --- a/library/c/api_filter.go +++ b/library/c/api_filter.go @@ -24,10 +24,10 @@ import ( // It returns a json object containing the details of the subscriptions along with any errors in case of partial failures // //export waku_filter_subscribe -func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.FilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms)) - }, cb, userData) +func waku_filter_subscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.FilterSubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms)) + }, ctx, cb, userData) } // Used to know if a service node has an active subscription for this client @@ -36,8 +36,13 @@ func waku_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.Wa // (in milliseconds) is reached, or an error will be returned // //export waku_filter_ping -func waku_filter_ping(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.FilterPing(C.GoString(peerID), int(ms)) +func waku_filter_ping(ctx unsafe.Pointer, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.FilterPing(instance, C.GoString(peerID), int(ms)) return onError(err, cb, userData) } @@ -55,8 +60,13 @@ func waku_filter_ping(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsa // (in milliseconds) is reached, or an error will be returned // //export waku_filter_unsubscribe -func waku_filter_unsubscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.FilterUnsubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms)) +func waku_filter_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.FilterUnsubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms)) return onError(err, cb, userData) } @@ -67,8 +77,8 @@ func waku_filter_unsubscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C. // (in milliseconds) is reached, or an error will be returned // //export waku_filter_unsubscribe_all -func waku_filter_unsubscribe_all(peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.FilterUnsubscribeAll(C.GoString(peerID), int(ms)) - }, cb, userData) +func waku_filter_unsubscribe_all(ctx unsafe.Pointer, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.FilterUnsubscribeAll(instance, C.GoString(peerID), int(ms)) + }, ctx, cb, userData) } diff --git a/library/c/api_legacy_filter.go b/library/c/api_legacy_filter.go index a88bc529f..68b214293 100644 --- a/library/c/api_legacy_filter.go +++ b/library/c/api_legacy_filter.go @@ -27,8 +27,13 @@ import ( // (in milliseconds) is reached, or an error will be returned // //export waku_legacy_filter_subscribe -func waku_legacy_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.LegacyFilterSubscribe(C.GoString(filterJSON), C.GoString(peerID), int(ms)) +func waku_legacy_filter_subscribe(ctx unsafe.Pointer, filterJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.LegacyFilterSubscribe(instance, C.GoString(filterJSON), C.GoString(peerID), int(ms)) return onError(err, cb, userData) } @@ -48,7 +53,12 @@ func waku_legacy_filter_subscribe(filterJSON *C.char, peerID *C.char, ms C.int, // (in milliseconds) is reached, or an error will be returned // //export waku_legacy_filter_unsubscribe -func waku_legacy_filter_unsubscribe(filterJSON *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.LegacyFilterUnsubscribe(C.GoString(filterJSON), int(ms)) +func waku_legacy_filter_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.LegacyFilterUnsubscribe(instance, C.GoString(filterJSON), int(ms)) return onError(err, cb, userData) } diff --git a/library/c/api_lightpush.go b/library/c/api_lightpush.go index 909d62f83..7edb6daae 100644 --- a/library/c/api_lightpush.go +++ b/library/c/api_lightpush.go @@ -16,8 +16,8 @@ import ( // (in milliseconds) is reached, or an error will be returned // //export waku_lightpush_publish -func waku_lightpush_publish(messageJSON *C.char, topic *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.LightpushPublish(C.GoString(messageJSON), C.GoString(topic), C.GoString(peerID), int(ms)) - }, cb, userData) +func waku_lightpush_publish(ctx unsafe.Pointer, messageJSON *C.char, topic *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.LightpushPublish(instance, C.GoString(messageJSON), C.GoString(topic), C.GoString(peerID), int(ms)) + }, ctx, cb, userData) } diff --git a/library/c/api_relay.go b/library/c/api_relay.go index 2b88af307..5250bf5f6 100644 --- a/library/c/api_relay.go +++ b/library/c/api_relay.go @@ -14,14 +14,14 @@ import ( // to verify the number of peers in the default pubsub topic // //export waku_relay_enough_peers -func waku_relay_enough_peers(topic *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - result, err := library.RelayEnoughPeers(C.GoString(topic)) +func waku_relay_enough_peers(ctx unsafe.Pointer, topic *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + result, err := library.RelayEnoughPeers(instance, C.GoString(topic)) if result { return "true", err } return "false", err - }, cb, userData) + }, ctx, cb, userData) } // Publish a message using waku relay and returns the message ID. Use NULL for topic to derive the pubsub topic from the contentTopic. @@ -29,10 +29,10 @@ func waku_relay_enough_peers(topic *C.char, cb C.WakuCallBack, userData unsafe.P // (in milliseconds) is reached, or an error will be returned. // //export waku_relay_publish -func waku_relay_publish(messageJSON *C.char, topic *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.RelayPublish(C.GoString(messageJSON), C.GoString(topic), int(ms)) - }, cb, userData) +func waku_relay_publish(ctx unsafe.Pointer, messageJSON *C.char, topic *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.RelayPublish(instance, C.GoString(messageJSON), C.GoString(topic), int(ms)) + }, ctx, cb, userData) } // Subscribe to WakuRelay to receive messages matching a content filter. @@ -47,8 +47,13 @@ func waku_relay_publish(messageJSON *C.char, topic *C.char, ms C.int, cb C.WakuC // the message was received // //export waku_relay_subscribe -func waku_relay_subscribe(filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.RelaySubscribe(C.GoString(filterJSON)) +func waku_relay_subscribe(ctx unsafe.Pointer, filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.RelaySubscribe(instance, C.GoString(filterJSON)) return onError(err, cb, userData) } @@ -56,10 +61,10 @@ func waku_relay_subscribe(filterJSON *C.char, cb C.WakuCallBack, userData unsafe // is subscribed to in WakuRelay // //export waku_relay_topics -func waku_relay_topics(cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.RelayTopics() - }, cb, userData) +func waku_relay_topics(ctx unsafe.Pointer, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.RelayTopics(instance) + }, ctx, cb, userData) } // Closes the pubsub subscription to stop receiving messages matching a content filter @@ -71,7 +76,12 @@ func waku_relay_topics(cb C.WakuCallBack, userData unsafe.Pointer) C.int { // } // //export waku_relay_unsubscribe -func waku_relay_unsubscribe(filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - err := library.RelayUnsubscribe(C.GoString(filterJSON)) +func waku_relay_unsubscribe(ctx unsafe.Pointer, filterJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + instance, err := getInstance(ctx) + if err != nil { + onError(err, cb, userData) + } + + err = library.RelayUnsubscribe(instance, C.GoString(filterJSON)) return onError(err, cb, userData) } diff --git a/library/c/api_store.go b/library/c/api_store.go index f9e82ff46..2ae23e8a2 100644 --- a/library/c/api_store.go +++ b/library/c/api_store.go @@ -39,10 +39,10 @@ import ( // (in milliseconds) is reached, or an error will be returned // //export waku_store_query -func waku_store_query(queryJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.StoreQuery(C.GoString(queryJSON), C.GoString(peerID), int(ms)) - }, cb, userData) +func waku_store_query(ctx unsafe.Pointer, queryJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.StoreQuery(instance, C.GoString(queryJSON), C.GoString(peerID), int(ms)) + }, ctx, cb, userData) } // Query historic messages stored in the localDB using waku store protocol. @@ -72,8 +72,8 @@ func waku_store_query(queryJSON *C.char, peerID *C.char, ms C.int, cb C.WakuCall // Requires the `store` option to be passed when setting up the initial configuration // //export waku_store_local_query -func waku_store_local_query(queryJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { - return singleFnExec(func() (string, error) { - return library.StoreLocalQuery(C.GoString(queryJSON)) - }, cb, userData) +func waku_store_local_query(ctx unsafe.Pointer, queryJSON *C.char, cb C.WakuCallBack, userData unsafe.Pointer) C.int { + return singleFnExec(func(instance *library.WakuInstance) (string, error) { + return library.StoreLocalQuery(instance, C.GoString(queryJSON)) + }, ctx, cb, userData) } diff --git a/library/c/cgo_utils.go b/library/c/cgo_utils.go index 120fd1d38..fcd6e5c60 100644 --- a/library/c/cgo_utils.go +++ b/library/c/cgo_utils.go @@ -9,6 +9,8 @@ import "C" import ( "errors" "unsafe" + + "github.com/waku-org/go-waku/library" ) const ret_ok = 0 @@ -51,3 +53,12 @@ func onError(err error, cb C.WakuCallBack, userData unsafe.Pointer) C.int { C._waku_execCB(cb, C.int(retCode), nil, userData) return ret_ok } + +func getInstance(wakuCtx unsafe.Pointer) (*library.WakuInstance, error) { + pid := (*uint)(wakuCtx) + if pid == nil { + return nil, errors.New("invalid waku context") + } + + return library.GetInstance(*pid) +} diff --git a/library/discovery.go b/library/discovery.go index 747064bf9..c53a330bc 100644 --- a/library/discovery.go +++ b/library/discovery.go @@ -58,34 +58,37 @@ func DNSDiscovery(url string, nameserver string, ms int) (string, error) { } // StartDiscoveryV5 starts discv5 discovery -func StartDiscoveryV5() error { - if wakuState.node == nil { - return errWakuNodeNotReady +func StartDiscoveryV5(instance *WakuInstance) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } - if wakuState.node.DiscV5() == nil { + + if instance.node.DiscV5() == nil { return errors.New("DiscV5 is not mounted") } - return wakuState.node.DiscV5().Start(context.Background()) + return instance.node.DiscV5().Start(instance.ctx) } // StopDiscoveryV5 stops discv5 discovery -func StopDiscoveryV5() error { - if wakuState.node == nil { - return errWakuNodeNotReady +func StopDiscoveryV5(instance *WakuInstance) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } - if wakuState.node.DiscV5() == nil { + + if instance.node.DiscV5() == nil { return errors.New("DiscV5 is not mounted") } - wakuState.node.DiscV5().Stop() + instance.node.DiscV5().Stop() return nil } // SetBootnodes is used to update the bootnodes receiving a JSON array of ENRs -func SetBootnodes(bootnodes string) error { - if wakuState.node == nil { - return errWakuNodeNotReady +func SetBootnodes(instance *WakuInstance, bootnodes string) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } - if wakuState.node.DiscV5() == nil { + + if instance.node.DiscV5() == nil { return errors.New("DiscV5 is not mounted") } @@ -112,5 +115,5 @@ func SetBootnodes(bootnodes string) error { nodes = append(nodes, node) } - return wakuState.node.DiscV5().SetBootnodes(nodes) + return instance.node.DiscV5().SetBootnodes(nodes) } diff --git a/library/filter.go b/library/filter.go index f66d2d988..d806ba876 100644 --- a/library/filter.go +++ b/library/filter.go @@ -36,24 +36,24 @@ type subscribeResult struct { } // FilterSubscribe is used to create a subscription to a filter node to receive messages -func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { +func FilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) (string, error) { cf, err := toContentFilter(filterJSON) if err != nil { return "", err } - if wakuState.node == nil { - return "", errWakuNodeNotReady + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } var fOptions []filter.FilterSubscribeOption @@ -67,7 +67,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { fOptions = append(fOptions, filter.WithAutomaticPeerSelection()) } - subscriptions, err := wakuState.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) + subscriptions, err := instance.node.FilterLightnode().Subscribe(ctx, cf, fOptions...) if err != nil && subscriptions == nil { return "", err } @@ -75,7 +75,7 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { for _, subscriptionDetails := range subscriptions { go func(subscriptionDetails *subscription.SubscriptionDetails) { for envelope := range subscriptionDetails.C { - send("message", toSubscriptionMessage(envelope)) + send(instance, "message", toSubscriptionMessage(envelope)) } }(subscriptionDetails) } @@ -89,19 +89,19 @@ func FilterSubscribe(filterJSON string, peerID string, ms int) (string, error) { } // FilterPing is used to determine if a peer has an active subscription -func FilterPing(peerID string, ms int) error { - if wakuState.node == nil { - return errWakuNodeNotReady +func FilterPing(instance *WakuInstance, peerID string, ms int) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } var pID peer.ID @@ -115,28 +115,28 @@ func FilterPing(peerID string, ms int) error { return errors.New("peerID is required") } - return wakuState.node.FilterLightnode().Ping(ctx, pID) + return instance.node.FilterLightnode().Ping(ctx, pID) } // FilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node -func FilterUnsubscribe(filterJSON string, peerID string, ms int) error { +func FilterUnsubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error { cf, err := toContentFilter(filterJSON) if err != nil { return err } - if wakuState.node == nil { - return errWakuNodeNotReady + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } var fOptions []filter.FilterSubscribeOption @@ -150,7 +150,7 @@ func FilterUnsubscribe(filterJSON string, peerID string, ms int) error { return errors.New("peerID is required") } - result, err := wakuState.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...) + result, err := instance.node.FilterLightnode().Unsubscribe(ctx, cf, fOptions...) if err != nil { return err } @@ -168,19 +168,19 @@ type unsubscribeAllResult struct { } // FilterUnsubscribeAll is used to remove an active subscription to a peer. If no peerID is defined, it will stop all active filter subscriptions -func FilterUnsubscribeAll(peerID string, ms int) (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func FilterUnsubscribeAll(instance *WakuInstance, peerID string, ms int) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } var fOptions []filter.FilterSubscribeOption @@ -194,7 +194,7 @@ func FilterUnsubscribeAll(peerID string, ms int) (string, error) { fOptions = append(fOptions, filter.UnsubscribeAll()) } - result, err := wakuState.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...) + result, err := instance.node.FilterLightnode().UnsubscribeAll(ctx, fOptions...) if err != nil { return "", err } diff --git a/library/legacy_filter.go b/library/legacy_filter.go index e60c4993a..41433c682 100644 --- a/library/legacy_filter.go +++ b/library/legacy_filter.go @@ -34,24 +34,24 @@ func toLegacyContentFilter(filterJSON string) (legacy_filter.ContentFilter, erro // LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages // Deprecated: Use FilterSubscribe instead -func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) error { +func LegacyFilterSubscribe(instance *WakuInstance, filterJSON string, peerID string, ms int) error { cf, err := toLegacyContentFilter(filterJSON) if err != nil { return err } - if wakuState.node == nil { - return errWakuNodeNotReady + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } var fOptions []legacy_filter.FilterSubscribeOption @@ -65,14 +65,14 @@ func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) error { fOptions = append(fOptions, legacy_filter.WithAutomaticPeerSelection()) } - _, f, err := wakuState.node.LegacyFilter().Subscribe(ctx, cf, fOptions...) + _, f, err := instance.node.LegacyFilter().Subscribe(ctx, cf, fOptions...) if err != nil { return err } go func(f legacy_filter.Filter) { for envelope := range f.Chan { - send("message", toSubscriptionMessage(envelope)) + send(instance, "message", toSubscriptionMessage(envelope)) } }(f) @@ -81,25 +81,25 @@ func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) error { // LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node // Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead -func LegacyFilterUnsubscribe(filterJSON string, ms int) error { +func LegacyFilterUnsubscribe(instance *WakuInstance, filterJSON string, ms int) error { cf, err := toLegacyContentFilter(filterJSON) if err != nil { return err } - if wakuState.node == nil { - return errWakuNodeNotReady + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } - return wakuState.node.LegacyFilter().UnsubscribeFilter(ctx, cf) + return instance.node.LegacyFilter().UnsubscribeFilter(ctx, cf) } diff --git a/library/lightpush.go b/library/lightpush.go index 62bc2bee8..39ebd0f40 100644 --- a/library/lightpush.go +++ b/library/lightpush.go @@ -11,19 +11,19 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" ) -func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms int) (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func lightpushPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic string, peerID string, ms int) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } var lpOptions []lightpush.Option @@ -41,16 +41,16 @@ func lightpushPublish(msg *pb.WakuMessage, pubsubTopic string, peerID string, ms lpOptions = append(lpOptions, lightpush.WithPubSubTopic(pubsubTopic)) } - hash, err := wakuState.node.Lightpush().Publish(ctx, msg, lpOptions...) + hash, err := instance.node.Lightpush().Publish(ctx, msg, lpOptions...) return hexutil.Encode(hash), err } // LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol -func LightpushPublish(messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) { +func LightpushPublish(instance *WakuInstance, messageJSON string, pubsubTopic string, peerID string, ms int) (string, error) { msg, err := wakuMessage(messageJSON) if err != nil { return "", err } - return lightpushPublish(msg, pubsubTopic, peerID, ms) + return lightpushPublish(instance, msg, pubsubTopic, peerID, ms) } diff --git a/library/mobile/api.go b/library/mobile/api.go index 8a8d3b66b..9a0f3829d 100644 --- a/library/mobile/api.go +++ b/library/mobile/api.go @@ -7,68 +7,135 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol" ) -// NewNode initializes a waku node. Receives a JSON string containing the configuration, and use default values for those config items not specified -func NewNode(configJSON string) string { - err := library.NewNode(configJSON) - return makeJSONResponse(err) +// NewNode initializes a waku node. +// Receives a JSON string containing the configuration, and use default values for those config items not specified +// Returns an instance id +func NewNode(instanceID uint, configJSON string) string { + instance := library.Init() + err := library.NewNode(instance, configJSON) + if err != nil { + _ = library.Free(instance) + } + return prepareJSONResponse(instance.ID, err) } // Start starts the waku node -func Start() string { - err := library.Start() +func Start(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.Start(instance) return makeJSONResponse(err) } // Stop stops a waku node -func Stop() string { - err := library.Stop() +func Stop(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.Stop(instance) + return makeJSONResponse(err) +} + +// Release resources allocated to a waku node +func Free(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.Free(instance) return makeJSONResponse(err) } // IsStarted is used to determine is a node is started or not -func IsStarted() string { - return prepareJSONResponse(library.IsStarted(), nil) +func IsStarted(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + return prepareJSONResponse(library.IsStarted(instance), nil) } // PeerID is used to obtain the peer ID of the waku node -func PeerID() string { - peerID, err := library.PeerID() +func PeerID(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + peerID, err := library.PeerID(instance) return prepareJSONResponse(peerID, err) } // ListenAddresses returns the multiaddresses the wakunode is listening to -func ListenAddresses() string { - addresses, err := library.ListenAddresses() +func ListenAddresses(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + addresses, err := library.ListenAddresses(instance) return prepareJSONResponse(addresses, err) } // AddPeer adds a node multiaddress and protocol to the wakunode peerstore -func AddPeer(address string, protocolID string) string { - peerID, err := library.AddPeer(address, protocolID) +func AddPeer(instanceID uint, address string, protocolID string) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + peerID, err := library.AddPeer(instance, address, protocolID) return prepareJSONResponse(peerID, err) } // Connect is used to connect to a peer at multiaddress. if ms > 0, cancel the function execution if it takes longer than N milliseconds -func Connect(address string, ms int) string { - err := library.Connect(address, ms) +func Connect(instanceID uint, address string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.Connect(instance, address, ms) return makeJSONResponse(err) } // ConnectPeerID is usedd to connect to a known peer by peerID. if ms > 0, cancel the function execution if it takes longer than N milliseconds -func ConnectPeerID(peerID string, ms int) string { - err := library.ConnectPeerID(peerID, ms) +func ConnectPeerID(instanceID uint, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.ConnectPeerID(instance, peerID, ms) return makeJSONResponse(err) } // Disconnect closes a connection to a known peer by peerID -func Disconnect(peerID string) string { - err := library.Disconnect(peerID) +func Disconnect(instanceID uint, peerID string) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.Disconnect(instance, peerID) return makeJSONResponse(err) } // PeerCnt returns the number of connected peers -func PeerCnt() string { - peerCnt, err := library.PeerCnt() +func PeerCnt(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + peerCnt, err := library.PeerCnt(instance) return prepareJSONResponse(peerCnt, err) } @@ -84,7 +151,12 @@ func DefaultPubsubTopic() string { } // Peers retrieves the list of peers known by the waku node -func Peers() string { - peers, err := library.Peers() +func Peers(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + peers, err := library.Peers(instance) return prepareJSONResponse(peers, err) } diff --git a/library/mobile/api_discovery.go b/library/mobile/api_discovery.go index 679803aff..06c1b6340 100644 --- a/library/mobile/api_discovery.go +++ b/library/mobile/api_discovery.go @@ -11,19 +11,34 @@ func DNSDiscovery(url string, nameserver string, ms int) string { } // StartDiscoveryV5 starts discv5 discovery -func StartDiscoveryV5() string { - err := library.StartDiscoveryV5() +func StartDiscoveryV5(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.StartDiscoveryV5(instance) return makeJSONResponse(err) } // StopDiscoveryV5 stops discv5 discovery -func StopDiscoveryV5() string { - err := library.StopDiscoveryV5() +func StopDiscoveryV5(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.StopDiscoveryV5(instance) return makeJSONResponse(err) } // SetBootnodes is used to update the bootnodes receiving a JSON array of ENRs -func SetBootnodes(bootnodes string) string { - err := library.SetBootnodes(bootnodes) +func SetBootnodes(instanceID uint, bootnodes string) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.SetBootnodes(instance, bootnodes) return makeJSONResponse(err) } diff --git a/library/mobile/api_filter.go b/library/mobile/api_filter.go index 28a29701d..4e75153e9 100644 --- a/library/mobile/api_filter.go +++ b/library/mobile/api_filter.go @@ -5,25 +5,45 @@ import ( ) // FilterSubscribe is used to create a subscription to a filter node to receive messages -func FilterSubscribe(filterJSON string, peerID string, ms int) string { - response, err := library.FilterSubscribe(filterJSON, peerID, ms) +func FilterSubscribe(instanceID uint, filterJSON string, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + response, err := library.FilterSubscribe(instance, filterJSON, peerID, ms) return prepareJSONResponse(response, err) } // FilterPing is used to determine if a peer has an active subscription -func FilterPing(peerID string, ms int) string { - err := library.FilterPing(peerID, ms) +func FilterPing(instanceID uint, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.FilterPing(instance, peerID, ms) return makeJSONResponse(err) } // FilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node -func FilterUnsubscribe(filterJSON string, peerID string, ms int) string { - err := library.FilterUnsubscribe(filterJSON, peerID, ms) +func FilterUnsubscribe(instanceID uint, filterJSON string, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.FilterUnsubscribe(instance, filterJSON, peerID, ms) return makeJSONResponse(err) } // FilterUnsubscribeAll is used to remove an active subscription to a peer. If no peerID is defined, it will stop all active filter subscriptions -func FilterUnsubscribeAll(peerID string, ms int) string { - response, err := library.FilterUnsubscribeAll(peerID, ms) +func FilterUnsubscribeAll(instanceID uint, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + response, err := library.FilterUnsubscribeAll(instance, peerID, ms) return prepareJSONResponse(response, err) } diff --git a/library/mobile/api_legacy_filter.go b/library/mobile/api_legacy_filter.go index ab154f0ee..43382e95d 100644 --- a/library/mobile/api_legacy_filter.go +++ b/library/mobile/api_legacy_filter.go @@ -6,14 +6,24 @@ import ( // LegacyFilterSubscribe is used to create a subscription to a filter node to receive messages // Deprecated: Use FilterSubscribe instead -func LegacyFilterSubscribe(filterJSON string, peerID string, ms int) string { - err := library.LegacyFilterSubscribe(filterJSON, peerID, ms) +func LegacyFilterSubscribe(instanceID uint, filterJSON string, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.LegacyFilterSubscribe(instance, filterJSON, peerID, ms) return makeJSONResponse(err) } // LegacyFilterUnsubscribe is used to remove a filter criteria from an active subscription with a filter node // Deprecated: Use FilterUnsubscribe or FilterUnsubscribeAll instead -func LegacyFilterUnsubscribe(filterJSON string, ms int) string { - err := library.LegacyFilterUnsubscribe(filterJSON, ms) +func LegacyFilterUnsubscribe(instanceID uint, filterJSON string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.LegacyFilterUnsubscribe(instance, filterJSON, ms) return makeJSONResponse(err) } diff --git a/library/mobile/api_lightpush.go b/library/mobile/api_lightpush.go index 54c5b3136..0379ae3a0 100644 --- a/library/mobile/api_lightpush.go +++ b/library/mobile/api_lightpush.go @@ -3,7 +3,12 @@ package gowaku import "github.com/waku-org/go-waku/library" // LightpushPublish is used to publish a WakuMessage in a pubsub topic using Lightpush protocol -func LightpushPublish(messageJSON string, topic string, peerID string, ms int) string { - response, err := library.LightpushPublish(messageJSON, topic, peerID, ms) +func LightpushPublish(instanceID uint, messageJSON string, topic string, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + response, err := library.LightpushPublish(instance, messageJSON, topic, peerID, ms) return prepareJSONResponse(response, err) } diff --git a/library/mobile/api_relay.go b/library/mobile/api_relay.go index 9a127d7f2..c8a40d68f 100644 --- a/library/mobile/api_relay.go +++ b/library/mobile/api_relay.go @@ -5,31 +5,56 @@ import ( ) // RelayEnoughPeers determines if there are enough peers to publish a message on a topic -func RelayEnoughPeers(topic string) string { - response, err := library.RelayEnoughPeers(topic) +func RelayEnoughPeers(instanceID uint, topic string) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + response, err := library.RelayEnoughPeers(instance, topic) return prepareJSONResponse(response, err) } // RelayPublish publishes a message using waku relay and returns the message ID -func RelayPublish(messageJSON string, topic string, ms int) string { - hash, err := library.RelayPublish(messageJSON, topic, ms) +func RelayPublish(instanceID uint, messageJSON string, topic string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + hash, err := library.RelayPublish(instance, messageJSON, topic, ms) return prepareJSONResponse(hash, err) } // RelaySubscribe subscribes to a WakuRelay topic. -func RelaySubscribe(topic string) string { - err := library.RelaySubscribe(topic) +func RelaySubscribe(instanceID uint, topic string) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.RelaySubscribe(instance, topic) return makeJSONResponse(err) } // RelayTopics returns a list of pubsub topics the node is subscribed to in WakuRelay -func RelayTopics() string { - topics, err := library.RelayTopics() +func RelayTopics(instanceID uint) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + topics, err := library.RelayTopics(instance) return prepareJSONResponse(topics, err) } // RelayUnsubscribe closes the pubsub subscription to a pubsub topic -func RelayUnsubscribe(topic string) string { - err := library.RelayUnsubscribe(topic) +func RelayUnsubscribe(instanceID uint, topic string) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + err = library.RelayUnsubscribe(instance, topic) return makeJSONResponse(err) } diff --git a/library/mobile/api_store.go b/library/mobile/api_store.go index 9eb35426e..01ad25084 100644 --- a/library/mobile/api_store.go +++ b/library/mobile/api_store.go @@ -5,13 +5,23 @@ import ( ) // StoreQuery is used to retrieve historic messages using waku store protocol. -func StoreQuery(queryJSON string, peerID string, ms int) string { - response, err := library.StoreQuery(queryJSON, peerID, ms) +func StoreQuery(instanceID uint, queryJSON string, peerID string, ms int) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + response, err := library.StoreQuery(instance, queryJSON, peerID, ms) return prepareJSONResponse(response, err) } // StoreLocalQuery is used to retrieve historic messages stored in the localDB using waku store protocol. -func StoreLocalQuery(queryJSON string) string { - response, err := library.StoreLocalQuery(queryJSON) +func StoreLocalQuery(instanceID uint, queryJSON string) string { + instance, err := library.GetInstance(instanceID) + if err != nil { + return makeJSONResponse(err) + } + + response, err := library.StoreLocalQuery(instance, queryJSON) return prepareJSONResponse(response, err) } diff --git a/library/mobile/signals.go b/library/mobile/signals.go index 75225c6fe..1b6c5810d 100644 --- a/library/mobile/signals.go +++ b/library/mobile/signals.go @@ -12,8 +12,13 @@ type SignalHandler interface { // SetMobileSignalHandler setup geth callback to notify about new signal // used for gomobile builds // nolint -func SetMobileSignalHandler(handler SignalHandler) { - library.SetMobileSignalHandler(func(data []byte) { +func SetMobileSignalHandler(instanceID uint, handler SignalHandler) { + instance, err := library.GetInstance(instanceID) + if err != nil { + panic(err.Error()) // TODO: refactor to return an error instead + } + + library.SetMobileSignalHandler(instance, func(data []byte) { if len(data) > 0 { handler.HandleSignal(string(data)) } diff --git a/library/node.go b/library/node.go index cc8959c2b..0ebbde998 100644 --- a/library/node.go +++ b/library/node.go @@ -9,7 +9,9 @@ import ( "errors" "fmt" "net" + "sync" "time" + "unsafe" "go.uber.org/zap/zapcore" @@ -29,19 +31,31 @@ import ( "github.com/waku-org/go-waku/waku/v2/utils" ) -// WakuState represents the state of the waku node -type WakuState struct { +// WakuInstance represents the state of the waku node +type WakuInstance struct { ctx context.Context cancel context.CancelFunc + ID uint - node *node.WakuNode + node *node.WakuNode + cb unsafe.Pointer + mobileSignalHandler MobileSignalHandler relayTopics []string } -var wakuState WakuState +var wakuInstances map[uint]*WakuInstance +var wakuInstancesMutex sync.RWMutex -var errWakuNodeNotReady = errors.New("go-waku not initialized") +var errWakuNodeNotReady = errors.New("not initialized") +var errWakuNodeAlreadyConfigured = errors.New("already configured") +var errWakuNodeNotConfigured = errors.New("not configured") +var errWakuAlreadyStarted = errors.New("already started") +var errWakuNodeNotStarted = errors.New("not started") + +func init() { + wakuInstances = make(map[uint]*WakuInstance) +} func randomHex(n int) (string, error) { bytes := make([]byte, n) @@ -51,10 +65,72 @@ func randomHex(n int) (string, error) { return hex.EncodeToString(bytes), nil } +func Init() *WakuInstance { + wakuInstancesMutex.Lock() + defer wakuInstancesMutex.Unlock() + + id := uint(len(wakuInstances)) + instance := &WakuInstance{ + ID: id, + } + wakuInstances[id] = instance + return instance +} + +func GetInstance(id uint) (*WakuInstance, error) { + wakuInstancesMutex.RLock() + defer wakuInstancesMutex.RUnlock() + + instance, ok := wakuInstances[id] + if !ok { + return nil, errors.New("instance not found") + } + + return instance, nil +} + +type ValidationType int64 + +const ( + None ValidationType = iota + MustBeStarted ValidationType = iota + MustBeStopped + NotConfigured +) + +func validateInstance(instance *WakuInstance, validationType ValidationType) error { + if instance == nil { + return errWakuNodeNotReady + } + + switch validationType { + case NotConfigured: + if instance.node != nil { + return errWakuNodeAlreadyConfigured + } + case MustBeStarted: + if instance.node == nil { + return errWakuNodeNotConfigured + } + if instance.ctx == nil { + return errWakuNodeNotStarted + } + case MustBeStopped: + if instance.node == nil { + return errWakuNodeNotConfigured + } + if instance.ctx != nil { + return errWakuAlreadyStarted + } + } + + return nil +} + // NewNode initializes a waku node. Receives a JSON string containing the configuration, and use default values for those config items not specified -func NewNode(configJSON string) error { - if wakuState.node != nil { - return errors.New("go-waku already initialized. stop it first") +func NewNode(instance *WakuInstance, configJSON string) error { + if err := validateInstance(instance, NotConfigured); err != nil { + return err } config, err := getConfig(configJSON) @@ -152,7 +228,7 @@ func NewNode(configJSON string) error { opts = append(opts, discv5Opts) } - wakuState.relayTopics = config.RelayTopics + instance.relayTopics = config.RelayTopics lvl, err := zapcore.ParseLevel(*config.LogLevel) if err != nil { @@ -167,34 +243,43 @@ func NewNode(configJSON string) error { return err } - wakuState.node = w + instance.node = w return nil } +func stop(instance *WakuInstance) { + if instance.cancel != nil { + instance.node.Stop() + instance.cancel() + instance.cancel = nil + instance.ctx = nil + } +} + // Start starts the waku node -func Start() error { - if wakuState.node == nil { - return errWakuNodeNotReady +func Start(instance *WakuInstance) error { + if err := validateInstance(instance, MustBeStopped); err != nil { + return err } - wakuState.ctx, wakuState.cancel = context.WithCancel(context.Background()) + instance.ctx, instance.cancel = context.WithCancel(context.Background()) - if err := wakuState.node.Start(wakuState.ctx); err != nil { + if err := instance.node.Start(instance.ctx); err != nil { return err } - if wakuState.node.DiscV5() != nil { - if err := wakuState.node.DiscV5().Start(context.Background()); err != nil { - wakuState.node.Stop() + if instance.node.DiscV5() != nil { + if err := instance.node.DiscV5().Start(context.Background()); err != nil { + stop(instance) return err } } - for _, topic := range wakuState.relayTopics { - err := relaySubscribe(topic) + for _, topic := range instance.relayTopics { + err := relaySubscribe(instance, topic) if err != nil { - wakuState.node.Stop() + stop(instance) return err } } @@ -203,42 +288,55 @@ func Start() error { } // Stop stops a waku node -func Stop() error { - if wakuState.node == nil { - return errWakuNodeNotReady +func Stop(instance *WakuInstance) error { + if err := validateInstance(instance, None); err != nil { + return err } - wakuState.node.Stop() + stop(instance) + + return nil +} + +// Free stops a waku instance and frees the resources allocated to a waku node +func Free(instance *WakuInstance) error { + if err := validateInstance(instance, None); err != nil { + return err + } - wakuState.cancel() + if instance.cancel != nil { + stop(instance) + } - wakuState.node = nil + wakuInstancesMutex.Lock() + defer wakuInstancesMutex.Unlock() + delete(wakuInstances, instance.ID) return nil } // IsStarted is used to determine is a node is started or not -func IsStarted() bool { - return wakuState.node != nil +func IsStarted(instance *WakuInstance) bool { + return instance != nil && instance.ctx != nil } // PeerID is used to obtain the peer ID of the waku node -func PeerID() (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func PeerID(instance *WakuInstance) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } - return wakuState.node.ID(), nil + return instance.node.ID(), nil } // ListenAddresses returns the multiaddresses the wakunode is listening to -func ListenAddresses() (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func ListenAddresses(instance *WakuInstance) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } var addresses []string - for _, addr := range wakuState.node.ListenAddresses() { + for _, addr := range instance.node.ListenAddresses() { addresses = append(addresses, addr.String()) } @@ -246,9 +344,9 @@ func ListenAddresses() (string, error) { } // AddPeer adds a node multiaddress and protocol to the wakunode peerstore -func AddPeer(address string, protocolID string) (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func AddPeer(instance *WakuInstance, address string, protocolID string) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } ma, err := multiaddr.NewMultiaddr(address) @@ -256,7 +354,7 @@ func AddPeer(address string, protocolID string) (string, error) { return "", err } - peerID, err := wakuState.node.AddPeer(ma, peerstore.Static, wakuState.relayTopics, libp2pProtocol.ID(protocolID)) + peerID, err := instance.node.AddPeer(ma, peerstore.Static, instance.relayTopics, libp2pProtocol.ID(protocolID)) if err != nil { return "", err } @@ -265,28 +363,28 @@ func AddPeer(address string, protocolID string) (string, error) { } // Connect is used to connect to a peer at multiaddress. if ms > 0, cancel the function execution if it takes longer than N milliseconds -func Connect(address string, ms int) error { - if wakuState.node == nil { - return errWakuNodeNotReady +func Connect(instance *WakuInstance, address string, ms int) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } - return wakuState.node.DialPeer(ctx, address) + return instance.node.DialPeer(ctx, address) } // ConnectPeerID is usedd to connect to a known peer by peerID. if ms > 0, cancel the function execution if it takes longer than N milliseconds -func ConnectPeerID(peerID string, ms int) error { - if wakuState.node == nil { - return errWakuNodeNotReady +func ConnectPeerID(instance *WakuInstance, peerID string, ms int) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } var ctx context.Context @@ -298,19 +396,19 @@ func ConnectPeerID(peerID string, ms int) error { } if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } - return wakuState.node.DialPeerByID(ctx, pID) + return instance.node.DialPeerByID(ctx, pID) } // Disconnect closes a connection to a known peer by peerID -func Disconnect(peerID string) error { - if wakuState.node == nil { - return errWakuNodeNotReady +func Disconnect(instance *WakuInstance, peerID string) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } pID, err := peer.Decode(peerID) @@ -318,16 +416,16 @@ func Disconnect(peerID string) error { return err } - return wakuState.node.ClosePeerById(pID) + return instance.node.ClosePeerById(pID) } // PeerCnt returns the number of connected peers -func PeerCnt() (int, error) { - if wakuState.node == nil { - return 0, errWakuNodeNotReady +func PeerCnt(instance *WakuInstance) (int, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return 0, err } - return wakuState.node.PeerCount(), nil + return instance.node.PeerCount(), nil } // ContentTopic creates a content topic string according to RFC 23 @@ -356,12 +454,12 @@ func toSubscriptionMessage(msg *protocol.Envelope) *subscriptionMsg { } // Peers retrieves the list of peers known by the waku node -func Peers() (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func Peers(instance *WakuInstance) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } - peers, err := wakuState.node.Peers() + peers, err := instance.node.Peers() if err != nil { return "", err } diff --git a/library/relay.go b/library/relay.go index 0cd2093a0..452b56bc8 100644 --- a/library/relay.go +++ b/library/relay.go @@ -11,9 +11,9 @@ import ( ) // RelayEnoughPeers determines if there are enough peers to publish a message on a topic -func RelayEnoughPeers(topic string) (bool, error) { - if wakuState.node == nil { - return false, errWakuNodeNotReady +func RelayEnoughPeers(instance *WakuInstance, topic string) (bool, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return false, err } topicToCheck := protocol.DefaultPubsubTopic{}.String() @@ -21,45 +21,45 @@ func RelayEnoughPeers(topic string) (bool, error) { topicToCheck = topic } - return wakuState.node.Relay().EnoughPeersToPublishToTopic(topicToCheck), nil + return instance.node.Relay().EnoughPeersToPublishToTopic(topicToCheck), nil } -func relayPublish(msg *pb.WakuMessage, pubsubTopic string, ms int) (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func relayPublish(instance *WakuInstance, msg *pb.WakuMessage, pubsubTopic string, ms int) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } var ctx context.Context var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } - hash, err := wakuState.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic)) + hash, err := instance.node.Relay().Publish(ctx, msg, relay.WithPubSubTopic(pubsubTopic)) return hexutil.Encode(hash), err } // RelayPublish publishes a message using waku relay and returns the message ID -func RelayPublish(messageJSON string, topic string, ms int) (string, error) { +func RelayPublish(instance *WakuInstance, messageJSON string, topic string, ms int) (string, error) { msg, err := wakuMessage(messageJSON) if err != nil { return "", err } - return relayPublish(msg, topic, int(ms)) + return relayPublish(instance, msg, topic, int(ms)) } -func relaySubscribe(filterJSON string) error { +func relaySubscribe(instance *WakuInstance, filterJSON string) error { cf, err := toContentFilter(filterJSON) if err != nil { return err } - subscriptions, err := wakuState.node.Relay().Subscribe(context.Background(), cf) + subscriptions, err := instance.node.Relay().Subscribe(context.Background(), cf) if err != nil { return err } @@ -67,7 +67,7 @@ func relaySubscribe(filterJSON string) error { for _, sub := range subscriptions { go func(subscription *relay.Subscription) { for envelope := range subscription.Ch { - send("message", toSubscriptionMessage(envelope)) + send(instance, "message", toSubscriptionMessage(envelope)) } }(sub) } @@ -76,33 +76,33 @@ func relaySubscribe(filterJSON string) error { } // RelaySubscribe subscribes to a WakuRelay topic. -func RelaySubscribe(contentFilterJSON string) error { - if wakuState.node == nil { - return errWakuNodeNotReady +func RelaySubscribe(instance *WakuInstance, contentFilterJSON string) error { + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } - return relaySubscribe(contentFilterJSON) + return relaySubscribe(instance, contentFilterJSON) } // RelayTopics returns a list of pubsub topics the node is subscribed to in WakuRelay -func RelayTopics() (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func RelayTopics(instance *WakuInstance) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } - return marshalJSON(wakuState.node.Relay().Topics()) + return marshalJSON(instance.node.Relay().Topics()) } // RelayUnsubscribe closes the pubsub subscription to a pubsub topic -func RelayUnsubscribe(contentFilterJSON string) error { +func RelayUnsubscribe(instance *WakuInstance, contentFilterJSON string) error { cf, err := toContentFilter(contentFilterJSON) if err != nil { return err } - if wakuState.node == nil { - return errWakuNodeNotReady + if err := validateInstance(instance, MustBeStarted); err != nil { + return err } - return wakuState.node.Relay().Unsubscribe(context.Background(), cf) + return instance.node.Relay().Unsubscribe(context.Background(), cf) } diff --git a/library/signals.c b/library/signals.c index 6756a562b..1ced521b2 100644 --- a/library/signals.c +++ b/library/signals.c @@ -8,16 +8,11 @@ #include "_cgo_export.h" typedef void (*callback)(int retCode, const char *jsonEvent, void* userData); -callback gCallback = 0; -bool ServiceSignalEvent(const char *jsonEvent) { - if (gCallback) { - gCallback(0, jsonEvent, NULL); +bool ServiceSignalEvent(void *cb, const char *jsonEvent) { + if (cb) { + ((callback)cb)(0, jsonEvent, NULL); } return true; } - -void SetEventCallback(void *cb) { - gCallback = (callback)cb; -} diff --git a/library/signals.go b/library/signals.go index 514872aa5..034487aeb 100644 --- a/library/signals.go +++ b/library/signals.go @@ -4,8 +4,7 @@ package library #include #include #include -extern bool ServiceSignalEvent(const char *jsonEvent); -extern void SetEventCallback(void *cb); +extern bool ServiceSignalEvent(void *cb, const char *jsonEvent); */ import "C" @@ -36,7 +35,7 @@ func newEnvelope(signalType string, event interface{}) *signalEnvelope { } // send sends application signal (in JSON) upwards to application (via default notification handler) -func send(signalType string, event interface{}) { +func send(instance *WakuInstance, signalType string, event interface{}) { signal := newEnvelope(signalType, event) data, err := json.Marshal(&signal) @@ -45,25 +44,33 @@ func send(signalType string, event interface{}) { return } // If a Go implementation of signal handler is set, let's use it. - if mobileSignalHandler != nil { - mobileSignalHandler(data) + if instance.mobileSignalHandler != nil { + instance.mobileSignalHandler(data) } else { // ...and fallback to C implementation otherwise. dataStr := string(data) str := C.CString(dataStr) - C.ServiceSignalEvent(str) + C.ServiceSignalEvent(instance.cb, str) C.free(unsafe.Pointer(str)) } } // SetEventCallback is to set a callback in order to receive application // signals which are used to react to asynchronous events in waku. -func SetEventCallback(cb unsafe.Pointer) { - C.SetEventCallback(cb) +func SetEventCallback(instance *WakuInstance, cb unsafe.Pointer) { + if err := validateInstance(instance, None); err != nil { + panic(err.Error()) + } + + instance.cb = cb } // SetMobileSignalHandler sets the callback to be executed when a signal // is received in a mobile device -func SetMobileSignalHandler(m MobileSignalHandler) { - mobileSignalHandler = m +func SetMobileSignalHandler(instance *WakuInstance, m MobileSignalHandler) { + if err := validateInstance(instance, None); err != nil { + panic(err.Error()) + } + + instance.mobileSignalHandler = m } diff --git a/library/store.go b/library/store.go index 9c8c8c3d0..59ad2703d 100644 --- a/library/store.go +++ b/library/store.go @@ -35,8 +35,8 @@ type storeMessagesReply struct { Error string `json:"error,omitempty"` } -func queryResponse(ctx context.Context, args storeMessagesArgs, options []store.HistoryRequestOption) (string, error) { - res, err := wakuState.node.Store().Query( +func queryResponse(ctx context.Context, instance *WakuInstance, args storeMessagesArgs, options []store.HistoryRequestOption) (string, error) { + res, err := instance.node.Store().Query( ctx, store.Query{ PubsubTopic: args.Topic, @@ -64,9 +64,9 @@ func queryResponse(ctx context.Context, args storeMessagesArgs, options []store. } // StoreQuery is used to retrieve historic messages using waku store protocol. -func StoreQuery(queryJSON string, peerID string, ms int) (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func StoreQuery(instance *WakuInstance, queryJSON string, peerID string, ms int) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } var args storeMessagesArgs @@ -95,19 +95,19 @@ func StoreQuery(queryJSON string, peerID string, ms int) (string, error) { var cancel context.CancelFunc if ms > 0 { - ctx, cancel = context.WithTimeout(context.Background(), time.Duration(int(ms))*time.Millisecond) + ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(int(ms))*time.Millisecond) defer cancel() } else { - ctx = context.Background() + ctx = instance.ctx } - return queryResponse(ctx, args, options) + return queryResponse(ctx, instance, args, options) } // StoreLocalQuery is used to retrieve historic messages stored in the localDB using waku store protocol. -func StoreLocalQuery(queryJSON string) (string, error) { - if wakuState.node == nil { - return "", errWakuNodeNotReady +func StoreLocalQuery(instance *WakuInstance, queryJSON string) (string, error) { + if err := validateInstance(instance, MustBeStarted); err != nil { + return "", err } var args storeMessagesArgs @@ -123,5 +123,5 @@ func StoreLocalQuery(queryJSON string) (string, error) { store.WithLocalQuery(), } - return queryResponse(context.TODO(), args, options) + return queryResponse(instance.ctx, instance, args, options) }