Skip to content

Commit

Permalink
Merge branch 'main' into cargo-update
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse committed Aug 6, 2023
2 parents 687ebb5 + d8b9660 commit ee52a83
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 53 deletions.
171 changes: 125 additions & 46 deletions crates/storage/provider/src/providers/database/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use reth_interfaces::Result;
use reth_primitives::{
keccak256,
stage::{StageCheckpoint, StageId},
trie::Nibbles,
Account, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders,
ChainInfo, ChainSpec, Hardfork, Head, Header, PruneCheckpoint, PrunePart, Receipt, SealedBlock,
SealedBlockWithSenders, SealedHeader, StorageEntry, TransactionMeta, TransactionSigned,
Expand All @@ -37,9 +38,9 @@ use reth_revm_primitives::{
env::{fill_block_env, fill_cfg_and_block_env, fill_cfg_env},
primitives::{BlockEnv, CfgEnv, SpecId},
};
use reth_trie::StateRoot;
use reth_trie::{prefix_set::PrefixSetMut, StateRoot};
use std::{
collections::{btree_map::Entry, BTreeMap, BTreeSet},
collections::{btree_map::Entry, BTreeMap, BTreeSet, HashMap},
fmt::Debug,
ops::{Deref, DerefMut, Range, RangeBounds, RangeInclusive},
sync::Arc,
Expand Down Expand Up @@ -1407,25 +1408,47 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
end_block_hash: H256,
expected_state_root: H256,
) -> Result<()> {
// Initialize prefix sets.
let mut account_prefix_set = PrefixSetMut::default();
let mut storage_prefix_set: HashMap<H256, PrefixSetMut> = HashMap::default();

// storage hashing stage
{
let lists = self.changed_storages_with_range(range.clone())?;
let storages = self.plainstate_storages(lists)?;
self.insert_storage_for_hashing(storages)?;
let storage_entries = self.insert_storage_for_hashing(storages)?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
for slot in hashed_slots {
storage_prefix_set
.entry(hashed_address)
.or_default()
.insert(Nibbles::unpack(slot));
}
}
}

// account hashing stage
{
let lists = self.changed_accounts_with_range(range.clone())?;
let accounts = self.basic_accounts(lists)?;
self.insert_account_for_hashing(accounts)?;
let hashed_addresses = self.insert_account_for_hashing(accounts)?;
for hashed_address in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
}
}

// merkle tree
{
let (state_root, trie_updates) =
StateRoot::incremental_root_with_updates(&self.tx, range.clone())
.map_err(Into::<reth_db::DatabaseError>::into)?;
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
// are pre-loaded.
let (state_root, trie_updates) = StateRoot::new(&self.tx)
.with_changed_account_prefixes(account_prefix_set.freeze())
.with_changed_storage_prefixes(
storage_prefix_set.into_iter().map(|(k, v)| (k, v.freeze())).collect(),
)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;
if state_root != expected_state_root {
return Err(ProviderError::StateRootMismatch {
got: state_root,
Expand All @@ -1440,11 +1463,15 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
Ok(())
}

fn unwind_storage_hashing(&self, range: Range<BlockNumberAddress>) -> Result<()> {
fn unwind_storage_hashing(
&self,
range: Range<BlockNumberAddress>,
) -> Result<HashMap<H256, BTreeSet<H256>>> {
let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;

// Aggregate all block changesets and make list of accounts that have been changed.
self.tx
let hashed_storages = self
.tx
.cursor_read::<tables::StorageChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
Expand All @@ -1463,7 +1490,14 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
// hash addresses and collect it inside sorted BTreeMap.
// We are doing keccak only once per address.
.map(|((address, key), value)| ((keccak256(address), keccak256(key)), value))
.collect::<BTreeMap<_, _>>()
.collect::<BTreeMap<_, _>>();

let mut hashed_storage_keys: HashMap<H256, BTreeSet<H256>> = HashMap::default();
for (hashed_address, hashed_slot) in hashed_storages.keys() {
hashed_storage_keys.entry(*hashed_address).or_default().insert(*hashed_slot);
}

hashed_storages
.into_iter()
// Apply values to HashedStorage (if Value is zero just remove it);
.try_for_each(|((hashed_address, key), value)| -> Result<()> {
Expand All @@ -1481,50 +1515,58 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
Ok(())
})?;

Ok(())
Ok(hashed_storage_keys)
}

fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
) -> Result<()> {
) -> Result<HashMap<H256, BTreeSet<H256>>> {
// hash values
let hashed = storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
map.insert(keccak256(entry.key), entry.value);
let hashed_storages =
storages.into_iter().fold(BTreeMap::new(), |mut map, (address, storage)| {
let storage = storage.into_iter().fold(BTreeMap::new(), |mut map, entry| {
map.insert(keccak256(entry.key), entry.value);
map
});
map.insert(keccak256(address), storage);
map
});
map.insert(keccak256(address), storage);
map
});

let mut hashed_storage = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
let hashed_storage_keys =
HashMap::from_iter(hashed_storages.iter().map(|(hashed_address, entries)| {
(*hashed_address, BTreeSet::from_iter(entries.keys().copied()))
}));

let mut hashed_storage_cursor = self.tx.cursor_dup_write::<tables::HashedStorage>()?;
// Hash the address and key and apply them to HashedStorage (if Storage is None
// just remove it);
hashed.into_iter().try_for_each(|(hashed_address, storage)| {
hashed_storages.into_iter().try_for_each(|(hashed_address, storage)| {
storage.into_iter().try_for_each(|(key, value)| -> Result<()> {
if hashed_storage
if hashed_storage_cursor
.seek_by_key_subkey(hashed_address, key)?
.filter(|entry| entry.key == key)
.is_some()
{
hashed_storage.delete_current()?;
hashed_storage_cursor.delete_current()?;
}

if value != U256::ZERO {
hashed_storage.upsert(hashed_address, StorageEntry { key, value })?;
hashed_storage_cursor.upsert(hashed_address, StorageEntry { key, value })?;
}
Ok(())
})
})?;
Ok(())

Ok(hashed_storage_keys)
}

fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<BTreeSet<H256>> {
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;

// Aggregate all block changesets and make a list of accounts that have been changed.
self.tx
let hashed_accounts = self
.tx
.cursor_read::<tables::AccountChangeSet>()?
.walk_range(range)?
.collect::<std::result::Result<Vec<_>, _>>()?
Expand All @@ -1542,44 +1584,51 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> HashingWriter for DatabaseProvider
// hash addresses and collect it inside sorted BTreeMap.
// We are doing keccak only once per address.
.map(|(address, account)| (keccak256(address), account))
.collect::<BTreeMap<_, _>>()
.collect::<BTreeMap<_, _>>();

let hashed_account_keys = BTreeSet::from_iter(hashed_accounts.keys().copied());

hashed_accounts
.into_iter()
// Apply values to HashedState (if Account is None remove it);
.try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?;
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
hashed_accounts_cursor.upsert(hashed_address, account)?;
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
Ok(())
})?;

Ok(())
Ok(hashed_account_keys)
}

fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()> {
let mut hashed_accounts = self.tx.cursor_write::<tables::HashedAccount>()?;
) -> Result<BTreeSet<H256>> {
let mut hashed_accounts_cursor = self.tx.cursor_write::<tables::HashedAccount>()?;

let hashes_accounts = accounts.into_iter().fold(
let hashed_accounts = accounts.into_iter().fold(
BTreeMap::new(),
|mut map: BTreeMap<H256, Option<Account>>, (address, account)| {
map.insert(keccak256(address), account);
map
},
);

hashes_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> {
let hashed_addresses = BTreeSet::from_iter(hashed_accounts.keys().copied());

hashed_accounts.into_iter().try_for_each(|(hashed_address, account)| -> Result<()> {
if let Some(account) = account {
hashed_accounts.upsert(hashed_address, account)?
} else if hashed_accounts.seek_exact(hashed_address)?.is_some() {
hashed_accounts.delete_current()?;
hashed_accounts_cursor.upsert(hashed_address, account)?
} else if hashed_accounts_cursor.seek_exact(hashed_address)?.is_some() {
hashed_accounts_cursor.delete_current()?;
}
Ok(())
})?;
Ok(())

Ok(hashed_addresses)
}
}

Expand Down Expand Up @@ -1718,15 +1767,45 @@ impl<'this, TX: DbTxMut<'this> + DbTx<'this>> BlockExecutionWriter for DatabaseP
if TAKE {
let storage_range = BlockNumberAddress::range(range.clone());

self.unwind_account_hashing(range.clone())?;
// Initialize prefix sets.
let mut account_prefix_set = PrefixSetMut::default();
let mut storage_prefix_set: HashMap<H256, PrefixSetMut> = HashMap::default();

// Unwind account hashes. Add changed accounts to account prefix set.
let hashed_addresses = self.unwind_account_hashing(range.clone())?;
for hashed_address in hashed_addresses {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
}

// Unwind account history indices.
self.unwind_account_history_indices(range.clone())?;
self.unwind_storage_hashing(storage_range.clone())?;

// Unwind storage hashes. Add changed account and storage keys to corresponding prefix
// sets.
let storage_entries = self.unwind_storage_hashing(storage_range.clone())?;
for (hashed_address, hashed_slots) in storage_entries {
account_prefix_set.insert(Nibbles::unpack(hashed_address));
for slot in hashed_slots {
storage_prefix_set
.entry(hashed_address)
.or_default()
.insert(Nibbles::unpack(slot));
}
}

// Unwind storage history indices.
self.unwind_storage_history_indices(storage_range)?;

// merkle tree
let (new_state_root, trie_updates) =
StateRoot::incremental_root_with_updates(&self.tx, range.clone())
.map_err(Into::<reth_db::DatabaseError>::into)?;
// Calculate the reverted merkle root.
// This is the same as `StateRoot::incremental_root_with_updates`, only the prefix sets
// are pre-loaded.
let (new_state_root, trie_updates) = StateRoot::new(&self.tx)
.with_changed_account_prefixes(account_prefix_set.freeze())
.with_changed_storage_prefixes(
storage_prefix_set.into_iter().map(|(k, v)| (k, v.freeze())).collect(),
)
.root_with_updates()
.map_err(Into::<reth_db::DatabaseError>::into)?;

let parent_number = range.start().saturating_sub(1);
let parent_state_root = self
Expand Down
36 changes: 29 additions & 7 deletions crates/storage/provider/src/traits/hashing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,50 @@ use auto_impl::auto_impl;
use reth_db::models::BlockNumberAddress;
use reth_interfaces::Result;
use reth_primitives::{Account, Address, BlockNumber, StorageEntry, H256};
use std::ops::{Range, RangeInclusive};
use std::{
collections::{BTreeSet, HashMap},
ops::{Range, RangeInclusive},
};

/// Hashing Writer
#[auto_impl(&, Arc, Box)]
pub trait HashingWriter: Send + Sync {
/// Unwind and clear account hashing
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<()>;
/// Unwind and clear account hashing.
///
/// # Returns
///
/// Set of hashed keys of updated accounts.
fn unwind_account_hashing(&self, range: RangeInclusive<BlockNumber>) -> Result<BTreeSet<H256>>;

/// Inserts all accounts into [reth_db::tables::AccountHistory] table.
///
/// # Returns
///
/// Set of hashed keys of updated accounts.
fn insert_account_for_hashing(
&self,
accounts: impl IntoIterator<Item = (Address, Option<Account>)>,
) -> Result<()>;
) -> Result<BTreeSet<H256>>;

/// Unwind and clear storage hashing
fn unwind_storage_hashing(&self, range: Range<BlockNumberAddress>) -> Result<()>;
///
/// # Returns
///
/// Mapping of hashed keys of updated accounts to their respective updated hashed slots.
fn unwind_storage_hashing(
&self,
range: Range<BlockNumberAddress>,
) -> Result<HashMap<H256, BTreeSet<H256>>>;

/// iterate over storages and insert them to hashing table
/// Iterates over storages and inserts them to hashing table.
///
/// # Returns
///
/// Mapping of hashed keys of updated accounts to their respective updated hashed slots.
fn insert_storage_for_hashing(
&self,
storages: impl IntoIterator<Item = (Address, impl IntoIterator<Item = StorageEntry>)>,
) -> Result<()>;
) -> Result<HashMap<H256, BTreeSet<H256>>>;

/// Calculate the hashes of all changed accounts and storages, and finally calculate the state
/// root.
Expand Down

0 comments on commit ee52a83

Please sign in to comment.