From 808da97aa45f473e5d9ac8c7b9e104f4640230e4 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 20 Sep 2024 13:53:15 +0200 Subject: [PATCH 1/5] stream state changes --- Cargo.lock | 2 + crates/ethereum/evm/Cargo.toml | 2 + crates/ethereum/evm/src/execute.rs | 60 ++++++++++++++++++--- crates/evm/Cargo.toml | 1 + crates/evm/src/either.rs | 12 +++++ crates/evm/src/execute.rs | 16 +++++- crates/evm/src/noop.rs | 9 ++++ crates/evm/src/system_calls/eip2935.rs | 32 +++++++++++ crates/evm/src/system_calls/eip4788.rs | 73 ++++++++++++++++++++++++++ 9 files changed, 200 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e1a575bb3158..0c9a1c750884 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7274,6 +7274,7 @@ dependencies = [ "reth-storage-errors", "revm", "revm-primitives", + "tokio", ] [[package]] @@ -7294,6 +7295,7 @@ dependencies = [ "revm-primitives", "secp256k1", "serde_json", + "tokio", ] [[package]] diff --git a/crates/ethereum/evm/Cargo.toml b/crates/ethereum/evm/Cargo.toml index 25f2d9c6af78..d382ab97d851 100644 --- a/crates/ethereum/evm/Cargo.toml +++ b/crates/ethereum/evm/Cargo.toml @@ -28,6 +28,8 @@ revm-primitives.workspace = true alloy-eips.workspace = true alloy-sol-types.workspace = true +tokio = { workspace = true, features = ["sync"] } + [dev-dependencies] reth-testing-utils.workspace = true reth-revm = { workspace = true, features = ["test-utils"] } diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index 096b44d0e828..60193ae1321f 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -14,7 +14,8 @@ use reth_evm::{ BlockExecutorProvider, BlockValidationError, Executor, ProviderError, }, system_calls::{ - apply_beacon_root_contract_call, apply_blockhashes_contract_call, + apply_beacon_root_contract_call, apply_beacon_root_contract_call2, + apply_blockhashes_contract_call, apply_blockhashes_contract_call2, apply_consolidation_requests_contract_call, apply_withdrawal_requests_contract_call, }, ConfigureEvm, @@ -37,7 +38,8 @@ use revm_primitives::{ db::{Database, DatabaseCommit}, BlockEnv, CfgEnvWithHandlerCfg, EnvWithHandlerCfg, ResultAndState, }; -use std::collections::hash_map::Entry; +use std::collections::{hash_map::Entry, HashMap}; +use tokio::sync::mpsc; /// Provides executors to execute regular ethereum blocks #[derive(Debug, Clone)] @@ -122,6 +124,7 @@ struct EthEvmExecutor { chain_spec: Arc, /// How to create an EVM. evm_config: EvmConfig, + sender: Option>, } impl EthEvmExecutor @@ -148,21 +151,23 @@ where DB::Error: Into + Display, { // apply pre execution changes - apply_beacon_root_contract_call( + apply_beacon_root_contract_call2( &self.evm_config, &self.chain_spec, block.timestamp, block.number, block.parent_beacon_block_root, &mut evm, + &self.sender, )?; - apply_blockhashes_contract_call( + apply_blockhashes_contract_call2( &self.evm_config, &self.chain_spec, block.timestamp, block.number, block.parent_hash, &mut evm, + &self.sender, )?; // execute transactions @@ -191,6 +196,9 @@ where error: Box::new(new_err), } })?; + if let Some(tx) = &self.sender { + let _ = tx.send(state.clone()); + } evm.db_mut().commit(state); // append gas used @@ -212,6 +220,7 @@ where ); } + // TODO: ignored for testing as we are not in prague yet let requests = if self.chain_spec.is_prague_active_at_timestamp(block.timestamp) { // Collect all EIP-6110 deposits let deposit_requests = @@ -250,7 +259,7 @@ pub struct EthBlockExecutor { impl EthBlockExecutor { /// Creates a new Ethereum block executor. pub const fn new(chain_spec: Arc, evm_config: EvmConfig, state: State) -> Self { - Self { executor: EthEvmExecutor { chain_spec, evm_config }, state } + Self { executor: EthEvmExecutor { chain_spec, evm_config, sender: None }, state } } #[inline] @@ -347,9 +356,30 @@ where } // increment balances self.state - .increment_balances(balance_increments) + .increment_balances(balance_increments.clone()) .map_err(|_| BlockValidationError::IncrementBalanceFailed)?; + if let Some(tx) = &self.executor.sender { + let mut updated_state = HashMap::with_capacity(balance_increments.len()); + for address in balance_increments.keys() { + let cache_account = self + .state + .load_cache_account(*address) + .map_err(|_| BlockExecutionError::msg("failed to load cache account"))?; + let account = revm_primitives::Account { + status: revm_primitives::AccountStatus::Touched, + info: cache_account + .account + .as_ref() + .map(|a| a.info.clone().without_code()) + .unwrap_or_default(), + storage: HashMap::default(), // storage cannot be updated here + }; + updated_state.insert(*address, account); + } + let _ = tx.send(updated_state); + } + Ok(()) } } @@ -378,6 +408,15 @@ where Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used }) } + + fn execute_and_stream( + mut self, + input: Self::Input<'_>, + tx: mpsc::UnboundedSender, + ) -> Result { + self.executor.sender = Some(tx); + self.execute(input) + } } /// An executor that retains all cache state from execution in its bundle state. @@ -460,6 +499,15 @@ where Ok(BlockExecutionOutput { state: bundle_state, receipts, requests, gas_used }) } + + fn execute_and_stream( + mut self, + input: Self::Input<'_>, + tx: mpsc::UnboundedSender, + ) -> Result { + self.executor.executor.sender = Some(tx); + self.execute(input) + } } /// An executor for a batch of blocks. diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index f520c75eeaa9..a1b5b1cd2a27 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -26,6 +26,7 @@ revm.workspace = true alloy-eips.workspace = true auto_impl.workspace = true futures-util.workspace = true +tokio = { workspace = true, features = ["sync"] } metrics = { workspace = true, optional = true } parking_lot = { workspace = true, optional = true } diff --git a/crates/evm/src/either.rs b/crates/evm/src/either.rs index 84e1733e4812..dc82bea79b6f 100644 --- a/crates/evm/src/either.rs +++ b/crates/evm/src/either.rs @@ -12,6 +12,7 @@ use revm_primitives::db::Database; // re-export Either pub use futures_util::future::Either; +use tokio::sync::mpsc; impl BlockExecutorProvider for Either where @@ -71,6 +72,17 @@ where Self::Right(b) => b.execute(input), } } + + fn execute_and_stream( + self, + input: Self::Input<'_>, + tx: mpsc::UnboundedSender, + ) -> Result { + match self { + Self::Left(a) => a.execute_and_stream(input, tx), + Self::Right(b) => b.execute_and_stream(input, tx), + } + } } impl BatchExecutor for Either diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index 2109d557f8ec..1c366b0e2e7b 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -6,10 +6,10 @@ pub use reth_execution_types::{BlockExecutionInput, BlockExecutionOutput, Execut pub use reth_storage_errors::provider::ProviderError; use core::fmt::Display; - use reth_primitives::{BlockNumber, BlockWithSenders, Receipt}; use reth_prune_types::PruneModes; use revm_primitives::db::Database; +use tokio::sync::mpsc; /// A general purpose executor trait that executes an input (e.g. block) and produces an output /// (e.g. state changes and receipts). @@ -32,6 +32,12 @@ pub trait Executor { /// # Returns /// The output of the block execution. fn execute(self, input: Self::Input<'_>) -> Result; + + fn execute_and_stream( + self, + input: Self::Input<'_>, + tx: mpsc::UnboundedSender, + ) -> Result; } /// A general purpose executor that can execute multiple inputs in sequence, validate the outputs, @@ -178,6 +184,14 @@ mod tests { fn execute(self, _input: Self::Input<'_>) -> Result { Err(BlockExecutionError::msg("execution unavailable for tests")) } + + fn execute_and_stream( + self, + _input: Self::Input<'_>, + _tx: mpsc::UnboundedSender, + ) -> Result { + Err(BlockExecutionError::msg("execution unavailable for tests")) + } } impl BatchExecutor for TestExecutor { diff --git a/crates/evm/src/noop.rs b/crates/evm/src/noop.rs index ff8e893b2b6b..a00c7cd3057f 100644 --- a/crates/evm/src/noop.rs +++ b/crates/evm/src/noop.rs @@ -8,6 +8,7 @@ use reth_primitives::{BlockNumber, BlockWithSenders, Receipt}; use reth_prune_types::PruneModes; use reth_storage_errors::provider::ProviderError; use revm_primitives::db::Database; +use tokio::sync::mpsc; use crate::execute::{BatchExecutor, BlockExecutorProvider, Executor}; @@ -46,6 +47,14 @@ impl Executor for NoopBlockExecutorProvider { fn execute(self, _: Self::Input<'_>) -> Result { Err(BlockExecutionError::msg(UNAVAILABLE_FOR_NOOP)) } + + fn execute_and_stream( + self, + _: Self::Input<'_>, + _: mpsc::UnboundedSender, + ) -> Result { + Err(BlockExecutionError::msg(UNAVAILABLE_FOR_NOOP)) + } } impl BatchExecutor for NoopBlockExecutorProvider { diff --git a/crates/evm/src/system_calls/eip2935.rs b/crates/evm/src/system_calls/eip2935.rs index d32a58966653..e02521758cbf 100644 --- a/crates/evm/src/system_calls/eip2935.rs +++ b/crates/evm/src/system_calls/eip2935.rs @@ -2,6 +2,7 @@ use alloc::{boxed::Box, string::ToString}; use alloy_eips::eip2935::HISTORY_STORAGE_ADDRESS; +use tokio::sync::mpsc; use crate::ConfigureEvm; use core::fmt::Display; @@ -149,3 +150,34 @@ where Ok(()) } + +pub fn apply_blockhashes_contract_call2( + evm_config: &EvmConfig, + chain_spec: &ChainSpec, + block_timestamp: u64, + block_number: u64, + parent_block_hash: B256, + evm: &mut Evm<'_, EXT, DB>, + sender: &Option>, +) -> Result<(), BlockExecutionError> +where + DB: Database + DatabaseCommit, + DB::Error: core::fmt::Display, + EvmConfig: ConfigureEvm
, +{ + if let Some(res) = transact_blockhashes_contract_call( + evm_config, + chain_spec, + block_timestamp, + block_number, + parent_block_hash, + evm, + )? { + if let Some(tx) = sender { + let _ = tx.send(res.state.clone()); + } + evm.context.evm.db.commit(res.state); + } + + Ok(()) +} diff --git a/crates/evm/src/system_calls/eip4788.rs b/crates/evm/src/system_calls/eip4788.rs index 79beaca695d3..5ef232d8fe3c 100644 --- a/crates/evm/src/system_calls/eip4788.rs +++ b/crates/evm/src/system_calls/eip4788.rs @@ -1,5 +1,6 @@ //! [EIP-4788](https://eips.ethereum.org/EIPS/eip-4788) system call implementation. use alloc::{boxed::Box, string::ToString}; +use tokio::sync::mpsc; use crate::ConfigureEvm; use alloy_eips::eip4788::BEACON_ROOTS_ADDRESS; @@ -124,3 +125,75 @@ where Ok(()) } + +pub fn apply_beacon_root_contract_call2( + evm_config: &EvmConfig, + chain_spec: &Spec, + block_timestamp: u64, + block_number: u64, + parent_beacon_block_root: Option, + evm: &mut Evm<'_, EXT, DB>, + sender: &Option>, +) -> Result<(), BlockExecutionError> +where + DB: Database + DatabaseCommit, + DB::Error: core::fmt::Display, + EvmConfig: ConfigureEvm
, + Spec: EthereumHardforks, +{ + if !chain_spec.is_cancun_active_at_timestamp(block_timestamp) { + return Ok(()) + } + + let parent_beacon_block_root = + parent_beacon_block_root.ok_or(BlockValidationError::MissingParentBeaconBlockRoot)?; + + // if the block number is zero (genesis block) then the parent beacon block root must + // be 0x0 and no system transaction may occur as per EIP-4788 + if block_number == 0 { + if !parent_beacon_block_root.is_zero() { + return Err(BlockValidationError::CancunGenesisParentBeaconBlockRootNotZero { + parent_beacon_block_root, + } + .into()) + } + return Ok(()) + } + + // get previous env + let previous_env = Box::new(evm.context.env().clone()); + + // modify env for pre block call + evm_config.fill_tx_env_system_contract_call( + &mut evm.context.evm.env, + alloy_eips::eip4788::SYSTEM_ADDRESS, + BEACON_ROOTS_ADDRESS, + parent_beacon_block_root.0.into(), + ); + + let mut state = match evm.transact() { + Ok(res) => res.state, + Err(e) => { + evm.context.evm.env = previous_env; + return Err(BlockValidationError::BeaconRootContractCall { + parent_beacon_block_root: Box::new(parent_beacon_block_root), + message: e.to_string(), + } + .into()) + } + }; + + state.remove(&alloy_eips::eip4788::SYSTEM_ADDRESS); + state.remove(&evm.block().coinbase); + + if let Some(tx) = sender { + let _ = tx.send(state.clone()); + } + + evm.context.evm.db.commit(state); + + // re-set the previous env + evm.context.evm.env = previous_env; + + Ok(()) +} From 04431ea58b594861db3963d2ba98abdcc8b43c56 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 20 Sep 2024 15:06:28 +0200 Subject: [PATCH 2/5] compute --- Cargo.lock | 3 + crates/engine/tree/Cargo.toml | 4 + crates/engine/tree/src/tree/mod.rs | 77 ++++++++++++++-- crates/engine/tree/src/tree/root.rs | 136 ++++++++++++++++++++++++++++ crates/trie/trie/src/input.rs | 2 +- crates/trie/trie/src/prefix_set.rs | 26 ++++-- 6 files changed, 227 insertions(+), 21 deletions(-) create mode 100644 crates/engine/tree/src/tree/root.rs diff --git a/Cargo.lock b/Cargo.lock index 0c9a1c750884..19364a324196 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7031,6 +7031,7 @@ dependencies = [ "futures", "metrics", "rand 0.8.5", + "rayon", "reth-beacon-consensus", "reth-blockchain-tree", "reth-blockchain-tree-api", @@ -7062,8 +7063,10 @@ dependencies = [ "reth-tracing", "reth-trie", "reth-trie-parallel", + "revm-primitives", "thiserror", "tokio", + "tokio-stream", "tracing", ] diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 4697e7fb87a5..10d1bf3d9fcc 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -39,10 +39,14 @@ reth-trie-parallel.workspace = true alloy-primitives.workspace = true alloy-eips.workspace = true +revm-primitives.workspace = true + # common futures.workspace = true tokio = { workspace = true, features = ["macros", "sync"] } +tokio-stream.workspace = true thiserror.workspace = true +rayon.workspace = true # metrics metrics.workspace = true diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index dc71d2d75ef1..963e0ec9a2d8 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -43,7 +43,7 @@ use reth_rpc_types::{ }; use reth_stages_api::ControlFlow; use reth_trie::{updates::TrieUpdates, HashedPostState, TrieInput}; -use reth_trie_parallel::parallel_root::ParallelStateRoot; +use reth_trie_parallel::{async_root::AsyncStateRootError, parallel_root::ParallelStateRoot}; use std::{ cmp::Ordering, collections::{btree_map, hash_map, BTreeMap, HashMap, HashSet, VecDeque}, @@ -56,10 +56,10 @@ use std::{ time::Instant, }; use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - oneshot, - oneshot::error::TryRecvError, + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot::{self, error::TryRecvError}, }; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::*; pub mod config; @@ -70,6 +70,9 @@ pub use config::TreeConfig; pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook}; pub use reth_engine_primitives::InvalidBlockHook; +mod root; +use root::StateRootTask; + /// Keeps track of the state of the tree. /// /// ## Invariants @@ -524,7 +527,13 @@ impl std::fmt::Debug impl EngineApiTreeHandler where - P: DatabaseProviderFactory + BlockReader + StateProviderFactory + StateReader + Clone + 'static, + P: DatabaseProviderFactory + + BlockReader + + StateProviderFactory + + StateReader + + Clone + + Unpin + + 'static,

::Provider: BlockReader, E: BlockExecutorProvider, T: EngineTypes, @@ -2151,11 +2160,47 @@ where let sealed_block = Arc::new(block.block.clone()); let block = block.unseal(); + let (state_root_tx, state_root_rx) = oneshot::channel(); + let (state_tx, state_rx) = mpsc::unbounded_channel(); + let persistence_in_progress = self.persistence_state.in_progress(); + if !persistence_in_progress { + let started_at = Instant::now(); + let consistent_view = ConsistentDbView::new_with_latest_tip(self.provider.clone())?; + let mut input = TrieInput::default(); + if let Some((historical, blocks)) = + self.state.tree_state.blocks_by_hash(block.parent_hash) + { + input.append(consistent_view.revert_state(historical)?); + for block in blocks.iter().rev() { + input.append_cached_ref(block.trie_updates(), block.hashed_state()) + } + } else { + input.append(consistent_view.revert_state(block.parent_hash)?); + } + + let parent_state_root = parent_block.state_root; + rayon::spawn(move || { + let result: Result<(B256, TrieUpdates), AsyncStateRootError> = + futures::executor::block_on(async move { + StateRootTask::new( + consistent_view, + input, + UnboundedReceiverStream::from(state_rx), + parent_state_root, + ) + .await + }); + let _ = state_root_tx.send((started_at.elapsed(), result)); + }); + } else { + drop(state_rx); + drop(state_root_tx); + } + let exec_time = Instant::now(); - let output = self - .metrics - .executor - .metered((&block, U256::MAX).into(), |input| executor.execute(input))?; + let output = self.metrics.executor.metered((&block, U256::MAX).into(), |input| { + executor.execute_and_stream(input, state_tx) + })?; debug!(target: "engine::tree", elapsed=?exec_time.elapsed(), ?block_number, "Executed block"); if let Err(err) = self.consensus.validate_block_post_execution( @@ -2202,6 +2247,19 @@ where debug!(target: "engine", persistence_in_progress, "Failed to compute state root in parallel"); state_provider.state_root_with_updates(hashed_state.clone())? }; + let root_elapsed = root_time.elapsed(); + + match state_root_rx.blocking_recv() { + Ok((elapsed, Ok((root, _updates)))) => { + info!(target: "engine", regular_root = %state_root, regular_elapsed = ?root_elapsed, task_root = %root, task_elapsed = ?elapsed, "State root task finished"); + } + Ok((_, Err(error))) => { + error!(target: "engine", %error, persistence_in_progress, "State root task failed"); + } + Err(error) => { + warn!(target: "engine", %error, persistence_in_progress, "State root task skipped"); + } + } if state_root != block.state_root { // call post-block hook @@ -2217,7 +2275,6 @@ where .into()) } - let root_elapsed = root_time.elapsed(); self.metrics.block_validation.record_state_root(root_elapsed.as_secs_f64()); debug!(target: "engine::tree", ?root_elapsed, ?block_number, "Calculated state root"); diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs new file mode 100644 index 000000000000..dd7ee2803863 --- /dev/null +++ b/crates/engine/tree/src/tree/root.rs @@ -0,0 +1,136 @@ +use futures::{FutureExt, StreamExt}; +use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory}; +use reth_tasks::pool::BlockingTaskPool; +use reth_trie::{ + prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState, HashedStorage, TrieInput, +}; +use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError}; +use revm_primitives::{keccak256, B256}; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; + +pub type AsyncStateRootFut = + Pin> + Send>>; + +pub struct StateRootTask { + consistent_view: ConsistentDbView, + blocking_task_pool: BlockingTaskPool, + input: TrieInput, + state_stream: UnboundedReceiverStream, + state_stream_closed: bool, + state: HashedPostState, + prefix_sets: TriePrefixSetsMut, + trie_updates: TrieUpdates, + pending_state_root: Option, + state_root: B256, +} + +impl StateRootTask { + pub fn new( + consistent_view: ConsistentDbView, + input: TrieInput, + state_stream: UnboundedReceiverStream, + parent_state_root: B256, + ) -> Self { + Self { + consistent_view, + blocking_task_pool: BlockingTaskPool::build().unwrap(), + input, + state_stream, + state_stream_closed: false, + state: HashedPostState::default(), + prefix_sets: TriePrefixSetsMut::default(), + trie_updates: TrieUpdates::default(), + pending_state_root: None, + state_root: parent_state_root, + } + } + + fn on_state_update(&mut self, update: revm_primitives::EvmState) { + let mut hashed_state_update = HashedPostState::default(); + for (address, account) in update { + if account.is_touched() { + let hashed_address = keccak256(address); + + let destroyed = account.is_selfdestructed(); + hashed_state_update.accounts.insert( + hashed_address, + if destroyed || account.is_empty() { None } else { Some(account.info.into()) }, + ); + + if destroyed || !account.storage.is_empty() { + let storage = account.storage.into_iter().filter_map(|(slot, value)| { + (!destroyed && value.is_changed()) + .then(|| (keccak256(B256::from(slot)), value.present_value)) + }); + hashed_state_update + .storages + .insert(hashed_address, HashedStorage::from_iter(destroyed, storage)); + } + } + } + self.prefix_sets.extend(hashed_state_update.construct_prefix_sets()); + self.state.extend(hashed_state_update); + } +} + +impl Future for StateRootTask +where + Factory: DatabaseProviderFactory + Clone + Send + Sync + Unpin + 'static, +{ + type Output = Result<(B256, TrieUpdates), AsyncStateRootError>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.get_mut(); + + loop { + if this.state_stream_closed && + this.prefix_sets.is_empty() && + this.pending_state_root.is_none() + { + return Poll::Ready(Ok((this.state_root, std::mem::take(&mut this.trie_updates)))) + } + + if let Poll::Ready(next) = this.state_stream.poll_next_unpin(cx) { + if let Some(update) = next { + this.on_state_update(update); + continue + } else { + this.state_stream_closed = true; + } + } + + if let Some(mut pending) = this.pending_state_root.take() { + match pending.poll_unpin(cx)? { + Poll::Ready((state_root, trie_updates)) => { + this.state_root = state_root; + this.trie_updates.extend(trie_updates); + continue + } + Poll::Pending => { + this.pending_state_root = Some(pending); + } + } + } + + if this.pending_state_root.is_none() && !this.prefix_sets.is_empty() { + let view = this.consistent_view.clone(); + let task_pool = this.blocking_task_pool.clone(); + let mut input = this.input.clone(); // TODO: avoid cloning? + input.nodes.extend_ref(&this.trie_updates); + input.state.extend_ref(&this.state); + input.prefix_sets.extend(std::mem::take(&mut this.prefix_sets)); + this.pending_state_root = Some(Box::pin( + AsyncStateRoot::new(view, task_pool, input).incremental_root_with_updates(), + )); + continue + } + + return Poll::Pending + } + } +} diff --git a/crates/trie/trie/src/input.rs b/crates/trie/trie/src/input.rs index 18f9ada2f4ab..c149f544b49b 100644 --- a/crates/trie/trie/src/input.rs +++ b/crates/trie/trie/src/input.rs @@ -1,7 +1,7 @@ use crate::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState}; /// Inputs for trie-related computations. -#[derive(Default, Debug)] +#[derive(Clone, Default, Debug)] pub struct TrieInput { /// The collection of cached in-memory intermediate trie nodes that /// can be reused for computation. diff --git a/crates/trie/trie/src/prefix_set.rs b/crates/trie/trie/src/prefix_set.rs index 4997228050a3..0aa85653482d 100644 --- a/crates/trie/trie/src/prefix_set.rs +++ b/crates/trie/trie/src/prefix_set.rs @@ -18,6 +18,12 @@ pub struct TriePrefixSetsMut { } impl TriePrefixSetsMut { + pub fn is_empty(&self) -> bool { + self.account_prefix_set.is_empty() && + self.storage_prefix_sets.is_empty() && + self.destroyed_accounts.is_empty() + } + /// Extends prefix sets with contents of another prefix set. pub fn extend(&mut self, other: Self) { self.account_prefix_set.extend(other.account_prefix_set.keys); @@ -110,6 +116,16 @@ impl PrefixSetMut { Self { all: true, keys: Vec::new() } } + /// Returns `true` if the set is empty. + pub fn is_empty(&self) -> bool { + !self.all && self.keys.is_empty() + } + + /// Returns the number of elements in the set. + pub fn len(&self) -> usize { + self.keys.len() + } + /// Inserts the given `nibbles` into the set. pub fn insert(&mut self, nibbles: Nibbles) { self.keys.push(nibbles); @@ -123,16 +139,6 @@ impl PrefixSetMut { self.keys.extend(nibbles_iter); } - /// Returns the number of elements in the set. - pub fn len(&self) -> usize { - self.keys.len() - } - - /// Returns `true` if the set is empty. - pub fn is_empty(&self) -> bool { - self.keys.is_empty() - } - /// Returns a `PrefixSet` with the same elements as this set. /// /// If not yet sorted, the elements will be sorted and deduplicated. From a149dbbd6b3bd8764dbe9fb128a76c9e1796de44 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 20 Sep 2024 15:11:52 +0200 Subject: [PATCH 3/5] add debug logs --- crates/engine/tree/src/tree/root.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index dd7ee2803863..dfd053761047 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -12,6 +12,7 @@ use std::{ task::{Context, Poll}, }; use tokio_stream::wrappers::UnboundedReceiverStream; +use tracing::debug; pub type AsyncStateRootFut = Pin> + Send>>; @@ -97,6 +98,7 @@ where if let Poll::Ready(next) = this.state_stream.poll_next_unpin(cx) { if let Some(update) = next { + debug!(target: "engine::root", len = update.len(), "Received new state update"); this.on_state_update(update); continue } else { @@ -107,6 +109,7 @@ where if let Some(mut pending) = this.pending_state_root.take() { match pending.poll_unpin(cx)? { Poll::Ready((state_root, trie_updates)) => { + debug!(target: "engine::root", %state_root, "Computed intermediate root"); this.state_root = state_root; this.trie_updates.extend(trie_updates); continue @@ -118,6 +121,7 @@ where } if this.pending_state_root.is_none() && !this.prefix_sets.is_empty() { + debug!(target: "engine::root", accounts_len = this.prefix_sets.account_prefix_set.len(), "Spawning state root task"); let view = this.consistent_view.clone(); let task_pool = this.blocking_task_pool.clone(); let mut input = this.input.clone(); // TODO: avoid cloning? From 8c1fbd6f88d1da3b376cfd5585b592e2451774c6 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Mon, 23 Sep 2024 14:01:16 +0200 Subject: [PATCH 4/5] edits --- crates/engine/tree/src/tree/root.rs | 14 +- crates/trie/common/src/lib.rs | 4 +- crates/trie/parallel/src/async_proof.rs | 246 ++++++++++++++++++++++++ crates/trie/parallel/src/lib.rs | 4 + crates/trie/trie/src/input.rs | 22 ++- crates/trie/trie/src/lib.rs | 2 +- crates/trie/trie/src/proof.rs | 4 +- 7 files changed, 287 insertions(+), 9 deletions(-) create mode 100644 crates/trie/parallel/src/async_proof.rs diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index dfd053761047..5e5e01864866 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -11,6 +11,7 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use tokio::sync::oneshot; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::debug; @@ -128,9 +129,16 @@ where input.nodes.extend_ref(&this.trie_updates); input.state.extend_ref(&this.state); input.prefix_sets.extend(std::mem::take(&mut this.prefix_sets)); - this.pending_state_root = Some(Box::pin( - AsyncStateRoot::new(view, task_pool, input).incremental_root_with_updates(), - )); + let (tx, rx) = oneshot::channel(); + rayon::spawn(|| { + let result = futures::executor::block_on(async move { + AsyncStateRoot::new(view, task_pool, input) + .incremental_root_with_updates() + .await + }); + let _ = tx.send(result); + }); + this.pending_state_root = Some(Box::pin(async move { rx.await.unwrap() })); continue } diff --git a/crates/trie/common/src/lib.rs b/crates/trie/common/src/lib.rs index bdec36028b94..b52aae9bf2e3 100644 --- a/crates/trie/common/src/lib.rs +++ b/crates/trie/common/src/lib.rs @@ -30,4 +30,6 @@ pub use proofs::*; pub mod root; -pub use alloy_trie::{nodes::*, proof, BranchNodeCompact, HashBuilder, TrieMask, EMPTY_ROOT_HASH}; +pub use alloy_trie::{ + nodes::*, proof::*, BranchNodeCompact, HashBuilder, TrieMask, EMPTY_ROOT_HASH, +}; diff --git a/crates/trie/parallel/src/async_proof.rs b/crates/trie/parallel/src/async_proof.rs new file mode 100644 index 000000000000..79db67622bc3 --- /dev/null +++ b/crates/trie/parallel/src/async_proof.rs @@ -0,0 +1,246 @@ +use crate::{async_root::AsyncStateRootError, stats::ParallelTrieTracker, StorageRootTargets}; +use alloy_primitives::B256; +use alloy_rlp::{BufMut, Encodable}; +use itertools::Itertools; +use reth_db::DatabaseError; +use reth_provider::{ + providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, ProviderError, +}; +use reth_tasks::pool::BlockingTaskPool; +use reth_trie::{ + hashed_cursor::{HashedCursorFactory, HashedPostStateCursorFactory, HashedStorageCursor}, + node_iter::{TrieElement, TrieNodeIter}, + prefix_set::{PrefixSet, PrefixSetMut, TriePrefixSetsMut}, + trie_cursor::{InMemoryTrieCursorFactory, TrieCursorFactory}, + walker::TrieWalker, + HashBuilder, MultiProof, Nibbles, ProofRetainer, StorageMultiProof, TrieAccount, + TrieInputSorted, +}; +use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use tracing::debug; + +#[cfg(feature = "metrics")] +use crate::metrics::ParallelStateRootMetrics; + +/// TODO: +#[derive(Debug)] +pub struct AsyncProof { + /// Consistent view of the database. + view: ConsistentDbView, + /// Blocking task pool. + blocking_pool: BlockingTaskPool, + /// Trie input. + input: Arc, + /// Parallel state root metrics. + #[cfg(feature = "metrics")] + metrics: ParallelStateRootMetrics, +} + +impl AsyncProof { + /// Create new async state proof generator. + pub fn new( + view: ConsistentDbView, + blocking_pool: BlockingTaskPool, + input: Arc, + ) -> Self { + Self { + view, + blocking_pool, + input, + #[cfg(feature = "metrics")] + metrics: ParallelStateRootMetrics::default(), + } + } +} + +impl AsyncProof +where + Factory: DatabaseProviderFactory + Clone + Send + Sync + 'static, +{ + /// Generate a state multiproof according to specified targets. + pub async fn multiproof( + &self, + targets: HashMap>, + ) -> Result { + let mut tracker = ParallelTrieTracker::default(); + + // Extend prefix sets with targets + let mut prefix_sets = self.input.prefix_sets.clone(); + prefix_sets.extend(TriePrefixSetsMut { + account_prefix_set: PrefixSetMut::from(targets.keys().map(Nibbles::unpack)), + storage_prefix_sets: targets + .iter() + .filter_map(|(hashed_address, slots)| { + (!slots.is_empty()).then(|| { + ( + *hashed_address, + PrefixSetMut::from(slots.into_iter().map(Nibbles::unpack)), + ) + }) + }) + .collect(), + destroyed_accounts: Default::default(), + }); + let prefix_sets = prefix_sets.freeze(); + + let storage_root_targets = StorageRootTargets::new( + prefix_sets.account_prefix_set.iter().map(|nibbles| B256::from_slice(&nibbles.pack())), + prefix_sets.storage_prefix_sets.clone(), + ); + + // Pre-calculate storage roots async for accounts which were changed. + tracker.set_precomputed_storage_roots(storage_root_targets.len() as u64); + debug!(target: "trie::async_state_root", len = storage_root_targets.len(), "pre-generating storage proofs"); + let mut storage_proofs = HashMap::with_capacity(storage_root_targets.len()); + for (hashed_address, prefix_set) in + storage_root_targets.into_iter().sorted_unstable_by_key(|(address, _)| *address) + { + let view = self.view.clone(); + let input = self.input.clone(); + let targets = targets + .get(&hashed_address) + .map_or(Vec::new(), |slots| slots.iter().map(Nibbles::unpack).collect()); + let handle = + self.blocking_pool.spawn_fifo(move || -> Result<_, AsyncStateRootError> { + let provider_ro = view.provider_ro()?; + let trie_cursor_factory = InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + &input.nodes, + ); + let hashed_cursor_factory = HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + &input.state, + ); + + Ok(storage_multiproof( + trie_cursor_factory, + hashed_cursor_factory, + prefix_set, + hashed_address, + targets, + ) + .map_err(ProviderError::Database)?) + }); + storage_proofs.insert(hashed_address, handle); + } + + let provider_ro = self.view.provider_ro()?; + let trie_cursor_factory = InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + &self.input.nodes, + ); + let hashed_cursor_factory = HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + &self.input.state, + ); + + // Create the walker. + let walker = TrieWalker::new( + trie_cursor_factory.account_trie_cursor().map_err(ProviderError::Database)?, + prefix_sets.account_prefix_set, + ); + + // Create a hash builder to rebuild the root node since it is not available in the database. + let retainer = ProofRetainer::from_iter(targets.keys().map(Nibbles::unpack)); + let mut hash_builder = HashBuilder::default().with_proof_retainer(retainer); + + let mut storages = HashMap::default(); + let mut account_rlp = Vec::with_capacity(128); + let mut account_node_iter = TrieNodeIter::new( + walker, + hashed_cursor_factory.hashed_account_cursor().map_err(ProviderError::Database)?, + ); + while let Some(account_node) = + account_node_iter.try_next().map_err(ProviderError::Database)? + { + match account_node { + TrieElement::Branch(node) => { + hash_builder.add_branch(node.key, node.value, node.children_are_in_trie); + } + TrieElement::Leaf(hashed_address, account) => { + let storage_multiproof = match storage_proofs.remove(&hashed_address) { + Some(rx) => rx.await.map_err(|_| { + AsyncStateRootError::StorageRootChannelClosed { hashed_address } + })??, + // Since we do not store all intermediate nodes in the database, there might + // be a possibility of re-adding a non-modified leaf to the hash builder. + None => { + tracker.inc_missed_leaves(); + storage_multiproof( + trie_cursor_factory.clone(), + hashed_cursor_factory.clone(), + Default::default(), // TODO: check? + hashed_address, + targets.get(&hashed_address).map_or(Vec::new(), |slots| { + slots.iter().map(Nibbles::unpack).collect() + }), + ) + .map_err(ProviderError::Database)? + } + }; + + // Encode account + account_rlp.clear(); + let account = TrieAccount::from((account, storage_multiproof.root)); + account.encode(&mut account_rlp as &mut dyn BufMut); + + hash_builder.add_leaf(Nibbles::unpack(hashed_address), &account_rlp); + storages.insert(hashed_address, storage_multiproof); + } + } + } + let _ = hash_builder.root(); + + #[cfg(feature = "metrics")] + self.metrics.record_state_trie(tracker.finish()); + + Ok(MultiProof { account_subtree: hash_builder.take_proofs(), storages }) + } +} + +/// Generate a storage multiproof according to specified targets. +pub fn storage_multiproof( + trie_cursor_factory: T, + hashed_cursor_factory: H, + prefix_set: PrefixSet, // must already include targets + hashed_address: B256, + targets: Vec, +) -> Result +where + T: TrieCursorFactory, + H: HashedCursorFactory, +{ + let mut hashed_storage_cursor = hashed_cursor_factory.hashed_storage_cursor(hashed_address)?; + + // short circuit on empty storage + if hashed_storage_cursor.is_storage_empty()? { + return Ok(StorageMultiProof::default()) + } + + let trie_cursor = trie_cursor_factory.storage_trie_cursor(hashed_address)?; + let walker = TrieWalker::new(trie_cursor, prefix_set); + + let retainer = ProofRetainer::from_iter(targets); + let mut hash_builder = HashBuilder::default().with_proof_retainer(retainer); + let mut storage_node_iter = TrieNodeIter::new(walker, hashed_storage_cursor); + while let Some(node) = storage_node_iter.try_next()? { + match node { + TrieElement::Branch(node) => { + hash_builder.add_branch(node.key, node.value, node.children_are_in_trie); + } + TrieElement::Leaf(hashed_slot, value) => { + hash_builder.add_leaf( + Nibbles::unpack(hashed_slot), + alloy_rlp::encode_fixed_size(&value).as_ref(), + ); + } + } + } + + let root = hash_builder.root(); + Ok(StorageMultiProof { root, subtree: hash_builder.take_proofs() }) +} diff --git a/crates/trie/parallel/src/lib.rs b/crates/trie/parallel/src/lib.rs index ff130b2187e7..84b04d25cb01 100644 --- a/crates/trie/parallel/src/lib.rs +++ b/crates/trie/parallel/src/lib.rs @@ -17,6 +17,10 @@ pub mod stats; #[cfg(feature = "async")] pub mod async_root; +/// Implementation of async state proof generation. +#[cfg(feature = "async")] +pub mod async_proof; + /// Implementation of parallel state root computation. #[cfg(feature = "parallel")] pub mod parallel_root; diff --git a/crates/trie/trie/src/input.rs b/crates/trie/trie/src/input.rs index c149f544b49b..cdb4b1997c86 100644 --- a/crates/trie/trie/src/input.rs +++ b/crates/trie/trie/src/input.rs @@ -1,4 +1,16 @@ -use crate::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState}; +use crate::{ + prefix_set::TriePrefixSetsMut, + updates::{TrieUpdates, TrieUpdatesSorted}, + HashedPostState, HashedPostStateSorted, +}; + +// TODO: move +#[derive(Debug)] +pub struct TrieInputSorted { + pub nodes: TrieUpdatesSorted, + pub state: HashedPostStateSorted, + pub prefix_sets: TriePrefixSetsMut, +} /// Inputs for trie-related computations. #[derive(Clone, Default, Debug)] @@ -72,4 +84,12 @@ impl TrieInput { self.nodes.extend_ref(nodes); self.state.extend_ref(state); } + + pub fn into_sorted(self) -> TrieInputSorted { + TrieInputSorted { + nodes: self.nodes.into_sorted(), + state: self.state.into_sorted(), + prefix_sets: self.prefix_sets, + } + } } diff --git a/crates/trie/trie/src/lib.rs b/crates/trie/trie/src/lib.rs index 317ec3655400..e747ec52b972 100644 --- a/crates/trie/trie/src/lib.rs +++ b/crates/trie/trie/src/lib.rs @@ -38,7 +38,7 @@ pub use state::*; /// Input for trie computation. mod input; -pub use input::TrieInput; +pub use input::*; /// Merkle proof generation. pub mod proof; diff --git a/crates/trie/trie/src/proof.rs b/crates/trie/trie/src/proof.rs index 8b9d2f9d09fb..ae04742fb8a1 100644 --- a/crates/trie/trie/src/proof.rs +++ b/crates/trie/trie/src/proof.rs @@ -9,9 +9,7 @@ use crate::{ use alloy_primitives::{keccak256, Address, B256}; use alloy_rlp::{BufMut, Encodable}; use reth_execution_errors::trie::StateProofError; -use reth_trie_common::{ - proof::ProofRetainer, AccountProof, MultiProof, StorageMultiProof, TrieAccount, -}; +use reth_trie_common::{AccountProof, MultiProof, ProofRetainer, StorageMultiProof, TrieAccount}; use std::collections::{HashMap, HashSet}; /// A struct for generating merkle proofs. From 7f0336d220e437027582b7f83a47bff169419090 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Tue, 24 Sep 2024 10:36:44 +0200 Subject: [PATCH 5/5] another batch --- Cargo.lock | 2 + crates/engine/tree/Cargo.toml | 3 + crates/engine/tree/src/tree/mod.rs | 1 + crates/engine/tree/src/tree/root.rs | 291 +++++++++++++++++++++++----- crates/trie/common/src/proofs.rs | 17 +- crates/trie/trie/src/witness.rs | 197 ++++++++++--------- 6 files changed, 371 insertions(+), 140 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19364a324196..29a296414c7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7043,6 +7043,7 @@ dependencies = [ "reth-errors", "reth-ethereum-engine-primitives", "reth-evm", + "reth-execution-errors", "reth-exex-types", "reth-metrics", "reth-network-p2p", @@ -7062,6 +7063,7 @@ dependencies = [ "reth-tasks", "reth-tracing", "reth-trie", + "reth-trie-db", "reth-trie-parallel", "revm-primitives", "thiserror", diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 10d1bf3d9fcc..8907941002c3 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -33,11 +33,14 @@ reth-rpc-types.workspace = true reth-stages-api.workspace = true reth-tasks.workspace = true reth-trie.workspace = true +reth-trie-db.workspace = true reth-trie-parallel.workspace = true +reth-execution-errors.workspace = true # alloy alloy-primitives.workspace = true alloy-eips.workspace = true +alloy-rlp.workspace = true revm-primitives.workspace = true diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 963e0ec9a2d8..d0d000aa256e 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -2178,6 +2178,7 @@ where input.append(consistent_view.revert_state(block.parent_hash)?); } + let input = Arc::new(input.into_sorted()); let parent_state_root = parent_block.state_root; rayon::spawn(move || { let result: Result<(B256, TrieUpdates), AsyncStateRootError> = diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index 5e5e01864866..9786c7c0ce62 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -1,54 +1,97 @@ -use futures::{FutureExt, StreamExt}; -use reth_provider::{providers::ConsistentDbView, BlockReader, DatabaseProviderFactory}; +use alloy_rlp::{BufMut, Encodable}; +use futures::{stream::FuturesOrdered, FutureExt, StreamExt}; +use rayon::prelude::*; +use reth_errors::ProviderResult; +use reth_execution_errors::TrieWitnessError; +use reth_provider::{ + providers::ConsistentDbView, BlockReader, DBProvider, DatabaseProviderFactory, +}; use reth_tasks::pool::BlockingTaskPool; use reth_trie::{ - prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState, HashedStorage, TrieInput, + hashed_cursor::HashedPostStateCursorFactory, + proof::Proof, + trie_cursor::InMemoryTrieCursorFactory, + updates::TrieUpdates, + witness::{next_root_from_proofs, target_nodes}, + HashedPostState, HashedStorage, MultiProof, Nibbles, TrieAccount, TrieInputSorted, + EMPTY_ROOT_HASH, }; -use reth_trie_parallel::async_root::{AsyncStateRoot, AsyncStateRootError}; +use reth_trie_db::{DatabaseHashedCursorFactory, DatabaseTrieCursorFactory}; +use reth_trie_parallel::{async_proof::AsyncProof, async_root::AsyncStateRootError}; use revm_primitives::{keccak256, B256}; use std::{ + collections::{HashMap, HashSet}, future::Future, pin::Pin, + sync::Arc, task::{Context, Poll}, + time::{Duration, Instant}, }; use tokio::sync::oneshot; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::debug; -pub type AsyncStateRootFut = - Pin> + Send>>; +type AsyncStateRootFut = + Pin> + Send>>; + +type AsyncStateProofFut = Pin< + Box< + dyn Future< + Output = Result, oneshot::error::RecvError>, + > + Send, + >, +>; -pub struct StateRootTask { +pub(crate) struct StateRootTask { consistent_view: ConsistentDbView, blocking_task_pool: BlockingTaskPool, - input: TrieInput, state_stream: UnboundedReceiverStream, state_stream_closed: bool, + input: Arc, state: HashedPostState, - prefix_sets: TriePrefixSetsMut, trie_updates: TrieUpdates, - pending_state_root: Option, - state_root: B256, + pending_proofs: FuturesOrdered, + task_state: StateRootTaskState, +} + +enum StateRootTaskState { + Idle(MultiProof, B256), + Pending(MultiProof, AsyncStateRootFut), +} + +impl StateRootTaskState { + fn add_proofs(&mut self, proofs: MultiProof) { + match self { + Self::Idle(multiproof, _) => { + multiproof.extend(proofs); + } + Self::Pending(multiproof, _) => { + multiproof.extend(proofs); + } + } + } } -impl StateRootTask { - pub fn new( +impl StateRootTask +where + Factory: DatabaseProviderFactory + Clone + Send + Sync + Unpin + 'static, +{ + pub(crate) fn new( consistent_view: ConsistentDbView, - input: TrieInput, + input: Arc, state_stream: UnboundedReceiverStream, parent_state_root: B256, ) -> Self { Self { consistent_view, blocking_task_pool: BlockingTaskPool::build().unwrap(), - input, state_stream, state_stream_closed: false, + input, state: HashedPostState::default(), - prefix_sets: TriePrefixSetsMut::default(), trie_updates: TrieUpdates::default(), - pending_state_root: None, - state_root: parent_state_root, + pending_proofs: FuturesOrdered::new(), + task_state: StateRootTaskState::Idle(MultiProof::default(), parent_state_root), } } @@ -75,7 +118,36 @@ impl StateRootTask { } } } - self.prefix_sets.extend(hashed_state_update.construct_prefix_sets()); + + // Dispatch proof gathering for this state update + // TODO: batch these + let view = self.consistent_view.clone(); + let task_pool = self.blocking_task_pool.clone(); + let input = self.input.clone(); + + let targets = hashed_state_update + .accounts + .keys() + .filter(|hashed_address| { + !self.state.accounts.contains_key(*hashed_address) && + !self.state.storages.contains_key(*hashed_address) + }) + .map(|hashed_address| (*hashed_address, HashSet::default())) + // TODO: filter storages + .chain(hashed_state_update.storages.iter().map(|(hashed_address, storage)| { + (*hashed_address, storage.storage.keys().copied().collect()) + })) + .collect::>(); + + let (tx, rx) = oneshot::channel(); + rayon::spawn(|| { + let result = futures::executor::block_on(async move { + AsyncProof::new(view, task_pool, input).multiproof(targets).await + }); + let _ = tx.send(result); + }); + self.pending_proofs.push_back(Box::pin(async move { rx.await })); + self.state.extend(hashed_state_update); } } @@ -90,11 +162,11 @@ where let this = self.get_mut(); loop { - if this.state_stream_closed && - this.prefix_sets.is_empty() && - this.pending_state_root.is_none() - { - return Poll::Ready(Ok((this.state_root, std::mem::take(&mut this.trie_updates)))) + // TODO: fix + if this.state_stream_closed && this.pending_proofs.is_empty() { + if let StateRootTaskState::Idle(_multiproof, state_root) = &mut this.task_state { + return Poll::Ready(Ok((*state_root, std::mem::take(&mut this.trie_updates)))) + } } if let Poll::Ready(next) = this.state_stream.poll_next_unpin(cx) { @@ -107,38 +179,39 @@ where } } - if let Some(mut pending) = this.pending_state_root.take() { - match pending.poll_unpin(cx)? { - Poll::Ready((state_root, trie_updates)) => { - debug!(target: "engine::root", %state_root, "Computed intermediate root"); - this.state_root = state_root; - this.trie_updates.extend(trie_updates); - continue - } - Poll::Pending => { - this.pending_state_root = Some(pending); - } + if let Poll::Ready(Some(result)) = this.pending_proofs.poll_next_unpin(cx) { + let multiproof = result.unwrap().unwrap(); + this.task_state.add_proofs(multiproof); + continue + } + + if let StateRootTaskState::Pending(multiproof, pending) = &mut this.task_state { + if let Poll::Ready((state_root, mut multiproof2, trie_updates, elapsed)) = + pending.poll_unpin(cx)? + { + debug!(target: "engine::root", %state_root, ?elapsed, "Computed intermediate root"); + this.trie_updates.extend(trie_updates); + multiproof2.extend(std::mem::take(multiproof)); + this.task_state = StateRootTaskState::Idle(multiproof2, state_root); + continue } } - if this.pending_state_root.is_none() && !this.prefix_sets.is_empty() { - debug!(target: "engine::root", accounts_len = this.prefix_sets.account_prefix_set.len(), "Spawning state root task"); + if let StateRootTaskState::Idle(multiproof, _) = &mut this.task_state { + debug!(target: "engine::root", accounts_len = this.state.accounts.len(), "Spawning state root task"); let view = this.consistent_view.clone(); - let task_pool = this.blocking_task_pool.clone(); - let mut input = this.input.clone(); // TODO: avoid cloning? - input.nodes.extend_ref(&this.trie_updates); - input.state.extend_ref(&this.state); - input.prefix_sets.extend(std::mem::take(&mut this.prefix_sets)); + let input = this.input.clone(); + let multiproof = std::mem::take(multiproof); + let state = this.state.clone(); let (tx, rx) = oneshot::channel(); rayon::spawn(|| { - let result = futures::executor::block_on(async move { - AsyncStateRoot::new(view, task_pool, input) - .incremental_root_with_updates() - .await - }); + let result = calculate_state_root_from_proofs(view, input, multiproof, state); let _ = tx.send(result); }); - this.pending_state_root = Some(Box::pin(async move { rx.await.unwrap() })); + this.task_state = StateRootTaskState::Pending( + Default::default(), + Box::pin(async move { rx.await.unwrap() }), + ); continue } @@ -146,3 +219,125 @@ where } } } + +fn calculate_state_root_from_proofs( + view: ConsistentDbView, + input: Arc, + multiproof: MultiProof, + state: HashedPostState, +) -> ProviderResult<(B256, MultiProof, TrieUpdates, Duration)> +where + Factory: DatabaseProviderFactory + Clone, +{ + let started_at = Instant::now(); + let provider_ro = view.provider_ro()?; + + let proof_targets: HashMap> = HashMap::from_iter( + state.accounts.keys().map(|hashed_address| (*hashed_address, HashSet::default())).chain( + state.storages.iter().map(|(hashed_address, storage)| { + (*hashed_address, storage.storage.keys().copied().collect()) + }), + ), + ); + + let account_trie_nodes = proof_targets + .into_par_iter() + .map(|(hashed_address, hashed_slots)| { + // Gather and record storage trie nodes for this account. + let mut storage_trie_nodes = Vec::with_capacity(hashed_slots.len()); + let storage = state.storages.get(&hashed_address); + for hashed_slot in hashed_slots { + let slot_key = Nibbles::unpack(hashed_slot); + let slot_value = storage + .and_then(|s| s.storage.get(&hashed_slot)) + .filter(|v| !v.is_zero()) + .map(|v| alloy_rlp::encode_fixed_size(v).to_vec()); + let proof = multiproof + .storages + .get(&hashed_address) + .map(|proof| { + proof + .subtree + .iter() + .filter(|e| slot_key.starts_with(e.0)) + .collect::>() + }) + .unwrap_or_default(); + storage_trie_nodes.extend(target_nodes(slot_key.clone(), slot_value, proof, None)?); + } + + let storage_root = next_root_from_proofs(storage_trie_nodes, true, |key: Nibbles| { + // Right pad the target with 0s. + let mut padded_key = key.pack(); + padded_key.resize(32, 0); + let mut proof = Proof::new( + InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + &input.nodes, + ), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + &input.state, + ), + ) + .with_target((hashed_address, HashSet::from([B256::from_slice(&padded_key)]))) + .multiproof() + .unwrap(); + + // The subtree only contains the proof for a single target. + let node = proof + .storages + .get_mut(&hashed_address) + .and_then(|storage_multiproof| storage_multiproof.subtree.remove(&key)) + .ok_or(TrieWitnessError::MissingTargetNode(key))?; + Ok(node) + })?; + + // Gather and record account trie nodes. + let account = state + .accounts + .get(&hashed_address) + .ok_or(TrieWitnessError::MissingAccount(hashed_address))?; + let value = if account.is_some() || storage_root != EMPTY_ROOT_HASH { + let mut encoded = Vec::with_capacity(128); + TrieAccount::from((account.unwrap_or_default(), storage_root)) + .encode(&mut encoded as &mut dyn BufMut); + Some(encoded) + } else { + None + }; + let key = Nibbles::unpack(hashed_address); + let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0)); + Ok(target_nodes(key.clone(), value, proof, None)?) + }) + .collect::>>()?; + + let state_root = + next_root_from_proofs(account_trie_nodes.into_iter().flatten(), true, |key: Nibbles| { + // Right pad the target with 0s. + let mut padded_key = key.pack(); + padded_key.resize(32, 0); + let mut proof = Proof::new( + InMemoryTrieCursorFactory::new( + DatabaseTrieCursorFactory::new(provider_ro.tx_ref()), + &input.nodes, + ), + HashedPostStateCursorFactory::new( + DatabaseHashedCursorFactory::new(provider_ro.tx_ref()), + &input.state, + ), + ) + .with_target((B256::from_slice(&padded_key), Default::default())) + .multiproof() + .unwrap(); + + // The subtree only contains the proof for a single target. + let node = proof + .account_subtree + .remove(&key) + .ok_or(TrieWitnessError::MissingTargetNode(key))?; + Ok(node) + })?; + + Ok((state_root, multiproof, Default::default(), started_at.elapsed())) +} diff --git a/crates/trie/common/src/proofs.rs b/crates/trie/common/src/proofs.rs index df32b1cb9f6a..4cb9b448d924 100644 --- a/crates/trie/common/src/proofs.rs +++ b/crates/trie/common/src/proofs.rs @@ -10,7 +10,7 @@ use alloy_trie::{ }; use reth_primitives_traits::{constants::KECCAK_EMPTY, Account}; use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap}; +use std::collections::{hash_map, BTreeMap, HashMap}; /// The state multiproof of target accounts and multiproofs of their storage tries. /// Multiproof is effectively a state subtrie that only contains the nodes @@ -74,6 +74,21 @@ impl MultiProof { } Ok(AccountProof { address, info, proof, storage_root, storage_proofs }) } + + pub fn extend(&mut self, other: Self) { + self.account_subtree.extend(other.account_subtree); + for (hashed_address, storage) in other.storages { + match self.storages.entry(hashed_address) { + hash_map::Entry::Occupied(mut entry) => { + debug_assert_eq!(entry.get().root, storage.root); + entry.get_mut().subtree.extend(storage.subtree); + } + hash_map::Entry::Vacant(entry) => { + entry.insert(storage); + } + } + } + } } /// The merkle multiproof of storage trie. diff --git a/crates/trie/trie/src/witness.rs b/crates/trie/trie/src/witness.rs index 0d0839616021..b79c30998aa6 100644 --- a/crates/trie/trie/src/witness.rs +++ b/crates/trie/trie/src/witness.rs @@ -1,14 +1,15 @@ use crate::{ hashed_cursor::HashedCursorFactory, prefix_set::TriePrefixSetsMut, proof::Proof, - trie_cursor::TrieCursorFactory, HashedPostState, + trie_cursor::TrieCursorFactory, updates::TrieUpdates, HashedPostState, }; use alloy_primitives::{keccak256, Bytes, B256}; use alloy_rlp::{BufMut, Decodable, Encodable}; use itertools::Either; +use rayon::slice::ParallelSliceMut; use reth_execution_errors::{StateProofError, TrieWitnessError}; use reth_primitives::constants::EMPTY_ROOT_HASH; use reth_trie_common::{ - BranchNode, HashBuilder, Nibbles, TrieAccount, TrieNode, CHILD_INDEX_RANGE, + BranchNode, BranchNodeCompact, HashBuilder, Nibbles, TrieAccount, TrieNode, CHILD_INDEX_RANGE, }; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -96,7 +97,7 @@ where // Attempt to compute state root from proofs and gather additional // information for the witness. let mut account_rlp = Vec::with_capacity(128); - let mut account_trie_nodes = BTreeMap::default(); + let mut account_trie_nodes = Vec::new(); for (hashed_address, hashed_slots) in proof_targets { let storage_multiproof = account_multiproof.storages.remove(&hashed_address).unwrap_or_default(); @@ -116,7 +117,12 @@ where }; let key = Nibbles::unpack(hashed_address); let proof = account_multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0)); - account_trie_nodes.extend(self.target_nodes(key.clone(), value, proof)?); + account_trie_nodes.extend(target_nodes( + key.clone(), + value, + proof, + Some(&mut self.witness), + )?); // Gather and record storage trie nodes for this account. let mut storage_trie_nodes = BTreeMap::default(); @@ -128,14 +134,15 @@ where .filter(|v| !v.is_zero()) .map(|v| alloy_rlp::encode_fixed_size(v).to_vec()); let proof = storage_multiproof.subtree.iter().filter(|e| slot_key.starts_with(e.0)); - storage_trie_nodes.extend(self.target_nodes( + storage_trie_nodes.extend(target_nodes( slot_key.clone(), slot_value, proof, + Some(&mut self.witness), )?); } - Self::next_root_from_proofs(storage_trie_nodes, |key: Nibbles| { + next_root_from_proofs(storage_trie_nodes, false, |key: Nibbles| { // Right pad the target with 0s. let mut padded_key = key.pack(); padded_key.resize(32, 0); @@ -156,7 +163,7 @@ where })?; } - Self::next_root_from_proofs(account_trie_nodes, |key: Nibbles| { + next_root_from_proofs(account_trie_nodes, false, |key: Nibbles| { // Right pad the target with 0s. let mut padded_key = key.pack(); padded_key.resize(32, 0); @@ -177,108 +184,116 @@ where Ok(self.witness) } +} - /// Decodes and unrolls all nodes from the proof. Returns only sibling nodes - /// in the path of the target and the final leaf node with updated value. - fn target_nodes<'b>( - &mut self, - key: Nibbles, - value: Option>, - proof: impl IntoIterator, - ) -> Result>>, StateProofError> { - let mut trie_nodes = BTreeMap::default(); - for (path, encoded) in proof { - // Record the node in witness. - self.witness.insert(keccak256(encoded.as_ref()), encoded.clone()); +/// TODO: +/// Decodes and unrolls all nodes from the proof. Returns only sibling nodes +/// in the path of the target and the final leaf node with updated value. +pub fn target_nodes<'b>( + key: Nibbles, + value: Option>, + proof: impl IntoIterator, + mut witness: Option<&mut HashMap>, +) -> Result>)>, StateProofError> { + let mut trie_nodes = Vec::new(); + for (path, encoded) in proof { + // Record the node in witness. + if let Some(witness) = witness.as_mut() { + witness.insert(keccak256(encoded.as_ref()), encoded.clone()); + } - let mut next_path = path.clone(); - match TrieNode::decode(&mut &encoded[..])? { - TrieNode::Branch(branch) => { - next_path.push(key[path.len()]); - let children = branch_node_children(path.clone(), &branch); - for (child_path, node_hash) in children { - if !key.starts_with(&child_path) { - trie_nodes.insert(child_path, Either::Left(node_hash)); - } + let mut next_path = path.clone(); + match TrieNode::decode(&mut &encoded[..])? { + TrieNode::Branch(branch) => { + next_path.push(key[path.len()]); + let children = branch_node_children(path.clone(), &branch); + for (child_path, node_hash) in children { + if !key.starts_with(&child_path) { + trie_nodes.push((child_path, Either::Left(node_hash))); } } - TrieNode::Extension(extension) => { - next_path.extend_from_slice(&extension.key); - } - TrieNode::Leaf(leaf) => { - next_path.extend_from_slice(&leaf.key); - if next_path != key { - trie_nodes.insert(next_path.clone(), Either::Right(leaf.value.clone())); - } + } + TrieNode::Extension(extension) => { + next_path.extend_from_slice(&extension.key); + } + TrieNode::Leaf(leaf) => { + next_path.extend_from_slice(&leaf.key); + if next_path != key { + trie_nodes.push((next_path.clone(), Either::Right(leaf.value.clone()))); } - }; - } - - if let Some(value) = value { - trie_nodes.insert(key, Either::Right(value)); - } + } + }; + } - Ok(trie_nodes) + if let Some(value) = value { + trie_nodes.push((key, Either::Right(value))); } - fn next_root_from_proofs( - trie_nodes: BTreeMap>>, - mut trie_node_provider: impl FnMut(Nibbles) -> Result, - ) -> Result { - // Ignore branch child hashes in the path of leaves or lower child hashes. - let mut keys = trie_nodes.keys().peekable(); - let mut ignored = HashSet::::default(); - while let Some(key) = keys.next() { - if keys.peek().map_or(false, |next| next.starts_with(key)) { - ignored.insert(key.clone()); - } + Ok(trie_nodes) +} + +/// TODO: +pub fn next_root_from_proofs( + trie_nodes: impl IntoIterator>)>, + retain_updates: bool, + mut trie_node_provider: impl FnMut(Nibbles) -> Result, +) -> Result { + let mut trie_nodes = Vec::from_iter(trie_nodes.into_iter()); + trie_nodes.par_sort_by_key(|(n, _)| n.clone()); + + // Ignore branch child hashes in the path of leaves or lower child hashes. + let mut keys = trie_nodes.iter().peekable(); + let mut ignored = HashSet::::default(); + while let Some((key, _)) = keys.next() { + if keys.peek().map_or(false, |(next, _)| next.starts_with(key)) { + ignored.insert(key.clone()); } + } - let mut hash_builder = HashBuilder::default(); - let mut trie_nodes = trie_nodes.into_iter().filter(|e| !ignored.contains(&e.0)).peekable(); - while let Some((path, value)) = trie_nodes.next() { - match value { - Either::Left(branch_hash) => { - let parent_branch_path = path.slice(..path.len() - 1); - if hash_builder.key.starts_with(&parent_branch_path) || - trie_nodes - .peek() - .map_or(false, |next| next.0.starts_with(&parent_branch_path)) - { - hash_builder.add_branch(path, branch_hash, false); - } else { - // Parent is a branch node that needs to be turned into an extension node. - let mut path = path.clone(); - loop { - let node = trie_node_provider(path.clone())?; - match TrieNode::decode(&mut &node[..])? { - TrieNode::Branch(branch) => { - let children = branch_node_children(path, &branch); - for (child_path, branch_hash) in children { - hash_builder.add_branch(child_path, branch_hash, false); - } - break - } - TrieNode::Leaf(leaf) => { - let mut child_path = path; - child_path.extend_from_slice(&leaf.key); - hash_builder.add_leaf(child_path, &leaf.value); - break - } - TrieNode::Extension(ext) => { - path.extend_from_slice(&ext.key); + let mut hash_builder = HashBuilder::default().with_updates(retain_updates); + let mut trie_nodes = trie_nodes.into_iter().filter(|e| !ignored.contains(&e.0)).peekable(); + while let Some((path, value)) = trie_nodes.next() { + match value { + Either::Left(branch_hash) => { + let parent_branch_path = path.slice(..path.len() - 1); + if hash_builder.key.starts_with(&parent_branch_path) || + trie_nodes + .peek() + .map_or(false, |next| next.0.starts_with(&parent_branch_path)) + { + hash_builder.add_branch(path, branch_hash, false); + } else { + // Parent is a branch node that needs to be turned into an extension node. + let mut path = path.clone(); + loop { + let node = trie_node_provider(path.clone())?; + match TrieNode::decode(&mut &node[..])? { + TrieNode::Branch(branch) => { + let children = branch_node_children(path, &branch); + for (child_path, branch_hash) in children { + hash_builder.add_branch(child_path, branch_hash, false); } + break + } + TrieNode::Leaf(leaf) => { + let mut child_path = path; + child_path.extend_from_slice(&leaf.key); + hash_builder.add_leaf(child_path, &leaf.value); + break + } + TrieNode::Extension(ext) => { + path.extend_from_slice(&ext.key); } } } } - Either::Right(leaf_value) => { - hash_builder.add_leaf(path, &leaf_value); - } + } + Either::Right(leaf_value) => { + hash_builder.add_leaf(path, &leaf_value); } } - Ok(hash_builder.root()) } + Ok(hash_builder.root()) } /// Returned branch node children with keys in order.