Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storage): fallback lookups for pruned history #4121

Merged
merged 18 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions crates/interfaces/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,6 @@ pub enum ProviderError {
/// Block hash
block_hash: BlockHash,
},
#[error("State at block #{0} is pruned")]
StateAtBlockPruned(BlockNumber),
}
13 changes: 9 additions & 4 deletions crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ impl<DB: Database> Pruner<DB> {
{
let mut processed = 0;
let mut cursor = provider.tx_ref().cursor_write::<T>()?;

// Prune history table:
// 1. If the shard has `highest_block_number` less than or equal to the target block number
// for pruning, delete the shard completely.
Expand Down Expand Up @@ -525,20 +526,24 @@ impl<DB: Database> Pruner<DB> {
// If there are no more blocks in this shard, we need to remove it, as empty
// shards are not allowed.
if key.as_ref().highest_block_number == u64::MAX {
// If current shard is the last shard for this sharded key, replace it
// with the previous shard.
if let Some(prev_value) = cursor
.prev()?
.filter(|(prev_key, _)| key_matches(prev_key, &key))
.map(|(_, prev_value)| prev_value)
{
// If current shard is the last shard for the sharded key that has
// previous shards, replace it with the previous shard.
cursor.delete_current()?;
// Upsert will replace the last shard for this sharded key with the
// previous value
// previous value.
cursor.upsert(key.clone(), prev_value)?;
} else {
// If there's no previous shard for this sharded key,
// just delete last shard completely.

// Jump back to the original last shard.
cursor.next()?;
// Delete shard.
cursor.delete_current()?;
}
} else {
Expand All @@ -551,7 +556,7 @@ impl<DB: Database> Pruner<DB> {
}
}

// Jump to the next address
// Jump to the next address.
cursor.seek_exact(last_key(&key))?;
}

Expand Down
51 changes: 34 additions & 17 deletions crates/storage/provider/src/providers/database/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl<DB: Database> ProviderFactory<DB> {
}

/// Storage provider for state at that given block
pub fn history_by_block_number(
fn state_provider_by_block_number(
&self,
mut block_number: BlockNumber,
) -> Result<StateProviderBox<'_>> {
Expand All @@ -102,30 +102,47 @@ impl<DB: Database> ProviderFactory<DB> {
// +1 as the changeset that we want is the one that was applied after this block.
block_number += 1;

let account_history_prune_checkpoint =
provider.get_prune_checkpoint(PrunePart::AccountHistory)?;
let storage_history_prune_checkpoint =
provider.get_prune_checkpoint(PrunePart::StorageHistory)?;

let mut state_provider = HistoricalStateProvider::new(provider.into_tx(), block_number);

// If we pruned account or storage history, we can't return state on every historical block.
// Instead, we should cap it at the latest prune checkpoint for corresponding prune part.
if let Some(prune_checkpoint) = account_history_prune_checkpoint {
state_provider = state_provider
.with_lowest_account_history_block_number(prune_checkpoint.block_number + 1);
}
if let Some(prune_checkpoint) = storage_history_prune_checkpoint {
state_provider = state_provider
.with_lowest_storage_history_block_number(prune_checkpoint.block_number + 1);
}

Ok(Box::new(state_provider))
}

/// Storage provider for state at that given block
pub fn history_by_block_number(
&self,
block_number: BlockNumber,
) -> Result<StateProviderBox<'_>> {
let state_provider = self.state_provider_by_block_number(block_number)?;
trace!(target: "providers::db", ?block_number, "Returning historical state provider for block number");
Ok(Box::new(HistoricalStateProvider::new(provider.into_tx(), block_number)))
Ok(state_provider)
}

/// Storage provider for state at that given block hash
pub fn history_by_block_hash(&self, block_hash: BlockHash) -> Result<StateProviderBox<'_>> {
let provider = self.provider()?;

let mut block_number = provider
let block_number = self
.provider()?
.block_number(block_hash)?
.ok_or(ProviderError::BlockHashNotFound(block_hash))?;

if block_number == provider.best_block_number().unwrap_or_default() &&
block_number == provider.last_block_number().unwrap_or_default()
{
return Ok(Box::new(LatestStateProvider::new(provider.into_tx())))
}

// +1 as the changeset that we want is the one that was applied after this block.
// as the changeset contains old values.
block_number += 1;

trace!(target: "providers::db", ?block_hash, "Returning historical state provider for block hash");
Ok(Box::new(HistoricalStateProvider::new(provider.into_tx(), block_number)))
let state_provider = self.state_provider_by_block_number(block_number)?;
trace!(target: "providers::db", ?block_number, "Returning historical state provider for block hash");
Ok(state_provider)
}
}

Expand Down
155 changes: 132 additions & 23 deletions crates/storage/provider/src/providers/state/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@ use reth_primitives::{
};
use std::marker::PhantomData;

/// State provider for a given transition id which takes a tx reference.
/// State provider for a given block number which takes a tx reference.
///
/// Historical state provider accesses the state at the start of the provided block number.
/// It means that all changes made in the provided block number are not included.
///
/// Historical state provider reads the following tables:
/// - [tables::AccountHistory]
Expand All @@ -29,6 +32,12 @@ pub struct HistoricalStateProviderRef<'a, 'b, TX: DbTx<'a>> {
tx: &'b TX,
/// Block number is main index for the history state of accounts and storages.
block_number: BlockNumber,
/// Lowest block number at which the account history is available.
/// [Option::None] means all history is available.
lowest_account_history_block_number: Option<BlockNumber>,
/// Lowest block number at which the storage history is available.
/// [Option::None] means all history is available.
lowest_storage_history_block_number: Option<BlockNumber>,
/// Phantom lifetime `'a`
_phantom: PhantomData<&'a TX>,
}
Expand All @@ -37,19 +46,57 @@ pub enum HistoryInfo {
NotYetWritten,
InChangeset(u64),
InPlainState,
MaybeInPlainState,
}

impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {
/// Create new StateProvider from history transaction number
/// Create new StateProvider for historical block number
pub fn new(tx: &'b TX, block_number: BlockNumber) -> Self {
Self { tx, block_number, _phantom: PhantomData {} }
Self {
tx,
block_number,
lowest_account_history_block_number: None,
lowest_storage_history_block_number: None,
_phantom: PhantomData {},
}
}

/// Create new StateProvider for historical block number and lowest block numbers at which
/// account & storage histories are available.
pub fn new_with_lowest_history_block_numbers(
tx: &'b TX,
lowest_account_history_block_number: Option<BlockNumber>,
lowest_storage_history_block_number: Option<BlockNumber>,
block_number: BlockNumber,
) -> Self {
Self {
tx,
block_number,
lowest_account_history_block_number,
lowest_storage_history_block_number,
_phantom: PhantomData {},
}
}

/// Lookup an account in the AccountHistory table
pub fn account_history_lookup(&self, address: Address) -> Result<HistoryInfo> {
// Check if lowest available block number for storage history is more than the requested
// block number for this historical provider instance.
if self
.lowest_account_history_block_number
.map(|block_number| block_number > self.block_number)
.unwrap_or(false)
{
return Err(ProviderError::StateAtBlockPruned(self.block_number).into())
}

// history key to search IntegerList of block number changesets.
let history_key = ShardedKey::new(address, self.block_number);
self.history_info::<tables::AccountHistory, _>(history_key, |key| key.key == address)
self.history_info::<tables::AccountHistory, _>(
history_key,
|key| key.key == address,
self.lowest_account_history_block_number,
)
}

/// Lookup a storage key in the StorageHistory table
Expand All @@ -58,14 +105,31 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {
address: Address,
storage_key: StorageKey,
) -> Result<HistoryInfo> {
// Check if lowest available block number for account history is more than the requested
// block number for this historical provider instance.
if self
.lowest_storage_history_block_number
.map(|block_number| block_number > self.block_number)
.unwrap_or(false)
{
return Err(ProviderError::StateAtBlockPruned(self.block_number).into())
}

// history key to search IntegerList of block number changesets.
let history_key = StorageShardedKey::new(address, storage_key, self.block_number);
self.history_info::<tables::StorageHistory, _>(history_key, |key| {
key.address == address && key.sharded_key.key == storage_key
})
self.history_info::<tables::StorageHistory, _>(
history_key,
|key| key.address == address && key.sharded_key.key == storage_key,
self.lowest_storage_history_block_number,
)
}

fn history_info<T, K>(&self, key: K, key_filter: impl Fn(&K) -> bool) -> Result<HistoryInfo>
fn history_info<T, K>(
&self,
key: K,
key_filter: impl Fn(&K) -> bool,
lowest_available_block_number: Option<BlockNumber>,
) -> Result<HistoryInfo>
where
T: Table<Key = K, Value = BlockNumberList>,
{
Expand All @@ -80,24 +144,36 @@ impl<'a, 'b, TX: DbTx<'a>> HistoricalStateProviderRef<'a, 'b, TX> {
// Get the rank of the first entry after our block.
let rank = chunk.rank(self.block_number as usize);

// If our block is before the first entry in the index chunk, it might be before
// the first write ever. To check, we look at the previous entry and check if the
// key is the same.
// If our block is before the first entry in the index chunk and this first entry
// doesn't equal to our block, it might be before the first write ever. To check, we
// look at the previous entry and check if the key is the same.
// This check is worth it, the `cursor.prev()` check is rarely triggered (the if will
// short-circuit) and when it passes we save a full seek into the changeset/plain state
// table.
if rank == 0 && !cursor.prev()?.is_some_and(|(key, _)| key_filter(&key)) {
// The key is written to, but only after our block.
return Ok(HistoryInfo::NotYetWritten)
}
if rank < chunk.len() {
if rank == 0 &&
chunk.select(rank) as u64 != self.block_number &&
!cursor.prev()?.is_some_and(|(key, _)| key_filter(&key))
{
if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets
// and history, so we need to make a changeset lookup.
Ok(HistoryInfo::InChangeset(chunk.select(rank) as u64))
} else {
// The key is written to, but only after our block.
Ok(HistoryInfo::NotYetWritten)
}
} else if rank < chunk.len() {
// The chunk contains an entry for a write after our block, return it.
Ok(HistoryInfo::InChangeset(chunk.select(rank) as u64))
} else {
// The chunk does not contain an entry for a write after our block. This can only
// happen if this is the last chunk and so we need to look in the plain state.
Ok(HistoryInfo::InPlainState)
}
} else if lowest_available_block_number.is_some() {
// The key may have been written, but due to pruning we may not have changesets and
// history, so we need to make a plain state lookup.
Ok(HistoryInfo::MaybeInPlainState)
} else {
// The key has not been written to at all.
Ok(HistoryInfo::NotYetWritten)
Expand All @@ -120,7 +196,9 @@ impl<'a, 'b, TX: DbTx<'a>> AccountReader for HistoricalStateProviderRef<'a, 'b,
address,
})?
.info),
HistoryInfo::InPlainState => Ok(self.tx.get::<tables::PlainAccountState>(address)?),
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => {
Ok(self.tx.get::<tables::PlainAccountState>(address)?)
}
}
}
}
Expand Down Expand Up @@ -168,7 +246,7 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b,
})?
.value,
)),
HistoryInfo::InPlainState => Ok(self
HistoryInfo::InPlainState | HistoryInfo::MaybeInPlainState => Ok(self
.tx
.cursor_dup_read::<tables::PlainStorageState>()?
.seek_by_key_subkey(address, storage_key)?
Expand All @@ -193,26 +271,54 @@ impl<'a, 'b, TX: DbTx<'a>> StateProvider for HistoricalStateProviderRef<'a, 'b,
}
}

/// State provider for a given transition
/// State provider for a given block number.
/// For more detailed description, see [HistoricalStateProviderRef].
pub struct HistoricalStateProvider<'a, TX: DbTx<'a>> {
/// Database transaction
tx: TX,
/// State at the block number is the main indexer of the state.
block_number: BlockNumber,
/// Lowest block number at which the account history is available.
lowest_account_history_block_number: Option<BlockNumber>,
/// Lowest block number at which the storage history is available.
lowest_storage_history_block_number: Option<BlockNumber>,
/// Phantom lifetime `'a`
_phantom: PhantomData<&'a TX>,
}

impl<'a, TX: DbTx<'a>> HistoricalStateProvider<'a, TX> {
/// Create new StateProvider from history transaction number
/// Create new StateProvider for historical block number
pub fn new(tx: TX, block_number: BlockNumber) -> Self {
Self { tx, block_number, _phantom: PhantomData {} }
Self {
tx,
block_number,
lowest_account_history_block_number: None,
lowest_storage_history_block_number: None,
_phantom: PhantomData {},
}
}

/// Set the lowest block number at which the account history is available.
pub fn with_lowest_account_history_block_number(mut self, block_number: BlockNumber) -> Self {
self.lowest_account_history_block_number = Some(block_number);
self
}

/// Set the lowest block number at which the storage history is available.
pub fn with_lowest_storage_history_block_number(mut self, block_number: BlockNumber) -> Self {
self.lowest_storage_history_block_number = Some(block_number);
self
}

/// Returns a new provider that takes the `TX` as reference
#[inline(always)]
fn as_ref<'b>(&'b self) -> HistoricalStateProviderRef<'a, 'b, TX> {
HistoricalStateProviderRef::new(&self.tx, self.block_number)
HistoricalStateProviderRef::new_with_lowest_history_block_numbers(
&self.tx,
self.lowest_account_history_block_number,
self.lowest_storage_history_block_number,
self.block_number,
)
}
}

Expand Down Expand Up @@ -406,7 +512,10 @@ mod tests {

// run
assert_eq!(HistoricalStateProviderRef::new(&tx, 0).storage(ADDRESS, STORAGE), Ok(None));
assert_eq!(HistoricalStateProviderRef::new(&tx, 3).storage(ADDRESS, STORAGE), Ok(None));
assert_eq!(
HistoricalStateProviderRef::new(&tx, 3).storage(ADDRESS, STORAGE),
Ok(Some(U256::ZERO))
);
assert_eq!(
HistoricalStateProviderRef::new(&tx, 4).storage(ADDRESS, STORAGE),
Ok(Some(entry_at7.value))
Expand Down
Loading