Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(exex): WAL metrics #11431

Merged
merged 2 commits into from
Oct 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl ExExHandle {

/// Metrics for the `ExEx` manager.
#[derive(Metrics)]
#[metrics(scope = "exex_manager")]
#[metrics(scope = "exex.manager")]
pub struct ExExManagerMetrics {
/// Max size of the internal state notifications buffer.
max_capacity: Gauge,
Expand Down
48 changes: 41 additions & 7 deletions crates/exex/exex/src/wal/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@ use reth_exex_types::ExExNotification;
#[derive(Debug, Default)]
pub struct BlockCache {
/// A min heap of `(Block Number, File ID)` tuples.
pub(super) blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
///
/// Contains one highest block in notification. In a notification with both committed and
/// reverted chain, the highest block is chosen between both chains.
pub(super) notification_max_blocks: BinaryHeap<Reverse<(BlockNumber, u32)>>,
/// A mapping of committed blocks `Block Hash -> Block`.
///
/// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per
/// block.
pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>,
/// Block height of the lowest committed block currently in the cache.
pub(super) lowest_committed_block_height: Option<BlockNumber>,
/// Block height of the highest committed block currently in the cache.
pub(super) highest_committed_block_height: Option<BlockNumber>,
}

impl BlockCache {
/// Returns `true` if the cache is empty.
pub(super) fn is_empty(&self) -> bool {
self.blocks.is_empty()
self.notification_max_blocks.is_empty()
}

/// Removes all files from the cache that has notifications with a tip block less than or equal
Expand All @@ -37,17 +44,37 @@ impl BlockCache {
pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet<u32> {
let mut file_ids = HashSet::default();

while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() {
while let Some(block @ Reverse((max_block, file_id))) =
self.notification_max_blocks.peek().copied()
{
if max_block <= block_number {
let popped_block = self.blocks.pop().unwrap();
let popped_block = self.notification_max_blocks.pop().unwrap();
debug_assert_eq!(popped_block, block);
file_ids.insert(file_id);
} else {
break
}
}

self.committed_blocks.retain(|_, (file_id, _)| !file_ids.contains(file_id));
let (mut lowest_committed_block_height, mut highest_committed_block_height) = (None, None);
self.committed_blocks.retain(|_, (file_id, block)| {
let retain = !file_ids.contains(file_id);

if retain {
lowest_committed_block_height = Some(
lowest_committed_block_height
.map_or(block.block.number, |lowest| block.block.number.min(lowest)),
);
highest_committed_block_height = Some(
highest_committed_block_height
.map_or(block.block.number, |highest| block.block.number.max(highest)),
);
}

retain
});
self.lowest_committed_block_height = lowest_committed_block_height;
self.highest_committed_block_height = highest_committed_block_height;

file_ids
}
Expand All @@ -70,7 +97,7 @@ impl BlockCache {
let max_block =
reverted_chain.iter().chain(&committed_chain).map(|chain| chain.tip().number).max();
if let Some(max_block) = max_block {
self.blocks.push(Reverse((max_block, file_id)));
self.notification_max_blocks.push(Reverse((max_block, file_id)));
}

if let Some(committed_chain) = &committed_chain {
Expand All @@ -81,12 +108,19 @@ impl BlockCache {
};
self.committed_blocks.insert(block.hash(), (file_id, cached_block));
}

self.highest_committed_block_height = Some(committed_chain.tip().number);
}
}

#[cfg(test)]
pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> {
self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect()
self.notification_max_blocks
.clone()
.into_sorted_vec()
.into_iter()
.map(|entry| entry.0)
.collect()
}

#[cfg(test)]
Expand Down
18 changes: 18 additions & 0 deletions crates/exex/exex/src/wal/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
use metrics::Gauge;
use reth_metrics::Metrics;

/// Metrics for the [WAL](`super::Wal`)
#[derive(Metrics)]
#[metrics(scope = "exex.wal")]
pub(super) struct Metrics {
/// Size of all notifications in WAL in bytes
pub size_bytes: Gauge,
/// Total number of notifications in WAL
pub notifications_total: Gauge,
/// Total number of committed blocks in WAL
pub committed_blocks_total: Gauge,
/// Lowest committed block height in WAL
pub lowest_committed_block_height: Gauge,
/// Highest committed block height in WAL
pub highest_committed_block_height: Gauge,
}
55 changes: 46 additions & 9 deletions crates/exex/exex/src/wal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
mod cache;
pub use cache::BlockCache;
mod storage;
use parking_lot::{RwLock, RwLockReadGuard};
pub use storage::Storage;
mod metrics;
use metrics::Metrics;

use std::{
path::Path,
Expand All @@ -16,6 +17,7 @@ use std::{

use alloy_eips::BlockNumHash;
use alloy_primitives::B256;
use parking_lot::{RwLock, RwLockReadGuard};
use reth_exex_types::ExExNotification;
use reth_tracing::tracing::{debug, instrument};

Expand Down Expand Up @@ -74,6 +76,7 @@ struct WalInner {
storage: Storage,
/// WAL block cache. See [`cache::BlockCache`] docs for more details.
block_cache: RwLock<BlockCache>,
metrics: Metrics,
}

impl WalInner {
Expand All @@ -82,6 +85,7 @@ impl WalInner {
next_file_id: AtomicU32::new(0),
storage: Storage::new(directory)?,
block_cache: RwLock::new(BlockCache::default()),
metrics: Metrics::default(),
};
wal.fill_block_cache()?;
Ok(wal)
Expand All @@ -98,9 +102,12 @@ impl WalInner {
self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed);

let mut block_cache = self.block_cache.write();
let mut notifications_size = 0;

for entry in self.storage.iter_notifications(files_range) {
let (file_id, notification) = entry?;
let (file_id, size, notification) = entry?;

notifications_size += size;

let committed_chain = notification.committed_chain();
let reverted_chain = notification.reverted_chain();
Expand All @@ -116,6 +123,8 @@ impl WalInner {
block_cache.insert_notification_blocks_with_file_id(file_id, &notification);
}

self.update_metrics(&block_cache, notifications_size as i64);

Ok(())
}

Expand All @@ -127,30 +136,55 @@ impl WalInner {
let mut block_cache = self.block_cache.write();

let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed);
self.storage.write_notification(file_id, notification)?;
let size = self.storage.write_notification(file_id, notification)?;

debug!(?file_id, "Inserting notification blocks into the block cache");
block_cache.insert_notification_blocks_with_file_id(file_id, notification);

self.update_metrics(&block_cache, size as i64);

Ok(())
}

#[instrument(target = "exex::wal", skip(self))]
fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> {
let file_ids = self.block_cache.write().remove_before(to_block.number);
let mut block_cache = self.block_cache.write();
let file_ids = block_cache.remove_before(to_block.number);

// Remove notifications from the storage.
if file_ids.is_empty() {
debug!("No notifications were finalized from the storage");
return Ok(())
}

let removed_notifications = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, "Storage was finalized");
let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?;
debug!(?removed_notifications, ?removed_size, "Storage was finalized");

self.update_metrics(&block_cache, -(removed_size as i64));

Ok(())
}

fn update_metrics(&self, block_cache: &BlockCache, size_delta: i64) {
if size_delta >= 0 {
self.metrics.size_bytes.increment(size_delta as f64);
} else {
self.metrics.size_bytes.decrement(size_delta as f64);
}

self.metrics.notifications_total.set(block_cache.notification_max_blocks.len() as f64);

self.metrics.committed_blocks_total.set(block_cache.committed_blocks.len() as f64);

if let Some(lowest_committed_block_height) = block_cache.lowest_committed_block_height {
self.metrics.lowest_committed_block_height.set(lowest_committed_block_height as f64);
}

if let Some(highest_committed_block_height) = block_cache.highest_committed_block_height {
self.metrics.highest_committed_block_height.set(highest_committed_block_height as f64);
}
}

/// Returns an iterator over all notifications in the WAL.
fn iter_notifications(
&self,
Expand All @@ -159,7 +193,7 @@ impl WalInner {
return Ok(Box::new(std::iter::empty()))
};

Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.1))))
Ok(Box::new(self.storage.iter_notifications(range).map(|entry| Ok(entry?.2))))
}
}

Expand All @@ -180,7 +214,10 @@ impl WalHandle {
return Ok(None)
};

self.wal.storage.read_notification(file_id)
self.wal
.storage
.read_notification(file_id)
.map(|entry| entry.map(|(notification, _)| notification))
}
}

Expand All @@ -205,7 +242,7 @@ mod tests {
wal.inner
.storage
.iter_notifications(files_range)
.map(|entry| Ok(entry?.1))
.map(|entry| Ok(entry?.2))
.collect::<eyre::Result<_>>()
}

Expand Down
Loading
Loading