Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snapshot, stages): pass snapshot target per segment #6673

Merged
merged 7 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/consensus/beacon/src/engine/hooks/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
use futures::FutureExt;
use reth_db::database::Database;
use reth_interfaces::RethResult;
use reth_primitives::BlockNumber;
use reth_primitives::{snapshot::HighestSnapshots, BlockNumber};
use reth_snapshot::{Snapshotter, SnapshotterWithResult};
use reth_tasks::TaskSpawner;
use std::task::{ready, Context, Poll};
Expand Down Expand Up @@ -81,7 +81,11 @@ impl<DB: Database + 'static> SnapshotHook<DB> {
return Ok(None)
};

let targets = snapshotter.get_snapshot_targets(finalized_block_number)?;
let targets = snapshotter.get_snapshot_targets(HighestSnapshots {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;

// Check if the snapshotting of any data has been requested.
if targets.any() {
Expand Down
43 changes: 33 additions & 10 deletions crates/snapshot/src/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,32 +137,37 @@ impl<DB: Database> Snapshotter<DB> {
Ok(targets)
}

/// Returns a snapshot targets at the provided finalized block number.
/// Returns a snapshot targets at the provided finalized block numbers per segment.
/// The target is determined by the check against highest snapshots using
/// [SnapshotProvider::get_highest_snapshots].
pub fn get_snapshot_targets(
&self,
finalized_block_number: BlockNumber,
finalized_block_numbers: HighestSnapshots,
) -> RethResult<SnapshotTargets> {
let highest_snapshots = self.snapshot_provider.get_highest_snapshots();

let targets = SnapshotTargets {
headers: self.get_snapshot_target(highest_snapshots.headers, finalized_block_number),
headers: finalized_block_numbers.headers.and_then(|finalized_block_number| {
self.get_snapshot_target(highest_snapshots.headers, finalized_block_number)
}),
// Snapshot receipts only if they're not pruned according to the user configuration
receipts: if self.prune_modes.receipts.is_none() &&
self.prune_modes.receipts_log_filter.is_empty()
{
self.get_snapshot_target(highest_snapshots.receipts, finalized_block_number)
finalized_block_numbers.receipts.and_then(|finalized_block_number| {
self.get_snapshot_target(highest_snapshots.receipts, finalized_block_number)
})
} else {
None
},
transactions: self
.get_snapshot_target(highest_snapshots.transactions, finalized_block_number),
transactions: finalized_block_numbers.transactions.and_then(|finalized_block_number| {
self.get_snapshot_target(highest_snapshots.transactions, finalized_block_number)
}),
};

trace!(
target: "snapshot",
%finalized_block_number,
?finalized_block_numbers,
?highest_snapshots,
?targets,
any = %targets.any(),
Expand Down Expand Up @@ -237,7 +242,13 @@ mod tests {
let mut snapshotter =
Snapshotter::new(provider_factory, snapshot_provider.clone(), PruneModes::default());

let targets = snapshotter.get_snapshot_targets(1).expect("get snapshot targets");
let targets = snapshotter
.get_snapshot_targets(HighestSnapshots {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
})
.expect("get snapshot targets");
assert_eq!(
targets,
SnapshotTargets {
Expand All @@ -252,7 +263,13 @@ mod tests {
HighestSnapshots { headers: Some(1), receipts: Some(1), transactions: Some(1) }
);

let targets = snapshotter.get_snapshot_targets(3).expect("get snapshot targets");
let targets = snapshotter
.get_snapshot_targets(HighestSnapshots {
headers: Some(3),
receipts: Some(3),
transactions: Some(3),
})
.expect("get snapshot targets");
assert_eq!(
targets,
SnapshotTargets {
Expand All @@ -267,7 +284,13 @@ mod tests {
HighestSnapshots { headers: Some(3), receipts: Some(3), transactions: Some(3) }
);

let targets = snapshotter.get_snapshot_targets(4).expect("get snapshot targets");
let targets = snapshotter
.get_snapshot_targets(HighestSnapshots {
headers: Some(4),
receipts: Some(4),
transactions: Some(4),
})
.expect("get snapshot targets");
assert_eq!(
targets,
SnapshotTargets {
Expand Down
1 change: 1 addition & 0 deletions crates/stages/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl StageError {
StageError::ChannelClosed |
StageError::InconsistentBlockNumber { .. } |
StageError::InconsistentTxNumber { .. } |
StageError::Internal(_) |
StageError::Fatal(_)
)
}
Expand Down
22 changes: 17 additions & 5 deletions crates/stages/src/stages/snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use crate::{ExecInput, ExecOutput, Stage, StageError, UnwindInput, UnwindOutput};
use reth_db::database::Database;
use reth_primitives::stage::{StageCheckpoint, StageId};
use reth_provider::{BlockNumReader, DatabaseProviderRW};
use reth_primitives::{
snapshot::HighestSnapshots,
stage::{StageCheckpoint, StageId},
};
use reth_provider::{DatabaseProviderRW, StageCheckpointReader};
use reth_snapshot::Snapshotter;

/// The snapshot stage _copies_ all data from database to static files using [Snapshotter]. The
/// block range for copying is determined by the current highest blocks contained in static files
/// and [BlockNumReader::best_block_number],
/// i.e. the range is `highest_snapshotted_block_number..=best_block_number`.
/// and stage checkpoints for each segment individually.
#[derive(Debug)]
pub struct SnapshotStage<DB: Database> {
snapshotter: Snapshotter<DB>,
Expand All @@ -30,7 +32,17 @@ impl<DB: Database> Stage<DB> for SnapshotStage<DB> {
provider: &DatabaseProviderRW<DB>,
input: ExecInput,
) -> Result<ExecOutput, StageError> {
let targets = self.snapshotter.get_snapshot_targets(provider.best_block_number()?)?;
let targets = self.snapshotter.get_snapshot_targets(HighestSnapshots {
headers: provider
.get_stage_checkpoint(StageId::Headers)?
.map(|checkpoint| checkpoint.block_number),
receipts: provider
.get_stage_checkpoint(StageId::Execution)?
.map(|checkpoint| checkpoint.block_number),
transactions: provider
.get_stage_checkpoint(StageId::Bodies)?
.map(|checkpoint| checkpoint.block_number),
})?;
self.snapshotter.run(targets)?;
Ok(ExecOutput::done(input.checkpoint()))
}
Expand Down
Loading