diff --git a/Cargo.lock b/Cargo.lock
index 47763e75121..0f5c445e289 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2328,9 +2328,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
name = "parity-clib"
version = "1.12.0"
dependencies = [
+ "futures 0.1.25 (registry+https://github.com/rust-lang/crates.io-index)",
"jni 0.10.2 (registry+https://github.com/rust-lang/crates.io-index)",
"panic_hook 0.1.0",
"parity-ethereum 2.3.0",
+ "tokio 0.1.11 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-current-thread 0.1.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
diff --git a/parity-clib/Cargo.toml b/parity-clib/Cargo.toml
index 5397225eab2..b3635c6e08f 100644
--- a/parity-clib/Cargo.toml
+++ b/parity-clib/Cargo.toml
@@ -10,9 +10,12 @@ name = "parity"
crate-type = ["cdylib", "staticlib"]
[dependencies]
+futures = "0.1.6"
+jni = { version = "0.10.1", optional = true }
panic_hook = { path = "../util/panic-hook" }
parity-ethereum = { path = "../", default-features = false }
-jni = { version = "0.10.1", optional = true }
+tokio = "0.1.11"
+tokio-current-thread = "0.1.3"
[features]
default = []
diff --git a/parity-clib/Parity.java b/parity-clib/Parity.java
index 7e70917ae36..37a6722b781 100644
--- a/parity-clib/Parity.java
+++ b/parity-clib/Parity.java
@@ -20,44 +20,67 @@
* Interface to the Parity client.
*/
public class Parity {
- /**
- * Starts the Parity client with the CLI options passed as an array of strings.
- *
- * Each space-delimited option corresponds to an array entry.
- * For example: `["--port", "12345"]`
- *
- * @param options The CLI options to start Parity with
- */
- public Parity(String[] options) {
- long config = configFromCli(options);
- inner = build(config);
- }
-
- /** Performs a synchronous RPC query.
- *
- * Note that this will block the current thread until the query is finished. You are
- * encouraged to create a background thread if you don't want to block.
- *
- * @param query The JSON-encoded RPC query to perform
- * @return A JSON-encoded result
- */
- public String rpcQuery(String query) {
- return rpcQueryNative(inner, query);
- }
-
- @Override
- protected void finalize() {
- destroy(inner);
- }
-
- static {
- System.loadLibrary("parity");
- }
-
- private static native long configFromCli(String[] cliOptions);
- private static native long build(long config);
- private static native void destroy(long inner);
- private static native String rpcQueryNative(long inner, String rpc);
-
- private long inner;
+ /**
+ * Starts the Parity client with the CLI options passed as an array of strings.
+ *
+ * Each space-delimited option corresponds to an array entry.
+ * For example: `["--port", "12345"]`
+ *
+ * @param options The CLI options to start Parity with
+ */
+ public Parity(String[] options) {
+ long config = configFromCli(options);
+ inner = build(config);
+ }
+
+ /** Performs an asynchronous RPC query by spawning a background thread that is executed until
+ * either a response is received or the timeout has been expired.
+ *
+ * @param query The JSON-encoded RPC query to perform
+ * @param timeoutMillis The maximum time in milliseconds that the query will run
+ * @param callback An instance of class which must have a instance method named `callback` that will be
+ * invoke when the result is ready
+ */
+ public void rpcQuery(String query, long timeoutMillis, Object callback) {
+ rpcQueryNative(inner, query, timeoutMillis, callback);
+ }
+
+ /** Subscribes to a specific WebSocket event that will run in a background thread until it is canceled.
+ *
+ * @param query The JSON-encoded RPC query to perform
+ * @param callback An instance of class which must have a instance method named `callback` that will be invoked
+ * when the result is ready
+ *
+ * @return A pointer to the current sessions which can be used to terminate the session later
+ */
+ public long subscribeWebSocket(String query, Object callback) {
+ return subscribeWebSocketNative(inner, query, callback);
+ }
+
+ /** Unsubscribes to a specific WebSocket event
+ *
+ * @param session Pointer the the session to terminate
+ */
+ public void unsubscribeWebSocket(long session) {
+ unsubscribeWebSocketNative(session);
+ }
+
+ // FIXME: `finalize` is deprecated - https://github.com/paritytech/parity-ethereum/issues/10066
+ @Override
+ protected void finalize() {
+ destroy(inner);
+ }
+
+ static {
+ System.loadLibrary("parity");
+ }
+
+ private static native long configFromCli(String[] cliOptions);
+ private static native long build(long config);
+ private static native void destroy(long inner);
+ private static native void rpcQueryNative(long inner, String rpc, long timeoutMillis, Object callback);
+ private static native long subscribeWebSocketNative(long inner, String rpc, Object callback);
+ private static native void unsubscribeWebSocketNative(long session);
+
+ private long inner;
}
diff --git a/parity-clib/examples/cpp/CMakeLists.txt b/parity-clib/examples/cpp/CMakeLists.txt
index 69b58c211a1..d3aaf457b3b 100644
--- a/parity-clib/examples/cpp/CMakeLists.txt
+++ b/parity-clib/examples/cpp/CMakeLists.txt
@@ -1,8 +1,7 @@
cmake_minimum_required(VERSION 3.5)
include(ExternalProject)
-
-include_directories("${CMAKE_SOURCE_DIR}/../../../parity-clib")
-
+include_directories("${CMAKE_SOURCE_DIR}/../..")
+set (CMAKE_CXX_STANDARD 11) # Enfore C++11
add_executable(parity-example main.cpp)
ExternalProject_Add(
diff --git a/parity-clib/examples/cpp/main.cpp b/parity-clib/examples/cpp/main.cpp
index c5e83d06492..aab05c9066c 100644
--- a/parity-clib/examples/cpp/main.cpp
+++ b/parity-clib/examples/cpp/main.cpp
@@ -14,44 +14,169 @@
// You should have received a copy of the GNU General Public License
// along with Parity. If not, see .
-#include
-#include
-#include
-#include
-#include
+#include
#include
+#include
+#include
+#include
-void on_restart(void*, const char*, size_t) {}
+void* parity_run(std::vector);
+int parity_subscribe_to_websocket(void*);
+int parity_rpc_queries(void*);
+
+const int SUBSCRIPTION_ID_LEN = 18;
+const size_t TIMEOUT_ONE_MIN_AS_MILLIS = 60 * 1000;
+const unsigned int CALLBACK_RPC = 1;
+const unsigned int CALLBACK_WS = 2;
+
+struct Callback {
+ unsigned int type;
+ long unsigned int counter;
+};
+
+// list of rpc queries
+const std::vector rpc_queries {
+ "{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_getTransactionReceipt\",\"params\":[\"0x444172bef57ad978655171a8af2cfd89baa02a97fcb773067aef7794d6913fff\"],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_estimateGas\",\"params\":[{\"from\":\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"}],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_getBalance\",\"params\":[\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"],\"id\":1,\"jsonrpc\":\"2.0\"}"
+};
+
+// list of subscriptions
+const std::vector ws_subscriptions {
+ "{\"method\":\"parity_subscribe\",\"params\":[\"eth_getBalance\",[\"0xcd2a3d9f938e13cd947ec05abc7fe734df8dd826\",\"latest\"]],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"parity_subscribe\",\"params\":[\"parity_netPeers\"],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}"
+};
+
+// callback that gets invoked upon an event
+void callback(void* user_data, const char* response, size_t _len) {
+ Callback* cb = static_cast(user_data);
+ if (cb->type == CALLBACK_RPC) {
+ printf("rpc response: %s\r\n", response);
+ cb->counter -= 1;
+ } else if (cb->type == CALLBACK_WS) {
+ printf("websocket response: %s\r\n", response);
+ std::regex is_subscription ("\\{\"jsonrpc\":\"2.0\",\"result\":\"0[xX][a-fA-F0-9]{16}\",\"id\":1\\}");
+ if (std::regex_match(response, is_subscription) == true) {
+ cb->counter -= 1;
+ }
+ }
+}
int main() {
- ParityParams cfg = { 0 };
- cfg.on_client_restart_cb = on_restart;
-
- const char* args[] = {"--no-ipc"};
- size_t str_lens[] = {8};
- if (parity_config_from_cli(args, str_lens, 1, &cfg.configuration) != 0) {
- return 1;
- }
-
- void* parity;
- if (parity_start(&cfg, &parity) != 0) {
- return 1;
- }
-
- const char* rpc = "{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}";
- size_t out_len = 256;
- char* out = (char*)malloc(out_len + 1);
- if (parity_rpc(parity, rpc, strlen(rpc), out, &out_len)) {
- return 1;
- }
- out[out_len] = '\0';
- printf("RPC output: %s", out);
- free(out);
-
- sleep(5);
- if (parity != NULL) {
- parity_destroy(parity);
- }
-
- return 0;
+ // run full-client
+ {
+ std::vector config = {"--no-ipc" , "--jsonrpc-apis=all", "--chain", "kovan"};
+ void* parity = parity_run(config);
+ if (parity_rpc_queries(parity)) {
+ printf("rpc_queries failed\r\n");
+ return 1;
+ }
+
+ if (parity_subscribe_to_websocket(parity)) {
+ printf("ws_queries failed\r\n");
+ return 1;
+ }
+
+ if (parity != nullptr) {
+ parity_destroy(parity);
+ }
+ }
+
+ // run light-client
+ {
+ std::vector light_config = {"--no-ipc", "--light", "--jsonrpc-apis=all", "--chain", "kovan"};
+ void* parity = parity_run(light_config);
+
+ if (parity_rpc_queries(parity)) {
+ printf("rpc_queries failed\r\n");
+ return 1;
+ }
+
+ if (parity_subscribe_to_websocket(parity)) {
+ printf("ws_queries failed\r\n");
+ return 1;
+ }
+
+ if (parity != nullptr) {
+ parity_destroy(parity);
+ }
+ }
+ return 0;
+}
+
+int parity_rpc_queries(void* parity) {
+ if (!parity) {
+ return 1;
+ }
+
+ Callback cb { .type = CALLBACK_RPC, .counter = rpc_queries.size() };
+
+ for (auto query : rpc_queries) {
+ if (parity_rpc(parity, query.c_str(), query.length(), TIMEOUT_ONE_MIN_AS_MILLIS, callback, &cb) != 0) {
+ return 1;
+ }
+ }
+
+ while(cb.counter != 0);
+ return 0;
+}
+
+
+int parity_subscribe_to_websocket(void* parity) {
+ if (!parity) {
+ return 1;
+ }
+
+ std::vector sessions;
+
+ Callback cb { .type = CALLBACK_WS, .counter = ws_subscriptions.size() };
+
+ for (auto sub : ws_subscriptions) {
+ void *const session = parity_subscribe_ws(parity, sub.c_str(), sub.length(), callback, &cb);
+ if (!session) {
+ return 1;
+ }
+ sessions.push_back(session);
+ }
+
+ while(cb.counter != 0);
+ std::this_thread::sleep_for(std::chrono::seconds(60));
+ for (auto session : sessions) {
+ parity_unsubscribe_ws(session);
+ }
+ return 0;
+}
+
+void* parity_run(std::vector args) {
+ ParityParams cfg = {
+ .configuration = nullptr,
+ .on_client_restart_cb = callback,
+ .on_client_restart_cb_custom = nullptr
+ };
+
+ std::vector str_lens;
+
+ for (auto arg: args) {
+ str_lens.push_back(std::strlen(arg));
+ }
+
+ // make sure no out-of-range access happens here
+ if (args.empty()) {
+ if (parity_config_from_cli(nullptr, nullptr, 0, &cfg.configuration) != 0) {
+ return nullptr;
+ }
+ } else {
+ if (parity_config_from_cli(&args[0], &str_lens[0], args.size(), &cfg.configuration) != 0) {
+ return nullptr;
+ }
+ }
+
+ void *parity = nullptr;
+ if (parity_start(&cfg, &parity) != 0) {
+ return nullptr;
+ }
+
+ return parity;
}
diff --git a/parity-clib/examples/java/Main.java b/parity-clib/examples/java/Main.java
new file mode 100644
index 00000000000..88189af1c53
--- /dev/null
+++ b/parity-clib/examples/java/Main.java
@@ -0,0 +1,109 @@
+// Copyright 2018 Parity Technologies (UK) Ltd.
+// This file is part of Parity.
+
+// Parity is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Parity is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Parity. If not, see .
+
+import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
+import io.parity.ethereum.Parity;
+
+class Main {
+ public static final int ONE_MINUTE_AS_MILLIS = 60 * 1000;
+
+ public static final String[] rpc_queries = {
+ "{\"method\":\"parity_versionInfo\",\"params\":[],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_getTransactionReceipt\",\"params\":[\"0x444172bef57ad978655171a8af2cfd89baa02a97fcb773067aef7794d6913fff\"],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_estimateGas\",\"params\":[{\"from\":\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"}],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_getBalance\",\"params\":[\"0x0066Dc48bb833d2B59f730F33952B3c29fE926F5\"],\"id\":1,\"jsonrpc\":\"2.0\"}"
+ };
+
+ public static final String[] ws_queries = {
+ "{\"method\":\"parity_subscribe\",\"params\":[\"eth_getBalance\",[\"0xcd2a3d9f938e13cd947ec05abc7fe734df8dd826\",\"latest\"]],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"parity_subscribe\",\"params\":[\"parity_netPeers\"],\"id\":1,\"jsonrpc\":\"2.0\"}",
+ "{\"method\":\"eth_subscribe\",\"params\":[\"newHeads\"],\"id\":1,\"jsonrpc\":\"2.0\"}"
+ };
+
+ public static void runParity(String[] config) {
+ Parity parity = new Parity(config);
+
+ Callback rpcCallback = new Callback(1);
+ Callback webSocketCallback = new Callback(2);
+
+ for (String query : rpc_queries) {
+ parity.rpcQuery(query, ONE_MINUTE_AS_MILLIS, rpcCallback);
+ }
+
+ while (rpcCallback.getNumCallbacks() != 4);
+
+ Vector sessions = new Vector();
+
+ for (String ws : ws_queries) {
+ long session = parity.subscribeWebSocket(ws, webSocketCallback);
+ sessions.add(session);
+ }
+
+ try {
+ Thread.sleep(ONE_MINUTE_AS_MILLIS);
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+
+ for (long session : sessions) {
+ parity.unsubscribeWebSocket(session);
+ }
+
+ // Force GC to destroy parity
+ parity = null;
+ System.gc();
+ }
+
+ public static void main(String[] args) {
+ String[] full = {"--no-ipc" , "--jsonrpc-apis=all", "--chain", "kovan"};
+ String[] light = {"--no-ipc", "--light", "--jsonrpc-apis=all", "--chain", "kovan"};
+
+ runParity(full);
+
+ try {
+ Thread.sleep(ONE_MINUTE_AS_MILLIS);
+ } catch (Exception e) {
+ System.out.println(e);
+ }
+
+ runParity(light);
+ }
+}
+
+class Callback {
+ private AtomicInteger counter;
+ private final int callbackType;
+
+ public Callback(int type) {
+ counter = new AtomicInteger();
+ callbackType = type;
+ }
+
+ public void callback(Object response) {
+ response = (String) response;
+ if (callbackType == 1) {
+ System.out.println("rpc: " + response);
+ } else if (callbackType == 2) {
+ System.out.println("ws: " + response);
+ }
+ counter.getAndIncrement();
+ }
+
+ public int getNumCallbacks() {
+ return counter.intValue();
+ }
+}
diff --git a/parity-clib/examples/java/README.md b/parity-clib/examples/java/README.md
new file mode 100644
index 00000000000..ec83905bf2b
--- /dev/null
+++ b/parity-clib/examples/java/README.md
@@ -0,0 +1,9 @@
+parity-clib: Java example
+===================================
+
+An example Java application to demonstrate how to use `jni` bindings to parity-ethereum. Note, that the example is built in debug-mode to reduce the build time. If you want to use it in real project use release-mode instead to facilitate all compiler optimizations.
+
+## How to compile and run
+
+1. Make sure you have installed [JDK](https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html)
+2. Run `run.sh`
\ No newline at end of file
diff --git a/parity-clib/examples/java/run.sh b/parity-clib/examples/java/run.sh
new file mode 100755
index 00000000000..428a7dc751f
--- /dev/null
+++ b/parity-clib/examples/java/run.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+
+FLAGS="-Xlint:deprecation"
+PARITY_JAVA="../../Parity.java"
+# parity-clib must be built with feature `jni` in debug-mode to work
+PARITY_LIB=".:../../../target/debug/"
+
+# build
+cd ..
+cargo build --features jni
+cd -
+javac $FLAGS -d $PWD $PARITY_JAVA
+javac $FLAGS *.java
+# Setup the path `libparity.so` and run
+java -Djava.library.path=$PARITY_LIB Main
diff --git a/parity-clib/parity.h b/parity-clib/parity.h
index 9be077b4d30..71d6ca775db 100644
--- a/parity-clib/parity.h
+++ b/parity-clib/parity.h
@@ -57,7 +57,7 @@ extern "C" {
/// const char *args[] = {"--light", "--can-restart"};
/// size_t str_lens[] = {7, 13};
/// if (parity_config_from_cli(args, str_lens, 2, &cfg) != 0) {
-/// return 1;
+/// return 1;
/// }
/// ```
///
@@ -86,21 +86,44 @@ int parity_start(const ParityParams* params, void** out);
/// must not call this function.
void parity_destroy(void* parity);
-/// Performs an RPC request.
+/// Performs an asynchronous RPC request running in a background thread for at most X milliseconds
///
-/// Blocks the current thread until the request is finished. You are therefore encouraged to spawn
-/// a new thread for each RPC request that requires accessing the blockchain.
+/// - parity : Reference to the running parity client
+/// - rpc_query : JSON encoded string representing the RPC request.
+/// - len : Length of the RPC query
+/// - timeout_ms : Maximum time that request is waiting for a response
+/// - response : Callback to invoke when the query gets answered. It will respond with a JSON encoded the string
+/// with the result both on success and error.
+/// - ud : Specific user defined data that can used in the callback
///
-/// - `rpc` and `len` must contain the JSON string representing the RPC request.
-/// - `out_str` and `out_len` point to a buffer where the output JSON result will be stored. If the
-/// buffer is not large enough, the function fails.
-/// - `out_len` will receive the final length of the string.
-/// - On success, the function returns 0. On failure, it returns 1.
+/// - On success : The function returns 0
+/// - On error : The function returns 1
///
-/// **Important**: Keep in mind that this function doesn't write any null terminator on the output
-/// string.
+int parity_rpc(const void *const parity, const char* rpc_query, size_t rpc_len, size_t timeout_ms,
+ void (*subscribe)(void* ud, const char* response, size_t len), void* ud);
+
+
+/// Subscribes to a specific websocket event that will run until it is canceled
+///
+/// - parity : Reference to the running parity client
+/// - ws_query : JSON encoded string representing the websocket event to subscribe to
+/// - len : Length of the query
+/// - response : Callback to invoke when a websocket event occurs
+/// - ud : Specific user defined data that can used in the callback
+///
+/// - On success : The function returns an object to the current session
+/// which can be used cancel the subscription
+/// - On error : The function returns a null pointer
+///
+void* parity_subscribe_ws(const void *const parity, const char* ws_query, size_t len,
+ void (*subscribe)(void* ud, const char* response, size_t len), void* ud);
+
+/// Unsubscribes from a websocket subscription. Caution this function consumes the session object and must only be
+/// used exactly once per session.
+///
+/// - session : Pointer to the session to unsubscribe from
///
-int parity_rpc(void* parity, const char* rpc, size_t len, char* out_str, size_t* out_len);
+int parity_unsubscribe_ws(const void *const session);
/// Sets a callback to call when a panic happens in the Rust code.
///
diff --git a/parity-clib/src/java.rs b/parity-clib/src/java.rs
new file mode 100644
index 00000000000..30e63e60162
--- /dev/null
+++ b/parity-clib/src/java.rs
@@ -0,0 +1,211 @@
+use std::{mem, ptr};
+use std::sync::Arc;
+use std::time::Duration;
+use std::thread;
+use std::os::raw::c_void;
+
+use {parity_config_from_cli, parity_destroy, parity_start, parity_unsubscribe_ws, ParityParams, error};
+
+use futures::{Future, Stream};
+use futures::sync::mpsc;
+use jni::{JavaVM, JNIEnv};
+use jni::objects::{JClass, JString, JObject, JValue, GlobalRef};
+use jni::sys::{jlong, jobjectArray, va_list};
+use tokio_current_thread::CurrentThread;
+use parity_ethereum::{RunningClient, PubSubSession};
+
+type CheckedQuery<'a> = (&'a RunningClient, String, JavaVM, GlobalRef);
+
+// Creates a Java callback to a static method named `void callback(Object)`
+struct Callback<'a> {
+ jvm: JavaVM,
+ callback: GlobalRef,
+ method_name: &'a str,
+ method_descriptor: &'a str,
+}
+
+unsafe impl<'a> Send for Callback<'a> {}
+unsafe impl<'a> Sync for Callback<'a> {}
+impl<'a> Callback<'a> {
+ fn new(jvm: JavaVM, callback: GlobalRef) -> Self {
+ Self {
+ jvm,
+ callback,
+ method_name: "callback",
+ method_descriptor: "(Ljava/lang/Object;)V",
+ }
+ }
+
+ fn call(&self, msg: &str) {
+ let env = self.jvm.attach_current_thread().expect("JavaVM should have an environment; qed");
+ let java_str = env.new_string(msg.to_string()).expect("Rust String is valid JString; qed");
+ let val = &[JValue::Object(JObject::from(java_str))];
+ env.call_method(self.callback.as_obj(), self.method_name, self.method_descriptor, val).expect(
+ "The callback must be an instance method and be named \"void callback(Object)\"; qed)");
+ }
+}
+
+#[no_mangle]
+pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong {
+ let cli_len = env.get_array_length(cli).expect("invalid Java bindings");
+
+ let mut jni_strings = Vec::with_capacity(cli_len as usize);
+ let mut opts = Vec::with_capacity(cli_len as usize);
+ let mut opts_lens = Vec::with_capacity(cli_len as usize);
+
+ for n in 0..cli_len {
+ let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings");
+ let elem_str: JString = elem.into();
+ match env.get_string(elem_str) {
+ Ok(s) => {
+ opts.push(s.as_ptr());
+ opts_lens.push(s.to_bytes().len());
+ jni_strings.push(s);
+ },
+ Err(err) => {
+ let _ = env.throw_new("java/lang/Exception", err.to_string());
+ return 0
+ }
+ };
+ }
+
+ let mut out = ptr::null_mut();
+ match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) {
+ 0 => out as jlong,
+ _ => {
+ let _ = env.throw_new("java/lang/Exception", "failed to create config object");
+ 0
+ },
+ }
+}
+
+#[no_mangle]
+pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(env: JNIEnv, _: JClass, config: va_list) -> jlong {
+ let params = ParityParams {
+ configuration: config,
+ .. mem::zeroed()
+ };
+
+ let mut out = ptr::null_mut();
+ match parity_start(¶ms, &mut out) {
+ 0 => out as jlong,
+ _ => {
+ let _ = env.throw_new("java/lang/Exception", "failed to start Parity");
+ 0
+ },
+ }
+}
+
+#[no_mangle]
+pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEnv, _: JClass, parity: va_list) {
+ parity_destroy(parity);
+}
+
+unsafe fn async_checker<'a>(client: va_list, rpc: JString, callback: JObject, env: &JNIEnv<'a>)
+-> Result, String> {
+ let query: String = env.get_string(rpc)
+ .map(Into::into)
+ .map_err(|e| e.to_string())?;
+
+ let client: &RunningClient = &*(client as *const RunningClient);
+ let jvm = env.get_java_vm().map_err(|e| e.to_string())?;
+ let global_ref = env.new_global_ref(callback).map_err(|e| e.to_string())?;
+ Ok((client, query, jvm, global_ref))
+}
+
+#[no_mangle]
+pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative(
+ env: JNIEnv,
+ _: JClass,
+ parity: va_list,
+ rpc: JString,
+ timeout_ms: jlong,
+ callback: JObject,
+ )
+{
+ let _ = async_checker(parity, rpc, callback, &env)
+ .map(|(client, query, jvm, global_ref)| {
+ let callback = Arc::new(Callback::new(jvm, global_ref));
+ let cb = callback.clone();
+ let future = client.rpc_query(&query, None).map(move |response| {
+ let response = response.unwrap_or_else(|| error::EMPTY.to_string());
+ callback.call(&response);
+ });
+
+ let _handle = thread::Builder::new()
+ .name("rpc_query".to_string())
+ .spawn(move || {
+ let mut current_thread = CurrentThread::new();
+ current_thread.spawn(future);
+ let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
+ .map_err(|_e| {
+ cb.call(error::TIMEOUT);
+ });
+ })
+ .expect("rpc-query thread shouldn't fail; qed");
+ })
+ .map_err(|e| {
+ let _ = env.throw_new("java/lang/Exception", e);
+ });
+}
+
+#[no_mangle]
+pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_subscribeWebSocketNative(
+ env: JNIEnv,
+ _: JClass,
+ parity: va_list,
+ rpc: JString,
+ callback: JObject,
+ ) -> va_list {
+
+ async_checker(parity, rpc, callback, &env)
+ .map(move |(client, query, jvm, global_ref)| {
+ let callback = Arc::new(Callback::new(jvm, global_ref));
+ let (tx, mut rx) = mpsc::channel(1);
+ let session = Arc::new(PubSubSession::new(tx));
+ let weak_session = Arc::downgrade(&session);
+ let query_future = client.rpc_query(&query, Some(session.clone()));;
+
+ let _handle = thread::Builder::new()
+ .name("ws-subscriber".into())
+ .spawn(move || {
+ // Wait for subscription ID
+ // Note this may block forever and can't be destroyed using the session object
+ // However, this will likely timeout or be catched the RPC layer
+ if let Ok(Some(response)) = query_future.wait() {
+ callback.call(&response);
+ } else {
+ callback.call(error::SUBSCRIBE);
+ return;
+ };
+
+ loop {
+ for response in rx.by_ref().wait() {
+ if let Ok(r) = response {
+ callback.call(&r);
+ }
+ }
+
+ let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
+ // No subscription left, then terminate
+ if rc <= 1 {
+ break;
+ }
+ }
+ })
+ .expect("rpc-subscriber thread shouldn't fail; qed");
+ Arc::into_raw(session) as va_list
+ })
+ .unwrap_or_else(|e| {
+ let _ = env.throw_new("java/lang/Exception", e);
+ ptr::null_mut()
+ })
+}
+
+#[no_mangle]
+pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_unsubscribeWebSocketNative(
+ _: JNIEnv,
+ _: JClass,
+ session: va_list) {
+ parity_unsubscribe_ws(session as *const c_void);
+}
diff --git a/parity-clib/src/lib.rs b/parity-clib/src/lib.rs
index 7791f97bd5e..b4f776c89c0 100644
--- a/parity-clib/src/lib.rs
+++ b/parity-clib/src/lib.rs
@@ -17,31 +17,52 @@
//! Note that all the structs and functions here are documented in `parity.h`, to avoid
//! duplicating documentation.
-#[cfg(feature = "jni")]
-extern crate jni;
-extern crate parity_ethereum;
+extern crate futures;
extern crate panic_hook;
-
-use std::os::raw::{c_char, c_void, c_int};
-use std::panic;
-use std::ptr;
-use std::slice;
-use std::str;
+extern crate parity_ethereum;
+extern crate tokio;
+extern crate tokio_current_thread;
#[cfg(feature = "jni")]
-use std::mem;
+extern crate jni;
+
#[cfg(feature = "jni")]
-use jni::{JNIEnv, objects::JClass, objects::JString, sys::jlong, sys::jobjectArray};
+mod java;
+
+use std::ffi::CString;
+use std::os::raw::{c_char, c_void, c_int};
+use std::{panic, ptr, slice, str, thread};
+use std::sync::Arc;
+use std::time::Duration;
+
+use futures::{Future, Stream};
+use futures::sync::mpsc;
+use parity_ethereum::{PubSubSession, RunningClient};
+use tokio_current_thread::CurrentThread;
+
+type Callback = Option;
+type CheckedQuery<'a> = (&'a RunningClient, &'static str);
+
+pub mod error {
+ pub const EMPTY: &str = r#"{"jsonrpc":"2.0","result":"null","id":1}"#;
+ pub const TIMEOUT: &str = r#"{"jsonrpc":"2.0","result":"timeout","id":1}"#;
+ pub const SUBSCRIBE: &str = r#"{"jsonrpc":"2.0","result":"subcribe_fail","id":1}"#;
+}
#[repr(C)]
pub struct ParityParams {
pub configuration: *mut c_void,
- pub on_client_restart_cb: Option,
+ pub on_client_restart_cb: Callback,
pub on_client_restart_cb_custom: *mut c_void,
}
#[no_mangle]
-pub unsafe extern fn parity_config_from_cli(args: *const *const c_char, args_lens: *const usize, len: usize, output: *mut *mut c_void) -> c_int {
+pub unsafe extern fn parity_config_from_cli(
+ args: *const *const c_char,
+ args_lens: *const usize,
+ len: usize,
+ output: *mut *mut c_void
+) -> c_int {
panic::catch_unwind(|| {
*output = ptr::null_mut();
@@ -59,7 +80,6 @@ pub unsafe extern fn parity_config_from_cli(args: *const *const c_char, args_len
Err(_) => return 1,
};
}
-
args
};
@@ -95,8 +115,11 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_
let config = Box::from_raw(cfg.configuration as *mut parity_ethereum::Configuration);
let on_client_restart_cb = {
- let cb = CallbackStr(cfg.on_client_restart_cb, cfg.on_client_restart_cb_custom);
- move |new_chain: String| { cb.call(&new_chain); }
+ let cb = CallbackStr {
+ user_data: cfg.on_client_restart_cb_custom,
+ function: cfg.on_client_restart_cb,
+ };
+ move |new_chain: String| { cb.call(new_chain.as_bytes()); }
};
let action = match parity_ethereum::start(*config, on_client_restart_cb, || {}) {
@@ -118,32 +141,53 @@ pub unsafe extern fn parity_start(cfg: *const ParityParams, output: *mut *mut c_
#[no_mangle]
pub unsafe extern fn parity_destroy(client: *mut c_void) {
let _ = panic::catch_unwind(|| {
- let client = Box::from_raw(client as *mut parity_ethereum::RunningClient);
+ let client = Box::from_raw(client as *mut RunningClient);
client.shutdown();
});
}
-#[no_mangle]
-pub unsafe extern fn parity_rpc(client: *mut c_void, query: *const c_char, len: usize, out_str: *mut c_char, out_len: *mut usize) -> c_int {
- panic::catch_unwind(|| {
- let client: &mut parity_ethereum::RunningClient = &mut *(client as *mut parity_ethereum::RunningClient);
- let query_str = {
+unsafe fn parity_rpc_query_checker<'a>(client: *const c_void, query: *const c_char, len: usize)
+ -> Option>
+{
+ let query_str = {
let string = slice::from_raw_parts(query as *const u8, len);
- match str::from_utf8(string) {
- Ok(a) => a,
- Err(_) => return 1,
- }
- };
-
- if let Some(output) = client.rpc_query_sync(query_str) {
- let q_out_len = output.as_bytes().len();
- if *out_len < q_out_len {
- return 1;
- }
+ str::from_utf8(string).ok()?
+ };
+ let client: &RunningClient = &*(client as *const RunningClient);
+ Some((client, query_str))
+}
- ptr::copy_nonoverlapping(output.as_bytes().as_ptr(), out_str as *mut u8, q_out_len);
- *out_len = q_out_len;
+#[no_mangle]
+pub unsafe extern fn parity_rpc(
+ client: *const c_void,
+ query: *const c_char,
+ len: usize,
+ timeout_ms: usize,
+ callback: Callback,
+ user_data: *mut c_void,
+) -> c_int {
+ panic::catch_unwind(|| {
+ if let Some((client, query)) = parity_rpc_query_checker(client, query, len) {
+ let client = client as &RunningClient;
+ let callback = Arc::new(CallbackStr {user_data, function: callback} );
+ let cb = callback.clone();
+ let query = client.rpc_query(query, None).map(move |response| {
+ let response = response.unwrap_or_else(|| error::EMPTY.to_string());
+ callback.call(response.as_bytes());
+ });
+
+ let _handle = thread::Builder::new()
+ .name("rpc_query".to_string())
+ .spawn(move || {
+ let mut current_thread = CurrentThread::new();
+ current_thread.spawn(query);
+ let _ = current_thread.run_timeout(Duration::from_millis(timeout_ms as u64))
+ .map_err(|_e| {
+ cb.call(error::TIMEOUT.as_bytes());
+ });
+ })
+ .expect("rpc-query thread shouldn't fail; qed");
0
} else {
1
@@ -152,116 +196,86 @@ pub unsafe extern fn parity_rpc(client: *mut c_void, query: *const c_char, len:
}
#[no_mangle]
-pub unsafe extern fn parity_set_panic_hook(callback: extern "C" fn(*mut c_void, *const c_char, usize), param: *mut c_void) {
- let cb = CallbackStr(Some(callback), param);
- panic_hook::set_with(move |panic_msg| {
- cb.call(panic_msg);
- });
-}
+pub unsafe extern fn parity_subscribe_ws(
+ client: *const c_void,
+ query: *const c_char,
+ len: usize,
+ callback: Callback,
+ user_data: *mut c_void,
+) -> *const c_void {
-// Internal structure for handling callbacks that get passed a string.
-struct CallbackStr(Option, *mut c_void);
-unsafe impl Send for CallbackStr {}
-unsafe impl Sync for CallbackStr {}
-impl CallbackStr {
- fn call(&self, new_chain: &str) {
- if let Some(ref cb) = self.0 {
- cb(self.1, new_chain.as_bytes().as_ptr() as *const _, new_chain.len())
+ panic::catch_unwind(|| {
+ if let Some((client, query)) = parity_rpc_query_checker(client, query, len) {
+ let (tx, mut rx) = mpsc::channel(1);
+ let session = Arc::new(PubSubSession::new(tx));
+ let query_future = client.rpc_query(query, Some(session.clone()));
+ let weak_session = Arc::downgrade(&session);
+ let cb = CallbackStr { user_data, function: callback};
+
+ let _handle = thread::Builder::new()
+ .name("ws-subscriber".into())
+ .spawn(move || {
+ // Wait for subscription ID
+ // Note this may block forever and be can't destroyed using the session object
+ // However, this will likely timeout or be catched the RPC layer
+ if let Ok(Some(response)) = query_future.wait() {
+ cb.call(response.as_bytes());
+ } else {
+ cb.call(error::SUBSCRIBE.as_bytes());
+ return;
+ }
+
+ loop {
+ for response in rx.by_ref().wait() {
+ if let Ok(r) = response {
+ cb.call(r.as_bytes());
+ }
+ }
+
+ let rc = weak_session.upgrade().map_or(0,|session| Arc::strong_count(&session));
+ // No subscription left, then terminate
+ if rc <= 1 {
+ break;
+ }
+ }
+ })
+ .expect("rpc-subscriber thread shouldn't fail; qed");
+ Arc::into_raw(session) as *const c_void
+ } else {
+ ptr::null()
}
- }
+ })
+ .unwrap_or(ptr::null())
}
-#[cfg(feature = "jni")]
#[no_mangle]
-pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_configFromCli(env: JNIEnv, _: JClass, cli: jobjectArray) -> jlong {
- let cli_len = env.get_array_length(cli).expect("invalid Java bindings");
-
- let mut jni_strings = Vec::with_capacity(cli_len as usize);
- let mut opts = Vec::with_capacity(cli_len as usize);
- let mut opts_lens = Vec::with_capacity(cli_len as usize);
-
- for n in 0 .. cli_len {
- let elem = env.get_object_array_element(cli, n).expect("invalid Java bindings");
- let elem_str: JString = elem.into();
- match env.get_string(elem_str) {
- Ok(s) => {
- opts.push(s.as_ptr());
- opts_lens.push(s.to_bytes().len());
- jni_strings.push(s);
- },
- Err(err) => {
- let _ = env.throw_new("java/lang/Exception", err.to_string());
- return 0
- }
- };
- }
-
- let mut out = ptr::null_mut();
- match parity_config_from_cli(opts.as_ptr(), opts_lens.as_ptr(), cli_len as usize, &mut out) {
- 0 => out as usize as jlong,
- _ => {
- let _ = env.throw_new("java/lang/Exception", "failed to create config object");
- 0
- },
- }
+pub unsafe extern fn parity_unsubscribe_ws(session: *const c_void) {
+ let _ = panic::catch_unwind(|| {
+ let _session = Arc::from_raw(session as *const PubSubSession);
+ });
}
-#[cfg(feature = "jni")]
#[no_mangle]
-pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_build(env: JNIEnv, _: JClass, config: jlong) -> jlong {
- let params = ParityParams {
- configuration: config as usize as *mut c_void,
- .. mem::zeroed()
- };
-
- let mut out = ptr::null_mut();
- match parity_start(¶ms, &mut out) {
- 0 => out as usize as jlong,
- _ => {
- let _ = env.throw_new("java/lang/Exception", "failed to start Parity");
- 0
- },
- }
+pub unsafe extern fn parity_set_panic_hook(callback: Callback, param: *mut c_void) {
+ let cb = CallbackStr {user_data: param, function: callback};
+ panic_hook::set_with(move |panic_msg| {
+ cb.call(panic_msg.as_bytes());
+ });
}
-#[cfg(feature = "jni")]
-#[no_mangle]
-pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_destroy(_env: JNIEnv, _: JClass, parity: jlong) {
- let parity = parity as usize as *mut c_void;
- parity_destroy(parity);
+// Internal structure for handling callbacks that get passed a string.
+struct CallbackStr {
+ user_data: *mut c_void,
+ function: Callback,
}
-#[cfg(feature = "jni")]
-#[no_mangle]
-pub unsafe extern "system" fn Java_io_parity_ethereum_Parity_rpcQueryNative<'a>(env: JNIEnv<'a>, _: JClass, parity: jlong, rpc: JString) -> JString<'a> {
- let parity = parity as usize as *mut c_void;
-
- let rpc = match env.get_string(rpc) {
- Ok(s) => s,
- Err(err) => {
- let _ = env.throw_new("java/lang/Exception", err.to_string());
- return env.new_string("").expect("Creating an empty string never fails");
- },
- };
-
- let mut out_len = 255;
- let mut out = [0u8; 256];
-
- match parity_rpc(parity, rpc.as_ptr(), rpc.to_bytes().len(), out.as_mut_ptr() as *mut c_char, &mut out_len) {
- 0 => (),
- _ => {
- let _ = env.throw_new("java/lang/Exception", "failed to perform RPC query");
- return env.new_string("").expect("Creating an empty string never fails");
- },
- }
-
- let out = str::from_utf8(&out[..out_len])
- .expect("parity always generates an UTF-8 RPC response");
- match env.new_string(out) {
- Ok(s) => s,
- Err(err) => {
- let _ = env.throw_new("java/lang/Exception", err.to_string());
- return env.new_string("").expect("Creating an empty string never fails");
+unsafe impl Send for CallbackStr {}
+unsafe impl Sync for CallbackStr {}
+impl CallbackStr {
+ fn call(&self, msg: &[u8]) {
+ if let Some(ref cb) = self.function {
+ let cstr = CString::new(msg).expect("valid string with no null bytes in the middle; qed").into_raw();
+ cb(self.user_data, cstr, msg.len())
}
}
}
diff --git a/parity/lib.rs b/parity/lib.rs
index b5a614a7bf4..88330f1ac09 100644
--- a/parity/lib.rs
+++ b/parity/lib.rs
@@ -121,6 +121,7 @@ use std::alloc::System;
pub use self::configuration::Configuration;
pub use self::run::RunningClient;
+pub use parity_rpc::PubSubSession;
#[cfg(feature = "memory_profiling")]
#[global_allocator]
diff --git a/parity/run.rs b/parity/run.rs
index 28536ea6a45..6dad5fd6d54 100644
--- a/parity/run.rs
+++ b/parity/run.rs
@@ -41,7 +41,8 @@ use light::Cache as LightDataCache;
use miner::external::ExternalMiner;
use node_filter::NodeFilter;
use parity_runtime::Runtime;
-use parity_rpc::{Origin, Metadata, NetworkSettings, informant, is_major_importing};
+use parity_rpc::{Origin, Metadata, NetworkSettings, informant, is_major_importing, PubSubSession, FutureResult,
+ FutureResponse, FutureOutput};
use updater::{UpdatePolicy, Updater};
use parity_version::version;
use ethcore_private_tx::{ProviderConfig, EncryptorConfig, SecretStoreEncryptor};
@@ -875,21 +876,19 @@ enum RunningClientInner {
}
impl RunningClient {
- /// Performs a synchronous RPC query.
- /// Blocks execution until the result is ready.
- pub fn rpc_query_sync(&self, request: &str) -> Option {
+ /// Performs an asynchronous RPC query.
+ // FIXME: [tomaka] This API should be better, with for example a Future
+ pub fn rpc_query(&self, request: &str, session: Option>)
+ -> FutureResult
+ {
let metadata = Metadata {
origin: Origin::CApi,
- session: None,
+ session,
};
match self.inner {
- RunningClientInner::Light { ref rpc, .. } => {
- rpc.handle_request_sync(request, metadata)
- },
- RunningClientInner::Full { ref rpc, .. } => {
- rpc.handle_request_sync(request, metadata)
- },
+ RunningClientInner::Light { ref rpc, .. } => rpc.handle_request(request, metadata),
+ RunningClientInner::Full { ref rpc, .. } => rpc.handle_request(request, metadata),
}
}
diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs
index ac0d3dd6f7e..ca5ed28bae3 100644
--- a/rpc/src/lib.rs
+++ b/rpc/src/lib.rs
@@ -108,6 +108,7 @@ pub mod v1;
pub mod tests;
+pub use jsonrpc_core::{FutureOutput, FutureResult, FutureResponse, FutureRpcResult};
pub use jsonrpc_pubsub::Session as PubSubSession;
pub use ipc::{Server as IpcServer, MetaExtractor as IpcMetaExtractor, RequestContext as IpcRequestContext};
pub use http::{