Skip to content

Commit

Permalink
Revert "feat(snapshot, prune): transactions" (#6121)
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin authored Jan 18, 2024
1 parent 38f013c commit 083ac14
Show file tree
Hide file tree
Showing 20 changed files with 614 additions and 457 deletions.
46 changes: 27 additions & 19 deletions bin/reth/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use metrics_exporter_prometheus::PrometheusHandle;
use once_cell::sync::Lazy;
use reth_auto_seal_consensus::{AutoSealBuilder, AutoSealConsensus, MiningMode};
use reth_beacon_consensus::{
hooks::{EngineHooks, PruneHook, SnapshotHook},
hooks::{EngineHooks, PruneHook},
BeaconConsensus, BeaconConsensusEngine, BeaconConsensusEngineError,
MIN_BLOCKS_FOR_PIPELINE_RUN,
};
Expand Down Expand Up @@ -1012,9 +1012,20 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
let prometheus_handle = self.config.install_prometheus_recorder()?;
info!(target: "reth::cli", "Database opened");

let provider_factory =
ProviderFactory::new(Arc::clone(&self.db), Arc::clone(&self.config.chain))
.with_snapshots(self.data_dir.snapshots_path())?;
let mut provider_factory =
ProviderFactory::new(Arc::clone(&self.db), Arc::clone(&self.config.chain));

// configure snapshotter
let snapshotter = reth_snapshot::Snapshotter::new(
provider_factory.clone(),
self.data_dir.snapshots_path(),
self.config.chain.snapshot_block_interval,
)?;

provider_factory = provider_factory.with_snapshots(
self.data_dir.snapshots_path(),
snapshotter.highest_snapshot_receiver(),
)?;

self.config.start_metrics_endpoint(prometheus_handle, Arc::clone(&self.db)).await?;

Expand Down Expand Up @@ -1193,23 +1204,20 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
let initial_target = self.config.initial_pipeline_target(genesis_hash);
let mut hooks = EngineHooks::new();

let mut pruner = PrunerBuilder::new(prune_config.clone().unwrap_or_default())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(self.config.chain.prune_delete_limit)
.build(provider_factory.clone());
let pruner_events = if let Some(prune_config) = prune_config {
let mut pruner = PrunerBuilder::new(prune_config.clone())
.max_reorg_depth(tree_config.max_reorg_depth() as usize)
.prune_delete_limit(self.config.chain.prune_delete_limit)
.build(provider_factory, snapshotter.highest_snapshot_receiver());

let pruner_events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
let events = pruner.events();
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));

let snapshotter = reth_snapshot::Snapshotter::new(
provider_factory.clone(),
provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory"),
);
hooks.add(SnapshotHook::new(snapshotter, Box::new(executor.clone())));
info!(target: "reth::cli", "Snapshotter initialized");
info!(target: "reth::cli", ?prune_config, "Pruner initialized");
Either::Left(events)
} else {
Either::Right(stream::empty())
};

// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
Expand Down
48 changes: 18 additions & 30 deletions bin/reth/src/commands/db/snapshots/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ use reth_db::{database::Database, open_db_read_only, DatabaseEnv};
use reth_interfaces::db::LogLevel;
use reth_nippy_jar::{NippyJar, NippyJarCursor};
use reth_primitives::{
snapshot::{
Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader,
},
snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentHeader},
BlockNumber, ChainSpec, SnapshotSegment,
};
use reth_provider::{BlockNumReader, ProviderFactory};
Expand Down Expand Up @@ -81,15 +79,14 @@ impl Command {
log_level: Option<LogLevel>,
chain: Arc<ChainSpec>,
) -> eyre::Result<()> {
let all_combinations = self
.segments
.iter()
.cartesian_product(self.compression.iter().copied())
.cartesian_product(if self.phf.is_empty() {
vec![None]
} else {
self.phf.iter().copied().map(Some).collect::<Vec<_>>()
});
let all_combinations =
self.segments.iter().cartesian_product(self.compression.iter()).cartesian_product(
if self.phf.is_empty() {
vec![None]
} else {
self.phf.iter().copied().map(Some).collect::<Vec<_>>()
},
);

{
let db = open_db_read_only(db_path, None)?;
Expand All @@ -106,48 +103,45 @@ impl Command {
match mode {
SnapshotSegment::Headers => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Headers,
SegmentConfig { filters, compression },
snap_segments::Headers::new(*compression, filters),
)?,
SnapshotSegment::Transactions => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Transactions,
SegmentConfig { filters, compression },
snap_segments::Transactions::new(*compression, filters),
)?,
SnapshotSegment::Receipts => self.generate_snapshot::<DatabaseEnv>(
factory.clone(),
snap_segments::Receipts,
SegmentConfig { filters, compression },
snap_segments::Receipts::new(*compression, filters),
)?,
}
}
}
}

if self.only_bench || self.bench {
for ((mode, compression), phf) in all_combinations {
for ((mode, compression), phf) in all_combinations.clone() {
match mode {
SnapshotSegment::Headers => self.bench_headers_snapshot(
db_path,
log_level,
chain.clone(),
compression,
*compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Transactions => self.bench_transactions_snapshot(
db_path,
log_level,
chain.clone(),
compression,
*compression,
InclusionFilter::Cuckoo,
phf,
)?,
SnapshotSegment::Receipts => self.bench_receipts_snapshot(
db_path,
log_level,
chain.clone(),
compression,
*compression,
InclusionFilter::Cuckoo,
phf,
)?,
Expand Down Expand Up @@ -177,8 +171,7 @@ impl Command {
fn generate_snapshot<DB: Database>(
&self,
factory: Arc<ProviderFactory<DB>>,
segment: impl Segment<DB> + Send + Sync,
config: SegmentConfig,
segment: impl Segment + Send + Sync,
) -> eyre::Result<()> {
let dir = PathBuf::default();
let ranges = self.block_ranges(factory.best_block_number()?);
Expand All @@ -193,12 +186,7 @@ impl Command {
let provider = factory.provider()?;

if !self.only_stats {
segment.create_snapshot_file(
&provider,
dir.as_path(),
config,
block_range.clone(),
)?;
segment.snapshot::<DB>(&provider, &dir, block_range.clone())?;
}

Ok(segment.segment().filename(block_range))
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/hooks/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use tracing::debug;

#[derive(Debug)]
pub(crate) struct PolledHook {
#[allow(dead_code)]
pub(crate) name: &'static str,
pub(crate) event: EngineHookEvent,
pub(crate) db_access_level: EngineHookDBAccessLevel,
Expand Down
6 changes: 4 additions & 2 deletions crates/consensus/beacon/src/engine/hooks/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<DB: Database + 'static> SnapshotHook<DB> {
) -> RethResult<Option<EngineHookEvent>> {
Ok(match &mut self.state {
SnapshotterState::Idle(snapshotter) => {
let Some(snapshotter) = snapshotter.take() else { return Ok(None) };
let Some(mut snapshotter) = snapshotter.take() else { return Ok(None) };

let targets = snapshotter.get_snapshot_targets(finalized_block_number)?;

Expand Down Expand Up @@ -112,7 +112,9 @@ impl<DB: Database + 'static> EngineHook for SnapshotHook<DB> {
cx: &mut Context<'_>,
ctx: EngineContext,
) -> Poll<RethResult<EngineHookEvent>> {
let Some(finalized_block_number) = ctx.finalized_block_number else { return Poll::Pending };
let Some(finalized_block_number) = ctx.finalized_block_number else {
return Poll::Ready(Ok(EngineHookEvent::NotReady))
};

// Try to spawn a snapshotter
match self.try_spawn_snapshotter(finalized_block_number)? {
Expand Down
21 changes: 4 additions & 17 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1697,18 +1697,9 @@ where
None
}

fn on_hook_result(&self, polled_hook: PolledHook) -> Result<(), BeaconConsensusEngineError> {
if let EngineHookEvent::Finished(Err(error)) = &polled_hook.event {
error!(
target: "consensus::engine",
name = %polled_hook.name,
?error,
"Hook finished with error"
)
}

if polled_hook.db_access_level.is_read_write() {
match polled_hook.event {
fn on_hook_result(&self, result: PolledHook) -> Result<(), BeaconConsensusEngineError> {
if result.db_access_level.is_read_write() {
match result.event {
EngineHookEvent::NotReady => {}
EngineHookEvent::Started => {
// If the hook has read-write access to the database, it means that the engine
Expand All @@ -1726,11 +1717,7 @@ where
if let Err(error) =
self.blockchain.connect_buffered_blocks_to_canonical_hashes()
{
error!(
target: "consensus::engine",
?error,
"Error connecting buffered blocks to canonical hashes on hook result"
);
error!(target: "consensus::engine", ?error, "Error connecting buffered blocks to canonical hashes on hook result");
return Err(error.into())
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ where
5,
self.base_config.chain_spec.prune_delete_limit,
config.max_reorg_depth() as usize,
watch::channel(None).1,
);

let mut hooks = EngineHooks::new();
Expand Down
7 changes: 0 additions & 7 deletions crates/primitives/src/prune/mode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,6 @@ pub enum PruneMode {
}

impl PruneMode {
/// Prune blocks up to the specified block number. The specified block number is also pruned.
///
/// This acts as `PruneMode::Before(block_number + 1)`.
pub fn before_inclusive(block_number: BlockNumber) -> Self {
Self::Before(block_number + 1)
}

/// Returns block up to which variant pruning needs to be done, inclusive, according to the
/// provided tip.
pub fn prune_target_block(
Expand Down
8 changes: 7 additions & 1 deletion crates/prune/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use reth_config::PruneConfig;
use reth_db::database::Database;
use reth_primitives::{PruneModes, MAINNET};
use reth_provider::ProviderFactory;
use reth_snapshot::HighestSnapshotsTracker;

/// Contains the information required to build a pruner
#[derive(Debug, Clone, PartialEq, Eq)]
Expand Down Expand Up @@ -52,7 +53,11 @@ impl PrunerBuilder {
}

/// Builds a [Pruner] from the current configuration.
pub fn build<DB: Database>(self, provider_factory: ProviderFactory<DB>) -> Pruner<DB> {
pub fn build<DB: Database>(
self,
provider_factory: ProviderFactory<DB>,
highest_snapshots_rx: HighestSnapshotsTracker,
) -> Pruner<DB> {
let segments = SegmentSet::<DB>::from_prune_modes(self.segments);

Pruner::new(
Expand All @@ -61,6 +66,7 @@ impl PrunerBuilder {
self.block_interval,
self.prune_delete_limit,
self.max_reorg_depth,
highest_snapshots_rx,
)
}
}
Expand Down
Loading

0 comments on commit 083ac14

Please sign in to comment.