Skip to content

Commit

Permalink
Actually clone client data by reference when cloning the client (#1941)
Browse files Browse the repository at this point in the history
* actually clone client data by reference when clonning the client

* spelling

* clippy
  • Loading branch information
svyatonik authored and bkchr committed Apr 10, 2024
1 parent e0c0861 commit f4ca16b
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions bridges/relays/client-substrate/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
Result, SignParam, TransactionTracker, UnsignedTransaction,
};

use async_std::sync::{Arc, Mutex};
use async_std::sync::{Arc, Mutex, RwLock};
use async_trait::async_trait;
use bp_runtime::{HeaderIdProvider, StorageDoubleMapKeyProvider, StorageMapKeyProvider};
use codec::{Decode, Encode};
Expand Down Expand Up @@ -111,46 +111,54 @@ pub enum ChainRuntimeVersion {

/// Substrate client type.
///
/// Cloning `Client` is a cheap operation.
/// Cloning `Client` is a cheap operation that only clones internal references. Different
/// clones of the same client are guaranteed to use the same references.
pub struct Client<C: Chain> {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
// Lock order: `submit_signed_extrinsic_lock`, `data`
/// Client connection params.
params: Arc<ConnectionParams>,
/// Substrate RPC client.
client: Arc<RpcClient>,
/// Genesis block hash.
genesis_hash: HashOf<C>,
/// Saved chain runtime version.
chain_runtime_version: ChainRuntimeVersion,
/// If several tasks are submitting their transactions simultaneously using
/// `submit_signed_extrinsic` method, they may get the same transaction nonce. So one of
/// transactions will be rejected from the pool. This lock is here to prevent situations like
/// that.
submit_signed_extrinsic_lock: Arc<Mutex<()>>,
/// Saved chain runtime version
chain_runtime_version: ChainRuntimeVersion,
/// Genesis block hash.
genesis_hash: HashOf<C>,
/// Shared dynamic data.
data: Arc<RwLock<ClientData>>,
}

/// Client data, shared by all `Client` clones.
struct ClientData {
/// Tokio runtime handle.
tokio: Arc<tokio::runtime::Runtime>,
/// Substrate RPC client.
client: Arc<RpcClient>,
}

#[async_trait]
impl<C: Chain> relay_utils::relay_loop::Client for Client<C> {
type Error = Error;

async fn reconnect(&mut self) -> Result<()> {
let mut data = self.data.write().await;
let (tokio, client) = Self::build_client(&self.params).await?;
self.tokio = tokio;
self.client = client;
data.tokio = tokio;
data.client = client;
Ok(())
}
}

impl<C: Chain> Clone for Client<C> {
fn clone(&self) -> Self {
Client {
tokio: self.tokio.clone(),
params: self.params.clone(),
client: self.client.clone(),
genesis_hash: self.genesis_hash,
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
chain_runtime_version: self.chain_runtime_version.clone(),
submit_signed_extrinsic_lock: self.submit_signed_extrinsic_lock.clone(),
genesis_hash: self.genesis_hash,
data: self.data.clone(),
}
}
}
Expand Down Expand Up @@ -199,12 +207,11 @@ impl<C: Chain> Client<C> {

let chain_runtime_version = params.chain_runtime_version.clone();
Ok(Self {
tokio,
params,
client,
genesis_hash,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
chain_runtime_version,
submit_signed_extrinsic_lock: Arc::new(Mutex::new(())),
genesis_hash,
data: Arc::new(RwLock::new(ClientData { tokio, client })),
})
}

Expand Down Expand Up @@ -572,7 +579,7 @@ impl<C: Chain> Client<C> {
Ok((tracker, subscription))
})
.await?;
self.tokio.spawn(Subscription::background_worker(
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"extrinsic".into(),
subscription,
Expand Down Expand Up @@ -719,7 +726,7 @@ impl<C: Chain> Client<C> {
})
.await?;
let (sender, receiver) = futures::channel::mpsc::channel(MAX_SUBSCRIPTION_CAPACITY);
self.tokio.spawn(Subscription::background_worker(
self.data.read().await.tokio.spawn(Subscription::background_worker(
C::NAME.into(),
"justification".into(),
subscription,
Expand All @@ -735,8 +742,9 @@ impl<C: Chain> Client<C> {
F: Future<Output = Result<T>> + Send,
T: Send + 'static,
{
let client = self.client.clone();
self.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await?
let data = self.data.read().await;
let client = data.client.clone();
data.tokio.spawn(async move { make_jsonrpsee_future(client).await }).await?
}

/// Returns `true` if version guard can be started.
Expand Down

0 comments on commit f4ca16b

Please sign in to comment.