Skip to content

Commit

Permalink
fix(tree): make state methods work for historical blocks (#11265)
Browse files Browse the repository at this point in the history
Co-authored-by: Roman Krasiuk <rokrassyuk@gmail.com>
Co-authored-by: Federico Gimenez <fgimenez@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 7, 2024
1 parent 7842673 commit 4b12f32
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 9 deletions.
239 changes: 231 additions & 8 deletions crates/storage/provider/src/providers/blockchain_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,27 @@ use reth_chain_state::{
BlockState, CanonicalInMemoryState, ForkChoiceNotifications, ForkChoiceSubscriptions,
MemoryOverlayStateProvider,
};
use reth_chainspec::ChainInfo;
use reth_chainspec::{ChainInfo, EthereumHardforks};
use reth_db::models::BlockNumberAddress;
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
use reth_execution_types::ExecutionOutcome;
use reth_execution_types::{BundleStateInit, ExecutionOutcome, RevertsInit};
use reth_node_types::NodeTypesWithDB;
use reth_primitives::{
Account, Block, BlockWithSenders, EthereumHardforks, Header, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, TransactionMeta, TransactionSigned,
TransactionSignedNoHash, Withdrawal, Withdrawals,
Account, Block, BlockWithSenders, Header, Receipt, SealedBlock, SealedBlockWithSenders,
SealedHeader, StorageEntry, TransactionMeta, TransactionSigned, TransactionSignedNoHash,
Withdrawal, Withdrawals,
};
use reth_prune_types::{PruneCheckpoint, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::StorageChangeSetReader;
use reth_storage_errors::provider::ProviderResult;
use revm::primitives::{BlockEnv, CfgEnvWithHandlerCfg};
use revm::{
db::states::PlainStorageRevert,
primitives::{BlockEnv, CfgEnvWithHandlerCfg},
};
use std::{
collections::{hash_map, HashMap},
ops::{Add, Bound, RangeBounds, RangeInclusive, Sub},
sync::Arc,
time::Instant,
Expand Down Expand Up @@ -122,6 +128,145 @@ impl<N: ProviderNodeTypes> BlockchainProvider2<N> {
(start, end)
}

/// Return the last N blocks of state, recreating the [`ExecutionOutcome`].
///
/// If the range is empty, or there are no blocks for the given range, then this returns `None`.
pub fn get_state(
&self,
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<Option<ExecutionOutcome>> {
if range.is_empty() {
return Ok(None)
}
let start_block_number = *range.start();
let end_block_number = *range.end();

// We are not removing block meta as it is used to get block changesets.
let mut block_bodies = Vec::new();
for block_num in range.clone() {
let block_body = self
.block_body_indices(block_num)?
.ok_or(ProviderError::BlockBodyIndicesNotFound(block_num))?;
block_bodies.push((block_num, block_body))
}

// get transaction receipts
let Some(from_transaction_num) = block_bodies.first().map(|body| body.1.first_tx_num())
else {
return Ok(None)
};
let Some(to_transaction_num) = block_bodies.last().map(|body| body.1.last_tx_num()) else {
return Ok(None)
};

let mut account_changeset = Vec::new();
for block_num in range.clone() {
let changeset =
self.account_block_changeset(block_num)?.into_iter().map(|elem| (block_num, elem));
account_changeset.extend(changeset);
}

let mut storage_changeset = Vec::new();
for block_num in range {
let changeset = self.storage_changeset(block_num)?;
storage_changeset.extend(changeset);
}

let (state, reverts) =
self.populate_bundle_state(account_changeset, storage_changeset, end_block_number)?;

let mut receipt_iter =
self.receipts_by_tx_range(from_transaction_num..=to_transaction_num)?.into_iter();

let mut receipts = Vec::with_capacity(block_bodies.len());
// loop break if we are at the end of the blocks.
for (_, block_body) in block_bodies {
let mut block_receipts = Vec::with_capacity(block_body.tx_count as usize);
for tx_num in block_body.tx_num_range() {
let receipt =
receipt_iter.next().ok_or(ProviderError::ReceiptNotFound(tx_num.into()))?;
block_receipts.push(Some(receipt));
}
receipts.push(block_receipts);
}

Ok(Some(ExecutionOutcome::new_init(
state,
reverts,
// We skip new contracts since we never delete them from the database
Vec::new(),
receipts.into(),
start_block_number,
Vec::new(),
)))
}

/// Populate a [`BundleStateInit`] and [`RevertsInit`] using cursors over the
/// [`reth_db::PlainAccountState`] and [`reth_db::PlainStorageState`] tables, based on the given
/// storage and account changesets.
fn populate_bundle_state(
&self,
account_changeset: Vec<(u64, AccountBeforeTx)>,
storage_changeset: Vec<(BlockNumberAddress, StorageEntry)>,
block_range_end: BlockNumber,
) -> ProviderResult<(BundleStateInit, RevertsInit)> {
let mut state: BundleStateInit = HashMap::new();
let mut reverts: RevertsInit = HashMap::new();
let state_provider = self.state_by_block_number_or_tag(block_range_end.into())?;

// add account changeset changes
for (block_number, account_before) in account_changeset.into_iter().rev() {
let AccountBeforeTx { info: old_info, address } = account_before;
match state.entry(address) {
hash_map::Entry::Vacant(entry) => {
let new_info = state_provider.basic_account(address)?;
entry.insert((old_info, new_info, HashMap::new()));
}
hash_map::Entry::Occupied(mut entry) => {
// overwrite old account state.
entry.get_mut().0 = old_info;
}
}
// insert old info into reverts.
reverts.entry(block_number).or_default().entry(address).or_default().0 = Some(old_info);
}

// add storage changeset changes
for (block_and_address, old_storage) in storage_changeset.into_iter().rev() {
let BlockNumberAddress((block_number, address)) = block_and_address;
// get account state or insert from plain state.
let account_state = match state.entry(address) {
hash_map::Entry::Vacant(entry) => {
let present_info = state_provider.basic_account(address)?;
entry.insert((present_info, present_info, HashMap::new()))
}
hash_map::Entry::Occupied(entry) => entry.into_mut(),
};

// match storage.
match account_state.2.entry(old_storage.key) {
hash_map::Entry::Vacant(entry) => {
let new_storage_value =
state_provider.storage(address, old_storage.key)?.unwrap_or_default();
entry.insert((old_storage.value, new_storage_value));
}
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().0 = old_storage.value;
}
};

reverts
.entry(block_number)
.or_default()
.entry(address)
.or_default()
.1
.push(old_storage);
}

Ok((state, reverts))
}

/// Fetches a range of data from both in-memory state and persistent storage while a predicate
/// is met.
///
Expand Down Expand Up @@ -1426,6 +1571,57 @@ impl<N: NodeTypesWithDB> ForkChoiceSubscriptions for BlockchainProvider2<N> {
}
}

impl<N: ProviderNodeTypes> StorageChangeSetReader for BlockchainProvider2<N> {
fn storage_changeset(
&self,
block_number: BlockNumber,
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
if let Some(state) = self.canonical_in_memory_state.state_by_number(block_number) {
let changesets = state
.block()
.execution_output
.bundle
.reverts
.clone()
.into_plain_state_reverts()
.storage
.into_iter()
.flatten()
.flat_map(|revert: PlainStorageRevert| {
revert.storage_revert.into_iter().map(move |(key, value)| {
(
BlockNumberAddress((block_number, revert.address)),
StorageEntry { key: key.into(), value: value.to_previous_value() },
)
})
})
.collect();
Ok(changesets)
} else {
// Perform checks on whether or not changesets exist for the block.
let provider = self.database.provider()?;

// No prune checkpoint means history should exist and we should `unwrap_or(true)`
let storage_history_exists = provider
.get_prune_checkpoint(PruneSegment::StorageHistory)?
.and_then(|checkpoint| {
// return true if the block number is ahead of the prune checkpoint.
//
// The checkpoint stores the highest pruned block number, so we should make
// sure the block_number is strictly greater.
checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
})
.unwrap_or(true);

if !storage_history_exists {
return Err(ProviderError::StateAtBlockPruned(block_number))
}

provider.storage_changeset(block_number)
}
}
}

impl<N: ProviderNodeTypes> ChangeSetReader for BlockchainProvider2<N> {
fn account_block_changeset(
&self,
Expand All @@ -1446,7 +1642,25 @@ impl<N: ProviderNodeTypes> ChangeSetReader for BlockchainProvider2<N> {
.collect();
Ok(changesets)
} else {
self.database.provider()?.account_block_changeset(block_number)
// Perform checks on whether or not changesets exist for the block.
let provider = self.database.provider()?;
// No prune checkpoint means history should exist and we should `unwrap_or(true)`
let account_history_exists = provider
.get_prune_checkpoint(PruneSegment::AccountHistory)?
.and_then(|checkpoint| {
// return true if the block number is ahead of the prune checkpoint.
//
// The checkpoint stores the highest pruned block number, so we should make
// sure the block_number is strictly greater.
checkpoint.block_number.map(|checkpoint| block_number > checkpoint)
})
.unwrap_or(true);

if !account_history_exists {
return Err(ProviderError::StateAtBlockPruned(block_number))
}

provider.account_block_changeset(block_number)
}
}
}
Expand All @@ -1461,12 +1675,21 @@ impl<N: ProviderNodeTypes> AccountReader for BlockchainProvider2<N> {
}

impl<N: ProviderNodeTypes> StateReader for BlockchainProvider2<N> {
/// Re-constructs the [`ExecutionOutcome`] from in-memory and database state, if necessary.
///
/// If data for the block does not exist, this will return [`None`].
///
/// NOTE: This cannot be called safely in a loop outside of the blockchain tree thread. This is
/// because the [`CanonicalInMemoryState`] could change during a reorg, causing results to be
/// inconsistent. Currently this can safely be called within the blockchain tree thread,
/// because the tree thread is responsible for modifying the [`CanonicalInMemoryState`] in the
/// first place.
fn get_state(&self, block: BlockNumber) -> ProviderResult<Option<ExecutionOutcome>> {
if let Some(state) = self.canonical_in_memory_state.state_by_number(block) {
let state = state.block_ref().execution_outcome().clone();
Ok(Some(state))
} else {
self.database.provider()?.get_state(block..=block)
self.get_state(block..=block)
}
}
}
Expand Down
17 changes: 16 additions & 1 deletion crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use reth_primitives::{
};
use reth_prune_types::{PruneCheckpoint, PruneModes, PruneSegment};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::TryIntoHistoricalStateProvider;
use reth_storage_api::{StorageChangeSetReader, TryIntoHistoricalStateProvider};
use reth_storage_errors::provider::{ProviderResult, RootMismatch};
use reth_trie::{
prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSets},
Expand Down Expand Up @@ -1414,6 +1414,21 @@ impl<TX: DbTx, Spec: Send + Sync> AccountExtReader for DatabaseProvider<TX, Spec
}
}

impl<TX: DbTx, Spec: Send + Sync> StorageChangeSetReader for DatabaseProvider<TX, Spec> {
fn storage_changeset(
&self,
block_number: BlockNumber,
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>> {
let range = block_number..=block_number;
let storage_range = BlockNumberAddress::range(range);
self.tx
.cursor_dup_read::<tables::StorageChangeSets>()?
.walk_range(storage_range)?
.map(|result| -> ProviderResult<_> { Ok(result?) })
.collect()
}
}

impl<TX: DbTx, Spec: Send + Sync> ChangeSetReader for DatabaseProvider<TX, Spec> {
fn account_block_changeset(
&self,
Expand Down
11 changes: 11 additions & 0 deletions crates/storage/storage-api/src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use alloy_primitives::{Address, BlockNumber, B256};
use reth_db_api::models::BlockNumberAddress;
use reth_primitives::StorageEntry;
use reth_storage_errors::provider::ProviderResult;
use std::{
Expand Down Expand Up @@ -30,3 +31,13 @@ pub trait StorageReader: Send + Sync {
range: RangeInclusive<BlockNumber>,
) -> ProviderResult<BTreeMap<(Address, B256), Vec<u64>>>;
}

/// Storage ChangeSet reader
#[auto_impl::auto_impl(&, Arc, Box)]
pub trait StorageChangeSetReader: Send + Sync {
/// Iterate over storage changesets and return the storage state from before this block.
fn storage_changeset(
&self,
block_number: BlockNumber,
) -> ProviderResult<Vec<(BlockNumberAddress, StorageEntry)>>;
}

0 comments on commit 4b12f32

Please sign in to comment.