Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement session handling for unsubscribe in subxt-client #242

Merged
merged 3 commits into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ client = ["substrate-subxt-client"]
log = "0.4.13"
thiserror = "1.0.23"
futures = "0.3.10"
jsonrpsee-types = "0.2.0-alpha"
jsonrpsee-ws-client = "0.2.0-alpha"
jsonrpsee-http-client = "0.2.0-alpha"
jsonrpsee-types = "0.2.0-alpha.2"
jsonrpsee-ws-client = "0.2.0-alpha.2"
jsonrpsee-http-client = "0.2.0-alpha.2"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.119", features = ["derive"] }
serde_json = "1.0.61"
Expand Down
111 changes: 82 additions & 29 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

#![deny(missing_docs)]

use async_std::task;
use async_std::{
sync::{
Arc,
RwLock,
},
task,
};
use futures::{
channel::{
mpsc,
Expand All @@ -36,7 +42,10 @@ use futures01::sync::mpsc as mpsc01;
use jsonrpsee_types::{
client::{
FrontToBack,
NotificationMessage,
RequestMessage,
Subscription,
SubscriptionMessage,
},
error::Error as JsonRpseeError,
jsonrpc::{
Expand Down Expand Up @@ -75,7 +84,10 @@ use sc_service::{
RpcSession,
TaskManager,
};
use std::marker::PhantomData;
use std::{
collections::HashMap,
marker::PhantomData,
};
use thiserror::Error;

const DEFAULT_CHANNEL_SIZE: usize = 16;
Expand All @@ -102,15 +114,31 @@ impl SubxtClient {
pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self {
let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let request_id = Arc::new(RwLock::new(u64::MIN));
Copy link
Member

@niklasad1 niklasad1 Mar 9, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why you instantiate these here, inside task::spawn should work?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess because the main closure is for each FrontToBack message. That said the context could be expanded although that is already an extremely indented block of code.

let subscriptions = Arc::new(RwLock::new(HashMap::<u64, String>::new()));

task::spawn(
select(
Box::pin(from_front.for_each(move |message: FrontToBack| {
let rpc = rpc.clone();
let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE);
let session = RpcSession::new(to_front.clone());

let request_id = request_id.clone();
let subscriptions = subscriptions.clone();

async move {
let request_id = {
let mut request_id = request_id.write().await;
*request_id = request_id.wrapping_add(1);
*request_id
};

match message {
FrontToBack::Notification { method, params } => {
FrontToBack::Notification(NotificationMessage {
method,
params,
}) => {
let request =
Request::Single(Call::Notification(Notification {
jsonrpc: Version::V2,
Expand All @@ -122,17 +150,17 @@ impl SubxtClient {
}
}

FrontToBack::StartRequest {
FrontToBack::StartRequest(RequestMessage {
method,
params,
send_back,
} => {
}) => {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: method.into(),
params: params.into(),
id: Id::Num(0),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
if let Some(response) =
Expand All @@ -153,25 +181,31 @@ impl SubxtClient {
}
};

send_back
.send(result)
.expect("failed to send request response");
send_back.map(|tx| {
tx.send(result)
.expect("failed to send request response")
});
}
}
}

FrontToBack::Subscribe {
FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
params,
unsubscribe_method: _,
unsubscribe_method,
send_back,
} => {
}) => {
{
let mut subscriptions = subscriptions.write().await;
subscriptions.insert(request_id, unsubscribe_method);
}

let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: subscribe_method,
params,
id: Id::Num(0),
id: Id::Num(request_id),
}));

let (mut send_front_sub, send_back_sub) =
Expand All @@ -188,10 +222,7 @@ impl SubxtClient {
Output::Success(_) => {
Ok((
send_back_sub,
// NOTE: The ID is used to unsubscribe to specific subscription
// which the `SubxtClient` doesn't support so hardcoding it to `0`
// is fine.
SubscriptionId::Num(0),
SubscriptionId::Num(request_id),
))
}
Output::Failure(failure) => {
Expand Down Expand Up @@ -219,16 +250,38 @@ impl SubxtClient {
&response
)
.expect("failed to decode subscription notif");
send_front_sub
// ignore send error since the channel is probably closed
let _ = send_front_sub
.send(notif.params.result)
.await
.expect("failed to send subscription notif")
.await;
}
});
}

FrontToBack::SubscriptionClosed(_) => {
// NOTE: unsubscriptions are not supported by SubxtClient.
FrontToBack::SubscriptionClosed(subscription_id) => {
let sub_id =
if let SubscriptionId::Num(num) = subscription_id {
num
} else {
unreachable!("subscription id should be num")
};
let json_sub_id = jsonrpc::to_value(sub_id).unwrap();

let subscriptions = subscriptions.read().await;
if let Some(unsubscribe) = subscriptions.get(&sub_id) {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: unsubscribe.into(),
params: jsonrpc::Params::Array(vec![
json_sub_id,
]),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
rpc.rpc_query(&session, &message).await;
}
}
}
}
}
Expand Down Expand Up @@ -265,10 +318,10 @@ impl SubxtClient {
{
self.to_back
.clone()
.send(FrontToBack::Notification {
.send(FrontToBack::Notification(NotificationMessage {
method: method.into(),
params: params.into(),
})
}))
.await
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))
}
Expand All @@ -288,11 +341,11 @@ impl SubxtClient {

self.to_back
.clone()
.send(FrontToBack::StartRequest {
.send(FrontToBack::StartRequest(RequestMessage {
method: method.into(),
params: params.into(),
send_back: send_back_tx,
})
send_back: Some(send_back_tx),
}))
.await
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))?;

Expand Down Expand Up @@ -324,12 +377,12 @@ impl SubxtClient {
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::Subscribe {
.send(FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
unsubscribe_method,
params,
send_back: send_back_tx,
})
}))
.await
.map_err(JsonRpseeError::Internal)?;

Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,8 @@ impl<T: Runtime> ClientBuilder<T> {
let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944");
if url.starts_with("ws://") || url.starts_with("wss://") {
let mut config = WsConfig::with_url(&url);
// max notifs per subscription capacity.
config.max_subscription_capacity = 4096;
RpcClient::WebSocket(WsClient::new(WsConfig::with_url(&url)).await?)
config.max_notifs_per_subscription = 4096;
RpcClient::WebSocket(WsClient::new(config).await?)
} else {
let client = HttpClient::new(url, HttpConfig::default())?;
RpcClient::Http(Arc::new(client))
Expand Down
2 changes: 2 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ pub enum RpcClient {
}

impl RpcClient {
/// Start a JSON-RPC request.
pub async fn request<T: DeserializeOwned>(
&self,
method: &str,
Expand All @@ -186,6 +187,7 @@ impl RpcClient {
}
}

/// Start a JSON-RPC Subscription.
pub async fn subscribe<T: DeserializeOwned>(
&self,
subscribe_method: &str,
Expand Down