Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storagext): allow extrinsics to be submitted in parallel #643

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 70 additions & 21 deletions storagext/lib/src/runtime/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,17 @@ use std::time::Duration;
use codec::Encode;
use hex::ToHex;
use subxt::{
backend::rpc::reconnecting_rpc_client::{FixedInterval, RpcClient},
backend::{
legacy::LegacyRpcMethods,
rpc::reconnecting_rpc_client::{FixedInterval, RpcClient},
},
blocks::Block,
config::DefaultExtrinsicParamsBuilder,
events::Events,
utils::H256,
OnlineClient,
};
use tokio::sync::Mutex;

use crate::PolkaStorageConfig;

Expand All @@ -32,6 +37,10 @@ where
/// You can call any extrinsic via [`Client::traced_submission`].
pub struct Client {
pub(crate) client: OnlineClient<PolkaStorageConfig>,
pub(crate) legacy_rpc: LegacyRpcMethods<PolkaStorageConfig>,
/// We're not using AtomicU64 as we need to hold the critical sections across many instructions.
/// Look at [`Self::traced_submission`].
last_sent_nonce: Mutex<u64>,
th7nder marked this conversation as resolved.
Show resolved Hide resolved
}

impl Client {
Expand All @@ -54,7 +63,9 @@ impl Client {
.map_err(|e| subxt::error::RpcError::ClientError(Box::new(e)))?;

Ok(Self {
client: OnlineClient::<_>::from_rpc_client(rpc_client).await?,
client: OnlineClient::<_>::from_rpc_client(rpc_client.clone()).await?,
legacy_rpc: LegacyRpcMethods::<_>::new(rpc_client.into()),
last_sent_nonce: Mutex::new(0),
})
}

Expand Down Expand Up @@ -85,9 +96,35 @@ impl Client {
}

/// Submit an extrinsic and wait for finalization, returning the block hash it was included in.
/// It is thread-safe, allows to submit multiple extrinsics at the same time.
/// If another process is submitting the transactions at the same time, the retry mechanism at the higher layer is needed.
///
/// Equivalent to performing [`OnlineClient::sign_and_submit_then_watch_default`],
/// followed by [`TxInBlock::wait_for_finalized`] and [`TxInBlock::wait_for_success`].
///
/// ## Nonce mechanism
///
/// ### Context
/// Each transaction sent to the blockchain must have a nonce. Nonces are incremented sequentially and cannot have gaps.
/// If you submit a transaction with the same nonce, one of them will fail or be replaced. Dependent on the priority (transaction size).
///
/// ### Solution
///
/// The current solution for this is optimistic. It is fetching the nonce using `system_account_next_index` from the **best block** and using it as a nonce.
/// Returned index is taking into the account transactions already included in the blocks and the ones pending (in the transaction pool).
/// To avoid the race condition between the tasks in the same process a critical section is introduced.
/// It locks the extrinsic submission, so the next task is allowed to fetch the next index only after the previous has been submitted (txpool updated).
///
/// 1. We assume we connect to the same node for each transaction performed, if we didn't, then the possibility of nonce collisions would be more frequent.
/// 2. When we `.submit()` a transaction and it fails, the nonce is not updated, so next time we call `system_account_next_index`, it'll return the same nonce.
/// 3. When we `.submit()` a transaction and it succeeds, the nonce is updated, next returned nonce will be incremented.
/// 4. If any other process submits the transaction, after we fetch the current_nonce, this call will:
/// a) fail (transaction outdated)
/// b) fail (will be replaced by the other process transaction)
/// c) succeed (replace the other process transaction)
/// 5. Because of the 1. and 4., the retry mechanism would be needed and the error is detectable:
/// a) at the `.submit()` level, when nonce < chain_nonce OR nonce == chain_nonce && tx1_priority < tx2_priority.
/// b) only after waiting for finalization and not getting the event (TimeoutError).
pub(crate) async fn traced_submission<Call, Keypair>(
&self,
call: &Call,
Expand All @@ -98,26 +135,44 @@ impl Client {
Call: subxt::tx::Payload,
Keypair: subxt::tx::Signer<PolkaStorageConfig>,
{
if wait_for_finalization {
// Critical section
let submitted_extrinsic_hash = {
let mut last_sent_nonce = self.last_sent_nonce.lock().await;
let current_nonce = self
.legacy_rpc
.system_account_next_index(&account_keypair.account_id())
.await?;
let current_header = self.legacy_rpc.chain_get_header(None).await?.unwrap();
let ext_params = DefaultExtrinsicParamsBuilder::new()
.mortal(&current_header, 8)
.nonce(current_nonce)
.build();

let submitted_extrinsic_hash = self
.client
.tx()
.sign_and_submit_default(call, account_keypair)
.create_signed_offline(call, account_keypair, ext_params)?
.submit()
.await?;

tracing::debug!(
"Previous nonce: {}, next nonce: {}",
last_sent_nonce,
current_nonce
);
*last_sent_nonce = current_nonce;

submitted_extrinsic_hash
};

if wait_for_finalization {
self.traced_submission_with_finalization(submitted_extrinsic_hash)
.await
.map(Option::Some)
} else {
tracing::trace!("submitting extrinsic");
let extrinsic_hash = self
.client
.tx()
.sign_and_submit_default(call, account_keypair)
.await?;

tracing::trace!(
extrinsic_hash = extrinsic_hash.encode_hex::<String>(),
"waiting for finalization"
extrinsic_hash = submitted_extrinsic_hash.encode_hex::<String>(),
"extrinsic published, not waiting for the finalization"
);
Ok(None)
}
Expand Down Expand Up @@ -186,9 +241,9 @@ impl Client {
}
});

// 1 block = 6 seconds -> 120 seconds = 20 blocks
// 1 block = 6 seconds -> 60 seconds = 10 blocks
// since the subscription has like a ~6 block delay
let timeout = tokio::time::timeout(Duration::from_secs(120), finalized_block).await;
let timeout = tokio::time::timeout(Duration::from_secs(60), finalized_block).await;

match timeout {
Ok(Ok(result)) => {
Expand All @@ -206,9 +261,3 @@ impl Client {
}
}
}

impl From<OnlineClient<PolkaStorageConfig>> for Client {
fn from(client: OnlineClient<PolkaStorageConfig>) -> Self {
Self { client }
}
}
Loading