From 62954591d25c164b1cc42084c32010e15cd4a75a Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Wed, 9 Oct 2024 17:33:29 +0900 Subject: [PATCH] perf(rpc): use `Arc>>>; /// The type that can send the response to a requested [`BlockWithSenders`] -type BlockWithSendersResponseSender = oneshot::Sender>>; +type BlockWithSendersResponseSender = + oneshot::Sender>>>; /// The type that can send the response to the requested receipts of a block. type ReceiptsResponseSender = oneshot::Sender>>>>; @@ -48,7 +49,7 @@ type EnvResponseSender = oneshot::Sender = MultiConsumerLruCache< B256, - BlockWithSenders, + Arc, L, Either, >; @@ -151,7 +152,7 @@ impl EthStateCache { rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?; if let Ok(Some(block_with_senders)) = block_with_senders_res { - Ok(Some(block_with_senders.block)) + Ok(Some(block_with_senders.block.clone())) } else { Ok(None) } @@ -186,7 +187,7 @@ impl EthStateCache { Ok(self .get_block_with_senders(block_hash) .await? - .map(|block| block.into_transactions_ecrecovered().collect())) + .map(|block| (*block).clone().into_transactions_ecrecovered().collect())) } /// Fetches both transactions and receipts for the given block hash. @@ -208,7 +209,7 @@ impl EthStateCache { pub async fn get_block_with_senders( &self, block_hash: B256, - ) -> ProviderResult> { + ) -> ProviderResult>> { let (response_tx, rx) = oneshot::channel(); let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx }); rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)? @@ -221,7 +222,10 @@ impl EthStateCache { &self, block_hash: B256, ) -> ProviderResult> { - Ok(self.get_block_with_senders(block_hash).await?.map(|block| block.seal(block_hash))) + Ok(self + .get_block_with_senders(block_hash) + .await? + .map(|block| (*block).clone().seal(block_hash))) } /// Requests the [Receipt] for the block hash @@ -288,7 +292,7 @@ pub(crate) struct EthStateCacheService< LimitReceipts = ByLength, LimitEnvs = ByLength, > where - LimitBlocks: Limiter, + LimitBlocks: Limiter>, LimitReceipts: Limiter>>, LimitEnvs: Limiter, { @@ -318,7 +322,11 @@ where Tasks: TaskSpawner + Clone + 'static, EvmConfig: ConfigureEvm
, { - fn on_new_block(&mut self, block_hash: B256, res: ProviderResult>) { + fn on_new_block( + &mut self, + block_hash: B256, + res: ProviderResult>>, + ) { if let Some(queued) = self.full_block_cache.remove(&block_hash) { // send the response to queued senders for tx in queued { @@ -328,7 +336,7 @@ where } Either::Right(transaction_tx) => { let _ = transaction_tx.send(res.clone().map(|maybe_block| { - maybe_block.map(|block| block.block.body.transactions) + maybe_block.map(|block| block.block.body.transactions.clone()) })); } } @@ -360,6 +368,7 @@ where } fn on_reorg_block(&mut self, block_hash: B256, res: ProviderResult>) { + let res = res.map(|b| b.map(Arc::new)); if let Some(queued) = self.full_block_cache.remove(&block_hash) { // send the response to queued senders for tx in queued { @@ -369,7 +378,7 @@ where } Either::Right(transaction_tx) => { let _ = transaction_tx.send(res.clone().map(|maybe_block| { - maybe_block.map(|block| block.block.body.transactions) + maybe_block.map(|block| block.block.body.transactions.clone()) })); } } @@ -431,10 +440,12 @@ where let _permit = rate_limiter.acquire().await; // Only look in the database to prevent situations where we // looking up the tree is blocking - let block_sender = provider.block_with_senders( - BlockHashOrNumber::Hash(block_hash), - TransactionVariant::WithHash, - ); + let block_sender = provider + .block_with_senders( + BlockHashOrNumber::Hash(block_hash), + TransactionVariant::WithHash, + ) + .map(|maybe_block| maybe_block.map(Arc::new)); let _ = action_tx.send(CacheAction::BlockWithSendersResult { block_hash, res: block_sender, @@ -459,10 +470,12 @@ where let _permit = rate_limiter.acquire().await; // Only look in the database to prevent situations where we // looking up the tree is blocking - let res = provider.block_with_senders( - BlockHashOrNumber::Hash(block_hash), - TransactionVariant::WithHash, - ); + let res = provider + .block_with_senders( + BlockHashOrNumber::Hash(block_hash), + TransactionVariant::WithHash, + ) + .map(|b| b.map(Arc::new)); let _ = action_tx.send(CacheAction::BlockWithSendersResult { block_hash, res, @@ -561,7 +574,7 @@ where } CacheAction::CacheNewCanonicalChain { chain_change } => { for block in chain_change.blocks { - this.on_new_block(block.hash(), Ok(Some(block.unseal()))); + this.on_new_block(block.hash(), Ok(Some(Arc::new(block.unseal())))); } for block_receipts in chain_change.receipts { @@ -601,7 +614,7 @@ enum CacheAction { GetBlockTransactions { block_hash: B256, response_tx: BlockTransactionsResponseSender }, GetEnv { block_hash: B256, response_tx: EnvResponseSender }, GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender }, - BlockWithSendersResult { block_hash: B256, res: ProviderResult> }, + BlockWithSendersResult { block_hash: B256, res: ProviderResult>> }, ReceiptsResult { block_hash: B256, res: ProviderResult>>> }, EnvResult { block_hash: B256, res: Box> }, CacheNewCanonicalChain { chain_change: ChainChange },