perf(tree): streaming state root from proofs
rkrasiuk committed Sep 13, 2024
1 parent ef1d9e7 commit 42de71c
Showing 20 changed files with 413 additions and 54 deletions.
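
The change wires an experimental "streaming" state-root path into the engine tree: block execution runs against a database wrapper that reports every accessed account and storage slot over a channel, a spawned task coalesces those accesses into proof targets and collects a MultiProof concurrently with execution, and a new compute_state_root_from_proofs helper (still ending in todo!()) is meant to fold the executed HashedPostState into the collected proofs via TrieWitness::next_root_from_proofs. The streaming_database module referenced below is not rendered on this page; the following sketch is only a guess at its shape, assuming it forwards reads over an unbounded tokio channel. The commit itself wraps the state provider before StateProviderDatabase, so intercepting at the revm Database trait here is an assumption made for brevity, and every name in the sketch is illustrative rather than taken from the commit.

// Hypothetical sketch only; not part of this commit.
use revm::{
    primitives::{AccountInfo, Address, Bytecode, B256, U256},
    Database,
};
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};

/// A single state access observed during execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StateAccess {
    /// The account record itself was read.
    Account(Address),
    /// A storage slot of the given account was read.
    StorageSlot(Address, B256),
}

/// Database wrapper that records reads and streams them to a listener.
pub struct StreamingDatabase<DB> {
    inner: DB,
    tx: UnboundedSender<StateAccess>,
}

impl<DB> StreamingDatabase<DB> {
    /// Wraps `inner` and returns the receiving half of the access stream.
    pub fn new_with_rx(inner: DB) -> (Self, UnboundedReceiver<StateAccess>) {
        let (tx, rx) = mpsc::unbounded_channel();
        (Self { inner, tx }, rx)
    }
}

impl<DB: Database> Database for StreamingDatabase<DB> {
    type Error = DB::Error;

    fn basic(&mut self, address: Address) -> Result<Option<AccountInfo>, Self::Error> {
        // Best effort: the receiving task may already have exited.
        let _ = self.tx.send(StateAccess::Account(address));
        self.inner.basic(address)
    }

    fn code_by_hash(&mut self, code_hash: B256) -> Result<Bytecode, Self::Error> {
        self.inner.code_by_hash(code_hash)
    }

    fn storage(&mut self, address: Address, index: U256) -> Result<U256, Self::Error> {
        let _ = self.tx.send(StateAccess::StorageSlot(address, B256::from(index)));
        self.inner.storage(address, index)
    }

    // Note: older revm versions take a U256 block number here instead of u64.
    fn block_hash(&mut self, number: u64) -> Result<B256, Self::Error> {
        self.inner.block_hash(number)
    }
}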
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 11 additions & 1 deletion crates/chain-state/src/in_memory.rs
@@ -828,6 +828,8 @@ impl NewCanonicalChain {

#[cfg(test)]
mod tests {
use std::collections::HashSet;

use super::*;
use crate::test_utils::TestBlockBuilder;
use rand::Rng;
@@ -839,7 +841,7 @@ mod tests {
AccountReader, BlockHashReader, StateProofProvider, StateProvider, StateRootProvider,
StorageRootProvider,
};
use reth_trie::{prefix_set::TriePrefixSetsMut, AccountProof, HashedStorage};
use reth_trie::{prefix_set::TriePrefixSetsMut, AccountProof, HashedStorage, MultiProof};

fn create_mock_state(
test_block_builder: &mut TestBlockBuilder,
@@ -959,6 +961,14 @@ mod tests {
Ok(AccountProof::new(Address::random()))
}

fn multiproof(
&self,
_hashed_state: HashedPostState,
_targets: HashMap<Address, HashSet<B256>>,
) -> ProviderResult<MultiProof> {
Ok(MultiProof::default())
}

fn witness(
&self,
_overlay: HashedPostState,
18 changes: 16 additions & 2 deletions crates/chain-state/src/memory_overlay.rs
@@ -9,9 +9,12 @@ use reth_storage_api::{
};
use reth_trie::{
prefix_set::TriePrefixSetsMut, updates::TrieUpdates, AccountProof, HashedPostState,
HashedStorage,
HashedStorage, MultiProof,
};
use std::{
collections::{HashMap, HashSet},
sync::OnceLock,
};
use std::{collections::HashMap, sync::OnceLock};

/// A state provider that stores references to in-memory blocks along with their state as well as
/// the historical state provider for fallback lookups.
Expand Down Expand Up @@ -168,6 +171,17 @@ impl StateProofProvider for MemoryOverlayStateProvider {
self.historical.proof(hashed_state, address, slots)
}

// TODO: Currently this does not reuse available in-memory trie nodes.
fn multiproof(
&self,
state: HashedPostState,
targets: HashMap<Address, HashSet<B256>>,
) -> ProviderResult<MultiProof> {
let mut hashed_state = self.trie_state().hashed_state.clone();
hashed_state.extend(state);
self.historical.multiproof(hashed_state, targets)
}

// TODO: Currently this does not reuse available in-memory trie nodes.
fn witness(
&self,
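
As a usage illustration (not part of the commit): the new multiproof method takes the accounts and slots to prove, keyed as in the signature above, plus a hashed-state overlay, and returns a MultiProof. A minimal hedged sketch; the helper name and the exact import paths are assumptions:

// Hypothetical usage sketch only; not part of this commit.
use std::collections::{HashMap, HashSet};

use reth_primitives::{Address, B256};
use reth_storage_api::StateProofProvider;
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{HashedPostState, MultiProof};

/// Request a multiproof for one account and one of its storage slots,
/// on top of an empty hashed-state overlay.
fn single_slot_multiproof<P: StateProofProvider>(
    provider: &P,
    address: Address,
    slot: B256,
) -> ProviderResult<MultiProof> {
    let targets = HashMap::from([(address, HashSet::from([slot]))]);
    provider.multiproof(HashedPostState::default(), targets)
}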
1 change: 1 addition & 0 deletions crates/engine/service/src/service.rs
@@ -91,6 +91,7 @@ where
blockchain_db,
executor_factory,
consensus,
pipeline_task_spawner.clone(),
payload_validator,
persistence_handle,
payload_builder,
2 changes: 2 additions & 0 deletions crates/engine/tree/Cargo.toml
@@ -20,6 +20,7 @@ reth-consensus.workspace = true
reth-engine-primitives.workspace = true
reth-errors.workspace = true
reth-evm.workspace = true
reth-execution-errors.workspace = true
reth-network-p2p.workspace = true
reth-payload-builder.workspace = true
reth-payload-primitives.workspace = true
@@ -34,6 +35,7 @@ reth-tasks.workspace = true
reth-node-types.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true
alloy-rlp.workspace = true

# common
futures.workspace = true
169 changes: 158 additions & 11 deletions crates/engine/tree/src/tree/mod.rs
@@ -1,9 +1,10 @@
use crate::{
backfill::{BackfillAction, BackfillSyncState},
chain::FromOrchestrator,
engine::{DownloadRequest, EngineApiEvent, FromEngine},
engine::{DownloadRequest, EngineApiEvent, EngineApiRequest, FromEngine},
persistence::PersistenceHandle,
};
use alloy_rlp::BufMut;
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, BeaconEngineMessage, ForkchoiceStateTracker, InvalidHeaderCache,
OnForkChoiceUpdated, MIN_BLOCKS_FOR_PIPELINE_RUN,
@@ -28,8 +29,8 @@ use reth_primitives::{
};
use reth_provider::{
providers::ConsistentDbView, BlockReader, DatabaseProviderFactory, ExecutionOutcome,
ProviderError, StateProviderBox, StateProviderFactory, StateReader, StateRootProvider,
TransactionVariant,
ProviderError, StateProvider, StateProviderBox, StateProviderFactory, StateReader,
StateRootProvider, TransactionVariant,
};
use reth_revm::database::StateProviderDatabase;
use reth_rpc_types::{
@@ -40,7 +41,11 @@ use reth_rpc_types::{
ExecutionPayload,
};
use reth_stages_api::ControlFlow;
use reth_trie::{prefix_set::TriePrefixSetsMut, updates::TrieUpdates, HashedPostState};
use reth_tasks::TaskSpawner;
use reth_trie::{
prefix_set::TriePrefixSetsMut, updates::TrieUpdates, witness::TrieWitness, HashedPostState,
MultiProof, EMPTY_ROOT_HASH,
};
use reth_trie_parallel::parallel_root::ParallelStateRoot;
use std::{
cmp::Ordering,
@@ -53,21 +58,25 @@
},
time::Instant,
};
use streaming_database::{StateAccess, StreamingDatabase};
use tokio::sync::{
mpsc::{UnboundedReceiver, UnboundedSender},
oneshot,
oneshot::error::TryRecvError,
mpsc::{self, UnboundedReceiver, UnboundedSender},
oneshot::{self, error::TryRecvError},
};
use tracing::*;

mod config;
mod invalid_block_hook;
mod metrics;
use crate::{engine::EngineApiRequest, tree::metrics::EngineApiMetrics};
pub use config::TreeConfig;

mod invalid_block_hook;
pub use invalid_block_hook::{InvalidBlockHooks, NoopInvalidBlockHook};
pub use reth_engine_primitives::InvalidBlockHook;

mod metrics;
use metrics::EngineApiMetrics;

mod streaming_database;

/// Keeps track of the state of the tree.
///
/// ## Invariants
Expand Down Expand Up @@ -461,6 +470,7 @@ pub struct EngineApiTreeHandler<P, E, T: EngineTypes> {
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
task_spawner: Box<dyn TaskSpawner>,
payload_validator: ExecutionPayloadValidator,
/// Keeps track of internals such as executed and buffered blocks.
state: EngineApiTreeState,
@@ -531,6 +541,7 @@ where
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
task_spawner: Box<dyn TaskSpawner>,
payload_validator: ExecutionPayloadValidator,
outgoing: UnboundedSender<EngineApiEvent>,
state: EngineApiTreeState,
@@ -545,6 +556,7 @@
provider,
executor_provider,
consensus,
task_spawner,
payload_validator,
incoming,
outgoing,
@@ -576,6 +588,7 @@ where
provider: P,
executor_provider: E,
consensus: Arc<dyn Consensus>,
task_spawner: Box<dyn TaskSpawner>,
payload_validator: ExecutionPayloadValidator,
persistence: PersistenceHandle,
payload_builder: PayloadBuilderHandle<T>,
@@ -604,6 +617,7 @@ where
provider,
executor_provider,
consensus,
task_spawner,
payload_validator,
tx,
state,
@@ -2127,6 +2141,7 @@ where
missing_ancestor,
}))
};
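// Acquire a state provider for the parent up-front; it is moved into the proof-gathering
// task spawned below and sent back together with the collected multiproof.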
let proof_provider = self.state_provider(block.parent_hash)?.unwrap();

// now validate against the parent
let parent_block = self.sealed_header_by_hash(block.parent_hash)?.ok_or_else(|| {
@@ -2139,7 +2154,39 @@
return Err(e.into())
}

let executor = self.executor_provider.executor(StateProviderDatabase::new(&state_provider));
let (database, mut rx) = StreamingDatabase::new_with_rx(&state_provider);
let executor = self.executor_provider.executor(StateProviderDatabase::new(database));

// Spawn proof gathering task
let (multiproof_tx, multiproof_rx) = oneshot::channel();
self.task_spawner.spawn(Box::pin(async move {
let mut multiproof = MultiProof::default();
while let Some(next) = rx.recv().await {
let mut targets = HashMap::from([match next {
StateAccess::Account(address) => (address, HashSet::default()),
StateAccess::StorageSlot(address, slot) => (address, HashSet::from([slot])),
}]);

'inner: loop {
let Ok(next) = rx.try_recv() else {
break 'inner;
};
match next {
StateAccess::Account(address) => {
targets.entry(address).or_default();
}
StateAccess::StorageSlot(address, slot) => {
targets.entry(address).or_default().insert(slot);
}
}
}

multiproof
.extend(proof_provider.multiproof(HashedPostState::default(), targets).unwrap());
}

let _ = multiproof_tx.send((proof_provider, multiproof));
}));

let block_number = block.number;
let block_hash = block.hash();
@@ -2169,6 +2216,8 @@ where

let hashed_state = HashedPostState::from_bundle_state(&output.state.state);

let (proof_provider, multiproof) = multiproof_rx.blocking_recv().unwrap();

let root_time = Instant::now();
let mut state_root_result = None;

@@ -2249,6 +2298,104 @@ where
Ok(InsertPayloadOk2::Inserted(BlockStatus2::Valid))
}

fn compute_state_root_from_proofs(
&self,
state_provider: Box<dyn StateProvider>,
state: &HashedPostState,
mut multiproof: MultiProof,
) -> ProviderResult<(B256, TrieUpdates)> {
let prefix_sets = state.construct_prefix_sets();

let mut account_rlp = Vec::with_capacity(128);
let mut account_trie_nodes = BTreeMap::default();
for (hashed_address, hashed_slots) in proof_targets {
let storage_multiproof =
multiproof.storages.remove(&hashed_address).unwrap_or_default();

// Gather and record account trie nodes.
let account = state
.accounts
.get(&hashed_address)
.ok_or(TrieWitnessError::MissingAccount(hashed_address))?;
let value = if account.is_some() || storage_multiproof.root != EMPTY_ROOT_HASH {
account_rlp.clear();
TrieAccount::from((account.unwrap_or_default(), storage_multiproof.root))
.encode(&mut account_rlp as &mut dyn BufMut);
Some(account_rlp.clone())
} else {
None
};
let key = Nibbles::unpack(hashed_address);
let proof = multiproof.account_subtree.iter().filter(|e| key.starts_with(e.0));
account_trie_nodes.extend(self.target_nodes(key.clone(), value, proof)?);

// Gather and record storage trie nodes for this account.
let mut storage_trie_nodes = BTreeMap::default();
let storage = state.storages.get(&hashed_address);
for hashed_slot in hashed_slots {
let slot_key = Nibbles::unpack(hashed_slot);
let slot_value = storage
.and_then(|s| s.storage.get(&hashed_slot))
.filter(|v| !v.is_zero())
.map(|v| alloy_rlp::encode_fixed_size(v).to_vec());
let proof = storage_multiproof.subtree.iter().filter(|e| slot_key.starts_with(e.0));
storage_trie_nodes.extend(self.target_nodes(
slot_key.clone(),
slot_value,
proof,
)?);
}

let (storage_root, storage_trie_updates) =
TrieWitness::next_root_from_proofs(storage_trie_nodes, true, |key: Nibbles| {
// Right pad the target with 0s.
let mut padded_key = key.pack();
padded_key.resize(32, 0);
let target = (hashed_address, Vec::from([B256::from_slice(&padded_key)]));
// TODO: proof_provider.hashed_proof
let mut proof = Proof::new(
self.trie_cursor_factory.clone(),
self.hashed_cursor_factory.clone(),
)
.with_prefix_sets_mut(self.prefix_sets.clone())
.with_targets(HashMap::from([target]))
.storage_multiproof(hashed_address)?;

// The subtree only contains the proof for a single target.
let node = proof
.subtree
.remove(&key)
.ok_or(TrieWitnessError::MissingTargetNode(key))?;
Ok(node)
})?;
debug_assert_eq!(storage_multiproof.root, storage_root);
}

let (state_root, trie_updates) =
TrieWitness::next_root_from_proofs(account_trie_nodes, true, |key: Nibbles| {
// Right pad the target with 0s.
let mut padded_key = key.pack();
padded_key.resize(32, 0);
// TODO: proof_provider.hashed_proof
let mut proof = Proof::new(
self.trie_cursor_factory.clone(),
self.hashed_cursor_factory.clone(),
)
.with_prefix_sets_mut(self.prefix_sets.clone())
.with_targets(HashMap::from([(B256::from_slice(&padded_key), Vec::new())]))
.multiproof()?;

// The subtree only contains the proof for a single target.
let node = proof
.account_subtree
.remove(&key)
.ok_or(TrieWitnessError::MissingTargetNode(key))?;
Ok(node)
})?;

todo!()
}

/// Compute state root for the given hashed post state in parallel.
///
/// # Returns
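
Only a subset of the 20 changed files is rendered above. The new compute_state_root_from_proofs helper still ends in todo!(), and the surrounding hunks keep the existing database-backed root computation in place. One plausible way to bring the streamed-proof path up is to compute both roots and assert that they agree while the feature is validated. The sketch below is illustrative only: it assumes the helper eventually returns the proof-derived root, relies on the existing StateRootProvider::state_root_with_updates method as the reference, and none of the names come from the commit.

// Hypothetical sanity-check sketch only; not part of this commit.
use reth_primitives::B256;
use reth_provider::{StateProvider, StateRootProvider};
use reth_storage_errors::provider::ProviderResult;
use reth_trie::{updates::TrieUpdates, HashedPostState};

/// Compare a root computed from streamed proofs against the database-backed
/// root for the same hashed post-state.
fn check_streamed_root(
    proof_root: B256,
    provider: Box<dyn StateProvider>,
    hashed_state: HashedPostState,
) -> ProviderResult<(B256, TrieUpdates)> {
    // Regular (non-streamed) computation used as the reference.
    let (db_root, db_updates) = provider.state_root_with_updates(hashed_state)?;
    debug_assert_eq!(proof_root, db_root, "streamed-proof root diverged from db root");
    Ok((db_root, db_updates))
}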