Skip to content

Commit

Permalink
feat(pipeline): prune receipts based on log emitters (#4044)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo authored Aug 16, 2023
1 parent 6edbc0e commit 8a2c3ab
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 54 deletions.
18 changes: 14 additions & 4 deletions bin/reth/src/args/pruning_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

use clap::Args;
use reth_config::config::PruneConfig;
use reth_primitives::{ChainSpec, PruneMode, PruneModes};
use reth_primitives::{
ChainSpec, ContractLogsPruneConfig, PruneMode, PruneModes, MINIMUM_PRUNING_DISTANCE,
};
use std::sync::Arc;

/// Parameters for pruning and full node
Expand All @@ -25,14 +27,22 @@ impl PruningArgs {
Some(PruneConfig {
block_interval: 5,
parts: PruneModes {
sender_recovery: Some(PruneMode::Distance(128)),
sender_recovery: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
transaction_lookup: None,
receipts: _chain_spec
.deposit_contract
.as_ref()
.map(|contract| PruneMode::Before(contract.block)),
account_history: Some(PruneMode::Distance(128)),
storage_history: Some(PruneMode::Distance(128)),
account_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
storage_history: Some(PruneMode::Distance(MINIMUM_PRUNING_DISTANCE)),
contract_logs_filter: ContractLogsPruneConfig(
_chain_spec
.deposit_contract
.as_ref()
.map(|contract| (contract.address, PruneMode::Before(contract.block)))
.into_iter()
.collect(),
),
},
})
} else {
Expand Down
2 changes: 1 addition & 1 deletion bin/reth/src/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ impl Command {
.clean_threshold
.max(stage_conf.account_hashing.clean_threshold)
.max(stage_conf.storage_hashing.clean_threshold),
config.prune.map(|prune| prune.parts).unwrap_or_default(),
config.prune.as_ref().map(|prune| prune.parts.clone()).unwrap_or_default(),
)),
)
.build(db, self.chain.clone());
Expand Down
7 changes: 4 additions & 3 deletions bin/reth/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,8 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
None
};

let prune_config = self.pruning.prune_config(Arc::clone(&self.chain))?.or(config.prune);
let prune_config =
self.pruning.prune_config(Arc::clone(&self.chain))?.or(config.prune.clone());

// Configure the pipeline
let (mut pipeline, client) = if self.dev.dev {
Expand Down Expand Up @@ -373,7 +374,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
db.clone(),
&ctx.task_executor,
metrics_tx,
prune_config,
prune_config.clone(),
max_block,
)
.await?;
Expand All @@ -393,7 +394,7 @@ impl<Ext: RethCliExt> NodeCommand<Ext> {
db.clone(),
&ctx.task_executor,
metrics_tx,
prune_config,
prune_config.clone(),
max_block,
)
.await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ impl Default for IndexHistoryConfig {
}

/// Pruning configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[derive(Debug, Clone, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct PruneConfig {
/// Minimum pruning interval measured in blocks.
Expand Down
5 changes: 4 additions & 1 deletion crates/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ pub use net::{
SEPOLIA_BOOTNODES,
};
pub use peer::{PeerId, WithPeerId};
pub use prune::{PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError};
pub use prune::{
ContractLogsPruneConfig, PruneCheckpoint, PruneMode, PruneModes, PrunePart, PrunePartError,
MINIMUM_PRUNING_DISTANCE,
};
pub use receipt::{Receipt, ReceiptWithBloom, ReceiptWithBloomRef};
pub use revm_primitives::JumpMap;
pub use serde_helper::JsonU256;
Expand Down
40 changes: 39 additions & 1 deletion crates/primitives/src/prune/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,45 @@ mod mode;
mod part;
mod target;

use crate::{Address, BlockNumber};
pub use checkpoint::PruneCheckpoint;
pub use mode::PruneMode;
pub use part::{PrunePart, PrunePartError};
pub use target::PruneModes;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
pub use target::{PruneModes, MINIMUM_PRUNING_DISTANCE};

/// Configuration for pruning receipts not associated with logs emitted by the specified contracts.
#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
pub struct ContractLogsPruneConfig(pub BTreeMap<Address, PruneMode>);

impl ContractLogsPruneConfig {
/// Checks if the configuration is empty
pub fn is_empty(&self) -> bool {
self.0.is_empty()
}

/// Given the `tip` block number, consolidates the structure so it can easily be queried for
/// filtering across a range of blocks.
///
/// The [`BlockNumber`] key of the map should be viewed as `PruneMode::Before(block)`.
pub fn group_by_block(
&self,
tip: BlockNumber,
) -> Result<BTreeMap<BlockNumber, Vec<&Address>>, PrunePartError> {
let mut map = BTreeMap::new();
for (address, mode) in self.0.iter() {
// Getting `None`, means that there is nothing to prune yet, so we need it to include in
// the BTreeMap (block = 0), otherwise it will be excluded.
// Reminder that this BTreeMap works as an inclusion list that excludes (prunes) all
// other receipts.
let block = mode
.prune_target_block(tip, MINIMUM_PRUNING_DISTANCE, PrunePart::ContractLogs)?
.map(|(block, _)| block)
.unwrap_or_default();

map.entry(block).or_insert_with(Vec::new).push(address)
}
Ok(map)
}
}
109 changes: 107 additions & 2 deletions crates/primitives/src/prune/mode.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::BlockNumber;
use crate::{BlockNumber, PrunePart, PrunePartError};
use reth_codecs::{main_codec, Compact};

/// Prune mode.
Expand All @@ -14,6 +14,43 @@ pub enum PruneMode {
Before(BlockNumber),
}

impl PruneMode {
/// Returns block up to which variant pruning needs to be done, inclusive, according to the
/// provided tip.
pub fn prune_target_block(
&self,
tip: BlockNumber,
min_blocks: u64,
prune_part: PrunePart,
) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
let result = match self {
PruneMode::Full if min_blocks == 0 => Some((tip, *self)),
PruneMode::Distance(distance) if *distance > tip => None, // Nothing to prune yet
PruneMode::Distance(distance) if *distance >= min_blocks => {
Some((tip - distance, *self))
}
PruneMode::Before(n) if *n > tip => None, // Nothing to prune yet
PruneMode::Before(n) if tip - n >= min_blocks => Some((n - 1, *self)),
_ => return Err(PrunePartError::Configuration(prune_part)),
};
Ok(result)
}

/// Check if target block should be pruned according to the provided prune mode and tip.
pub fn should_prune(&self, block: BlockNumber, tip: BlockNumber) -> bool {
match self {
PruneMode::Full => true,
PruneMode::Distance(distance) => {
if *distance > tip {
return false
}
block < tip - *distance
}
PruneMode::Before(n) => *n > block,
}
}
}

#[cfg(test)]
impl Default for PruneMode {
fn default() -> Self {
Expand All @@ -23,10 +60,78 @@ impl Default for PruneMode {

#[cfg(test)]
mod tests {
use crate::prune::PruneMode;
use crate::{prune::PruneMode, PrunePart, PrunePartError, MINIMUM_PRUNING_DISTANCE};
use assert_matches::assert_matches;
use serde::Deserialize;

#[test]
fn test_prune_target_block() {
let tip = 1000;
let min_blocks = MINIMUM_PRUNING_DISTANCE;
let prune_part = PrunePart::Receipts;

let tests = vec![
// MINIMUM_PRUNING_DISTANCE makes this impossible
(PruneMode::Full, Err(PrunePartError::Configuration(prune_part))),
// Nothing to prune
(PruneMode::Distance(tip + 1), Ok(None)),
(PruneMode::Distance(min_blocks + 1), Ok(Some(tip - (min_blocks + 1)))),
// Nothing to prune
(PruneMode::Before(tip + 1), Ok(None)),
(
PruneMode::Before(tip - MINIMUM_PRUNING_DISTANCE),
Ok(Some(tip - MINIMUM_PRUNING_DISTANCE - 1)),
),
(
PruneMode::Before(tip - MINIMUM_PRUNING_DISTANCE - 1),
Ok(Some(tip - MINIMUM_PRUNING_DISTANCE - 2)),
),
// MINIMUM_PRUNING_DISTANCE is 128
(PruneMode::Before(tip - 1), Err(PrunePartError::Configuration(prune_part))),
];

for (index, (mode, expected_result)) in tests.into_iter().enumerate() {
assert_eq!(
mode.prune_target_block(tip, min_blocks, prune_part),
expected_result.map(|r| r.map(|b| (b, mode))),
"Test {} failed",
index + 1,
);
}

// Test for a scenario where there are no minimum blocks and Full can be used
assert_eq!(
PruneMode::Full.prune_target_block(tip, 0, prune_part),
Ok(Some((tip, PruneMode::Full))),
);
}

#[test]
fn test_should_prune() {
let tip = 1000;
let should_prune = true;

let tests = vec![
(PruneMode::Distance(tip + 1), 1, !should_prune),
(
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE + 1),
tip - MINIMUM_PRUNING_DISTANCE - 1,
!should_prune,
),
(
PruneMode::Distance(MINIMUM_PRUNING_DISTANCE + 1),
tip - MINIMUM_PRUNING_DISTANCE - 2,
should_prune,
),
(PruneMode::Before(tip + 1), 1, should_prune),
(PruneMode::Before(tip + 1), tip + 1, !should_prune),
];

for (index, (mode, block, expected_result)) in tests.into_iter().enumerate() {
assert_eq!(mode.should_prune(block, tip), expected_result, "Test {} failed", index + 1,);
}
}

#[test]
fn prune_mode_deserialize() {
#[derive(Debug, Deserialize)]
Expand Down
6 changes: 4 additions & 2 deletions crates/primitives/src/prune/part.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,18 @@ pub enum PrunePart {
SenderRecovery,
/// Prune part responsible for the `TxHashNumber` table.
TransactionLookup,
/// Prune part responsible for the `Receipts` table.
/// Prune part responsible for all `Receipts`.
Receipts,
/// Prune part responsible for some `Receipts` filtered by logs.
ContractLogs,
/// Prune part responsible for the `AccountChangeSet` and `AccountHistory` tables.
AccountHistory,
/// Prune part responsible for the `StorageChangeSet` and `StorageHistory` tables.
StorageHistory,
}

/// PrunePart error type.
#[derive(Debug, Error)]
#[derive(Debug, Error, PartialEq, Eq)]
pub enum PrunePartError {
/// Invalid configuration of a prune part.
#[error("The configuration provided for {0} is invalid.")]
Expand Down
45 changes: 17 additions & 28 deletions crates/primitives/src/prune/target.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
use crate::{
prune::PrunePartError, serde_helper::deserialize_opt_prune_mode_with_min_blocks, BlockNumber,
PruneMode, PrunePart,
ContractLogsPruneConfig, PruneMode, PrunePart,
};
use paste::paste;
use serde::{Deserialize, Serialize};

/// Minimum distance necessary from the tip so blockchain tree can work correctly.
pub const MINIMUM_PRUNING_DISTANCE: u64 = 128;

/// Pruning configuration for every part of the data that can be pruned.
#[derive(Debug, Clone, Default, Copy, Deserialize, Eq, PartialEq, Serialize)]
#[derive(Debug, Clone, Default, Deserialize, Eq, PartialEq, Serialize)]
#[serde(default)]
pub struct PruneModes {
/// Sender Recovery pruning configuration.
Expand All @@ -20,7 +23,8 @@ pub struct PruneModes {
/// Transaction Lookup pruning configuration.
#[serde(skip_serializing_if = "Option::is_none")]
pub transaction_lookup: Option<PruneMode>,
/// Receipts pruning configuration.
/// Configuration for pruning of receipts. This setting overrides
/// `PruneModes::contract_logs_filter` and offers improved performance.
#[serde(
skip_serializing_if = "Option::is_none",
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
Expand All @@ -38,6 +42,12 @@ pub struct PruneModes {
deserialize_with = "deserialize_opt_prune_mode_with_min_blocks::<64, _>"
)]
pub storage_history: Option<PruneMode>,
/// Retains only those receipts that contain logs emitted by the specified addresses,
/// discarding all others. Note that this setting is overridden by `PruneModes::receipts`.
///
/// The [`BlockNumber`] represents the starting block from which point onwards the receipts are
/// preserved.
pub contract_logs_filter: ContractLogsPruneConfig,
}

macro_rules! impl_prune_parts {
Expand All @@ -51,7 +61,7 @@ macro_rules! impl_prune_parts {
)]
pub fn [<should_prune_ $part>](&self, block: BlockNumber, tip: BlockNumber) -> bool {
if let Some(mode) = &self.$part {
return self.should_prune(mode, block, tip)
return mode.should_prune(block, tip)
}
false
}
Expand All @@ -66,16 +76,8 @@ macro_rules! impl_prune_parts {
" pruning needs to be done, inclusive, according to the provided tip."
)]
pub fn [<prune_target_block_ $part>](&self, tip: BlockNumber) -> Result<Option<(BlockNumber, PruneMode)>, PrunePartError> {
let min_blocks: u64 = $min_blocks.unwrap_or_default();
match self.$part {
Some(mode) => Ok(match mode {
PruneMode::Full if min_blocks == 0 => Some((tip, mode)),
PruneMode::Distance(distance) if distance > tip => None, // Nothing to prune yet
PruneMode::Distance(distance) if distance >= min_blocks => Some((tip - distance, mode)),
PruneMode::Before(n) if n > tip => None, // Nothing to prune yet
PruneMode::Before(n) if tip - n >= min_blocks => Some((n - 1, mode)),
_ => return Err(PrunePartError::Configuration(PrunePart::$variant)),
}),
match self.$part {
Some(mode) => mode.prune_target_block(tip, $min_blocks.unwrap_or_default(), PrunePart::$variant),
None => Ok(None)
}
}
Expand All @@ -88,6 +90,7 @@ macro_rules! impl_prune_parts {
$(
$part: Some(PruneMode::Full),
)+
contract_logs_filter: Default::default()
}
}

Expand All @@ -100,20 +103,6 @@ impl PruneModes {
PruneModes::default()
}

/// Check if target block should be pruned according to the provided prune mode and tip.
pub fn should_prune(&self, mode: &PruneMode, block: BlockNumber, tip: BlockNumber) -> bool {
match mode {
PruneMode::Full => true,
PruneMode::Distance(distance) => {
if *distance > tip {
return false
}
block < tip - *distance
}
PruneMode::Before(n) => *n > block,
}
}

impl_prune_parts!(
(sender_recovery, SenderRecovery, Some(64)),
(transaction_lookup, TransactionLookup, None),
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ impl<EF: ExecutorFactory> ExecutionStage<EF> {
start_block: u64,
max_block: u64,
) -> Result<PruneModes, StageError> {
let mut prune_modes = self.prune_modes;
let mut prune_modes = self.prune_modes.clone();

// If we're not executing MerkleStage from scratch (by threshold or first-sync), then erase
// changeset related pruning configurations
Expand Down
Loading

0 comments on commit 8a2c3ab

Please sign in to comment.