Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Commit

Permalink
feat(c binding unsubscribe_from_websocket)
Browse files Browse the repository at this point in the history
  • Loading branch information
niklasad1 committed Nov 17, 2018
1 parent e17c728 commit 5f7f1ae
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 119 deletions.
52 changes: 34 additions & 18 deletions parity-clib-examples/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@
#include <cstring>
#include <unistd.h>
#include <parity.h>
#include <regex>

void* parity_light();
void* parity_full_run();
int parity_subscribe_to_websocket(void*);
int parity_rpc_queries(void*);

const int SUBSCRIPTION_ID_LEN = 18;

// global variable to keep track of the received rpc responses
static int g_rpc_counter = 0;

// list of websocket queries
static const char* ws_queries[] = {
"{\"method\":\"parity_subscribe\",\"params\":[\"eth_getBalance\",[\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\",\"latest\"]],\"id\":1,\"jsonrpc\":\"2.0\"}",
"{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}"
};
// global string for callbacks
static char g_str[60];

// list of rpc queries
static const char* rpc_queries[] = {
Expand All @@ -49,13 +49,17 @@ static const char* rpc_queries[] = {
void on_restart(void*, const char*, size_t) {}

// callback that is invoked on ws responses
void ws_response(void *, const char* response, size_t len) {
printf("ws_callback: %s \r\n", response);
void ws_response(void* _unused, const char* response, size_t len) {
printf("ws_response: %s\r\n", response);
std::regex is_subscription ("\\{\"jsonrpc\":\"2.0\",\"result\":\"0[xX][a-fA-F0-9]{16}\",\"id\":1\\}");
if (std::regex_match(response, is_subscription) == true) {
strncpy(g_str, response, 55);
}
}

// callback that is invoked on ws responses
void rpc_response(void *, const char* response, size_t len) {
printf("rpc_callback: %s \r\n", response);
void rpc_response(void* _unused, const char* response, size_t len) {
printf("rpc_response: %s\r\n", response);
g_rpc_counter -= 1;
}

Expand All @@ -73,7 +77,6 @@ int main() {
}

if (parity != NULL) {
printf("parity client was err couldn't shutdown\r\n");
parity_destroy(parity);
}

Expand Down Expand Up @@ -106,16 +109,29 @@ int parity_subscribe_to_websocket(void* parity) {
return 1;
}

int num_queries = sizeof(ws_queries) / sizeof(ws_queries[0]);
size_t timeout = 1000;
int num_queries = 1;

for (int i = 0; i < num_queries; i++) {
if (parity_subscribe_ws(parity, ws_queries[i], strlen(ws_queries[i]), ws_response) != 0) {
char subscribe[] = "{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}";
char unsubscribe[] = "{\"method\":\"eth_unsubscribe\",\"params\":[\"0x1234567891234567\"],\"id\":1,\"jsonrpc\":\"2.0\"}";

const void *const handle = parity_subscribe_ws(parity, subscribe, strlen(subscribe), ws_response);

if (!handle) {
return 1;
}

while(g_str[0] == 0);
sleep(60);

// Replace subscription_id with the id we got in the callback
// (this is not a good practice use your favorite JSON parser)
strncpy(&unsubscribe[39], &g_str[27], SUBSCRIPTION_ID_LEN);
if (parity_unsubscribe_ws(parity, handle, unsubscribe, strlen(unsubscribe), timeout, ws_response) != 0) {
return 1;
}
}

// wait forever
while(1);
return 0;
}

void* parity_full_run() {
Expand All @@ -128,7 +144,7 @@ void* parity_full_run() {
const char* args[] = {"--no-ipc" , "--jsonrpc-apis=all", "--chain", "kovan"};
size_t str_lens[] = {strlen(args[0]), strlen(args[1]), strlen(args[2]), strlen(args[3])};

if (parity_config_from_cli(args, str_lens, sizeof(str_lens)/sizeof(str_lens[0]), &cfg.configuration) != 0) {
if (parity_config_from_cli(args, str_lens, sizeof(str_lens) / sizeof(str_lens[0]), &cfg.configuration) != 0) {
return nullptr;
}

Expand All @@ -150,7 +166,7 @@ void* parity_light_run() {
const char* args[] = {"--light", "--no-ipc","--chain", "kovan", "--jsonrpc-apis=all"};
size_t str_lens[] = {strlen(args[0]), strlen(args[1]), strlen(args[2]), strlen(args[3]), strlen(args[4])};

if (parity_config_from_cli(args, str_lens, sizeof(str_lens)/sizeof(str_lens[0]), &cfg.configuration) != 0) {
if (parity_config_from_cli(args, str_lens, sizeof(str_lens) / sizeof(str_lens[0]), &cfg.configuration) != 0) {
return nullptr;
}

Expand Down
34 changes: 25 additions & 9 deletions parity-clib/parity.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include <stddef.h>

typedef void (subscribe)(void*, const char*, size_t);
typedef void (callback)(void*, const char*, size_t);

/// Parameters to pass to `parity_start`.
struct ParityParams {
Expand All @@ -33,7 +33,7 @@ struct ParityParams {
///
/// The first parameter of the callback is the value of `on_client_restart_cb_custom`.
/// The second and third parameters of the callback are the string pointer and length.
subscribe *on_client_restart_cb;
callback *on_client_restart_cb;

/// Custom parameter passed to the `on_client_restart_cb` callback as first parameter.
void *on_client_restart_cb_custom;
Expand Down Expand Up @@ -103,21 +103,37 @@ void parity_destroy(void* parity);
/// - On success : The parity client reference and the query string were valid
/// - On error : The parity client reference and the query string were not valid
///
int parity_rpc(void* parity, const char* rpc_query, size_t rpc_len, size_t timeout_ms, subscribe response);
int parity_rpc(const void *const parity, const char* rpc_query, size_t rpc_len, size_t timeout_ms, callback response);


/// Subscribes to a specific websocket event
/// FIXME: provide functionality to cancel a "subscription"
/// Subscribes to the specified websocket event
///
/// - parity : Reference to the running parity client
/// - ws_query : JSON encoded string representing the websocket and which event to subscribe to
/// - len : Length of the queury
/// - response : Callback to invoke when a websocket event occured
///
/// - On success : The function returns a callback with a JSON encoded string
/// - On error : The function returns a callback with the error (empty or timeout)
/// - On success : The function returns the underlying pointer to a atomic reference counter of the session object
// which should later be used cancel the subscription
/// - On error : The function returns a null pointer
///
int parity_subscribe_ws(void* parity, const char* ws_query, size_t len, subscribe response);
const void *const parity_subscribe_ws(const void *const parity, const char* ws_query, size_t len, callback response);

/// Unsubscribes from a specific websocket event. Caution this function consumes the session object and must only be
/// exactly per session.
///
/// - parity : Reference to the running parity client
/// - session : Underlying pointer to an atomic reference counter
/// - ws_query : JSON encoded string representing the websocket event to unsubscribe from
/// - len : Length of the query
// - timeout : Maximum time in milliseconds to wait for a response
/// - response : Callback to invoke when a websocket event is received
///
/// - On success : The function return 0
/// - On error : The function returns non-zero
//
int parity_unsubscribe_ws(const void *const parity, const void *const session, const char* ws_query,
size_t len, size_t timeout, callback response);

/// Sets a callback to call when a panic happens in the Rust code.
///
Expand All @@ -134,7 +150,7 @@ int parity_subscribe_ws(void* parity, const char* ws_query, size_t len, subscrib
/// The callback can be called from any thread and multiple times simultaneously. Make sure that
/// your code is thread safe.
///
int parity_set_panic_hook(subscribe panic, void* param);
int parity_set_panic_hook(callback panic, void* param);

#ifdef __cplusplus
}
Expand Down
Loading

0 comments on commit 5f7f1ae

Please sign in to comment.