diff --git a/client/src/lib.rs b/client/src/lib.rs index 4297498684e..3a0e76da81f 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -18,7 +18,13 @@ #![deny(missing_docs)] -use async_std::task; +use async_std::{ + sync::{ + Arc, + RwLock, + }, + task, +}; use futures::{ channel::{ mpsc, @@ -75,7 +81,10 @@ use sc_service::{ RpcSession, TaskManager, }; -use std::marker::PhantomData; +use std::{ + collections::HashMap, + marker::PhantomData, +}; use thiserror::Error; const DEFAULT_CHANNEL_SIZE: usize = 16; @@ -102,12 +111,25 @@ impl SubxtClient { pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self { let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + let request_id = Arc::new(RwLock::new(u64::MIN)); + let subscriptions = Arc::new(RwLock::new(HashMap::::new())); + task::spawn( select( Box::pin(from_front.for_each(move |message: FrontToBack| { let rpc = rpc.clone(); let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE); let session = RpcSession::new(to_front.clone()); + + let request_id = request_id.clone(); + let subscriptions = subscriptions.clone(); + + let request_id = { + let mut request_id = request_id.write().await; + *request_id = request_id.wrapping_add(1); + *request_id + }; + async move { match message { FrontToBack::Notification { method, params } => { @@ -132,7 +154,7 @@ impl SubxtClient { jsonrpc: Version::V2, method: method.into(), params: params.into(), - id: Id::Num(0), + id: Id::Num(request_id), })); if let Ok(message) = serde_json::to_string(&request) { if let Some(response) = @@ -163,15 +185,20 @@ impl SubxtClient { FrontToBack::Subscribe { subscribe_method, params, - unsubscribe_method: _, + unsubscribe_method, send_back, } => { + { + let mut subscriptions = subscriptions.write().await; + subscriptions.insert(request_id, unsubscribe_method); + } + let request = Request::Single(Call::MethodCall(MethodCall { jsonrpc: Version::V2, method: subscribe_method, params, - id: Id::Num(0), + id: Id::Num(request_id), })); let (mut send_front_sub, send_back_sub) = @@ -188,10 +215,7 @@ impl SubxtClient { Output::Success(_) => { Ok(( send_back_sub, - // NOTE: The ID is used to unsubscribe to specific subscription - // which the `SubxtClient` doesn't support so hardcoding it to `0` - // is fine. - SubscriptionId::Num(0), + SubscriptionId::Num(request_id), )) } Output::Failure(failure) => { @@ -219,16 +243,38 @@ impl SubxtClient { &response ) .expect("failed to decode subscription notif"); - send_front_sub + // ignore send error since the channel is probably closed + let _ = send_front_sub .send(notif.params.result) - .await - .expect("failed to send subscription notif") + .await; } }); } - FrontToBack::SubscriptionClosed(_) => { - // NOTE: unsubscriptions are not supported by SubxtClient. + FrontToBack::SubscriptionClosed(subscription_id) => { + let sub_id = + if let SubscriptionId::Num(num) = subscription_id { + num + } else { + unreachable!("subscription id should be num") + }; + let json_sub_id = jsonrpc::to_value(sub_id).unwrap(); + + let subscriptions = subscriptions.read().await; + if let Some(unsubscribe) = subscriptions.get(&sub_id) { + let request = + Request::Single(Call::MethodCall(MethodCall { + jsonrpc: Version::V2, + method: unsubscribe.into(), + params: jsonrpc::Params::Array(vec![ + json_sub_id, + ]), + id: Id::Num(request_id), + })); + if let Ok(message) = serde_json::to_string(&request) { + rpc.rpc_query(&session, &message).await; + } + } } } }