From e449915caaf7494e389f2d0b2f15e26dc244b4f2 Mon Sep 17 00:00:00 2001 From: Konrad Stepniak Date: Thu, 19 Dec 2024 15:50:26 +0100 Subject: [PATCH] fix(storagext): losing transactions on reorg --- storagext/lib/src/runtime/client.rs | 203 +++++++++++----------------- 1 file changed, 79 insertions(+), 124 deletions(-) diff --git a/storagext/lib/src/runtime/client.rs b/storagext/lib/src/runtime/client.rs index 029e573f..434754bd 100644 --- a/storagext/lib/src/runtime/client.rs +++ b/storagext/lib/src/runtime/client.rs @@ -1,16 +1,14 @@ use std::time::Duration; -use codec::Encode; use hex::ToHex; use subxt::{ backend::{ legacy::LegacyRpcMethods, rpc::reconnecting_rpc_client::{FixedInterval, RpcClient}, }, - blocks::Block, + blocks::ExtrinsicEvents, config::DefaultExtrinsicParamsBuilder, - events::Events, - utils::H256, + tx::SubmittableExtrinsic, OnlineClient, }; use tokio::sync::Mutex; @@ -26,11 +24,8 @@ where /// Submission block hash. pub hash: Config::Hash, - /// Submission block height. - pub height: u64, - /// Resulting extrinsic's events. - pub events: Events, + pub events: ExtrinsicEvents, } /// Client to interact with a pallet extrinsics. @@ -78,10 +73,17 @@ impl Client { Call: subxt::tx::Payload, { if wait_for_finalization { - let submitted_extrinsic_hash = self.client.tx().create_unsigned(call)?.submit().await?; - self.traced_submission_with_finalization(submitted_extrinsic_hash) - .await - .map(Option::Some) + let tx = self + .client + .tx() + .create_unsigned(call)? + .submit_and_watch() + .await?; + let events = tx.wait_for_finalized_success().await?; + Ok(Some(SubmissionResult { + hash: events.extrinsic_hash(), + events: events, + })) } else { tracing::trace!("submitting unsigned extrinsic"); let extrinsic_hash = self.client.tx().create_unsigned(call)?.submit().await?; @@ -135,129 +137,82 @@ impl Client { Call: subxt::tx::Payload, Keypair: subxt::tx::Signer, { - // Critical section - let submitted_extrinsic_hash = { - let mut last_sent_nonce = self.last_sent_nonce.lock().await; - let current_nonce = self - .legacy_rpc - .system_account_next_index(&account_keypair.account_id()) - .await?; - let current_header = self.legacy_rpc.chain_get_header(None).await?.unwrap(); - let ext_params = DefaultExtrinsicParamsBuilder::new() - .mortal(¤t_header, 8) - .nonce(current_nonce) - .build(); + if wait_for_finalization { + // Critical section so the lock is released after nonce is updated in the node's txpool. + let tx = { + let mut last_sent_nonce = self.last_sent_nonce.lock().await; - let submitted_extrinsic_hash = self - .client - .tx() - .create_signed_offline(call, account_keypair, ext_params)? - .submit() - .await?; + let (current_nonce, ext) = self.extrinsic_with_nonce(call, account_keypair).await?; + let tx = ext.submit_and_watch().await?; - tracing::debug!( - "Previous nonce: {}, next nonce: {}", - last_sent_nonce, - current_nonce - ); - *last_sent_nonce = current_nonce; + tracing::debug!( + "Waiting for finalization, Previous nonce: {}, next nonce: {}", + last_sent_nonce, + current_nonce + ); + *last_sent_nonce = current_nonce; + tx + }; + + let events = tx.wait_for_finalized_success().await?; + Ok(Some(SubmissionResult { + hash: events.extrinsic_hash(), + events: events, + })) + } else { + // Critical section so the lock is released after nonce is updated in the node's txpool. + let hash = { + let mut last_sent_nonce = self.last_sent_nonce.lock().await; - submitted_extrinsic_hash - }; + let (current_nonce, ext) = self.extrinsic_with_nonce(call, account_keypair).await?; + let tx_hash = ext.submit().await?; - if wait_for_finalization { - self.traced_submission_with_finalization(submitted_extrinsic_hash) - .await - .map(Option::Some) - } else { + tracing::debug!( + "Not waiting for finalization, previous nonce: {}, next nonce: {}", + last_sent_nonce, + current_nonce + ); + *last_sent_nonce = current_nonce; + tx_hash + }; tracing::trace!( - extrinsic_hash = submitted_extrinsic_hash.encode_hex::(), + extrinsic_hash = hash.encode_hex::(), "extrinsic published, not waiting for the finalization" ); Ok(None) } } - pub(crate) async fn traced_submission_with_finalization( + async fn extrinsic_with_nonce( &self, - submitted_extrinsic_hash: H256, - ) -> Result, subxt::Error> { - tracing::trace!("submitting extrinsic"); - - let mut finalized_block_stream = self.client.blocks().subscribe_finalized().await?; - - tracing::debug!( - extrinsic_hash = submitted_extrinsic_hash.encode_hex::(), - "waiting for finalization" - ); - - let metadata = self.client.metadata(); - - tracing::debug!("ext metadata {:?}", metadata.extrinsic()); - - let finalized_block = tokio::task::spawn(async move { - 'outer: loop { - let Some(block) = finalized_block_stream.next().await else { - return Err(subxt::Error::Io(std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "stream was closed", - ))); - }; - - let block: Block = block?; - tracing::debug!( - "checking block number: {} hash: {}", - block.number(), - block.hash() - ); - - for extrinsic in block.extrinsics().await?.iter() { - // There's a bug on subxt that forces us to use this thing, - // in 0.38 we can just use .hash(), in fact, in 0.38 this line doesn't work! - // https://github.com/paritytech/subxt/discussions/1851#discussioncomment-11133684 - let extrinsic_hash = extrinsic.hash(); - - if submitted_extrinsic_hash == extrinsic_hash { - // Extrinsic failures are placed in the same block as the extrinsic. - let failed_extrinsic_event: Option< - crate::runtime::system::events::ExtrinsicFailed, - > = block.events().await?.find_first()?; - - if let Some(event) = failed_extrinsic_event { - // debug level since we're returning the error upwards - tracing::debug!("found a failing extrinsic: {:?}", event); - // this weird encode/decode is the shortest and simplest way to convert the - // generated subxt types into the canonical types since we can't replace them - // with the proper ones - let encoded_event = event.encode(); - let dispatch_error = - subxt::error::DispatchError::decode_from(encoded_event, metadata)?; - return Err(dispatch_error.into()); - } - - break 'outer Ok(block); - } - } - } - }); - - // 1 block = 6 seconds -> 60 seconds = 10 blocks - // since the subscription has like a ~6 block delay - let timeout = tokio::time::timeout(Duration::from_secs(60), finalized_block).await; - - match timeout { - Ok(Ok(result)) => { - let result = result?; - Ok(SubmissionResult { - hash: result.hash(), - height: result.number(), - events: result.events().await?, - }) - } - Ok(Err(_)) => Err(subxt::Error::Other("failed to join tasks".to_string())), - Err(_) => Err(subxt::Error::Other( - "timeout while waiting for the extrinsic call to be finalized".to_string(), - )), - } + call: &Call, + account_keypair: &Keypair, + ) -> Result< + ( + u64, + SubmittableExtrinsic>, + ), + subxt::Error, + > + where + Call: subxt::tx::Payload, + Keypair: subxt::tx::Signer, + { + let current_nonce = self + .legacy_rpc + .system_account_next_index(&account_keypair.account_id()) + .await?; + let current_header = self.legacy_rpc.chain_get_header(None).await?.unwrap(); + let ext_params = DefaultExtrinsicParamsBuilder::new() + .mortal(¤t_header, 8) + .nonce(current_nonce) + .build(); + + let ext = self + .client + .tx() + .create_signed_offline(call, account_keypair, ext_params)?; + + Ok((current_nonce, ext)) } }