Skip to content

Commit

Permalink
fix(storagext): losing transactions on reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Dec 19, 2024
1 parent d334858 commit e449915
Showing 1 changed file with 79 additions and 124 deletions.
203 changes: 79 additions & 124 deletions storagext/lib/src/runtime/client.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
use std::time::Duration;

use codec::Encode;
use hex::ToHex;
use subxt::{
backend::{
legacy::LegacyRpcMethods,
rpc::reconnecting_rpc_client::{FixedInterval, RpcClient},
},
blocks::Block,
blocks::ExtrinsicEvents,
config::DefaultExtrinsicParamsBuilder,
events::Events,
utils::H256,
tx::SubmittableExtrinsic,
OnlineClient,
};
use tokio::sync::Mutex;
Expand All @@ -26,11 +24,8 @@ where
/// Submission block hash.
pub hash: Config::Hash,

/// Submission block height.
pub height: u64,

/// Resulting extrinsic's events.
pub events: Events<Config>,
pub events: ExtrinsicEvents<Config>,
}

/// Client to interact with a pallet extrinsics.
Expand Down Expand Up @@ -78,10 +73,17 @@ impl Client {
Call: subxt::tx::Payload,
{
if wait_for_finalization {
let submitted_extrinsic_hash = self.client.tx().create_unsigned(call)?.submit().await?;
self.traced_submission_with_finalization(submitted_extrinsic_hash)
.await
.map(Option::Some)
let tx = self
.client
.tx()
.create_unsigned(call)?
.submit_and_watch()
.await?;
let events = tx.wait_for_finalized_success().await?;
Ok(Some(SubmissionResult {
hash: events.extrinsic_hash(),
events: events,
}))
} else {
tracing::trace!("submitting unsigned extrinsic");
let extrinsic_hash = self.client.tx().create_unsigned(call)?.submit().await?;
Expand Down Expand Up @@ -135,129 +137,82 @@ impl Client {
Call: subxt::tx::Payload,
Keypair: subxt::tx::Signer<PolkaStorageConfig>,
{
// Critical section
let submitted_extrinsic_hash = {
let mut last_sent_nonce = self.last_sent_nonce.lock().await;
let current_nonce = self
.legacy_rpc
.system_account_next_index(&account_keypair.account_id())
.await?;
let current_header = self.legacy_rpc.chain_get_header(None).await?.unwrap();
let ext_params = DefaultExtrinsicParamsBuilder::new()
.mortal(&current_header, 8)
.nonce(current_nonce)
.build();
if wait_for_finalization {
// Critical section so the lock is released after nonce is updated in the node's txpool.
let tx = {
let mut last_sent_nonce = self.last_sent_nonce.lock().await;

let submitted_extrinsic_hash = self
.client
.tx()
.create_signed_offline(call, account_keypair, ext_params)?
.submit()
.await?;
let (current_nonce, ext) = self.extrinsic_with_nonce(call, account_keypair).await?;
let tx = ext.submit_and_watch().await?;

tracing::debug!(
"Previous nonce: {}, next nonce: {}",
last_sent_nonce,
current_nonce
);
*last_sent_nonce = current_nonce;
tracing::debug!(
"Waiting for finalization, Previous nonce: {}, next nonce: {}",
last_sent_nonce,
current_nonce
);
*last_sent_nonce = current_nonce;
tx
};

let events = tx.wait_for_finalized_success().await?;
Ok(Some(SubmissionResult {
hash: events.extrinsic_hash(),
events: events,
}))
} else {
// Critical section so the lock is released after nonce is updated in the node's txpool.
let hash = {
let mut last_sent_nonce = self.last_sent_nonce.lock().await;

submitted_extrinsic_hash
};
let (current_nonce, ext) = self.extrinsic_with_nonce(call, account_keypair).await?;
let tx_hash = ext.submit().await?;

if wait_for_finalization {
self.traced_submission_with_finalization(submitted_extrinsic_hash)
.await
.map(Option::Some)
} else {
tracing::debug!(
"Not waiting for finalization, previous nonce: {}, next nonce: {}",
last_sent_nonce,
current_nonce
);
*last_sent_nonce = current_nonce;
tx_hash
};
tracing::trace!(
extrinsic_hash = submitted_extrinsic_hash.encode_hex::<String>(),
extrinsic_hash = hash.encode_hex::<String>(),
"extrinsic published, not waiting for the finalization"
);
Ok(None)
}
}

pub(crate) async fn traced_submission_with_finalization(
async fn extrinsic_with_nonce<Keypair, Call>(
&self,
submitted_extrinsic_hash: H256,
) -> Result<SubmissionResult<PolkaStorageConfig>, subxt::Error> {
tracing::trace!("submitting extrinsic");

let mut finalized_block_stream = self.client.blocks().subscribe_finalized().await?;

tracing::debug!(
extrinsic_hash = submitted_extrinsic_hash.encode_hex::<String>(),
"waiting for finalization"
);

let metadata = self.client.metadata();

tracing::debug!("ext metadata {:?}", metadata.extrinsic());

let finalized_block = tokio::task::spawn(async move {
'outer: loop {
let Some(block) = finalized_block_stream.next().await else {
return Err(subxt::Error::Io(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"stream was closed",
)));
};

let block: Block<PolkaStorageConfig, _> = block?;
tracing::debug!(
"checking block number: {} hash: {}",
block.number(),
block.hash()
);

for extrinsic in block.extrinsics().await?.iter() {
// There's a bug on subxt that forces us to use this thing,
// in 0.38 we can just use .hash(), in fact, in 0.38 this line doesn't work!
// https://github.com/paritytech/subxt/discussions/1851#discussioncomment-11133684
let extrinsic_hash = extrinsic.hash();

if submitted_extrinsic_hash == extrinsic_hash {
// Extrinsic failures are placed in the same block as the extrinsic.
let failed_extrinsic_event: Option<
crate::runtime::system::events::ExtrinsicFailed,
> = block.events().await?.find_first()?;

if let Some(event) = failed_extrinsic_event {
// debug level since we're returning the error upwards
tracing::debug!("found a failing extrinsic: {:?}", event);
// this weird encode/decode is the shortest and simplest way to convert the
// generated subxt types into the canonical types since we can't replace them
// with the proper ones
let encoded_event = event.encode();
let dispatch_error =
subxt::error::DispatchError::decode_from(encoded_event, metadata)?;
return Err(dispatch_error.into());
}

break 'outer Ok(block);
}
}
}
});

// 1 block = 6 seconds -> 60 seconds = 10 blocks
// since the subscription has like a ~6 block delay
let timeout = tokio::time::timeout(Duration::from_secs(60), finalized_block).await;

match timeout {
Ok(Ok(result)) => {
let result = result?;
Ok(SubmissionResult {
hash: result.hash(),
height: result.number(),
events: result.events().await?,
})
}
Ok(Err(_)) => Err(subxt::Error::Other("failed to join tasks".to_string())),
Err(_) => Err(subxt::Error::Other(
"timeout while waiting for the extrinsic call to be finalized".to_string(),
)),
}
call: &Call,
account_keypair: &Keypair,
) -> Result<
(
u64,
SubmittableExtrinsic<PolkaStorageConfig, OnlineClient<PolkaStorageConfig>>,
),
subxt::Error,
>
where
Call: subxt::tx::Payload,
Keypair: subxt::tx::Signer<PolkaStorageConfig>,
{
let current_nonce = self
.legacy_rpc
.system_account_next_index(&account_keypair.account_id())
.await?;
let current_header = self.legacy_rpc.chain_get_header(None).await?.unwrap();
let ext_params = DefaultExtrinsicParamsBuilder::new()
.mortal(&current_header, 8)
.nonce(current_nonce)
.build();

let ext = self
.client
.tx()
.create_signed_offline(call, account_keypair, ext_params)?;

Ok((current_nonce, ext))
}
}

0 comments on commit e449915

Please sign in to comment.