From 3063a3454ac62d0358e9db8ce015d0bf2b5a42b0 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 17 Nov 2023 12:35:35 +0200 Subject: [PATCH 1/3] chore(downloader): replace database with header provider --- Cargo.lock | 1 + bin/reth/src/chain/import.rs | 6 +- bin/reth/src/debug_cmd/execution.rs | 6 +- bin/reth/src/node/mod.rs | 6 +- bin/reth/src/stage/run.rs | 8 +- .../consensus/beacon/src/engine/test_utils.rs | 7 +- crates/net/downloaders/Cargo.toml | 5 +- crates/net/downloaders/src/bodies/bodies.rs | 127 +++++++++--------- crates/net/downloaders/src/bodies/task.rs | 15 ++- .../downloaders/src/test_utils/file_client.rs | 7 +- 10 files changed, 103 insertions(+), 85 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 124e6245b2d2..463dccf88c0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5890,6 +5890,7 @@ dependencies = [ "reth-interfaces", "reth-metrics", "reth-primitives", + "reth-provider", "reth-tasks", "reth-tracing", "tempfile", diff --git a/bin/reth/src/chain/import.rs b/bin/reth/src/chain/import.rs index 572f8c0ee2f8..7478aad47f17 100644 --- a/bin/reth/src/chain/import.rs +++ b/bin/reth/src/chain/import.rs @@ -147,7 +147,11 @@ impl ImportCommand { .into_task(); let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies) - .build(file_client.clone(), consensus.clone(), db.clone()) + .build( + file_client.clone(), + consensus.clone(), + ProviderFactory::new(db.clone(), self.chain.clone()), + ) .into_task(); let (tip_tx, tip_rx) = watch::channel(B256::ZERO); diff --git a/bin/reth/src/debug_cmd/execution.rs b/bin/reth/src/debug_cmd/execution.rs index 83c5549451ca..c248819a0ea2 100644 --- a/bin/reth/src/debug_cmd/execution.rs +++ b/bin/reth/src/debug_cmd/execution.rs @@ -102,7 +102,11 @@ impl Command { .into_task_with(task_executor); let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies) - .build(client, Arc::clone(&consensus), db.clone()) + .build( + client, + Arc::clone(&consensus), + ProviderFactory::new(db.clone(), self.chain.clone()), + ) .into_task_with(task_executor); let stage_conf = &config.stages; diff --git a/bin/reth/src/node/mod.rs b/bin/reth/src/node/mod.rs index a144a3bcea66..da07c17f1e80 100644 --- a/bin/reth/src/node/mod.rs +++ b/bin/reth/src/node/mod.rs @@ -617,7 +617,11 @@ impl NodeCommand { .into_task_with(task_executor); let body_downloader = BodiesDownloaderBuilder::from(config.stages.bodies) - .build(client, Arc::clone(&consensus), db.clone()) + .build( + client, + Arc::clone(&consensus), + ProviderFactory::new(db.clone(), self.chain.clone()), + ) .into_task_with(task_executor); let pipeline = self diff --git a/bin/reth/src/stage/run.rs b/bin/reth/src/stage/run.rs index 5eaeaf361ad8..589bcbf7d029 100644 --- a/bin/reth/src/stage/run.rs +++ b/bin/reth/src/stage/run.rs @@ -163,6 +163,9 @@ impl Command { let default_peers_path = data_dir.known_peers_path(); + let provider_factory = + Arc::new(ProviderFactory::new(db.clone(), self.chain.clone())); + let network = self .network .network_config( @@ -171,7 +174,7 @@ impl Command { p2p_secret_key, default_peers_path, ) - .build(Arc::new(ProviderFactory::new(db.clone(), self.chain.clone()))) + .build(provider_factory.clone()) .start_network() .await?; let fetch_client = Arc::new(network.fetch_client().await?); @@ -187,9 +190,8 @@ impl Command { config.stages.bodies.downloader_min_concurrent_requests..= config.stages.bodies.downloader_max_concurrent_requests, ) - .build(fetch_client, consensus.clone(), db.clone()), + .build(fetch_client, consensus.clone(), provider_factory), ); - (Box::new(stage), None) } StageEnum::Senders => (Box::new(SenderRecoveryStage::new(batch_size)), None), diff --git a/crates/consensus/beacon/src/engine/test_utils.rs b/crates/consensus/beacon/src/engine/test_utils.rs index f58ebf0133a5..b916d3e89c85 100644 --- a/crates/consensus/beacon/src/engine/test_utils.rs +++ b/crates/consensus/beacon/src/engine/test_utils.rs @@ -455,6 +455,8 @@ where pub fn build(self) -> (TestBeaconConsensusEngine, TestEnv>) { reth_tracing::init_test_tracing(); let db = create_test_rw_db(); + let provider_factory = + ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()); let consensus: Arc = match self.base_config.consensus { TestConsensusConfig::Real => { @@ -496,7 +498,7 @@ where .into_task(); let body_downloader = BodiesDownloaderBuilder::default() - .build(client.clone(), consensus.clone(), db.clone()) + .build(client.clone(), consensus.clone(), provider_factory.clone()) .into_task(); Pipeline::builder().add_stages(DefaultStages::new( @@ -527,9 +529,8 @@ where let tree = ShareableBlockchainTree::new( BlockchainTree::new(externals, config, None).expect("failed to create tree"), ); - let shareable_db = ProviderFactory::new(db.clone(), self.base_config.chain_spec.clone()); let latest = self.base_config.chain_spec.genesis_header().seal_slow(); - let blockchain_provider = BlockchainProvider::with_latest(shareable_db, tree, latest); + let blockchain_provider = BlockchainProvider::with_latest(provider_factory, tree, latest); let pruner = Pruner::new( db.clone(), diff --git a/crates/net/downloaders/Cargo.toml b/crates/net/downloaders/Cargo.toml index cdb3317dbe84..3a50908b13d2 100644 --- a/crates/net/downloaders/Cargo.toml +++ b/crates/net/downloaders/Cargo.toml @@ -12,8 +12,8 @@ description = "Implementations of various block downloaders" # reth reth-interfaces.workspace = true reth-primitives.workspace = true -reth-db.workspace = true reth-tasks.workspace = true +reth-provider.workspace = true # async futures.workspace = true @@ -33,6 +33,7 @@ rayon.workspace = true thiserror.workspace = true # optional deps for the test-utils feature +reth-db = { workspace = true, optional = true } alloy-rlp = { workspace = true, optional = true } tempfile = { workspace = true, optional = true } itertools = { workspace = true, optional = true } @@ -50,4 +51,4 @@ itertools.workspace = true tempfile.workspace = true [features] -test-utils = ["dep:alloy-rlp", "dep:tempfile", "dep:itertools", "reth-interfaces/test-utils"] +test-utils = ["dep:alloy-rlp", "dep:tempfile", "dep:itertools", "reth-db/test-utils", "reth-interfaces/test-utils"] diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index a1451bb5b159..1912b66ff674 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -2,7 +2,6 @@ use super::queue::BodiesRequestQueue; use crate::{bodies::task::TaskDownloader, metrics::BodyDownloaderMetrics}; use futures::Stream; use futures_util::StreamExt; -use reth_db::{cursor::DbCursorRO, database::Database, tables, transaction::DbTx}; use reth_interfaces::{ consensus::Consensus, p2p::{ @@ -15,6 +14,7 @@ use reth_interfaces::{ }, }; use reth_primitives::{BlockNumber, SealedHeader}; +use reth_provider::HeaderProvider; use reth_tasks::{TaskSpawner, TokioTaskExecutor}; use std::{ cmp::Ordering, @@ -27,22 +27,18 @@ use std::{ }; use tracing::info; -/// The scope for headers downloader metrics. -pub const BODIES_DOWNLOADER_SCOPE: &str = "downloaders.bodies"; - /// Downloads bodies in batches. /// /// All blocks in a batch are fetched at the same time. #[must_use = "Stream does nothing unless polled"] #[derive(Debug)] -pub struct BodiesDownloader { +pub struct BodiesDownloader { /// The bodies client client: Arc, /// The consensus client consensus: Arc, - // TODO: make this a [HeaderProvider] /// The database handle - db: DB, + provider: Provider, /// The maximum number of non-empty blocks per one request request_limit: u64, /// The maximum number of block bodies returned at once from the stream @@ -67,10 +63,10 @@ pub struct BodiesDownloader { metrics: BodyDownloaderMetrics, } -impl BodiesDownloader +impl BodiesDownloader where B: BodiesClient + 'static, - DB: Database + Unpin + 'static, + Provider: HeaderProvider + Unpin + 'static, { /// Returns the next contiguous request. fn next_headers_request(&mut self) -> DownloadResult>> { @@ -103,47 +99,32 @@ where return Ok(None) } - // Collection of results - let mut headers = Vec::new(); - - // Non empty headers count - let mut non_empty_headers = 0; - let mut current_block_num = *range.start(); - - // Acquire cursors over canonical and header tables - let tx = self.db.tx()?; - let mut canonical_cursor = tx.cursor_read::()?; - let mut header_cursor = tx.cursor_read::()?; - // Collect headers while // 1. Current block number is in range // 2. The number of non empty headers is less than maximum // 3. The total number of headers is less than the stream batch size (this is only - // relevant if the range consists entirely of empty headers) - while range.contains(¤t_block_num) && - non_empty_headers < max_non_empty && - headers.len() < self.stream_batch_size - { - // Find the block hash. - let (number, hash) = canonical_cursor - .seek_exact(current_block_num)? - .ok_or(DownloadError::MissingHeader { block_number: current_block_num })?; - // Find the block header. - let (_, header) = header_cursor - .seek_exact(number)? - .ok_or(DownloadError::MissingHeader { block_number: number })?; - - // If the header is not empty, increment the counter - if !header.is_empty() { - non_empty_headers += 1; - } - - // Add header to the result collection - headers.push(header.seal(hash)); + // relevant if the range consists entirely of empty headers) + let mut collected = 0; + let mut non_empty_headers = 0; + let headers = self + .provider + .sealed_headers_while(range.clone(), |header| { + let should_take = range.contains(&header.number) && + non_empty_headers < max_non_empty && + collected < self.stream_batch_size; + + if should_take { + collected += 1; + if !header.is_empty() { + non_empty_headers += 1; + } + true + } else { + false + } + }) + .unwrap(); // TODO: decide what to do with RethError 😡 - // Increment current block number - current_block_num += 1; - } Ok(Some(headers).filter(|h| !h.is_empty())) } @@ -286,10 +267,10 @@ where } } -impl BodiesDownloader +impl BodiesDownloader where B: BodiesClient + 'static, - DB: Database + Unpin + 'static, + Provider: HeaderProvider + Unpin + 'static, Self: BodyDownloader + 'static, { /// Spawns the downloader task via [tokio::task::spawn] @@ -306,10 +287,10 @@ where } } -impl BodyDownloader for BodiesDownloader +impl BodyDownloader for BodiesDownloader where B: BodiesClient + 'static, - DB: Database + Unpin + 'static, + Provider: HeaderProvider + Unpin + 'static, { /// Set a new download range (exclusive). /// @@ -354,10 +335,10 @@ where } } -impl Stream for BodiesDownloader +impl Stream for BodiesDownloader where B: BodiesClient + 'static, - DB: Database + Unpin + 'static, + Provider: HeaderProvider + Unpin + 'static, { type Item = BodyDownloaderResult; @@ -557,15 +538,15 @@ impl BodiesDownloaderBuilder { } /// Consume self and return the concurrent downloader. - pub fn build( + pub fn build( self, client: B, consensus: Arc, - db: DB, - ) -> BodiesDownloader + provider: Provider, + ) -> BodiesDownloader where B: BodiesClient + 'static, - DB: Database, + Provider: HeaderProvider, { let Self { request_limit, @@ -578,7 +559,7 @@ impl BodiesDownloaderBuilder { BodiesDownloader { client: Arc::new(client), consensus, - db, + provider, request_limit, stream_batch_size, max_buffered_blocks_size_bytes, @@ -605,7 +586,8 @@ mod tests { use futures_util::stream::StreamExt; use reth_db::test_utils::create_test_rw_db; use reth_interfaces::test_utils::{generators, generators::random_block_range, TestConsensus}; - use reth_primitives::{BlockBody, B256}; + use reth_primitives::{BlockBody, B256, MAINNET}; + use reth_provider::ProviderFactory; use std::{collections::HashMap, sync::Arc}; // Check that the blocks are emitted in order of block number, not in order of @@ -624,7 +606,7 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - db, + ProviderFactory::new(db, MAINNET.clone()), ); downloader.set_download_range(0..=19).expect("failed to set download range"); @@ -659,9 +641,12 @@ mod tests { let request_limit = 10; let client = Arc::new(TestBodiesClient::default().with_bodies(bodies.clone())); - let mut downloader = BodiesDownloaderBuilder::default() - .with_request_limit(request_limit) - .build(client.clone(), Arc::new(TestConsensus::default()), db); + let mut downloader = + BodiesDownloaderBuilder::default().with_request_limit(request_limit).build( + client.clone(), + Arc::new(TestConsensus::default()), + ProviderFactory::new(db, MAINNET.clone()), + ); downloader.set_download_range(0..=199).expect("failed to set download range"); let _ = downloader.collect::>().await; @@ -686,7 +671,11 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default() .with_stream_batch_size(stream_batch_size) .with_request_limit(request_limit) - .build(client.clone(), Arc::new(TestConsensus::default()), db); + .build( + client.clone(), + Arc::new(TestConsensus::default()), + ProviderFactory::new(db, MAINNET.clone()), + ); let mut range_start = 0; while range_start < 100 { @@ -715,7 +704,7 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().with_stream_batch_size(100).build( client.clone(), Arc::new(TestConsensus::default()), - db, + ProviderFactory::new(db, MAINNET.clone()), ); // Set and download the first range @@ -752,7 +741,11 @@ mod tests { .with_stream_batch_size(10) .with_request_limit(1) .with_max_buffered_blocks_size_bytes(1) - .build(client.clone(), Arc::new(TestConsensus::default()), db); + .build( + client.clone(), + Arc::new(TestConsensus::default()), + ProviderFactory::new(db, MAINNET.clone()), + ); // Set and download the entire range downloader.set_download_range(0..=199).expect("failed to set download range"); @@ -779,7 +772,11 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default() .with_request_limit(3) .with_stream_batch_size(100) - .build(client.clone(), Arc::new(TestConsensus::default()), db); + .build( + client.clone(), + Arc::new(TestConsensus::default()), + ProviderFactory::new(db, MAINNET.clone()), + ); // Download the requested range downloader.set_download_range(0..=99).expect("failed to set download range"); diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index 97748f54f7e1..28965e51f8b6 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -42,16 +42,17 @@ impl TaskDownloader { /// # Example /// /// ``` - /// use reth_db::database::Database; /// use reth_downloaders::bodies::{bodies::BodiesDownloaderBuilder, task::TaskDownloader}; /// use reth_interfaces::{consensus::Consensus, p2p::bodies::client::BodiesClient}; + /// use reth_provider::HeaderProvider; /// use std::sync::Arc; - /// fn t( + /// + /// fn t( /// client: Arc, /// consensus: Arc, - /// db: Arc, + /// provider: Provider, /// ) { - /// let downloader = BodiesDownloaderBuilder::default().build(client, consensus, db); + /// let downloader = BodiesDownloaderBuilder::default().build(client, consensus, provider); /// let downloader = TaskDownloader::spawn(downloader); /// } /// ``` @@ -170,6 +171,8 @@ mod tests { use assert_matches::assert_matches; use reth_db::test_utils::create_test_rw_db; use reth_interfaces::{p2p::error::DownloadError, test_utils::TestConsensus}; + use reth_primitives::MAINNET; + use reth_provider::ProviderFactory; use std::sync::Arc; #[tokio::test(flavor = "multi_thread")] @@ -187,7 +190,7 @@ mod tests { let downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - db, + ProviderFactory::new(db, MAINNET.clone()), ); let mut downloader = TaskDownloader::spawn(downloader); @@ -209,7 +212,7 @@ mod tests { let downloader = BodiesDownloaderBuilder::default().build( Arc::new(TestBodiesClient::default()), Arc::new(TestConsensus::default()), - db, + ProviderFactory::new(db, MAINNET.clone()), ); let mut downloader = TaskDownloader::spawn(downloader); diff --git a/crates/net/downloaders/src/test_utils/file_client.rs b/crates/net/downloaders/src/test_utils/file_client.rs index 45df474e1e90..69320fe4b171 100644 --- a/crates/net/downloaders/src/test_utils/file_client.rs +++ b/crates/net/downloaders/src/test_utils/file_client.rs @@ -267,7 +267,8 @@ mod tests { }, test_utils::TestConsensus, }; - use reth_primitives::SealedHeader; + use reth_primitives::{SealedHeader, MAINNET}; + use reth_provider::ProviderFactory; use std::{ io::{Read, Seek, SeekFrom, Write}, sync::Arc, @@ -291,7 +292,7 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - db, + ProviderFactory::new(db, MAINNET.clone()), ); downloader.set_download_range(0..=19).expect("failed to set download range"); @@ -373,7 +374,7 @@ mod tests { let mut downloader = BodiesDownloaderBuilder::default().build( client.clone(), Arc::new(TestConsensus::default()), - db, + ProviderFactory::new(db, MAINNET.clone()), ); downloader.set_download_range(0..=19).expect("failed to set download range"); From 192fad599dafcc2e32b677738d727dd0cfbfbf42 Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 17 Nov 2023 17:30:11 +0200 Subject: [PATCH 2/3] fix error handling --- crates/interfaces/src/p2p/error.rs | 6 +-- crates/net/downloaders/src/bodies/bodies.rs | 31 +++++++--------- crates/stages/src/stages/bodies.rs | 41 ++++++++++++--------- 3 files changed, 40 insertions(+), 38 deletions(-) diff --git a/crates/interfaces/src/p2p/error.rs b/crates/interfaces/src/p2p/error.rs index 53cf68e4b4de..9758c8ab2e43 100644 --- a/crates/interfaces/src/p2p/error.rs +++ b/crates/interfaces/src/p2p/error.rs @@ -1,5 +1,5 @@ use super::headers::client::HeadersRequest; -use crate::{consensus::ConsensusError, db}; +use crate::{consensus::ConsensusError, provider::ProviderError}; use reth_network_api::ReputationChangeKind; use reth_primitives::{ BlockHashOrNumber, BlockNumber, GotExpected, GotExpectedBoxed, Header, WithPeerId, B256, @@ -177,9 +177,9 @@ pub enum DownloadError { /// Error while executing the request. #[error(transparent)] RequestError(#[from] RequestError), - /// Error while reading data from database. + /// Provider error. #[error(transparent)] - DatabaseError(#[from] db::DatabaseError), + Provider(#[from] ProviderError), } #[cfg(test)] diff --git a/crates/net/downloaders/src/bodies/bodies.rs b/crates/net/downloaders/src/bodies/bodies.rs index 1912b66ff674..b601865d3a6c 100644 --- a/crates/net/downloaders/src/bodies/bodies.rs +++ b/crates/net/downloaders/src/bodies/bodies.rs @@ -106,24 +106,21 @@ where // relevant if the range consists entirely of empty headers) let mut collected = 0; let mut non_empty_headers = 0; - let headers = self - .provider - .sealed_headers_while(range.clone(), |header| { - let should_take = range.contains(&header.number) && - non_empty_headers < max_non_empty && - collected < self.stream_batch_size; - - if should_take { - collected += 1; - if !header.is_empty() { - non_empty_headers += 1; - } - true - } else { - false + let headers = self.provider.sealed_headers_while(range.clone(), |header| { + let should_take = range.contains(&header.number) && + non_empty_headers < max_non_empty && + collected < self.stream_batch_size; + + if should_take { + collected += 1; + if !header.is_empty() { + non_empty_headers += 1; } - }) - .unwrap(); // TODO: decide what to do with RethError 😡 + true + } else { + false + } + })?; Ok(Some(headers).filter(|h| !h.is_empty())) } diff --git a/crates/stages/src/stages/bodies.rs b/crates/stages/src/stages/bodies.rs index cb908ebf95a9..9e69efaccd05 100644 --- a/crates/stages/src/stages/bodies.rs +++ b/crates/stages/src/stages/bodies.rs @@ -482,7 +482,7 @@ mod tests { tables, test_utils::TempDatabase, transaction::{DbTx, DbTxMut}, - DatabaseEnv, + DatabaseEnv, DatabaseError, }; use reth_interfaces::{ p2p::{ @@ -492,7 +492,7 @@ mod tests { response::BlockResponse, }, download::DownloadClient, - error::DownloadResult, + error::{DownloadError, DownloadResult}, priority::Priority, }, test_utils::{ @@ -778,22 +778,27 @@ mod tests { &mut self, range: RangeInclusive, ) -> DownloadResult<()> { - self.headers = - VecDeque::from(self.db.view(|tx| -> DownloadResult> { - let mut header_cursor = tx.cursor_read::()?; - - let mut canonical_cursor = tx.cursor_read::()?; - let walker = canonical_cursor.walk_range(range)?; - - let mut headers = Vec::default(); - for entry in walker { - let (num, hash) = entry?; - let (_, header) = - header_cursor.seek_exact(num)?.expect("missing header"); - headers.push(header.seal(hash)); - } - Ok(headers) - })??); + self.headers = VecDeque::from( + self.db + .view(|tx| -> Result, DatabaseError> { + let mut header_cursor = tx.cursor_read::()?; + + let mut canonical_cursor = + tx.cursor_read::()?; + let walker = canonical_cursor.walk_range(range)?; + + let mut headers = Vec::default(); + for entry in walker { + let (num, hash) = entry?; + let (_, header) = + header_cursor.seek_exact(num)?.expect("missing header"); + headers.push(header.seal(hash)); + } + Ok(headers) + }) + .map_err(|err| DownloadError::Provider(err.into()))? + .map_err(|err| DownloadError::Provider(err.into()))?, + ); Ok(()) } } From d8984cf4bd2cad0ddde2ee3cad543d0d2e7b6def Mon Sep 17 00:00:00 2001 From: Roman Krasiuk Date: Fri, 17 Nov 2023 21:25:01 +0200 Subject: [PATCH 3/3] fix docs tests --- crates/net/downloaders/src/bodies/task.rs | 2 +- crates/stages/src/lib.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/net/downloaders/src/bodies/task.rs b/crates/net/downloaders/src/bodies/task.rs index 28965e51f8b6..9a713a8539bc 100644 --- a/crates/net/downloaders/src/bodies/task.rs +++ b/crates/net/downloaders/src/bodies/task.rs @@ -47,7 +47,7 @@ impl TaskDownloader { /// use reth_provider::HeaderProvider; /// use std::sync::Arc; /// - /// fn t( + /// fn t( /// client: Arc, /// consensus: Arc, /// provider: Provider, diff --git a/crates/stages/src/lib.rs b/crates/stages/src/lib.rs index bf9ba9e8dd94..8651dce35194 100644 --- a/crates/stages/src/lib.rs +++ b/crates/stages/src/lib.rs @@ -36,7 +36,7 @@ //! # let bodies_downloader = BodiesDownloaderBuilder::default().build( //! # Arc::new(TestBodiesClient { responder: |_| Ok((PeerId::ZERO, vec![]).into()) }), //! # consensus.clone(), -//! # db.clone() +//! # ProviderFactory::new(db.clone(), MAINNET.clone()) //! # ); //! # let (tip_tx, tip_rx) = watch::channel(B256::default()); //! # let factory = Factory::new(chain_spec.clone());