From cbf3925e1fe1383b998cfb428038c46da1577501 Mon Sep 17 00:00:00 2001 From: PG Herveou Date: Mon, 20 Jan 2025 23:58:21 +0100 Subject: [PATCH] [eth-indexer] subscribe to finalize blocks instead of best blocks (#7260) For eth-indexer, it's probably safer to use `subscribe_finalized` and index these blocks into the DB rather than `subscribe_best` --------- Co-authored-by: command-bot <> --- prdoc/pr_7260.prdoc | 10 ++++++ substrate/frame/revive/rpc/src/client.rs | 45 +++++++++++++++--------- 2 files changed, 39 insertions(+), 16 deletions(-) create mode 100644 prdoc/pr_7260.prdoc diff --git a/prdoc/pr_7260.prdoc b/prdoc/pr_7260.prdoc new file mode 100644 index 000000000000..62f73120bc19 --- /dev/null +++ b/prdoc/pr_7260.prdoc @@ -0,0 +1,10 @@ +title: '[eth-indexer] subscribe to finalize blocks instead of best blocks' +doc: +- audience: Runtime Dev + description: 'For eth-indexer, it''s probably safer to use `subscribe_finalized` + and index these blocks into the DB rather than `subscribe_best` + + ' +crates: +- name: pallet-revive-eth-rpc + bump: minor diff --git a/substrate/frame/revive/rpc/src/client.rs b/substrate/frame/revive/rpc/src/client.rs index a5a022f97228..7a72f8e26b0b 100644 --- a/substrate/frame/revive/rpc/src/client.rs +++ b/substrate/frame/revive/rpc/src/client.rs @@ -68,6 +68,14 @@ pub type Shared = Arc>; /// The runtime balance type. pub type Balance = u128; +/// The subscription type used to listen to new blocks. +pub enum SubscriptionType { + /// Subscribe to the best blocks. + BestBlocks, + /// Subscribe to the finalized blocks. + FinalizedBlocks, +} + /// Unwrap the original `jsonrpsee::core::client::Error::Call` error. fn unwrap_call_err(err: &subxt::error::RpcError) -> Option { use subxt::backend::rpc::reconnecting_rpc_client; @@ -278,19 +286,23 @@ impl Client { /// Subscribe to new best blocks, and execute the async closure with /// the extracted block and ethereum transactions - async fn subscribe_new_blocks(&self, callback: F) -> Result<(), ClientError> + async fn subscribe_new_blocks( + &self, + subscription_type: SubscriptionType, + callback: F, + ) -> Result<(), ClientError> where F: Fn(SubstrateBlock) -> Fut + Send + Sync, Fut: std::future::Future> + Send, { log::info!(target: LOG_TARGET, "Subscribing to new blocks"); - let mut block_stream = match self.api.blocks().subscribe_best().await { - Ok(s) => s, - Err(err) => { - log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}"); - return Err(err.into()); - }, - }; + let mut block_stream = match subscription_type { + SubscriptionType::BestBlocks => self.api.blocks().subscribe_best().await, + SubscriptionType::FinalizedBlocks => self.api.blocks().subscribe_finalized().await, + } + .inspect_err(|err| { + log::error!(target: LOG_TARGET, "Failed to subscribe to blocks: {err:?}"); + })?; while let Some(block) = block_stream.next().await { let block = match block { @@ -324,7 +336,7 @@ impl Client { let client = self.clone(); spawn_handle.spawn("subscribe-blocks", None, async move { let res = client - .subscribe_new_blocks(|block| async { + .subscribe_new_blocks(SubscriptionType::BestBlocks, |block| async { let receipts = extract_receipts_from_block(&block).await?; client.receipt_provider.insert(&block.hash(), &receipts).await; @@ -347,13 +359,14 @@ impl Client { &self, oldest_block: Option, ) -> Result<(), ClientError> { - let new_blocks_fut = self.subscribe_new_blocks(|block| async move { - let receipts = extract_receipts_from_block(&block).await.inspect_err(|err| { - log::error!(target: LOG_TARGET, "Failed to extract receipts from block: {err:?}"); - })?; - self.receipt_provider.insert(&block.hash(), &receipts).await; - Ok(()) - }); + let new_blocks_fut = + self.subscribe_new_blocks(SubscriptionType::FinalizedBlocks, |block| async move { + let receipts = extract_receipts_from_block(&block).await.inspect_err(|err| { + log::error!(target: LOG_TARGET, "Failed to extract receipts from block: {err:?}"); + })?; + self.receipt_provider.insert(&block.hash(), &receipts).await; + Ok(()) + }); let Some(oldest_block) = oldest_block else { return new_blocks_fut.await };