feat: add ETL to TxLookup stage #6655

Merged 43 commits on Feb 22, 2024
Changes from 17 commits
Commits (43)
065303a
SnapshotProvider becomes wrapper to Arc<SnapshotProviderInner>
joshieDo Feb 17, 2024
8002efd
make transaction_hashes_by_range on static files parallel
joshieDo Feb 17, 2024
da1d5d1
adjust sender recovery stage
joshieDo Feb 18, 2024
fe6b377
chunk_size for parallel work on recovery and hashing is 100
joshieDo Feb 18, 2024
30fbfe9
remove testing code
joshieDo Feb 18, 2024
cdaf758
clippy
joshieDo Feb 18, 2024
7f73631
add timing to stage run cli
joshieDo Feb 18, 2024
7c1116f
clippy
joshieDo Feb 18, 2024
44a4346
fix recovery tests
joshieDo Feb 18, 2024
28086cf
further fixes
joshieDo Feb 18, 2024
68fbd1c
add etl to tx-lookup
joshieDo Feb 18, 2024
b72d8f6
clippy
joshieDo Feb 18, 2024
5933fff
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 18, 2024
ddaf070
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 18, 2024
eb690ed
call cursor directly
joshieDo Feb 18, 2024
1e762df
change insert_blocks so it can insert to static and db
joshieDo Feb 20, 2024
356a796
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
9c50ed3
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 20, 2024
a849870
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 20, 2024
b74ef80
rename to snapshot provider
joshieDo Feb 20, 2024
b996229
add StorageKind to test_db
joshieDo Feb 20, 2024
f210db8
add derive_more::Display to StageEnum & set it to workspace on crates
joshieDo Feb 20, 2024
3622355
Merge remote-tracking branch 'origin/feat/static-files' into joshie/c…
joshieDo Feb 20, 2024
e3fefdf
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
3f3b5d0
make info log less wide
joshieDo Feb 20, 2024
8f6d1b7
change transaction lookup logs
joshieDo Feb 20, 2024
a161d1e
use StorageKind on multiple tests
joshieDo Feb 20, 2024
d237c47
add StorageKind to more tests
joshieDo Feb 20, 2024
5777768
fmt
joshieDo Feb 20, 2024
84cbfe3
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
475b974
fix is_static
joshieDo Feb 20, 2024
8cf077c
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
b623a23
fix txlookup unwind
joshieDo Feb 20, 2024
3edac90
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
256278d
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 20, 2024
916f9a7
remove intermediate commit test since it's no longer possible
joshieDo Feb 20, 2024
0635507
clippy
joshieDo Feb 20, 2024
8139630
rename commit_threshold to chunk_size
joshieDo Feb 21, 2024
cb2e1cc
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 22, 2024
bc4b921
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 22, 2024
f40565c
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 22, 2024
a9c09c7
update config and docs field name to chunk_size
joshieDo Feb 22, 2024
5cca8f6
missing chunk_size change
joshieDo Feb 22, 2024
1 change: 1 addition & 0 deletions Cargo.lock

Diff not rendered (generated file).

5 changes: 4 additions & 1 deletion bin/reth/src/commands/stage/run.rs
@@ -28,7 +28,7 @@ use reth_stages::{
},
ExecInput, Stage, StageExt, UnwindInput,
};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc};
use std::{any::Any, net::SocketAddr, path::PathBuf, sync::Arc, time::Instant};
use tracing::*;

/// `reth stage` command
@@ -269,6 +269,8 @@ impl Command {
checkpoint: Some(checkpoint.with_block_number(self.from)),
};

let start = Instant::now();
info!(target: "reth::cli", stage= ?self.stage, "Executing stage.");
loop {
exec_stage.execute_ready(input).await?;
let output = exec_stage.execute(&provider_rw, input)?;
@@ -284,6 +286,7 @@
break
}
}
info!(target: "reth::cli", stage= ?self.stage, time = ?start.elapsed(), "Finished stage.");

Ok(())
}
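The change above just brackets the stage-execution loop with an Instant and logs the elapsed time when the loop finishes. A self-contained sketch of the same pattern, assuming the tracing and tracing-subscriber crates, with a dummy loop standing in for stage execution:

use std::time::Instant;
use tracing::info;

fn main() {
    // Without a subscriber the info! calls are silently dropped.
    tracing_subscriber::fmt::init();

    let start = Instant::now();
    info!(target: "reth::cli", "Executing stage.");

    // Stand-in for the execute_ready/execute loop.
    let mut acc = 0u64;
    for i in 0..1_000_000u64 {
        acc = acc.wrapping_add(i);
    }

    info!(target: "reth::cli", time = ?start.elapsed(), result = acc, "Finished stage.");
}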
1 change: 1 addition & 0 deletions crates/etl/Cargo.toml
@@ -11,6 +11,7 @@ exclude.workspace = true
[dependencies]
tempfile.workspace = true
reth-db.workspace = true
rayon.workspace = true

[dev-dependencies]
reth-primitives.workspace = true
3 changes: 2 additions & 1 deletion crates/etl/src/lib.rs
@@ -23,6 +23,7 @@ use std::{
sync::Arc,
};

use rayon::prelude::*;
use reth_db::table::{Compress, Encode, Key, Value};
use tempfile::{NamedTempFile, TempDir};

@@ -99,7 +100,7 @@

fn flush(&mut self) {
self.buffer_size_bytes = 0;
self.buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
@mattsse (Collaborator) commented on Feb 22, 2024:

How large do we expect this to be? We might not benefit from par_sort here if the buffer isn't large; if small buffers are possible, we could add a length check and only use par_sort above some size.

@joshieDo (Collaborator, Author) replied:

Given it's only used in the pipeline, I'd say pretty large: it maxes out at 100MB for headers and 500MB worth of data for txlookup.

let mut buf = Vec::with_capacity(self.buffer.len());
std::mem::swap(&mut buf, &mut self.buffer);
self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk"))
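A rough sketch of the length-gated sort mattsse suggests in the thread above; the cutoff value is purely illustrative (not measured), and the PR keeps the unconditional par_sort since, per the reply, pipeline buffers reach roughly 100–500 MB:

use rayon::prelude::*;

/// Sort an ETL-style buffer of (key, value) pairs by key, only paying the
/// parallel-sort overhead when the buffer is large enough to plausibly benefit.
/// The 10_000 cutoff is an arbitrary placeholder, not a measured threshold.
fn sort_buffer(buffer: &mut [(Vec<u8>, Vec<u8>)]) {
    if buffer.len() >= 10_000 {
        buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
    } else {
        buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    }
}

fn main() {
    let mut buffer = vec![(b"b".to_vec(), b"2".to_vec()), (b"a".to_vec(), b"1".to_vec())];
    sort_buffer(&mut buffer);
    assert_eq!(buffer[0].0, b"a".to_vec());
}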
2 changes: 1 addition & 1 deletion crates/node-core/src/init.rs
@@ -209,7 +209,7 @@
/// Inserts header for the genesis state.
pub fn insert_genesis_header<DB: Database>(
tx: &<DB as Database>::TXMut,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
chain: Arc<ChainSpec>,
) -> ProviderResult<()> {
let (header, block_hash) = chain.sealed_genesis_header().split();
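The Arc<SnapshotProvider> to SnapshotProvider signature changes in this and the following files come from the first commit, which turns the provider into a cheaply clonable wrapper around an inner Arc. A rough sketch of that pattern; the field and its contents are hypothetical, not reth's actual definition:

use std::{ops::Deref, sync::Arc};

/// Hypothetical stand-in for SnapshotProviderInner; the real type holds the
/// static-file paths, indices, caches, and so on.
#[derive(Debug, Default)]
struct SnapshotProviderInner {
    path: std::path::PathBuf,
}

/// Wrapper around an Arc: cloning only bumps a reference count, so callers can
/// take a SnapshotProvider by value instead of an Arc<SnapshotProvider>.
#[derive(Debug, Default, Clone)]
struct SnapshotProvider(Arc<SnapshotProviderInner>);

impl Deref for SnapshotProvider {
    type Target = SnapshotProviderInner;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

fn main() {
    let provider = SnapshotProvider::default();
    let handle = provider.clone(); // cheap: shares the same inner state
    assert_eq!(provider.path, handle.path);
}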
2 changes: 1 addition & 1 deletion crates/prune/src/segments/account_history.rs
@@ -99,7 +99,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=5000, B256::ZERO, 0..1);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");

let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
2 changes: 1 addition & 1 deletion crates/prune/src/segments/receipts.rs
@@ -108,7 +108,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");

let mut receipts = Vec::new();
for block in &blocks {
2 changes: 1 addition & 1 deletion crates/prune/src/segments/receipts_by_logs.rs
@@ -232,7 +232,7 @@ mod tests {
random_block_range(&mut rng, (tip - 100 + 1)..=tip, B256::ZERO, 1..5),
]
.concat();
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");

let mut receipts = Vec::new();

2 changes: 1 addition & 1 deletion crates/prune/src/segments/sender_recovery.rs
@@ -90,7 +90,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");

let mut transaction_senders = Vec::new();
for block in &blocks {
2 changes: 1 addition & 1 deletion crates/prune/src/segments/storage_history.rs
@@ -103,7 +103,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 0..=5000, B256::ZERO, 0..1);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");

let accounts =
random_eoa_account_range(&mut rng, 0..2).into_iter().collect::<BTreeMap<_, _>>();
2 changes: 1 addition & 1 deletion crates/prune/src/segments/transaction_lookup.rs
@@ -113,7 +113,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=10, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");

let mut tx_hash_numbers = Vec::new();
for block in &blocks {
2 changes: 1 addition & 1 deletion crates/prune/src/segments/transactions.rs
@@ -89,7 +89,7 @@ mod tests {
let mut rng = generators::rng();

let blocks = random_block_range(&mut rng, 1..=100, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");

let transactions = blocks.iter().flat_map(|block| &block.body).collect::<Vec<_>>();

4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/headers.rs
@@ -9,7 +9,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
DatabaseProviderRO,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Headers] part of data.
#[derive(Debug, Default)]
@@ -23,7 +23,7 @@ impl<DB: Database> Segment<DB> for Headers {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/mod.rs
@@ -22,7 +22,7 @@ use reth_primitives::{
BlockNumber, SnapshotSegment,
};
use reth_provider::{providers::SnapshotProvider, DatabaseProviderRO, TransactionsProviderExt};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];

@@ -36,7 +36,7 @@ pub trait Segment<DB: Database>: Send + Sync {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()>;

4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/receipts.rs
@@ -11,7 +11,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Receipts] part of data.
#[derive(Debug, Default)]
@@ -25,7 +25,7 @@ impl<DB: Database> Segment<DB> for Receipts {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
4 changes: 2 additions & 2 deletions crates/snapshot/src/segments/transactions.rs
@@ -11,7 +11,7 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
BlockReader, DatabaseProviderRO, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path, sync::Arc};
use std::{ops::RangeInclusive, path::Path};

/// Snapshot segment responsible for [SnapshotSegment::Transactions] part of data.
#[derive(Debug, Default)]
@@ -27,7 +27,7 @@ impl<DB: Database> Segment<DB> for Transactions {
fn snapshot(
&self,
provider: DatabaseProviderRO<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
block_range: RangeInclusive<BlockNumber>,
) -> ProviderResult<()> {
let mut snapshot_writer =
8 changes: 4 additions & 4 deletions crates/snapshot/src/snapshotter.rs
@@ -10,7 +10,7 @@ use reth_provider::{
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, sync::Arc, time::Instant};
use std::{ops::RangeInclusive, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};

@@ -26,7 +26,7 @@ pub struct Snapshotter<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>,
/// Snapshot provider
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
/// Pruning configuration for every part of the data that can be pruned. Set by user, and
/// needed in [Snapshotter] to prevent snapshotting the prunable data.
/// See [Snapshotter::get_snapshot_targets].
@@ -71,7 +71,7 @@ impl<DB: Database> Snapshotter<DB> {
/// Creates a new [Snapshotter].
pub fn new(
provider_factory: ProviderFactory<DB>,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
prune_modes: PruneModes,
) -> Self {
Self { provider_factory, snapshot_provider, prune_modes, listeners: Default::default() }
@@ -206,7 +206,7 @@ mod tests {
let db = TestStageDB::default();

let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
db.insert_blocks(blocks.iter(), None).expect("insert blocks");
db.insert_blocks(blocks.iter(), Some(0)).expect("insert blocks");
// Unwind headers from snapshots and manually insert them into the database, so we're able
// to check that snapshotter works
db.factory
2 changes: 1 addition & 1 deletion crates/stages/src/stages/headers.rs
@@ -99,7 +99,7 @@
fn write_headers<DB: Database>(
&mut self,
tx: &<DB as Database>::TXMut,
snapshot_provider: Arc<SnapshotProvider>,
snapshot_provider: SnapshotProvider,
) -> Result<BlockNumber, StageError> {
let total_headers = self.header_collector.len();

81 changes: 41 additions & 40 deletions crates/stages/src/stages/sender_recovery.rs
@@ -1,23 +1,22 @@
use crate::{BlockErrorKind, ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use itertools::Itertools;
use reth_db::{
cursor::DbCursorRW,
database::Database,
snapshot::TransactionMask,
tables,
transaction::{DbTx, DbTxMut},
RawValue,
};
use reth_interfaces::{consensus, RethError};
use reth_interfaces::consensus;
use reth_primitives::{
keccak256,
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
Address, PruneSegment, TransactionSignedNoHash, TxNumber,
Address, PruneSegment, SnapshotSegment, TransactionSignedNoHash, TxNumber,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, HeaderProvider, ProviderError, PruneCheckpointReader,
StatsReader, TransactionsProvider,
StatsReader,
};
use std::{fmt::Debug, sync::mpsc};
use std::{fmt::Debug, ops::Range, sync::mpsc};
use thiserror::Error;
use tracing::*;

@@ -83,43 +82,49 @@ impl<DB: Database> Stage<DB> for SenderRecoveryStage {
// Acquire the cursor for inserting elements
let mut senders_cursor = tx.cursor_write::<tables::TxSenders>()?;

// Query the transactions from both database and static files
let transactions = provider.raw_transactions_by_tx_range(tx_range.clone())?;

// Iterate over transactions in chunks
info!(target: "sync::stages::sender_recovery", ?tx_range, "Recovering senders");

// channels used to return result of sender recovery.
let mut channels = Vec::new();

// Spawn recovery jobs onto the default rayon threadpool and send the result through the
// channel.
//
// We try to evenly divide the transactions to recover across all threads in the threadpool.
// Chunks are submitted instead of individual transactions to reduce the overhead of work
// stealing in the threadpool workers.
let chunk_size = self.commit_threshold as usize / rayon::current_num_threads();
// prevents an edge case
// where the chunk size is either 0 or too small
// to gain anything from using more than 1 thread
let chunk_size = chunk_size.max(16);

for chunk in &tx_range.zip(transactions).chunks(chunk_size) {
// Transactions are different size, so chunks will not all take the same processing time. If
// chunks are too big, there will be idle threads waiting for work. Choosing an
// arbitrary smaller value to make sure it doesn't happen.
let chunk_size = 100;

let chunks = (tx_range.start..tx_range.end)
.step_by(chunk_size as usize)
.map(|start| start..std::cmp::min(start + chunk_size as u64, tx_range.end))
.collect::<Vec<Range<u64>>>();

let mut channels = Vec::with_capacity(chunks.len());
for chunk_range in chunks {
// An _unordered_ channel to receive results from a rayon job
let (recovered_senders_tx, recovered_senders_rx) = mpsc::channel();
channels.push(recovered_senders_rx);
// Note: Unfortunate side-effect of how chunk is designed in itertools (it is not Send)
let chunk: Vec<_> = chunk.collect();

// Spawn the sender recovery task onto the global rayon pool
// This task will send the results through the channel after it recovered the senders.
let manager = provider.snapshot_provider().clone();

// Spawn the task onto the global rayon pool
// This task will send the results through the channel after it has read the transaction
// and calculated the sender.
rayon::spawn(move || {
let mut rlp_buf = Vec::with_capacity(128);
for entry in chunk {
rlp_buf.clear();
let recovery_result = recover_sender(entry, &mut rlp_buf);
let _ = recovered_senders_tx.send(recovery_result);
}
let _ = manager.fetch_range_with_predicate(
SnapshotSegment::Transactions,
chunk_range,
|cursor, number| {
Ok(cursor
.get_one::<TransactionMask<TransactionSignedNoHash>>(number.into())?
.map(|tx| {
rlp_buf.clear();
let _ = recovered_senders_tx
.send(recover_sender((number, tx), &mut rlp_buf));
}))
},
|_| true,
);
});
}

@@ -185,17 +190,11 @@
}
}

#[inline]
fn recover_sender(
(tx_id, tx): (TxNumber, RawValue<TransactionSignedNoHash>),
(tx_id, tx): (TxNumber, TransactionSignedNoHash),
rlp_buf: &mut Vec<u8>,
) -> Result<(u64, Address), Box<SenderRecoveryStageError>> {
let tx = tx
.value()
.map_err(RethError::from)
.map_err(StageError::from)
.map_err(Into::into)
.map_err(Box::new)?;

tx.transaction.encode_without_signature(rlp_buf);

// We call [Signature::recover_signer_unchecked] because transactions run in the pipeline are
@@ -330,7 +329,9 @@ mod tests {
random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold
runner.db.insert_blocks(seed.iter(), None).expect("failed to seed execution");

let total_transactions = runner.db.table::<tables::Transactions>().unwrap().len() as u64;
let total_transactions =
runner.db.factory.snapshot_provider().count_entries::<tables::Transactions>().unwrap()
as u64;

let first_input = ExecInput {
target: Some(previous_stage),
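A self-contained illustration of the chunking scheme the new sender-recovery code uses: the transaction-number range is split into consecutive sub-ranges of at most chunk_size elements before each sub-range is handed to a rayon task. The helper name is made up for the example; the toy range and the 100-element chunk size mirror the diff above:

use std::ops::Range;

/// Split a half-open range into consecutive sub-ranges of at most `chunk_size`
/// elements, mirroring how the stage splits tx_range before spawning rayon jobs.
fn split_range(range: Range<u64>, chunk_size: u64) -> Vec<Range<u64>> {
    (range.start..range.end)
        .step_by(chunk_size as usize)
        .map(|start| start..std::cmp::min(start + chunk_size, range.end))
        .collect()
}

fn main() {
    let chunks = split_range(0..250, 100);
    assert_eq!(chunks, vec![0u64..100, 100..200, 200..250]);
    println!("{chunks:?}");
}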