Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make reth-stages independent of concrete DatabaseProvider #10934

Merged
merged 12 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/reth/src/commands/debug_cmd/in_memory_merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
.try_seal_with_senders()
.map_err(|_| BlockValidationError::SenderRecoveryError)?,
)?;
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw);
let mut storage_writer = UnifiedStorageWriter::from_database(&provider_rw.0);
storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
let storage_lists = provider_rw.changed_storages_with_range(block.number..=block.number)?;
let storages = provider_rw.plain_state_storages(storage_lists)?;
Expand Down
7 changes: 4 additions & 3 deletions bin/reth/src/commands/debug_cmd/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ use reth_node_api::{NodeTypesWithDB, NodeTypesWithEngine};
use reth_node_ethereum::EthExecutorProvider;
use reth_primitives::BlockHashOrNumber;
use reth_provider::{
writer::UnifiedStorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider, HeaderProvider,
LatestStateProviderRef, OriginalValuesKnown, ProviderError, ProviderFactory, StateWriter,
writer::UnifiedStorageWriter, BlockNumReader, BlockWriter, ChainSpecProvider,
DatabaseProviderFactory, HeaderProvider, LatestStateProviderRef, OriginalValuesKnown,
ProviderError, ProviderFactory, StateWriter, StaticFileProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
use reth_stages::{
Expand Down Expand Up @@ -84,7 +85,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
let Environment { provider_factory, config, data_dir } =
self.env.init::<N>(AccessRights::RW)?;

let provider_rw = provider_factory.provider_rw()?;
let provider_rw = provider_factory.database_provider_rw()?;

// Configure and build network
let network_secret_path =
Expand Down
6 changes: 3 additions & 3 deletions crates/cli/commands/src/stage/dump/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_db_common::DbTool;
use reth_evm::{execute::BlockExecutorProvider, noop::NoopBlockExecutorProvider};
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_stages::{stages::ExecutionStage, Stage, StageCheckpoint, UnwindInput};
use tracing::info;

Expand Down Expand Up @@ -135,7 +135,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;

let mut exec_stage = ExecutionStage::new_with_executor(NoopBlockExecutorProvider::default());

Expand Down Expand Up @@ -175,7 +175,7 @@ where

let input =
reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) };
exec_stage.execute(&output_provider_factory.provider_rw()?, input)?;
exec_stage.execute(&output_provider_factory.database_provider_rw()?, input)?;

info!(target: "reth::cli", "Success");

Expand Down
6 changes: 3 additions & 3 deletions crates/cli/commands/src/stage/dump/hashing_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_db_api::{database::Database, table::TableImporter};
use reth_db_common::DbTool;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_stages::{stages::AccountHashingStage, Stage, StageCheckpoint, UnwindInput};
use tracing::info;

Expand Down Expand Up @@ -55,7 +55,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;
let mut exec_stage = AccountHashingStage::default();

exec_stage.unwind(
Expand All @@ -81,7 +81,7 @@ fn dry_run<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");

let provider = output_provider_factory.provider_rw()?;
let provider = output_provider_factory.database_provider_rw()?;
let mut stage = AccountHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
Expand Down
6 changes: 3 additions & 3 deletions crates/cli/commands/src/stage/dump/hashing_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use reth_db_api::{database::Database, table::TableImporter};
use reth_db_common::DbTool;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_stages::{stages::StorageHashingStage, Stage, StageCheckpoint, UnwindInput};
use tracing::info;

Expand Down Expand Up @@ -45,7 +45,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
tip_block_number: u64,
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;

let mut exec_stage = StorageHashingStage::default();

Expand Down Expand Up @@ -76,7 +76,7 @@ fn dry_run<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");

let provider = output_provider_factory.provider_rw()?;
let provider = output_provider_factory.database_provider_rw()?;
let mut stage = StorageHashingStage {
clean_threshold: 1, // Forces hashing from scratch
..Default::default()
Expand Down
6 changes: 3 additions & 3 deletions crates/cli/commands/src/stage/dump/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_evm::noop::NoopBlockExecutorProvider;
use reth_exex::ExExManagerHandle;
use reth_node_builder::{NodeTypesWithDB, NodeTypesWithDBAdapter};
use reth_node_core::dirs::{ChainPath, DataDirPath};
use reth_provider::{providers::StaticFileProvider, ProviderFactory};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderFactory, ProviderFactory};
use reth_prune::PruneModes;
use reth_stages::{
stages::{
Expand Down Expand Up @@ -73,7 +73,7 @@ fn unwind_and_copy<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
output_db: &DatabaseEnv,
) -> eyre::Result<()> {
let (from, to) = range;
let provider = db_tool.provider_factory.provider_rw()?;
let provider = db_tool.provider_factory.database_provider_rw()?;

let unwind = UnwindInput {
unwind_to: from,
Expand Down Expand Up @@ -150,7 +150,7 @@ fn dry_run<N: NodeTypesWithDB<ChainSpec = ChainSpec>>(
from: u64,
) -> eyre::Result<()> {
info!(target: "reth::cli", "Executing stage.");
let provider = output_provider_factory.provider_rw()?;
let provider = output_provider_factory.database_provider_rw()?;

let mut stage = MerkleStage::Execution {
// Forces updating the root instead of calculating from scratch
Expand Down
10 changes: 5 additions & 5 deletions crates/cli/commands/src/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ use reth_node_metrics::{
version::VersionInfo,
};
use reth_provider::{
writer::UnifiedStorageWriter, ChainSpecProvider, StageCheckpointReader, StageCheckpointWriter,
StaticFileProviderFactory,
writer::UnifiedStorageWriter, ChainSpecProvider, DatabaseProviderFactory,
StageCheckpointReader, StageCheckpointWriter, StaticFileProviderFactory,
};
use reth_stages::{
stages::{
Expand Down Expand Up @@ -117,7 +117,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
let Environment { provider_factory, config, data_dir } =
self.env.init::<N>(AccessRights::RW)?;

let mut provider_rw = provider_factory.provider_rw()?;
let mut provider_rw = provider_factory.database_provider_rw()?;

if let Some(listen_addr) = self.metrics {
info!(target: "reth::cli", "Starting metrics endpoint at {}", listen_addr);
Expand Down Expand Up @@ -333,7 +333,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
provider_rw,
provider_factory.static_file_provider(),
)?;
provider_rw = provider_factory.provider_rw()?;
provider_rw = provider_factory.database_provider_rw()?;
}
}
}
Expand All @@ -356,7 +356,7 @@ impl<C: ChainSpecParser<ChainSpec = ChainSpec>> Command<C> {
}
if self.commit {
UnifiedStorageWriter::commit(provider_rw, provider_factory.static_file_provider())?;
provider_rw = provider_factory.provider_rw()?;
provider_rw = provider_factory.database_provider_rw()?;
}

if done {
Expand Down
7 changes: 3 additions & 4 deletions crates/consensus/beacon/src/engine/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use reth_network_p2p::{
full_block::{FetchFullBlockFuture, FetchFullBlockRangeFuture, FullBlockClient},
BlockClient,
};
use reth_node_types::NodeTypesWithDB;
use reth_primitives::SealedBlock;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
Expand All @@ -34,7 +33,7 @@ use tracing::trace;
/// database while the pipeline is still active.
pub(crate) struct EngineSyncController<N, Client>
where
N: NodeTypesWithDB,
N: ProviderNodeTypes,
Client: BlockClient,
{
/// A downloader that can download full blocks from the network.
Expand Down Expand Up @@ -394,14 +393,14 @@ pub(crate) enum EngineSyncEvent {
/// running, it acquires the write lock over the database. This means that we cannot forward to the
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
enum PipelineState<N: NodeTypesWithDB> {
enum PipelineState<N: ProviderNodeTypes> {
/// Pipeline is idle.
Idle(Option<Pipeline<N>>),
/// Pipeline is running and waiting for a response
Running(oneshot::Receiver<PipelineWithResult<N>>),
}

impl<N: NodeTypesWithDB> PipelineState<N> {
impl<N: ProviderNodeTypes> PipelineState<N> {
/// Returns `true` if the state matches idle.
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))
Expand Down
1 change: 0 additions & 1 deletion crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ reth-revm.workspace = true
reth-rpc-types.workspace = true
reth-stages-api.workspace = true
reth-tasks.workspace = true
reth-node-types.workspace = true
reth-trie.workspace = true
reth-trie-parallel.workspace = true

Expand Down
7 changes: 3 additions & 4 deletions crates/engine/tree/src/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
//! These modes are mutually exclusive and the node can only be in one mode at a time.

use futures::FutureExt;
use reth_node_types::NodeTypesWithDB;
use reth_provider::providers::ProviderNodeTypes;
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineTarget, PipelineWithResult};
use reth_tasks::TaskSpawner;
Expand Down Expand Up @@ -79,7 +78,7 @@ pub enum BackfillEvent {

/// Pipeline sync.
#[derive(Debug)]
pub struct PipelineSync<N: NodeTypesWithDB> {
pub struct PipelineSync<N: ProviderNodeTypes> {
/// The type that can spawn the pipeline task.
pipeline_task_spawner: Box<dyn TaskSpawner>,
/// The current state of the pipeline.
Expand Down Expand Up @@ -213,14 +212,14 @@ impl<N: ProviderNodeTypes> BackfillSync for PipelineSync<N> {
/// blockchain tree any messages that would result in database writes, since it would result in a
/// deadlock.
#[derive(Debug)]
enum PipelineState<N: NodeTypesWithDB> {
enum PipelineState<N: ProviderNodeTypes> {
/// Pipeline is idle.
Idle(Option<Pipeline<N>>),
/// Pipeline is running and waiting for a response
Running(oneshot::Receiver<PipelineWithResult<N>>),
}

impl<N: NodeTypesWithDB> PipelineState<N> {
impl<N: ProviderNodeTypes> PipelineState<N> {
/// Returns `true` if the state matches idle.
const fn is_idle(&self) -> bool {
matches!(self, Self::Idle(_))
Expand Down
8 changes: 4 additions & 4 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use reth_chain_state::ExecutedBlock;
use reth_errors::ProviderError;
use reth_primitives::BlockNumHash;
use reth_provider::{
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader, ProviderFactory,
StaticFileProviderFactory,
providers::ProviderNodeTypes, writer::UnifiedStorageWriter, BlockHashReader,
DatabaseProviderFactory, ProviderFactory, StaticFileProviderFactory,
};
use reth_prune::{PrunerError, PrunerOutput, PrunerWithFactory};
use reth_stages_api::{MetricEvent, MetricEventsSender};
Expand Down Expand Up @@ -103,7 +103,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
) -> Result<Option<BlockNumHash>, PersistenceError> {
debug!(target: "engine::persistence", ?new_tip_num, "Removing blocks");
let start_time = Instant::now();
let provider_rw = self.provider.provider_rw()?;
let provider_rw = self.provider.database_provider_rw()?;
let sf_provider = self.provider.static_file_provider();

let new_tip_hash = provider_rw.block_hash(new_tip_num)?;
Expand All @@ -126,7 +126,7 @@ impl<N: ProviderNodeTypes> PersistenceService<N> {
.map(|block| BlockNumHash { hash: block.block().hash(), number: block.block().number });

if last_block_hash_num.is_some() {
let provider_rw = self.provider.provider_rw()?;
let provider_rw = self.provider.database_provider_rw()?;
let static_file_provider = self.provider.static_file_provider();

UnifiedStorageWriter::from(&provider_rw, &static_file_provider).save_blocks(&blocks)?;
Expand Down
6 changes: 3 additions & 3 deletions crates/exex/exex/src/backfill/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use reth_primitives::{
};
use reth_provider::{
providers::ProviderNodeTypes, BlockWriter as _, ExecutionOutcome, LatestStateProviderRef,
ProviderFactory,
ProviderFactory, StaticFileProviderFactory,
};
use reth_revm::database::StateProviderDatabase;
use reth_testing_utils::generators::sign_tx_with_key_pair;
Expand Down Expand Up @@ -63,7 +63,7 @@ where
let mut block_execution_output = EthExecutorProvider::ethereum(chain_spec)
.executor(StateProviderDatabase::new(LatestStateProviderRef::new(
provider.tx_ref(),
provider.static_file_provider().clone(),
provider.static_file_provider(),
)))
.execute(BlockExecutionInput { block, total_difficulty: U256::ZERO })?;
block_execution_output.state.reverts.sort();
Expand Down Expand Up @@ -187,7 +187,7 @@ where

let executor =
EthExecutorProvider::ethereum(chain_spec).batch_executor(StateProviderDatabase::new(
LatestStateProviderRef::new(provider.tx_ref(), provider.static_file_provider().clone()),
LatestStateProviderRef::new(provider.tx_ref(), provider.static_file_provider()),
));

let mut execution_outcome = executor.execute_and_verify_batch(vec![
Expand Down
4 changes: 2 additions & 2 deletions crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use reth_prune::{PruneModes, PrunerBuilder};
use reth_rpc_api::clients::EthApiClient;
use reth_rpc_builder::config::RethRpcServerConfig;
use reth_rpc_layer::JwtSecret;
use reth_stages::{sets::DefaultStages, MetricEvent, Pipeline, PipelineTarget, StageId};
use reth_stages::{sets::DefaultStages, MetricEvent, PipelineBuilder, PipelineTarget, StageId};
use reth_static_file::StaticFileProducer;
use reth_tasks::TaskExecutor;
use reth_tracing::tracing::{debug, error, info, warn};
Expand Down Expand Up @@ -430,7 +430,7 @@ where
let (_tip_tx, tip_rx) = watch::channel(B256::ZERO);

// Builds an unwind-only pipeline
let pipeline = Pipeline::<N>::builder()
let pipeline = PipelineBuilder::default()
.add_stages(DefaultStages::new(
factory.clone(),
tip_rx,
Expand Down
2 changes: 0 additions & 2 deletions crates/stages/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ workspace = true
# reth
reth-primitives-traits.workspace = true
reth-provider.workspace = true
reth-db-api.workspace = true
reth-static-file.workspace = true
reth-network-p2p.workspace = true
reth-tokio-util.workspace = true
Expand All @@ -23,7 +22,6 @@ reth-prune.workspace = true
reth-errors.workspace = true
reth-stages-types.workspace = true
reth-static-file-types.workspace = true
reth-node-types.workspace = true

alloy-primitives.workspace = true

Expand Down
Loading
Loading