Skip to content

Commit

Permalink
perf(rpc): use Arc<BlockWithSenders on full_block_cache (paradigm…
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored and ebo committed Oct 14, 2024
1 parent 05910f2 commit 24272b2
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 21 deletions.
2 changes: 1 addition & 1 deletion crates/rpc/rpc-eth-api/src/helpers/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ pub trait LoadTransaction: SpawnBlocking + FullEthApiTypes {
.get_block_with_senders(block_hash)
.await
.map_err(Self::Error::from_eth_err)?;
Ok(block.map(|block| (transaction, block.seal(block_hash))))
Ok(block.map(|block| (transaction, (*block).clone().seal(block_hash))))
}
}
}
53 changes: 33 additions & 20 deletions crates/rpc/rpc-eth-types/src/cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type BlockTransactionsResponseSender =
oneshot::Sender<ProviderResult<Option<Vec<TransactionSigned>>>>;

/// The type that can send the response to a requested [`BlockWithSenders`]
type BlockWithSendersResponseSender = oneshot::Sender<ProviderResult<Option<BlockWithSenders>>>;
type BlockWithSendersResponseSender =
oneshot::Sender<ProviderResult<Option<Arc<BlockWithSenders>>>>;

/// The type that can send the response to the requested receipts of a block.
type ReceiptsResponseSender = oneshot::Sender<ProviderResult<Option<Arc<Vec<Receipt>>>>>;
Expand All @@ -48,7 +49,7 @@ type EnvResponseSender = oneshot::Sender<ProviderResult<(CfgEnvWithHandlerCfg, B

type BlockLruCache<L> = MultiConsumerLruCache<
B256,
BlockWithSenders,
Arc<BlockWithSenders>,
L,
Either<BlockWithSendersResponseSender, BlockTransactionsResponseSender>,
>;
Expand Down Expand Up @@ -151,7 +152,7 @@ impl EthStateCache {
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?;

if let Ok(Some(block_with_senders)) = block_with_senders_res {
Ok(Some(block_with_senders.block))
Ok(Some(block_with_senders.block.clone()))
} else {
Ok(None)
}
Expand Down Expand Up @@ -186,7 +187,7 @@ impl EthStateCache {
Ok(self
.get_block_with_senders(block_hash)
.await?
.map(|block| block.into_transactions_ecrecovered().collect()))
.map(|block| (*block).clone().into_transactions_ecrecovered().collect()))
}

/// Fetches both transactions and receipts for the given block hash.
Expand All @@ -208,7 +209,7 @@ impl EthStateCache {
pub async fn get_block_with_senders(
&self,
block_hash: B256,
) -> ProviderResult<Option<BlockWithSenders>> {
) -> ProviderResult<Option<Arc<BlockWithSenders>>> {
let (response_tx, rx) = oneshot::channel();
let _ = self.to_service.send(CacheAction::GetBlockWithSenders { block_hash, response_tx });
rx.await.map_err(|_| ProviderError::CacheServiceUnavailable)?
Expand All @@ -221,7 +222,10 @@ impl EthStateCache {
&self,
block_hash: B256,
) -> ProviderResult<Option<SealedBlockWithSenders>> {
Ok(self.get_block_with_senders(block_hash).await?.map(|block| block.seal(block_hash)))
Ok(self
.get_block_with_senders(block_hash)
.await?
.map(|block| (*block).clone().seal(block_hash)))
}

/// Requests the [Receipt] for the block hash
Expand Down Expand Up @@ -288,7 +292,7 @@ pub(crate) struct EthStateCacheService<
LimitReceipts = ByLength,
LimitEnvs = ByLength,
> where
LimitBlocks: Limiter<B256, BlockWithSenders>,
LimitBlocks: Limiter<B256, Arc<BlockWithSenders>>,
LimitReceipts: Limiter<B256, Arc<Vec<Receipt>>>,
LimitEnvs: Limiter<B256, (CfgEnvWithHandlerCfg, BlockEnv)>,
{
Expand Down Expand Up @@ -318,7 +322,11 @@ where
Tasks: TaskSpawner + Clone + 'static,
EvmConfig: ConfigureEvm<Header = Header>,
{
fn on_new_block(&mut self, block_hash: B256, res: ProviderResult<Option<BlockWithSenders>>) {
fn on_new_block(
&mut self,
block_hash: B256,
res: ProviderResult<Option<Arc<BlockWithSenders>>>,
) {
if let Some(queued) = self.full_block_cache.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
Expand All @@ -328,7 +336,7 @@ where
}
Either::Right(transaction_tx) => {
let _ = transaction_tx.send(res.clone().map(|maybe_block| {
maybe_block.map(|block| block.block.body.transactions)
maybe_block.map(|block| block.block.body.transactions.clone())
}));
}
}
Expand Down Expand Up @@ -360,6 +368,7 @@ where
}

fn on_reorg_block(&mut self, block_hash: B256, res: ProviderResult<Option<BlockWithSenders>>) {
let res = res.map(|b| b.map(Arc::new));
if let Some(queued) = self.full_block_cache.remove(&block_hash) {
// send the response to queued senders
for tx in queued {
Expand All @@ -369,7 +378,7 @@ where
}
Either::Right(transaction_tx) => {
let _ = transaction_tx.send(res.clone().map(|maybe_block| {
maybe_block.map(|block| block.block.body.transactions)
maybe_block.map(|block| block.block.body.transactions.clone())
}));
}
}
Expand Down Expand Up @@ -431,10 +440,12 @@ where
let _permit = rate_limiter.acquire().await;
// Only look in the database to prevent situations where we
// looking up the tree is blocking
let block_sender = provider.block_with_senders(
BlockHashOrNumber::Hash(block_hash),
TransactionVariant::WithHash,
);
let block_sender = provider
.block_with_senders(
BlockHashOrNumber::Hash(block_hash),
TransactionVariant::WithHash,
)
.map(|maybe_block| maybe_block.map(Arc::new));
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
block_hash,
res: block_sender,
Expand All @@ -459,10 +470,12 @@ where
let _permit = rate_limiter.acquire().await;
// Only look in the database to prevent situations where we
// looking up the tree is blocking
let res = provider.block_with_senders(
BlockHashOrNumber::Hash(block_hash),
TransactionVariant::WithHash,
);
let res = provider
.block_with_senders(
BlockHashOrNumber::Hash(block_hash),
TransactionVariant::WithHash,
)
.map(|b| b.map(Arc::new));
let _ = action_tx.send(CacheAction::BlockWithSendersResult {
block_hash,
res,
Expand Down Expand Up @@ -561,7 +574,7 @@ where
}
CacheAction::CacheNewCanonicalChain { chain_change } => {
for block in chain_change.blocks {
this.on_new_block(block.hash(), Ok(Some(block.unseal())));
this.on_new_block(block.hash(), Ok(Some(Arc::new(block.unseal()))));
}

for block_receipts in chain_change.receipts {
Expand Down Expand Up @@ -601,7 +614,7 @@ enum CacheAction {
GetBlockTransactions { block_hash: B256, response_tx: BlockTransactionsResponseSender },
GetEnv { block_hash: B256, response_tx: EnvResponseSender },
GetReceipts { block_hash: B256, response_tx: ReceiptsResponseSender },
BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<BlockWithSenders>> },
BlockWithSendersResult { block_hash: B256, res: ProviderResult<Option<Arc<BlockWithSenders>>> },
ReceiptsResult { block_hash: B256, res: ProviderResult<Option<Arc<Vec<Receipt>>>> },
EnvResult { block_hash: B256, res: Box<ProviderResult<(CfgEnvWithHandlerCfg, BlockEnv)>> },
CacheNewCanonicalChain { chain_change: ChainChange },
Expand Down

0 comments on commit 24272b2

Please sign in to comment.