From 065303a896857ffb7ee3bb67c5317d793a8808b1 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sat, 17 Feb 2024 12:18:40 +0000 Subject: [PATCH 01/33] SnapshotProvider becomes wrapper to Arc --- crates/node-core/src/init.rs | 2 +- crates/snapshot/src/segments/headers.rs | 4 +- crates/snapshot/src/segments/mod.rs | 4 +- crates/snapshot/src/segments/receipts.rs | 4 +- crates/snapshot/src/segments/transactions.rs | 4 +- crates/snapshot/src/snapshotter.rs | 6 +-- crates/stages/src/stages/finish.rs | 1 + crates/stages/src/stages/headers.rs | 2 +- .../provider/src/providers/database/mod.rs | 12 ++--- .../src/providers/database/provider.rs | 16 ++----- .../src/providers/snapshot/manager.rs | 45 ++++++++++++++----- .../provider/src/providers/snapshot/writer.rs | 7 ++- .../src/providers/state/historical.rs | 19 +++----- .../provider/src/providers/state/latest.rs | 9 ++-- 14 files changed, 68 insertions(+), 67 deletions(-) diff --git a/crates/node-core/src/init.rs b/crates/node-core/src/init.rs index 79e1cdb596a9..346d35fdd31f 100644 --- a/crates/node-core/src/init.rs +++ b/crates/node-core/src/init.rs @@ -209,7 +209,7 @@ pub fn insert_genesis_history( /// Inserts header for the genesis state. pub fn insert_genesis_header( tx: &::TXMut, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, chain: Arc, ) -> ProviderResult<()> { let (header, block_hash) = chain.sealed_genesis_header().split(); diff --git a/crates/snapshot/src/segments/headers.rs b/crates/snapshot/src/segments/headers.rs index 9fb0518a93ee..f8b8f2563f83 100644 --- a/crates/snapshot/src/segments/headers.rs +++ b/crates/snapshot/src/segments/headers.rs @@ -9,7 +9,7 @@ use reth_provider::{ providers::{SnapshotProvider, SnapshotWriter}, DatabaseProviderRO, }; -use std::{ops::RangeInclusive, path::Path, sync::Arc}; +use std::{ops::RangeInclusive, path::Path}; /// Snapshot segment responsible for [SnapshotSegment::Headers] part of data. #[derive(Debug, Default)] @@ -23,7 +23,7 @@ impl Segment for Headers { fn snapshot( &self, provider: DatabaseProviderRO, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, block_range: RangeInclusive, ) -> ProviderResult<()> { let mut snapshot_writer = diff --git a/crates/snapshot/src/segments/mod.rs b/crates/snapshot/src/segments/mod.rs index e4ca2eb04f3d..de9dfee12f7a 100644 --- a/crates/snapshot/src/segments/mod.rs +++ b/crates/snapshot/src/segments/mod.rs @@ -22,7 +22,7 @@ use reth_primitives::{ BlockNumber, SnapshotSegment, }; use reth_provider::{providers::SnapshotProvider, DatabaseProviderRO, TransactionsProviderExt}; -use std::{ops::RangeInclusive, path::Path, sync::Arc}; +use std::{ops::RangeInclusive, path::Path}; pub(crate) type Rows = [Vec>; COLUMNS]; @@ -36,7 +36,7 @@ pub trait Segment: Send + Sync { fn snapshot( &self, provider: DatabaseProviderRO, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, block_range: RangeInclusive, ) -> ProviderResult<()>; diff --git a/crates/snapshot/src/segments/receipts.rs b/crates/snapshot/src/segments/receipts.rs index 3310d2615a32..bd6cfdb5aac5 100644 --- a/crates/snapshot/src/segments/receipts.rs +++ b/crates/snapshot/src/segments/receipts.rs @@ -11,7 +11,7 @@ use reth_provider::{ providers::{SnapshotProvider, SnapshotWriter}, BlockReader, DatabaseProviderRO, TransactionsProviderExt, }; -use std::{ops::RangeInclusive, path::Path, sync::Arc}; +use std::{ops::RangeInclusive, path::Path}; /// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data. #[derive(Debug, Default)] @@ -25,7 +25,7 @@ impl Segment for Receipts { fn snapshot( &self, provider: DatabaseProviderRO, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, block_range: RangeInclusive, ) -> ProviderResult<()> { let mut snapshot_writer = diff --git a/crates/snapshot/src/segments/transactions.rs b/crates/snapshot/src/segments/transactions.rs index 17c5f5e6c37f..11a69fba0dcc 100644 --- a/crates/snapshot/src/segments/transactions.rs +++ b/crates/snapshot/src/segments/transactions.rs @@ -11,7 +11,7 @@ use reth_provider::{ providers::{SnapshotProvider, SnapshotWriter}, BlockReader, DatabaseProviderRO, TransactionsProviderExt, }; -use std::{ops::RangeInclusive, path::Path, sync::Arc}; +use std::{ops::RangeInclusive, path::Path}; /// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data. #[derive(Debug, Default)] @@ -27,7 +27,7 @@ impl Segment for Transactions { fn snapshot( &self, provider: DatabaseProviderRO, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, block_range: RangeInclusive, ) -> ProviderResult<()> { let mut snapshot_writer = diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 93be15e5dc20..a75f2f84b7d2 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -10,7 +10,7 @@ use reth_provider::{ ProviderFactory, }; use reth_tokio_util::EventListeners; -use std::{ops::RangeInclusive, sync::Arc, time::Instant}; +use std::{ops::RangeInclusive, time::Instant}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, trace}; @@ -26,7 +26,7 @@ pub struct Snapshotter { /// Provider factory provider_factory: ProviderFactory, /// Snapshot provider - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, /// Pruning configuration for every part of the data that can be pruned. Set by user, and /// needed in [Snapshotter] to prevent snapshotting the prunable data. /// See [Snapshotter::get_snapshot_targets]. @@ -71,7 +71,7 @@ impl Snapshotter { /// Creates a new [Snapshotter]. pub fn new( provider_factory: ProviderFactory, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, prune_modes: PruneModes, ) -> Self { Self { provider_factory, snapshot_provider, prune_modes, listeners: Default::default() } diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index 341be77dd1e1..7efd02114085 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -21,6 +21,7 @@ impl Stage for FinishStage { _provider: &DatabaseProviderRW, input: ExecInput, ) -> Result { + std::process::exit(1); Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } diff --git a/crates/stages/src/stages/headers.rs b/crates/stages/src/stages/headers.rs index a02d4c6534a2..6c16464e3d8a 100644 --- a/crates/stages/src/stages/headers.rs +++ b/crates/stages/src/stages/headers.rs @@ -99,7 +99,7 @@ where fn write_headers( &mut self, tx: &::TXMut, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, ) -> Result { let total_headers = self.header_collector.len(); diff --git a/crates/storage/provider/src/providers/database/mod.rs b/crates/storage/provider/src/providers/database/mod.rs index a10069d945d3..6dc58ad6ab31 100644 --- a/crates/storage/provider/src/providers/database/mod.rs +++ b/crates/storage/provider/src/providers/database/mod.rs @@ -44,7 +44,7 @@ pub struct ProviderFactory { /// Chain spec chain_spec: Arc, /// Snapshot Provider - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, } impl ProviderFactory { @@ -54,11 +54,7 @@ impl ProviderFactory { chain_spec: Arc, snapshots_path: PathBuf, ) -> RethResult> { - Ok(Self { - db, - chain_spec, - snapshot_provider: Arc::new(SnapshotProvider::new(snapshots_path)?), - }) + Ok(Self { db, chain_spec, snapshot_provider: SnapshotProvider::new(snapshots_path)? }) } /// Create new database provider by passing a path. [`ProviderFactory`] will own the database @@ -72,7 +68,7 @@ impl ProviderFactory { Ok(ProviderFactory:: { db: init_db(path, args).map_err(|e| RethError::Custom(e.to_string()))?, chain_spec, - snapshot_provider: Arc::new(SnapshotProvider::new(snapshots_path)?), + snapshot_provider: SnapshotProvider::new(snapshots_path)?, }) } @@ -82,7 +78,7 @@ impl ProviderFactory { } /// Returns snapshot provider - pub fn snapshot_provider(&self) -> Arc { + pub fn snapshot_provider(&self) -> SnapshotProvider { self.snapshot_provider.clone() } diff --git a/crates/storage/provider/src/providers/database/provider.rs b/crates/storage/provider/src/providers/database/provider.rs index 81191d5f7624..170e056d811f 100644 --- a/crates/storage/provider/src/providers/database/provider.rs +++ b/crates/storage/provider/src/providers/database/provider.rs @@ -104,23 +104,19 @@ pub struct DatabaseProvider { /// Chain spec chain_spec: Arc, /// Snapshot provider - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, } impl DatabaseProvider { /// Returns a snapshot provider - pub fn snapshot_provider(&self) -> &Arc { + pub fn snapshot_provider(&self) -> &SnapshotProvider { &self.snapshot_provider } } impl DatabaseProvider { /// Creates a provider with an inner read-write transaction. - pub fn new_rw( - tx: TX, - chain_spec: Arc, - snapshot_provider: Arc, - ) -> Self { + pub fn new_rw(tx: TX, chain_spec: Arc, snapshot_provider: SnapshotProvider) -> Self { Self { tx, chain_spec, snapshot_provider } } } @@ -246,11 +242,7 @@ where impl DatabaseProvider { /// Creates a provider with an inner read-only transaction. - pub fn new( - tx: TX, - chain_spec: Arc, - snapshot_provider: Arc, - ) -> Self { + pub fn new(tx: TX, chain_spec: Arc, snapshot_provider: SnapshotProvider) -> Self { Self { tx, chain_spec, snapshot_provider } } diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 6edc9db2058a..b965664fb376 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -29,7 +29,7 @@ use reth_primitives::{ }; use std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, - ops::{Range, RangeBounds, RangeInclusive}, + ops::{Deref, Range, RangeBounds, RangeInclusive}, path::{Path, PathBuf}, sync::Arc, }; @@ -39,8 +39,29 @@ use std::{ type SegmentRanges = HashMap>; /// [`SnapshotProvider`] manages all existing [`SnapshotJarProvider`]. +#[derive(Debug, Default, Clone)] +pub struct SnapshotProvider(Arc); + +impl SnapshotProvider { + /// Creates a new [`SnapshotProvider`]. + pub fn new(path: impl AsRef) -> ProviderResult { + let provider = Self(Arc::new(SnapshotProviderInner::new(path)?)); + provider.initialize_index()?; + Ok(provider) + } +} + +impl Deref for SnapshotProvider { + type Target = SnapshotProviderInner; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// [`SnapshotProviderInner`] manages all existing [`SnapshotJarProvider`]. #[derive(Debug, Default)] -pub struct SnapshotProvider { +pub struct SnapshotProviderInner { /// Maintains a map which allows for concurrent access to different `NippyJars`, over different /// segments and ranges. map: DashMap<(BlockNumber, SnapshotSegment), LoadedJar>, @@ -57,9 +78,9 @@ pub struct SnapshotProvider { writers: DashMap>, } -impl SnapshotProvider { - /// Creates a new [`SnapshotProvider`]. - pub fn new(path: impl AsRef) -> ProviderResult { +impl SnapshotProviderInner { + /// Creates a new [`SnapshotProviderInner`]. + fn new(path: impl AsRef) -> ProviderResult { let provider = Self { map: Default::default(), writers: Default::default(), @@ -69,14 +90,16 @@ impl SnapshotProvider { load_filters: false, }; - provider.initialize_index()?; Ok(provider) } - +} +impl SnapshotProvider { /// Loads filters into memory when creating a [`SnapshotJarProvider`]. - pub fn with_filters(mut self) -> Self { - self.load_filters = true; - self + pub fn with_filters(self) -> Self { + let mut provider = + Arc::try_unwrap(self.0).expect("should be called when initializing only."); + provider.load_filters = true; + Self(Arc::new(provider)) } /// Gets the [`SnapshotJarProvider`] of the requested segment and block. @@ -599,7 +622,7 @@ pub trait SnapshotWriter { fn commit(&self) -> ProviderResult<()>; } -impl SnapshotWriter for Arc { +impl SnapshotWriter for SnapshotProvider { fn get_writer( &self, block: BlockNumber, diff --git a/crates/storage/provider/src/providers/snapshot/writer.rs b/crates/storage/provider/src/providers/snapshot/writer.rs index fc8b2d426194..f56c4d5a21b8 100644 --- a/crates/storage/provider/src/providers/snapshot/writer.rs +++ b/crates/storage/provider/src/providers/snapshot/writer.rs @@ -12,7 +12,6 @@ use reth_primitives::{ use std::{ ops::Deref, path::{Path, PathBuf}, - sync::Arc, }; /// Mutable reference to a dashmap element of [`SnapshotProviderRW`]. @@ -21,7 +20,7 @@ pub type SnapshotProviderRWRefMut<'a> = RefMut<'a, SnapshotSegment, SnapshotProv #[derive(Debug)] /// Extends `SnapshotProvider` with writing capabilities pub struct SnapshotProviderRW<'a> { - reader: Arc, + reader: SnapshotProvider, writer: NippyJarWriter<'a, SegmentHeader>, data_path: PathBuf, buf: Vec, @@ -32,7 +31,7 @@ impl<'a> SnapshotProviderRW<'a> { pub fn new( segment: SnapshotSegment, block: BlockNumber, - reader: Arc, + reader: SnapshotProvider, ) -> ProviderResult { let (writer, data_path) = Self::open(segment, block, reader.clone())?; Ok(Self { writer, data_path, buf: Vec::with_capacity(100), reader }) @@ -41,7 +40,7 @@ impl<'a> SnapshotProviderRW<'a> { fn open( segment: SnapshotSegment, block: u64, - reader: Arc, + reader: SnapshotProvider, ) -> ProviderResult<(NippyJarWriter<'a, SegmentHeader>, PathBuf)> { let block_range = find_fixed_range(block); let (jar, path) = diff --git a/crates/storage/provider/src/providers/state/historical.rs b/crates/storage/provider/src/providers/state/historical.rs index 8224346ec204..c9a9a5d57846 100644 --- a/crates/storage/provider/src/providers/state/historical.rs +++ b/crates/storage/provider/src/providers/state/historical.rs @@ -17,7 +17,6 @@ use reth_primitives::{ SnapshotSegment, StorageKey, StorageValue, B256, }; use reth_trie::{updates::TrieUpdates, HashedPostState}; -use std::sync::Arc; /// State provider for a given block number which takes a tx reference. /// @@ -39,7 +38,7 @@ pub struct HistoricalStateProviderRef<'b, TX: DbTx> { /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, /// Snapshot provider - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, } #[derive(Debug, Eq, PartialEq)] @@ -52,11 +51,7 @@ pub enum HistoryInfo { impl<'b, TX: DbTx> HistoricalStateProviderRef<'b, TX> { /// Create new StateProvider for historical block number - pub fn new( - tx: &'b TX, - block_number: BlockNumber, - snapshot_provider: Arc, - ) -> Self { + pub fn new(tx: &'b TX, block_number: BlockNumber, snapshot_provider: SnapshotProvider) -> Self { Self { tx, block_number, lowest_available_blocks: Default::default(), snapshot_provider } } @@ -66,7 +61,7 @@ impl<'b, TX: DbTx> HistoricalStateProviderRef<'b, TX> { tx: &'b TX, block_number: BlockNumber, lowest_available_blocks: LowestAvailableBlocks, - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, ) -> Self { Self { tx, block_number, lowest_available_blocks, snapshot_provider } } @@ -319,16 +314,12 @@ pub struct HistoricalStateProvider { /// Lowest blocks at which different parts of the state are available. lowest_available_blocks: LowestAvailableBlocks, /// Snapshot provider - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, } impl HistoricalStateProvider { /// Create new StateProvider for historical block number - pub fn new( - tx: TX, - block_number: BlockNumber, - snapshot_provider: Arc, - ) -> Self { + pub fn new(tx: TX, block_number: BlockNumber, snapshot_provider: SnapshotProvider) -> Self { Self { tx, block_number, lowest_available_blocks: Default::default(), snapshot_provider } } diff --git a/crates/storage/provider/src/providers/state/latest.rs b/crates/storage/provider/src/providers/state/latest.rs index aeeafa17779d..53d59ec5726b 100644 --- a/crates/storage/provider/src/providers/state/latest.rs +++ b/crates/storage/provider/src/providers/state/latest.rs @@ -13,7 +13,6 @@ use reth_primitives::{ StorageValue, B256, }; use reth_trie::{proof::Proof, updates::TrieUpdates}; -use std::sync::Arc; /// State provider over latest state that takes tx reference. #[derive(Debug)] @@ -21,12 +20,12 @@ pub struct LatestStateProviderRef<'b, TX: DbTx> { /// database transaction db: &'b TX, /// Snapshot provider - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, } impl<'b, TX: DbTx> LatestStateProviderRef<'b, TX> { /// Create new state provider - pub fn new(db: &'b TX, snapshot_provider: Arc) -> Self { + pub fn new(db: &'b TX, snapshot_provider: SnapshotProvider) -> Self { Self { db, snapshot_provider } } } @@ -127,12 +126,12 @@ pub struct LatestStateProvider { /// database transaction db: TX, /// Snapshot provider - snapshot_provider: Arc, + snapshot_provider: SnapshotProvider, } impl LatestStateProvider { /// Create new state provider - pub fn new(db: TX, snapshot_provider: Arc) -> Self { + pub fn new(db: TX, snapshot_provider: SnapshotProvider) -> Self { Self { db, snapshot_provider } } From 8002efd9c0d4cc59c90ba5dfb42db9faa0df683c Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sat, 17 Feb 2024 16:45:46 +0000 Subject: [PATCH 02/33] make transaction_hashes_by_range on static files parallel --- .../src/providers/snapshot/manager.rs | 77 +++++++++++++++---- 1 file changed, 64 insertions(+), 13 deletions(-) diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index b965664fb376..8deb82decf92 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -21,6 +21,7 @@ use reth_db::{ use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::NippyJar; use reth_primitives::{ + keccak256, snapshot::{find_fixed_range, HighestSnapshots, SegmentHeader, SegmentRangeInclusive}, Address, Block, BlockHash, BlockHashOrNumber, BlockNumber, BlockWithSenders, ChainInfo, Header, Receipt, SealedBlock, SealedBlockWithSenders, SealedHeader, SnapshotSegment, TransactionMeta, @@ -31,7 +32,7 @@ use std::{ collections::{hash_map::Entry, BTreeMap, HashMap}, ops::{Deref, Range, RangeBounds, RangeInclusive}, path::{Path, PathBuf}, - sync::Arc, + sync::{mpsc, Arc}, }; /// Alias type for a map that can be queried for block ranges from a transaction @@ -433,11 +434,11 @@ impl SnapshotProvider { &self, segment: SnapshotSegment, range: Range, - get_fn: F, + mut get_fn: F, mut predicate: P, ) -> ProviderResult> where - F: Fn(&mut SnapshotCursor<'_>, u64) -> ProviderResult>, + F: FnMut(&mut SnapshotCursor<'_>, u64) -> ProviderResult>, P: FnMut(&T) -> bool, { let get_provider = |start: u64| match segment { @@ -772,16 +773,66 @@ impl TransactionsProviderExt for SnapshotProvider { &self, tx_range: Range, ) -> ProviderResult> { - self.fetch_range_with_predicate( - SnapshotSegment::Transactions, - tx_range, - |cursor, number| { - let tx = - cursor.get_one::>(number.into())?; - Ok(tx.map(|tx| (tx.hash(), cursor.number()))) - }, - |_| true, - ) + let tx_range_size = (tx_range.end - tx_range.start) as usize; + let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1); + + let chunks = (tx_range.start..tx_range.end) + .step_by(chunk_size as usize) + .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end)) + .collect::>>(); + let mut channels = Vec::with_capacity(chunk_size); + + #[inline] + fn calculate_hash( + entry: (TxNumber, TransactionSignedNoHash), + rlp_buf: &mut Vec, + ) -> Result<(B256, TxNumber), Box> { + let (tx_id, tx) = entry; + tx.transaction.encode_with_signature(&tx.signature, rlp_buf, false); + Ok((keccak256(rlp_buf), tx_id)) + } + + for chunk_range in chunks { + let (channel_tx, channel_rx) = mpsc::channel(); + channels.push(channel_rx); + + let manager = self.clone(); + + // Spawn the task onto the global rayon pool + // This task will send the results through the channel after it has calculated + // the hash. + rayon::spawn(move || { + let mut rlp_buf = Vec::with_capacity(128); + let _ = manager.fetch_range_with_predicate( + SnapshotSegment::Transactions, + chunk_range, + |cursor, number| { + Ok(cursor + .get_one::>(number.into())? + .map(|transaction| { + rlp_buf.clear(); + let _ = channel_tx.send(calculate_hash( + (cursor.number(), transaction), + &mut rlp_buf, + )); + })) + }, + |_| true, + ); + }); + } + + let mut tx_list = Vec::with_capacity(tx_range_size); + + // Iterate over channels and append the tx hashes unsorted + for channel in channels { + while let Ok(tx) = channel.recv() { + let (tx_hash, tx_id) = tx.map_err(|boxed| *boxed)?; + tx_list.push((tx_hash, tx_id)); + } + } + + Ok(tx_list) } } From da1d5d10abf196aa87fcac04d357a501723b9008 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 12:19:36 +0000 Subject: [PATCH 03/33] adjust sender recovery stage --- crates/stages/src/stages/sender_recovery.rs | 72 +++++++++++---------- 1 file changed, 37 insertions(+), 35 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 67b0fec5f648..0918d8002869 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -3,6 +3,7 @@ use itertools::Itertools; use reth_db::{ cursor::DbCursorRW, database::Database, + snapshot::TransactionMask, tables, transaction::{DbTx, DbTxMut}, RawValue, @@ -11,13 +12,13 @@ use reth_interfaces::{consensus, RethError}; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - Address, PruneSegment, TransactionSignedNoHash, TxNumber, + Address, PruneSegment, SnapshotSegment, TransactionSignedNoHash, TxNumber, }; use reth_provider::{ BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader, StatsReader, TransactionsProvider, }; -use std::{fmt::Debug, sync::mpsc}; +use std::{fmt::Debug, ops::Range, sync::mpsc}; use thiserror::Error; use tracing::*; @@ -83,43 +84,50 @@ impl Stage for SenderRecoveryStage { // Acquire the cursor for inserting elements let mut senders_cursor = tx.cursor_write::()?; - // Query the transactions from both database and static files - let transactions = provider.raw_transactions_by_tx_range(tx_range.clone())?; - // Iterate over transactions in chunks info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders"); - // channels used to return result of sender recovery. - let mut channels = Vec::new(); - // Spawn recovery jobs onto the default rayon threadpool and send the result through the // channel. // - // We try to evenly divide the transactions to recover across all threads in the threadpool. - // Chunks are submitted instead of individual transactions to reduce the overhead of work - // stealing in the threadpool workers. - let chunk_size = self.commit_threshold as usize / rayon::current_num_threads(); - // prevents an edge case - // where the chunk size is either 0 or too small - // to gain anything from using more than 1 thread - let chunk_size = chunk_size.max(16); - - for chunk in &tx_range.zip(transactions).chunks(chunk_size) { + // Transactions are different size, so chunks will not all take the processing time. If + // chunks are too big, there will be idle threads waiting for work. Choosing an + // arbitrary smaller value to make sure it doesn't happen. + let chunk_size = 100; + + let chunks = (tx_range.start..tx_range.end) + .step_by(chunk_size as usize) + .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end)) + .collect::>>(); + + let mut channels = Vec::with_capacity(chunks.len()); + for chunk_range in chunks { // An _unordered_ channel to receive results from a rayon job let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); channels.push(recovered_senders_rx); - // Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send) - let chunk: Vec<_> = chunk.collect(); - // Spawn the sender recovery task onto the global rayon pool - // This task will send the results through the channel after it recovered the senders. + let manager = provider.snapshot_provider().clone(); + + // Spawn the task onto the global rayon pool + // This task will send the results through the channel after it has read the transaction + // and calculated the sender. rayon::spawn(move || { let mut rlp_buf = Vec::with_capacity(128); - for entry in chunk { - rlp_buf.clear(); - let recovery_result = recover_sender(entry, &mut rlp_buf); - let _ = recovered_senders_tx.send(recovery_result); - } + let _ = manager.fetch_range_with_predicate( + SnapshotSegment::Transactions, + chunk_range, + |cursor, number| { + if let Some(tx) = cursor + .get_one::>(number.into())? + { + rlp_buf.clear(); + let _ = recovered_senders_tx + .send(recover_sender((cursor.number(), tx), &mut rlp_buf)); + } + Ok(Some(())) + }, + |_| true, + ); }); } @@ -185,17 +193,11 @@ impl Stage for SenderRecoveryStage { } } +#[inline] fn recover_sender( - (tx_id, tx): (TxNumber, RawValue), + (tx_id, tx): (TxNumber, TransactionSignedNoHash), rlp_buf: &mut Vec, ) -> Result<(u64, Address), Box> { - let tx = tx - .value() - .map_err(RethError::from) - .map_err(StageError::from) - .map_err(Into::into) - .map_err(Box::new)?; - tx.transaction.encode_without_signature(rlp_buf); // We call [Signature::recover_signer_unchecked] because transactions run in the pipeline are From fe6b37714f814ff29548879c0f084db4222dbd79 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 12:21:10 +0000 Subject: [PATCH 04/33] chunk_size for parallel work on recovery and hashing is 100 --- crates/stages/src/stages/sender_recovery.rs | 2 +- crates/storage/provider/src/providers/snapshot/manager.rs | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 0918d8002869..43cafa8f06e1 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -90,7 +90,7 @@ impl Stage for SenderRecoveryStage { // Spawn recovery jobs onto the default rayon threadpool and send the result through the // channel. // - // Transactions are different size, so chunks will not all take the processing time. If + // Transactions are different size, so chunks will not all take the same processing time. If // chunks are too big, there will be idle threads waiting for work. Choosing an // arbitrary smaller value to make sure it doesn't happen. let chunk_size = 100; diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 8deb82decf92..6e2b0d7fc318 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -774,7 +774,11 @@ impl TransactionsProviderExt for SnapshotProvider { tx_range: Range, ) -> ProviderResult> { let tx_range_size = (tx_range.end - tx_range.start) as usize; - let chunk_size = (tx_range_size / rayon::current_num_threads()).max(1); + + // Transactions are different size, so chunks will not all take the same processing time. If + // chunks are too big, there will be idle threads waiting for work. Choosing an + // arbitrary smaller value to make sure it doesn't happen. + let chunk_size = 100; let chunks = (tx_range.start..tx_range.end) .step_by(chunk_size as usize) From 30fbfe9c2cef2670ac64d7d231c20f6ac5e32437 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 12:22:05 +0000 Subject: [PATCH 05/33] remove testing code --- crates/stages/src/stages/finish.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/stages/src/stages/finish.rs b/crates/stages/src/stages/finish.rs index 7efd02114085..341be77dd1e1 100644 --- a/crates/stages/src/stages/finish.rs +++ b/crates/stages/src/stages/finish.rs @@ -21,7 +21,6 @@ impl Stage for FinishStage { _provider: &DatabaseProviderRW, input: ExecInput, ) -> Result { - std::process::exit(1); Ok(ExecOutput { checkpoint: StageCheckpoint::new(input.target()), done: true }) } From cdaf75863718bb7bbcf46305bdded3718802e54a Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 12:53:07 +0000 Subject: [PATCH 06/33] clippy --- crates/stages/src/stages/sender_recovery.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 43cafa8f06e1..760b33606d7a 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -1,14 +1,12 @@ use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; -use itertools::Itertools; use reth_db::{ cursor::DbCursorRW, database::Database, snapshot::TransactionMask, tables, transaction::{DbTx, DbTxMut}, - RawValue, }; -use reth_interfaces::{consensus, RethError}; +use reth_interfaces::consensus; use reth_primitives::{ keccak256, stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, @@ -16,7 +14,7 @@ use reth_primitives::{ }; use reth_provider::{ BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader, - StatsReader, TransactionsProvider, + StatsReader, }; use std::{fmt::Debug, ops::Range, sync::mpsc}; use thiserror::Error; From 7f736317ca59a5559ce215af56f418b61471dfdc Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 13:17:16 +0000 Subject: [PATCH 07/33] add timing to stage run cli --- bin/reth/src/commands/stage/run.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 9923bc360c94..b83dfef684ef 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -28,7 +28,7 @@ use reth_stages::{ }, ExecInput, Stage, StageExt, UnwindInput, }; -use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc}; +use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc, time::Instant}; use tracing::*; /// `reth stage` command @@ -269,6 +269,8 @@ impl Command { checkpoint: Some(checkpoint.with_block_number(self.from)), }; + let start = Instant::now(); + info!(target: "reth::cli", stage= ?self.stage, "Executing stage.", ); loop { exec_stage.execute_ready(input).await?; let output = exec_stage.execute(&provider_rw, input)?; @@ -284,6 +286,7 @@ impl Command { break } } + info!(target: "reth::cli", stage= ?self.stage, time = ?start.elapsed().as_secs(), "Finished stage."); Ok(()) } From 7c1116f43ec429bb4c2bb4af206b4b8d28fb0bfc Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 14:07:49 +0000 Subject: [PATCH 08/33] clippy --- crates/storage/provider/src/providers/snapshot/manager.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 6e2b0d7fc318..832b3a7a6e73 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -781,7 +781,7 @@ impl TransactionsProviderExt for SnapshotProvider { let chunk_size = 100; let chunks = (tx_range.start..tx_range.end) - .step_by(chunk_size as usize) + .step_by(chunk_size) .map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end)) .collect::>>(); let mut channels = Vec::with_capacity(chunk_size); From 44a43460fa944341df1270a4a8028a277c6ce1be Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 14:59:20 +0000 Subject: [PATCH 09/33] fix recovery tests --- crates/stages/src/stages/sender_recovery.rs | 4 ++-- crates/stages/src/test_utils/test_db.rs | 9 ++++++--- .../storage/provider/src/providers/snapshot/manager.rs | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 760b33606d7a..a6144b4e414d 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -120,7 +120,7 @@ impl Stage for SenderRecoveryStage { { rlp_buf.clear(); let _ = recovered_senders_tx - .send(recover_sender((cursor.number(), tx), &mut rlp_buf)); + .send(recover_sender((number, tx), &mut rlp_buf)); } Ok(Some(())) }, @@ -330,7 +330,7 @@ mod tests { random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold runner.db.insert_blocks(seed.iter(), None).expect("failed to seed execution"); - let total_transactions = runner.db.table::().unwrap().len() as u64; + let total_transactions = runner.db.factory.snapshot_provider().count_entries::().unwrap() as u64; let first_input = ExecInput { target: Some(previous_stage), diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 28108e4e3ca2..8bcb0213fc61 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -200,12 +200,13 @@ impl TestStageDB { I: Iterator, { let provider = self.factory.snapshot_provider(); - let mut writer = provider.latest_writer(reth_primitives::SnapshotSegment::Headers)?; + let mut txs_writer = provider.latest_writer(reth_primitives::SnapshotSegment::Transactions)?; + let mut headers_writer = provider.latest_writer(reth_primitives::SnapshotSegment::Headers)?; let tx = self.factory.provider_rw().unwrap().into_tx(); let mut next_tx_num = tx_offset.unwrap_or_default(); blocks.into_iter().try_for_each(|block| { - Self::insert_header(Some(&mut writer), &tx, &block.header, U256::ZERO)?; + Self::insert_header(Some(&mut headers_writer), &tx, &block.header, U256::ZERO)?; // Insert into body tables. let block_body_indices = StoredBlockBodyIndices { @@ -220,13 +221,15 @@ impl TestStageDB { block.body.iter().try_for_each(|body_tx| { tx.put::(next_tx_num, body_tx.clone().into())?; + txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?; next_tx_num += 1; Ok::<(), ProviderError>(()) }) })?; tx.commit()?; - writer.commit() + headers_writer.commit()?; + txs_writer.commit() } pub fn insert_tx_hash_numbers(&self, tx_hash_numbers: I) -> ProviderResult<()> diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 832b3a7a6e73..8b8e4fae79f2 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -816,7 +816,7 @@ impl TransactionsProviderExt for SnapshotProvider { .map(|transaction| { rlp_buf.clear(); let _ = channel_tx.send(calculate_hash( - (cursor.number(), transaction), + (number, transaction), &mut rlp_buf, )); })) From 28086cf182ab542b13abac66abfcf608c2d7d423 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 15:23:29 +0000 Subject: [PATCH 10/33] further fixes --- crates/stages/src/stages/sender_recovery.rs | 17 +++++++++-------- crates/stages/src/test_utils/test_db.rs | 7 ++++--- .../provider/src/providers/snapshot/manager.rs | 6 ++---- 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index a6144b4e414d..27e7058daae4 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -115,14 +115,13 @@ impl Stage for SenderRecoveryStage { SnapshotSegment::Transactions, chunk_range, |cursor, number| { - if let Some(tx) = cursor + Ok(cursor .get_one::>(number.into())? - { - rlp_buf.clear(); - let _ = recovered_senders_tx - .send(recover_sender((number, tx), &mut rlp_buf)); - } - Ok(Some(())) + .map(|tx| { + rlp_buf.clear(); + let _ = recovered_senders_tx + .send(recover_sender((number, tx), &mut rlp_buf)); + })) }, |_| true, ); @@ -330,7 +329,9 @@ mod tests { random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold runner.db.insert_blocks(seed.iter(), None).expect("failed to seed execution"); - let total_transactions = runner.db.factory.snapshot_provider().count_entries::().unwrap() as u64; + let total_transactions = + runner.db.factory.snapshot_provider().count_entries::().unwrap() + as u64; let first_input = ExecInput { target: Some(previous_stage), diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 8bcb0213fc61..31582f4ff9b0 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -200,8 +200,10 @@ impl TestStageDB { I: Iterator, { let provider = self.factory.snapshot_provider(); - let mut txs_writer = provider.latest_writer(reth_primitives::SnapshotSegment::Transactions)?; - let mut headers_writer = provider.latest_writer(reth_primitives::SnapshotSegment::Headers)?; + let mut txs_writer = + provider.latest_writer(reth_primitives::SnapshotSegment::Transactions)?; + let mut headers_writer = + provider.latest_writer(reth_primitives::SnapshotSegment::Headers)?; let tx = self.factory.provider_rw().unwrap().into_tx(); let mut next_tx_num = tx_offset.unwrap_or_default(); @@ -220,7 +222,6 @@ impl TestStageDB { tx.put::(block.number, block_body_indices)?; block.body.iter().try_for_each(|body_tx| { - tx.put::(next_tx_num, body_tx.clone().into())?; txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?; next_tx_num += 1; Ok::<(), ProviderError>(()) diff --git a/crates/storage/provider/src/providers/snapshot/manager.rs b/crates/storage/provider/src/providers/snapshot/manager.rs index 8b8e4fae79f2..499032a3d714 100644 --- a/crates/storage/provider/src/providers/snapshot/manager.rs +++ b/crates/storage/provider/src/providers/snapshot/manager.rs @@ -815,10 +815,8 @@ impl TransactionsProviderExt for SnapshotProvider { .get_one::>(number.into())? .map(|transaction| { rlp_buf.clear(); - let _ = channel_tx.send(calculate_hash( - (number, transaction), - &mut rlp_buf, - )); + let _ = channel_tx + .send(calculate_hash((number, transaction), &mut rlp_buf)); })) }, |_| true, From 68fbd1c3ff52c6a59cfc5d2d86d2fec31346fcfe Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 15:25:09 +0000 Subject: [PATCH 11/33] add etl to tx-lookup --- Cargo.lock | 1 + crates/etl/Cargo.toml | 1 + crates/etl/src/lib.rs | 3 +- crates/stages/src/stages/tx_lookup.rs | 81 +++++++++++++++++---------- 4 files changed, 56 insertions(+), 30 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f33b92267b2..728f904c1101 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6144,6 +6144,7 @@ dependencies = [ name = "reth-etl" version = "0.1.0-alpha.18" dependencies = [ + "rayon", "reth-db", "reth-primitives", "tempfile", diff --git a/crates/etl/Cargo.toml b/crates/etl/Cargo.toml index 5b7724f03ae8..07af6c72968f 100644 --- a/crates/etl/Cargo.toml +++ b/crates/etl/Cargo.toml @@ -11,6 +11,7 @@ exclude.workspace = true [dependencies] tempfile.workspace = true reth-db.workspace = true +rayon.workspace = true [dev-dependencies] reth-primitives.workspace = true diff --git a/crates/etl/src/lib.rs b/crates/etl/src/lib.rs index 6ff6fc0c5a1e..4f9e0ee52baa 100644 --- a/crates/etl/src/lib.rs +++ b/crates/etl/src/lib.rs @@ -23,6 +23,7 @@ use std::{ sync::Arc, }; +use rayon::prelude::*; use reth_db::table::{Compress, Encode, Key, Value}; use tempfile::{NamedTempFile, TempDir}; @@ -99,7 +100,7 @@ where fn flush(&mut self) { self.buffer_size_bytes = 0; - self.buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0)); let mut buf = Vec::with_capacity(self.buffer.len()); std::mem::swap(&mut buf, &mut self.buffer); self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk")) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 5a925be688bc..68842f202dec 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,20 +1,25 @@ +use std::sync::Arc; use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; +use num_traits::Zero; use rayon::prelude::*; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, tables, transaction::{DbTx, DbTxMut}, + RawKey, RawValue, }; +use reth_etl::Collector; use reth_interfaces::provider::ProviderError; use reth_primitives::{ stage::{EntitiesCheckpoint, StageCheckpoint, StageId}, - PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, + PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, TxHash, TxNumber, }; use reth_provider::{ BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader, TransactionsProviderExt, }; +use tempfile::TempDir; use tracing::*; /// The transaction lookup stage. @@ -92,43 +97,61 @@ impl Stage for TransactionLookupStage { return Ok(ExecOutput::done(input.checkpoint())) } - let (tx_range, block_range, is_final_range) = - input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?; - let end_block = *block_range.end(); + // 500MB temporary files + let mut hash_collector: Collector = + Collector::new(Arc::new(TempDir::new()?), 500 * (1024 * 1024)); - debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup"); + loop { + let (tx_range, block_range, is_final_range) = input + .next_block_range_with_transaction_threshold(provider, self.commit_threshold)?; - let mut tx_list = provider.transaction_hashes_by_range(tx_range)?; + let end_block = *block_range.end(); - // Sort before inserting the reverse lookup for hash -> tx_id. - tx_list.par_sort_unstable_by(|txa, txb| txa.0.cmp(&txb.0)); + debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup"); - let tx = provider.tx_ref(); - let mut txhash_cursor = tx.cursor_write::()?; - - // If the last inserted element in the database is equal or bigger than the first - // in our set, then we need to insert inside the DB. If it is smaller then last - // element in the DB, we can append to the DB. - // Append probably only ever happens during sync, on the first table insertion. - let insert = tx_list - .first() - .zip(txhash_cursor.last()?) - .map(|((first, _), (last, _))| first <= &last) - .unwrap_or_default(); - // if txhash_cursor.last() is None we will do insert. `zip` would return none if any item is - // none. if it is some and if first is smaller than last, we will do append. - for (tx_hash, id) in tx_list { - if insert { - txhash_cursor.insert(tx_hash, id)?; - } else { - txhash_cursor.append(tx_hash, id)?; + for (key, value) in provider.transaction_hashes_by_range(tx_range)? { + hash_collector.insert(key, value); + } + + input.checkpoint = Some( + StageCheckpoint::new(end_block) + .with_entities_stage_checkpoint(stage_checkpoint(provider)?), + ); + + if is_final_range { + let tx = provider.tx_ref(); + let append_only = provider.count_entries::()?.is_zero(); + let mut txhash_cursor = + tx.cursor_write::>()?; + + let total_hashes = hash_collector.len(); + let interval = (total_hashes / 10).max(1); + for (index, hash_to_number) in hash_collector.iter()?.enumerate() { + let (hash, number) = hash_to_number?; + if index > 0 && index % interval == 0 { + info!(target: "sync::stages::transaction_lookup", ?append_only, progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0), "Writing transaction hash index"); + } + + if append_only { + txhash_cursor.append( + RawKey::::from_vec(hash), + RawValue::::from_vec(number), + )?; + } else { + txhash_cursor.insert( + RawKey::::from_vec(hash), + RawValue::::from_vec(number), + )?; + } + } + break } } Ok(ExecOutput { - checkpoint: StageCheckpoint::new(end_block) + checkpoint: StageCheckpoint::new(input.target()) .with_entities_stage_checkpoint(stage_checkpoint(provider)?), - done: is_final_range, + done: true, }) } From b72d8f69c0390631e0607d8d9a5b8b077470ea3b Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 15:30:39 +0000 Subject: [PATCH 12/33] clippy --- crates/stages/src/stages/tx_lookup.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 68842f202dec..180ffccec8b1 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -1,7 +1,5 @@ -use std::sync::Arc; use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput}; use num_traits::Zero; -use rayon::prelude::*; use reth_db::{ cursor::{DbCursorRO, DbCursorRW}, database::Database, @@ -19,6 +17,7 @@ use reth_provider::{ BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader, TransactionsProviderExt, }; +use std::sync::Arc; use tempfile::TempDir; use tracing::*; From 5933fffc013462bcc3c6d002093c1a4e4f48eccd Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Sun, 18 Feb 2024 15:34:08 +0000 Subject: [PATCH 13/33] Update bin/reth/src/commands/stage/run.rs Co-authored-by: Matthias Seitz --- bin/reth/src/commands/stage/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index b83dfef684ef..4094f2985ff3 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -286,7 +286,7 @@ impl Command { break } } - info!(target: "reth::cli", stage= ?self.stage, time = ?start.elapsed().as_secs(), "Finished stage."); + info!(target: "reth::cli", stage= ?self.stage, time = ?start.elapsed(), "Finished stage."); Ok(()) } From ddaf070eaf86bde08fa05a23cc2babb3f135aa59 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Sun, 18 Feb 2024 15:34:14 +0000 Subject: [PATCH 14/33] Update bin/reth/src/commands/stage/run.rs Co-authored-by: Matthias Seitz --- bin/reth/src/commands/stage/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 4094f2985ff3..c9b6dc825e2f 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -270,7 +270,7 @@ impl Command { }; let start = Instant::now(); - info!(target: "reth::cli", stage= ?self.stage, "Executing stage.", ); + info!(target: "reth::cli", stage= ?self.stage, "Executing stage."); loop { exec_stage.execute_ready(input).await?; let output = exec_stage.execute(&provider_rw, input)?; From eb690ed40fc727c00ff88b2f39c89b3bd2122cd1 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Sun, 18 Feb 2024 15:36:21 +0000 Subject: [PATCH 15/33] call cursor directly --- crates/stages/src/stages/tx_lookup.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 180ffccec8b1..a1575e96f7cd 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -118,10 +118,9 @@ impl Stage for TransactionLookupStage { ); if is_final_range { - let tx = provider.tx_ref(); let append_only = provider.count_entries::()?.is_zero(); let mut txhash_cursor = - tx.cursor_write::>()?; + provider.tx_ref().cursor_write::>()?; let total_hashes = hash_collector.len(); let interval = (total_hashes / 10).max(1); From 1e762dfb774f6973cc529efb901dcd5be466bb37 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 12:10:23 +0000 Subject: [PATCH 16/33] change insert_blocks so it can insert to static and db --- crates/prune/src/segments/account_history.rs | 2 +- crates/prune/src/segments/receipts.rs | 2 +- crates/prune/src/segments/receipts_by_logs.rs | 2 +- crates/prune/src/segments/sender_recovery.rs | 2 +- crates/prune/src/segments/storage_history.rs | 2 +- .../prune/src/segments/transaction_lookup.rs | 2 +- crates/prune/src/segments/transactions.rs | 2 +- crates/snapshot/src/snapshotter.rs | 2 +- crates/stages/src/test_utils/test_db.rs | 30 ++++++++++++++----- 9 files changed, 31 insertions(+), 15 deletions(-) diff --git a/crates/prune/src/segments/account_history.rs b/crates/prune/src/segments/account_history.rs index bfebad1a95c0..b92cc5cca41c 100644 --- a/crates/prune/src/segments/account_history.rs +++ b/crates/prune/src/segments/account_history.rs @@ -99,7 +99,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); let accounts = random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); diff --git a/crates/prune/src/segments/receipts.rs b/crates/prune/src/segments/receipts.rs index fdd4d0402e40..1f50c0a9e61e 100644 --- a/crates/prune/src/segments/receipts.rs +++ b/crates/prune/src/segments/receipts.rs @@ -108,7 +108,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); let mut receipts = Vec::new(); for block in &blocks { diff --git a/crates/prune/src/segments/receipts_by_logs.rs b/crates/prune/src/segments/receipts_by_logs.rs index cb9b6b84dc1f..4ca991c92573 100644 --- a/crates/prune/src/segments/receipts_by_logs.rs +++ b/crates/prune/src/segments/receipts_by_logs.rs @@ -232,7 +232,7 @@ mod tests { random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5), ] .concat(); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); let mut receipts = Vec::new(); diff --git a/crates/prune/src/segments/sender_recovery.rs b/crates/prune/src/segments/sender_recovery.rs index ec2d189f55cc..f663b3a320ac 100644 --- a/crates/prune/src/segments/sender_recovery.rs +++ b/crates/prune/src/segments/sender_recovery.rs @@ -90,7 +90,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); let mut transaction_senders = Vec::new(); for block in &blocks { diff --git a/crates/prune/src/segments/storage_history.rs b/crates/prune/src/segments/storage_history.rs index 45713760c7da..4380311e91b8 100644 --- a/crates/prune/src/segments/storage_history.rs +++ b/crates/prune/src/segments/storage_history.rs @@ -103,7 +103,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); let accounts = random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); diff --git a/crates/prune/src/segments/transaction_lookup.rs b/crates/prune/src/segments/transaction_lookup.rs index 342a764a68a6..88b74b1dafdd 100644 --- a/crates/prune/src/segments/transaction_lookup.rs +++ b/crates/prune/src/segments/transaction_lookup.rs @@ -113,7 +113,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); let mut tx_hash_numbers = Vec::new(); for block in &blocks { diff --git a/crates/prune/src/segments/transactions.rs b/crates/prune/src/segments/transactions.rs index 7155cd8888ad..b8b7b5d41265 100644 --- a/crates/prune/src/segments/transactions.rs +++ b/crates/prune/src/segments/transactions.rs @@ -89,7 +89,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=100, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); let transactions = blocks.iter().flat_map(|block| &block.body).collect::>(); diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index a75f2f84b7d2..8304412a273f 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -206,7 +206,7 @@ mod tests { let db = TestStageDB::default(); let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); // Unwind headers from snapshots and manually insert them into the database, so we're able // to check that snapshotter works db.factory diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 31582f4ff9b0..df66ceee3ea5 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -194,14 +194,19 @@ impl TestStageDB { /// Insert ordered collection of [SealedBlock] into corresponding tables. /// Superset functionality of [TestStageDB::insert_headers]. /// + /// If tx_offset is set to `None`, then transactions will be stored on static files, otherwise + /// database. + /// /// Assumes that there's a single transition for each transaction (i.e. no block rewards). pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option) -> ProviderResult<()> where I: Iterator, { let provider = self.factory.snapshot_provider(); - let mut txs_writer = - provider.latest_writer(reth_primitives::SnapshotSegment::Transactions)?; + let mut txs_writer = tx_offset.map_or_else( + || Some(provider.latest_writer(reth_primitives::SnapshotSegment::Transactions)?), + |_| None, + ); let mut headers_writer = provider.latest_writer(reth_primitives::SnapshotSegment::Headers)?; let tx = self.factory.provider_rw().unwrap().into_tx(); @@ -221,16 +226,27 @@ impl TestStageDB { } tx.put::(block.number, block_body_indices)?; - block.body.iter().try_for_each(|body_tx| { - txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?; + let res = block.body.iter().try_for_each(|body_tx| { + if let Some(txs_writer) = &mut txs_writer { + txs_writer.append_transaction(next_tx_num, body_tx.clone().into())?; + } else { + tx.put::(next_tx_num, body_tx.clone().into())? + } next_tx_num += 1; Ok::<(), ProviderError>(()) - }) + }); + + if let Some(txs_writer) = &mut txs_writer { + txs_writer.increment_block(reth_primitives::SnapshotSegment::Transactions)?; + } + res })?; tx.commit()?; - headers_writer.commit()?; - txs_writer.commit() + if let Some(txs_writer) = &mut txs_writer { + txs_writer.commit()?; + } + headers_writer.commit() } pub fn insert_tx_hash_numbers(&self, tx_hash_numbers: I) -> ProviderResult<()> From 9c50ed3640296642644ed114a16fd69ada0f053e Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 20 Feb 2024 12:39:06 +0000 Subject: [PATCH 17/33] Update bin/reth/src/commands/stage/run.rs Co-authored-by: Alexey Shekhirin --- bin/reth/src/commands/stage/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index c9b6dc825e2f..69480d45997a 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -270,7 +270,7 @@ impl Command { }; let start = Instant::now(); - info!(target: "reth::cli", stage= ?self.stage, "Executing stage."); + info!(target: "reth::cli", stage = %self.stage, "Executing stage"); loop { exec_stage.execute_ready(input).await?; let output = exec_stage.execute(&provider_rw, input)?; From a849870499b3c4f2b805d07c6549e1af3091d6d9 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Tue, 20 Feb 2024 12:39:12 +0000 Subject: [PATCH 18/33] Update bin/reth/src/commands/stage/run.rs Co-authored-by: Alexey Shekhirin --- bin/reth/src/commands/stage/run.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/reth/src/commands/stage/run.rs b/bin/reth/src/commands/stage/run.rs index 69480d45997a..114606e0430a 100644 --- a/bin/reth/src/commands/stage/run.rs +++ b/bin/reth/src/commands/stage/run.rs @@ -286,7 +286,7 @@ impl Command { break } } - info!(target: "reth::cli", stage= ?self.stage, time = ?start.elapsed(), "Finished stage."); + info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage"); Ok(()) } From b74ef80eb2e8cc13cf7ec3620f61a9ed9df9e863 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 12:40:23 +0000 Subject: [PATCH 19/33] rename to snapshot provider --- crates/stages/src/stages/sender_recovery.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 27e7058daae4..aaa4293c87bf 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -104,14 +104,14 @@ impl Stage for SenderRecoveryStage { let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel(); channels.push(recovered_senders_rx); - let manager = provider.snapshot_provider().clone(); + let snapshot_provider = provider.snapshot_provider().clone(); // Spawn the task onto the global rayon pool // This task will send the results through the channel after it has read the transaction // and calculated the sender. rayon::spawn(move || { let mut rlp_buf = Vec::with_capacity(128); - let _ = manager.fetch_range_with_predicate( + let _ = snapshot_provider.fetch_range_with_predicate( SnapshotSegment::Transactions, chunk_range, |cursor, number| { From b9962297f02126d64c7f3c6d7bf8138b3ab662b7 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 12:50:49 +0000 Subject: [PATCH 20/33] add StorageKind to test_db --- crates/stages/src/test_utils/test_db.rs | 38 +++++++++++++++++++++---- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index df66ceee3ea5..699729304951 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -198,20 +198,21 @@ impl TestStageDB { /// database. /// /// Assumes that there's a single transition for each transaction (i.e. no block rewards). - pub fn insert_blocks<'a, I>(&self, blocks: I, tx_offset: Option) -> ProviderResult<()> + pub fn insert_blocks<'a, I>(&self, blocks: I, storage_kind: StorageKind) -> ProviderResult<()> where I: Iterator, { let provider = self.factory.snapshot_provider(); - let mut txs_writer = tx_offset.map_or_else( - || Some(provider.latest_writer(reth_primitives::SnapshotSegment::Transactions)?), - |_| None, - ); + + let mut txs_writer = storage_kind.is_static().then(|| { + provider.latest_writer(reth_primitives::SnapshotSegment::Transactions).unwrap() + }); + let mut headers_writer = provider.latest_writer(reth_primitives::SnapshotSegment::Headers)?; let tx = self.factory.provider_rw().unwrap().into_tx(); - let mut next_tx_num = tx_offset.unwrap_or_default(); + let mut next_tx_num = storage_kind.tx_offset(); blocks.into_iter().try_for_each(|block| { Self::insert_header(Some(&mut headers_writer), &tx, &block.header, U256::ZERO)?; @@ -385,3 +386,28 @@ impl TestStageDB { Ok(()) } } + +/// Used to identify where to store data when setting up a test. +#[derive(Debug)] +pub enum StorageKind { + Database(Option), + Static, +} + +impl StorageKind { + #[allow(dead_code)] + fn is_database(&self) -> bool { + matches!(self, Self::Database(_)) + } + + fn is_static(&self) -> bool { + matches!(self, Self::Database(_)) + } + + fn tx_offset(&self) -> u64 { + if let Self::Database(offset) = self { + return offset.unwrap_or_default() + } + 0 + } +} From f210db85f7920120b5fc3ea9f186d470818f8e74 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 13:56:35 +0000 Subject: [PATCH 21/33] add derive_more::Display to StageEnum & set it to workspace on crates --- Cargo.lock | 1 + crates/net/eth-wire/Cargo.toml | 2 +- crates/node-core/Cargo.toml | 1 + crates/node-core/src/args/stage_args.rs | 3 ++- crates/primitives/Cargo.toml | 2 +- crates/rpc/rpc/Cargo.toml | 2 +- crates/storage/libmdbx-rs/Cargo.toml | 2 +- crates/storage/nippy-jar/Cargo.toml | 2 +- crates/trie/Cargo.toml | 2 +- 9 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3f33b92267b2..eb4be478f114 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6411,6 +6411,7 @@ dependencies = [ "assert_matches", "clap", "const-str", + "derive_more", "dirs-next", "eyre", "futures", diff --git a/crates/net/eth-wire/Cargo.toml b/crates/net/eth-wire/Cargo.toml index daf29b13816d..c7de737665e3 100644 --- a/crates/net/eth-wire/Cargo.toml +++ b/crates/net/eth-wire/Cargo.toml @@ -25,7 +25,7 @@ reth-metrics.workspace = true metrics.workspace = true bytes.workspace = true -derive_more = "0.99.17" +derive_more.workspace = true thiserror.workspace = true serde = { workspace = true, optional = true } tokio = { workspace = true, features = ["full"] } diff --git a/crates/node-core/Cargo.toml b/crates/node-core/Cargo.toml index b8d716b63dad..154b55918283 100644 --- a/crates/node-core/Cargo.toml +++ b/crates/node-core/Cargo.toml @@ -67,6 +67,7 @@ thiserror.workspace = true const-str = "0.5.6" rand.workspace = true pin-project.workspace = true +derive_more.workspace = true # io dirs-next = "2.0.0" diff --git a/crates/node-core/src/args/stage_args.rs b/crates/node-core/src/args/stage_args.rs index 46618ff2f726..cdbeceb0a0ef 100644 --- a/crates/node-core/src/args/stage_args.rs +++ b/crates/node-core/src/args/stage_args.rs @@ -1,9 +1,10 @@ //! Shared arguments related to stages +use derive_more::Display; /// Represents a specific stage within the data pipeline. /// /// Different stages within the pipeline have dedicated functionalities and operations. -#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, clap::ValueEnum)] +#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, clap::ValueEnum, Display)] pub enum StageEnum { /// The headers stage within the pipeline. /// diff --git a/crates/primitives/Cargo.toml b/crates/primitives/Cargo.toml index 0e611300e08f..e0dbc5150712 100644 --- a/crates/primitives/Cargo.toml +++ b/crates/primitives/Cargo.toml @@ -41,7 +41,7 @@ tracing.workspace = true bytes.workspace = true byteorder = "1" clap = { workspace = true, features = ["derive"], optional = true } -derive_more = "0.99" +derive_more.workspace = true itertools.workspace = true modular-bitfield = "0.11.2" num_enum = "0.7" diff --git a/crates/rpc/rpc/Cargo.toml b/crates/rpc/rpc/Cargo.toml index 517c7ca15742..566fa90eec36 100644 --- a/crates/rpc/rpc/Cargo.toml +++ b/crates/rpc/rpc/Cargo.toml @@ -81,7 +81,7 @@ tracing.workspace = true tracing-futures = "0.2" schnellru.workspace = true futures.workspace = true -derive_more = "0.99" +derive_more.workspace = true lazy_static = "*" [dev-dependencies] diff --git a/crates/storage/libmdbx-rs/Cargo.toml b/crates/storage/libmdbx-rs/Cargo.toml index 2a7bc2b94a6f..2330b6f79e47 100644 --- a/crates/storage/libmdbx-rs/Cargo.toml +++ b/crates/storage/libmdbx-rs/Cargo.toml @@ -17,7 +17,7 @@ name = "reth_libmdbx" [dependencies] bitflags.workspace = true byteorder = "1" -derive_more = "0.99" +derive_more.workspace = true indexmap = "2" libc = "0.2" parking_lot.workspace = true diff --git a/crates/storage/nippy-jar/Cargo.toml b/crates/storage/nippy-jar/Cargo.toml index ee7c90a7f456..7ed18e6a6598 100644 --- a/crates/storage/nippy-jar/Cargo.toml +++ b/crates/storage/nippy-jar/Cargo.toml @@ -35,7 +35,7 @@ serde = { version = "1.0", features = ["derive"] } tracing = "0.1.0" anyhow = "1.0" thiserror.workspace = true -derive_more = "0.99" +derive_more.workspace = true [dev-dependencies] rand = { version = "0.8", features = ["small_rng"] } diff --git a/crates/trie/Cargo.toml b/crates/trie/Cargo.toml index 280189eada02..a6bb0bbc3224 100644 --- a/crates/trie/Cargo.toml +++ b/crates/trie/Cargo.toml @@ -27,7 +27,7 @@ tracing.workspace = true # misc thiserror.workspace = true -derive_more = "0.99" +derive_more.workspace = true auto_impl = "1" # test-utils From 3f3b5d0cf6015ff3346515ddaf4096b03921b024 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 14:14:40 +0000 Subject: [PATCH 22/33] make info log less wide --- crates/stages/src/stages/tx_lookup.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index b4aa0ee1380d..92e347ddbdfc 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -127,7 +127,12 @@ impl Stage for TransactionLookupStage { for (index, hash_to_number) in hash_collector.iter()?.enumerate() { let (hash, number) = hash_to_number?; if index > 0 && index % interval == 0 { - info!(target: "sync::stages::transaction_lookup", ?append_only, progress = %format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0), "Writing transaction hash index"); + info!( + target: "sync::stages::transaction_lookup", + ?append_only, + progress = format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0), + "Writing transaction hash index" + ); } if append_only { From 8f6d1b7f15d1812075a59cde6f3960c05e99775f Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 14:38:59 +0000 Subject: [PATCH 23/33] change transaction lookup logs --- crates/stages/src/stages/tx_lookup.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 92e347ddbdfc..c5649811411e 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -100,13 +100,19 @@ impl Stage for TransactionLookupStage { let mut hash_collector: Collector = Collector::new(Arc::new(TempDir::new()?), 500 * (1024 * 1024)); + debug!( + target: "sync::stages::transaction_lookup", + tx_range = ?input.checkpoint().block_number..=input.target(), + "Updating transaction lookup" + ); + loop { let (tx_range, block_range, is_final_range) = input .next_block_range_with_transaction_threshold(provider, self.commit_threshold)?; let end_block = *block_range.end(); - debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup"); + debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes"); for (key, value) in provider.transaction_hashes_by_range(tx_range)? { hash_collector.insert(key, value); @@ -127,11 +133,11 @@ impl Stage for TransactionLookupStage { for (index, hash_to_number) in hash_collector.iter()?.enumerate() { let (hash, number) = hash_to_number?; if index > 0 && index % interval == 0 { - info!( + debug!( target: "sync::stages::transaction_lookup", ?append_only, progress = format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0), - "Writing transaction hash index" + "Inserting hashes" ); } From a161d1e6ea26d7a41e51b52415a02067c9f52e5d Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 14:49:49 +0000 Subject: [PATCH 24/33] use StorageKind on multiple tests --- crates/prune/src/segments/account_history.rs | 4 ++-- crates/prune/src/segments/receipts.rs | 4 ++-- crates/prune/src/segments/receipts_by_logs.rs | 4 ++-- crates/prune/src/segments/sender_recovery.rs | 4 ++-- crates/prune/src/segments/storage_history.rs | 4 ++-- crates/prune/src/segments/transaction_lookup.rs | 4 ++-- crates/prune/src/segments/transactions.rs | 4 ++-- crates/snapshot/src/snapshotter.rs | 4 ++-- crates/stages/benches/setup/mod.rs | 4 ++-- crates/stages/src/stages/merkle.rs | 5 ++--- crates/stages/src/stages/sender_recovery.rs | 9 ++++----- crates/stages/src/stages/tx_lookup.rs | 9 ++++----- crates/stages/src/test_utils/mod.rs | 2 +- 13 files changed, 29 insertions(+), 32 deletions(-) diff --git a/crates/prune/src/segments/account_history.rs b/crates/prune/src/segments/account_history.rs index b92cc5cca41c..f71c1f9916a8 100644 --- a/crates/prune/src/segments/account_history.rs +++ b/crates/prune/src/segments/account_history.rs @@ -90,7 +90,7 @@ mod tests { }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; use std::{collections::BTreeMap, ops::AddAssign}; #[test] @@ -99,7 +99,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let accounts = random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); diff --git a/crates/prune/src/segments/receipts.rs b/crates/prune/src/segments/receipts.rs index 1f50c0a9e61e..0f4052816382 100644 --- a/crates/prune/src/segments/receipts.rs +++ b/crates/prune/src/segments/receipts.rs @@ -99,7 +99,7 @@ mod tests { }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; use std::ops::Sub; #[test] @@ -108,7 +108,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let mut receipts = Vec::new(); for block in &blocks { diff --git a/crates/prune/src/segments/receipts_by_logs.rs b/crates/prune/src/segments/receipts_by_logs.rs index 4ca991c92573..9bafc3a4f7bd 100644 --- a/crates/prune/src/segments/receipts_by_logs.rs +++ b/crates/prune/src/segments/receipts_by_logs.rs @@ -217,7 +217,7 @@ mod tests { }; use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256}; use reth_provider::{PruneCheckpointReader, TransactionsProvider}; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; use std::collections::BTreeMap; #[test] @@ -232,7 +232,7 @@ mod tests { random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5), ] .concat(); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let mut receipts = Vec::new(); diff --git a/crates/prune/src/segments/sender_recovery.rs b/crates/prune/src/segments/sender_recovery.rs index f663b3a320ac..7ab7756d150a 100644 --- a/crates/prune/src/segments/sender_recovery.rs +++ b/crates/prune/src/segments/sender_recovery.rs @@ -81,7 +81,7 @@ mod tests { use reth_interfaces::test_utils::{generators, generators::random_block_range}; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; use std::ops::Sub; #[test] @@ -90,7 +90,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let mut transaction_senders = Vec::new(); for block in &blocks { diff --git a/crates/prune/src/segments/storage_history.rs b/crates/prune/src/segments/storage_history.rs index 4380311e91b8..acc403444229 100644 --- a/crates/prune/src/segments/storage_history.rs +++ b/crates/prune/src/segments/storage_history.rs @@ -94,7 +94,7 @@ mod tests { }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; use std::{collections::BTreeMap, ops::AddAssign}; #[test] @@ -103,7 +103,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let accounts = random_eoa_account_range(&mut rng, 0..2).into_iter().collect::>(); diff --git a/crates/prune/src/segments/transaction_lookup.rs b/crates/prune/src/segments/transaction_lookup.rs index 88b74b1dafdd..e61422a18911 100644 --- a/crates/prune/src/segments/transaction_lookup.rs +++ b/crates/prune/src/segments/transaction_lookup.rs @@ -104,7 +104,7 @@ mod tests { use reth_interfaces::test_utils::{generators, generators::random_block_range}; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; use std::ops::Sub; #[test] @@ -113,7 +113,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let mut tx_hash_numbers = Vec::new(); for block in &blocks { diff --git a/crates/prune/src/segments/transactions.rs b/crates/prune/src/segments/transactions.rs index b8b7b5d41265..a2f7c0458e00 100644 --- a/crates/prune/src/segments/transactions.rs +++ b/crates/prune/src/segments/transactions.rs @@ -80,7 +80,7 @@ mod tests { use reth_interfaces::test_utils::{generators, generators::random_block_range}; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; use std::ops::Sub; #[test] @@ -89,7 +89,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 1..=100, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); let transactions = blocks.iter().flat_map(|block| &block.body).collect::>(); diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 8304412a273f..263b80cd86e9 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -197,7 +197,7 @@ mod tests { }; use reth_primitives::{snapshot::HighestSnapshots, PruneModes, SnapshotSegment, B256, U256}; use reth_provider::providers::SnapshotWriter; - use reth_stages::test_utils::TestStageDB; + use reth_stages::test_utils::{TestStageDB, StorageKind}; #[test] fn run() { @@ -206,7 +206,7 @@ mod tests { let db = TestStageDB::default(); let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3); - db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks"); // Unwind headers from snapshots and manually insert them into the database, so we're able // to check that snapshotter works db.factory diff --git a/crates/stages/benches/setup/mod.rs b/crates/stages/benches/setup/mod.rs index d0aa9e94fcc5..5f3bd9719be3 100644 --- a/crates/stages/benches/setup/mod.rs +++ b/crates/stages/benches/setup/mod.rs @@ -17,7 +17,7 @@ use reth_interfaces::test_utils::{ use reth_primitives::{fs, Account, Address, SealedBlock, B256, U256}; use reth_stages::{ stages::{AccountHashingStage, StorageHashingStage}, - test_utils::TestStageDB, + test_utils::{StorageKind, TestStageDB}, ExecInput, Stage, UnwindInput, }; use reth_trie::StateRoot; @@ -165,7 +165,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB { updated_header.state_root = root; *last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last }; - db.insert_blocks(blocks.iter(), None).unwrap(); + db.insert_blocks(blocks.iter(), StorageKind::Static).unwrap(); // initialize TD db.commit(|tx| { diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 40982cafbd41..ffdb04a80da2 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -340,8 +340,7 @@ fn validate_state_root( mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestStageDB, UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, TestRunnerError, TestStageDB, UnwindStageTestRunner }; use assert_matches::assert_matches; use reth_db::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}; @@ -515,7 +514,7 @@ mod tests { let mut blocks = vec![sealed_head]; blocks.extend(random_block_range(&mut rng, start..=end, head_hash, 0..3)); let last_block = blocks.last().cloned().unwrap(); - self.db.insert_blocks(blocks.iter(), None)?; + self.db.insert_blocks(blocks.iter(), StorageKind::Static)?; let (transitions, final_state) = random_changeset_range( &mut rng, diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 2ddbd4902378..649d4b8362cc 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -261,8 +261,7 @@ mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestStageDB, UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, TestRunnerError, TestStageDB, UnwindStageTestRunner }; stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery); @@ -293,7 +292,7 @@ mod tests { ) }) .collect::>(); - runner.db.insert_blocks(blocks.iter(), None).expect("failed to insert blocks"); + runner.db.insert_blocks(blocks.iter(), StorageKind::Static).expect("failed to insert blocks"); let rx = runner.execute(input); @@ -390,7 +389,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 0..=100, B256::ZERO, 0..10); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks"); let max_pruned_block = 30; let max_processed_block = 70; @@ -502,7 +501,7 @@ mod tests { let end = input.target(); let blocks = random_block_range(&mut rng, stage_progress..=end, B256::ZERO, 0..2); - self.db.insert_blocks(blocks.iter(), None)?; + self.db.insert_blocks(blocks.iter(), StorageKind::Static)?; Ok(blocks) } diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 676fbb9ef7a4..3dfffc52f152 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -191,8 +191,7 @@ fn stage_checkpoint( mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError, - TestStageDB, UnwindStageTestRunner, + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, TestRunnerError, TestStageDB, UnwindStageTestRunner }; use assert_matches::assert_matches; use reth_interfaces::test_utils::{ @@ -231,7 +230,7 @@ mod tests { ) }) .collect::>(); - runner.db.insert_blocks(blocks.iter(), None).expect("failed to insert blocks"); + runner.db.insert_blocks(blocks.iter(), StorageKind::Static).expect("failed to insert blocks"); let rx = runner.execute(input); @@ -366,7 +365,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, 0..=100, B256::ZERO, 0..10); - db.insert_blocks(blocks.iter(), None).expect("insert blocks"); + db.insert_blocks(blocks.iter(), StorageKind::Static).expect("insert blocks"); let max_pruned_block = 30; let max_processed_block = 70; @@ -488,7 +487,7 @@ mod tests { let mut rng = generators::rng(); let blocks = random_block_range(&mut rng, stage_progress + 1..=end, B256::ZERO, 0..2); - self.db.insert_blocks(blocks.iter(), None)?; + self.db.insert_blocks(blocks.iter(), StorageKind::Static)?; Ok(blocks) } diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index f48aa46d5003..1b6c9354fab5 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -13,7 +13,7 @@ pub(crate) use runner::{ }; mod test_db; -pub use test_db::TestStageDB; +pub use test_db::{TestStageDB, StorageKind}; mod stage; pub use stage::TestStage; From d237c47a96c3d395675008a4f5c5569d4ce2de44 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 16:15:19 +0000 Subject: [PATCH 25/33] add StorageKind to more tests --- crates/stages/src/stages/merkle.rs | 2 +- crates/stages/src/stages/sender_recovery.rs | 4 ++-- crates/stages/src/stages/tx_lookup.rs | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index ffdb04a80da2..7bfa8ea31d0d 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -481,7 +481,7 @@ mod tests { B256::ZERO, 0..1, )); - self.db.insert_blocks(preblocks.iter(), None)?; + self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?; } let num_of_accounts = 31; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 649d4b8362cc..5bb62fcae8e5 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -257,7 +257,7 @@ mod tests { stage::StageUnitCheckpoint, BlockNumber, PruneCheckpoint, PruneMode, SealedBlock, TransactionSigned, B256, }; - use reth_provider::PruneCheckpointWriter; + use reth_provider::{PruneCheckpointWriter, TransactionsProvider}; use super::*; use crate::test_utils::{ @@ -326,7 +326,7 @@ mod tests { // Manually seed once with full input range let seed = random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold - runner.db.insert_blocks(seed.iter(), None).expect("failed to seed execution"); + runner.db.insert_blocks(seed.iter(), StorageKind::Static).expect("failed to seed execution"); let total_transactions = runner.db.factory.snapshot_provider().count_entries::().unwrap() diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 3dfffc52f152..032ba8b570f2 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -269,7 +269,7 @@ mod tests { // Seed only once with full input range let seed = random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold - runner.db.insert_blocks(seed.iter(), None).expect("failed to seed execution"); + runner.db.insert_blocks(seed.iter(), StorageKind::Static).expect("failed to seed execution"); let total_txs = runner.db.table::().unwrap().len() as u64; @@ -334,7 +334,7 @@ mod tests { // Seed only once with full input range let seed = random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..2); - runner.db.insert_blocks(seed.iter(), None).expect("failed to seed execution"); + runner.db.insert_blocks(seed.iter(), StorageKind::Static).expect("failed to seed execution"); runner.set_prune_mode(PruneMode::Before(prune_target)); From 577776805e54460ccc08e64c294cbaa426202245 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 16:17:26 +0000 Subject: [PATCH 26/33] fmt --- crates/prune/src/segments/account_history.rs | 2 +- crates/prune/src/segments/receipts.rs | 2 +- crates/prune/src/segments/receipts_by_logs.rs | 2 +- crates/prune/src/segments/sender_recovery.rs | 2 +- crates/prune/src/segments/storage_history.rs | 2 +- .../prune/src/segments/transaction_lookup.rs | 2 +- crates/prune/src/segments/transactions.rs | 2 +- crates/snapshot/src/snapshotter.rs | 2 +- crates/stages/src/stages/merkle.rs | 3 ++- crates/stages/src/stages/sender_recovery.rs | 13 ++++++++++--- crates/stages/src/stages/tx_lookup.rs | 18 ++++++++++++++---- crates/stages/src/test_utils/mod.rs | 2 +- 12 files changed, 35 insertions(+), 17 deletions(-) diff --git a/crates/prune/src/segments/account_history.rs b/crates/prune/src/segments/account_history.rs index f71c1f9916a8..b4dd641d34aa 100644 --- a/crates/prune/src/segments/account_history.rs +++ b/crates/prune/src/segments/account_history.rs @@ -90,7 +90,7 @@ mod tests { }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::{collections::BTreeMap, ops::AddAssign}; #[test] diff --git a/crates/prune/src/segments/receipts.rs b/crates/prune/src/segments/receipts.rs index 0f4052816382..d1ce5324e6af 100644 --- a/crates/prune/src/segments/receipts.rs +++ b/crates/prune/src/segments/receipts.rs @@ -99,7 +99,7 @@ mod tests { }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; #[test] diff --git a/crates/prune/src/segments/receipts_by_logs.rs b/crates/prune/src/segments/receipts_by_logs.rs index 9bafc3a4f7bd..6733308499a3 100644 --- a/crates/prune/src/segments/receipts_by_logs.rs +++ b/crates/prune/src/segments/receipts_by_logs.rs @@ -217,7 +217,7 @@ mod tests { }; use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256}; use reth_provider::{PruneCheckpointReader, TransactionsProvider}; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::collections::BTreeMap; #[test] diff --git a/crates/prune/src/segments/sender_recovery.rs b/crates/prune/src/segments/sender_recovery.rs index 7ab7756d150a..36bb447d9be4 100644 --- a/crates/prune/src/segments/sender_recovery.rs +++ b/crates/prune/src/segments/sender_recovery.rs @@ -81,7 +81,7 @@ mod tests { use reth_interfaces::test_utils::{generators, generators::random_block_range}; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; #[test] diff --git a/crates/prune/src/segments/storage_history.rs b/crates/prune/src/segments/storage_history.rs index acc403444229..caf01057959b 100644 --- a/crates/prune/src/segments/storage_history.rs +++ b/crates/prune/src/segments/storage_history.rs @@ -94,7 +94,7 @@ mod tests { }; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::{collections::BTreeMap, ops::AddAssign}; #[test] diff --git a/crates/prune/src/segments/transaction_lookup.rs b/crates/prune/src/segments/transaction_lookup.rs index e61422a18911..cc8fbb1a6922 100644 --- a/crates/prune/src/segments/transaction_lookup.rs +++ b/crates/prune/src/segments/transaction_lookup.rs @@ -104,7 +104,7 @@ mod tests { use reth_interfaces::test_utils::{generators, generators::random_block_range}; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; #[test] diff --git a/crates/prune/src/segments/transactions.rs b/crates/prune/src/segments/transactions.rs index a2f7c0458e00..3c2ac425536b 100644 --- a/crates/prune/src/segments/transactions.rs +++ b/crates/prune/src/segments/transactions.rs @@ -80,7 +80,7 @@ mod tests { use reth_interfaces::test_utils::{generators, generators::random_block_range}; use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256}; use reth_provider::PruneCheckpointReader; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; use std::ops::Sub; #[test] diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 263b80cd86e9..1c1e1b378153 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -197,7 +197,7 @@ mod tests { }; use reth_primitives::{snapshot::HighestSnapshots, PruneModes, SnapshotSegment, B256, U256}; use reth_provider::providers::SnapshotWriter; - use reth_stages::test_utils::{TestStageDB, StorageKind}; + use reth_stages::test_utils::{StorageKind, TestStageDB}; #[test] fn run() { diff --git a/crates/stages/src/stages/merkle.rs b/crates/stages/src/stages/merkle.rs index 7bfa8ea31d0d..6305d709b838 100644 --- a/crates/stages/src/stages/merkle.rs +++ b/crates/stages/src/stages/merkle.rs @@ -340,7 +340,8 @@ fn validate_state_root( mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, TestRunnerError, TestStageDB, UnwindStageTestRunner + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, + TestRunnerError, TestStageDB, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_db::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO}; diff --git a/crates/stages/src/stages/sender_recovery.rs b/crates/stages/src/stages/sender_recovery.rs index 5bb62fcae8e5..0d2edcb7ebac 100644 --- a/crates/stages/src/stages/sender_recovery.rs +++ b/crates/stages/src/stages/sender_recovery.rs @@ -261,7 +261,8 @@ mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, TestRunnerError, TestStageDB, UnwindStageTestRunner + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, + TestRunnerError, TestStageDB, UnwindStageTestRunner, }; stage_test_suite_ext!(SenderRecoveryTestRunner, sender_recovery); @@ -292,7 +293,10 @@ mod tests { ) }) .collect::>(); - runner.db.insert_blocks(blocks.iter(), StorageKind::Static).expect("failed to insert blocks"); + runner + .db + .insert_blocks(blocks.iter(), StorageKind::Static) + .expect("failed to insert blocks"); let rx = runner.execute(input); @@ -326,7 +330,10 @@ mod tests { // Manually seed once with full input range let seed = random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold - runner.db.insert_blocks(seed.iter(), StorageKind::Static).expect("failed to seed execution"); + runner + .db + .insert_blocks(seed.iter(), StorageKind::Static) + .expect("failed to seed execution"); let total_transactions = runner.db.factory.snapshot_provider().count_entries::().unwrap() diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 032ba8b570f2..be6383b45c67 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -191,7 +191,8 @@ fn stage_checkpoint( mod tests { use super::*; use crate::test_utils::{ - stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, TestRunnerError, TestStageDB, UnwindStageTestRunner + stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind, + TestRunnerError, TestStageDB, UnwindStageTestRunner, }; use assert_matches::assert_matches; use reth_interfaces::test_utils::{ @@ -230,7 +231,10 @@ mod tests { ) }) .collect::>(); - runner.db.insert_blocks(blocks.iter(), StorageKind::Static).expect("failed to insert blocks"); + runner + .db + .insert_blocks(blocks.iter(), StorageKind::Static) + .expect("failed to insert blocks"); let rx = runner.execute(input); @@ -269,7 +273,10 @@ mod tests { // Seed only once with full input range let seed = random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold - runner.db.insert_blocks(seed.iter(), StorageKind::Static).expect("failed to seed execution"); + runner + .db + .insert_blocks(seed.iter(), StorageKind::Static) + .expect("failed to seed execution"); let total_txs = runner.db.table::().unwrap().len() as u64; @@ -334,7 +341,10 @@ mod tests { // Seed only once with full input range let seed = random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..2); - runner.db.insert_blocks(seed.iter(), StorageKind::Static).expect("failed to seed execution"); + runner + .db + .insert_blocks(seed.iter(), StorageKind::Static) + .expect("failed to seed execution"); runner.set_prune_mode(PruneMode::Before(prune_target)); diff --git a/crates/stages/src/test_utils/mod.rs b/crates/stages/src/test_utils/mod.rs index 1b6c9354fab5..dd788bca74e3 100644 --- a/crates/stages/src/test_utils/mod.rs +++ b/crates/stages/src/test_utils/mod.rs @@ -13,7 +13,7 @@ pub(crate) use runner::{ }; mod test_db; -pub use test_db::{TestStageDB, StorageKind}; +pub use test_db::{StorageKind, TestStageDB}; mod stage; pub use stage::TestStage; From 475b9748604fda0f67846fba64d1479c7dc5042f Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 16:37:53 +0000 Subject: [PATCH 27/33] fix is_static --- crates/stages/src/test_utils/test_db.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/stages/src/test_utils/test_db.rs b/crates/stages/src/test_utils/test_db.rs index 699729304951..976da6de4b99 100644 --- a/crates/stages/src/test_utils/test_db.rs +++ b/crates/stages/src/test_utils/test_db.rs @@ -401,7 +401,7 @@ impl StorageKind { } fn is_static(&self) -> bool { - matches!(self, Self::Database(_)) + matches!(self, Self::Static) } fn tx_offset(&self) -> u64 { From b623a23b3f8c2b1313553d3e82ac7c2a2fdcf4a5 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 17:30:31 +0000 Subject: [PATCH 28/33] fix txlookup unwind --- crates/stages/src/stages/tx_lookup.rs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index be6383b45c67..77c0074d5215 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -13,7 +13,7 @@ use reth_primitives::{ }; use reth_provider::{ BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader, - TransactionsProviderExt, + TransactionsProvider, TransactionsProviderExt, }; use tracing::*; @@ -144,7 +144,7 @@ impl Stage for TransactionLookupStage { // Cursors to unwind tx hash to number let mut body_cursor = tx.cursor_read::()?; let mut tx_hash_number_cursor = tx.cursor_write::()?; - let mut transaction_cursor = tx.cursor_read::()?; + let snapshot_provider = provider.snapshot_provider(); let mut rev_walker = body_cursor.walk_back(Some(*range.end()))?; while let Some((number, body)) = rev_walker.next().transpose()? { if number <= unwind_to { @@ -154,7 +154,7 @@ impl Stage for TransactionLookupStage { // Delete all transactions that belong to this block for tx_id in body.tx_num_range() { // First delete the transaction and hash to id mapping - if let Some((_, transaction)) = transaction_cursor.seek_exact(tx_id)? { + if let Some(transaction) = snapshot_provider.transaction_by_id(tx_id)? { if tx_hash_number_cursor.seek_exact(transaction.hash())?.is_some() { tx_hash_number_cursor.delete_current()?; } @@ -250,7 +250,7 @@ mod tests { total })) }, done: true }) if block_number == previous_stage && processed == total && - total == runner.db.table::().unwrap().len() as u64 + total == runner.db.factory.snapshot_provider().count_entries::().unwrap() as u64 ); // Validate the stage execution @@ -278,7 +278,9 @@ mod tests { .insert_blocks(seed.iter(), StorageKind::Static) .expect("failed to seed execution"); - let total_txs = runner.db.table::().unwrap().len() as u64; + let total_txs = + runner.db.factory.snapshot_provider().count_entries::().unwrap() + as u64; // Execute first time let result = runner.execute(first_input).await.unwrap(); @@ -362,7 +364,7 @@ mod tests { total })) }, done: true }) if block_number == previous_stage && processed == total && - total == runner.db.table::().unwrap().len() as u64 + total == runner.db.factory.snapshot_provider().count_entries::().unwrap() as u64 ); // Validate the stage execution From 916f9a78d42d68c6ad9ca935975e75dcd85fee9d Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 19:10:42 +0000 Subject: [PATCH 29/33] remove intermediate commit test since it's no longer possible --- crates/consensus/beacon/src/engine/mod.rs | 2 +- crates/stages/src/stages/execution.rs | 4 +- crates/stages/src/stages/tx_lookup.rs | 72 ------------------- .../provider/src/providers/snapshot/mod.rs | 1 - 4 files changed, 3 insertions(+), 76 deletions(-) diff --git a/crates/consensus/beacon/src/engine/mod.rs b/crates/consensus/beacon/src/engine/mod.rs index c3a197b18f7f..efa858782ff3 100644 --- a/crates/consensus/beacon/src/engine/mod.rs +++ b/crates/consensus/beacon/src/engine/mod.rs @@ -2409,7 +2409,7 @@ mod tests { mod new_payload { use super::*; use reth_db::test_utils::create_test_snapshots_dir; - use reth_interfaces::test_utils::{generators, generators::random_block}; + use reth_interfaces::test_utils::generators::random_block; use reth_primitives::{ genesis::{Genesis, GenesisAllocator}, Hardfork, U256, diff --git a/crates/stages/src/stages/execution.rs b/crates/stages/src/stages/execution.rs index 216963c6c377..dcb90ec1af5f 100644 --- a/crates/stages/src/stages/execution.rs +++ b/crates/stages/src/stages/execution.rs @@ -597,8 +597,8 @@ mod tests { use reth_node_ethereum::EthEvmConfig; use reth_primitives::{ address, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Address, - Bytecode, ChainSpecBuilder, PruneMode, PruneModes, ReceiptsLogPruneConfig, SealedBlock, - StorageEntry, B256, U256, + Bytecode, ChainSpecBuilder, PruneMode, ReceiptsLogPruneConfig, SealedBlock, StorageEntry, + B256, }; use reth_provider::{test_utils::create_test_provider_factory, AccountReader, ReceiptProvider}; use reth_revm::EvmProcessorFactory; diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index f37cff6368a1..6b3eef455232 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -232,7 +232,6 @@ mod tests { generators::{random_block, random_block_range}, }; use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, B256}; - use reth_provider::TransactionsProvider; use std::ops::Sub; // Implement stage test suite. @@ -289,77 +288,6 @@ mod tests { assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation"); } - /// Execute the stage twice with input range that exceeds the commit threshold - #[tokio::test] - async fn execute_intermediate_commit_transaction_lookup() { - let threshold = 50; - let mut runner = TransactionLookupTestRunner::default(); - runner.set_commit_threshold(threshold); - let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold - let first_input = ExecInput { - target: Some(previous_stage), - checkpoint: Some(StageCheckpoint::new(stage_progress)), - }; - let mut rng = generators::rng(); - - // Seed only once with full input range - let seed = - random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold - runner - .db - .insert_blocks(seed.iter(), StorageKind::Static) - .expect("failed to seed execution"); - - let total_txs = - runner.db.factory.snapshot_provider().count_entries::().unwrap() - as u64; - - // Execute first time - let result = runner.execute(first_input).await.unwrap(); - let mut tx_count = 0; - let expected_progress = seed - .iter() - .find(|x| { - tx_count += x.body.len(); - tx_count as u64 > threshold - }) - .map(|x| x.number) - .unwrap_or(previous_stage); - assert_matches!(result, Ok(_)); - assert_eq!( - result.unwrap(), - ExecOutput { - checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint( - EntitiesCheckpoint { - processed: runner.db.table::().unwrap().len() as u64, - total: total_txs - } - ), - done: false - } - ); - - // Execute second time to completion - runner.set_commit_threshold(u64::MAX); - let second_input = ExecInput { - target: Some(previous_stage), - checkpoint: Some(StageCheckpoint::new(expected_progress)), - }; - let result = runner.execute(second_input).await.unwrap(); - assert_matches!(result, Ok(_)); - assert_eq!( - result.as_ref().unwrap(), - &ExecOutput { - checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint( - EntitiesCheckpoint { processed: total_txs, total: total_txs } - ), - done: true - } - ); - - assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed"); - } - #[tokio::test] async fn execute_pruned_transaction_lookup() { let (previous_stage, prune_target, stage_progress) = (500, 400, 100); diff --git a/crates/storage/provider/src/providers/snapshot/mod.rs b/crates/storage/provider/src/providers/snapshot/mod.rs index 6b01b90f31f4..3469aaaee56b 100644 --- a/crates/storage/provider/src/providers/snapshot/mod.rs +++ b/crates/storage/provider/src/providers/snapshot/mod.rs @@ -55,7 +55,6 @@ mod tests { CanonicalHeaders, HeaderNumbers, HeaderTD, Headers, RawTable, }; use reth_interfaces::test_utils::generators::{self, random_header_range}; - use reth_nippy_jar::NippyJar; use reth_primitives::{snapshot::find_fixed_range, BlockNumber, B256, U256}; #[test] From 063550762c254f38c303e0bd05a6d8e42a5b1763 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Tue, 20 Feb 2024 19:25:48 +0000 Subject: [PATCH 30/33] clippy --- crates/stages/src/stages/tx_lookup.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 6b3eef455232..2a9597caafe3 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -399,10 +399,6 @@ mod tests { } impl TransactionLookupTestRunner { - fn set_commit_threshold(&mut self, threshold: u64) { - self.commit_threshold = threshold; - } - fn set_prune_mode(&mut self, prune_mode: PruneMode) { self.prune_mode = Some(prune_mode); } From 81396304a1889833ee49b1ef46bc9a8cc1f81f2c Mon Sep 17 00:00:00 2001 From: joshieDo Date: Wed, 21 Feb 2024 17:22:46 +0000 Subject: [PATCH 31/33] rename commit_threshold to chunk_size --- crates/stages/src/stages/tx_lookup.rs | 28 +++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/crates/stages/src/stages/tx_lookup.rs b/crates/stages/src/stages/tx_lookup.rs index 2a9597caafe3..edf1702db62b 100644 --- a/crates/stages/src/stages/tx_lookup.rs +++ b/crates/stages/src/stages/tx_lookup.rs @@ -26,23 +26,26 @@ use tracing::*; /// This stage walks over the bodies table, and sets the transaction hash of each transaction in a /// block to the corresponding `BlockNumber` at each block. This is written to the /// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash. +/// +/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk. #[derive(Debug, Clone)] pub struct TransactionLookupStage { - /// The number of lookup entries to commit at once - commit_threshold: u64, + /// The maximum number of lookup entries to hold in memory before pushing them to + /// [`reth_etl::Collector`]. + chunk_size: u64, prune_mode: Option, } impl Default for TransactionLookupStage { fn default() -> Self { - Self { commit_threshold: 5_000_000, prune_mode: None } + Self { chunk_size: 5_000_000, prune_mode: None } } } impl TransactionLookupStage { /// Create new instance of [TransactionLookupStage]. - pub fn new(commit_threshold: u64, prune_mode: Option) -> Self { - Self { commit_threshold, prune_mode } + pub fn new(chunk_size: u64, prune_mode: Option) -> Self { + Self { chunk_size, prune_mode } } } @@ -107,8 +110,8 @@ impl Stage for TransactionLookupStage { ); loop { - let (tx_range, block_range, is_final_range) = input - .next_block_range_with_transaction_threshold(provider, self.commit_threshold)?; + let (tx_range, block_range, is_final_range) = + input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?; let end_block = *block_range.end(); @@ -171,7 +174,7 @@ impl Stage for TransactionLookupStage { input: UnwindInput, ) -> Result { let tx = provider.tx_ref(); - let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold); + let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size); // Cursors to unwind tx hash to number let mut body_cursor = tx.cursor_read::()?; @@ -388,13 +391,13 @@ mod tests { struct TransactionLookupTestRunner { db: TestStageDB, - commit_threshold: u64, + chunk_size: u64, prune_mode: Option, } impl Default for TransactionLookupTestRunner { fn default() -> Self { - Self { db: TestStageDB::default(), commit_threshold: 1000, prune_mode: None } + Self { db: TestStageDB::default(), chunk_size: 1000, prune_mode: None } } } @@ -439,10 +442,7 @@ mod tests { } fn stage(&self) -> Self::S { - TransactionLookupStage { - commit_threshold: self.commit_threshold, - prune_mode: self.prune_mode, - } + TransactionLookupStage { chunk_size: self.chunk_size, prune_mode: self.prune_mode } } } From a9c09c7fc4e9624ba166e559eede41a4ab7ac76e Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 22 Feb 2024 17:44:08 +0000 Subject: [PATCH 32/33] update config and docs field name to chunk_size --- book/run/config.md | 2 +- crates/config/src/config.rs | 6 +++--- crates/node-core/src/node_config.rs | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/book/run/config.md b/book/run/config.md index d5889c6a22ac..96172c0e10c6 100644 --- a/book/run/config.md +++ b/book/run/config.md @@ -207,7 +207,7 @@ The transaction lookup stage builds an index of transaction hashes to their sequ # # Lower thresholds correspond to more frequent disk I/O (writes), # but lowers memory usage -commit_threshold = 5000000 +chunk_size = 5000000 ``` ### `index_account_history` diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 3583033f85a7..87379a50cc3c 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -239,13 +239,13 @@ impl Default for MerkleConfig { #[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)] #[serde(default)] pub struct TransactionLookupConfig { - /// The maximum number of transactions to process before committing progress to the database. - pub commit_threshold: u64, + /// The maximum number of transactions to process before writing to disk. + pub chunk_size: u64, } impl Default for TransactionLookupConfig { fn default() -> Self { - Self { commit_threshold: 5_000_000 } + Self { chunk_size: 5_000_000 } } } diff --git a/crates/node-core/src/node_config.rs b/crates/node-core/src/node_config.rs index 5bb2d2a6730d..e6d88bfd48d8 100644 --- a/crates/node-core/src/node_config.rs +++ b/crates/node-core/src/node_config.rs @@ -876,7 +876,7 @@ impl NodeConfig { )) .set(MerkleStage::new_execution(stage_config.merkle.clean_threshold)) .set(TransactionLookupStage::new( - stage_config.transaction_lookup.commit_threshold, + stage_config.transaction_lookup.chunk_size, prune_modes.transaction_lookup, )) .set(IndexAccountHistoryStage::new( From 5cca8f6773d1590102de16456796bc6c763714e3 Mon Sep 17 00:00:00 2001 From: joshieDo Date: Thu, 22 Feb 2024 17:49:15 +0000 Subject: [PATCH 33/33] missing chunk_size change --- crates/config/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/config/src/config.rs b/crates/config/src/config.rs index 87379a50cc3c..8b8d0b360305 100644 --- a/crates/config/src/config.rs +++ b/crates/config/src/config.rs @@ -367,7 +367,7 @@ commit_threshold = 100000 clean_threshold = 50000 [stages.transaction_lookup] -commit_threshold = 5000000 +chunk_size = 5000000 [stages.index_account_history] commit_threshold = 100000