Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snapshot, stages): snapshot stage #6248

Merged
merged 11 commits into from
Jan 30, 2024
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions bin/reth/src/commands/debug_cmd/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@ use reth_interfaces::{
};
use reth_network::{NetworkEvents, NetworkHandle};
use reth_network_api::NetworkInfo;

use reth_primitives::{fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, B256};
use reth_primitives::{
fs, stage::StageId, BlockHashOrNumber, BlockNumber, ChainSpec, PruneModes, B256,
};
use reth_provider::{BlockExecutionWriter, HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_snapshot::Snapshotter;
use reth_stages::{
sets::DefaultStages,
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage},
Expand Down Expand Up @@ -93,6 +95,7 @@ impl Command {
consensus: Arc<dyn Consensus>,
provider_factory: ProviderFactory<DB>,
task_executor: &TaskExecutor,
snapshotter: Snapshotter<DB>,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Unpin + Clone + 'static,
Expand Down Expand Up @@ -123,6 +126,7 @@ impl Command {
header_downloader,
body_downloader,
factory.clone(),
snapshotter,
)?
.set(SenderRecoveryStage {
commit_threshold: stage_conf.sender_recovery.commit_threshold,
Expand Down Expand Up @@ -223,6 +227,14 @@ impl Command {
)
.await?;

let snapshotter = Snapshotter::new(
provider_factory.clone(),
provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory"),
PruneModes::default(),
);

// Configure the pipeline
let fetch_client = network.fetch_client().await?;
let mut pipeline = self.build_pipeline(
Expand All @@ -231,6 +243,7 @@ impl Command {
Arc::clone(&consensus),
provider_factory.clone(),
&ctx.task_executor,
snapshotter,
)?;

let provider = provider_factory.provider()?;
Expand Down
21 changes: 19 additions & 2 deletions bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ use reth_downloaders::{
headers::reverse_headers::ReverseHeadersDownloaderBuilder,
};
use reth_interfaces::consensus::Consensus;
use reth_primitives::{stage::StageId, ChainSpec, B256};
use reth_primitives::{stage::StageId, ChainSpec, PruneModes, B256};
use reth_provider::{HeaderSyncMode, ProviderFactory, StageCheckpointReader};
use reth_snapshot::Snapshotter;
use reth_stages::{
prelude::*,
stages::{ExecutionStage, ExecutionStageThresholds, SenderRecoveryStage},
Expand Down Expand Up @@ -110,8 +111,22 @@ impl ImportCommand {
let tip = file_client.tip().expect("file client has no tip");
info!(target: "reth::cli", "Chain file imported");

let snapshotter = Snapshotter::new(
provider_factory.clone(),
provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory"),
PruneModes::default(),
);

let (mut pipeline, events) = self
.build_import_pipeline(config, provider_factory.clone(), &consensus, file_client)
.build_import_pipeline(
config,
provider_factory.clone(),
&consensus,
file_client,
snapshotter,
)
.await?;

// override the tip
Expand Down Expand Up @@ -141,6 +156,7 @@ impl ImportCommand {
provider_factory: ProviderFactory<DB>,
consensus: &Arc<C>,
file_client: Arc<FileClient>,
snapshotter: Snapshotter<DB>,
) -> eyre::Result<(Pipeline<DB>, impl Stream<Item = NodeEvent>)>
where
DB: Database + Clone + Unpin + 'static,
Expand Down Expand Up @@ -174,6 +190,7 @@ impl ImportCommand {
header_downloader,
body_downloader,
factory.clone(),
snapshotter,
)?
.set(SenderRecoveryStage {
commit_threshold: config.stages.sender_recovery.commit_threshold,
Expand Down
15 changes: 2 additions & 13 deletions crates/consensus/beacon/src/engine/hooks/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ use crate::{
};
use futures::FutureExt;
use reth_db::database::Database;
use reth_interfaces::{RethError, RethResult};
use reth_interfaces::RethResult;
use reth_primitives::BlockNumber;
use reth_snapshot::{Snapshotter, SnapshotterError, SnapshotterWithResult};
use reth_snapshot::{Snapshotter, SnapshotterWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
use tokio::sync::oneshot;
Expand Down Expand Up @@ -141,14 +141,3 @@ enum SnapshotterState<DB> {
/// Snapshotter is running and waiting for a response
Running(oneshot::Receiver<SnapshotterWithResult<DB>>),
}

impl From<SnapshotterError> for EngineHookError {
fn from(err: SnapshotterError) -> Self {
match err {
SnapshotterError::InconsistentData(_) => EngineHookError::Internal(Box::new(err)),
SnapshotterError::Interface(err) => err.into(),
SnapshotterError::Database(err) => RethError::Database(err).into(),
SnapshotterError::Provider(err) => RethError::Provider(err).into(),
}
}
}
30 changes: 18 additions & 12 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@ use crate::{
use reth_blockchain_tree::{
config::BlockchainTreeConfig, externals::TreeExternals, BlockchainTree, ShareableBlockchainTree,
};
use reth_db::{
test_utils::{create_test_rw_db, TempDatabase},
DatabaseEnv as DE,
};
use reth_db::{test_utils::TempDatabase, DatabaseEnv as DE};
type DatabaseEnv = TempDatabase<DE>;
use reth_downloaders::{
bodies::bodies::BodiesDownloaderBuilder,
Expand All @@ -26,15 +23,16 @@ use reth_node_builder::EthEngineTypes;
use reth_payload_builder::test_utils::spawn_test_payload_service;
use reth_primitives::{BlockNumber, ChainSpec, PruneModes, Receipt, B256, U256};
use reth_provider::{
providers::BlockchainProvider, test_utils::TestExecutorFactory, BlockExecutor,
BundleStateWithReceipts, ExecutorFactory, HeaderSyncMode, ProviderFactory,
PrunableBlockExecutor,
providers::BlockchainProvider,
test_utils::{create_test_provider_factory_with_chain_spec, TestExecutorFactory},
BlockExecutor, BundleStateWithReceipts, ExecutorFactory, HeaderSyncMode, PrunableBlockExecutor,
};
use reth_prune::Pruner;
use reth_revm::EvmProcessorFactory;
use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use reth_snapshot::Snapshotter;
use reth_stages::{sets::DefaultStages, test_utils::TestStages, ExecOutput, Pipeline, StageError};
use reth_tasks::TokioTaskExecutor;
use std::{collections::VecDeque, sync::Arc};
Expand Down Expand Up @@ -445,9 +443,8 @@ where
/// Builds the test consensus engine into a `TestConsensusEngine` and `TestEnv`.
pub fn build(self) -> (TestBeaconConsensusEngine<Client>, TestEnv<Arc<DatabaseEnv>>) {
reth_tracing::init_test_tracing();
let db = create_test_rw_db();
let provider_factory =
ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone());
create_test_provider_factory_with_chain_spec(self.base_config.chain_spec.clone());

let consensus: Arc<dyn Consensus> = match self.base_config.consensus {
TestConsensusConfig::Real => {
Expand Down Expand Up @@ -477,6 +474,14 @@ where
)),
};

let snapshotter = Snapshotter::new(
provider_factory.clone(),
provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory"),
PruneModes::default(),
);

// Setup pipeline
let (tip_tx, tip_rx) = watch::channel(B256::default());
let mut pipeline = match self.base_config.pipeline_config {
Expand All @@ -494,12 +499,13 @@ where

Pipeline::builder().add_stages(
DefaultStages::new(
ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()),
provider_factory.clone(),
HeaderSyncMode::Tip(tip_rx.clone()),
Arc::clone(&consensus),
header_downloader,
body_downloader,
executor_factory.clone(),
snapshotter,
)
.expect("should build"),
)
Expand All @@ -523,7 +529,7 @@ where
BlockchainProvider::with_latest(provider_factory.clone(), tree, latest);

let pruner = Pruner::new(
provider_factory,
provider_factory.clone(),
vec![],
5,
self.base_config.chain_spec.prune_delete_limit,
Expand Down Expand Up @@ -552,7 +558,7 @@ where
engine.sync.set_max_block(max_block)
}

(engine, TestEnv::new(db, tip_rx, handle))
(engine, TestEnv::new(provider_factory.db_ref().clone(), tip_rx, handle))
}
}

Expand Down
30 changes: 19 additions & 11 deletions crates/node-core/src/node_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ use reth_provider::{
use reth_prune::PrunerBuilder;
use reth_revm::EvmProcessorFactory;
use reth_rpc_engine_api::EngineApi;
use reth_snapshot::Snapshotter;
use reth_stages::{
prelude::*,
stages::{
Expand Down Expand Up @@ -616,6 +617,7 @@ impl NodeConfig {
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
max_block: Option<BlockNumber>,
snapshotter: Snapshotter<DB>,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Unpin + Clone + 'static,
Expand All @@ -641,6 +643,7 @@ impl NodeConfig {
self.debug.continuous,
metrics_tx,
prune_config,
snapshotter,
)
.await?;

Expand Down Expand Up @@ -846,6 +849,7 @@ impl NodeConfig {
continuous: bool,
metrics_tx: reth_stages::MetricEventsSender,
prune_config: Option<PruneConfig>,
snapshotter: Snapshotter<DB>,
) -> eyre::Result<Pipeline<DB>>
where
DB: Database + Clone + 'static,
Expand Down Expand Up @@ -893,6 +897,7 @@ impl NodeConfig {
header_downloader,
body_downloader,
factory.clone(),
snapshotter,
)?
.set(SenderRecoveryStage {
commit_threshold: stage_config.sender_recovery.commit_threshold,
Expand Down Expand Up @@ -1133,6 +1138,18 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
};
let max_block = self.config.max_block(&network_client, provider_factory.clone()).await?;

let mut hooks = EngineHooks::new();

let snapshotter = Snapshotter::new(
provider_factory.clone(),
provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory"),
prune_config.clone().unwrap_or_default().segments,
);
hooks.add(SnapshotHook::new(snapshotter.clone(), Box::new(executor.clone())));
info!(target: "reth::cli", "Snapshotter initialized");

// Configure the pipeline
let (mut pipeline, client) = if self.config.dev.dev {
info!(target: "reth::cli", "Starting Reth in dev mode");
Expand Down Expand Up @@ -1160,6 +1177,7 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
sync_metrics_tx,
prune_config.clone(),
max_block,
snapshotter,
)
.await?;

Expand All @@ -1181,6 +1199,7 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
sync_metrics_tx,
prune_config.clone(),
max_block,
snapshotter,
)
.await?;

Expand All @@ -1190,7 +1209,6 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
let pipeline_events = pipeline.events();

let initial_target = self.config.initial_pipeline_target(genesis_hash);
let mut hooks = EngineHooks::new();

let prune_config = prune_config.unwrap_or_default();
let mut pruner = PrunerBuilder::new(prune_config.clone())
Expand All @@ -1202,16 +1220,6 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
hooks.add(PruneHook::new(pruner, Box::new(executor.clone())));
info!(target: "reth::cli", ?prune_config, "Pruner initialized");

let snapshotter = reth_snapshot::Snapshotter::new(
provider_factory.clone(),
provider_factory
.snapshot_provider()
.expect("snapshot provider initialized via provider factory"),
prune_config.segments,
);
hooks.add(SnapshotHook::new(snapshotter, Box::new(executor.clone())));
info!(target: "reth::cli", "Snapshotter initialized");

// Configure the consensus engine
let (beacon_consensus_engine, beacon_engine_handle) = BeaconConsensusEngine::with_channel(
client,
Expand Down
7 changes: 6 additions & 1 deletion crates/primitives/src/stage/id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
/// For custom stages, use [`StageId::Other`]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StageId {
/// Snapshot stage in the process.
Snapshot,
/// Header stage in the process.
Headers,
/// Total difficulty stage in the process.
Expand Down Expand Up @@ -35,7 +37,8 @@ pub enum StageId {

impl StageId {
/// All supported Stages
pub const ALL: [StageId; 13] = [
pub const ALL: [StageId; 14] = [
StageId::Snapshot,
StageId::Headers,
StageId::TotalDifficulty,
StageId::Bodies,
Expand All @@ -54,6 +57,7 @@ impl StageId {
/// Return stage id formatted as string.
pub fn as_str(&self) -> &str {
match self {
StageId::Snapshot => "Snapshot",
StageId::Headers => "Headers",
StageId::TotalDifficulty => "TotalDifficulty",
StageId::Bodies => "Bodies",
Expand Down Expand Up @@ -94,6 +98,7 @@ mod tests {

#[test]
fn stage_id_as_string() {
assert_eq!(StageId::Snapshot.to_string(), "Snapshot");
assert_eq!(StageId::Headers.to_string(), "Headers");
assert_eq!(StageId::TotalDifficulty.to_string(), "TotalDifficulty");
assert_eq!(StageId::Bodies.to_string(), "Bodies");
Expand Down
1 change: 1 addition & 0 deletions crates/snapshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tokio = { workspace = true, features = ["sync"] }
thiserror.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
rayon.workspace = true

[dev-dependencies]
# reth
Expand Down
Loading
Loading