diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 861ae9b506b1..1a8b914e868f 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -14,12 +14,12 @@ use reth_exex_types::ExExNotification; #[derive(Debug, Default)] pub struct BlockCache { /// A min heap of `(Block Number, File ID)` tuples. - pub(super) blocks: BinaryHeap>, + pub(super) blocks: BinaryHeap>, /// A mapping of committed blocks `Block Hash -> Block`. /// /// For each [`ExExNotification::ChainCommitted`] notification, there will be an entry per /// block. - pub(super) committed_blocks: FbHashMap<32, (u64, CachedBlock)>, + pub(super) committed_blocks: FbHashMap<32, (u32, CachedBlock)>, } impl BlockCache { @@ -34,7 +34,7 @@ impl BlockCache { /// # Returns /// /// A set of file IDs that were removed. - pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet { + pub(super) fn remove_before(&mut self, block_number: BlockNumber) -> HashSet { let mut file_ids = HashSet::default(); while let Some(block @ Reverse((max_block, file_id))) = self.blocks.peek().copied() { @@ -54,14 +54,14 @@ impl BlockCache { /// Returns the file ID for the notification containing the given committed block hash, if it /// exists. - pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option { + pub(super) fn get_file_id_by_committed_block_hash(&self, block_hash: &B256) -> Option { self.committed_blocks.get(block_hash).map(|entry| entry.0) } /// Inserts the blocks from the notification into the cache with the given file ID. pub(super) fn insert_notification_blocks_with_file_id( &mut self, - file_id: u64, + file_id: u32, notification: &ExExNotification, ) { let reverted_chain = notification.reverted_chain(); @@ -85,12 +85,12 @@ impl BlockCache { } #[cfg(test)] - pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u64)> { + pub(super) fn blocks_sorted(&self) -> Vec<(BlockNumber, u32)> { self.blocks.clone().into_sorted_vec().into_iter().map(|entry| entry.0).collect() } #[cfg(test)] - pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u64, CachedBlock)> { + pub(super) fn committed_blocks_sorted(&self) -> Vec<(B256, u32, CachedBlock)> { use itertools::Itertools; self.committed_blocks diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 923d5356f17c..4515029dcb3f 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -9,7 +9,7 @@ pub use storage::Storage; use std::{ path::Path, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicU32, Ordering}, Arc, }, }; @@ -69,7 +69,7 @@ impl Wal { /// Inner type for the WAL. #[derive(Debug)] struct WalInner { - next_file_id: AtomicUsize, + next_file_id: AtomicU32, /// The underlying WAL storage backed by a file. storage: Storage, /// WAL block cache. See [`cache::BlockCache`] docs for more details. @@ -79,7 +79,7 @@ struct WalInner { impl WalInner { fn new(directory: impl AsRef) -> eyre::Result { let mut wal = Self { - next_file_id: AtomicUsize::new(0), + next_file_id: AtomicU32::new(0), storage: Storage::new(directory)?, block_cache: RwLock::new(BlockCache::default()), }; @@ -95,6 +95,7 @@ impl WalInner { #[instrument(target = "exex::wal", skip(self))] fn fill_block_cache(&mut self) -> eyre::Result<()> { let Some(files_range) = self.storage.files_range()? else { return Ok(()) }; + self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed); let mut block_cache = self.block_cache.write(); @@ -113,8 +114,6 @@ impl WalInner { ); block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification); - - self.next_file_id.fetch_add(1, Ordering::Relaxed); } Ok(()) @@ -127,7 +126,7 @@ impl WalInner { fn commit(&self, notification: &ExExNotification) -> eyre::Result<()> { let mut block_cache = self.block_cache.write(); - let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed) as u64; + let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); self.storage.write_notification(file_id, notification)?; debug!(?file_id, "Inserting notification blocks into the block cache"); @@ -211,8 +210,8 @@ mod tests { } fn sort_committed_blocks( - committed_blocks: Vec<(B256, u64, CachedBlock)>, - ) -> Vec<(B256, u64, CachedBlock)> { + committed_blocks: Vec<(B256, u32, CachedBlock)>, + ) -> Vec<(B256, u32, CachedBlock)> { committed_blocks .into_iter() .sorted_by_key(|(_, _, block)| (block.block.number, block.block.hash)) diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index 63b65f71e4b8..e921bdac862b 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -28,11 +28,11 @@ impl Storage { Ok(Self { path: path.as_ref().to_path_buf() }) } - fn file_path(&self, id: u64) -> PathBuf { + fn file_path(&self, id: u32) -> PathBuf { self.path.join(format!("{id}.wal")) } - fn parse_filename(filename: &str) -> eyre::Result { + fn parse_filename(filename: &str) -> eyre::Result { filename .strip_suffix(".wal") .and_then(|s| s.parse().ok()) @@ -41,7 +41,7 @@ impl Storage { /// Removes notification for the given file ID from the storage. #[instrument(target = "exex::wal::storage", skip(self))] - fn remove_notification(&self, file_id: u64) -> bool { + fn remove_notification(&self, file_id: u32) -> bool { match reth_fs_util::remove_file(self.file_path(file_id)) { Ok(()) => { debug!("Notification was removed from the storage"); @@ -57,7 +57,7 @@ impl Storage { /// Returns the range of file IDs in the storage. /// /// If there are no files in the storage, returns `None`. - pub(super) fn files_range(&self) -> eyre::Result>> { + pub(super) fn files_range(&self) -> eyre::Result>> { let mut min_id = None; let mut max_id = None; @@ -66,8 +66,8 @@ impl Storage { let file_name = entry.file_name(); let file_id = Self::parse_filename(&file_name.to_string_lossy())?; - min_id = min_id.map_or(Some(file_id), |min_id: u64| Some(min_id.min(file_id))); - max_id = max_id.map_or(Some(file_id), |max_id: u64| Some(max_id.max(file_id))); + min_id = min_id.map_or(Some(file_id), |min_id: u32| Some(min_id.min(file_id))); + max_id = max_id.map_or(Some(file_id), |max_id: u32| Some(max_id.max(file_id))); } Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id)) @@ -80,7 +80,7 @@ impl Storage { /// Number of removed notifications. pub(super) fn remove_notifications( &self, - file_ids: impl IntoIterator, + file_ids: impl IntoIterator, ) -> eyre::Result { let mut deleted = 0; @@ -95,8 +95,8 @@ impl Storage { pub(super) fn iter_notifications( &self, - range: RangeInclusive, - ) -> impl Iterator> + '_ { + range: RangeInclusive, + ) -> impl Iterator> + '_ { range.map(move |id| { let notification = self.read_notification(id)?.ok_or_eyre("notification not found")?; @@ -106,7 +106,7 @@ impl Storage { /// Reads the notification from the file with the given ID. #[instrument(target = "exex::wal::storage", skip(self))] - pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result> { + pub(super) fn read_notification(&self, file_id: u32) -> eyre::Result> { let file_path = self.file_path(file_id); debug!(?file_path, "Reading notification from WAL"); @@ -127,7 +127,7 @@ impl Storage { #[instrument(target = "exex::wal::storage", skip(self, notification))] pub(super) fn write_notification( &self, - file_id: u64, + file_id: u32, notification: &ExExNotification, ) -> eyre::Result<()> { let file_path = self.file_path(file_id);