Skip to content

Commit

Permalink
perf(engine): parallel storage roots (#10666)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkrasiuk authored Sep 12, 2024
1 parent b85a4e8 commit acdb7b7
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 35 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions crates/blockchain-tree/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,16 @@ impl AppendableChain {
provider.block_execution_data_provider.execution_outcome().clone();
execution_outcome.extend(initial_execution_outcome.clone());
let hashed_state = execution_outcome.hash_state_slow();
ParallelStateRoot::new(consistent_view, hashed_state)
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
let prefix_sets = hashed_state.construct_prefix_sets().freeze();
ParallelStateRoot::new(
consistent_view,
Default::default(),
hashed_state,
prefix_sets,
)
.incremental_root_with_updates()
.map(|(root, updates)| (root, Some(updates)))
.map_err(ProviderError::from)?
} else {
let hashed_state =
HashedPostState::from_bundle_state(&initial_execution_outcome.state().state);
Expand Down
3 changes: 2 additions & 1 deletion crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ reth-revm.workspace = true
reth-rpc-types.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-trie.workspace = true
reth-node-types.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true

# common
futures.workspace = true
Expand Down
80 changes: 74 additions & 6 deletions crates/engine/tree/src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ use reth_primitives::{
SealedHeader, B256, U256,
};
use reth_provider::{
BlockReader, ExecutionOutcome, ProviderError, StateProviderBox, StateProviderFactory,
StateReader, StateRootProvider, TransactionVariant,
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
ProviderError, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
Expand All @@ -39,7 +40,8 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_trie::{updates::TrieUpdates, HashedPostState};
use reth_trie::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
cmp::Ordering,
collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque},
Expand Down Expand Up @@ -518,7 +520,8 @@ impl<P: Debug, E: Debug, T: EngineTypes + Debug> std::fmt::Debug for EngineApiTr

impl<P, E, T> EngineApiTreeHandler<P, E, T>
where
P: BlockReader + StateProviderFactory + StateReader + Clone + 'static,
P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static,
<P as DatabaseProviderFactory>::Provider: BlockReader,
E: BlockExecutorProvider,
T: EngineTypes,
{
Expand Down Expand Up @@ -2167,8 +2170,34 @@ where
let hashed_state = HashedPostState::from_bundle_state(&output.state.state);

let root_time = Instant::now();
let (state_root, trie_output) =
state_provider.state_root_with_updates(hashed_state.clone())?;
let mut state_root_result = None;

// We attempt to compute the state root in parallel only if nothing is currently being
// persisted to the database. This is safe because the database state cannot change
// until the parallel computation finishes. It is important that nothing is persisted
// while we compute in parallel: each thread opens its own database transaction, and
// those transactions could otherwise end up with different views of the database.
let persistence_in_progress = self.persistence_state.in_progress();
if !persistence_in_progress {
state_root_result = match self
.compute_state_root_in_parallel(block.parent_hash, &hashed_state)
{
Ok((state_root, trie_output)) => Some((state_root, trie_output)),
Err(ProviderError::ConsistentView(error)) => {
debug!(target: "engine", %error, "Parallel state root computation failed consistency check, falling back");
None
}
Err(error) => return Err(error.into()),
};
}

let (state_root, trie_output) = if let Some(result) = state_root_result {
result
} else {
debug!(target: "engine", persistence_in_progress, "Failed to compute state root in parallel");
state_provider.state_root_with_updates(hashed_state.clone())?
};

if state_root != block.state_root {
// call post-block hook
self.invalid_block_hook.on_invalid_block(
Expand Down Expand Up @@ -2220,6 +2249,45 @@ where
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
}

/// Compute state root for the given hashed post state in parallel.
///
/// # Returns
///
/// Returns `Ok(_)` if computed successfully.
/// Returns `Err(_)` if error was encountered during computation.
/// `Err(ProviderError::ConsistentView(_))` can be safely ignored and fallback computation
/// should be used instead.
fn compute_state_root_in_parallel(
&self,
parent_hash: B256,
hashed_state: &HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?;
let mut trie_nodes = TrieUpdates::default();
let mut state = HashedPostState::default();
let mut prefix_sets = TriePrefixSetsMut::default();

if let Some((historical, blocks)) = self.state.tree_state.blocks_by_hash(parent_hash) {
// Retrieve revert state for historical block.
let revert_state = consistent_view.revert_state(historical)?;
prefix_sets.extend(revert_state.construct_prefix_sets());
state.extend(revert_state);

// Extend with contents of parent in-memory blocks.
for block in blocks.iter().rev() {
trie_nodes.extend_ref(block.trie.as_ref());
state.extend_ref(block.hashed_state.as_ref());
}
}

// Extend with block we are validating root for.
prefix_sets.extend(hashed_state.construct_prefix_sets());
state.extend_ref(hashed_state);

Ok(ParallelStateRoot::new(consistent_view, trie_nodes, state, prefix_sets.freeze())
.incremental_root_with_updates()?)
}

/// Handles an error that occurred while inserting a block.
///
/// If this is a validation error this will mark the block as invalid.
Expand Down
20 changes: 19 additions & 1 deletion crates/storage/provider/src/providers/consistent_view.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use crate::{BlockNumReader, DatabaseProviderFactory, HeaderProvider};
use reth_errors::ProviderError;
use reth_primitives::{GotExpected, B256};
use reth_storage_api::BlockReader;
use reth_storage_api::{BlockReader, DBProvider};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::HashedPostState;
use reth_trie_db::DatabaseHashedPostState;

pub use reth_storage_errors::provider::ConsistentViewError;

Expand Down Expand Up @@ -43,6 +46,21 @@ where
Ok(Self::new(provider, tip))
}

/// Retrieve revert hashed state down to the given block hash.
///
/// The block number is resolved from `block_hash` through a freshly created read-only
/// provider. If that number equals both the provider's best block number and its last block
/// number, an empty `HashedPostState` is returned. Otherwise the state is built by
/// `HashedPostState::from_reverts`, which is called with `block_number + 1`.
///
/// # Errors
///
/// Returns `ProviderError::BlockHashNotFound` if the hash does not resolve to a block number,
/// and propagates any error raised while creating the provider or reading from it.
pub fn revert_state(&self, block_hash: B256) -> ProviderResult<HashedPostState> {
    let provider = self.provider_ro()?;
    let Some(block_number) = provider.block_number(block_hash)? else {
        return Err(ProviderError::BlockHashNotFound(block_hash))
    };

    // `&&` short-circuits: the last block number is only queried when the best block number
    // already matches.
    let is_at_tip = block_number == provider.best_block_number()? &&
        block_number == provider.last_block_number()?;
    if is_at_tip {
        return Ok(HashedPostState::default())
    }

    HashedPostState::from_reverts(provider.tx_ref(), block_number + 1).map_err(Into::into)
}

/// Creates new read-only provider and performs consistency checks on the current tip.
pub fn provider_ro(&self) -> ProviderResult<Factory::Provider> {
// Create a new provider.
Expand Down
26 changes: 19 additions & 7 deletions crates/storage/provider/src/test_utils/mock.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,29 @@
use crate::{
traits::{BlockSource, ReceiptProvider},
AccountReader, BlockExecutionReader, BlockHashReader, BlockIdReader, BlockNumReader,
BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, EvmEnvProvider,
HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider, StateProviderBox,
StateProviderFactory, StateReader, StateRootProvider, TransactionVariant, TransactionsProvider,
WithdrawalsProvider,
BlockReader, BlockReaderIdExt, ChainSpecProvider, ChangeSetReader, DatabaseProvider,
EvmEnvProvider, HeaderProvider, ReceiptProviderIdExt, RequestsProvider, StateProvider,
StateProviderBox, StateProviderFactory, StateReader, StateRootProvider, TransactionVariant,
TransactionsProvider, WithdrawalsProvider,
};
use parking_lot::Mutex;
use reth_chainspec::{ChainInfo, ChainSpec};
use reth_db::mock::{DatabaseMock, TxMock};
use reth_db_api::models::{AccountBeforeTx, StoredBlockBodyIndices};
use reth_evm::ConfigureEvmEnv;
use reth_execution_types::{Chain, ExecutionOutcome};
use reth_primitives::{
keccak256, Account, Address, Block, BlockHash, BlockHashOrNumber, BlockId, BlockNumber,
BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, Header, Receipt, SealedBlock,
BlockNumberOrTag, BlockWithSenders, Bytecode, Bytes, GotExpected, Header, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, StorageKey, StorageValue, TransactionMeta,
TransactionSigned, TransactionSignedNoHash, TxHash, TxNumber, Withdrawal, Withdrawals, B256,
U256,
};
use reth_stages_types::{StageCheckpoint, StageId};
use reth_storage_api::{StageCheckpointReader, StateProofProvider, StorageRootProvider};
use reth_storage_errors::provider::{ProviderError, ProviderResult};
use reth_storage_api::{
DatabaseProviderFactory, StageCheckpointReader, StateProofProvider, StorageRootProvider,
};
use reth_storage_errors::provider::{ConsistentViewError, ProviderError, ProviderResult};
use reth_trie::{
prefix_set::TriePrefixSetsMut, updates::TrieUpdates, AccountProof, HashedPostState,
HashedStorage,
Expand Down Expand Up @@ -141,6 +144,15 @@ impl MockEthProvider {
}
}

/// [`DatabaseProviderFactory`] implementation for the mock provider.
///
/// This implementation never hands out a provider: `database_provider_ro` always returns an
/// error.
impl DatabaseProviderFactory for MockEthProvider {
    type DB = DatabaseMock;
    type Provider = DatabaseProvider<TxMock>;

    /// Always fails with a [`ConsistentViewError::Syncing`] error whose `best_block` is
    /// `GotExpected::new(0, 0)`, converted into the provider error type.
    fn database_provider_ro(&self) -> ProviderResult<Self::Provider> {
        let syncing = ConsistentViewError::Syncing { best_block: GotExpected::new(0, 0) };
        Err(syncing.into())
    }
}

impl HeaderProvider for MockEthProvider {
fn header(&self, block_hash: &BlockHash) -> ProviderResult<Option<Header>> {
let lock = self.headers.lock();
Expand Down
9 changes: 8 additions & 1 deletion crates/trie/parallel/benches/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ pub fn calculate_state_root(c: &mut Criterion) {
// parallel root
group.bench_function(BenchmarkId::new("parallel root", size), |b| {
b.to_async(&runtime).iter_with_setup(
|| ParallelStateRoot::new(view.clone(), updated_state.clone()),
|| {
ParallelStateRoot::new(
view.clone(),
Default::default(),
updated_state.clone(),
updated_state.construct_prefix_sets().freeze(),
)
},
|calculator| async { calculator.incremental_root() },
);
});
Expand Down
57 changes: 43 additions & 14 deletions crates/trie/parallel/src/parallel_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use reth_provider::{
use reth_trie::{
hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory},
node_iter::{TrieElement, TrieNodeIter},
trie_cursor::TrieCursorFactory,
prefix_set::TriePrefixSets,
trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory},
updates::TrieUpdates,
walker::TrieWalker,
HashBuilder, HashedPostState, Nibbles, StorageRoot, TrieAccount,
Expand All @@ -37,19 +38,30 @@ use tracing::*;
pub struct ParallelStateRoot<Factory> {
/// Consistent view of the database. Read-only providers used during computation are
/// created from it.
view: ConsistentDbView<Factory>,
/// Cached trie nodes. During computation they are sorted and layered in memory on top of
/// the database trie cursors.
trie_nodes: TrieUpdates,
/// Changed hashed state. During computation it is sorted and layered on top of the
/// database hashed cursors.
hashed_state: HashedPostState,
/// A set of prefix sets that have changed. They determine the storage roots that are
/// pre-computed and are passed to the account trie walker.
prefix_sets: TriePrefixSets,
/// Parallel state root metrics.
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics,
}

impl<Factory> ParallelStateRoot<Factory> {
/// Create new parallel state root calculator.
pub fn new(view: ConsistentDbView<Factory>, hashed_state: HashedPostState) -> Self {
pub fn new(
view: ConsistentDbView<Factory>,
trie_nodes: TrieUpdates,
hashed_state: HashedPostState,
prefix_sets: TriePrefixSets,
) -> Self {
Self {
view,
trie_nodes,
hashed_state,
prefix_sets,
#[cfg(feature = "metrics")]
metrics: ParallelStateRootMetrics::default(),
}
Expand Down Expand Up @@ -77,12 +89,15 @@ where
retain_updates: bool,
) -> Result<(B256, TrieUpdates), ParallelStateRootError> {
let mut tracker = ParallelTrieTracker::default();
let prefix_sets = self.hashed_state.construct_prefix_sets().freeze();
let trie_nodes_sorted = self.trie_nodes.into_sorted();
let hashed_state_sorted = self.hashed_state.into_sorted();
let storage_root_targets = StorageRootTargets::new(
self.hashed_state.accounts.keys().copied(),
prefix_sets.storage_prefix_sets,
self.prefix_sets
.account_prefix_set
.iter()
.map(|nibbles| B256::from_slice(&nibbles.pack())),
self.prefix_sets.storage_prefix_sets,
);
let hashed_state_sorted = self.hashed_state.into_sorted();

// Pre-calculate storage roots in parallel for accounts which were changed.
tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64);
Expand All @@ -91,7 +106,10 @@ where
.into_par_iter()
.map(|(hashed_address, prefix_set)| {
let provider_ro = self.view.provider_ro()?;
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
Expand All @@ -113,15 +131,18 @@ where
let mut trie_updates = TrieUpdates::default();

let provider_ro = self.view.provider_ro()?;
let trie_cursor_factory = InMemoryTrieCursorFactory::new(
DatabaseTrieCursorFactory::new(provider_ro.tx_ref()),
&trie_nodes_sorted,
);
let hashed_cursor_factory = HashedPostStateCursorFactory::new(
DatabaseHashedCursorFactory::new(provider_ro.tx_ref()),
&hashed_state_sorted,
);
let trie_cursor_factory = DatabaseTrieCursorFactory::new(provider_ro.tx_ref());

let walker = TrieWalker::new(
trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?,
prefix_sets.account_prefix_set,
self.prefix_sets.account_prefix_set,
)
.with_deletions_retained(retain_updates);
let mut account_node_iter = TrieNodeIter::new(
Expand Down Expand Up @@ -171,7 +192,7 @@ where
trie_updates.finalize(
account_node_iter.walker,
hash_builder,
prefix_sets.destroyed_accounts,
self.prefix_sets.destroyed_accounts,
);

let stats = tracker.finish();
Expand Down Expand Up @@ -270,9 +291,14 @@ mod tests {
}

assert_eq!(
ParallelStateRoot::new(consistent_view.clone(), HashedPostState::default())
.incremental_root()
.unwrap(),
ParallelStateRoot::new(
consistent_view.clone(),
Default::default(),
HashedPostState::default(),
Default::default()
)
.incremental_root()
.unwrap(),
test_utils::state_root(state.clone())
);

Expand Down Expand Up @@ -301,8 +327,11 @@ mod tests {
}
}

let prefix_sets = hashed_state.construct_prefix_sets().freeze();
assert_eq!(
ParallelStateRoot::new(consistent_view, hashed_state).incremental_root().unwrap(),
ParallelStateRoot::new(consistent_view, Default::default(), hashed_state, prefix_sets)
.incremental_root()
.unwrap(),
test_utils::state_root(state)
);
}
Expand Down
Loading

0 comments on commit acdb7b7

Please sign in to comment.