
feat: add ETL to TxLookup stage #6655

Merged Feb 22, 2024 (43 commits; changes shown are from 40 commits).

Commits
065303a
SnapshotProvider becomes wrapper to Arc<SnapshotProviderInner>
joshieDo Feb 17, 2024
8002efd
make transaction_hashes_by_range on static files parallel
joshieDo Feb 17, 2024
da1d5d1
adjust sender recovery stage
joshieDo Feb 18, 2024
fe6b377
chunk_size for parallel work on recovery and hashing is 100
joshieDo Feb 18, 2024
30fbfe9
remove testing code
joshieDo Feb 18, 2024
cdaf758
clippy
joshieDo Feb 18, 2024
7f73631
add timing to stage run cli
joshieDo Feb 18, 2024
7c1116f
clippy
joshieDo Feb 18, 2024
44a4346
fix recovery tests
joshieDo Feb 18, 2024
28086cf
further fixes
joshieDo Feb 18, 2024
68fbd1c
add etl to tx-lookup
joshieDo Feb 18, 2024
b72d8f6
clippy
joshieDo Feb 18, 2024
5933fff
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 18, 2024
ddaf070
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 18, 2024
eb690ed
call cursor directly
joshieDo Feb 18, 2024
1e762df
change insert_blocks so it can insert to static and db
joshieDo Feb 20, 2024
356a796
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
9c50ed3
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 20, 2024
a849870
Update bin/reth/src/commands/stage/run.rs
joshieDo Feb 20, 2024
b74ef80
rename to snapshot provider
joshieDo Feb 20, 2024
b996229
add StorageKind to test_db
joshieDo Feb 20, 2024
f210db8
add derive_more::Display to StageEnum & set it to workspace on crates
joshieDo Feb 20, 2024
3622355
Merge remote-tracking branch 'origin/feat/static-files' into joshie/c…
joshieDo Feb 20, 2024
e3fefdf
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
3f3b5d0
make info log less wide
joshieDo Feb 20, 2024
8f6d1b7
change transaction lookup logs
joshieDo Feb 20, 2024
a161d1e
use StorageKind on multiple tests
joshieDo Feb 20, 2024
d237c47
add StorageKind to more tests
joshieDo Feb 20, 2024
5777768
fmt
joshieDo Feb 20, 2024
84cbfe3
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
475b974
fix is_static
joshieDo Feb 20, 2024
8cf077c
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
b623a23
fix txlookup unwind
joshieDo Feb 20, 2024
3edac90
Merge branch 'joshie/concurrent-range-fetch' into joshie/txlookup-etl
joshieDo Feb 20, 2024
256278d
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 20, 2024
916f9a7
remove intermediate commit test since it's no longer possible
joshieDo Feb 20, 2024
0635507
clippy
joshieDo Feb 20, 2024
8139630
rename commit_threshold to chunk_size
joshieDo Feb 21, 2024
cb2e1cc
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 22, 2024
bc4b921
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 22, 2024
f40565c
Merge branch 'feat/static-files' into joshie/txlookup-etl
joshieDo Feb 22, 2024
a9c09c7
update config and docs field name to chunk_size
joshieDo Feb 22, 2024
5cca8f6
missing chunk_size change
joshieDo Feb 22, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/mod.rs
@@ -2409,7 +2409,7 @@ mod tests {
mod new_payload {
use super::*;
use reth_db::test_utils::create_test_snapshots_dir;
use reth_interfaces::test_utils::{generators, generators::random_block};
use reth_interfaces::test_utils::generators::random_block;
use reth_primitives::{
genesis::{Genesis, GenesisAllocator},
Hardfork, U256,
1 change: 1 addition & 0 deletions crates/etl/Cargo.toml
@@ -11,6 +11,7 @@ exclude.workspace = true
[dependencies]
tempfile.workspace = true
reth-db.workspace = true
rayon.workspace = true

[dev-dependencies]
reth-primitives.workspace = true
3 changes: 2 additions & 1 deletion crates/etl/src/lib.rs
@@ -23,6 +23,7 @@ use std::{
sync::Arc,
};

use rayon::prelude::*;
use reth_db::table::{Compress, Encode, Key, Value};
use tempfile::{NamedTempFile, TempDir};

@@ -99,7 +100,7 @@ where

fn flush(&mut self) {
self.buffer_size_bytes = 0;
self.buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
mattsse (Collaborator), Feb 22, 2024:

how large do we expect this to be? We might not benefit from par_sort here if the buffers aren't large; if small buffers are possible, we could add a length check and only use par_sort when the buffer is large.

joshieDo (Collaborator, Author):

Given it's only used in the pipeline, I'd say pretty large: a max of 100MB for headers, and 500MB worth for txlookup.

let mut buf = Vec::with_capacity(self.buffer.len());
std::mem::swap(&mut buf, &mut self.buffer);
self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk"))
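A length-gated sort along the lines mattsse suggests above might look like the sketch below; the `sort_buffer` helper and its threshold constant are hypothetical, not part of this PR.

```rust
// Sketch of a length-gated sort per the suggestion above: fall back to a
// single-threaded sort for small buffers, where rayon's fork-join overhead
// can outweigh the parallel speedup. The cutoff is illustrative, not measured.
use rayon::prelude::*;

const PAR_SORT_THRESHOLD: usize = 10_000; // hypothetical cutoff

fn sort_buffer<K: Ord + Send, V: Send>(buffer: &mut [(K, V)]) {
    if buffer.len() >= PAR_SORT_THRESHOLD {
        // Large buffer: split the unstable sort across the rayon thread pool.
        buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
    } else {
        // Small buffer: a plain unstable sort avoids parallelism overhead.
        buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
    }
}

fn main() {
    let mut buf = vec![(3u64, "c"), (1, "a"), (2, "b")];
    sort_buffer(&mut buf);
    assert_eq!(buf, [(1, "a"), (2, "b"), (3, "c")]);
}
```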
4 changes: 2 additions & 2 deletions crates/stages/src/stages/execution.rs
@@ -597,8 +597,8 @@ mod tests {
use reth_node_ethereum::EthEvmConfig;
use reth_primitives::{
address, hex_literal::hex, keccak256, stage::StageUnitCheckpoint, Account, Address,
Bytecode, ChainSpecBuilder, PruneMode, PruneModes, ReceiptsLogPruneConfig, SealedBlock,
StorageEntry, B256, U256,
Bytecode, ChainSpecBuilder, PruneMode, ReceiptsLogPruneConfig, SealedBlock, StorageEntry,
B256,
};
use reth_provider::{test_utils::create_test_provider_factory, AccountReader, ReceiptProvider};
use reth_revm::EvmProcessorFactory;
193 changes: 75 additions & 118 deletions crates/stages/src/stages/tx_lookup.rs
@@ -1,44 +1,51 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use rayon::prelude::*;
use num_traits::Zero;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
RawKey, RawValue,
};
use reth_etl::Collector;
use reth_interfaces::provider::ProviderError;
use reth_primitives::{
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment,
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, TxHash, TxNumber,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader,
TransactionsProvider, TransactionsProviderExt,
};
use std::sync::Arc;
use tempfile::TempDir;
use tracing::*;

/// The transaction lookup stage.
///
/// This stage walks over the bodies table, and sets the transaction hash of each transaction in a
/// block to the corresponding `BlockNumber` at each block. This is written to the
/// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash.
///
/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
/// The number of lookup entries to commit at once
commit_threshold: u64,
/// The maximum number of lookup entries to hold in memory before pushing them to
/// [`reth_etl::Collector`].
chunk_size: u64,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupStage {
fn default() -> Self {
Self { commit_threshold: 5_000_000, prune_mode: None }
Self { chunk_size: 5_000_000, prune_mode: None }
}
}

impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage].
pub fn new(commit_threshold: u64, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold, prune_mode }
pub fn new(chunk_size: u64, prune_mode: Option<PruneMode>) -> Self {
A reviewer (Collaborator) commented:

config and documentation need to be changed to reflect this

Self { chunk_size, prune_mode }
}
}

@@ -92,43 +99,71 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
return Ok(ExecOutput::done(input.checkpoint()))
}

let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
let end_block = *block_range.end();
// 500MB temporary files
let mut hash_collector: Collector<TxHash, TxNumber> =
Collector::new(Arc::new(TempDir::new()?), 500 * (1024 * 1024));
A reviewer (Member) commented on lines +103 to +104:

any reason to make buffer capacity configurable here?

joshieDo (Collaborator, Author):

Related to #6696, but it should make things slightly faster on bigger-sized chunks.
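If the capacity were made configurable, the wiring might look like this sketch; the `etl_buffer_size` field is invented for illustration and is not something this PR adds.

```rust
// Hypothetical sketch: threading a configurable ETL buffer capacity through
// the stage instead of the hard-coded 500MB. `etl_buffer_size` is an
// invented field name; this PR keeps the constant inline.
pub struct TransactionLookupStage {
    /// Max lookup entries to buffer before pushing them to the collector.
    chunk_size: u64,
    /// Max bytes the collector holds in memory before flushing a sorted
    /// run to a temporary file.
    etl_buffer_size: usize,
}

impl Default for TransactionLookupStage {
    fn default() -> Self {
        Self { chunk_size: 5_000_000, etl_buffer_size: 500 * 1024 * 1024 }
    }
}

fn main() {
    // The stage would pass `etl_buffer_size` into `Collector::new(...)`.
    let stage = TransactionLookupStage::default();
    println!("ETL buffer capacity: {} bytes", stage.etl_buffer_size);
}
```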


debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup");
debug!(
target: "sync::stages::transaction_lookup",
tx_range = ?input.checkpoint().block_number..=input.target(),
"Updating transaction lookup"
);

let mut tx_list = provider.transaction_hashes_by_range(tx_range)?;
loop {
let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;

// Sort before inserting the reverse lookup for hash -> tx_id.
tx_list.par_sort_unstable_by(|txa, txb| txa.0.cmp(&txb.0));
let end_block = *block_range.end();

let tx = provider.tx_ref();
let mut txhash_cursor = tx.cursor_write::<tables::TxHashNumber>()?;

// If the last inserted element in the database is equal or bigger than the first
// in our set, then we need to insert inside the DB. If it is smaller then last
// element in the DB, we can append to the DB.
// Append probably only ever happens during sync, on the first table insertion.
let insert = tx_list
.first()
.zip(txhash_cursor.last()?)
.map(|((first, _), (last, _))| first <= &last)
.unwrap_or_default();
// if txhash_cursor.last() is None we will do insert. `zip` would return none if any item is
// none. if it is some and if first is smaller than last, we will do append.
for (tx_hash, id) in tx_list {
if insert {
txhash_cursor.insert(tx_hash, id)?;
} else {
txhash_cursor.append(tx_hash, id)?;
debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");

for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
hash_collector.insert(key, value);
}

input.checkpoint = Some(
StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
);

if is_final_range {
let append_only = provider.count_entries::<tables::TxHashNumber>()?.is_zero();
let mut txhash_cursor =
provider.tx_ref().cursor_write::<tables::RawTable<tables::TxHashNumber>>()?;

let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
A reviewer (Member) commented on lines +134 to +136:

The ETL API is remarkably clean: loop, inserting into the collector; when the range ends, iterate over the collector and write to the DB. We should document this general pattern.
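A minimal sketch of that general pattern, using a simplified in-memory stand-in for `reth_etl::Collector` (the real collector buffers up to a byte limit and spills sorted runs to temporary files) and a `BTreeMap` standing in for the database table:

```rust
// Minimal sketch of the general ETL pattern the comment describes.
use std::collections::BTreeMap;

struct Collector<K: Ord, V> {
    buffer: Vec<(K, V)>,
}

impl<K: Ord, V> Collector<K, V> {
    fn new() -> Self {
        Self { buffer: Vec::new() }
    }

    /// Phase 1: accumulate entries while walking the input range.
    fn insert(&mut self, key: K, value: V) {
        self.buffer.push((key, value));
    }

    /// Phase 2: yield all entries in key order for sequential DB writes.
    fn into_sorted_iter(mut self) -> impl Iterator<Item = (K, V)> {
        self.buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
        self.buffer.into_iter()
    }
}

fn main() {
    let mut collector = Collector::new();

    // Loop: insert into the collector while processing the range.
    for (number, hash) in [(2u64, "0xbeef"), (0, "0xdead"), (1, "0xcafe")] {
        collector.insert(hash, number);
    }

    // End of range: iterate the sorted collector and write to the "table".
    // Sorted keys are what make the fast `append` cursor path possible.
    let mut table = BTreeMap::new();
    for (hash, number) in collector.into_sorted_iter() {
        table.insert(hash, number);
    }
    assert_eq!(table["0xdead"], 0);
}
```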

let (hash, number) = hash_to_number?;
if index > 0 && index % interval == 0 {
debug!(
target: "sync::stages::transaction_lookup",
?append_only,
progress = format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
}

if append_only {
A reviewer (Member) commented:

does this do anything meaningful anymore?

joshieDo (Collaborator, Author), Feb 19, 2024:

Yes, the first run will be append-only; subsequent runs won't be, although they will still benefit from ETL when dealing with large sets.
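A schematic of that append-versus-insert trade-off, with a `BTreeMap` standing in for the table (this is not the reth_db cursor API): a real cursor's `append` skips the B-tree positioning that `insert` pays for, but is only valid when every key lands past the current end of the table, which holds on the first, append-only run over sorted ETL output.

```rust
// Schematic of the append-vs-insert dispatch described above; the BTreeMap
// and `write_entry` helper are illustrative stand-ins, not reth_db types.
use std::collections::BTreeMap;

fn write_entry(
    table: &mut BTreeMap<Vec<u8>, u64>,
    append_only: bool,
    key: Vec<u8>,
    value: u64,
) -> Result<(), String> {
    if append_only {
        // Emulate `append`: the new key must strictly extend the table.
        if table.keys().next_back().map_or(false, |last| *last >= key) {
            return Err("cannot append out-of-order key".into());
        }
    }
    table.insert(key, value);
    Ok(())
}

fn main() {
    let mut table = BTreeMap::new();
    // First sync: empty table and sorted ETL output, so append always works.
    assert!(write_entry(&mut table, true, vec![1], 10).is_ok());
    assert!(write_entry(&mut table, true, vec![2], 20).is_ok());
    // Later runs: keys may interleave with existing ones, so use insert.
    assert!(write_entry(&mut table, false, vec![0], 0).is_ok());
}
```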

txhash_cursor.append(
RawKey::<TxHash>::from_vec(hash),
RawValue::<TxNumber>::from_vec(number),
)?;
} else {
txhash_cursor.insert(
RawKey::<TxHash>::from_vec(hash),
RawValue::<TxNumber>::from_vec(number),
A reviewer (Member) commented on lines +148 to +155:

Nice. I wonder how much the usage of RawKey impacts perf; are we missing out on any compression due to its usage?

)?;
}
}
break
}
}

Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
checkpoint: StageCheckpoint::new(input.target())
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
done: is_final_range,
done: true,
})
}

@@ -139,7 +174,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

// Cursors to unwind tx hash to number
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
@@ -200,7 +235,7 @@ mod tests {
generators::{random_block, random_block_range},
};
use reth_primitives::{stage::StageUnitCheckpoint, BlockNumber, SealedBlock, B256};
use reth_provider::{providers::SnapshotWriter, TransactionsProvider};
use reth_provider::providers::SnapshotWriter;
use std::ops::Sub;

// Implement stage test suite.
@@ -257,77 +292,6 @@
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}

/// Execute the stage twice with input range that exceeds the commit threshold
#[tokio::test]
async fn execute_intermediate_commit_transaction_lookup() {
let threshold = 50;
let mut runner = TransactionLookupTestRunner::default();
runner.set_commit_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let mut rng = generators::rng();

// Seed only once with full input range
let seed =
random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold
runner
.db
.insert_blocks(seed.iter(), StorageKind::Static)
.expect("failed to seed execution");

let total_txs =
runner.db.factory.snapshot_provider().count_entries::<tables::Transactions>().unwrap()
as u64;

// Execute first time
let result = runner.execute(first_input).await.unwrap();
let mut tx_count = 0;
let expected_progress = seed
.iter()
.find(|x| {
tx_count += x.body.len();
tx_count as u64 > threshold
})
.map(|x| x.number)
.unwrap_or(previous_stage);
assert_matches!(result, Ok(_));
assert_eq!(
result.unwrap(),
ExecOutput {
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
EntitiesCheckpoint {
processed: runner.db.table::<tables::TxHashNumber>().unwrap().len() as u64,
total: total_txs
}
),
done: false
}
);

// Execute second time to completion
runner.set_commit_threshold(u64::MAX);
let second_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert_matches!(result, Ok(_));
assert_eq!(
result.as_ref().unwrap(),
&ExecOutput {
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
EntitiesCheckpoint { processed: total_txs, total: total_txs }
),
done: true
}
);

assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}

#[tokio::test]
async fn execute_pruned_transaction_lookup() {
let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
@@ -428,21 +392,17 @@

struct TransactionLookupTestRunner {
db: TestStageDB,
commit_threshold: u64,
chunk_size: u64,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupTestRunner {
fn default() -> Self {
Self { db: TestStageDB::default(), commit_threshold: 1000, prune_mode: None }
Self { db: TestStageDB::default(), chunk_size: 1000, prune_mode: None }
}
}

impl TransactionLookupTestRunner {
fn set_commit_threshold(&mut self, threshold: u64) {
self.commit_threshold = threshold;
}

fn set_prune_mode(&mut self, prune_mode: PruneMode) {
self.prune_mode = Some(prune_mode);
}
@@ -483,10 +443,7 @@
}

fn stage(&self) -> Self::S {
TransactionLookupStage {
commit_threshold: self.commit_threshold,
prune_mode: self.prune_mode,
}
TransactionLookupStage { chunk_size: self.chunk_size, prune_mode: self.prune_mode }
}
}

1 change: 0 additions & 1 deletion crates/storage/provider/src/providers/snapshot/mod.rs
@@ -55,7 +55,6 @@ mod tests {
CanonicalHeaders, HeaderNumbers, HeaderTD, Headers, RawTable,
};
use reth_interfaces::test_utils::generators::{self, random_header_range};
use reth_nippy_jar::NippyJar;
use reth_primitives::{snapshot::find_fixed_range, BlockNumber, B256, U256};

#[test]