Skip to content

Commit

Permalink
implement session handling for unsubscribe in subxt-client
Browse files Browse the repository at this point in the history
Signed-off-by: Gregory Hill <gregorydhill@outlook.com>
  • Loading branch information
gregdhill committed Mar 9, 2021
1 parent 9959f0d commit 05fe7ce
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 14 deletions.
85 changes: 71 additions & 14 deletions client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,13 @@
#![deny(missing_docs)]

use async_std::task;
use async_std::{
sync::{
Arc,
RwLock,
},
task,
};
use futures::{
channel::{
mpsc,
Expand Down Expand Up @@ -75,7 +81,10 @@ use sc_service::{
RpcSession,
TaskManager,
};
use std::marker::PhantomData;
use std::{
collections::HashMap,
marker::PhantomData,
};
use thiserror::Error;

const DEFAULT_CHANNEL_SIZE: usize = 16;
Expand All @@ -102,12 +111,19 @@ impl SubxtClient {
pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self {
let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE);

let request_id = Arc::new(RwLock::new(u64::MIN));
let subscriptions = Arc::new(RwLock::new(HashMap::<u64, String>::new()));

task::spawn(
select(
Box::pin(from_front.for_each(move |message: FrontToBack| {
let rpc = rpc.clone();
let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE);
let session = RpcSession::new(to_front.clone());

let request_id = request_id.clone();
let subscriptions = subscriptions.clone();

async move {
match message {
FrontToBack::Notification { method, params } => {
Expand All @@ -127,12 +143,18 @@ impl SubxtClient {
params,
send_back,
} => {
let request_id = {
let mut request_id = request_id.write().await;
*request_id = request_id.wrapping_add(1);
*request_id
};

let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: method.into(),
params: params.into(),
id: Id::Num(0),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
if let Some(response) =
Expand Down Expand Up @@ -163,15 +185,25 @@ impl SubxtClient {
FrontToBack::Subscribe {
subscribe_method,
params,
unsubscribe_method: _,
unsubscribe_method,
send_back,
} => {
let request_id = {
let mut request_id = request_id.write().await;
*request_id = request_id.wrapping_add(1);
*request_id
};
{
let mut subscriptions = subscriptions.write().await;
subscriptions.insert(request_id, unsubscribe_method);
}

let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: subscribe_method,
params,
id: Id::Num(0),
id: Id::Num(request_id),
}));

let (mut send_front_sub, send_back_sub) =
Expand All @@ -188,10 +220,7 @@ impl SubxtClient {
Output::Success(_) => {
Ok((
send_back_sub,
// NOTE: The ID is used to unsubscribe to specific subscription
// which the `SubxtClient` doesn't support so hardcoding it to `0`
// is fine.
SubscriptionId::Num(0),
SubscriptionId::Num(request_id),
))
}
Output::Failure(failure) => {
Expand Down Expand Up @@ -219,16 +248,44 @@ impl SubxtClient {
&response
)
.expect("failed to decode subscription notif");
send_front_sub
// ignore send error since the channel is probably closed
let _ = send_front_sub
.send(notif.params.result)
.await
.expect("failed to send subscription notif")
.await;
}
});
}

FrontToBack::SubscriptionClosed(_) => {
// NOTE: unsubscriptions are not supported by SubxtClient.
FrontToBack::SubscriptionClosed(subscription_id) => {
let request_id = {
let mut request_id = request_id.write().await;
*request_id = request_id.wrapping_add(1);
*request_id
};

let sub_id =
if let SubscriptionId::Num(num) = subscription_id {
num
} else {
unreachable!("subscription id should be num")
};
let json_sub_id = jsonrpc::to_value(sub_id).unwrap();

let subscriptions = subscriptions.read().await;
if let Some(unsubscribe) = subscriptions.get(&sub_id) {
let request =
Request::Single(Call::MethodCall(MethodCall {
jsonrpc: Version::V2,
method: unsubscribe.into(),
params: jsonrpc::Params::Array(vec![
json_sub_id,
]),
id: Id::Num(request_id),
}));
if let Ok(message) = serde_json::to_string(&request) {
rpc.rpc_query(&session, &message).await;
}
}
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ pub enum RpcClient {
}

impl RpcClient {
/// request
pub async fn request<T: DeserializeOwned>(
&self,
method: &str,
Expand All @@ -186,6 +187,7 @@ impl RpcClient {
}
}

/// subscribe
pub async fn subscribe<T: DeserializeOwned>(
&self,
subscribe_method: &str,
Expand Down

0 comments on commit 05fe7ce

Please sign in to comment.