Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement subscription support in the client #436

Merged
merged 39 commits into from
Jul 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
222bd50
Rustfmt fixes.
dvc94ch May 24, 2019
42ddba9
Enable logging in tests.
dvc94ch May 24, 2019
44ab59a
Improve pubsub example documentation.
dvc94ch May 24, 2019
524e778
Allow Response to contain a Notification.
dvc94ch May 24, 2019
38d5022
Add jsonrpc-pubsub as a core-client dependency.
dvc94ch May 24, 2019
1fd5fb9
Add pubsub support to local transport.
dvc94ch May 24, 2019
0917910
Turn RpcMessage into an enum.
dvc94ch May 24, 2019
6fea32b
Add SubscriptionStream.
dvc94ch May 24, 2019
0ce7ce2
Add subscription support to RawClient.
dvc94ch May 24, 2019
ecf8b3a
Add subscription support to RequestBuilder.
dvc94ch May 24, 2019
8fdde0d
Add subscription support to Duplex.
dvc94ch May 24, 2019
4cdb76c
Add subscription test.
dvc94ch May 24, 2019
cc40350
Add TypedSubscriptionStream.
dvc94ch May 24, 2019
da986c8
Add subscription support to TypedClient.
dvc94ch May 24, 2019
fb5cdfb
Test typed client subscription.
dvc94ch May 24, 2019
bc8feaa
Add subscription support to procmacro.
dvc94ch May 24, 2019
82b7a96
Handle typed::Subscriber.
dvc94ch May 24, 2019
553a371
Address grumbles.
dvc94ch Jun 15, 2019
ce8cf50
rustfmt fixes.
dvc94ch Jun 15, 2019
831b8c6
Fix tests.
dvc94ch Jun 15, 2019
f25fd78
Avoid unwrapping.
dvc94ch Jun 15, 2019
76539b3
Fix doc tests.
dvc94ch Jun 15, 2019
2966888
Impl From instead of Into.
dvc94ch Jun 28, 2019
fdf38ac
Improve code.
dvc94ch Jun 28, 2019
6d53332
Deny warnings and missing docs.
dvc94ch Jun 28, 2019
7b1a512
Fix explicit dyn warning.
dvc94ch Jun 28, 2019
22094f7
Implement Debug for Duplex.
dvc94ch Jun 28, 2019
d989870
Remove allow(deprecated).
dvc94ch Jun 28, 2019
48e7363
Fix build.
dvc94ch Jun 28, 2019
8db12d2
Fix build on windows.
dvc94ch Jun 28, 2019
1daed1d
Add constructor to LocalRpc.
dvc94ch Jul 1, 2019
027d707
Parse output into subscription id.
dvc94ch Jul 1, 2019
6662f13
Should handle multiple subscriptions.
dvc94ch Jul 1, 2019
108ed61
Add Deserialize bound for subscription args in client
ascjones Jul 2, 2019
df467ff
Merge pull request #1 from ascjones/aj-client-pubsub
dvc94ch Jul 2, 2019
eb918b4
Remove deny(warnings)
tomusdrw Jul 2, 2019
9f4a0a1
Rewrite using less maps.
tomusdrw Jul 2, 2019
2c795e4
Merge pull request #2 from tomusdrw/td-fix
dvc94ch Jul 2, 2019
ce823e6
Bounds for wrapped Subscriber generic types (#3)
ascjones Jul 3, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@
//!
//! See documentation of [`jsonrpc-client-transports`](../jsonrpc_client_transports/) for more details.

#![deny(missing_docs)]

pub use jsonrpc_client_transports::*;
1 change: 1 addition & 0 deletions core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ futures = "0.1.26"
hyper = { version = "0.12", optional = true }
hyper-tls = { version = "0.3.2", optional = true }
jsonrpc-core = { version = "12.0", path = "../../core" }
jsonrpc-pubsub = { version = "12.0", path = "../../pubsub" }
log = "0.4"
serde = "1.0"
serde_json = "1.0"
Expand Down
238 changes: 229 additions & 9 deletions core-client/transports/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use jsonrpc_core::{Error, Params};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
use std::marker::PhantomData;

pub mod transports;

Expand All @@ -27,9 +28,6 @@ pub enum RpcError {
/// Request timed out.
#[fail(display = "Request timed out")]
Timeout,
/// The server returned a response with an unknown id.
#[fail(display = "Server returned a response with an unknown id")]
UnknownId,
/// Not rpc specific errors.
#[fail(display = "{}", _0)]
Other(failure::Error),
Expand All @@ -41,9 +39,8 @@ impl From<Error> for RpcError {
}
}

/// A message sent to the `RpcClient`. This is public so that
/// the derive crate can generate a client.
struct RpcMessage {
/// A rpc call message.
struct CallMessage {
/// The rpc method name.
method: String,
/// The rpc method parameters.
Expand All @@ -53,6 +50,46 @@ struct RpcMessage {
sender: oneshot::Sender<Result<Value, RpcError>>,
}

/// A rpc subscription.
struct Subscription {
/// The subscribe method name.
subscribe: String,
/// The subscribe method parameters.
subscribe_params: Params,
/// The name of the notification.
notification: String,
/// The unsubscribe method name.
unsubscribe: String,
}

/// A rpc subscribe message.
struct SubscribeMessage {
/// The subscription to subscribe to.
subscription: Subscription,
/// The channel to send notifications to.
sender: mpsc::Sender<Result<Value, RpcError>>,
}

/// A message sent to the `RpcClient`.
enum RpcMessage {
/// Make a rpc call.
Call(CallMessage),
/// Subscribe to a notification.
Subscribe(SubscribeMessage),
}

impl From<CallMessage> for RpcMessage {
fn from(msg: CallMessage) -> Self {
RpcMessage::Call(msg)
}
}

impl From<SubscribeMessage> for RpcMessage {
fn from(msg: SubscribeMessage) -> Self {
RpcMessage::Subscribe(msg)
}
}

/// A channel to a `RpcClient`.
#[derive(Clone)]
pub struct RpcChannel(mpsc::Sender<RpcMessage>);
Expand Down Expand Up @@ -99,6 +136,67 @@ impl Future for RpcFuture {
}
}

/// The stream returned by a subscribe.
pub struct SubscriptionStream {
recv: mpsc::Receiver<Result<Value, RpcError>>,
}

impl SubscriptionStream {
/// Crates a new `SubscriptionStream`.
pub fn new(recv: mpsc::Receiver<Result<Value, RpcError>>) -> Self {
SubscriptionStream { recv }
}
}

impl Stream for SubscriptionStream {
type Item = Value;
type Error = RpcError;

fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
match self.recv.poll() {
Ok(Async::Ready(Some(Ok(value)))) => Ok(Async::Ready(Some(value))),
Ok(Async::Ready(Some(Err(error)))) => Err(error),
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(()) => Err(RpcError::Other(format_err!("mpsc channel returned an error."))),
}
}
}

/// A typed subscription stream.
pub struct TypedSubscriptionStream<T> {
_marker: PhantomData<T>,
returns: &'static str,
stream: SubscriptionStream,
}

impl<T> TypedSubscriptionStream<T> {
/// Creates a new `TypedSubscriptionStream`.
pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self {
TypedSubscriptionStream {
_marker: PhantomData,
returns,
stream,
}
}
}

impl<T: DeserializeOwned + 'static> Stream for TypedSubscriptionStream<T> {
type Item = T;
type Error = RpcError;

fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
let result = match self.stream.poll()? {
Async::Ready(Some(value)) => serde_json::from_value::<T>(value)
.map(|result| Async::Ready(Some(result)))
.map_err(|error| RpcError::ParseError(self.returns.into(), error.into()))?,
Async::Ready(None) => Async::Ready(None),
Async::NotReady => Async::NotReady,
};
Ok(result)
}
}

/// Client for raw JSON RPC requests
#[derive(Clone)]
pub struct RawClient(RpcChannel);
Expand All @@ -113,16 +211,40 @@ impl RawClient {
/// Call RPC with raw JSON
pub fn call_method(&self, method: &str, params: Params) -> impl Future<Item = Value, Error = RpcError> {
let (sender, receiver) = oneshot::channel();
let msg = RpcMessage {
let msg = CallMessage {
method: method.into(),
params,
sender,
};
self.0
.send(msg)
.send(msg.into())
.map_err(|error| RpcError::Other(error.into()))
.and_then(|_| RpcFuture::new(receiver))
}

/// Subscribe to topic with raw JSON
pub fn subscribe(
&self,
subscribe: &str,
subscribe_params: Params,
notification: &str,
unsubscribe: &str,
) -> impl Future<Item = SubscriptionStream, Error = RpcError> {
let (sender, receiver) = mpsc::channel(0);
let msg = SubscribeMessage {
subscription: Subscription {
subscribe: subscribe.into(),
subscribe_params,
notification: notification.into(),
unsubscribe: unsubscribe.into(),
},
sender,
};
self.0
.send(msg.into())
.map_err(|error| RpcError::Other(error.into()))
.map(|_| SubscriptionStream::new(receiver))
}
}

/// Client for typed JSON RPC requests
Expand Down Expand Up @@ -167,14 +289,46 @@ impl TypedClient {
future::done(result)
}))
}

/// Subscribe with serialization of request and deserialization of response
pub fn subscribe<T: Serialize, R: DeserializeOwned + 'static>(
&self,
subscribe: &str,
subscribe_params: T,
topic: &str,
unsubscribe: &str,
returns: &'static str,
) -> impl Future<Item = TypedSubscriptionStream<R>, Error = RpcError> {
let args = serde_json::to_value(subscribe_params)
.expect("Only types with infallible serialisation can be used for JSON-RPC");

let params = match args {
Value::Array(vec) => Params::Array(vec),
Value::Null => Params::None,
_ => {
return future::Either::A(future::err(RpcError::Other(format_err!(
"RPC params should serialize to a JSON array, or null"
))))
}
};

let typed_stream = self
.0
.subscribe(subscribe, params, topic, unsubscribe)
.map(move |stream| TypedSubscriptionStream::new(stream, returns));
future::Either::B(typed_stream)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::transports::local;
use crate::{RpcChannel, RpcError, TypedClient};
use jsonrpc_core::{self, IoHandler};
use jsonrpc_core::{self as core, IoHandler};
use jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[derive(Clone)]
struct AddClient(TypedClient);
Expand All @@ -193,6 +347,7 @@ mod tests {

#[test]
fn test_client_terminates() {
crate::logger::init_log();
let mut handler = IoHandler::new();
handler.add_method("add", |params: Params| {
let (a, b) = params.parse::<(u64, u64)>()?;
Expand All @@ -215,4 +370,69 @@ mod tests {
});
tokio::run(fut);
}

#[test]
fn should_handle_subscription() {
crate::logger::init_log();
// given
let mut handler = PubSubHandler::<local::LocalMeta, _>::default();
let called = Arc::new(AtomicBool::new(false));
let called2 = called.clone();
handler.add_subscription(
"hello",
("subscribe_hello", |params, _meta, subscriber: Subscriber| {
assert_eq!(params, core::Params::None);
let sink = subscriber
.assign_id(SubscriptionId::Number(5))
.expect("assigned subscription id");
std::thread::spawn(move || {
for i in 0..3 {
std::thread::sleep(std::time::Duration::from_millis(100));
let value = serde_json::json!({
"subscription": 5,
"result": vec![i],
});
sink.notify(serde_json::from_value(value).unwrap())
.wait()
.expect("sent notification");
}
});
}),
("unsubscribe_hello", move |id, _meta| {
// Should be called because session is dropped.
called2.store(true, Ordering::SeqCst);
assert_eq!(id, SubscriptionId::Number(5));
future::ok(core::Value::Bool(true))
}),
);

// when
let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler);
let received = Arc::new(std::sync::Mutex::new(vec![]));
let r2 = received.clone();
let fut = client
.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32")
.and_then(|stream| {
stream
.into_future()
.map(move |(result, _)| {
drop(client);
r2.lock().unwrap().push(result.unwrap());
})
.map_err(|_| {
panic!("Expected message not received.");
})
})
.join(rpc_client)
.map(|(res, _)| {
log::info!("ok {:?}", res);
})
.map_err(|err| {
log::error!("err {:?}", err);
});
tokio::run(fut);
assert_eq!(called.load(Ordering::SeqCst), true);
assert!(!received.lock().unwrap().is_empty(), "Expected at least one received item.");
}

}
Loading