diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index 126622c26ac7..b99d5e57dab5 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -162,7 +162,7 @@ impl ExExHandle { /// Metrics for the `ExEx` manager. #[derive(Metrics)] -#[metrics(scope = "exex_manager")] +#[metrics(scope = "exex.manager")] pub struct ExExManagerMetrics { /// Max size of the internal state notifications buffer. max_capacity: Gauge, diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 1a8b914e868f..882b65e15892 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -14,18 +14,25 @@ use reth_exex_types::ExExNotification; #[derive(Debug, Default)] pub struct BlockCache { /// A min heap of `(Block Number, File ID)` tuples. - pub(super) blocks: BinaryHeap>, + /// + /// Contains one highest block in notification. In a notification with both committed and + /// reverted chain, the highest block is chosen between both chains. + pub(super) notification_max_blocks: BinaryHeap>, /// A mapping of committed blocks `Block Hash -> Block`. /// /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per /// block. pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>, + /// Block height of the lowest committed block currently in the cache. + pub(super) lowest_committed_block_height: Option, + /// Block height of the highest committed block currently in the cache. + pub(super) highest_committed_block_height: Option, } impl BlockCache { /// Returns `true` if the cache is empty. pub(super) fn is_empty(&self) -> bool { - self.blocks.is_empty() + self.notification_max_blocks.is_empty() } /// Removes all files from the cache that has notifications with a tip block less than or equal @@ -37,9 +44,11 @@ impl BlockCache { pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet { let mut file_ids = HashSet::default(); - while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() { + while let Some(block @ Reverse((max_block, file_id))) = + self.notification_max_blocks.peek().copied() + { if max_block <= block_number { - let popped_block = self.blocks.pop().unwrap(); + let popped_block = self.notification_max_blocks.pop().unwrap(); debug_assert_eq!(popped_block, block); file_ids.insert(file_id); } else { @@ -47,7 +56,25 @@ impl BlockCache { } } - self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id)); + let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None); + self.committed_blocks.retain(|_, (file_id, block)| { + let retain = !file_ids.contains(file_id); + + if retain { + lowest_committed_block_height = Some( + lowest_committed_block_height + .map_or(block.block.number, |lowest| block.block.number.min(lowest)), + ); + highest_committed_block_height = Some( + highest_committed_block_height + .map_or(block.block.number, |highest| block.block.number.max(highest)), + ); + } + + retain + }); + self.lowest_committed_block_height = lowest_committed_block_height; + self.highest_committed_block_height = highest_committed_block_height; file_ids } @@ -70,7 +97,7 @@ impl BlockCache { let max_block = reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max(); if let Some(max_block) = max_block { - self.blocks.push(Reverse((max_block, file_id))); + self.notification_max_blocks.push(Reverse((max_block, file_id))); } if let Some(committed_chain) = &committed_chain { @@ -81,12 +108,19 @@ impl BlockCache { }; self.committed_blocks.insert(block.hash(), (file_id, cached_block)); } + + self.highest_committed_block_height = Some(committed_chain.tip().number); } } #[cfg(test)] pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> { - self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect() + self.notification_max_blocks + .clone() + .into_sorted_vec() + .into_iter() + .map(|entry| entry.0) + .collect() } #[cfg(test)] diff --git a/crates/exex/exex/src/wal/metrics.rs b/crates/exex/exex/src/wal/metrics.rs new file mode 100644 index 000000000000..7726fc978d47 --- /dev/null +++ b/crates/exex/exex/src/wal/metrics.rs @@ -0,0 +1,18 @@ +use metrics::Gauge; +use reth_metrics::Metrics; + +/// Metrics for the [WAL](`super::Wal`) +#[derive(Metrics)] +#[metrics(scope = "exex.wal")] +pub(super) struct Metrics { + /// Size of all notifications in WAL in bytes + pub size_bytes: Gauge, + /// Total number of notifications in WAL + pub notifications_total: Gauge, + /// Total number of committed blocks in WAL + pub committed_blocks_total: Gauge, + /// Lowest committed block height in WAL + pub lowest_committed_block_height: Gauge, + /// Highest committed block height in WAL + pub highest_committed_block_height: Gauge, +} diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 4515029dcb3f..e8c9c6bc805e 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -3,8 +3,9 @@ mod cache; pub use cache::BlockCache; mod storage; -use parking_lot::{RwLock, RwLockReadGuard}; pub use storage::Storage; +mod metrics; +use metrics::Metrics; use std::{ path::Path, @@ -16,6 +17,7 @@ use std::{ use alloy_eips::BlockNumHash; use alloy_primitives::B256; +use parking_lot::{RwLock, RwLockReadGuard}; use reth_exex_types::ExExNotification; use reth_tracing::tracing::{debug, instrument}; @@ -74,6 +76,7 @@ struct WalInner { storage: Storage, /// WAL block cache. See [`cache::BlockCache`] docs for more details. block_cache: RwLock, + metrics: Metrics, } impl WalInner { @@ -82,6 +85,7 @@ impl WalInner { next_file_id: AtomicU32::new(0), storage: Storage::new(directory)?, block_cache: RwLock::new(BlockCache::default()), + metrics: Metrics::default(), }; wal.fill_block_cache()?; Ok(wal) @@ -98,9 +102,12 @@ impl WalInner { self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed); let mut block_cache = self.block_cache.write(); + let mut notifications_size = 0; for entry in self.storage.iter_notifications(files_range) { - let (file_id, notification) = entry?; + let (file_id, size, notification) = entry?; + + notifications_size += size; let committed_chain = notification.committed_chain(); let reverted_chain = notification.reverted_chain(); @@ -116,6 +123,8 @@ impl WalInner { block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification); } + self.update_metrics(&block_cache, notifications_size as i64); + Ok(()) } @@ -127,17 +136,20 @@ impl WalInner { let mut block_cache = self.block_cache.write(); let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); - self.storage.write_notification(file_id, notification)?; + let size = self.storage.write_notification(file_id, notification)?; debug!(?file_id, "Inserting notification blocks into the block cache"); block_cache.insert_notification_blocks_with_file_id(file_id, notification); + self.update_metrics(&block_cache, size as i64); + Ok(()) } #[instrument(target = "exex::wal", skip(self))] fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { - let file_ids = self.block_cache.write().remove_before(to_block.number); + let mut block_cache = self.block_cache.write(); + let file_ids = block_cache.remove_before(to_block.number); // Remove notifications from the storage. if file_ids.is_empty() { @@ -145,12 +157,34 @@ impl WalInner { return Ok(()) } - let removed_notifications = self.storage.remove_notifications(file_ids)?; - debug!(?removed_notifications, "Storage was finalized"); + let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?; + debug!(?removed_notifications, ?removed_size, "Storage was finalized"); + + self.update_metrics(&block_cache, -(removed_size as i64)); Ok(()) } + fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) { + if size_delta >= 0 { + self.metrics.size_bytes.increment(size_delta as f64); + } else { + self.metrics.size_bytes.decrement(size_delta as f64); + } + + self.metrics.notifications_total.set(block_cache.notification_max_blocks.len() as f64); + + self.metrics.committed_blocks_total.set(block_cache.committed_blocks.len() as f64); + + if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height { + self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64); + } + + if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height { + self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64); + } + } + /// Returns an iterator over all notifications in the WAL. fn iter_notifications( &self, @@ -159,7 +193,7 @@ impl WalInner { return Ok(Box::new(std::iter::empty())) }; - Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1)))) + Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2)))) } } @@ -180,7 +214,10 @@ impl WalHandle { return Ok(None) }; - self.wal.storage.read_notification(file_id) + self.wal + .storage + .read_notification(file_id) + .map(|entry| entry.map(|(notification, _)| notification)) } } @@ -205,7 +242,7 @@ mod tests { wal.inner .storage .iter_notifications(files_range) - .map(|entry| Ok(entry?.1)) + .map(|entry| Ok(entry?.2)) .collect::>() } diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index e921bdac862b..166a9bb4eb6b 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -40,16 +40,23 @@ impl Storage { } /// Removes notification for the given file ID from the storage. + /// + /// # Returns + /// + /// The size of the file that was removed in bytes, if any. #[instrument(target = "exex::wal::storage", skip(self))] - fn remove_notification(&self, file_id: u32) -> bool { + fn remove_notification(&self, file_id: u32) -> Option { + let path = self.file_path(file_id); + let size = path.metadata().ok()?.len(); + match reth_fs_util::remove_file(self.file_path(file_id)) { Ok(()) => { debug!("Notification was removed from the storage"); - true + Some(size) } Err(err) => { debug!(?err, "Failed to remove notification from the storage"); - false + None } } } @@ -77,36 +84,42 @@ impl Storage { /// /// # Returns /// - /// Number of removed notifications. + /// Number of removed notifications and the total size of the removed files in bytes. pub(super) fn remove_notifications( &self, file_ids: impl IntoIterator, - ) -> eyre::Result { - let mut deleted = 0; + ) -> eyre::Result<(usize, u64)> { + let mut deleted_total = 0; + let mut deleted_size = 0; for id in file_ids { - if self.remove_notification(id) { - deleted += 1; + if let Some(size) = self.remove_notification(id) { + deleted_total += 1; + deleted_size += size; } } - Ok(deleted) + Ok((deleted_total, deleted_size)) } pub(super) fn iter_notifications( &self, range: RangeInclusive, - ) -> impl Iterator> + '_ { + ) -> impl Iterator> + '_ { range.map(move |id| { - let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?; + let (notification, size) = + self.read_notification(id)?.ok_or_eyre("notification not found")?; - Ok((id, notification)) + Ok((id, size, notification)) }) } /// Reads the notification from the file with the given ID. #[instrument(target = "exex::wal::storage", skip(self))] - pub(super) fn read_notification(&self, file_id: u32) -> eyre::Result> { + pub(super) fn read_notification( + &self, + file_id: u32, + ) -> eyre::Result> { let file_path = self.file_path(file_id); debug!(?file_path, "Reading notification from WAL"); @@ -115,21 +128,26 @@ impl Storage { Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), Err(err) => return Err(err.into()), }; + let size = file.metadata()?.len(); // Deserialize using the bincode- and msgpack-compatible serde wrapper let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> = rmp_serde::decode::from_read(&mut file)?; - Ok(Some(notification.into())) + Ok(Some((notification.into(), size))) } /// Writes the notification to the file with the given ID. + /// + /// # Returns + /// + /// The size of the file that was written in bytes. #[instrument(target = "exex::wal::storage", skip(self, notification))] pub(super) fn write_notification( &self, file_id: u32, notification: &ExExNotification, - ) -> eyre::Result<()> { + ) -> eyre::Result { let file_path = self.file_path(file_id); debug!(?file_path, "Writing notification to WAL"); @@ -137,9 +155,11 @@ impl Storage { let notification = reth_exex_types::serde_bincode_compat::ExExNotification::from(notification); - Ok(reth_fs_util::atomic_write_file(&file_path, |file| { + reth_fs_util::atomic_write_file(&file_path, |file| { rmp_serde::encode::write(file, ¬ification) - })?) + })?; + + Ok(file_path.metadata()?.len()) } } @@ -177,7 +197,10 @@ mod tests { let file_id = 0; storage.write_notification(file_id, ¬ification)?; let deserialized_notification = storage.read_notification(file_id)?; - assert_eq!(deserialized_notification, Some(notification)); + assert_eq!( + deserialized_notification.map(|(notification, _)| notification), + Some(notification) + ); Ok(()) }