Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rpc module): unsubscribe according ethereum pubsub spec #693

Merged
merged 5 commits into from
Feb 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 15 additions & 19 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ use crate::error::{Error, SubscriptionClosed, SubscriptionClosedReason};
use crate::id_providers::RandomIntegerIdProvider;
use crate::server::helpers::MethodSink;
use crate::server::resource_limiting::{ResourceGuard, ResourceTable, ResourceVec, Resources};
use crate::to_json_raw_value;
use crate::traits::{IdProvider, ToRpcParams};
use futures_channel::{mpsc, oneshot};
use futures_util::future::Either;
use futures_util::{future::BoxFuture, FutureExt, Stream, StreamExt};
use jsonrpsee_types::error::{invalid_subscription_err, ErrorCode, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::error::{ErrorCode, CALL_EXECUTION_FAILED_CODE};
use jsonrpsee_types::{
Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionPayload, SubscriptionResponse,
};
Expand Down Expand Up @@ -573,7 +572,14 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
Ok(MethodResourcesBuilder { build: ResourceVec::new(), callback })
}

/// Register a new RPC subscription that invokes s callback on every subscription call.
/// Register a new publish/subscribe interface using JSON-RPC notifications.
///
/// It implements the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
/// with an option to choose custom subscription ID generation.
///
/// Furthermore, it generates the `unsubscribe implementation` where a `bool` is used as
/// the result to indicate whether the subscription was successfully unsubscribed to or not.
/// For instance an `unsubscribe call` may fail if a non-existent subscriptionID is used in the call.
///
/// This method ensures that the `subscription_method_name` and `unsubscription_method_name` are unique.
/// The `notif_method_name` argument sets the content of the `method` field in the JSON document that
Expand All @@ -585,7 +591,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// * `subscription_method_name` - name of the method to call to initiate a subscription
/// * `notif_method_name` - name of method to be used in the subscription payload (technically a JSON-RPC notification)
/// * `unsubscription_method` - name of the method to call to terminate a subscription
/// * `callback` - A callback to invoke on each subscription; it takes three parameters:
/// * `callback` - A callback to invoke on each subscription; it takes three parameters:
/// - [`Params`]: JSON-RPC parameters in the subscription call.
/// - [`SubscriptionSink`]: A sink to send messages to the subscriber.
/// - Context: Any type that can be embedded into the [`RpcModule`].
Expand Down Expand Up @@ -681,27 +687,17 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
params,
id
);
let err =
to_json_raw_value(&"Invalid subscription ID type, must be Integer or String").ok();
return sink.send_error(id, invalid_subscription_err(err.as_deref()));
return sink.send_response(id, false);
}
};
let sub_id = sub_id.into_owned();

if subscribers
let result = subscribers
.lock()
.remove(&SubscriptionKey { conn_id: conn.conn_id, sub_id: sub_id.clone() })
.is_some()
{
sink.send_response(id, "Unsubscribed")
} else {
let err = to_json_raw_value(&format!(
"Invalid subscription ID={}",
Comment on lines -698 to -699
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only downside to this PR is that we loose this information (I think?).

With this PR, when an unsubscribe call fails, what do users get back?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, unfortunately they will just receive a bool set to false and then the client(s) have to figure it on their own :(

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another route is to actually require users to customize their unsubscribe impl as jsonrpc does but probably not worth the effort we still only support String and u64 as subscription ID.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's document this properly in the macro docs (and on this method): "When attempting to unsubscribe with an unknown subscription ID this call will return false" or something along those lines?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's better to document that we implement the ethereum pubsub specification when we refer to subscriptions and it's possible to provide custom subscription IDs but that's it.

Sure, we could add some documentation to both RpcModule::register_subscription and the macros.

serde_json::to_string(&sub_id).expect("valid JSON; qed")
))
.ok();
sink.send_error(id, invalid_subscription_err(err.as_deref()))
}
.is_some();

sink.send_response(id, result)
})),
);
}
Expand Down
1 change: 1 addition & 0 deletions proc-macros/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ jsonrpsee = { path = "../jsonrpsee", features = ["full"] }
trybuild = "1.0"
tokio = { version = "1.8", features = ["rt", "macros"] }
futures-channel = { version = "0.3.14", default-features = false }
futures-util = { version = "0.3.14", default-features = false }
45 changes: 44 additions & 1 deletion proc-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ pub(crate) mod visitor;
///
/// ### `subscription` attribute
///
/// `subscription` attribute is used to define a publish/subscribe interface according to the [ethereum pubsub specification](https://geth.ethereum.org/docs/rpc/pubsub)
///
/// **Arguments:**
///
/// - `name` (mandatory): name of the RPC method. Does not have to be the same as the Rust method name.
Expand Down Expand Up @@ -226,7 +228,37 @@ pub(crate) mod visitor;
/// #[method(name = "baz", blocking)]
/// fn blocking_method(&self) -> RpcResult<u16>;
///
/// #[subscription(name = "sub", unsubscribe = "unsub", item = String)]
/// /// Override the `foo_sub` and use `foo_subNotif` for the notifications.
/// ///
/// /// The item field indicates which type goes into result field below.
/// ///
/// /// The notification format:
/// ///
/// /// ```
/// /// {
/// /// "jsonrpc":"2.0",
/// /// "method":"foo_subNotif",
/// /// "params":["subscription":"someID", "result":"some string"]
/// /// }
/// /// ```
/// #[subscription(name = "sub" => "subNotif", unsubscribe = "unsub", item = String)]
/// fn sub_override_notif_method(&self) -> RpcResult<()>;
///
/// /// Use the same method name for both the `subscribe call` and `notifications`
/// ///
/// /// The unsubscribe method name is generated here `foo_unsubscribe`
/// /// Thus the `unsubscribe attribute` is not needed unless a custom unsubscribe method name is wanted.
/// ///
/// /// The notification format:
/// ///
/// /// ```
/// /// {
/// /// "jsonrpc":"2.0",
/// /// "method":"foo_subscribe",
/// /// "params":["subscription":"someID", "result":"some string"]
/// /// }
/// /// ```
/// #[subscription(name = "subscribe", item = String)]
/// fn sub(&self) -> RpcResult<()>;
/// }
///
Expand All @@ -252,6 +284,17 @@ pub(crate) mod visitor;
/// Ok(11)
/// }
///
/// // The stream API can be used to pipe items from the underlying stream
/// // as subscription responses.
/// fn sub_override_notif_method(&self, mut sink: SubscriptionSink) -> RpcResult<()> {
/// tokio::spawn(async move {
/// let stream = futures_util::stream::iter(["one", "two", "three"]);
/// sink.pipe_from_stream(stream).await;
/// });
///
/// Ok(())
/// }
///
/// // We could've spawned a `tokio` future that yields values while our program works,
/// // but for simplicity of the example we will only send two values and then close
/// // the subscription.
Expand Down
4 changes: 2 additions & 2 deletions proc-macros/src/rpc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ impl RpcSubscription {
let docs = extract_doc_comments(&sub.attrs);
let unsubscribe = match parse_subscribe(unsubscribe)? {
Some(unsub) => unsub,
None => build_unsubscribe_method(&name).expect(
&format!("Could not generate the unsubscribe method with name '{}'. You need to provide the name manually using the `unsubscribe` attribute in your RPC API definition", name),
None => build_unsubscribe_method(&name).unwrap_or_else(||
panic!("Could not generate the unsubscribe method with name '{}'. You need to provide the name manually using the `unsubscribe` attribute in your RPC API definition", name),
),
};

Expand Down
5 changes: 0 additions & 5 deletions types/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,11 +262,6 @@ impl CallError {
}
}

/// Create a invalid subscription ID error.
pub fn invalid_subscription_err(data: Option<&RawValue>) -> ErrorObject {
ErrorObject::new(ErrorCode::ServerError(INVALID_SUBSCRIPTION_CODE), data)
}

#[cfg(test)]
mod tests {
use super::{ErrorCode, ErrorObject, ErrorResponse, Id, TwoPointZero};
Expand Down
25 changes: 10 additions & 15 deletions ws-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,14 @@ use std::net::SocketAddr;
use std::time::Duration;

use crate::types::error::CallError;
use crate::types::{self, ErrorResponse, Response, SubscriptionId};
use crate::types::{Response, SubscriptionId};
use crate::{future::ServerHandle, RpcModule, WsServerBuilder};
use anyhow::anyhow;
use futures_util::future::join;
use jsonrpsee_core::{to_json_raw_value, traits::IdProvider, DeserializeOwned, Error};
use jsonrpsee_core::{traits::IdProvider, DeserializeOwned, Error};
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::{Id, TestContext, WebSocketTestClient, WebSocketTestError};
use jsonrpsee_test_utils::TimeoutFutureExt;
use jsonrpsee_types::error::invalid_subscription_err;
use serde_json::Value as JsonValue;
use tracing_subscriber::{EnvFilter, FmtSubscriber};

Expand Down Expand Up @@ -606,16 +605,13 @@ async fn unsubscribe_twice_should_indicate_error() {
let sub_id: u64 = deser_call(client.send_request_text(sub_call).await.unwrap());

let unsub_call = call("unsubscribe_hello", vec![sub_id], Id::Num(1));
let unsub_1: String = deser_call(client.send_request_text(unsub_call).await.unwrap());
assert_eq!(&unsub_1, "Unsubscribed");
let unsub_1: bool = deser_call(client.send_request_text(unsub_call).await.unwrap());
assert!(unsub_1);

let unsub_call = call("unsubscribe_hello", vec![sub_id], Id::Num(2));
let unsub_2 = client.send_request_text(unsub_call).await.unwrap();
let unsub_2_err: ErrorResponse = serde_json::from_str(&unsub_2).unwrap();
let sub_id = to_json_raw_value(&sub_id).unwrap();
let unsub_2: bool = deser_call(client.send_request_text(unsub_call).await.unwrap());

let err = Some(to_json_raw_value(&format!("Invalid subscription ID={}", sub_id)).unwrap());
assert_eq!(unsub_2_err, ErrorResponse::new(invalid_subscription_err(err.as_deref()), types::Id::Number(2)));
assert!(!unsub_2);
}

#[tokio::test]
Expand All @@ -624,10 +620,9 @@ async fn unsubscribe_wrong_sub_id_type() {
let addr = server().await;
let mut client = WebSocketTestClient::new(addr).with_default_timeout().await.unwrap().unwrap();

let unsub = client.send_request_text(call("unsubscribe_hello", vec![13.99_f64], Id::Num(0))).await.unwrap();
let unsub_2_err: ErrorResponse = serde_json::from_str(&unsub).unwrap();
let err = Some(to_json_raw_value(&"Invalid subscription ID type, must be Integer or String").unwrap());
assert_eq!(unsub_2_err, ErrorResponse::new(invalid_subscription_err(err.as_deref()), types::Id::Number(0)));
let unsub: bool =
deser_call(client.send_request_text(call("unsubscribe_hello", vec![13.99_f64], Id::Num(0))).await.unwrap());
assert!(!unsub);
}

#[tokio::test]
Expand Down Expand Up @@ -667,5 +662,5 @@ async fn custom_subscription_id_works() {
let sub = client.send_request_text(call("subscribe_hello", Vec::<()>::new(), Id::Num(0))).await.unwrap();
assert_eq!(&sub, r#"{"jsonrpc":"2.0","result":"0xdeadbeef","id":0}"#);
let unsub = client.send_request_text(call("unsubscribe_hello", vec!["0xdeadbeef"], Id::Num(1))).await.unwrap();
assert_eq!(&unsub, r#"{"jsonrpc":"2.0","result":"Unsubscribed","id":1}"#);
assert_eq!(&unsub, r#"{"jsonrpc":"2.0","result":true,"id":1}"#);
}