Skip to content

Commit

Permalink
feat: add get_in_memory_or_storage_by_tx_range (#11414)
Browse files Browse the repository at this point in the history
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
joshieDo and mattsse authored Oct 2, 2024
1 parent c4ce997 commit 5ec448e
Show file tree
Hide file tree
Showing 2 changed files with 158 additions and 18 deletions.
5 changes: 5 additions & 0 deletions crates/chain-state/src/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,11 @@ impl BlockState {
self.block.clone()
}

/// Returns a reference to the executed block that determines the state.
pub const fn block_ref(&self) -> &ExecutedBlock {
&self.block
}

/// Returns the block with senders for the state.
pub fn block_with_senders(&self) -> BlockWithSenders {
let block = self.block.block().clone();
Expand Down
171 changes: 153 additions & 18 deletions crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,10 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
.unwrap_or_else(|| db_provider.last_block_number().unwrap_or_default())
});

if start > end {
return Ok(vec![])
}

// Split range into storage_range and in-memory range. If the in-memory range is not
// necessary drop it early.
//
Expand Down Expand Up @@ -241,6 +245,103 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
Ok(self.canonical_in_memory_state.state_provider_from_state(state, latest_historical))
}

/// Fetches data from either in-memory state or persistent storage for a range of transactions.
///
/// * `fetch_from_db`: has a [`DatabaseProviderRO`] and the storage specific range.
/// * `fetch_from_block_state`: has a [`RangeInclusive`] of elements that should be fetched from
/// [`BlockState`]. [`RangeInclusive`] is necessary to handle partial look-ups of a block.
fn get_in_memory_or_storage_by_tx_range<S, M, R>(
&self,
range: impl RangeBounds<BlockNumber>,
fetch_from_db: S,
fetch_from_block_state: M,
) -> ProviderResult<Vec<R>>
where
S: FnOnce(
DatabaseProviderRO<N::DB, N::ChainSpec>,
RangeInclusive<TxNumber>,
) -> ProviderResult<Vec<R>>,
M: Fn(RangeInclusive<usize>, Arc<BlockState>) -> ProviderResult<Vec<R>>,
{
let in_mem_chain = self.canonical_in_memory_state.canonical_chain().collect::<Vec<_>>();
let provider = self.database.provider()?;

// Get the last block number stored in the storage which does NOT overlap with in-memory
// chain.
let mut last_database_block_number = provider.last_block_number()?;
if let Some(lowest_in_mem_block) = in_mem_chain.last() {
if lowest_in_mem_block.number() <= last_database_block_number {
last_database_block_number = lowest_in_mem_block.number().saturating_sub(1);
}
}

// Get the next tx number for the last block stored in the storage, which marks the start of
// the in-memory state.
let last_block_body_index = provider
.block_body_indices(last_database_block_number)?
.ok_or_else(|| ProviderError::BlockBodyIndicesNotFound(last_database_block_number))?;
let mut in_memory_tx_num = last_block_body_index.next_tx_num();

let (start, end) = self.convert_range_bounds(range, || {
in_mem_chain
.iter()
.map(|b| b.block_ref().block().body.transactions.len() as u64)
.sum::<u64>() +
last_block_body_index.last_tx_num()
});

if start > end {
return Ok(vec![])
}

let mut tx_range = start..=end;

// If the range is entirely before the first in-memory transaction number, fetch from
// storage
if *tx_range.end() < in_memory_tx_num {
return fetch_from_db(provider, tx_range);
}

let mut items = Vec::with_capacity((tx_range.end() - tx_range.start() + 1) as usize);

// If the range spans storage and memory, get elements from storage first.
if *tx_range.start() < in_memory_tx_num {
// Determine the range that needs to be fetched from storage.
let db_range = *tx_range.start()..=in_memory_tx_num.saturating_sub(1);

// Set the remaining transaction range for in-memory
tx_range = in_memory_tx_num..=*tx_range.end();

items.extend(fetch_from_db(provider, db_range)?);
}

// Iterate from the lowest block to the highest in-memory chain
for block_state in in_mem_chain.into_iter().rev() {
let block_tx_count = block_state.block_ref().block().body.transactions.len();
let remaining = (tx_range.end() - tx_range.start() + 1) as usize;

// This should only be more than 0 in the first iteration, in case of a partial range
let skip = (tx_range.start() - in_memory_tx_num) as usize;

items.extend(fetch_from_block_state(
skip..=(remaining.min(block_tx_count) - 1),
block_state,
)?);

in_memory_tx_num += block_tx_count as u64;

// Break if the range has been fully processed
if in_memory_tx_num > *tx_range.end() {
break
}

// Set updated range
tx_range = in_memory_tx_num..=*tx_range.end();
}

Ok(items)
}

/// Fetches data from either in-memory state or persistent storage by transaction
/// [`HashOrNumber`].
fn get_in_memory_or_storage_by_tx<S, M, R>(
Expand Down Expand Up @@ -807,14 +908,28 @@ impl<N: ProviderNodeTypes> TransactionsProvider for BlockchainProvider2<N> {
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<TransactionSignedNoHash>> {
self.database.transactions_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.transactions_by_tx_range(db_range),
|index_range, block_state| {
Ok(block_state.block_ref().block().body.transactions[index_range]
.iter()
.cloned()
.map(Into::into)
.collect())
},
)
}

fn senders_by_tx_range(
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Address>> {
self.database.senders_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.senders_by_tx_range(db_range),
|index_range, block_state| Ok(block_state.block_ref().senders[index_range].to_vec()),
)
}

fn transaction_sender(&self, id: TxNumber) -> ProviderResult<Option<Address>> {
Expand Down Expand Up @@ -872,7 +987,13 @@ impl<N: ProviderNodeTypes> ReceiptProvider for BlockchainProvider2<N> {
&self,
range: impl RangeBounds<TxNumber>,
) -> ProviderResult<Vec<Receipt>> {
self.database.receipts_by_tx_range(range)
self.get_in_memory_or_storage_by_tx_range(
range,
|db_provider, db_range| db_provider.receipts_by_tx_range(db_range),
|index_range, block_state| {
Ok(block_state.executed_block_receipts().drain(index_range).collect())
},
)
}
}

Expand Down Expand Up @@ -4058,34 +4179,48 @@ mod tests {
#[test]
fn test_senders_by_tx_range() -> eyre::Result<()> {
let mut rng = generators::rng();
let (provider, database_blocks, _, _) = provider_with_random_blocks(
let (provider, database_blocks, in_memory_blocks, _) = provider_with_random_blocks(
&mut rng,
TEST_BLOCKS_COUNT,
0,
TEST_BLOCKS_COUNT,
BlockRangeParams {
tx_count: TEST_TRANSACTIONS_COUNT..TEST_TRANSACTIONS_COUNT,
..Default::default()
},
)?;

// Define a valid transaction range within the database
let start_tx_num = 0;
let end_tx_num = 1;
let db_tx_count =
database_blocks.iter().map(|b| b.body.transactions.len()).sum::<usize>() as u64;
let in_mem_tx_count =
in_memory_blocks.iter().map(|b| b.body.transactions.len()).sum::<usize>() as u64;

// Retrieve the senders for this transaction number range
let result = provider.senders_by_tx_range(start_tx_num..=end_tx_num)?;
let db_range = 0..=(db_tx_count - 1);
let in_mem_range = db_tx_count..=(in_mem_tx_count + db_range.end());

// Ensure the sender addresses match the expected addresses in the database
assert_eq!(result.len(), 2);
// Retrieve the senders for the whole database range
let database_senders =
database_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::<Vec<_>>();
assert_eq!(provider.senders_by_tx_range(db_range)?, database_senders);

// Retrieve the senders for the whole in-memory range
let in_memory_senders =
in_memory_blocks.iter().flat_map(|b| b.senders().unwrap()).collect::<Vec<_>>();
assert_eq!(provider.senders_by_tx_range(in_mem_range.clone())?, in_memory_senders);

// Retrieve the senders for a partial in-memory range
assert_eq!(
result[0],
database_blocks[0].senders().unwrap()[0],
"The sender address should match the expected sender address"
&provider.senders_by_tx_range(in_mem_range.start() + 1..=in_mem_range.end() - 1)?,
&in_memory_senders[1..in_memory_senders.len() - 1]
);

// Retrieve the senders for a range that spans database and in-memory
assert_eq!(
result[1],
database_blocks[0].senders().unwrap()[1],
"The sender address should match the expected sender address"
provider.senders_by_tx_range(in_mem_range.start() - 2..=in_mem_range.end() - 1)?,
database_senders[database_senders.len() - 2..]
.iter()
.chain(&in_memory_senders[..in_memory_senders.len() - 1])
.copied()
.collect::<Vec<_>>()
);

// Define an empty range that should return no sender addresses
Expand Down

0 comments on commit 5ec448e

Please sign in to comment.