Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(provider): handle race on fn transaction_id #11380

Merged
merged 1 commit into from
Oct 1, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 45 additions & 61 deletions crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
RequestsProvider, StageCheckpointReader, StateProviderBox, StateProviderFactory, StateReader,
StaticFileProviderFactory, TransactionVariant, TransactionsProvider, WithdrawalsProvider,
};
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag};
use alloy_eips::{BlockHashOrNumber, BlockId, BlockNumHash, BlockNumberOrTag, HashOrNumber};
use alloy_primitives::{Address, BlockHash, BlockNumber, Sealable, TxHash, TxNumber, B256, U256};
use alloy_rpc_types_engine::ForkchoiceState;
use reth_chain_state::{
Expand Down Expand Up @@ -241,16 +241,16 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
Ok(self.canonical_in_memory_state.state_provider_from_state(state, latest_historical))
}

/// Fetches data from either in-memory state or storage by [`TxNumber`].
fn get_in_memory_or_storage_by_tx_id<S, M, R>(
/// Fetches data from either in-memory state or storage by transaction [`HashOrNumber`].
fn get_in_memory_or_storage_by_tx<S, M, R>(
&self,
id: TxNumber,
id: HashOrNumber,
fetch_from_db: S,
fetch_from_block_state: M,
) -> ProviderResult<Option<R>>
where
S: FnOnce(DatabaseProviderRO<N::DB, N::ChainSpec>) -> ProviderResult<Option<R>>,
M: Fn(usize, Arc<BlockState>) -> ProviderResult<Option<R>>,
M: Fn(usize, TxNumber, Arc<BlockState>) -> ProviderResult<Option<R>>,
{
// Order of instantiation matters. More information on: `fetch_db_mem_range_while`.
let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::<Vec<_>>();
Expand All @@ -274,8 +274,10 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {

// If the transaction number is less than the first in-memory transaction number, make a
// database lookup
if id < in_memory_tx_num {
return fetch_from_db(provider)
if let HashOrNumber::Number(id) = id {
if id < in_memory_tx_num {
return fetch_from_db(provider)
}
}

// Iterate from the lowest block to the highest
Expand All @@ -284,14 +286,28 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
let block = executed_block.block();

for tx_index in 0..block.body.transactions.len() {
if id == in_memory_tx_num {
return fetch_from_block_state(tx_index, block_state)
match id {
HashOrNumber::Hash(tx_hash) => {
if tx_hash == block.body.transactions[tx_index].hash() {
return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
}
}
HashOrNumber::Number(id) => {
if id == in_memory_tx_num {
return fetch_from_block_state(tx_index, in_memory_tx_num, block_state)
}
}
}

in_memory_tx_num += 1;
}
}

// Not found in-memory, so check database.
if let HashOrNumber::Hash(_) = id {
return fetch_from_db(provider)
}

Ok(None)
}
}
Expand Down Expand Up @@ -680,50 +696,18 @@ impl<N: ProviderNodeTypes> BlockReader for BlockchainProvider2<N> {

impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
fn transaction_id(&self, tx_hash: TxHash) -> ProviderResult<Option<TxNumber>> {
// First, check the database
if let Some(id) = self.database.transaction_id(tx_hash)? {
return Ok(Some(id))
}

// If the transaction is not found in the database, check the in-memory state

// Get the last transaction number stored in the database
let last_database_block_number = self.database.last_block_number()?;
let last_database_tx_id = self
.database
.block_body_indices(last_database_block_number)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?
.last_tx_num();

// Find the transaction in the in-memory state with the matching hash, and return its
// number
let mut in_memory_tx_id = last_database_tx_id + 1;
for block_number in last_database_block_number.saturating_add(1)..=
self.canonical_in_memory_state.get_canonical_block_number()
{
// TODO: there might be an update between loop iterations, we
// need to handle that situation.
let block_state = self
.canonical_in_memory_state
.state_by_number(block_number)
.ok_or(ProviderError::StateForNumberNotFound(block_number))?;
for tx in block_state.block().block().body.transactions() {
if tx.hash() == tx_hash {
return Ok(Some(in_memory_tx_id))
}

in_memory_tx_id += 1;
}
}

Ok(None)
self.get_in_memory_or_storage_by_tx(
tx_hash.into(),
|db_provider| db_provider.transaction_id(tx_hash),
|_, tx_number, _| Ok(Some(tx_number)),
)
}

fn transaction_by_id(&self, id: TxNumber) -> ProviderResult<Option<TransactionSigned>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_by_id(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state.block().block().body.transactions.get(tx_index).cloned())
},
)
Expand All @@ -733,10 +717,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
&self,
id: TxNumber,
) -> ProviderResult<Option<TransactionSignedNoHash>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_by_id_no_hash(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state
.block()
.block()
Expand Down Expand Up @@ -771,10 +755,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
}

fn transaction_block(&self, id: TxNumber) -> ProviderResult<Option<BlockNumber>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_block(id),
|_, block_state| Ok(Some(block_state.block().block().number)),
|_, _, block_state| Ok(Some(block_state.block().block().number)),
)
}

Expand Down Expand Up @@ -847,10 +831,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
}

fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.transaction_sender(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state
.block()
.block()
Expand All @@ -865,10 +849,10 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {

impl<N: ProviderNodeTypes> ReceiptProvider for BlockchainProvider2<N> {
fn receipt(&self, id: TxNumber) -> ProviderResult<Option<Receipt>> {
self.get_in_memory_or_storage_by_tx_id(
id,
self.get_in_memory_or_storage_by_tx(
id.into(),
|provider| provider.receipt(id),
|tx_index, block_state| {
|tx_index, _, block_state| {
Ok(block_state.executed_block_receipts().get(tx_index).cloned())
},
)
Expand Down
Loading