Skip to content

Commit

Permalink
run snapshot segments in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
shekhirin committed Jan 29, 2024
1 parent 03c7c40 commit e39f409
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/snapshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ tokio = { workspace = true, features = ["sync"] }
thiserror.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
rayon.workspace = true

[dev-dependencies]
# reth
Expand Down
2 changes: 1 addition & 1 deletion crates/snapshot/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::{ops::RangeInclusive, path::Path, sync::Arc};
pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];

/// A segment represents a snapshotting of some portion of the data.
pub trait Segment<DB: Database> {
pub trait Segment<DB: Database>: Send + Sync {
/// Returns the [`SnapshotSegment`].
fn segment(&self) -> SnapshotSegment;

Expand Down
9 changes: 6 additions & 3 deletions crates/snapshot/src/snapshotter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Support for snapshotting.

use crate::{segments, segments::Segment};
use rayon::prelude::*;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::{snapshot::HighestSnapshots, BlockNumber};
Expand Down Expand Up @@ -98,18 +99,20 @@ impl<DB: Database> Snapshotter<DB> {
segments.push((Box::new(segments::Receipts), block_range));
}

for (segment, block_range) in &segments {
segments.par_iter().try_for_each(|(segment, block_range)| -> RethResult<()> {
debug!(target: "snapshot", segment = %segment.segment(), ?block_range, "Snapshotting segment");
let start = Instant::now();

// Create a new database transaction on every segment to prevent long-lived read-only
// transactions
let provider = self.provider_factory.provider()?;
let provider = self.provider_factory.provider()?.disable_long_read_transaction_safety();
segment.snapshot(provider, self.snapshot_provider.clone(), block_range.clone())?;

let elapsed = start.elapsed(); // TODO(alexey): track in metrics
debug!(target: "snapshot", segment = %segment.segment(), ?block_range, ?elapsed, "Finished snapshotting segment");
}

Ok(())
})?;

self.snapshot_provider.commit()?;
for (segment, block_range) in segments {
Expand Down

0 comments on commit e39f409

Please sign in to comment.