Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc_module: SubscriptionSink::close -> SubscriptionSink::close_with_error #1013

Merged
merged 5 commits into from
Feb 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 38 additions & 23 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::{SubscriptionCallbackError, SubscriptionResult};
use futures_util::future::Either;
use futures_util::{future::BoxFuture, FutureExt};
use jsonrpsee_types::error::{CallError, ErrorCode, ErrorObject, ErrorObjectOwned};
use jsonrpsee_types::response::{SubscriptionError, SubscriptionPayloadError};
use jsonrpsee_types::response::SubscriptionError;
use jsonrpsee_types::{
ErrorResponse, Id, Params, Request, Response, SubscriptionId as RpcSubscriptionId, SubscriptionResponse,
};
Expand Down Expand Up @@ -78,7 +78,7 @@ pub type MaxResponseSize = usize;
/// - a [`crate::server::helpers::SubscriptionPermit`] to allow subscribers to notify their [`SubscriptionSink`] when they disconnect.
pub type RawRpcResponse = (MethodResponse, mpsc::Receiver<String>, SubscriptionPermit, mpsc::Sender<String>);

/// Error that may occur during `SubscriptionSink::try_send`.
/// Error that may occur during [`SubscriptionSink::try_send`].
#[derive(Debug)]
pub enum TrySendError {
/// The channel is closed.
Expand Down Expand Up @@ -290,6 +290,20 @@ pub enum MethodResult<T> {
Async(BoxFuture<'static, T>),
}

enum SubNotifResultOrError {
Result,
Error,
}

impl SubNotifResultOrError {
const fn as_str(&self) -> &str {
match self {
Self::Result => "result",
Self::Error => "error",
}
}
}

impl<T: Debug> Debug for MethodResult<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Expand Down Expand Up @@ -1036,7 +1050,7 @@ impl SubscriptionSink {
return Err(DisconnectError(msg));
}

let json = self.sub_message_to_json(msg);
let json = self.sub_message_to_json(msg, SubNotifResultOrError::Result);
self.inner.send(json).await.map_err(Into::into)
}

Expand All @@ -1047,7 +1061,7 @@ impl SubscriptionSink {
return Err(SendTimeoutError::Closed(msg));
}

let json = self.sub_message_to_json(msg);
let json = self.sub_message_to_json(msg, SubNotifResultOrError::Result);
self.inner.send_timeout(json, timeout).await.map_err(Into::into)
}

Expand All @@ -1063,7 +1077,7 @@ impl SubscriptionSink {
return Err(TrySendError::Closed(msg));
}

let json = self.sub_message_to_json(msg);
let json = self.sub_message_to_json(msg, SubNotifResultOrError::Result);
self.inner.try_send(json).map_err(Into::into)
}

Expand All @@ -1081,33 +1095,29 @@ impl SubscriptionSink {
}
}

fn sub_message_to_json(&self, msg: SubscriptionMessage) -> String {
fn sub_message_to_json(&self, msg: SubscriptionMessage, result_or_err: SubNotifResultOrError) -> String {
let result_or_err = result_or_err.as_str();

match msg.0 {
SubscriptionMessageInner::Complete(msg) => msg,
SubscriptionMessageInner::NeedsData(result) => {
let sub_id = serde_json::to_string(&self.uniq_sub.sub_id).expect("valid JSON; qed");
let method = self.method;
format!(
r#"{{"jsonrpc":"2.0","method":"{method}","params":{{"subscription":{sub_id},"result":{result}}}}}"#,
r#"{{"jsonrpc":"2.0","method":"{method}","params":{{"subscription":{sub_id},"{result_or_err}":{result}}}}}"#,
)
}
}
}

fn build_error_message<T: Serialize>(&self, error: &T) -> Result<String, serde_json::Error> {
serde_json::to_string(&SubscriptionError::new(
self.method.into(),
SubscriptionPayloadError { subscription: self.uniq_sub.sub_id.clone(), error },
))
.map_err(Into::into)
}

/// Close the subscription, sending a notification with a special `error` field containing the provided error.
/// Close the subscription, sending a notification with a special `error` field containing the provided close reason.
///
/// This can be used to signal an actual error, or just to signal that the subscription has been closed,
/// depending on your preference.
/// This can be used to signal that an subscription was closed because of some particular state
/// and doesn't imply that subscription was closed because of an error occurred. Just
/// a custom way to indicate to the client that the subscription was closed.
///
/// If you'd like to to close the subscription without sending an error, just drop it and don't call this method.
/// If you'd like to to close the subscription without sending an extra notification,
/// just drop it and don't call this method.
///
///
/// ```json
Expand All @@ -1116,20 +1126,25 @@ impl SubscriptionSink {
/// "method": "<method>",
/// "params": {
/// "subscription": "<subscriptionID>",
/// "error": { "code": <code from error>, "message": <message from error>, "data": <data from error> }
/// "error": <your msg>
/// }
/// }
/// }
/// ```
///
pub fn close(self, err: impl Into<ErrorObjectOwned>) -> impl Future<Output = ()> {
pub fn close_with_error(self, msg: SubscriptionMessage) -> impl Future<Output = ()> {
self.inner_close(msg, SubNotifResultOrError::Error)
}

fn inner_close(self, msg: SubscriptionMessage, result_or_err: SubNotifResultOrError) -> impl Future<Output = ()> {
if self.is_active_subscription() {
if let Some((sink, _)) = self.subscribers.lock().remove(&self.uniq_sub) {
tracing::debug!("Closing subscription: {:?}", self.uniq_sub.sub_id);

let msg = self.build_error_message(&err.into()).expect("valid json infallible; qed");
let msg = self.sub_message_to_json(msg, result_or_err);

return Either::Right(async move {
// This only fails if the connection was closed
// Fine to ignore
let _ = sink.send(msg).await;
});
}
Expand Down
36 changes: 19 additions & 17 deletions examples/examples/ws_pubsub_with_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use jsonrpsee::core::client::{Subscription, SubscriptionClientT};
use jsonrpsee::core::server::rpc_module::{SubscriptionMessage, TrySendError};
use jsonrpsee::core::{Serialize, SubscriptionResult};
use jsonrpsee::server::{RpcModule, ServerBuilder};
use jsonrpsee::types::{ErrorObject, ErrorObjectOwned};
use jsonrpsee::types::ErrorObjectOwned;
use jsonrpsee::ws_client::WsClientBuilder;
use jsonrpsee::{rpc_params, PendingSubscriptionSink};
use tokio::time::interval;
Expand Down Expand Up @@ -111,40 +111,42 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
Ok(addr)
}

async fn pipe_from_stream_and_drop<S, T>(pending: PendingSubscriptionSink, mut stream: S) -> SubscriptionResult
where
S: Stream<Item = T> + Unpin,
T: Serialize,
{
pub async fn pipe_from_stream_and_drop<T: Serialize>(
pending: PendingSubscriptionSink,
mut stream: impl Stream<Item = T> + Unpin,
) -> SubscriptionResult {
let mut sink = pending.accept().await?;

loop {
let msg = loop {
tokio::select! {
_ = sink.closed() => break,
_ = sink.closed() => break "Subscription was closed".to_string(),
maybe_item = stream.next() => {
let item = match maybe_item {
Some(item) => item,
None => break,
None => break "Subscription executed successful".to_string(),
};
let msg = match SubscriptionMessage::from_json(&item) {
Ok(msg) => msg,
Err(e) => {
sink.close(ErrorObject::owned(1, e.to_string(), None::<()>)).await;
return Err(e.into());
}
Err(e) => break e.to_string(),
};

match sink.try_send(msg) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => break,
Err(TrySendError::Closed(_)) => break "Subscription was closed".to_string(),
// channel is full, let's be naive an just drop the message.
Err(TrySendError::Full(_)) => (),
}
}
}
}

sink.close(ErrorObject::owned(1, "Ok", None::<()>)).await;
};

// NOTE: we are using `close_with_error` for the jsonrpsee client to terminate the subscription
// when the "close message" is received without any custom logic.
//
// Otherwise, the subscription would need custom logic on the client side
// for example with dedicate states/different messages to know whether the
// server closed just the particular subscription.
sink.close_with_error(SubscriptionMessage::from_json(&msg).unwrap()).await;

Ok(())
}
21 changes: 9 additions & 12 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ pub async fn server_with_subscription_and_handle() -> (SocketAddr, ServerHandle)
.register_subscription("subscribe_noop", "subscribe_noop", "unsubscribe_noop", |_, pending, _| async {
let sink = pending.accept().await.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let err = ErrorObject::owned(1, "Server closed the stream because it was lazy", None::<()>);
sink.close(err).await;
let err = ErrorObject::borrowed(1, &"Server closed the stream because it was lazy", None);
sink.close_with_error(SubscriptionMessage::from_json(&err).unwrap()).await;

Ok(())
})
Expand Down Expand Up @@ -224,33 +224,30 @@ pub async fn pipe_from_stream_and_drop<T: Serialize>(
) -> SubscriptionResult {
let mut sink = pending.accept().await?;

loop {
let msg = loop {
tokio::select! {
_ = sink.closed() => break,
_ = sink.closed() => break "Subscription was closed".to_string(),
maybe_item = stream.next() => {
let item = match maybe_item {
Some(item) => item,
None => break,
None => break "Subscription executed successful".to_string(),
};
let msg = match SubscriptionMessage::from_json(&item) {
Ok(msg) => msg,
Err(e) => {
sink.close(ErrorObject::owned(1, e.to_string(), None::<()>)).await;
return Err(e.into());
}
Err(e) => break e.to_string(),
};

match sink.try_send(msg) {
Ok(_) => (),
Err(TrySendError::Closed(_)) => break,
Err(TrySendError::Closed(_)) => break "Subscription was closed".to_string(),
// channel is full, let's be naive an just drop the message.
Err(TrySendError::Full(_)) => (),
}
}
}
}
};

sink.close(ErrorObject::owned(1, "Subscription executed successful", None::<()>)).await;
sink.close_with_error(SubscriptionMessage::from_json(&msg).unwrap()).await;

Ok(())
}
2 changes: 1 addition & 1 deletion tests/tests/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ async fn subscribing_without_server() {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
let close = ErrorObject::borrowed(0, &"closed successfully", None);
let _ = sink.close(close.into_owned()).await;
let _ = sink.close_with_error(SubscriptionMessage::from_json(&close).unwrap()).await;

Ok(())
})
Expand Down