feat: add ETL to TxLookup stage (#6655)
Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
3 people authored Feb 22, 2024
1 parent 1431568 commit aeaabfb
Showing 7 changed files with 84 additions and 124 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion book/run/config.md
@@ -192,7 +192,7 @@ The transaction lookup stage builds an index of transaction hashes to their sequ
#
# Lower thresholds correspond to more frequent disk I/O (writes),
# but lowers memory usage
commit_threshold = 5000000
chunk_size = 5000000
```

### `index_account_history`
8 changes: 4 additions & 4 deletions crates/config/src/config.rs
@@ -222,13 +222,13 @@ impl Default for MerkleConfig {
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Serialize)]
#[serde(default)]
pub struct TransactionLookupConfig {
/// The maximum number of transactions to process before committing progress to the database.
pub commit_threshold: u64,
/// The maximum number of transactions to process before writing to disk.
pub chunk_size: u64,
}

impl Default for TransactionLookupConfig {
fn default() -> Self {
Self { commit_threshold: 5_000_000 }
Self { chunk_size: 5_000_000 }
}
}

@@ -347,7 +347,7 @@ commit_threshold = 100000
clean_threshold = 50000
[stages.transaction_lookup]
commit_threshold = 5000000
chunk_size = 5000000
[stages.index_account_history]
commit_threshold = 100000
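For context, here is a minimal standalone sketch of how the renamed option is picked up. It assumes only the `serde` (with the derive feature) and `toml` crates and mirrors the `TransactionLookupConfig` struct above, where `#[serde(default)]` falls back to the 5,000,000 default when the `chunk_size` key is absent; it is a simplified copy, not the crate's actual config loader.

```rust
use serde::Deserialize;

// Simplified copy of the struct above, decoupled from the rest of the crate.
#[derive(Debug, Deserialize, PartialEq)]
#[serde(default)]
pub struct TransactionLookupConfig {
    pub chunk_size: u64,
}

impl Default for TransactionLookupConfig {
    fn default() -> Self {
        Self { chunk_size: 5_000_000 }
    }
}

fn main() {
    // An explicit value in the config file overrides the default...
    let cfg: TransactionLookupConfig = toml::from_str("chunk_size = 1000000").unwrap();
    assert_eq!(cfg.chunk_size, 1_000_000);

    // ...and an empty `[stages.transaction_lookup]` section keeps the 5M default.
    let cfg: TransactionLookupConfig = toml::from_str("").unwrap();
    assert_eq!(cfg, TransactionLookupConfig::default());
}
```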
1 change: 1 addition & 0 deletions crates/etl/Cargo.toml
@@ -11,6 +11,7 @@ exclude.workspace = true
[dependencies]
tempfile.workspace = true
reth-db.workspace = true
rayon.workspace = true

[dev-dependencies]
reth-primitives.workspace = true
3 changes: 2 additions & 1 deletion crates/etl/src/lib.rs
@@ -23,6 +23,7 @@ use std::{
sync::Arc,
};

use rayon::prelude::*;
use reth_db::table::{Compress, Encode, Key, Value};
use tempfile::{NamedTempFile, TempDir};

@@ -99,7 +100,7 @@ where

fn flush(&mut self) {
self.buffer_size_bytes = 0;
self.buffer.sort_unstable_by(|a, b| a.0.cmp(&b.0));
self.buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));
let mut buf = Vec::with_capacity(self.buffer.len());
std::mem::swap(&mut buf, &mut self.buffer);
self.files.push(EtlFile::new(self.dir.path(), buf).expect("could not flush data to disk"))
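The ETL change itself is small: when a full buffer is flushed to a temp file, the key sort now runs on rayon's thread pool instead of a single thread. A minimal sketch of that call, with made-up `(key, value)` byte pairs standing in for the collector's encoded entries:

```rust
use rayon::prelude::*;

fn main() {
    // Stand-in for the collector's in-memory buffer of (encoded key, compressed value) pairs.
    let mut buffer: Vec<(Vec<u8>, Vec<u8>)> = vec![
        (vec![3], b"c".to_vec()),
        (vec![1], b"a".to_vec()),
        (vec![2], b"b".to_vec()),
    ];

    // Same call as in `flush`: key-sort the whole run across the available cores
    // before it is written out as a sorted temp file.
    buffer.par_sort_unstable_by(|a, b| a.0.cmp(&b.0));

    assert_eq!(
        buffer.iter().map(|(k, _)| k[0]).collect::<Vec<_>>(),
        vec![1u8, 2, 3]
    );
}
```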
2 changes: 1 addition & 1 deletion crates/node-core/src/node_config.rs
@@ -876,7 +876,7 @@ impl NodeConfig {
))
.set(MerkleStage::new_execution(stage_config.merkle.clean_threshold))
.set(TransactionLookupStage::new(
stage_config.transaction_lookup.commit_threshold,
stage_config.transaction_lookup.chunk_size,
prune_modes.transaction_lookup,
))
.set(IndexAccountHistoryStage::new(
191 changes: 74 additions & 117 deletions crates/stages/src/stages/tx_lookup.rs
@@ -1,44 +1,51 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use rayon::prelude::*;
use num_traits::Zero;
use reth_db::{
cursor::{DbCursorRO, DbCursorRW},
database::Database,
tables,
transaction::{DbTx, DbTxMut},
RawKey, RawValue,
};
use reth_etl::Collector;
use reth_interfaces::provider::ProviderError;
use reth_primitives::{
stage::{EntitiesCheckpoint, StageCheckpoint, StageId},
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment,
PruneCheckpoint, PruneMode, PrunePurpose, PruneSegment, TxHash, TxNumber,
};
use reth_provider::{
BlockReader, DatabaseProviderRW, PruneCheckpointReader, PruneCheckpointWriter, StatsReader,
TransactionsProvider, TransactionsProviderExt,
};
use std::sync::Arc;
use tempfile::TempDir;
use tracing::*;

/// The transaction lookup stage.
///
/// This stage walks over the bodies table, and sets the transaction hash of each transaction in a
/// block to the corresponding `BlockNumber` at each block. This is written to the
/// [`tables::TxHashNumber`] This is used for looking up changesets via the transaction hash.
///
/// It uses [`reth_etl::Collector`] to collect all entries before finally writing them to disk.
#[derive(Debug, Clone)]
pub struct TransactionLookupStage {
/// The number of lookup entries to commit at once
commit_threshold: u64,
/// The maximum number of lookup entries to hold in memory before pushing them to
/// [`reth_etl::Collector`].
chunk_size: u64,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupStage {
fn default() -> Self {
Self { commit_threshold: 5_000_000, prune_mode: None }
Self { chunk_size: 5_000_000, prune_mode: None }
}
}

impl TransactionLookupStage {
/// Create new instance of [TransactionLookupStage].
pub fn new(commit_threshold: u64, prune_mode: Option<PruneMode>) -> Self {
Self { commit_threshold, prune_mode }
pub fn new(chunk_size: u64, prune_mode: Option<PruneMode>) -> Self {
Self { chunk_size, prune_mode }
}
}

@@ -92,43 +99,71 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
return Ok(ExecOutput::done(input.checkpoint()))
}

let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(provider, self.commit_threshold)?;
let end_block = *block_range.end();
// 500MB temporary files
let mut hash_collector: Collector<TxHash, TxNumber> =
Collector::new(Arc::new(TempDir::new()?), 500 * (1024 * 1024));

debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Updating transaction lookup");
debug!(
target: "sync::stages::transaction_lookup",
tx_range = ?input.checkpoint().block_number..=input.target(),
"Updating transaction lookup"
);

let mut tx_list = provider.transaction_hashes_by_range(tx_range)?;
loop {
let (tx_range, block_range, is_final_range) =
input.next_block_range_with_transaction_threshold(provider, self.chunk_size)?;

// Sort before inserting the reverse lookup for hash -> tx_id.
tx_list.par_sort_unstable_by(|txa, txb| txa.0.cmp(&txb.0));
let end_block = *block_range.end();

let tx = provider.tx_ref();
let mut txhash_cursor = tx.cursor_write::<tables::TxHashNumber>()?;

// If the last inserted element in the database is equal or bigger than the first
// in our set, then we need to insert inside the DB. If it is smaller then last
// element in the DB, we can append to the DB.
// Append probably only ever happens during sync, on the first table insertion.
let insert = tx_list
.first()
.zip(txhash_cursor.last()?)
.map(|((first, _), (last, _))| first <= &last)
.unwrap_or_default();
// if txhash_cursor.last() is None we will do insert. `zip` would return none if any item is
// none. if it is some and if first is smaller than last, we will do append.
for (tx_hash, id) in tx_list {
if insert {
txhash_cursor.insert(tx_hash, id)?;
} else {
txhash_cursor.append(tx_hash, id)?;
debug!(target: "sync::stages::transaction_lookup", ?tx_range, "Calculating transaction hashes");

for (key, value) in provider.transaction_hashes_by_range(tx_range)? {
hash_collector.insert(key, value);
}

input.checkpoint = Some(
StageCheckpoint::new(end_block)
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
);

if is_final_range {
let append_only = provider.count_entries::<tables::TxHashNumber>()?.is_zero();
let mut txhash_cursor =
provider.tx_ref().cursor_write::<tables::RawTable<tables::TxHashNumber>>()?;

let total_hashes = hash_collector.len();
let interval = (total_hashes / 10).max(1);
for (index, hash_to_number) in hash_collector.iter()?.enumerate() {
let (hash, number) = hash_to_number?;
if index > 0 && index % interval == 0 {
debug!(
target: "sync::stages::transaction_lookup",
?append_only,
progress = format!("{:.2}%", (index as f64 / total_hashes as f64) * 100.0),
"Inserting hashes"
);
}

if append_only {
txhash_cursor.append(
RawKey::<TxHash>::from_vec(hash),
RawValue::<TxNumber>::from_vec(number),
)?;
} else {
txhash_cursor.insert(
RawKey::<TxHash>::from_vec(hash),
RawValue::<TxNumber>::from_vec(number),
)?;
}
}
break
}
}

Ok(ExecOutput {
checkpoint: StageCheckpoint::new(end_block)
checkpoint: StageCheckpoint::new(input.target())
.with_entities_stage_checkpoint(stage_checkpoint(provider)?),
done: is_final_range,
done: true,
})
}
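Putting the execute rewrite in plain terms: instead of sorting each chunk and committing at `commit_threshold` boundaries, the stage now only collects `(hash, number)` pairs per block range and performs a single sorted write pass at the end, appending when `TxHashNumber` is empty and inserting otherwise. Below is a condensed, std-only model of that control flow; the `BTreeMap` and `Vec` are hypothetical stand-ins for `reth_etl::Collector` and the database cursor, not the real APIs.

```rust
use std::collections::BTreeMap;

type TxHash = [u8; 32];
type TxNumber = u64;

/// Stand-in for the new execute(): `chunks` plays the role of the per-block-range
/// hash batches, `table` the role of the TxHashNumber table behind the cursor.
fn run(chunks: Vec<Vec<(TxHash, TxNumber)>>, table: &mut Vec<(TxHash, TxNumber)>) {
    // Phase 1: loop over block ranges and only collect; the BTreeMap keeps entries
    // key-sorted the way the collector's merged temp-file iterator does.
    let mut collector: BTreeMap<TxHash, TxNumber> = BTreeMap::new();
    for chunk in chunks {
        for (hash, number) in chunk {
            collector.insert(hash, number);
        }
    }

    // Phase 2: one ordered pass over the destination. An empty table means the
    // fast append-only path (initial sync); otherwise fall back to inserts.
    let append_only = table.is_empty();
    for (hash, number) in collector {
        if append_only {
            table.push((hash, number)); // cursor.append(..) in the real stage
        } else {
            match table.binary_search_by(|(h, _)| h.cmp(&hash)) {
                Ok(i) => table[i].1 = number,              // overwrite an existing key
                Err(i) => table.insert(i, (hash, number)), // cursor.insert(..)
            }
        }
    }
}

fn main() {
    let mut table = Vec::new();
    run(vec![vec![([2; 32], 2), ([0; 32], 0)], vec![([1; 32], 1)]], &mut table);
    assert_eq!(
        table.iter().map(|(_, n)| *n).collect::<Vec<_>>(),
        vec![0u64, 1, 2]
    );
}
```

Because the stage now finishes its whole range in one execute call (`done: true` with the checkpoint at `input.target()`), the intermediate-commit test further down was removed along with `set_commit_threshold`.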

@@ -139,7 +174,7 @@ impl<DB: Database> Stage<DB> for TransactionLookupStage {
input: UnwindInput,
) -> Result<UnwindOutput, StageError> {
let tx = provider.tx_ref();
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.commit_threshold);
let (range, unwind_to, _) = input.unwind_block_range_with_threshold(self.chunk_size);

// Cursors to unwind tx hash to number
let mut body_cursor = tx.cursor_read::<tables::BlockBodyIndices>()?;
@@ -257,77 +292,6 @@ mod tests {
assert!(runner.validate_execution(input, result.ok()).is_ok(), "execution validation");
}

/// Execute the stage twice with input range that exceeds the commit threshold
#[tokio::test]
async fn execute_intermediate_commit_transaction_lookup() {
let threshold = 50;
let mut runner = TransactionLookupTestRunner::default();
runner.set_commit_threshold(threshold);
let (stage_progress, previous_stage) = (1000, 1100); // input exceeds threshold
let first_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(stage_progress)),
};
let mut rng = generators::rng();

// Seed only once with full input range
let seed =
random_block_range(&mut rng, stage_progress + 1..=previous_stage, B256::ZERO, 0..4); // set tx count range high enough to hit the threshold
runner
.db
.insert_blocks(seed.iter(), StorageKind::Static)
.expect("failed to seed execution");

let total_txs =
runner.db.factory.snapshot_provider().count_entries::<tables::Transactions>().unwrap()
as u64;

// Execute first time
let result = runner.execute(first_input).await.unwrap();
let mut tx_count = 0;
let expected_progress = seed
.iter()
.find(|x| {
tx_count += x.body.len();
tx_count as u64 > threshold
})
.map(|x| x.number)
.unwrap_or(previous_stage);
assert_matches!(result, Ok(_));
assert_eq!(
result.unwrap(),
ExecOutput {
checkpoint: StageCheckpoint::new(expected_progress).with_entities_stage_checkpoint(
EntitiesCheckpoint {
processed: runner.db.table::<tables::TxHashNumber>().unwrap().len() as u64,
total: total_txs
}
),
done: false
}
);

// Execute second time to completion
runner.set_commit_threshold(u64::MAX);
let second_input = ExecInput {
target: Some(previous_stage),
checkpoint: Some(StageCheckpoint::new(expected_progress)),
};
let result = runner.execute(second_input).await.unwrap();
assert_matches!(result, Ok(_));
assert_eq!(
result.as_ref().unwrap(),
&ExecOutput {
checkpoint: StageCheckpoint::new(previous_stage).with_entities_stage_checkpoint(
EntitiesCheckpoint { processed: total_txs, total: total_txs }
),
done: true
}
);

assert!(runner.validate_execution(first_input, result.ok()).is_ok(), "validation failed");
}

#[tokio::test]
async fn execute_pruned_transaction_lookup() {
let (previous_stage, prune_target, stage_progress) = (500, 400, 100);
@@ -428,21 +392,17 @@ mod tests {

struct TransactionLookupTestRunner {
db: TestStageDB,
commit_threshold: u64,
chunk_size: u64,
prune_mode: Option<PruneMode>,
}

impl Default for TransactionLookupTestRunner {
fn default() -> Self {
Self { db: TestStageDB::default(), commit_threshold: 1000, prune_mode: None }
Self { db: TestStageDB::default(), chunk_size: 1000, prune_mode: None }
}
}

impl TransactionLookupTestRunner {
fn set_commit_threshold(&mut self, threshold: u64) {
self.commit_threshold = threshold;
}

fn set_prune_mode(&mut self, prune_mode: PruneMode) {
self.prune_mode = Some(prune_mode);
}
@@ -483,10 +443,7 @@ mod tests {
}

fn stage(&self) -> Self::S {
TransactionLookupStage {
commit_threshold: self.commit_threshold,
prune_mode: self.prune_mode,
}
TransactionLookupStage { chunk_size: self.chunk_size, prune_mode: self.prune_mode }
}
}

