Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(provider): add block_range_with_senders #5647

14 changes: 8 additions & 6 deletions crates/rpc/rpc/src/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use reth_rpc_api::TraceApiServer;
use reth_rpc_types::{
state::StateOverride,
trace::{filter::TraceFilter, parity::*, tracerequest::TraceCallRequest},
BlockError, BlockOverrides, CallRequest, Index,
BlockOverrides, CallRequest, Index,
};
use revm::{db::CacheDB, primitives::Env};
use std::{collections::HashSet, sync::Arc};
Expand Down Expand Up @@ -257,15 +257,17 @@ where
}

// fetch all blocks in that range
let blocks = self.provider().block_range(start..=end)?;
let blocks = self.provider().block_range_with_senders(start..=end)?;

// find relevant blocks to trace
let mut target_blocks = Vec::new();
for block in blocks {
for block_with_senders in blocks {
let mut transaction_indices = HashSet::new();
let mut highest_matching_index = 0;
for (tx_idx, tx) in block.body.iter().enumerate() {
let from = tx.recover_signer().ok_or(BlockError::InvalidSignature)?;

let block_number = block_with_senders.number;
for (tx_idx, tx) in block_with_senders.into_transactions_ecrecovered().enumerate() {
let from = tx.signer();
let to = tx.to();
if matcher.matches(from, to) {
let idx = tx_idx as u64;
Expand All @@ -274,7 +276,7 @@ where
}
}
if !transaction_indices.is_empty() {
target_blocks.push((block.number, transaction_indices, highest_matching_index));
target_blocks.push((block_number, transaction_indices, highest_matching_index));
}
}

Expand Down
7 changes: 7 additions & 0 deletions crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,13 @@ impl<DB: Database> BlockReader for ProviderFactory<DB> {
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
self.provider()?.block_range(range)
}

fn block_range_with_senders(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
self.provider()?.block_range_with_senders(range)
}
}

impl<DB: Database> TransactionsProvider for ProviderFactory<DB> {
Expand Down
125 changes: 125 additions & 0 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1413,6 +1413,131 @@ impl<TX: DbTx> BlockReader for DatabaseProvider<TX> {
}
Ok(blocks)
}

fn block_range_with_senders(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ptal @joshieDo

&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
if range.is_empty() {
return Ok(Vec::new())
}

let len = range.end().saturating_sub(*range.start()) as usize;
let mut blocks = Vec::with_capacity(len);

let mut headers_cursor = self.tx.cursor_read::<tables::Headers>()?;
let mut ommers_cursor = self.tx.cursor_read::<tables::BlockOmmers>()?;
let mut withdrawals_cursor = self.tx.cursor_read::<tables::BlockWithdrawals>()?;
let mut block_body_cursor = self.tx.cursor_read::<tables::BlockBodyIndices>()?;
let mut tx_cursor = self.tx.cursor_read::<tables::Transactions>()?;
let mut senders_cursor = self.tx.cursor_read::<tables::TxSenders>()?;
for num in range {
if let Some((_, header)) = headers_cursor.seek_exact(num)? {
// If the body indices are not found, this means that the transactions either do
// not exist in the database yet, or they do exit but are
// not indexed. If they exist but are not indexed, we don't
// have enough information to return the block anyways, so
// we skip the block.
if let Some((_, block_body_indices)) = block_body_cursor.seek_exact(num)? {
let tx_range = block_body_indices.tx_num_range();
let body = if tx_range.is_empty() {
Vec::new()
} else {
tx_cursor
.walk_range(tx_range.clone())?
.map(|result| result.map(|(tx_number, tx)| (tx_number, tx.with_hash())))
.collect::<Result<Vec<_>, _>>()?
};

// If we are past shanghai, then all blocks should have a withdrawal list,
// even if empty
let withdrawals =
if self.chain_spec.is_shanghai_active_at_timestamp(header.timestamp) {
Some(
withdrawals_cursor
.seek_exact(num)?
.map(|(_, w)| w.withdrawals)
.unwrap_or_default(),
)
} else {
None
};
let ommers = if self.chain_spec.final_paris_total_difficulty(num).is_some() {
Vec::new()
} else {
ommers_cursor.seek_exact(num)?.map(|(_, o)| o.ommers).unwrap_or_default()
};

// Walking range to get the TxNumber along with the Address.
let mut senders =
senders_cursor.walk_range(tx_range)?.collect::<Result<Vec<_>, _>>()?;

// This approach is similar to what is done in
// `get_take_block_transaction_range`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use the existing function here?

Copy link
Contributor Author

@yash-atreya yash-atreya Jan 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. block_range can be used here, but we still need the body_indices cursor to get the TxNumber's in order to get the tx_range to walk using the senders_cursor and also populate any missing senders after recovering them.
  2. get_take_block_transaction_range cannot be used as it will require changing the trait bounds of the BlockReader implementation to add DbTxMut.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moreover, since this is a read-only method and TAKE would always be false in get_take_block_transaction_range. I don't think it makes sense to use it.

if senders.len() != body.len() {
let missing_len = body.len() - senders.len();
senders.reserve(missing_len);

let mut missing_senders = Vec::with_capacity(missing_len);

// Recover the missing senders by tracking the index of body.
{
let mut senders = senders.iter().peekable();

for (i, (tx_number, tx)) in body.iter().enumerate() {
if let Some((sender_tx_number, _)) = senders.peek() {
if sender_tx_number == tx_number {
// If current sender's `TxNumber` matches current
// transaction's
// `TxNumber`, advance the senders iterator.
senders.next();
} else {
// If current sender's `TxNumber` doesn't match current
// transaction's
// `TxNumber`, add it to missing senders.
missing_senders.push((i, tx_number, tx));
}
} else {
// If there's no more senders left, but we're still iterating
// over transactions, add
// them to missing senders
missing_senders.push((i, tx_number, tx));
}
}
}

// Recover the missing senders
let recovered_senders = TransactionSigned::recover_signers(
missing_senders.iter().map(|(_, _, tx)| *tx).collect::<Vec<_>>(),
missing_senders.len(),
)
.ok_or(ProviderError::SenderRecoveryError)?;

// Insert the recovered senders into the senders list
for ((i, tx_number, _), sender) in
missing_senders.into_iter().zip(recovered_senders)
{
// Insert will put recovered senders at necessary positions and shift
// the rest
senders.insert(i, (*tx_number, sender));
}
}
blocks.push(
Block {
header,
body: body.into_iter().map(|(_, tx)| tx).collect(),
ommers,
withdrawals,
}
.with_senders_unchecked(
senders.into_iter().map(|(_, sender)| sender).collect(),
),
)
}
}
}
Ok(blocks)
}
}

impl<TX: DbTx> TransactionsProviderExt for DatabaseProvider<TX> {
Expand Down
7 changes: 7 additions & 0 deletions crates/storage/provider/src/providers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,13 @@ where
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
self.database.provider()?.block_range(range)
}

fn block_range_with_senders(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
self.database.provider()?.block_range_with_senders(range)
}
}

impl<DB, Tree> TransactionsProvider for BlockchainProvider<DB, Tree>
Expand Down
8 changes: 8 additions & 0 deletions crates/storage/provider/src/providers/snapshot/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,14 @@ impl BlockReader for SnapshotProvider {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}

fn block_range_with_senders(
&self,
_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
// Required data not present in snapshots
Err(ProviderError::UnsupportedProvider)
}
}

impl WithdrawalsProvider for SnapshotProvider {
Expand Down
17 changes: 17 additions & 0 deletions crates/storage/provider/src/test_utils/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,23 @@ impl BlockReader for MockEthProvider {

Ok(blocks)
}

fn block_range_with_senders(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>> {
let lock = self.blocks.lock();

let mut blocks: Vec<_> = lock
.values()
.filter(|block| range.contains(&block.number))
.map(|block| BlockWithSenders { block: block.clone(), senders: vec![] })
.collect();

blocks.sort_by_key(|block| block.block.number);

Ok(blocks)
}
}

impl BlockReaderIdExt for MockEthProvider {
Expand Down
7 changes: 7 additions & 0 deletions crates/storage/provider/src/test_utils/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ impl BlockReader for NoopProvider {
fn block_range(&self, _range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>> {
Ok(vec![])
}

fn block_range_with_senders(
&self,
_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<reth_primitives::BlockWithSenders>> {
Ok(vec![])
}
}

impl BlockReaderIdExt for NoopProvider {
Expand Down
8 changes: 8 additions & 0 deletions crates/storage/provider/src/traits/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ pub trait BlockReader:
///
/// Note: returns only available blocks
fn block_range(&self, range: RangeInclusive<BlockNumber>) -> ProviderResult<Vec<Block>>;

/// Returns all blocks with senders in the given inclusive range.
///
/// Note: returns only available blocks
fn block_range_with_senders(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Vec<BlockWithSenders>>;
}

/// Trait extension for `BlockReader`, for types that implement `BlockId` conversion.
Expand Down
Loading