-
Notifications
You must be signed in to change notification settings - Fork 274
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement subscription support in the client #436
Merged
Merged
Changes from 33 commits
Commits
Show all changes
39 commits
Select commit
Hold shift + click to select a range
222bd50
Rustfmt fixes.
dvc94ch 42ddba9
Enable logging in tests.
dvc94ch 44ab59a
Improve pubsub example documentation.
dvc94ch 524e778
Allow Response to contain a Notification.
dvc94ch 38d5022
Add jsonrpc-pubsub as a core-client dependency.
dvc94ch 1fd5fb9
Add pubsub support to local transport.
dvc94ch 0917910
Turn RpcMessage into an enum.
dvc94ch 6fea32b
Add SubscriptionStream.
dvc94ch 0ce7ce2
Add subscription support to RawClient.
dvc94ch ecf8b3a
Add subscription support to RequestBuilder.
dvc94ch 8fdde0d
Add subscription support to Duplex.
dvc94ch 4cdb76c
Add subscription test.
dvc94ch cc40350
Add TypedSubscriptionStream.
dvc94ch da986c8
Add subscription support to TypedClient.
dvc94ch fb5cdfb
Test typed client subscription.
dvc94ch bc8feaa
Add subscription support to procmacro.
dvc94ch 82b7a96
Handle typed::Subscriber.
dvc94ch 553a371
Address grumbles.
dvc94ch ce8cf50
rustfmt fixes.
dvc94ch 831b8c6
Fix tests.
dvc94ch f25fd78
Avoid unwrapping.
dvc94ch 76539b3
Fix doc tests.
dvc94ch 2966888
Impl From instead of Into.
dvc94ch fdf38ac
Improve code.
dvc94ch 6d53332
Deny warnings and missing docs.
dvc94ch 7b1a512
Fix explicit dyn warning.
dvc94ch 22094f7
Implement Debug for Duplex.
dvc94ch d989870
Remove allow(deprecated).
dvc94ch 48e7363
Fix build.
dvc94ch 8db12d2
Fix build on windows.
dvc94ch 1daed1d
Add constructor to LocalRpc.
dvc94ch 027d707
Parse output into subscription id.
dvc94ch 6662f13
Should handle multiple subscriptions.
dvc94ch 108ed61
Add Deserialize bound for subscription args in client
ascjones df467ff
Merge pull request #1 from ascjones/aj-client-pubsub
dvc94ch eb918b4
Remove deny(warnings)
tomusdrw 9f4a0a1
Rewrite using less maps.
tomusdrw 2c795e4
Merge pull request #2 from tomusdrw/td-fix
dvc94ch ce823e6
Bounds for wrapped Subscriber generic types (#3)
ascjones File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
//! JSON-RPC client implementation. | ||
|
||
#![deny(missing_docs)] | ||
#![deny(warnings)] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto |
||
|
||
use failure::{format_err, Fail}; | ||
use futures::sync::{mpsc, oneshot}; | ||
|
@@ -9,6 +10,7 @@ use jsonrpc_core::{Error, Params}; | |
use serde::de::DeserializeOwned; | ||
use serde::Serialize; | ||
use serde_json::Value; | ||
use std::marker::PhantomData; | ||
|
||
pub mod transports; | ||
|
||
|
@@ -27,9 +29,6 @@ pub enum RpcError { | |
/// Request timed out. | ||
#[fail(display = "Request timed out")] | ||
Timeout, | ||
/// The server returned a response with an unknown id. | ||
#[fail(display = "Server returned a response with an unknown id")] | ||
UnknownId, | ||
/// Not rpc specific errors. | ||
#[fail(display = "{}", _0)] | ||
Other(failure::Error), | ||
|
@@ -41,9 +40,8 @@ impl From<Error> for RpcError { | |
} | ||
} | ||
|
||
/// A message sent to the `RpcClient`. This is public so that | ||
/// the derive crate can generate a client. | ||
struct RpcMessage { | ||
/// A rpc call message. | ||
struct CallMessage { | ||
/// The rpc method name. | ||
method: String, | ||
/// The rpc method parameters. | ||
|
@@ -53,6 +51,46 @@ struct RpcMessage { | |
sender: oneshot::Sender<Result<Value, RpcError>>, | ||
} | ||
|
||
/// A rpc subscription. | ||
struct Subscription { | ||
/// The subscribe method name. | ||
subscribe: String, | ||
/// The subscribe method parameters. | ||
subscribe_params: Params, | ||
/// The name of the notification. | ||
notification: String, | ||
/// The unsubscribe method name. | ||
unsubscribe: String, | ||
} | ||
|
||
/// A rpc subscribe message. | ||
struct SubscribeMessage { | ||
/// The subscription to subscribe to. | ||
subscription: Subscription, | ||
/// The channel to send notifications to. | ||
sender: mpsc::Sender<Result<Value, RpcError>>, | ||
} | ||
|
||
/// A message sent to the `RpcClient`. | ||
enum RpcMessage { | ||
/// Make a rpc call. | ||
Call(CallMessage), | ||
/// Subscribe to a notification. | ||
Subscribe(SubscribeMessage), | ||
} | ||
|
||
impl From<CallMessage> for RpcMessage { | ||
fn from(msg: CallMessage) -> Self { | ||
RpcMessage::Call(msg) | ||
} | ||
} | ||
|
||
impl From<SubscribeMessage> for RpcMessage { | ||
fn from(msg: SubscribeMessage) -> Self { | ||
RpcMessage::Subscribe(msg) | ||
} | ||
} | ||
|
||
/// A channel to a `RpcClient`. | ||
#[derive(Clone)] | ||
pub struct RpcChannel(mpsc::Sender<RpcMessage>); | ||
|
@@ -99,6 +137,67 @@ impl Future for RpcFuture { | |
} | ||
} | ||
|
||
/// The stream returned by a subscribe. | ||
pub struct SubscriptionStream { | ||
recv: mpsc::Receiver<Result<Value, RpcError>>, | ||
} | ||
|
||
impl SubscriptionStream { | ||
/// Crates a new `SubscriptionStream`. | ||
pub fn new(recv: mpsc::Receiver<Result<Value, RpcError>>) -> Self { | ||
SubscriptionStream { recv } | ||
} | ||
} | ||
|
||
impl Stream for SubscriptionStream { | ||
type Item = Value; | ||
type Error = RpcError; | ||
|
||
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { | ||
match self.recv.poll() { | ||
Ok(Async::Ready(Some(Ok(value)))) => Ok(Async::Ready(Some(value))), | ||
Ok(Async::Ready(Some(Err(error)))) => Err(error), | ||
Ok(Async::Ready(None)) => Ok(Async::Ready(None)), | ||
Ok(Async::NotReady) => Ok(Async::NotReady), | ||
Err(()) => Err(RpcError::Other(format_err!("mpsc channel returned an error."))), | ||
} | ||
} | ||
} | ||
|
||
/// A typed subscription stream. | ||
pub struct TypedSubscriptionStream<T> { | ||
_marker: PhantomData<T>, | ||
returns: &'static str, | ||
stream: SubscriptionStream, | ||
} | ||
|
||
impl<T> TypedSubscriptionStream<T> { | ||
/// Creates a new `TypedSubscriptionStream`. | ||
pub fn new(stream: SubscriptionStream, returns: &'static str) -> Self { | ||
TypedSubscriptionStream { | ||
_marker: PhantomData, | ||
returns, | ||
stream, | ||
} | ||
} | ||
} | ||
|
||
impl<T: DeserializeOwned + 'static> Stream for TypedSubscriptionStream<T> { | ||
type Item = T; | ||
type Error = RpcError; | ||
|
||
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> { | ||
let result = match self.stream.poll()? { | ||
Async::Ready(Some(value)) => serde_json::from_value::<T>(value) | ||
.map(|result| Async::Ready(Some(result))) | ||
.map_err(|error| RpcError::ParseError(self.returns.into(), error.into()))?, | ||
Async::Ready(None) => Async::Ready(None), | ||
Async::NotReady => Async::NotReady, | ||
}; | ||
Ok(result) | ||
} | ||
} | ||
|
||
/// Client for raw JSON RPC requests | ||
#[derive(Clone)] | ||
pub struct RawClient(RpcChannel); | ||
|
@@ -113,16 +212,40 @@ impl RawClient { | |
/// Call RPC with raw JSON | ||
pub fn call_method(&self, method: &str, params: Params) -> impl Future<Item = Value, Error = RpcError> { | ||
let (sender, receiver) = oneshot::channel(); | ||
let msg = RpcMessage { | ||
let msg = CallMessage { | ||
method: method.into(), | ||
params, | ||
sender, | ||
}; | ||
self.0 | ||
.send(msg) | ||
.send(msg.into()) | ||
.map_err(|error| RpcError::Other(error.into())) | ||
.and_then(|_| RpcFuture::new(receiver)) | ||
} | ||
|
||
/// Subscribe to topic with raw JSON | ||
pub fn subscribe( | ||
&self, | ||
subscribe: &str, | ||
subscribe_params: Params, | ||
notification: &str, | ||
unsubscribe: &str, | ||
) -> impl Future<Item = SubscriptionStream, Error = RpcError> { | ||
let (sender, receiver) = mpsc::channel(0); | ||
let msg = SubscribeMessage { | ||
subscription: Subscription { | ||
subscribe: subscribe.into(), | ||
subscribe_params, | ||
notification: notification.into(), | ||
unsubscribe: unsubscribe.into(), | ||
}, | ||
sender, | ||
}; | ||
self.0 | ||
.send(msg.into()) | ||
.map_err(|error| RpcError::Other(error.into())) | ||
.map(|_| SubscriptionStream::new(receiver)) | ||
} | ||
} | ||
|
||
/// Client for typed JSON RPC requests | ||
|
@@ -167,14 +290,46 @@ impl TypedClient { | |
future::done(result) | ||
})) | ||
} | ||
|
||
/// Subscribe with serialization of request and deserialization of response | ||
pub fn subscribe<T: Serialize, R: DeserializeOwned + 'static>( | ||
&self, | ||
subscribe: &str, | ||
subscribe_params: T, | ||
topic: &str, | ||
unsubscribe: &str, | ||
returns: &'static str, | ||
) -> impl Future<Item = TypedSubscriptionStream<R>, Error = RpcError> { | ||
let args = serde_json::to_value(subscribe_params) | ||
.expect("Only types with infallible serialisation can be used for JSON-RPC"); | ||
|
||
let params = match args { | ||
Value::Array(vec) => Params::Array(vec), | ||
Value::Null => Params::None, | ||
_ => { | ||
return future::Either::A(future::err(RpcError::Other(format_err!( | ||
"RPC params should serialize to a JSON array, or null" | ||
)))) | ||
} | ||
}; | ||
|
||
let typed_stream = self | ||
.0 | ||
.subscribe(subscribe, params, topic, unsubscribe) | ||
.map(move |stream| TypedSubscriptionStream::new(stream, returns)); | ||
future::Either::B(typed_stream) | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
use crate::transports::local; | ||
use crate::{RpcChannel, RpcError, TypedClient}; | ||
use jsonrpc_core::{self, IoHandler}; | ||
use jsonrpc_core::{self as core, IoHandler}; | ||
use jsonrpc_pubsub::{PubSubHandler, Subscriber, SubscriptionId}; | ||
use std::sync::atomic::{AtomicBool, Ordering}; | ||
use std::sync::Arc; | ||
|
||
#[derive(Clone)] | ||
struct AddClient(TypedClient); | ||
|
@@ -193,6 +348,7 @@ mod tests { | |
|
||
#[test] | ||
fn test_client_terminates() { | ||
crate::logger::init_log(); | ||
let mut handler = IoHandler::new(); | ||
handler.add_method("add", |params: Params| { | ||
let (a, b) = params.parse::<(u64, u64)>()?; | ||
|
@@ -215,4 +371,64 @@ mod tests { | |
}); | ||
tokio::run(fut); | ||
} | ||
|
||
#[test] | ||
fn should_handle_subscription() { | ||
crate::logger::init_log(); | ||
// given | ||
let mut handler = PubSubHandler::<local::LocalMeta, _>::default(); | ||
let called = Arc::new(AtomicBool::new(false)); | ||
let called2 = called.clone(); | ||
handler.add_subscription( | ||
"hello", | ||
("subscribe_hello", |params, _meta, subscriber: Subscriber| { | ||
assert_eq!(params, core::Params::None); | ||
let sink = subscriber | ||
.assign_id(SubscriptionId::Number(5)) | ||
.expect("assigned subscription id"); | ||
std::thread::spawn(move || { | ||
for i in 0..3 { | ||
std::thread::sleep(std::time::Duration::from_millis(100)); | ||
let value = serde_json::json!({ | ||
"subscription": 5, | ||
"result": vec![i], | ||
}); | ||
sink.notify(serde_json::from_value(value).unwrap()) | ||
.wait() | ||
.expect("sent notification"); | ||
} | ||
}); | ||
}), | ||
("unsubscribe_hello", move |id, _meta| { | ||
// Should be called because session is dropped. | ||
called2.store(true, Ordering::SeqCst); | ||
assert_eq!(id, SubscriptionId::Number(5)); | ||
future::ok(core::Value::Bool(true)) | ||
}), | ||
); | ||
|
||
// when | ||
let (client, rpc_client) = local::connect_with_pubsub::<TypedClient, _>(handler); | ||
let fut = client | ||
.subscribe::<_, (u32,)>("subscribe_hello", (), "hello", "unsubscribe_hello", "u32") | ||
.and_then(|stream| { | ||
stream | ||
.into_future() | ||
.map(move |(result, _)| { | ||
drop(client); | ||
result.unwrap() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we check that the message is one of the three we're sending? |
||
}) | ||
.map_err(|(err, _)| err) | ||
}) | ||
.join(rpc_client) | ||
.map(|(res, _)| { | ||
log::info!("ok {:?}", res); | ||
}) | ||
.map_err(|err| { | ||
log::error!("err {:?}", err); | ||
}); | ||
tokio::run(fut); | ||
assert_eq!(called.load(Ordering::SeqCst), true); | ||
} | ||
|
||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't
deny(warnings)
to prevent uncontrolled build failures in newer versions of the compiler or external crates. Please remove.