From f461daab06c61d94188f60dc2e79bd9d73b1f5fe Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 2 Feb 2024 13:37:52 +0000 Subject: [PATCH 1/4] refactor(storage): make snapshot provider non-optional in provider factory --- bin/reth/src/commands/db/clear.rs | 4 +- bin/reth/src/commands/db/mod.rs | 32 +++--- bin/reth/src/commands/db/snapshots/bench.rs | 9 +- bin/reth/src/commands/db/snapshots/headers.rs | 34 ++---- bin/reth/src/commands/db/snapshots/mod.rs | 37 +++--- .../src/commands/db/snapshots/receipts.rs | 33 ++---- .../src/commands/db/snapshots/transactions.rs | 35 ++---- .../src/commands/debug_cmd/build_block.rs | 12 +- bin/reth/src/commands/debug_cmd/execution.rs | 14 ++- .../commands/debug_cmd/in_memory_merkle.rs | 8 +- bin/reth/src/commands/debug_cmd/merkle.rs | 8 +- .../src/commands/debug_cmd/replay_engine.rs | 9 +- bin/reth/src/commands/import.rs | 8 +- bin/reth/src/commands/init_cmd.rs | 4 +- bin/reth/src/commands/p2p/mod.rs | 6 +- .../src/commands/recover/storage_tries.rs | 3 +- bin/reth/src/commands/stage/drop.rs | 9 +- bin/reth/src/commands/stage/dump/execution.rs | 34 +++--- .../commands/stage/dump/hashing_account.rs | 28 +++-- .../commands/stage/dump/hashing_storage.rs | 28 +++-- bin/reth/src/commands/stage/dump/merkle.rs | 28 +++-- bin/reth/src/commands/stage/dump/mod.rs | 59 +++++++--- bin/reth/src/commands/stage/run.rs | 11 +- bin/reth/src/commands/stage/unwind.rs | 3 +- crates/consensus/beacon/src/engine/mod.rs | 108 ++++++++++++++---- .../consensus/beacon/src/engine/test_utils.rs | 4 +- crates/net/downloaders/src/bodies/bodies.rs | 14 +-- crates/net/downloaders/src/bodies/task.rs | 8 +- crates/net/downloaders/src/file_client.rs | 8 +- crates/node-core/src/node_config.rs | 12 +- crates/prune/src/pruner.rs | 43 ++++--- crates/snapshot/src/snapshotter.rs | 8 +- crates/stages/src/stages/bodies.rs | 18 ++- crates/stages/src/stages/execution.rs | 51 +++++++-- crates/stages/src/test_utils/test_db.rs | 18 ++- .../provider/src/providers/database/mod.rs | 53 +++++---- .../src/providers/database/provider.rs | 99 ++++++++-------- crates/storage/provider/src/test_utils/mod.rs | 5 +- examples/db-access.rs | 6 +- examples/rpc-db/src/main.rs | 9 +- testing/ef-tests/src/cases/blockchain_test.rs | 12 +- 41 files changed, 536 insertions(+), 396 deletions(-) diff --git a/bin/reth/src/commands/db/clear.rs b/bin/reth/src/commands/db/clear.rs index 12f4437ba69b..419d0a9faf70 100644 --- a/bin/reth/src/commands/db/clear.rs +++ b/bin/reth/src/commands/db/clear.rs @@ -24,9 +24,7 @@ impl Command { table.view(&ClearViewer { db: provider_factory.db_ref() })? 
} Subcommands::Snapshot { segment } => { - let snapshot_provider = provider_factory - .snapshot_provider() - .expect("snapshot provider initialized via provider factory"); + let snapshot_provider = provider_factory.snapshot_provider(); let snapshots = iter_snapshots(snapshot_provider.directory())?; if let Some(segment_snapshots) = snapshots.get(&segment) { diff --git a/bin/reth/src/commands/db/mod.rs b/bin/reth/src/commands/db/mod.rs index d0c4f7407d35..3343a61402ad 100644 --- a/bin/reth/src/commands/db/mod.rs +++ b/bin/reth/src/commands/db/mod.rs @@ -104,8 +104,9 @@ impl Command { &db_path, DatabaseArguments::default().log_level(self.db.log_level), )?; - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; + let tool = DbTool::new(provider_factory, self.chain.clone())?; command.execute(data_dir, &tool)?; } @@ -114,8 +115,9 @@ impl Command { &db_path, DatabaseArguments::default().log_level(self.db.log_level), )?; - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; + let tool = DbTool::new(provider_factory, self.chain.clone())?; command.execute(&tool)?; } @@ -124,8 +126,9 @@ impl Command { &db_path, DatabaseArguments::default().log_level(self.db.log_level), )?; - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; + let tool = DbTool::new(provider_factory, self.chain.clone())?; command.execute(&tool)?; } @@ -134,8 +137,9 @@ impl Command { &db_path, DatabaseArguments::default().log_level(self.db.log_level), )?; - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; + let tool = DbTool::new(provider_factory, self.chain.clone())?; command.execute(&tool)?; } @@ -157,20 +161,22 @@ impl Command { let db = open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?; - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; + let mut tool = DbTool::new(provider_factory, self.chain.clone())?; tool.drop(db_path)?; } Subcommands::Clear(command) => { let db = open_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?; - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; + command.execute(provider_factory)?; } Subcommands::Snapshot(command) => { - command.execute(&db_path, self.db.log_level, self.chain.clone())?; + command.execute(data_dir, self.db.log_level, self.chain.clone())?; } Subcommands::Version => { let local_db_version = match get_db_version(&db_path) { diff --git a/bin/reth/src/commands/db/snapshots/bench.rs b/bin/reth/src/commands/db/snapshots/bench.rs index 928898205f07..aca76280e65b 100644 --- a/bin/reth/src/commands/db/snapshots/bench.rs +++ b/bin/reth/src/commands/db/snapshots/bench.rs @@ 
-1,7 +1,7 @@ use reth_db::DatabaseEnv; use reth_primitives::{ snapshot::{Compression, Filters}, - ChainSpec, SnapshotSegment, + SnapshotSegment, }; use reth_provider::{DatabaseProviderRO, ProviderFactory}; use std::{fmt::Debug, sync::Arc, time::Instant}; @@ -16,7 +16,7 @@ pub(crate) enum BenchKind { pub(crate) fn bench( bench_kind: BenchKind, - db: (DatabaseEnv, Arc), + provider_factory: Arc>, segment: SnapshotSegment, filters: Filters, compression: Compression, @@ -28,8 +28,6 @@ where F2: Fn(DatabaseProviderRO) -> eyre::Result, R: Debug + PartialEq, { - let (db, chain) = db; - println!(); println!("############"); println!("## [{segment:?}] [{compression:?}] [{filters:?}] [{bench_kind:?}]"); @@ -42,8 +40,7 @@ where }; let db_result = { - let factory = ProviderFactory::new(db, chain); - let provider = factory.provider()?; + let provider = provider_factory.provider()?; let start = Instant::now(); let result = database_method(provider)?; let end = start.elapsed().as_micros(); diff --git a/bin/reth/src/commands/db/snapshots/headers.rs b/bin/reth/src/commands/db/snapshots/headers.rs index 8d983fa0a391..69cd600f5254 100644 --- a/bin/reth/src/commands/db/snapshots/headers.rs +++ b/bin/reth/src/commands/db/snapshots/headers.rs @@ -3,34 +3,25 @@ use super::{ Command, }; use rand::{seq::SliceRandom, Rng}; -use reth_db::{mdbx::DatabaseArguments, open_db_read_only, snapshot::HeaderMask}; -use reth_interfaces::db::LogLevel; +use reth_db::{snapshot::HeaderMask, DatabaseEnv}; use reth_primitives::{ snapshot::{Compression, Filters, InclusionFilter, PerfectHashingFunction}, - BlockHash, ChainSpec, Header, SnapshotSegment, + BlockHash, Header, SnapshotSegment, }; use reth_provider::{ providers::SnapshotProvider, BlockNumReader, HeaderProvider, ProviderError, ProviderFactory, }; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{path::PathBuf, sync::Arc}; impl Command { pub(crate) fn bench_headers_snapshot( &self, - db_path: &Path, - log_level: Option, - chain: Arc, + provider_factory: Arc>, compression: Compression, inclusion_filter: InclusionFilter, phf: Option, ) -> eyre::Result<()> { - let db_args = DatabaseArguments::default().log_level(log_level); - - let factory = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone()); - let provider = factory.provider()?; + let provider = provider_factory.provider()?; let tip = provider.last_block_number()?; let block_range = self.block_ranges(tip).first().expect("has been generated before").clone(); @@ -58,7 +49,7 @@ impl Command { for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] { bench( bench_kind, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory.clone(), SnapshotSegment::Headers, filters, compression, @@ -89,7 +80,7 @@ impl Command { let num = row_indexes[rng.gen_range(0..row_indexes.len())]; bench( BenchKind::RandomOne, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory.clone(), SnapshotSegment::Headers, filters, compression, @@ -109,15 +100,14 @@ impl Command { // BENCHMARK QUERYING A RANDOM HEADER BY HASH { let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64; - let header_hash = - ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone()) - .header_by_number(num)? - .ok_or(ProviderError::HeaderNotFound(num.into()))? - .hash_slow(); + let header_hash = provider_factory + .header_by_number(num)? + .ok_or(ProviderError::HeaderNotFound(num.into()))? 
+ .hash_slow(); bench( BenchKind::RandomHash, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory.clone(), SnapshotSegment::Headers, filters, compression, diff --git a/bin/reth/src/commands/db/snapshots/mod.rs b/bin/reth/src/commands/db/snapshots/mod.rs index 988c14d00f73..8ab8c6dec6ee 100644 --- a/bin/reth/src/commands/db/snapshots/mod.rs +++ b/bin/reth/src/commands/db/snapshots/mod.rs @@ -9,6 +9,7 @@ use reth_db::{ }; use reth_interfaces::db::LogLevel; use reth_nippy_jar::{NippyJar, NippyJarCursor}; +use reth_node_core::dirs::{ChainPath, DataDirPath}; use reth_primitives::{ snapshot::{ Compression, Filters, InclusionFilter, PerfectHashingFunction, SegmentConfig, SegmentHeader, @@ -81,7 +82,7 @@ impl Command { /// Execute `db snapshot` command pub fn execute( self, - db_path: &Path, + data_dir: ChainPath, log_level: Option, chain: Arc, ) -> eyre::Result<()> { @@ -95,14 +96,16 @@ impl Command { self.phf.iter().copied().map(Some).collect::>() }); - { - let db = open_db_read_only( - db_path, - DatabaseArguments::default() - .max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)), - )?; - let factory = Arc::new(ProviderFactory::new(db, chain.clone())); + let db = open_db_read_only( + data_dir.db_path().as_path(), + DatabaseArguments::default() + .log_level(log_level) + .max_read_transaction_duration(Some(MaxReadTransactionDuration::Unbounded)), + )?; + let provider_factory = + Arc::new(ProviderFactory::new(db, chain.clone(), data_dir.snapshots_path())?); + { if !self.only_bench { for ((mode, compression), phf) in all_combinations.clone() { let filters = if let Some(phf) = self.with_filters.then_some(phf).flatten() { @@ -113,17 +116,17 @@ impl Command { match mode { SnapshotSegment::Headers => self.generate_snapshot::( - factory.clone(), + provider_factory.clone(), snap_segments::Headers, SegmentConfig { filters, compression }, )?, SnapshotSegment::Transactions => self.generate_snapshot::( - factory.clone(), + provider_factory.clone(), snap_segments::Transactions, SegmentConfig { filters, compression }, )?, SnapshotSegment::Receipts => self.generate_snapshot::( - factory.clone(), + provider_factory.clone(), snap_segments::Receipts, SegmentConfig { filters, compression }, )?, @@ -136,25 +139,19 @@ impl Command { for ((mode, compression), phf) in all_combinations { match mode { SnapshotSegment::Headers => self.bench_headers_snapshot( - db_path, - log_level, - chain.clone(), + provider_factory.clone(), compression, InclusionFilter::Cuckoo, phf, )?, SnapshotSegment::Transactions => self.bench_transactions_snapshot( - db_path, - log_level, - chain.clone(), + provider_factory.clone(), compression, InclusionFilter::Cuckoo, phf, )?, SnapshotSegment::Receipts => self.bench_receipts_snapshot( - db_path, - log_level, - chain.clone(), + provider_factory.clone(), compression, InclusionFilter::Cuckoo, phf, diff --git a/bin/reth/src/commands/db/snapshots/receipts.rs b/bin/reth/src/commands/db/snapshots/receipts.rs index 37c19ebaf65f..7e9c19aa48f8 100644 --- a/bin/reth/src/commands/db/snapshots/receipts.rs +++ b/bin/reth/src/commands/db/snapshots/receipts.rs @@ -3,37 +3,27 @@ use super::{ Command, Compression, PerfectHashingFunction, }; use rand::{seq::SliceRandom, Rng}; -use reth_db::{open_db_read_only, snapshot::ReceiptMask}; -use reth_interfaces::db::LogLevel; +use reth_db::{snapshot::ReceiptMask, DatabaseEnv}; use reth_primitives::{ snapshot::{Filters, InclusionFilter}, - ChainSpec, Receipt, SnapshotSegment, + Receipt, SnapshotSegment, }; use reth_provider::{ 
providers::SnapshotProvider, BlockNumReader, ProviderError, ProviderFactory, ReceiptProvider, TransactionsProvider, TransactionsProviderExt, }; -use reth_db::mdbx::DatabaseArguments; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{path::PathBuf, sync::Arc}; impl Command { pub(crate) fn bench_receipts_snapshot( &self, - db_path: &Path, - log_level: Option, - chain: Arc, + provider_factory: Arc>, compression: Compression, inclusion_filter: InclusionFilter, phf: Option, ) -> eyre::Result<()> { - let db_args = DatabaseArguments::default().log_level(log_level); - - let factory = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone()); - let provider = factory.provider()?; + let provider = provider_factory.provider()?; let tip = provider.last_block_number()?; let block_range = self.block_ranges(tip).first().expect("has been generated before").clone(); @@ -46,9 +36,8 @@ impl Command { let mut rng = rand::thread_rng(); - let tx_range = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone()) - .provider()? - .transaction_range_by_block_range(block_range.clone())?; + let tx_range = + provider_factory.provider()?.transaction_range_by_block_range(block_range.clone())?; let mut row_indexes = tx_range.clone().collect::>(); @@ -67,7 +56,7 @@ impl Command { for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] { bench( bench_kind, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory.clone(), SnapshotSegment::Receipts, filters, compression, @@ -98,7 +87,7 @@ impl Command { let num = row_indexes[rng.gen_range(0..row_indexes.len())]; bench( BenchKind::RandomOne, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory.clone(), SnapshotSegment::Receipts, filters, compression, @@ -118,14 +107,14 @@ impl Command { // BENCHMARK QUERYING A RANDOM RECEIPT BY HASH { let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64; - let tx_hash = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone()) + let tx_hash = provider_factory .transaction_by_id(num)? .ok_or(ProviderError::ReceiptNotFound(num.into()))? 
.hash(); bench( BenchKind::RandomHash, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory, SnapshotSegment::Receipts, filters, compression, diff --git a/bin/reth/src/commands/db/snapshots/transactions.rs b/bin/reth/src/commands/db/snapshots/transactions.rs index a64b0bd1b45a..9c9bf7a44d79 100644 --- a/bin/reth/src/commands/db/snapshots/transactions.rs +++ b/bin/reth/src/commands/db/snapshots/transactions.rs @@ -3,37 +3,27 @@ use super::{ Command, Compression, PerfectHashingFunction, }; use rand::{seq::SliceRandom, Rng}; -use reth_db::{open_db_read_only, snapshot::TransactionMask}; -use reth_interfaces::db::LogLevel; +use reth_db::{snapshot::TransactionMask, DatabaseEnv}; use reth_primitives::{ snapshot::{Filters, InclusionFilter}, - ChainSpec, SnapshotSegment, TransactionSignedNoHash, + SnapshotSegment, TransactionSignedNoHash, }; use reth_provider::{ providers::SnapshotProvider, BlockNumReader, ProviderError, ProviderFactory, TransactionsProvider, TransactionsProviderExt, }; -use reth_db::mdbx::DatabaseArguments; -use std::{ - path::{Path, PathBuf}, - sync::Arc, -}; +use std::{path::PathBuf, sync::Arc}; impl Command { pub(crate) fn bench_transactions_snapshot( &self, - db_path: &Path, - log_level: Option, - chain: Arc, + provider_factory: Arc>, compression: Compression, inclusion_filter: InclusionFilter, phf: Option, ) -> eyre::Result<()> { - let db_args = DatabaseArguments::default().log_level(log_level); - - let factory = ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone()); - let provider = factory.provider()?; + let provider = provider_factory.provider()?; let tip = provider.last_block_number()?; let block_range = self.block_ranges(tip).first().expect("has been generated before").clone(); @@ -64,7 +54,7 @@ impl Command { for bench_kind in [BenchKind::Walk, BenchKind::RandomAll] { bench( bench_kind, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory.clone(), SnapshotSegment::Transactions, filters, compression, @@ -96,7 +86,7 @@ impl Command { let num = row_indexes[rng.gen_range(0..row_indexes.len())]; bench( BenchKind::RandomOne, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory.clone(), SnapshotSegment::Transactions, filters, compression, @@ -117,15 +107,14 @@ impl Command { // BENCHMARK QUERYING A RANDOM TRANSACTION BY HASH { let num = row_indexes[rng.gen_range(0..row_indexes.len())] as u64; - let transaction_hash = - ProviderFactory::new(open_db_read_only(db_path, db_args)?, chain.clone()) - .transaction_by_id(num)? - .ok_or(ProviderError::TransactionNotFound(num.into()))? - .hash(); + let transaction_hash = provider_factory + .transaction_by_id(num)? + .ok_or(ProviderError::TransactionNotFound(num.into()))? + .hash(); bench( BenchKind::RandomHash, - (open_db_read_only(db_path, db_args)?, chain.clone()), + provider_factory, SnapshotSegment::Transactions, filters, compression, diff --git a/bin/reth/src/commands/debug_cmd/build_block.rs b/bin/reth/src/commands/debug_cmd/build_block.rs index 0608c357072c..beeac1918915 100644 --- a/bin/reth/src/commands/debug_cmd/build_block.rs +++ b/bin/reth/src/commands/debug_cmd/build_block.rs @@ -113,7 +113,11 @@ impl Command { /// /// If the database is empty, returns the genesis block. 
fn lookup_best_block(&self, db: Arc) -> RethResult> { - let factory = ProviderFactory::new(db, self.chain.clone()); + let factory = ProviderFactory::new( + db, + self.chain.clone(), + self.datadir.unwrap_or_chain_default(self.chain.chain).snapshots_path(), + )?; let provider = factory.provider()?; let best_number = @@ -152,7 +156,11 @@ impl Command { // initialize the database let db = Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); - let provider_factory = ProviderFactory::new(Arc::clone(&db), Arc::clone(&self.chain)); + let provider_factory = ProviderFactory::new( + Arc::clone(&db), + Arc::clone(&self.chain), + data_dir.snapshots_path(), + )?; let consensus: Arc = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))); diff --git a/bin/reth/src/commands/debug_cmd/execution.rs b/bin/reth/src/commands/debug_cmd/execution.rs index fe99203cfdb6..2525664bffdc 100644 --- a/bin/reth/src/commands/debug_cmd/execution.rs +++ b/bin/reth/src/commands/debug_cmd/execution.rs @@ -170,7 +170,11 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, ))) - .build(ProviderFactory::new(db, self.chain.clone())) + .build(ProviderFactory::new( + db, + self.chain.clone(), + self.datadir.unwrap_or_chain_default(self.chain.chain).snapshots_path(), + )?) .start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -206,8 +210,8 @@ impl Command { fs::create_dir_all(&db_path)?; let db = Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); - let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db.clone(), self.chain.clone(), data_dir.snapshots_path())?; debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); init_genesis(provider_factory.clone(), self.chain.clone())?; @@ -229,9 +233,7 @@ impl Command { let snapshotter = Snapshotter::new( provider_factory.clone(), - provider_factory - .snapshot_provider() - .expect("snapshot provider initialized via provider factory"), + provider_factory.snapshot_provider(), PruneModes::default(), ); diff --git a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs index 357eb31b6393..e344823ac754 100644 --- a/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs +++ b/bin/reth/src/commands/debug_cmd/in_memory_merkle.rs @@ -94,7 +94,11 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, ))) - .build(ProviderFactory::new(db, self.chain.clone())) + .build(ProviderFactory::new( + db, + self.chain.clone(), + self.datadir.unwrap_or_chain_default(self.chain.chain).snapshots_path(), + )?) 
.start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -114,7 +118,7 @@ impl Command { // initialize the database let db = Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); - let factory = ProviderFactory::new(&db, self.chain.clone()); + let factory = ProviderFactory::new(&db, self.chain.clone(), data_dir.snapshots_path())?; let provider = factory.provider()?; // Look up merkle checkpoint diff --git a/bin/reth/src/commands/debug_cmd/merkle.rs b/bin/reth/src/commands/debug_cmd/merkle.rs index 2abcf8fe810c..70a5079ebb83 100644 --- a/bin/reth/src/commands/debug_cmd/merkle.rs +++ b/bin/reth/src/commands/debug_cmd/merkle.rs @@ -105,7 +105,11 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, ))) - .build(ProviderFactory::new(db, self.chain.clone())) + .build(ProviderFactory::new( + db, + self.chain.clone(), + self.datadir.unwrap_or_chain_default(self.chain.chain).snapshots_path(), + )?) .start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -125,7 +129,7 @@ impl Command { // initialize the database let db = Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); - let factory = ProviderFactory::new(&db, self.chain.clone()); + let factory = ProviderFactory::new(&db, self.chain.clone(), data_dir.snapshots_path())?; let provider_rw = factory.provider_rw()?; // Configure and build network diff --git a/bin/reth/src/commands/debug_cmd/replay_engine.rs b/bin/reth/src/commands/debug_cmd/replay_engine.rs index 4a5d9978ba89..0090e01938bb 100644 --- a/bin/reth/src/commands/debug_cmd/replay_engine.rs +++ b/bin/reth/src/commands/debug_cmd/replay_engine.rs @@ -101,7 +101,11 @@ impl Command { self.network.discovery.addr, self.network.discovery.port, ))) - .build(ProviderFactory::new(db, self.chain.clone())) + .build(ProviderFactory::new( + db, + self.chain.clone(), + self.datadir.unwrap_or_chain_default(self.chain.chain).snapshots_path(), + )?) 
.start_network() .await?; info!(target: "reth::cli", peer_id = %network.peer_id(), local_addr = %network.local_addr(), "Connected to P2P network"); @@ -121,7 +125,8 @@ impl Command { // Initialize the database let db = Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); - let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone()); + let provider_factory = + ProviderFactory::new(db.clone(), self.chain.clone(), data_dir.snapshots_path())?; let consensus: Arc = Arc::new(BeaconConsensus::new(Arc::clone(&self.chain))); diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index ebc5e92eaa2f..dc9214f5c3ac 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -93,8 +93,8 @@ impl ImportCommand { let db = Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); info!(target: "reth::cli", "Database opened"); - let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db.clone(), self.chain.clone(), data_dir.snapshots_path())?; debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); @@ -113,9 +113,7 @@ impl ImportCommand { let snapshotter = Snapshotter::new( provider_factory.clone(), - provider_factory - .snapshot_provider() - .expect("snapshot provider initialized via provider factory"), + provider_factory.snapshot_provider(), PruneModes::default(), ); diff --git a/bin/reth/src/commands/init_cmd.rs b/bin/reth/src/commands/init_cmd.rs index 65cfcd038b19..59bf467c6947 100644 --- a/bin/reth/src/commands/init_cmd.rs +++ b/bin/reth/src/commands/init_cmd.rs @@ -57,8 +57,8 @@ impl InitCommand { Arc::new(init_db(&db_path, DatabaseArguments::default().log_level(self.db.log_level))?); info!(target: "reth::cli", "Database opened"); - let provider_factory = ProviderFactory::new(db.clone(), self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db.clone(), self.chain.clone(), data_dir.snapshots_path())?; info!(target: "reth::cli", "Writing genesis block"); let hash = init_genesis(provider_factory, self.chain)?; diff --git a/bin/reth/src/commands/p2p/mod.rs b/bin/reth/src/commands/p2p/mod.rs index f307b0dcd0dd..eb04e126d8b7 100644 --- a/bin/reth/src/commands/p2p/mod.rs +++ b/bin/reth/src/commands/p2p/mod.rs @@ -131,7 +131,11 @@ impl Command { network_config_builder = self.discovery.apply_to_builder(network_config_builder); let network = network_config_builder - .build(Arc::new(ProviderFactory::new(noop_db, self.chain.clone()))) + .build(Arc::new(ProviderFactory::new( + noop_db, + self.chain.clone(), + data_dir.snapshots_path(), + )?)) .start_network() .await?; diff --git a/bin/reth/src/commands/recover/storage_tries.rs b/bin/reth/src/commands/recover/storage_tries.rs index a8926decd853..de19c30ebcbf 100644 --- a/bin/reth/src/commands/recover/storage_tries.rs +++ b/bin/reth/src/commands/recover/storage_tries.rs @@ -50,8 +50,7 @@ impl Command { fs::create_dir_all(&db_path)?; let db = Arc::new(init_db(db_path, Default::default())?); - let factory = ProviderFactory::new(&db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let factory = ProviderFactory::new(&db, self.chain.clone(), data_dir.snapshots_path())?; debug!(target: "reth::cli", chain=%self.chain.chain, genesis=?self.chain.genesis_hash(), "Initializing genesis"); 
init_genesis(factory.clone(), self.chain.clone())?; diff --git a/bin/reth/src/commands/stage/drop.rs b/bin/reth/src/commands/stage/drop.rs index 8a7f0d9a9c70..9ecbd710e382 100644 --- a/bin/reth/src/commands/stage/drop.rs +++ b/bin/reth/src/commands/stage/drop.rs @@ -60,8 +60,8 @@ impl Command { let db = open_db(db_path.as_ref(), DatabaseArguments::default().log_level(self.db.log_level))?; - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; let tool = DbTool::new(provider_factory, self.chain.clone())?; @@ -188,10 +188,7 @@ impl Command { }; if let Some(snapshot_segment) = snapshot_segment { - let snapshot_provider = tool - .provider_factory - .snapshot_provider() - .expect("snapshot provider initialized via provider factory"); + let snapshot_provider = tool.provider_factory.snapshot_provider(); let snapshots = iter_snapshots(snapshot_provider.directory())?; if let Some(segment_snapshots) = snapshots.get(&snapshot_segment) { for (block_range, _) in segment_snapshots { diff --git a/bin/reth/src/commands/stage/dump/execution.rs b/bin/reth/src/commands/stage/dump/execution.rs index d9a9fb44d835..1f0d46049808 100644 --- a/bin/reth/src/commands/stage/dump/execution.rs +++ b/bin/reth/src/commands/stage/dump/execution.rs @@ -5,28 +5,37 @@ use reth_db::{ cursor::DbCursorRO, database::Database, table::TableImporter, tables, transaction::DbTx, DatabaseEnv, }; -use reth_primitives::{stage::StageCheckpoint, ChainSpec}; -use reth_provider::ProviderFactory; +use reth_node_core::dirs::{ChainPath, DataDirPath}; +use reth_primitives::stage::StageCheckpoint; +use reth_provider::{ChainSpecProvider, ProviderFactory}; use reth_revm::EvmProcessorFactory; use reth_stages::{stages::ExecutionStage, Stage, UnwindInput}; -use std::{path::PathBuf, sync::Arc}; use tracing::info; pub(crate) async fn dump_execution_stage( db_tool: &DbTool, from: u64, to: u64, - output_db: &PathBuf, + output_datadir: ChainPath, should_run: bool, ) -> Result<()> { - let (output_db, tip_block_number) = setup(from, to, output_db, db_tool)?; + let (output_db, tip_block_number) = setup(from, to, &output_datadir.db_path(), db_tool)?; import_tables_with_range(&output_db, db_tool, from, to)?; unwind_and_copy(db_tool, from, tip_block_number, &output_db).await?; if should_run { - dry_run(db_tool.chain.clone(), output_db, to, from).await?; + dry_run( + ProviderFactory::new( + output_db, + db_tool.chain.clone(), + output_datadir.snapshots_path(), + )?, + to, + from, + ) + .await?; } Ok(()) @@ -119,8 +128,7 @@ async fn unwind_and_copy( tip_block_number: u64, output_db: &DatabaseEnv, ) -> eyre::Result<()> { - let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone()); - let provider = factory.provider_rw()?; + let provider = db_tool.provider_factory.provider_rw()?; let mut exec_stage = ExecutionStage::new_with_factory(EvmProcessorFactory::new(db_tool.chain.clone())); @@ -146,19 +154,19 @@ async fn unwind_and_copy( /// Try to re-execute the stage without committing async fn dry_run( - chain: Arc, - output_db: DB, + output_provider_factory: ProviderFactory, to: u64, from: u64, ) -> eyre::Result<()> { info!(target: "reth::cli", "Executing stage. 
[dry-run]"); - let factory = ProviderFactory::new(&output_db, chain.clone()); - let mut exec_stage = ExecutionStage::new_with_factory(EvmProcessorFactory::new(chain.clone())); + let mut exec_stage = ExecutionStage::new_with_factory(EvmProcessorFactory::new( + output_provider_factory.chain_spec().clone(), + )); let input = reth_stages::ExecInput { target: Some(to), checkpoint: Some(StageCheckpoint::new(from)) }; - exec_stage.execute(&factory.provider_rw()?, input)?; + exec_stage.execute(&output_provider_factory.provider_rw()?, input)?; info!(target: "reth::cli", "Success"); diff --git a/bin/reth/src/commands/stage/dump/hashing_account.rs b/bin/reth/src/commands/stage/dump/hashing_account.rs index 93081c36a85f..3bc3c91e58c5 100644 --- a/bin/reth/src/commands/stage/dump/hashing_account.rs +++ b/bin/reth/src/commands/stage/dump/hashing_account.rs @@ -2,20 +2,20 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv}; -use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec}; +use reth_node_core::dirs::{ChainPath, DataDirPath}; +use reth_primitives::{stage::StageCheckpoint, BlockNumber}; use reth_provider::ProviderFactory; use reth_stages::{stages::AccountHashingStage, Stage, UnwindInput}; -use std::{path::PathBuf, sync::Arc}; use tracing::info; pub(crate) async fn dump_hashing_account_stage( db_tool: &DbTool, from: BlockNumber, to: BlockNumber, - output_db: &PathBuf, + output_datadir: ChainPath, should_run: bool, ) -> Result<()> { - let (output_db, tip_block_number) = setup(from, to, output_db, db_tool)?; + let (output_db, tip_block_number) = setup(from, to, &output_datadir.db_path(), db_tool)?; // Import relevant AccountChangeSets output_db.update(|tx| { @@ -29,7 +29,16 @@ pub(crate) async fn dump_hashing_account_stage( unwind_and_copy(db_tool, from, tip_block_number, &output_db)?; if should_run { - dry_run(db_tool.chain.clone(), output_db, to, from).await?; + dry_run( + ProviderFactory::new( + output_db, + db_tool.chain.clone(), + output_datadir.snapshots_path(), + )?, + to, + from, + ) + .await?; } Ok(()) @@ -42,8 +51,7 @@ fn unwind_and_copy( tip_block_number: u64, output_db: &DatabaseEnv, ) -> eyre::Result<()> { - let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone()); - let provider = factory.provider_rw()?; + let provider = db_tool.provider_factory.provider_rw()?; let mut exec_stage = AccountHashingStage::default(); exec_stage.unwind( @@ -63,15 +71,13 @@ fn unwind_and_copy( /// Try to re-execute the stage straightaway async fn dry_run( - chain: Arc, - output_db: DB, + output_provider_factory: ProviderFactory, to: u64, from: u64, ) -> eyre::Result<()> { info!(target: "reth::cli", "Executing stage."); - let factory = ProviderFactory::new(&output_db, chain); - let provider = factory.provider_rw()?; + let provider = output_provider_factory.provider_rw()?; let mut stage = AccountHashingStage { clean_threshold: 1, // Forces hashing from scratch ..Default::default() diff --git a/bin/reth/src/commands/stage/dump/hashing_storage.rs b/bin/reth/src/commands/stage/dump/hashing_storage.rs index 0c05041db2eb..e739539caacb 100644 --- a/bin/reth/src/commands/stage/dump/hashing_storage.rs +++ b/bin/reth/src/commands/stage/dump/hashing_storage.rs @@ -2,25 +2,34 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv}; -use reth_primitives::{stage::StageCheckpoint, ChainSpec}; +use 
reth_node_core::dirs::{ChainPath, DataDirPath}; +use reth_primitives::stage::StageCheckpoint; use reth_provider::ProviderFactory; use reth_stages::{stages::StorageHashingStage, Stage, UnwindInput}; -use std::{path::PathBuf, sync::Arc}; use tracing::info; pub(crate) async fn dump_hashing_storage_stage( db_tool: &DbTool, from: u64, to: u64, - output_db: &PathBuf, + output_datadir: ChainPath, should_run: bool, ) -> Result<()> { - let (output_db, tip_block_number) = setup(from, to, output_db, db_tool)?; + let (output_db, tip_block_number) = setup(from, to, &output_datadir.db_path(), db_tool)?; unwind_and_copy(db_tool, from, tip_block_number, &output_db)?; if should_run { - dry_run(db_tool.chain.clone(), output_db, to, from).await?; + dry_run( + ProviderFactory::new( + output_db, + db_tool.chain.clone(), + output_datadir.snapshots_path(), + )?, + to, + from, + ) + .await?; } Ok(()) @@ -33,8 +42,7 @@ fn unwind_and_copy( tip_block_number: u64, output_db: &DatabaseEnv, ) -> eyre::Result<()> { - let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone()); - let provider = factory.provider_rw()?; + let provider = db_tool.provider_factory.provider_rw()?; let mut exec_stage = StorageHashingStage::default(); @@ -58,15 +66,13 @@ fn unwind_and_copy( /// Try to re-execute the stage straightaway async fn dry_run( - chain: Arc, - output_db: DB, + output_provider_factory: ProviderFactory, to: u64, from: u64, ) -> eyre::Result<()> { info!(target: "reth::cli", "Executing stage."); - let factory = ProviderFactory::new(&output_db, chain); - let provider = factory.provider_rw()?; + let provider = output_provider_factory.provider_rw()?; let mut stage = StorageHashingStage { clean_threshold: 1, // Forces hashing from scratch ..Default::default() diff --git a/bin/reth/src/commands/stage/dump/merkle.rs b/bin/reth/src/commands/stage/dump/merkle.rs index 9128d9f2fc3a..e8f2b26897d6 100644 --- a/bin/reth/src/commands/stage/dump/merkle.rs +++ b/bin/reth/src/commands/stage/dump/merkle.rs @@ -2,7 +2,8 @@ use super::setup; use crate::utils::DbTool; use eyre::Result; use reth_db::{database::Database, table::TableImporter, tables, DatabaseEnv}; -use reth_primitives::{stage::StageCheckpoint, BlockNumber, ChainSpec, PruneModes}; +use reth_node_core::dirs::{ChainPath, DataDirPath}; +use reth_primitives::{stage::StageCheckpoint, BlockNumber, PruneModes}; use reth_provider::ProviderFactory; use reth_stages::{ stages::{ @@ -11,17 +12,16 @@ use reth_stages::{ }, Stage, UnwindInput, }; -use std::{path::PathBuf, sync::Arc}; use tracing::info; pub(crate) async fn dump_merkle_stage( db_tool: &DbTool, from: BlockNumber, to: BlockNumber, - output_db: &PathBuf, + output_datadir: ChainPath, should_run: bool, ) -> Result<()> { - let (output_db, tip_block_number) = setup(from, to, output_db, db_tool)?; + let (output_db, tip_block_number) = setup(from, to, &output_datadir.db_path(), db_tool)?; output_db.update(|tx| { tx.import_table_with_range::( @@ -42,7 +42,16 @@ pub(crate) async fn dump_merkle_stage( unwind_and_copy(db_tool, (from, to), tip_block_number, &output_db).await?; if should_run { - dry_run(db_tool.chain.clone(), output_db, to, from).await?; + dry_run( + ProviderFactory::new( + output_db, + db_tool.chain.clone(), + output_datadir.snapshots_path(), + )?, + to, + from, + ) + .await?; } Ok(()) @@ -56,8 +65,7 @@ async fn unwind_and_copy( output_db: &DatabaseEnv, ) -> eyre::Result<()> { let (from, to) = range; - let factory = ProviderFactory::new(db_tool.provider_factory.db_ref(), db_tool.chain.clone()); 
- let provider = factory.provider_rw()?; + let provider = db_tool.provider_factory.provider_rw()?; let unwind = UnwindInput { unwind_to: from, @@ -119,14 +127,12 @@ async fn unwind_and_copy( /// Try to re-execute the stage straightaway async fn dry_run( - chain: Arc, - output_db: DB, + output_provider_factory: ProviderFactory, to: u64, from: u64, ) -> eyre::Result<()> { info!(target: "reth::cli", "Executing stage."); - let factory = ProviderFactory::new(&output_db, chain); - let provider = factory.provider_rw()?; + let provider = output_provider_factory.provider_rw()?; let mut stage = MerkleStage::Execution { // Forces updating the root instead of calculating from scratch diff --git a/bin/reth/src/commands/stage/dump/mod.rs b/bin/reth/src/commands/stage/dump/mod.rs index 36e608a692e6..78d1744b508d 100644 --- a/bin/reth/src/commands/stage/dump/mod.rs +++ b/bin/reth/src/commands/stage/dump/mod.rs @@ -14,6 +14,7 @@ use reth_db::{ cursor::DbCursorRO, database::Database, init_db, table::TableImporter, tables, transaction::DbTx, DatabaseEnv, }; +use reth_node_core::dirs::PlatformPath; use reth_primitives::ChainSpec; use reth_provider::ProviderFactory; use std::{path::PathBuf, sync::Arc}; @@ -80,9 +81,9 @@ pub enum Stages { /// Stage command that takes a range #[derive(Debug, Clone, Parser)] pub struct StageCommand { - /// The path to the new database folder. + /// The path to the new datadir folder. #[arg(long, value_name = "OUTPUT_PATH", verbatim_doc_comment)] - output_db: PathBuf, + output_datadir: PlatformPath, /// From which block. #[arg(long, short)] @@ -105,25 +106,53 @@ impl Command { info!(target: "reth::cli", path = ?db_path, "Opening database"); let db = Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); - let provider_factory = ProviderFactory::new(db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let provider_factory = + ProviderFactory::new(db, self.chain.clone(), data_dir.snapshots_path())?; info!(target: "reth::cli", "Database opened"); let tool = DbTool::new(provider_factory, self.chain.clone())?; match &self.command { - Stages::Execution(StageCommand { output_db, from, to, dry_run, .. }) => { - dump_execution_stage(&tool, *from, *to, output_db, *dry_run).await? + Stages::Execution(StageCommand { output_datadir, from, to, dry_run, .. }) => { + dump_execution_stage( + &tool, + *from, + *to, + output_datadir.with_chain(self.chain.chain), + *dry_run, + ) + .await? } - Stages::StorageHashing(StageCommand { output_db, from, to, dry_run, .. }) => { - dump_hashing_storage_stage(&tool, *from, *to, output_db, *dry_run).await? + Stages::StorageHashing(StageCommand { output_datadir, from, to, dry_run, .. }) => { + dump_hashing_storage_stage( + &tool, + *from, + *to, + output_datadir.with_chain(self.chain.chain), + *dry_run, + ) + .await? } - Stages::AccountHashing(StageCommand { output_db, from, to, dry_run, .. }) => { - dump_hashing_account_stage(&tool, *from, *to, output_db, *dry_run).await? + Stages::AccountHashing(StageCommand { output_datadir, from, to, dry_run, .. }) => { + dump_hashing_account_stage( + &tool, + *from, + *to, + output_datadir.with_chain(self.chain.chain), + *dry_run, + ) + .await? } - Stages::Merkle(StageCommand { output_db, from, to, dry_run, .. }) => { - dump_merkle_stage(&tool, *from, *to, output_db, *dry_run).await? + Stages::Merkle(StageCommand { output_datadir, from, to, dry_run, .. 
}) => { + dump_merkle_stage( + &tool, + *from, + *to, + output_datadir.with_chain(self.chain.chain), + *dry_run, + ) + .await? } } @@ -143,9 +172,9 @@ pub(crate) fn setup( info!(target: "reth::cli", ?output_db, "Creating separate db"); - let output_db = init_db(output_db, Default::default())?; + let output_datadir = init_db(output_db, Default::default())?; - output_db.update(|tx| { + output_datadir.update(|tx| { tx.import_table_with_range::( &db_tool.provider_factory.db_ref().tx()?, Some(from - 1), @@ -159,5 +188,5 @@ pub(crate) fn setup( .view(|tx| tx.cursor_read::()?.last())?? .expect("some"); - Ok((output_db, tip_block_number)) + Ok((output_datadir, tip_block_number)) } diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index d356546c23d9..eb7f04a9341d 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -127,8 +127,8 @@ impl Command { Arc::new(init_db(db_path, DatabaseArguments::default().log_level(self.db.log_level))?); info!(target: "reth::cli", "Database opened"); - let factory = ProviderFactory::new(Arc::clone(&db), self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let factory = + ProviderFactory::new(Arc::clone(&db), self.chain.clone(), data_dir.snapshots_path())?; let mut provider_rw = factory.provider_rw()?; if let Some(listen_addr) = self.metrics { @@ -166,8 +166,11 @@ impl Command { let default_peers_path = data_dir.known_peers_path(); - let provider_factory = - Arc::new(ProviderFactory::new(db.clone(), self.chain.clone())); + let provider_factory = Arc::new(ProviderFactory::new( + db.clone(), + self.chain.clone(), + data_dir.snapshots_path(), + )?); let network = self .network diff --git a/bin/reth/src/commands/stage/unwind.rs b/bin/reth/src/commands/stage/unwind.rs index f08be082b80c..1cc5039fba8c 100644 --- a/bin/reth/src/commands/stage/unwind.rs +++ b/bin/reth/src/commands/stage/unwind.rs @@ -68,8 +68,7 @@ impl Command { eyre::bail!("Cannot unwind genesis block") } - let factory = ProviderFactory::new(&db, self.chain.clone()) - .with_snapshots(data_dir.snapshots_path())?; + let factory = ProviderFactory::new(&db, self.chain.clone(), data_dir.snapshots_path())?; let provider = factory.provider_rw()?; let blocks_and_execution = provider diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index 99ad98b567b4..b7b806953af8 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -1887,9 +1887,7 @@ mod tests { }; use assert_matches::assert_matches; use reth_interfaces::test_utils::generators::{self, Rng}; - use reth_primitives::{ - stage::StageCheckpoint, ChainSpec, ChainSpecBuilder, B256, MAINNET, U256, - }; + use reth_primitives::{stage::StageCheckpoint, ChainSpecBuilder, MAINNET}; use reth_provider::{BlockWriter, ProviderFactory}; use reth_rpc_types::engine::{ForkchoiceState, ForkchoiceUpdated, PayloadStatus}; use reth_rpc_types_compat::engine::payload::try_block_to_payload_v1; @@ -2062,12 +2060,10 @@ mod tests { } fn insert_blocks<'a, DB: Database>( - db: DB, - chain: Arc, + provider_factory: ProviderFactory, mut blocks: impl Iterator, ) { - let factory = ProviderFactory::new(db, chain); - let provider = factory.provider_rw().unwrap(); + let provider = provider_factory.provider_rw().unwrap(); blocks .try_for_each(|b| { provider @@ -2083,8 +2079,9 @@ mod tests { mod fork_choice_updated { use super::*; - use reth_db::{tables, transaction::DbTxMut}; + use reth_db::{tables, 
test_utils::create_test_snapshots_dir, transaction::DbTxMut}; use reth_interfaces::test_utils::generators::random_block; + use reth_primitives::U256; use reth_rpc_types::engine::ForkchoiceUpdateError; #[tokio::test] @@ -2137,7 +2134,15 @@ mod tests { let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); - insert_blocks(env.db.as_ref(), chain_spec.clone(), [&genesis, &block1].into_iter()); + insert_blocks( + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), + [&genesis, &block1].into_iter(), + ); env.db .update(|tx| { tx.put::( @@ -2187,7 +2192,15 @@ mod tests { let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); - insert_blocks(env.db.as_ref(), chain_spec.clone(), [&genesis, &block1].into_iter()); + insert_blocks( + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), + [&genesis, &block1].into_iter(), + ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -2203,7 +2216,15 @@ mod tests { let invalid_rx = env.send_forkchoice_updated(next_forkchoice_state).await; // Insert next head immediately after sending forkchoice update - insert_blocks(env.db.as_ref(), chain_spec.clone(), [&next_head].into_iter()); + insert_blocks( + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), + [&next_head].into_iter(), + ); let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); assert_matches!(invalid_rx, Ok(result) => assert_eq!(result, expected_result)); @@ -2237,7 +2258,15 @@ mod tests { let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); - insert_blocks(env.db.as_ref(), chain_spec.clone(), [&genesis, &block1].into_iter()); + insert_blocks( + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), + [&genesis, &block1].into_iter(), + ); let engine = spawn_consensus_engine(consensus_engine); @@ -2285,8 +2314,12 @@ mod tests { block3.header.difficulty = U256::from(1); insert_blocks( - env.db.as_ref(), - chain_spec.clone(), + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), [&genesis, &block1, &block2, &block3].into_iter(), ); @@ -2328,7 +2361,15 @@ mod tests { let genesis = random_block(&mut rng, 0, None, None, Some(0)); let block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); - insert_blocks(env.db.as_ref(), chain_spec.clone(), [&genesis, &block1].into_iter()); + insert_blocks( + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), + [&genesis, &block1].into_iter(), + ); let _engine = spawn_consensus_engine(consensus_engine); @@ -2350,6 +2391,7 @@ mod tests { mod new_payload { use super::*; + use reth_db::test_utils::create_test_snapshots_dir; use reth_interfaces::test_utils::{generators, generators::random_block}; use reth_primitives::{ genesis::{Genesis, GenesisAllocator}, @@ -2424,8 +2466,12 @@ mod tests { let 
block1 = random_block(&mut rng, 1, Some(genesis.hash), None, Some(0)); let block2 = random_block(&mut rng, 2, Some(block1.hash), None, Some(0)); insert_blocks( - env.db.as_ref(), - chain_spec.clone(), + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), [&genesis, &block1, &block2].into_iter(), ); @@ -2490,7 +2536,15 @@ mod tests { // TODO: add transactions that transfer from the alloc accounts, generating the new // block tx and state root - insert_blocks(env.db.as_ref(), chain_spec.clone(), [&genesis, &block1].into_iter()); + insert_blocks( + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), + [&genesis, &block1].into_iter(), + ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -2528,7 +2582,15 @@ mod tests { let genesis = random_block(&mut rng, 0, None, None, Some(0)); - insert_blocks(env.db.as_ref(), chain_spec.clone(), [&genesis].into_iter()); + insert_blocks( + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), + [&genesis].into_iter(), + ); let mut engine_rx = spawn_consensus_engine(consensus_engine); @@ -2585,8 +2647,12 @@ mod tests { .build(); insert_blocks( - env.db.as_ref(), - chain_spec.clone(), + ProviderFactory::new( + env.db.as_ref(), + chain_spec.clone(), + create_test_snapshots_dir(), + ) + .expect("create provider factory with snapshots"), [&data.genesis, &block1].into_iter(), ); diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index 420ea7635f00..77b682665c72 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -476,9 +476,7 @@ where let snapshotter = Snapshotter::new( provider_factory.clone(), - provider_factory - .snapshot_provider() - .expect("snapshot provider initialized via provider factory"), + provider_factory.snapshot_provider(), PruneModes::default(), ); diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 2cfdeb42c1a6..76a8f05462fe 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -597,7 +597,7 @@ mod tests { }; use assert_matches::assert_matches; use futures_util::stream::StreamExt; - use reth_db::test_utils::create_test_rw_db; + use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir}; use reth_interfaces::test_utils::{generators, generators::random_block_range, TestConsensus}; use reth_primitives::{BlockBody, B256, MAINNET}; use reth_provider::ProviderFactory; @@ -619,7 +619,7 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()).unwrap(), ); downloader.set_download_range(0..=19).expect("failed to set download range"); @@ -658,7 +658,7 @@ mod tests { BodiesDownloaderBuilder::default().with_request_limit(request_limit).build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()).unwrap(), ); downloader.set_download_range(0..=199).expect("failed to set download range"); @@ -687,7 +687,7 
@@ mod tests { .build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()).unwrap(), ); let mut range_start = 0; @@ -717,7 +717,7 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().with_stream_batch_size(100).build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()).unwrap(), ); // Set and download the first range @@ -757,7 +757,7 @@ mod tests { .build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()).unwrap(), ); // Set and download the entire range @@ -788,7 +788,7 @@ mod tests { .build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()).unwrap(), ); // Download the requested range diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index 9a713a8539bc..3a508af16287 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -169,7 +169,7 @@ mod tests { test_utils::{generate_bodies, TestBodiesClient}, }; use assert_matches::assert_matches; - use reth_db::test_utils::create_test_rw_db; + use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir}; use reth_interfaces::{p2p::error::DownloadError, test_utils::TestConsensus}; use reth_primitives::MAINNET; use reth_provider::ProviderFactory; @@ -190,7 +190,8 @@ mod tests { let downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"), ); let mut downloader = TaskDownloader::spawn(downloader); @@ -212,7 +213,8 @@ mod tests { let downloader = BodiesDownloaderBuilder::default().build( Arc::new(TestBodiesClient::default()), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"), ); let mut downloader = TaskDownloader::spawn(downloader); diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index 2ccd6d6bb244..aaaca9027aaa 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -241,7 +241,7 @@ mod tests { }; use assert_matches::assert_matches; use futures_util::stream::StreamExt; - use reth_db::test_utils::create_test_rw_db; + use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir}; use reth_interfaces::{ p2p::{ bodies::downloader::BodyDownloader, @@ -269,7 +269,8 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"), ); downloader.set_download_range(0..=19).expect("failed to set download range"); @@ -350,7 +351,8 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - 
ProviderFactory::new(db, MAINNET.clone()), + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"), ); downloader.set_download_range(0..=19).expect("failed to set download range"); diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 60928b0fbaf0..00204c39d1df 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -1016,9 +1016,11 @@ impl NodeBuilderWit let prometheus_handle = self.config.install_prometheus_recorder()?; info!(target: "reth::cli", "Database opened"); - let provider_factory = - ProviderFactory::new(Arc::clone(&self.db), Arc::clone(&self.config.chain)) - .with_snapshots(self.data_dir.snapshots_path())?; + let provider_factory = ProviderFactory::new( + Arc::clone(&self.db), + Arc::clone(&self.config.chain), + self.data_dir.snapshots_path(), + )?; self.config.start_metrics_endpoint(prometheus_handle, Arc::clone(&self.db)).await?; @@ -1142,9 +1144,7 @@ impl NodeBuilderWit let snapshotter = Snapshotter::new( provider_factory.clone(), - provider_factory - .snapshot_provider() - .expect("snapshot provider initialized via provider factory"), + provider_factory.snapshot_provider(), prune_config.clone().unwrap_or_default().segments, ); hooks.add(SnapshotHook::new(snapshotter.clone(), Box::new(executor.clone()))); diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index d2dfac3733a0..ab9fd5cd53a3 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -196,28 +196,25 @@ impl Pruner { fn snapshot_segments(&self) -> Vec>> { let mut segments = Vec::>>::new(); - if let Some(snapshot_provider) = self.provider_factory.snapshot_provider() { - if let Some(to_block) = - snapshot_provider.get_highest_snapshot_block(SnapshotSegment::Transactions) - { - segments.push(Box::new(segments::Transactions::new(PruneMode::before_inclusive( - to_block, - )))) - } + let snapshot_provider = self.provider_factory.snapshot_provider(); - if let Some(to_block) = - snapshot_provider.get_highest_snapshot_block(SnapshotSegment::Headers) - { - segments - .push(Box::new(segments::Headers::new(PruneMode::before_inclusive(to_block)))) - } + if let Some(to_block) = + snapshot_provider.get_highest_snapshot_block(SnapshotSegment::Transactions) + { + segments + .push(Box::new(segments::Transactions::new(PruneMode::before_inclusive(to_block)))) + } - if let Some(to_block) = - snapshot_provider.get_highest_snapshot_block(SnapshotSegment::Receipts) - { - segments - .push(Box::new(segments::Receipts::new(PruneMode::before_inclusive(to_block)))) - } + if let Some(to_block) = + snapshot_provider.get_highest_snapshot_block(SnapshotSegment::Headers) + { + segments.push(Box::new(segments::Headers::new(PruneMode::before_inclusive(to_block)))) + } + + if let Some(to_block) = + snapshot_provider.get_highest_snapshot_block(SnapshotSegment::Receipts) + { + segments.push(Box::new(segments::Receipts::new(PruneMode::before_inclusive(to_block)))) } segments @@ -249,14 +246,16 @@ impl Pruner { #[cfg(test)] mod tests { use crate::Pruner; - use reth_db::test_utils::create_test_rw_db; + use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir}; use reth_primitives::MAINNET; use reth_provider::ProviderFactory; #[test] fn is_pruning_needed() { let db = create_test_rw_db(); - let provider_factory = ProviderFactory::new(db, MAINNET.clone()); + let provider_factory = + ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) + 
.expect("create provide factory with snapshots"); let mut pruner = Pruner::new(provider_factory, vec![], 5, 0, 5); // No last pruned block number was set before diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 53ffd2862da5..2b897dd27a43 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -204,12 +204,8 @@ mod tests { } db.insert_receipts(receipts).expect("insert receipts"); - let snapshots_dir = tempfile::TempDir::new().unwrap(); - let provider_factory = db - .factory - .with_snapshots(snapshots_dir.path().to_path_buf()) - .expect("factory with snapshots"); - let snapshot_provider = provider_factory.snapshot_provider().unwrap(); + let provider_factory = db.factory; + let snapshot_provider = provider_factory.snapshot_provider(); let snapshotter = Snapshotter::new(provider_factory, snapshot_provider.clone(), PruneModes::default()); diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index f784ac830322..18845046056f 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -124,7 +124,7 @@ impl Stage for BodyStage { // Get id for the next tx_num of zero if there are no transactions. let mut next_tx_num = tx_block_cursor.last()?.map(|(id, _)| id + 1).unwrap_or_default(); - let snapshot_provider = provider.snapshot_provider().expect("should exist"); + let snapshot_provider = provider.snapshot_provider(); let mut snapshotter = snapshot_provider.get_writer(from_block, SnapshotSegment::Transactions)?; @@ -190,7 +190,7 @@ impl Stage for BodyStage { segment: SnapshotSegment::Transactions, database: block_number, static_file: appended_block_number, - }); + }) } } @@ -213,7 +213,7 @@ impl Stage for BodyStage { segment: SnapshotSegment::Transactions, database: next_tx_num, static_file: appended_tx_number, - }); + }) } // Increment transaction id for each transaction. 
@@ -265,7 +265,7 @@ impl Stage for BodyStage { ) -> Result { self.buffer.take(); - let snapshot_provider = provider.snapshot_provider().expect("should exist"); + let snapshot_provider = provider.snapshot_provider(); let tx = provider.tx_ref(); // Cursors to unwind bodies, ommers let mut body_cursor = tx.cursor_write::()?; @@ -537,7 +537,7 @@ mod tests { .expect("Written block data invalid"); // Delete a transaction - let snapshot_provider = runner.db().factory.snapshot_provider().expect("should exist"); + let snapshot_provider = runner.db().factory.snapshot_provider(); { let mut snapshotter = snapshot_provider.latest_writer(SnapshotSegment::Transactions).unwrap(); @@ -669,7 +669,7 @@ mod tests { let start = input.checkpoint().block_number; let end = input.target(); - let snapshot_provider = self.db.factory.snapshot_provider().expect("should exist"); + let snapshot_provider = self.db.factory.snapshot_provider(); let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, start..=end, GENESIS_HASH, 0..2); @@ -772,11 +772,7 @@ mod tests { prev_progress: BlockNumber, highest_block: BlockNumber, ) -> Result<(), TestRunnerError> { - let snapshot_provider = self - .db - .factory - .snapshot_provider() - .expect("snapshot provider should be initalized."); + let snapshot_provider = self.db.factory.snapshot_provider(); self.db.query(|tx| { // Acquire cursors on body related tables diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 383782218258..dc87988a2bb3 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -545,7 +545,7 @@ where .unwrap_or(0); // Get next expected receipt number in static files - let snapshot_provider = provider.snapshot_provider().expect("should exist"); + let snapshot_provider = provider.snapshot_provider(); let mut snapshotter = snapshot_provider.get_writer(start_block, SnapshotSegment::Receipts)?; let next_snapshot_receipt_num = snapshotter .get_highest_snapshot_tx(SnapshotSegment::Receipts) @@ -618,7 +618,9 @@ mod tests { #[test] fn execution_checkpoint_matches() { let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()); + let factory = + ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"); let tx = factory.provider_rw().unwrap(); let previous_stage_checkpoint = ExecutionCheckpoint { @@ -643,7 +645,9 @@ mod tests { #[test] fn execution_checkpoint_precedes() { let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()); + let factory = + ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"); let provider = factory.provider_rw().unwrap(); let mut genesis_rlp = 
hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -687,7 +691,9 @@ mod tests { #[test] fn execution_checkpoint_recalculate_full_previous_some() { let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()); + let factory = + ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"); let provider = factory.provider_rw().unwrap(); let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -723,7 +729,9 @@ mod tests { #[test] fn execution_checkpoint_recalculate_full_previous_none() { let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()); + let factory = + ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"); let provider = factory.provider_rw().unwrap(); let mut genesis_rlp = 
hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -753,9 +761,9 @@ mod tests { // TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332 // is merged as it has similar framework let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()) - .with_snapshots(create_test_snapshots_dir()) - .unwrap(); + let factory = + ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"); let provider = factory.provider_rw().unwrap(); let input = ExecInput { target: Some(1), checkpoint: None }; let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -790,7 +798,26 @@ mod tests { db_tx.put::(code_hash, Bytecode::new_raw(code.to_vec().into())).unwrap(); provider.commit().unwrap(); - // execute + let provider = factory.provider_rw().unwrap(); + let mut execution_stage: ExecutionStage = stage(); + let output = execution_stage.execute(&provider, input).unwrap(); + provider.commit().unwrap(); + assert_matches!(output, ExecOutput { + checkpoint: StageCheckpoint { + block_number: 1, + stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { + block_range: CheckpointBlockRange { + from: 1, + to: 1, + }, + progress: EntitiesCheckpoint { + processed, + total + } + })) + }, + done: true + } if processed == total && total == block.gas_used); // If there is a pruning configuration, then it's forced to use the database. // This way we test both cases. 
@@ -893,9 +920,9 @@ mod tests { // is merged as it has similar framework let state_db = create_test_rw_db(); - let factory = ProviderFactory::new(state_db.as_ref(), MAINNET.clone()) - .with_snapshots(create_test_snapshots_dir()) - .unwrap(); + let factory = + ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) + .expect("create provider factory with snapshots"); let provider = factory.provider_rw().unwrap(); let input = ExecInput { target: Some(1), checkpoint: None }; let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 1090aca9a4b3..de47e19fc7bb 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -29,9 +29,12 @@ impl Default for TestStageDB { /// Create a new instance of [TestStageDB] fn default() -> Self { Self { - factory: ProviderFactory::new(create_test_rw_db(), MAINNET.clone()) - .with_snapshots(create_test_snapshots_dir()) - .unwrap(), + factory: ProviderFactory::new( + create_test_rw_db(), + MAINNET.clone(), + create_test_snapshots_dir(), + ) + .unwrap(), } } } @@ -39,9 +42,12 @@ impl Default for TestStageDB { impl TestStageDB { pub fn new(path: &Path) -> Self { Self { - factory: ProviderFactory::new(create_test_rw_db_with_path(path), MAINNET.clone()) - .with_snapshots(create_test_snapshots_dir()) - .unwrap(), + factory: ProviderFactory::new( + create_test_rw_db_with_path(path), + MAINNET.clone(), + create_test_snapshots_dir(), + ) + .unwrap(), } } diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index c0966db2bbfd..13b8dfe8c226 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -42,13 +42,21 @@ pub struct ProviderFactory { /// Chain spec chain_spec: Arc, /// Snapshot Provider - snapshot_provider: Option>, + snapshot_provider: Arc, } impl ProviderFactory { /// Create new database provider factory. - pub fn new(db: DB, chain_spec: Arc) -> Self { - Self { db, chain_spec, snapshot_provider: None } + pub fn new( + db: DB, + chain_spec: Arc, + snapshots_path: PathBuf, + ) -> RethResult> { + Ok(Self { + db, + chain_spec, + snapshot_provider: Arc::new(SnapshotProvider::new(snapshots_path)?), + }) } /// Create new database provider by passing a path. 
[`ProviderFactory`] will own the database @@ -57,27 +65,22 @@ impl ProviderFactory { path: P, chain_spec: Arc, args: DatabaseArguments, + snapshots_path: PathBuf, ) -> RethResult> { Ok(ProviderFactory:: { db: init_db(path, args).map_err(|e| RethError::Custom(e.to_string()))?, chain_spec, - snapshot_provider: None, + snapshot_provider: Arc::new(SnapshotProvider::new(snapshots_path)?), }) } - /// Database provider that comes with a shared snapshot provider. - pub fn with_snapshots(mut self, snapshots_path: PathBuf) -> ProviderResult { - self.snapshot_provider = Some(Arc::new(SnapshotProvider::new(snapshots_path)?)); - Ok(self) - } - /// Returns reference to the underlying database. pub fn db_ref(&self) -> &DB { &self.db } /// Returns snapshot provider - pub fn snapshot_provider(&self) -> Option> { + pub fn snapshot_provider(&self) -> Arc { self.snapshot_provider.clone() } } @@ -88,11 +91,11 @@ impl ProviderFactory { /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. #[track_caller] pub fn provider(&self) -> ProviderResult> { - let mut provider = DatabaseProvider::new(self.db.tx()?, self.chain_spec.clone()); - - if let Some(snapshot_provider) = &self.snapshot_provider { - provider = provider.with_snapshot_provider(snapshot_provider.clone()); - } + let provider = DatabaseProvider::new( + self.db.tx()?, + self.chain_spec.clone(), + self.snapshot_provider.clone(), + ); Ok(provider) } @@ -103,11 +106,11 @@ impl ProviderFactory { /// open. #[track_caller] pub fn provider_rw(&self) -> ProviderResult> { - let mut provider = DatabaseProvider::new_rw(self.db.tx_mut()?, self.chain_spec.clone()); - - if let Some(snapshot_provider) = &self.snapshot_provider { - provider = provider.with_snapshot_provider(snapshot_provider.clone()); - } + let provider = DatabaseProvider::new_rw( + self.db.tx_mut()?, + self.chain_spec.clone(), + self.snapshot_provider.clone(), + ); Ok(DatabaseProviderRW(provider)) } @@ -501,7 +504,12 @@ mod tests { use alloy_rlp::Decodable; use assert_matches::assert_matches; use rand::Rng; - use reth_db::{tables, test_utils::ERROR_TEMPDIR, transaction::DbTxMut, DatabaseEnv}; + use reth_db::{ + tables, + test_utils::{create_test_snapshots_dir, ERROR_TEMPDIR}, + transaction::DbTxMut, + DatabaseEnv, + }; use reth_interfaces::{ provider::ProviderError, test_utils::{ @@ -549,6 +557,7 @@ mod tests { tempfile::TempDir::new().expect(ERROR_TEMPDIR).into_path(), Arc::new(chain_spec), Default::default(), + create_test_snapshots_dir(), ) .unwrap(); diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 979e046eb41e..9a4642d24dff 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -83,9 +83,7 @@ impl DerefMut for DatabaseProviderRW { impl DatabaseProviderRW { /// Commit database transaction and snapshot if it exists. 
pub fn commit(self) -> ProviderResult { - if let Some(snapshot_provider) = &self.0.snapshot_provider { - snapshot_provider.commit()?; - } + self.0.snapshot_provider.commit()?; self.0.commit() } @@ -104,20 +102,24 @@ pub struct DatabaseProvider { /// Chain spec chain_spec: Arc, /// Snapshot provider - snapshot_provider: Option>, + snapshot_provider: Arc, } impl DatabaseProvider { - /// Returns a snapshot provider reference - pub fn snapshot_provider(&self) -> Option<&Arc> { - self.snapshot_provider.as_ref() + /// Returns a snapshot provider + pub fn snapshot_provider(&self) -> &Arc { + &self.snapshot_provider } } impl DatabaseProvider { /// Creates a provider with an inner read-write transaction. - pub fn new_rw(tx: TX, chain_spec: Arc) -> Self { - Self { tx, chain_spec, snapshot_provider: None } + pub fn new_rw( + tx: TX, + chain_spec: Arc, + snapshot_provider: Arc, + ) -> Self { + Self { tx, chain_spec, snapshot_provider } } } @@ -219,14 +221,12 @@ where impl DatabaseProvider { /// Creates a provider with an inner read-only transaction. - pub fn new(tx: TX, chain_spec: Arc) -> Self { - Self { tx, chain_spec, snapshot_provider: None } - } - - /// Creates a new [`Self`] with access to a [`SnapshotProvider`]. - pub fn with_snapshot_provider(mut self, snapshot_provider: Arc) -> Self { - self.snapshot_provider = Some(snapshot_provider); - self + pub fn new( + tx: TX, + chain_spec: Arc, + snapshot_provider: Arc, + ) -> Self { + Self { tx, chain_spec, snapshot_provider } } /// Consume `DbTx` or `DbTxMut`. @@ -291,23 +291,20 @@ impl DatabaseProvider { { let mut data = Vec::new(); - if let Some(snapshot_provider) = &self.snapshot_provider { - // If there is, check the maximum block or transaction number of the segment. - if let Some(snapshot_upper_bound) = match segment { - SnapshotSegment::Headers => snapshot_provider.get_highest_snapshot_block(segment), - SnapshotSegment::Transactions | SnapshotSegment::Receipts => { - snapshot_provider.get_highest_snapshot_tx(segment) - } - } { - if block_or_tx_range.start <= snapshot_upper_bound { - let end = block_or_tx_range.end.min(snapshot_upper_bound + 1); - data.extend(fetch_from_snapshot( - snapshot_provider, - block_or_tx_range.start..end, - &mut predicate, - )?); - block_or_tx_range.start = end; - } + if let Some(snapshot_upper_bound) = match segment { + SnapshotSegment::Headers => self.snapshot_provider.get_highest_snapshot_block(segment), + SnapshotSegment::Transactions | SnapshotSegment::Receipts => { + self.snapshot_provider.get_highest_snapshot_tx(segment) + } + } { + if block_or_tx_range.start <= snapshot_upper_bound { + let end = block_or_tx_range.end.min(snapshot_upper_bound + 1); + data.extend(fetch_from_snapshot( + &self.snapshot_provider, + block_or_tx_range.start..end, + &mut predicate, + )?); + block_or_tx_range.start = end; } } @@ -338,21 +335,18 @@ impl DatabaseProvider { FS: Fn(&SnapshotProvider) -> ProviderResult>, FD: Fn() -> ProviderResult>, { - if let Some(provider) = &self.snapshot_provider { - // If there is, check the maximum block or transaction number of the segment. 
- let snapshot_upper_bound = match segment { - SnapshotSegment::Headers => provider.get_highest_snapshot_block(segment), - SnapshotSegment::Transactions | SnapshotSegment::Receipts => { - provider.get_highest_snapshot_tx(segment) - } - }; - - if snapshot_upper_bound - .map_or(false, |snapshot_upper_bound| snapshot_upper_bound >= number) - { - return fetch_from_snapshot(provider) + let snapshot_upper_bound = match segment { + SnapshotSegment::Headers => self.snapshot_provider.get_highest_snapshot_block(segment), + SnapshotSegment::Transactions | SnapshotSegment::Receipts => { + self.snapshot_provider.get_highest_snapshot_tx(segment) } + }; + + if snapshot_upper_bound.map_or(false, |snapshot_upper_bound| snapshot_upper_bound >= number) + { + return fetch_from_snapshot(&self.snapshot_provider) } + fetch_from_database() } @@ -2564,12 +2558,11 @@ impl PruneCheckpointWriter for DatabaseProvider { impl StatsReader for DatabaseProvider { fn count_entries(&self) -> ProviderResult { let db_entries = self.tx.entries::()?; - let snapshot_entries = - match self.snapshot_provider.as_ref().map(|provider| provider.count_entries::()) { - Some(Ok(entries)) => entries, - Some(Err(ProviderError::UnsupportedProvider)) | None => 0, - Some(Err(err)) => return Err(err), - }; + let snapshot_entries = match self.snapshot_provider.count_entries::() { + Ok(entries) => entries, + Err(ProviderError::UnsupportedProvider) => 0, + Err(err) => return Err(err), + }; Ok(db_entries + snapshot_entries) } diff --git a/crates/storage/provider/src/test_utils/mod.rs b/crates/storage/provider/src/test_utils/mod.rs index cbb36a799bf3..f19db949d65f 100644 --- a/crates/storage/provider/src/test_utils/mod.rs +++ b/crates/storage/provider/src/test_utils/mod.rs @@ -27,7 +27,6 @@ pub fn create_test_provider_factory_with_chain_spec( chain_spec: Arc, ) -> ProviderFactory>> { let db = create_test_rw_db(); - ProviderFactory::new(db, chain_spec) - .with_snapshots(create_test_snapshots_dir()) - .expect("with snapshots") + ProviderFactory::new(db, chain_spec, create_test_snapshots_dir()) + .expect("create provider factory with snapshots") } diff --git a/examples/db-access.rs b/examples/db-access.rs index b1985239375f..204ea4fd40f0 100644 --- a/examples/db-access.rs +++ b/examples/db-access.rs @@ -18,12 +18,14 @@ fn main() -> eyre::Result<()> { // Opens a RO handle to the database file. // TODO: Should be able to do `ProviderFactory::new_with_db_path_ro(...)` instead of // doing in 2 steps. - let db = open_db_read_only(Path::new(&std::env::var("RETH_DB_PATH")?), Default::default())?; + let db_path = std::env::var("RETH_DB_PATH")?; + let db_path = Path::new(&db_path); + let db = open_db_read_only(db_path.join("db").as_path(), Default::default())?; // Instantiate a provider factory for Ethereum mainnet using the provided DB. // TODO: Should the DB version include the spec so that you do not need to specify it here? let spec = ChainSpecBuilder::mainnet().build(); - let factory = ProviderFactory::new(db, spec.into()); + let factory = ProviderFactory::new(db, spec.into(), db_path.join("snapshots"))?; // This call opens a RO transaction on the database. To write to the DB you'd need to call // the `provider_rw` function and look for the `Writer` variants of the traits. diff --git a/examples/rpc-db/src/main.rs b/examples/rpc-db/src/main.rs index 6048a39bf4e2..40fbe18437f8 100644 --- a/examples/rpc-db/src/main.rs +++ b/examples/rpc-db/src/main.rs @@ -35,12 +35,11 @@ pub mod myrpc_ext; #[tokio::main] async fn main() -> eyre::Result<()> { // 1. 
Setup the DB - let db = Arc::new(open_db_read_only( - Path::new(&std::env::var("RETH_DB_PATH")?), - Default::default(), - )?); + let db_path = std::env::var("RETH_DB_PATH")?; + let db_path = Path::new(&db_path); + let db = Arc::new(open_db_read_only(db_path.join("db").as_path(), Default::default())?); let spec = Arc::new(ChainSpecBuilder::mainnet().build()); - let factory = ProviderFactory::new(db.clone(), spec.clone()); + let factory = ProviderFactory::new(db.clone(), spec.clone(), db_path.join("snapshots"))?; // 2. Setup the blockchain provider using only the database provider and a noop for the tree to // satisfy trait bounds. Tree is not used in this example since we are only operating on the diff --git a/testing/ef-tests/src/cases/blockchain_test.rs b/testing/ef-tests/src/cases/blockchain_test.rs index 0221e17f1212..c73ccdef46f8 100644 --- a/testing/ef-tests/src/cases/blockchain_test.rs +++ b/testing/ef-tests/src/cases/blockchain_test.rs @@ -77,11 +77,13 @@ impl Case for BlockchainTestCase { }) { // Create a new test database and initialize a provider for the test case. let db = create_test_rw_db(); - let provider = ProviderFactory::new(db.as_ref(), Arc::new(case.network.clone().into())) - .with_snapshots(create_test_snapshots_dir()) - .map_err(|err| Error::RethError(err.into()))? - .provider_rw() - .unwrap(); + let provider = ProviderFactory::new( + db.as_ref(), + Arc::new(case.network.clone().into()), + create_test_snapshots_dir(), + )? + .provider_rw() + .unwrap(); // Insert initial test state into the provider. provider From cf69ea966fe579ba67c56462effd3cb733403016 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 2 Feb 2024 13:48:57 +0000 Subject: [PATCH 2/4] fix doctest --- crates/stages/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index 987d74bd9442..0c43a06dd393 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -43,9 +43,7 @@ //! # let executor_factory = EvmProcessorFactory::new(chain_spec.clone()); //! # let snapshotter = Snapshotter::new( //! # provider_factory.clone(), -//! # provider_factory -//! # .snapshot_provider() -//! # .expect("snapshot provider initialized via provider factory"), +//! # provider_factory.snapshot_provider(), //! # PruneModes::default() //! # ); //! 
// Create a pipeline that can fully sync From c85dcabedc2bc310a16cbe2399b54575a62b023d Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 2 Feb 2024 15:13:56 +0000 Subject: [PATCH 3/4] fix execution test --- crates/stages/src/stages/execution.rs | 31 ++++++--------------------- 1 file changed, 6 insertions(+), 25 deletions(-) diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index dc87988a2bb3..37907e9a12c5 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -594,7 +594,10 @@ mod tests { Bytecode, ChainSpecBuilder, PruneMode, PruneModes, ReceiptsLogPruneConfig, SealedBlock, StorageEntry, B256, MAINNET, U256, }; - use reth_provider::{AccountReader, BlockWriter, ProviderFactory, ReceiptProvider}; + use reth_provider::{ + test_utils::create_test_provider_factory, AccountReader, BlockWriter, ProviderFactory, + ReceiptProvider, + }; use reth_revm::EvmProcessorFactory; use std::{collections::BTreeMap, sync::Arc}; @@ -760,10 +763,7 @@ mod tests { async fn sanity_execution_of_block() { // TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332 // is merged as it has similar framework - let state_db = create_test_rw_db(); - let factory = - ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"); + let factory = create_test_provider_factory(); let provider = factory.provider_rw().unwrap(); let input = ExecInput { target: Some(1), checkpoint: None }; let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -798,26 +798,7 @@ mod tests { db_tx.put::(code_hash, Bytecode::new_raw(code.to_vec().into())).unwrap(); provider.commit().unwrap(); - let provider = factory.provider_rw().unwrap(); - let mut execution_stage: ExecutionStage = stage(); - let output = execution_stage.execute(&provider, input).unwrap(); - provider.commit().unwrap(); - assert_matches!(output, ExecOutput { - checkpoint: StageCheckpoint { - block_number: 1, - stage_checkpoint: Some(StageUnitCheckpoint::Execution(ExecutionCheckpoint { - block_range: CheckpointBlockRange { - from: 1, - to: 1, - }, - progress: EntitiesCheckpoint { - processed, - total - } - })) - }, - done: true - } if processed == total && total == block.gas_used); + // execute // If there is a pruning configuration, then it's forced to use the database. // This way we test both cases. 
From 138c398bc92187e2c78c7c9b876b7d7be3d7f956 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 2 Feb 2024 15:34:06 +0000 Subject: [PATCH 4/4] use create_test_provider_factory everywhere --- crates/net/downloaders/src/bodies/task.rs | 15 +++----- crates/net/downloaders/src/file_client.rs | 19 +++++----- crates/stages/src/stages/execution.rs | 35 +++++-------------- .../provider/src/providers/database/mod.rs | 12 +++---- 4 files changed, 25 insertions(+), 56 deletions(-) diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index 3a508af16287..7fe63ca38347 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -169,20 +169,18 @@ mod tests { test_utils::{generate_bodies, TestBodiesClient}, }; use assert_matches::assert_matches; - use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir}; use reth_interfaces::{p2p::error::DownloadError, test_utils::TestConsensus}; - use reth_primitives::MAINNET; - use reth_provider::ProviderFactory; + use reth_provider::test_utils::create_test_provider_factory; use std::sync::Arc; #[tokio::test(flavor = "multi_thread")] async fn download_one_by_one_on_task() { reth_tracing::init_test_tracing(); - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=19); - insert_headers(db.db(), &headers); + insert_headers(factory.db_ref().db(), &headers); let client = Arc::new( TestBodiesClient::default().with_bodies(bodies.clone()).with_should_delay(true), @@ -190,8 +188,7 @@ mod tests { let downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"), + factory, ); let mut downloader = TaskDownloader::spawn(downloader); @@ -209,12 +206,10 @@ mod tests { async fn set_download_range_error_returned() { reth_tracing::init_test_tracing(); - let db = create_test_rw_db(); let downloader = BodiesDownloaderBuilder::default().build( Arc::new(TestBodiesClient::default()), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"), + create_test_provider_factory(), ); let mut downloader = TaskDownloader::spawn(downloader); diff --git a/crates/net/downloaders/src/file_client.rs b/crates/net/downloaders/src/file_client.rs index aaaca9027aaa..260d107799db 100644 --- a/crates/net/downloaders/src/file_client.rs +++ b/crates/net/downloaders/src/file_client.rs @@ -241,7 +241,6 @@ mod tests { }; use assert_matches::assert_matches; use futures_util::stream::StreamExt; - use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir}; use reth_interfaces::{ p2p::{ bodies::downloader::BodyDownloader, @@ -249,17 +248,17 @@ mod tests { }, test_utils::TestConsensus, }; - use reth_primitives::{SealedHeader, MAINNET}; - use reth_provider::ProviderFactory; + use reth_primitives::SealedHeader; + use reth_provider::test_utils::create_test_provider_factory; use std::sync::Arc; #[tokio::test] async fn streams_bodies_from_buffer() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (headers, mut bodies) = generate_bodies(0..=19); - insert_headers(db.db(), &headers); + insert_headers(factory.db_ref().db(), &headers); // create an empty file let file = 
tempfile::tempfile().unwrap(); @@ -269,8 +268,7 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"), + factory, ); downloader.set_download_range(0..=19).expect("failed to set download range"); @@ -339,20 +337,19 @@ mod tests { #[tokio::test] async fn test_download_bodies_from_file() { // Generate some random blocks - let db = create_test_rw_db(); + let factory = create_test_provider_factory(); let (file, headers, mut bodies) = generate_bodies_file(0..=19).await; // now try to read them back let client = Arc::new(FileClient::from_file(file).await.unwrap()); // insert headers in db for the bodies downloader - insert_headers(db.db(), &headers); + insert_headers(factory.db_ref().db(), &headers); let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"), + factory, ); downloader.set_download_range(0..=19).expect("failed to set download range"); diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 37907e9a12c5..baf83223cee6 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -584,19 +584,15 @@ mod tests { use crate::test_utils::TestStageDB; use alloy_rlp::Decodable; use assert_matches::assert_matches; - use reth_db::{ - models::AccountBeforeTx, - test_utils::{create_test_rw_db, create_test_snapshots_dir}, - }; + use reth_db::models::AccountBeforeTx; use reth_interfaces::executor::BlockValidationError; use reth_primitives::{ address, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Address, Bytecode, ChainSpecBuilder, PruneMode, PruneModes, ReceiptsLogPruneConfig, SealedBlock, - StorageEntry, B256, MAINNET, U256, + StorageEntry, B256, U256, }; use reth_provider::{ - test_utils::create_test_provider_factory, AccountReader, BlockWriter, ProviderFactory, - ReceiptProvider, + test_utils::create_test_provider_factory, AccountReader, BlockWriter, ReceiptProvider, }; use reth_revm::EvmProcessorFactory; use std::{collections::BTreeMap, sync::Arc}; @@ -620,10 +616,7 @@ mod tests { #[test] fn execution_checkpoint_matches() { - let state_db = create_test_rw_db(); - let factory = - ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"); + let factory = create_test_provider_factory(); let tx = factory.provider_rw().unwrap(); let previous_stage_checkpoint = ExecutionCheckpoint { @@ -647,10 +640,7 @@ mod tests { #[test] fn execution_checkpoint_precedes() { - let state_db = create_test_rw_db(); - let factory = - ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"); + let factory = create_test_provider_factory(); let provider = factory.provider_rw().unwrap(); let mut genesis_rlp = 
hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -693,10 +683,7 @@ mod tests { #[test] fn execution_checkpoint_recalculate_full_previous_some() { - let state_db = create_test_rw_db(); - let factory = - ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"); + let factory = create_test_provider_factory(); let provider = factory.provider_rw().unwrap(); let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -731,10 +718,7 @@ mod tests { #[test] fn execution_checkpoint_recalculate_full_previous_none() { - let state_db = create_test_rw_db(); - let factory = - ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"); + let factory = create_test_provider_factory(); let provider = factory.provider_rw().unwrap(); let mut genesis_rlp = 
hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); @@ -900,10 +884,7 @@ mod tests { // TODO cleanup the setup after https://github.com/paradigmxyz/reth/issues/332 // is merged as it has similar framework - let state_db = create_test_rw_db(); - let factory = - ProviderFactory::new(state_db.as_ref(), MAINNET.clone(), create_test_snapshots_dir()) - .expect("create provider factory with snapshots"); + let factory = create_test_provider_factory(); let provider = factory.provider_rw().unwrap(); let input = ExecInput { target: Some(1), checkpoint: None }; let mut genesis_rlp = hex!("f901faf901f5a00000000000000000000000000000000000000000000000000000000000000000a01dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347942adc25665018aa1fe0e6bc666dac8fc2697ff9baa045571b40ae66ca7480791bbb2887286e4e4c4b1b298b191c889d6959023a32eda056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421a056e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421b901000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000083020000808502540be400808000a00000000000000000000000000000000000000000000000000000000000000000880000000000000000c0c0").as_slice(); diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index 13b8dfe8c226..23b3a2d9b06e 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -91,13 +91,11 @@ impl ProviderFactory { /// [`BlockHashReader`]. This may fail if the inner read database transaction fails to open. #[track_caller] pub fn provider(&self) -> ProviderResult> { - let provider = DatabaseProvider::new( + Ok(DatabaseProvider::new( self.db.tx()?, self.chain_spec.clone(), self.snapshot_provider.clone(), - ); - - Ok(provider) + )) } /// Returns a provider with a created `DbTxMut` inside, which allows fetching and updating @@ -106,13 +104,11 @@ impl ProviderFactory { /// open. 
#[track_caller] pub fn provider_rw(&self) -> ProviderResult> { - let provider = DatabaseProvider::new_rw( + Ok(DatabaseProviderRW(DatabaseProvider::new_rw( self.db.tx_mut()?, self.chain_spec.clone(), self.snapshot_provider.clone(), - ); - - Ok(DatabaseProviderRW(provider)) + ))) } /// Storage provider for latest block
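
For illustration, a minimal sketch (not taken from the patch) of the call pattern these changes converge on, using the `reth_db::test_utils` helpers and the `MAINNET` spec that already appear in the hunks above:

    use reth_db::test_utils::{create_test_rw_db, create_test_snapshots_dir};
    use reth_primitives::MAINNET;
    use reth_provider::ProviderFactory;

    fn demo() {
        let db = create_test_rw_db();
        // The snapshots path is now a required constructor argument, and construction is
        // fallible because the snapshot provider is created eagerly.
        let factory = ProviderFactory::new(db, MAINNET.clone(), create_test_snapshots_dir())
            .expect("create provider factory with snapshots");
        // `snapshot_provider()` returns the provider directly (an `Arc`), so callers no
        // longer unwrap an `Option` or call `with_snapshots` separately.
        let _snapshot_provider = factory.snapshot_provider();
    }

In tests, the last patch routes most such call sites through `reth_provider::test_utils::create_test_provider_factory`, which performs this construction with the mainnet spec.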