Skip to content

Commit

Permalink
feat: adds parallelization to SenderRecovery and TxLookup using `…
Browse files Browse the repository at this point in the history
…SnapshotProvider` (#6654)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
  • Loading branch information
3 people authored Feb 20, 2024
1 parent 2a77e14 commit acecfd3
Show file tree
Hide file tree
Showing 36 changed files with 302 additions and 179 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion bin/reth/src/commands/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use reth_stages::{
},
ExecInput, Stage, StageExt, UnwindInput,
};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc, time::Instant};
use tracing::*;

/// `reth stage` command
Expand Down Expand Up @@ -269,6 +269,8 @@ impl Command {
checkpoint: Some(checkpoint.with_block_number(self.from)),
};

let start = Instant::now();
info!(target: "reth::cli", stage = %self.stage, "Executing stage");
loop {
exec_stage.execute_ready(input).await?;
let output = exec_stage.execute(&provider_rw, input)?;
Expand All @@ -284,6 +286,7 @@ impl Command {
break
}
}
info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/net/eth-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ reth-metrics.workspace = true
metrics.workspace = true

bytes.workspace = true
derive_more = "0.99.17"
derive_more.workspace = true
thiserror.workspace = true
serde = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions crates/node-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ thiserror.workspace = true
const-str = "0.5.6"
rand.workspace = true
pin-project.workspace = true
derive_more.workspace = true

# io
dirs-next = "2.0.0"
Expand Down
3 changes: 2 additions & 1 deletion crates/node-core/src/args/stage_args.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Shared arguments related to stages
use derive_more::Display;

/// Represents a specific stage within the data pipeline.
///
/// Different stages within the pipeline have dedicated functionalities and operations.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, clap::ValueEnum)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, clap::ValueEnum, Display)]
pub enum StageEnum {
/// The headers stage within the pipeline.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/node-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ pub fn insert_genesis_history<DB: Database>(
/// Inserts header for the genesis state.
pub fn insert_genesis_header<DB: Database>(
tx: &<DB as Database>::TXMut,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
chain: Arc<ChainSpec>,
) -> ProviderResult<()> {
let (header, block_hash) = chain.sealed_genesis_header().split();
Expand Down
2 changes: 1 addition & 1 deletion crates/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ tracing.workspace = true
bytes.workspace = true
byteorder = "1"
clap = { workspace = true, features = ["derive"], optional = true }
derive_more = "0.99"
derive_more.workspace = true
itertools.workspace = true
modular-bitfield = "0.11.2"
num_enum = "0.7"
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/account_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ mod tests {
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{collections::BTreeMap, ops::AddAssign};

#[test]
Expand All @@ -99,7 +99,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod tests {
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -108,7 +108,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut receipts = Vec::new();
for block in &blocks {
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/receipts_by_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod tests {
};
use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256};
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::collections::BTreeMap;

#[test]
Expand All @@ -232,7 +232,7 @@ mod tests {
random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5),
]
.concat();
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut receipts = Vec::new();

Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ mod tests {
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -90,7 +90,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut transaction_senders = Vec::new();
for block in &blocks {
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/storage_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ mod tests {
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{collections::BTreeMap, ops::AddAssign};

#[test]
Expand All @@ -103,7 +103,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/transaction_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mod tests {
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -113,7 +113,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut tx_hash_numbers = Vec::new();
for block in &blocks {
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod tests {
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -89,7 +89,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=100, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let transactions = blocks.iter().flat_map(|block| &block.body).collect::<Vec<_>>();

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ tracing.workspace = true
tracing-futures = "0.2"
schnellru.workspace = true
futures.workspace = true
derive_more = "0.99"
derive_more.workspace = true
lazy_static = "*"

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
DatabaseProviderRO,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
#[derive(Debug, Default)]
Expand All @@ -23,7 +23,7 @@ impl<DB: Database> Segment<DB> for Headers {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use reth_primitives::{
BlockNumber, SnapshotSegment,
};
use reth_provider::{providers::SnapshotProvider, DatabaseProviderRO, TransactionsProviderExt};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];

Expand All @@ -36,7 +36,7 @@ pub trait Segment<DB: Database>: Send + Sync {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;

Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data.
#[derive(Debug, Default)]
Expand All @@ -25,7 +25,7 @@ impl<DB: Database> Segment<DB> for Receipts {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data.
#[derive(Debug, Default)]
Expand All @@ -27,7 +27,7 @@ impl<DB: Database> Segment<DB> for Transactions {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
Expand Down
10 changes: 5 additions & 5 deletions crates/snapshot/src/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_provider::{
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, sync::Arc, time::Instant};
use std::{ops::RangeInclusive, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};

Expand All @@ -26,7 +26,7 @@ pub struct Snapshotter<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>,
/// Snapshot provider
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
/// Pruning configuration for every part of the data that can be pruned. Set by user, and
/// needed in [Snapshotter] to prevent snapshotting the prunable data.
/// See [Snapshotter::get_snapshot_targets].
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<DB: Database> Snapshotter<DB> {
/// Creates a new [Snapshotter].
pub fn new(
provider_factory: ProviderFactory<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
prune_modes: PruneModes,
) -> Self {
Self { provider_factory, snapshot_provider, prune_modes, listeners: Default::default() }
Expand Down Expand Up @@ -202,7 +202,7 @@ mod tests {
};
use reth_primitives::{snapshot::HighestSnapshots, PruneModes, SnapshotSegment, B256, U256};
use reth_provider::providers::SnapshotWriter;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};

#[test]
fn run() {
Expand All @@ -211,7 +211,7 @@ mod tests {
let db = TestStageDB::default();

let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Unwind headers from snapshots and manually insert them into the database, so we're able
// to check that snapshotter works
db.factory
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_interfaces::test_utils::{
use reth_primitives::{fs, Account, Address, SealedBlock, B256, U256};
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::TestStageDB,
test_utils::{StorageKind, TestStageDB},
ExecInput, Stage, UnwindInput,
};
use reth_trie::StateRoot;
Expand Down Expand Up @@ -165,7 +165,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
updated_header.state_root = root;
*last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last };

db.insert_blocks(blocks.iter(), None).unwrap();
db.insert_blocks(blocks.iter(), StorageKind::Static).unwrap();

// initialize TD
db.commit(|tx| {
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
fn write_headers<DB: Database>(
&mut self,
tx: &<DB as Database>::TXMut,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
) -> Result<BlockNumber, StageError> {
let total_headers = self.header_collector.len();

Expand Down
8 changes: 4 additions & 4 deletions crates/stages/src/stages/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ fn validate_state_root(
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestStageDB, UnwindStageTestRunner,
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
TestRunnerError, TestStageDB, UnwindStageTestRunner,
};
use assert_matches::assert_matches;
use reth_db::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
Expand Down Expand Up @@ -482,7 +482,7 @@ mod tests {
B256::ZERO,
0..1,
));
self.db.insert_blocks(preblocks.iter(), None)?;
self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
}

let num_of_accounts = 31;
Expand Down Expand Up @@ -515,7 +515,7 @@ mod tests {
let mut blocks = vec![sealed_head];
blocks.extend(random_block_range(&mut rng, start..=end, head_hash, 0..3));
let last_block = blocks.last().cloned().unwrap();
self.db.insert_blocks(blocks.iter(), None)?;
self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

let (transitions, final_state) = random_changeset_range(
&mut rng,
Expand Down
Loading

0 comments on commit acecfd3

Please sign in to comment.