Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adds parallelization to SenderRecovery and TxLookup using SnapshotProvider #6654

Merged
merged 24 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
065303a
SnapshotProvider becomes wrapper to Arc<SnapshotProviderInner>
joshieDo Feb 17, 2024
8002efd
make transaction_hashes_by_range on static files parallel
joshieDo Feb 17, 2024
da1d5d1
adjust sender recovery stage
joshieDo Feb 18, 2024
fe6b377
chunk_size for parallel work on recovery and hashing is 100
joshieDo Feb 18, 2024
30fbfe9
remove testing code
joshieDo Feb 18, 2024
cdaf758
clippy
joshieDo Feb 18, 2024
7f73631
add timing to stage run cli
joshieDo Feb 18, 2024
7c1116f
clippy
joshieDo Feb 18, 2024
44a4346
fix recovery tests
joshieDo Feb 18, 2024
28086cf
further fixes
joshieDo Feb 18, 2024
5933fff
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 18, 2024
ddaf070
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 18, 2024
1e762df
change insert_blocks so it can insert to static and db
joshieDo Feb 20, 2024
9c50ed3
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 20, 2024
a849870
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 20, 2024
b74ef80
rename to snapshot provider
joshieDo Feb 20, 2024
b996229
add StorageKind to test_db
joshieDo Feb 20, 2024
f210db8
add derive_more::Display to StageEnum & set it to workspace on crates
joshieDo Feb 20, 2024
3622355
Merge remote-tracking branch 'origin/feat/static-files' into joshie/c…
joshieDo Feb 20, 2024
a161d1e
use StorageKind on multiple tests
joshieDo Feb 20, 2024
d237c47
add StorageKind to more tests
joshieDo Feb 20, 2024
5777768
fmt
joshieDo Feb 20, 2024
475b974
fix is_static
joshieDo Feb 20, 2024
b623a23
fix txlookup unwind
joshieDo Feb 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion bin/reth/src/commands/stage/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use reth_stages::{
},
ExecInput, Stage, StageExt, UnwindInput,
};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc, time::Instant};
use tracing::*;

/// `reth stage` command
Expand Down Expand Up @@ -269,6 +269,8 @@ impl Command {
checkpoint: Some(checkpoint.with_block_number(self.from)),
};

let start = Instant::now();
info!(target: "reth::cli", stage = %self.stage, "Executing stage");
loop {
exec_stage.execute_ready(input).await?;
let output = exec_stage.execute(&provider_rw, input)?;
Expand All @@ -284,6 +286,7 @@ impl Command {
break
}
}
info!(target: "reth::cli", stage = %self.stage, time = ?start.elapsed(), "Finished stage");

Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion crates/net/eth-wire/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ reth-metrics.workspace = true
metrics.workspace = true

bytes.workspace = true
derive_more = "0.99.17"
derive_more.workspace = true
thiserror.workspace = true
serde = { workspace = true, optional = true }
tokio = { workspace = true, features = ["full"] }
Expand Down
1 change: 1 addition & 0 deletions crates/node-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ thiserror.workspace = true
const-str = "0.5.6"
rand.workspace = true
pin-project.workspace = true
derive_more.workspace = true

# io
dirs-next = "2.0.0"
Expand Down
3 changes: 2 additions & 1 deletion crates/node-core/src/args/stage_args.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
//! Shared arguments related to stages
use derive_more::Display;

/// Represents a specific stage within the data pipeline.
///
/// Different stages within the pipeline have dedicated functionalities and operations.
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, clap::ValueEnum)]
#[derive(Debug, Clone, Copy, Eq, PartialEq, PartialOrd, Ord, clap::ValueEnum, Display)]
pub enum StageEnum {
/// The headers stage within the pipeline.
///
Expand Down
2 changes: 1 addition & 1 deletion crates/node-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ pub fn insert_genesis_history<DB: Database>(
/// Inserts header for the genesis state.
pub fn insert_genesis_header<DB: Database>(
tx: &<DB as Database>::TXMut,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
chain: Arc<ChainSpec>,
) -> ProviderResult<()> {
let (header, block_hash) = chain.sealed_genesis_header().split();
Expand Down
2 changes: 1 addition & 1 deletion crates/primitives/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ tracing.workspace = true
bytes.workspace = true
byteorder = "1"
clap = { workspace = true, features = ["derive"], optional = true }
derive_more = "0.99"
derive_more.workspace = true
itertools.workspace = true
modular-bitfield = "0.11.2"
num_enum = "0.7"
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/account_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ mod tests {
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{collections::BTreeMap, ops::AddAssign};

#[test]
Expand All @@ -99,7 +99,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ mod tests {
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -108,7 +108,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut receipts = Vec::new();
for block in &blocks {
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/receipts_by_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod tests {
};
use reth_primitives::{PruneMode, PruneSegment, ReceiptsLogPruneConfig, B256};
use reth_provider::{PruneCheckpointReader, TransactionsProvider};
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::collections::BTreeMap;

#[test]
Expand All @@ -232,7 +232,7 @@ mod tests {
random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5),
]
.concat();
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut receipts = Vec::new();

Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/sender_recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ mod tests {
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -90,7 +90,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut transaction_senders = Vec::new();
for block in &blocks {
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/storage_history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ mod tests {
};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{collections::BTreeMap, ops::AddAssign};

#[test]
Expand All @@ -103,7 +103,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/transaction_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mod tests {
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -113,7 +113,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let mut tx_hash_numbers = Vec::new();
for block in &blocks {
Expand Down
4 changes: 2 additions & 2 deletions crates/prune/src/segments/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ mod tests {
use reth_interfaces::test_utils::{generators, generators::random_block_range};
use reth_primitives::{BlockNumber, PruneCheckpoint, PruneMode, PruneSegment, TxNumber, B256};
use reth_provider::PruneCheckpointReader;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::ops::Sub;

#[test]
Expand All @@ -89,7 +89,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=100, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");

let transactions = blocks.iter().flat_map(|block| &block.body).collect::<Vec<_>>();

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ tracing.workspace = true
tracing-futures = "0.2"
schnellru.workspace = true
futures.workspace = true
derive_more = "0.99"
derive_more.workspace = true
lazy_static = "*"

[dev-dependencies]
Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
DatabaseProviderRO,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
#[derive(Debug, Default)]
Expand All @@ -23,7 +23,7 @@ impl<DB: Database> Segment<DB> for Headers {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use reth_primitives::{
BlockNumber, SnapshotSegment,
};
use reth_provider::{providers::SnapshotProvider, DatabaseProviderRO, TransactionsProviderExt};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];

Expand All @@ -36,7 +36,7 @@ pub trait Segment<DB: Database>: Send + Sync {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;

Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/receipts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data.
#[derive(Debug, Default)]
Expand All @@ -25,7 +25,7 @@ impl<DB: Database> Segment<DB> for Receipts {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
Expand Down
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data.
#[derive(Debug, Default)]
Expand All @@ -27,7 +27,7 @@ impl<DB: Database> Segment<DB> for Transactions {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
Expand Down
10 changes: 5 additions & 5 deletions crates/snapshot/src/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use reth_provider::{
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, sync::Arc, time::Instant};
use std::{ops::RangeInclusive, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};

Expand All @@ -26,7 +26,7 @@ pub struct Snapshotter<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>,
/// Snapshot provider
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
/// Pruning configuration for every part of the data that can be pruned. Set by user, and
/// needed in [Snapshotter] to prevent snapshotting the prunable data.
/// See [Snapshotter::get_snapshot_targets].
Expand Down Expand Up @@ -71,7 +71,7 @@ impl<DB: Database> Snapshotter<DB> {
/// Creates a new [Snapshotter].
pub fn new(
provider_factory: ProviderFactory<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
prune_modes: PruneModes,
) -> Self {
Self { provider_factory, snapshot_provider, prune_modes, listeners: Default::default() }
Expand Down Expand Up @@ -197,7 +197,7 @@ mod tests {
};
use reth_primitives::{snapshot::HighestSnapshots, PruneModes, SnapshotSegment, B256, U256};
use reth_provider::providers::SnapshotWriter;
use reth_stages::test_utils::TestStageDB;
use reth_stages::test_utils::{StorageKind, TestStageDB};

#[test]
fn run() {
Expand All @@ -206,7 +206,7 @@ mod tests {
let db = TestStageDB::default();

let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), StorageKind::Database(None)).expect("insert blocks");
// Unwind headers from snapshots and manually insert them into the database, so we're able
// to check that snapshotter works
db.factory
Expand Down
4 changes: 2 additions & 2 deletions crates/stages/benches/setup/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use reth_interfaces::test_utils::{
use reth_primitives::{fs, Account, Address, SealedBlock, B256, U256};
use reth_stages::{
stages::{AccountHashingStage, StorageHashingStage},
test_utils::TestStageDB,
test_utils::{StorageKind, TestStageDB},
ExecInput, Stage, UnwindInput,
};
use reth_trie::StateRoot;
Expand Down Expand Up @@ -165,7 +165,7 @@ pub(crate) fn txs_testdata(num_blocks: u64) -> TestStageDB {
updated_header.state_root = root;
*last_block = SealedBlock { header: updated_header.seal_slow(), ..cloned_last };

db.insert_blocks(blocks.iter(), None).unwrap();
db.insert_blocks(blocks.iter(), StorageKind::Static).unwrap();

// initialize TD
db.commit(|tx| {
Expand Down
2 changes: 1 addition & 1 deletion crates/stages/src/stages/headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ where
fn write_headers<DB: Database>(
&mut self,
tx: &<DB as Database>::TXMut,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
) -> Result<BlockNumber, StageError> {
let total_headers = self.header_collector.len();

Expand Down
8 changes: 4 additions & 4 deletions crates/stages/src/stages/merkle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ fn validate_state_root(
mod tests {
use super::*;
use crate::test_utils::{
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, TestRunnerError,
TestStageDB, UnwindStageTestRunner,
stage_test_suite_ext, ExecuteStageTestRunner, StageTestRunner, StorageKind,
TestRunnerError, TestStageDB, UnwindStageTestRunner,
};
use assert_matches::assert_matches;
use reth_db::cursor::{DbCursorRO, DbCursorRW, DbDupCursorRO};
Expand Down Expand Up @@ -482,7 +482,7 @@ mod tests {
B256::ZERO,
0..1,
));
self.db.insert_blocks(preblocks.iter(), None)?;
self.db.insert_blocks(preblocks.iter(), StorageKind::Static)?;
}

let num_of_accounts = 31;
Expand Down Expand Up @@ -515,7 +515,7 @@ mod tests {
let mut blocks = vec![sealed_head];
blocks.extend(random_block_range(&mut rng, start..=end, head_hash, 0..3));
let last_block = blocks.last().cloned().unwrap();
self.db.insert_blocks(blocks.iter(), None)?;
self.db.insert_blocks(blocks.iter(), StorageKind::Static)?;

let (transitions, final_state) = random_changeset_range(
&mut rng,
Expand Down
Loading
Loading