Skip to content

Commit

Permalink
implement session handling for unsubscribe in subxt-client (paritytec…
Browse files Browse the repository at this point in the history
…h#242)

* implement session handling for unsubscribe in subxt-client

Signed-off-by: Gregory Hill <gregorydhill@outlook.com>

* update jsonrpsee to `v0.2.0-alpha.2`

Closes paritytech#241

* use new jsonrpsee request message types in subxt client

Signed-off-by: Gregory Hill <gregorydhill@outlook.com>

Co-authored-by: Niklas Adolfsson <niklasadolfsson1@gmail.com>
  • Loading branch information
gregdhill and niklasad1 authored Mar 9, 2021
1 parent 9959f0d commit c1d4804
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 35 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ client = ["substrate-subxt-client"]
log = "0.4.13"
thiserror = "1.0.23"
futures = "0.3.10"
jsonrpsee-types = "0.2.0-alpha"
jsonrpsee-ws-client = "0.2.0-alpha"
jsonrpsee-http-client = "0.2.0-alpha"
jsonrpsee-types = "0.2.0-alpha.2"
jsonrpsee-ws-client = "0.2.0-alpha.2"
jsonrpsee-http-client = "0.2.0-alpha.2"
num-traits = { version = "0.2.14", default-features = false }
serde = { version = "1.0.119", features = ["derive"] }
serde_json = "1.0.61"
Expand Down
111 changes: 82 additions & 29 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@

#![deny(missing_docs)]

use async_std::task;
use async_std::{
sync::{
Arc,
RwLock,
},
task,
};
use futures::{
channel::{
mpsc,
Expand All @@ -36,7 +42,10 @@ use futures01::sync::mpsc as mpsc01;
use jsonrpsee_types::{
client::{
FrontToBack,
NotificationMessage,
RequestMessage,
Subscription,
SubscriptionMessage,
},
error::Error as JsonRpseeError,
jsonrpc::{
Expand Down Expand Up @@ -75,7 +84,10 @@ use sc_service::{
RpcSession,
TaskManager,
};
use std::marker::PhantomData;
use std::{
collections::HashMap,
marker::PhantomData,
};
use thiserror::Error;

const DEFAULT_CHANNEL_SIZE: usize = 16;
Expand All @@ -102,15 +114,31 @@ impl SubxtClient {
pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self {
let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let request_id = Arc::new(RwLock::new(u64::MIN));
let subscriptions = Arc::new(RwLock::new(HashMap::<u64, String>::new()));

task::spawn(
select(
Box::pin(from_front.for_each(move |message: FrontToBack| {
let rpc = rpc.clone();
let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE);
let session = RpcSession::new(to_front.clone());

let request_id = request_id.clone();
let subscriptions = subscriptions.clone();

async move {
let request_id = {
let mut request_id = request_id.write().await;
*request_id = request_id.wrapping_add(1);
*request_id
};

match message {
FrontToBack::Notification { method, params } => {
FrontToBack::Notification(NotificationMessage {
method,
params,
}) => {
let request =
Request::Single(Call::Notification(Notification {
jsonrpc: Version::V2,
Expand All @@ -122,17 +150,17 @@ impl SubxtClient {
}
}

FrontToBack::StartRequest {
FrontToBack::StartRequest(RequestMessage {
method,
params,
send_back,
} => {
}) => {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: method.into(),
params: params.into(),
id: Id::Num(0),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
if let Some(response) =
Expand All @@ -153,25 +181,31 @@ impl SubxtClient {
}
};

send_back
.send(result)
.expect("failed to send request response");
send_back.map(|tx| {
tx.send(result)
.expect("failed to send request response")
});
}
}
}

FrontToBack::Subscribe {
FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
params,
unsubscribe_method: _,
unsubscribe_method,
send_back,
} => {
}) => {
{
let mut subscriptions = subscriptions.write().await;
subscriptions.insert(request_id, unsubscribe_method);
}

let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: subscribe_method,
params,
id: Id::Num(0),
id: Id::Num(request_id),
}));

let (mut send_front_sub, send_back_sub) =
Expand All @@ -188,10 +222,7 @@ impl SubxtClient {
Output::Success(_) => {
Ok((
send_back_sub,
// NOTE: The ID is used to unsubscribe to specific subscription
// which the `SubxtClient` doesn't support so hardcoding it to `0`
// is fine.
SubscriptionId::Num(0),
SubscriptionId::Num(request_id),
))
}
Output::Failure(failure) => {
Expand Down Expand Up @@ -219,16 +250,38 @@ impl SubxtClient {
&response
)
.expect("failed to decode subscription notif");
send_front_sub
// ignore send error since the channel is probably closed
let _ = send_front_sub
.send(notif.params.result)
.await
.expect("failed to send subscription notif")
.await;
}
});
}

FrontToBack::SubscriptionClosed(_) => {
// NOTE: unsubscriptions are not supported by SubxtClient.
FrontToBack::SubscriptionClosed(subscription_id) => {
let sub_id =
if let SubscriptionId::Num(num) = subscription_id {
num
} else {
unreachable!("subscription id should be num")
};
let json_sub_id = jsonrpc::to_value(sub_id).unwrap();

let subscriptions = subscriptions.read().await;
if let Some(unsubscribe) = subscriptions.get(&sub_id) {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: unsubscribe.into(),
params: jsonrpc::Params::Array(vec![
json_sub_id,
]),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
rpc.rpc_query(&session, &message).await;
}
}
}
}
}
Expand Down Expand Up @@ -265,10 +318,10 @@ impl SubxtClient {
{
self.to_back
.clone()
.send(FrontToBack::Notification {
.send(FrontToBack::Notification(NotificationMessage {
method: method.into(),
params: params.into(),
})
}))
.await
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))
}
Expand All @@ -288,11 +341,11 @@ impl SubxtClient {

self.to_back
.clone()
.send(FrontToBack::StartRequest {
.send(FrontToBack::StartRequest(RequestMessage {
method: method.into(),
params: params.into(),
send_back: send_back_tx,
})
send_back: Some(send_back_tx),
}))
.await
.map_err(|e| JsonRpseeError::TransportError(Box::new(e)))?;

Expand Down Expand Up @@ -324,12 +377,12 @@ impl SubxtClient {
let (send_back_tx, send_back_rx) = oneshot::channel();
self.to_back
.clone()
.send(FrontToBack::Subscribe {
.send(FrontToBack::Subscribe(SubscriptionMessage {
subscribe_method,
unsubscribe_method,
params,
send_back: send_back_tx,
})
}))
.await
.map_err(JsonRpseeError::Internal)?;

Expand Down
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,8 @@ impl<T: Runtime> ClientBuilder<T> {
let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944");
if url.starts_with("ws://") || url.starts_with("wss://") {
let mut config = WsConfig::with_url(&url);
// max notifs per subscription capacity.
config.max_subscription_capacity = 4096;
RpcClient::WebSocket(WsClient::new(WsConfig::with_url(&url)).await?)
config.max_notifs_per_subscription = 4096;
RpcClient::WebSocket(WsClient::new(config).await?)
} else {
let client = HttpClient::new(url, HttpConfig::default())?;
RpcClient::Http(Arc::new(client))
Expand Down
2 changes: 2 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ pub enum RpcClient {
}

impl RpcClient {
/// Start a JSON-RPC request.
pub async fn request<T: DeserializeOwned>(
&self,
method: &str,
Expand All @@ -186,6 +187,7 @@ impl RpcClient {
}
}

/// Start a JSON-RPC Subscription.
pub async fn subscribe<T: DeserializeOwned>(
&self,
subscribe_method: &str,
Expand Down

0 comments on commit c1d4804

Please sign in to comment.