From 7321792721691f70f71a76f386cec5bdff9fcc7a Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 8 Feb 2024 14:49:54 +0000 Subject: [PATCH 1/2] feat(bin, snapshot, engine): emit and log snapshotter events --- Cargo.lock | 2 ++ bin/reth/src/builder.rs | 6 ++++-- bin/reth/src/commands/node/events.rs | 20 +++++++++++++++++++ .../beacon/src/engine/hooks/snapshot.rs | 2 +- crates/prune/src/pruner.rs | 2 +- crates/snapshot/Cargo.toml | 2 ++ crates/snapshot/src/event.rs | 9 +++++++++ crates/snapshot/src/lib.rs | 2 ++ crates/snapshot/src/snapshotter.rs | 18 +++++++++++++---- 9 files changed, 55 insertions(+), 8 deletions(-) create mode 100644 crates/snapshot/src/event.rs diff --git a/Cargo.lock b/Cargo.lock index b8a5ff0254df..330f72248f77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6913,9 +6913,11 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-stages", + "reth-tokio-util", "tempfile", "thiserror", "tokio", + "tokio-stream", "tracing", ] diff --git a/bin/reth/src/builder.rs b/bin/reth/src/builder.rs index b7e056238913..74d382ce6676 100644 --- a/bin/reth/src/builder.rs +++ b/bin/reth/src/builder.rs @@ -263,11 +263,12 @@ impl NodeBuilderWit let mut hooks = EngineHooks::new(); - let snapshotter = Snapshotter::new( + let mut snapshotter = Snapshotter::new( provider_factory.clone(), provider_factory.snapshot_provider(), prune_config.clone().unwrap_or_default().segments, ); + let snapshotter_events = snapshotter.events(); hooks.add(SnapshotHook::new(snapshotter.clone(), Box::new(executor.clone()))); info!(target: "reth::cli", "Snapshotter initialized"); @@ -375,7 +376,8 @@ impl NodeBuilderWit } else { Either::Right(stream::empty()) }, - pruner_events.map(Into::into) + pruner_events.map(Into::into), + snapshotter_events.map(Into::into), ); executor.spawn_critical( "events task", diff --git a/bin/reth/src/commands/node/events.rs b/bin/reth/src/commands/node/events.rs index 4aaea44efe17..787e01a42125 100644 --- a/bin/reth/src/commands/node/events.rs +++ b/bin/reth/src/commands/node/events.rs @@ -12,6 +12,7 @@ use reth_primitives::{ BlockNumber, }; use reth_prune::PrunerEvent; +use reth_snapshot::SnapshotterEvent; use reth_stages::{ExecOutput, PipelineEvent}; use std::{ fmt::{Display, Formatter}, @@ -205,6 +206,14 @@ impl NodeState { } } } + + fn handle_snapshotter_event(&self, event: SnapshotterEvent) { + match event { + SnapshotterEvent::Finished { targets, elapsed } => { + info!(?targets, ?elapsed, "Snapshotter finished"); + } + } + } } impl NodeState { @@ -249,6 +258,8 @@ pub enum NodeEvent { ConsensusLayerHealth(ConsensusLayerHealthEvent), /// A pruner event Pruner(PrunerEvent), + /// A snapshotter event + Snapshotter(SnapshotterEvent), } impl From for NodeEvent { @@ -281,6 +292,12 @@ impl From for NodeEvent { } } +impl From for NodeEvent { + fn from(event: SnapshotterEvent) -> Self { + NodeEvent::Snapshotter(event) + } +} + /// Displays relevant information to the user from components of the node, and periodically /// displays the high-level status of the node. pub async fn handle_events( @@ -391,6 +408,9 @@ where NodeEvent::Pruner(event) => { this.state.handle_pruner_event(event); } + NodeEvent::Snapshotter(event) => { + this.state.handle_snapshotter_event(event); + } } } diff --git a/crates/consensus/beacon/src/engine/hooks/snapshot.rs b/crates/consensus/beacon/src/engine/hooks/snapshot.rs index f2c9363c5172..6bc03ebb6312 100644 --- a/crates/consensus/beacon/src/engine/hooks/snapshot.rs +++ b/crates/consensus/beacon/src/engine/hooks/snapshot.rs @@ -75,7 +75,7 @@ impl SnapshotHook { ) -> RethResult> { Ok(match &mut self.state { SnapshotterState::Idle(snapshotter) => { - let Some(snapshotter) = snapshotter.take() else { return Ok(None) }; + let Some(mut snapshotter) = snapshotter.take() else { return Ok(None) }; let targets = snapshotter.get_snapshot_targets(finalized_block_number)?; diff --git a/crates/prune/src/pruner.rs b/crates/prune/src/pruner.rs index ab9fd5cd53a3..8538807ba9fc 100644 --- a/crates/prune/src/pruner.rs +++ b/crates/prune/src/pruner.rs @@ -65,7 +65,7 @@ impl Pruner { } } - /// Listen for events on the prune. + /// Listen for events on the pruner. pub fn events(&mut self) -> UnboundedReceiverStream { self.listeners.new_listener() } diff --git a/crates/snapshot/Cargo.toml b/crates/snapshot/Cargo.toml index a17c1d0ac65c..df3216c5abb3 100644 --- a/crates/snapshot/Cargo.toml +++ b/crates/snapshot/Cargo.toml @@ -18,9 +18,11 @@ reth-db.workspace = true reth-provider.workspace = true reth-interfaces.workspace = true reth-nippy-jar.workspace = true +reth-tokio-util.workspace = true # async tokio = { workspace = true, features = ["sync"] } +tokio-stream.workspace = true # misc thiserror.workspace = true diff --git a/crates/snapshot/src/event.rs b/crates/snapshot/src/event.rs new file mode 100644 index 000000000000..01d0d1fdde76 --- /dev/null +++ b/crates/snapshot/src/event.rs @@ -0,0 +1,9 @@ +use crate::SnapshotTargets; +use std::time::Duration; + +/// An event emitted by a [Pruner][crate::Pruner]. +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum SnapshotterEvent { + /// Emitted when snapshotter finished running. + Finished { targets: SnapshotTargets, elapsed: Duration }, +} diff --git a/crates/snapshot/src/lib.rs b/crates/snapshot/src/lib.rs index ba05a3ba7549..f28b63b326ed 100644 --- a/crates/snapshot/src/lib.rs +++ b/crates/snapshot/src/lib.rs @@ -7,7 +7,9 @@ )] #![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] +mod event; pub mod segments; mod snapshotter; +pub use event::SnapshotterEvent; pub use snapshotter::{SnapshotTargets, Snapshotter, SnapshotterResult, SnapshotterWithResult}; diff --git a/crates/snapshot/src/snapshotter.rs b/crates/snapshot/src/snapshotter.rs index 84bd3ec8b4e2..93be15e5dc20 100644 --- a/crates/snapshot/src/snapshotter.rs +++ b/crates/snapshot/src/snapshotter.rs @@ -1,6 +1,6 @@ //! Support for snapshotting. -use crate::{segments, segments::Segment}; +use crate::{segments, segments::Segment, SnapshotterEvent}; use rayon::prelude::*; use reth_db::database::Database; use reth_interfaces::RethResult; @@ -9,7 +9,9 @@ use reth_provider::{ providers::{SnapshotProvider, SnapshotWriter}, ProviderFactory, }; +use reth_tokio_util::EventListeners; use std::{ops::RangeInclusive, sync::Arc, time::Instant}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, trace}; /// Result of [Snapshotter::run] execution. @@ -29,6 +31,7 @@ pub struct Snapshotter { /// needed in [Snapshotter] to prevent snapshotting the prunable data. /// See [Snapshotter::get_snapshot_targets]. prune_modes: PruneModes, + listeners: EventListeners, } /// Snapshot targets, per data part, measured in [`BlockNumber`]. @@ -71,7 +74,12 @@ impl Snapshotter { snapshot_provider: Arc, prune_modes: PruneModes, ) -> Self { - Self { provider_factory, snapshot_provider, prune_modes } + Self { provider_factory, snapshot_provider, prune_modes, listeners: Default::default() } + } + + /// Listen for events on the snapshotter. + pub fn events(&mut self) -> UnboundedReceiverStream { + self.listeners.new_listener() } /// Run the snapshotter. @@ -82,7 +90,7 @@ impl Snapshotter { /// /// NOTE: it doesn't delete the data from database, and the actual deleting (aka pruning) logic /// lives in the `prune` crate. - pub fn run(&self, targets: SnapshotTargets) -> SnapshotterResult { + pub fn run(&mut self, targets: SnapshotTargets) -> SnapshotterResult { debug_assert!(targets .is_contiguous_to_highest_snapshots(self.snapshot_provider.get_highest_snapshots())); @@ -124,6 +132,8 @@ impl Snapshotter { let elapsed = start.elapsed(); // TODO(alexey): track in metrics debug!(target: "snapshot", ?targets, ?elapsed, "Snapshotter finished"); + self.listeners.notify(SnapshotterEvent::Finished { targets: targets.clone(), elapsed }); + Ok(targets) } @@ -224,7 +234,7 @@ mod tests { let provider_factory = db.factory; let snapshot_provider = provider_factory.snapshot_provider(); - let snapshotter = + let mut snapshotter = Snapshotter::new(provider_factory, snapshot_provider.clone(), PruneModes::default()); let targets = snapshotter.get_snapshot_targets(1).expect("get snapshot targets"); From e61f99112879f5c2ac5d3fff78e35d04b9551ef3 Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Thu, 8 Feb 2024 14:55:43 +0000 Subject: [PATCH 2/2] add comments --- crates/snapshot/src/event.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/crates/snapshot/src/event.rs b/crates/snapshot/src/event.rs index 01d0d1fdde76..b9e45a11118c 100644 --- a/crates/snapshot/src/event.rs +++ b/crates/snapshot/src/event.rs @@ -1,9 +1,14 @@ use crate::SnapshotTargets; use std::time::Duration; -/// An event emitted by a [Pruner][crate::Pruner]. +/// An event emitted by a [Snapshotter][crate::Snapshotter]. #[derive(Debug, PartialEq, Eq, Clone)] pub enum SnapshotterEvent { /// Emitted when snapshotter finished running. - Finished { targets: SnapshotTargets, elapsed: Duration }, + Finished { + /// Targets that were snapshotted + targets: SnapshotTargets, + /// Time it took to run the snapshotter + elapsed: Duration, + }, }