Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bin, snapshot, engine): emit and log snapshotter events #6490

Merged
merged 2 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 4 additions & 2 deletions bin/reth/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,12 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit

let mut hooks = EngineHooks::new();

let snapshotter = Snapshotter::new(
let mut snapshotter = Snapshotter::new(
provider_factory.clone(),
provider_factory.snapshot_provider(),
prune_config.clone().unwrap_or_default().segments,
);
let snapshotter_events = snapshotter.events();
hooks.add(SnapshotHook::new(snapshotter.clone(), Box::new(executor.clone())));
info!(target: "reth::cli", "Snapshotter initialized");

Expand Down Expand Up @@ -375,7 +376,8 @@ impl<DB: Database + DatabaseMetrics + DatabaseMetadata + 'static> NodeBuilderWit
} else {
Either::Right(stream::empty())
},
pruner_events.map(Into::into)
pruner_events.map(Into::into),
snapshotter_events.map(Into::into),
);
executor.spawn_critical(
"events task",
Expand Down
20 changes: 20 additions & 0 deletions bin/reth/src/commands/node/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use reth_primitives::{
BlockNumber,
};
use reth_prune::PrunerEvent;
use reth_snapshot::SnapshotterEvent;
use reth_stages::{ExecOutput, PipelineEvent};
use std::{
fmt::{Display, Formatter},
Expand Down Expand Up @@ -205,6 +206,14 @@ impl<DB> NodeState<DB> {
}
}
}

fn handle_snapshotter_event(&self, event: SnapshotterEvent) {
match event {
SnapshotterEvent::Finished { targets, elapsed } => {
info!(?targets, ?elapsed, "Snapshotter finished");
}
}
}
}

impl<DB: DatabaseMetadata> NodeState<DB> {
Expand Down Expand Up @@ -249,6 +258,8 @@ pub enum NodeEvent {
ConsensusLayerHealth(ConsensusLayerHealthEvent),
/// A pruner event
Pruner(PrunerEvent),
/// A snapshotter event
Snapshotter(SnapshotterEvent),
}

impl From<NetworkEvent> for NodeEvent {
Expand Down Expand Up @@ -281,6 +292,12 @@ impl From<PrunerEvent> for NodeEvent {
}
}

impl From<SnapshotterEvent> for NodeEvent {
fn from(event: SnapshotterEvent) -> Self {
NodeEvent::Snapshotter(event)
}
}

/// Displays relevant information to the user from components of the node, and periodically
/// displays the high-level status of the node.
pub async fn handle_events<E, DB>(
Expand Down Expand Up @@ -391,6 +408,9 @@ where
NodeEvent::Pruner(event) => {
this.state.handle_pruner_event(event);
}
NodeEvent::Snapshotter(event) => {
this.state.handle_snapshotter_event(event);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/consensus/beacon/src/engine/hooks/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<DB: Database + 'static> SnapshotHook<DB> {
) -> RethResult<Option<EngineHookEvent>> {
Ok(match &mut self.state {
SnapshotterState::Idle(snapshotter) => {
let Some(snapshotter) = snapshotter.take() else { return Ok(None) };
let Some(mut snapshotter) = snapshotter.take() else { return Ok(None) };

let targets = snapshotter.get_snapshot_targets(finalized_block_number)?;

Expand Down
2 changes: 1 addition & 1 deletion crates/prune/src/pruner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl<DB: Database> Pruner<DB> {
}
}

/// Listen for events on the prune.
/// Listen for events on the pruner.
pub fn events(&mut self) -> UnboundedReceiverStream<PrunerEvent> {
self.listeners.new_listener()
}
Expand Down
2 changes: 2 additions & 0 deletions crates/snapshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ reth-db.workspace = true
reth-provider.workspace = true
reth-interfaces.workspace = true
reth-nippy-jar.workspace = true
reth-tokio-util.workspace = true

# async
tokio = { workspace = true, features = ["sync"] }
tokio-stream.workspace = true

# misc
thiserror.workspace = true
Expand Down
14 changes: 14 additions & 0 deletions crates/snapshot/src/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use crate::SnapshotTargets;
use std::time::Duration;

/// An event emitted by a [Snapshotter][crate::Snapshotter].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SnapshotterEvent {
/// Emitted when snapshotter finished running.
Finished {
/// Targets that were snapshotted
targets: SnapshotTargets,
/// Time it took to run the snapshotter
elapsed: Duration,
},
}
2 changes: 2 additions & 0 deletions crates/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
)]
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]

mod event;
pub mod segments;
mod snapshotter;

pub use event::SnapshotterEvent;
pub use snapshotter::{SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult};
18 changes: 14 additions & 4 deletions crates/snapshot/src/snapshotter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Support for snapshotting.

use crate::{segments, segments::Segment};
use crate::{segments, segments::Segment, SnapshotterEvent};
use rayon::prelude::*;
use reth_db::database::Database;
use reth_interfaces::RethResult;
Expand All @@ -9,7 +9,9 @@ use reth_provider::{
providers::{SnapshotProvider, SnapshotWriter},
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, sync::Arc, time::Instant};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};

/// Result of [Snapshotter::run] execution.
Expand All @@ -29,6 +31,7 @@ pub struct Snapshotter<DB> {
/// needed in [Snapshotter] to prevent snapshotting the prunable data.
/// See [Snapshotter::get_snapshot_targets].
prune_modes: PruneModes,
listeners: EventListeners<SnapshotterEvent>,
}

/// Snapshot targets, per data part, measured in [`BlockNumber`].
Expand Down Expand Up @@ -71,7 +74,12 @@ impl<DB: Database> Snapshotter<DB> {
snapshot_provider: Arc<SnapshotProvider>,
prune_modes: PruneModes,
) -> Self {
Self { provider_factory, snapshot_provider, prune_modes }
Self { provider_factory, snapshot_provider, prune_modes, listeners: Default::default() }
}

/// Listen for events on the snapshotter.
pub fn events(&mut self) -> UnboundedReceiverStream<SnapshotterEvent> {
self.listeners.new_listener()
}

/// Run the snapshotter.
Expand All @@ -82,7 +90,7 @@ impl<DB: Database> Snapshotter<DB> {
///
/// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic
/// lives in the `prune` crate.
pub fn run(&self, targets: SnapshotTargets) -> SnapshotterResult {
pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult {
debug_assert!(targets
.is_contiguous_to_highest_snapshots(self.snapshot_provider.get_highest_snapshots()));

Expand Down Expand Up @@ -124,6 +132,8 @@ impl<DB: Database> Snapshotter<DB> {
let elapsed = start.elapsed(); // TODO(alexey): track in metrics
debug!(target: "snapshot", ?targets, ?elapsed, "Snapshotter finished");

self.listeners.notify(SnapshotterEvent::Finished { targets: targets.clone(), elapsed });

Ok(targets)
}

Expand Down Expand Up @@ -224,7 +234,7 @@ mod tests {
let provider_factory = db.factory;
let snapshot_provider = provider_factory.snapshot_provider();

let snapshotter =
let mut snapshotter =
Snapshotter::new(provider_factory, snapshot_provider.clone(), PruneModes::default());

let targets = snapshotter.get_snapshot_targets(1).expect("get snapshot targets");
Expand Down
Loading