diff --git a/Cargo.lock b/Cargo.lock index dfec793b6e00..97514fd599c6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7295,6 +7295,7 @@ dependencies = [ "reth-evm", "reth-evm-ethereum", "reth-exex-types", + "reth-fs-util", "reth-metrics", "reth-node-api", "reth-node-core", @@ -7309,8 +7310,11 @@ dependencies = [ "reth-testing-utils", "reth-tracing", "secp256k1", + "serde_json", + "tempfile", "tokio", "tokio-util", + "tracing", ] [[package]] diff --git a/clippy.toml b/clippy.toml index b498158094f9..cdfa4bc93a21 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,3 +1,3 @@ msrv = "1.81" too-large-for-stack = 128 -doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"] +doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "WAL", "MessagePack"] diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 4e082c4573e8..5b04974dd5a1 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -16,7 +16,8 @@ workspace = true reth-chainspec.workspace = true reth-config.workspace = true reth-evm.workspace = true -reth-exex-types.workspace = true +reth-exex-types = { workspace = true, features = ["serde"] } +reth-fs-util.workspace = true reth-metrics.workspace = true reth-node-api.workspace = true reth-node-core.workspace = true @@ -38,6 +39,8 @@ tokio.workspace = true ## misc eyre.workspace = true metrics.workspace = true +serde_json.workspace = true +tracing.workspace = true [dev-dependencies] reth-blockchain-tree.workspace = true @@ -50,6 +53,7 @@ reth-provider = { workspace = true, features = ["test-utils"] } reth-testing-utils.workspace = true secp256k1.workspace = true +tempfile.workspace = true [features] default = [] diff --git a/crates/exex/exex/src/lib.rs b/crates/exex/exex/src/lib.rs index 434036bccb3f..d54bc3d9f3cf 100644 --- a/crates/exex/exex/src/lib.rs +++ b/crates/exex/exex/src/lib.rs @@ -46,6 +46,8 @@ pub use event::*; mod manager; pub use manager::*; +mod wal; + // Re-export exex types #[doc(inline)] pub use reth_exex_types::*; diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs new file mode 100644 index 000000000000..3e26fdcf4ca2 --- /dev/null +++ b/crates/exex/exex/src/wal/cache.rs @@ -0,0 +1,137 @@ +use std::collections::{BTreeMap, VecDeque}; + +use reth_exex_types::ExExNotification; +use reth_primitives::BlockNumHash; + +/// The block cache of the WAL. Acts as a mapping of `File ID -> List of Blocks`. +/// +/// For each notification written to the WAL, there will be an entry per block written to +/// the cache with the same file ID. I.e. for each notification, there may be multiple blocks in the +/// cache. +/// +/// This cache is needed to avoid walking the WAL directory every time we want to find a +/// notification corresponding to a block. +#[derive(Debug)] +pub(super) struct BlockCache(BTreeMap>); + +impl BlockCache { + /// Creates a new instance of [`BlockCache`]. + pub(super) const fn new() -> Self { + Self(BTreeMap::new()) + } + + /// Returns `true` if the cache is empty. + pub(super) fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Returns a front-to-back iterator. + pub(super) fn iter(&self) -> impl Iterator + '_ { + self.0.iter().flat_map(|(k, v)| v.iter().map(move |b| (*k, *b))) + } + + /// Provides a reference to the first block from the cache, or `None` if the cache is + /// empty. + pub(super) fn front(&self) -> Option<(u64, CachedBlock)> { + self.0.first_key_value().and_then(|(k, v)| v.front().map(|b| (*k, *b))) + } + + /// Provides a reference to the last block from the cache, or `None` if the cache is + /// empty. + pub(super) fn back(&self) -> Option<(u64, CachedBlock)> { + self.0.last_key_value().and_then(|(k, v)| v.back().map(|b| (*k, *b))) + } + + /// Removes the notification with the given file ID. + pub(super) fn remove_notification(&mut self, key: u64) -> Option> { + self.0.remove(&key) + } + + /// Pops the first block from the cache. If it resulted in the whole file entry being empty, + /// it will also remove the file entry. + pub(super) fn pop_front(&mut self) -> Option<(u64, CachedBlock)> { + let first_entry = self.0.first_entry()?; + let key = *first_entry.key(); + let blocks = first_entry.into_mut(); + let first_block = blocks.pop_front().unwrap(); + if blocks.is_empty() { + self.0.remove(&key); + } + + Some((key, first_block)) + } + + /// Pops the last block from the cache. If it resulted in the whole file entry being empty, + /// it will also remove the file entry. + pub(super) fn pop_back(&mut self) -> Option<(u64, CachedBlock)> { + let last_entry = self.0.last_entry()?; + let key = *last_entry.key(); + let blocks = last_entry.into_mut(); + let last_block = blocks.pop_back().unwrap(); + if blocks.is_empty() { + self.0.remove(&key); + } + + Some((key, last_block)) + } + + /// Appends a block to the back of the specified file entry. + pub(super) fn insert(&mut self, file_id: u64, block: CachedBlock) { + self.0.entry(file_id).or_default().push_back(block); + } + + /// Inserts the blocks from the notification into the cache with the given file ID. + /// + /// First, inserts the reverted blocks (if any), then the committed blocks (if any). + pub(super) fn insert_notification_blocks_with_file_id( + &mut self, + file_id: u64, + notification: &ExExNotification, + ) { + let reverted_chain = notification.reverted_chain(); + let committed_chain = notification.committed_chain(); + + if let Some(reverted_chain) = reverted_chain { + for block in reverted_chain.blocks().values() { + self.insert( + file_id, + CachedBlock { + action: CachedBlockAction::Revert, + block: (block.number, block.hash()).into(), + }, + ); + } + } + + if let Some(committed_chain) = committed_chain { + for block in committed_chain.blocks().values() { + self.insert( + file_id, + CachedBlock { + action: CachedBlockAction::Commit, + block: (block.number, block.hash()).into(), + }, + ); + } + } + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) struct CachedBlock { + pub(super) action: CachedBlockAction, + /// The block number and hash of the block. + pub(super) block: BlockNumHash, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum CachedBlockAction { + Commit, + Revert, +} + +impl CachedBlockAction { + pub(super) const fn is_commit(&self) -> bool { + matches!(self, Self::Commit) + } +} diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs new file mode 100644 index 000000000000..a5e0188ca593 --- /dev/null +++ b/crates/exex/exex/src/wal/mod.rs @@ -0,0 +1,492 @@ +#![allow(dead_code)] + +mod cache; +mod storage; + +use std::path::Path; + +use cache::BlockCache; +use reth_exex_types::ExExNotification; +use reth_primitives::BlockNumHash; +use reth_tracing::tracing::{debug, instrument}; +use storage::Storage; + +/// WAL is a write-ahead log (WAL) that stores the notifications sent to a particular ExEx. +/// +/// WAL is backed by a directory of binary files represented by [`Storage`] and a block cache +/// represented by [`BlockCache`]. The role of the block cache is to avoid walking the WAL directory +/// and decoding notifications every time we want to rollback/finalize the WAL. +/// +/// The expected mode of operation is as follows: +/// 1. On every new canonical chain notification, call [`Wal::commit`]. +/// 2. When ExEx is on a wrong fork, rollback the WAL using [`Wal::rollback`]. The caller is +/// expected to create reverts from the removed notifications and backfill the blocks between the +/// returned block and the given rollback block. After that, commit new notifications as usual +/// with [`Wal::commit`]. +/// 3. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the +/// WAL. +#[derive(Debug)] +pub(crate) struct Wal { + /// The underlying WAL storage backed by a file. + storage: Storage, + /// WAL block cache. See [`cache::BlockCache`] docs for more details. + block_cache: BlockCache, +} + +impl Wal { + /// Creates a new instance of [`Wal`]. + pub(crate) fn new(directory: impl AsRef) -> eyre::Result { + let mut wal = Self { storage: Storage::new(directory)?, block_cache: BlockCache::new() }; + wal.fill_block_cache()?; + Ok(wal) + } + + /// Fills the block cache with the notifications from the storage. + #[instrument(target = "exex::wal", skip(self))] + fn fill_block_cache(&mut self) -> eyre::Result<()> { + let Some(files_range) = self.storage.files_range()? else { return Ok(()) }; + + for entry in self.storage.iter_notifications(files_range) { + let (file_id, notification) = entry?; + + let committed_chain = notification.committed_chain(); + let reverted_chain = notification.reverted_chain(); + + debug!( + target: "exex::wal", + ?file_id, + reverted_block_range = ?reverted_chain.as_ref().map(|chain| chain.range()), + committed_block_range = ?committed_chain.as_ref().map(|chain| chain.range()), + "Inserting block cache entries" + ); + + self.block_cache.insert_notification_blocks_with_file_id(file_id, ¬ification); + } + + Ok(()) + } + + /// Commits the notification to WAL. + #[instrument(target = "exex::wal", skip_all, fields( + reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), + committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) + ))] + pub(crate) fn commit(&mut self, notification: &ExExNotification) -> eyre::Result<()> { + debug!("Writing notification to WAL"); + let file_id = self.block_cache.back().map_or(0, |block| block.0 + 1); + self.storage.write_notification(file_id, notification)?; + + debug!(?file_id, "Inserting notification blocks into the block cache"); + self.block_cache.insert_notification_blocks_with_file_id(file_id, notification); + + Ok(()) + } + + /// Rollbacks the WAL to the given block, inclusive. + /// + /// 1. Walks the WAL from the end and searches for the first notification where committed chain + /// contains a block with the same number and hash as `to_block`. + /// 2. If the notification is found, truncates the WAL. It means that if the found notification + /// contains both given block and blocks before it, the whole notification will be truncated. + /// + /// # Returns + /// + /// 1. The block number and hash of the lowest removed block. + /// 2. The notifications that were removed. + #[instrument(target = "exex::wal", skip(self))] + pub(crate) fn rollback( + &mut self, + to_block: BlockNumHash, + ) -> eyre::Result)>> { + // First, pop items from the back of the cache until we find the notification with the + // specified block. When found, save the file ID of that notification. + let mut remove_from_file_id = None; + let mut remove_to_file_id = None; + let mut lowest_removed_block = None; + while let Some((file_id, block)) = self.block_cache.pop_back() { + debug!(?file_id, ?block, "Popped back block from the block cache"); + if block.action.is_commit() && block.block.number == to_block.number { + debug!( + ?file_id, + ?block, + ?remove_from_file_id, + ?lowest_removed_block, + "Found the requested block" + ); + + if block.block.hash != to_block.hash { + eyre::bail!("block hash mismatch in WAL") + } + + remove_from_file_id = Some(file_id); + + let notification = self.storage.read_notification(file_id)?; + lowest_removed_block = notification + .committed_chain() + .as_ref() + .map(|chain| chain.first()) + .map(|block| (block.number, block.hash()).into()); + + break + } + + remove_from_file_id = Some(file_id); + remove_to_file_id.get_or_insert(file_id); + } + + // If the specified block is still not found, we can't do anything and just return. The + // cache was empty. + let Some((remove_from_file_id, remove_to_file_id)) = + remove_from_file_id.zip(remove_to_file_id) + else { + debug!("No blocks were rolled back"); + return Ok(None) + }; + + // Remove the rest of the block cache entries for the file ID that we found. + self.block_cache.remove_notification(remove_from_file_id); + debug!(?remove_from_file_id, "Block cache was rolled back"); + + // Remove notifications from the storage. + let removed_notifications = + self.storage.take_notifications(remove_from_file_id..=remove_to_file_id)?; + debug!(removed_notifications = ?removed_notifications.len(), "Storage was rolled back"); + + Ok(Some((lowest_removed_block.expect("qed"), removed_notifications))) + } + + /// Finalizes the WAL to the given block, inclusive. + /// + /// 1. Finds a notification with first unfinalized block (first notification containing a + /// committed block higher than `to_block`). + /// 2. Removes the notifications from the beginning of WAL until the found notification. If this + /// notification includes both finalized and non-finalized blocks, it will not be removed. + #[instrument(target = "exex::wal", skip(self))] + pub(crate) fn finalize(&mut self, to_block: BlockNumHash) -> eyre::Result<()> { + // First, walk cache to find the file ID of the notification with the finalized block and + // save the file ID with the last unfinalized block. Do not remove any notifications + // yet. + let mut unfinalized_from_file_id = None; + { + let mut block_cache = self.block_cache.iter().peekable(); + while let Some((file_id, block)) = block_cache.next() { + debug!(?file_id, ?block, "Iterating over the block cache"); + if block.action.is_commit() && + block.block.number == to_block.number && + block.block.hash == to_block.hash + { + let notification = self.storage.read_notification(file_id)?; + if notification.committed_chain().unwrap().blocks().len() == 1 { + unfinalized_from_file_id = block_cache.peek().map(|(file_id, _)| *file_id); + } else { + unfinalized_from_file_id = Some(file_id); + } + + debug!( + ?file_id, + ?block, + ?unfinalized_from_file_id, + "Found the finalized block in the block cache" + ); + break + } + + unfinalized_from_file_id = Some(file_id); + } + } + + // If the finalized block is still not found, we can't do anything and just return. + let Some(remove_to_file_id) = unfinalized_from_file_id else { + debug!("Could not find the finalized block in WAL"); + return Ok(()) + }; + + // Remove notifications from the storage from the beginning up to the unfinalized block, not + // inclusive. + let (mut file_range_start, mut file_range_end) = (None, None); + while let Some((file_id, _)) = self.block_cache.front() { + if file_id == remove_to_file_id { + break + } + self.block_cache.pop_front(); + + file_range_start.get_or_insert(file_id); + file_range_end = Some(file_id); + } + debug!(?remove_to_file_id, "Block cache was finalized"); + + // Remove notifications from the storage. + if let Some((file_range_start, file_range_end)) = file_range_start.zip(file_range_end) { + let removed_notifications = + self.storage.remove_notifications(file_range_start..=file_range_end)?; + debug!(?removed_notifications, "Storage was finalized"); + } else { + debug!("No notifications were finalized from the storage"); + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use eyre::OptionExt; + use reth_exex_types::ExExNotification; + use reth_provider::Chain; + use reth_testing_utils::generators::{ + self, random_block, random_block_range, BlockParams, BlockRangeParams, + }; + + use crate::wal::{ + cache::{CachedBlock, CachedBlockAction}, + Wal, + }; + + fn read_notifications(wal: &Wal) -> eyre::Result> { + let Some(files_range) = wal.storage.files_range()? else { return Ok(Vec::new()) }; + + wal.storage + .iter_notifications(files_range) + .map(|entry| Ok(entry?.1)) + .collect::>() + } + + #[test] + fn test_wal() -> eyre::Result<()> { + reth_tracing::init_test_tracing(); + + let mut rng = generators::rng(); + + // Create an instance of the WAL in a temporary directory + let temp_dir = tempfile::tempdir()?; + let mut wal = Wal::new(&temp_dir)?; + assert!(wal.block_cache.is_empty()); + + // Create 4 canonical blocks and one reorged block with number 2 + let blocks = random_block_range(&mut rng, 0..=3, BlockRangeParams::default()) + .into_iter() + .map(|block| block.seal_with_senders().ok_or_eyre("failed to recover senders")) + .collect::>>()?; + let block_1_reorged = random_block( + &mut rng, + 1, + BlockParams { parent: Some(blocks[0].hash()), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?; + let block_2_reorged = random_block( + &mut rng, + 2, + BlockParams { parent: Some(blocks[1].hash()), ..Default::default() }, + ) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?; + + // Create notifications for the above blocks. + // 1. Committed notification for blocks with number 0 and 1 + // 2. Reverted notification for block with number 1 + // 3. Committed notification for block with number 1 and 2 + // 4. Reorged notification for block with number 2 that was reverted, and blocks with number + // 2 and 3 that were committed + let committed_notification_1 = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![blocks[0].clone(), blocks[1].clone()], + Default::default(), + None, + )), + }; + let reverted_notification = ExExNotification::ChainReverted { + old: Arc::new(Chain::new(vec![blocks[1].clone()], Default::default(), None)), + }; + let committed_notification_2 = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![block_1_reorged.clone(), blocks[2].clone()], + Default::default(), + None, + )), + }; + let reorged_notification = ExExNotification::ChainReorged { + old: Arc::new(Chain::new(vec![blocks[2].clone()], Default::default(), None)), + new: Arc::new(Chain::new( + vec![block_2_reorged.clone(), blocks[3].clone()], + Default::default(), + None, + )), + }; + + // Commit notifications, verify that the block cache is updated and the notifications are + // written to WAL. + + // First notification (commit block 0, 1) + let file_id = 0; + let committed_notification_1_cache = vec![ + ( + file_id, + CachedBlock { + action: CachedBlockAction::Commit, + block: (blocks[0].number, blocks[0].hash()).into(), + }, + ), + ( + file_id, + CachedBlock { + action: CachedBlockAction::Commit, + block: (blocks[1].number, blocks[1].hash()).into(), + }, + ), + ]; + wal.commit(&committed_notification_1)?; + assert_eq!(wal.block_cache.iter().collect::>(), committed_notification_1_cache); + assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]); + + // Second notification (revert block 1) + wal.commit(&reverted_notification)?; + let file_id = 1; + let reverted_notification_cache = vec![( + file_id, + CachedBlock { + action: CachedBlockAction::Revert, + block: (blocks[1].number, blocks[1].hash()).into(), + }, + )]; + assert_eq!( + wal.block_cache.iter().collect::>(), + [committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat() + ); + assert_eq!( + read_notifications(&wal)?, + vec![committed_notification_1.clone(), reverted_notification.clone()] + ); + + // Now, rollback to block 1 and verify that both the block cache and the storage are + // empty. We expect the rollback to delete the first notification (commit block 0, 1), + // because we can't delete blocks partly from the notification, and also the second + // notification (revert block 1). Additionally, check that the block that the rolled + // back to is the block with number 0. + let rollback_result = wal.rollback((blocks[1].number, blocks[1].hash()).into())?; + assert_eq!(wal.block_cache.iter().collect::>(), vec![]); + assert_eq!(read_notifications(&wal)?, vec![]); + assert_eq!( + rollback_result, + Some(( + (blocks[0].number, blocks[0].hash()).into(), + vec![committed_notification_1.clone(), reverted_notification.clone()] + )) + ); + + // Commit notifications 1 and 2 again + wal.commit(&committed_notification_1)?; + assert_eq!( + wal.block_cache.iter().collect::>(), + [committed_notification_1_cache.clone()].concat() + ); + assert_eq!(read_notifications(&wal)?, vec![committed_notification_1.clone()]); + wal.commit(&reverted_notification)?; + assert_eq!( + wal.block_cache.iter().collect::>(), + [committed_notification_1_cache.clone(), reverted_notification_cache.clone()].concat() + ); + assert_eq!( + read_notifications(&wal)?, + vec![committed_notification_1.clone(), reverted_notification.clone()] + ); + + // Third notification (commit block 1, 2) + wal.commit(&committed_notification_2)?; + let file_id = 2; + let committed_notification_2_cache = vec![ + ( + file_id, + CachedBlock { + action: CachedBlockAction::Commit, + block: (block_1_reorged.number, block_1_reorged.hash()).into(), + }, + ), + ( + file_id, + CachedBlock { + action: CachedBlockAction::Commit, + block: (blocks[2].number, blocks[2].hash()).into(), + }, + ), + ]; + assert_eq!( + wal.block_cache.iter().collect::>(), + [ + committed_notification_1_cache.clone(), + reverted_notification_cache.clone(), + committed_notification_2_cache.clone() + ] + .concat() + ); + assert_eq!( + read_notifications(&wal)?, + vec![ + committed_notification_1.clone(), + reverted_notification.clone(), + committed_notification_2.clone() + ] + ); + + // Fourth notification (revert block 2, commit block 2, 3) + wal.commit(&reorged_notification)?; + let file_id = 3; + let reorged_notification_cache = vec![ + ( + file_id, + CachedBlock { + action: CachedBlockAction::Revert, + block: (blocks[2].number, blocks[2].hash()).into(), + }, + ), + ( + file_id, + CachedBlock { + action: CachedBlockAction::Commit, + block: (block_2_reorged.number, block_2_reorged.hash()).into(), + }, + ), + ( + file_id, + CachedBlock { + action: CachedBlockAction::Commit, + block: (blocks[3].number, blocks[3].hash()).into(), + }, + ), + ]; + assert_eq!( + wal.block_cache.iter().collect::>(), + [ + committed_notification_1_cache, + reverted_notification_cache, + committed_notification_2_cache.clone(), + reorged_notification_cache.clone() + ] + .concat() + ); + assert_eq!( + read_notifications(&wal)?, + vec![ + committed_notification_1.clone(), + reverted_notification.clone(), + committed_notification_2.clone(), + reorged_notification.clone() + ] + ); + + // Now, finalize the WAL up to the block 1. Block 1 was in the third notification that also + // had block 2 committed. In this case, we can't split the notification into two parts, so + // we preserve the whole notification in both the block cache and the storage, and delete + // the notifications before it. + wal.finalize((block_1_reorged.number, block_1_reorged.hash()).into())?; + assert_eq!( + wal.block_cache.iter().collect::>(), + [committed_notification_2_cache, reorged_notification_cache].concat() + ); + assert_eq!(read_notifications(&wal)?, vec![committed_notification_2, reorged_notification]); + + Ok(()) + } +} diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs new file mode 100644 index 000000000000..283b303a346f --- /dev/null +++ b/crates/exex/exex/src/wal/storage.rs @@ -0,0 +1,186 @@ +use std::{ + fs::File, + io::{Read, Write}, + ops::RangeInclusive, + path::{Path, PathBuf}, +}; + +use eyre::OptionExt; +use reth_exex_types::ExExNotification; +use reth_tracing::tracing::debug; +use tracing::instrument; + +/// The underlying WAL storage backed by a directory of files. +/// +/// Each notification is represented by a single file that contains a MessagePack-encoded +/// notification. +#[derive(Debug)] +pub(super) struct Storage { + /// The path to the WAL file. + path: PathBuf, +} + +impl Storage { + /// Creates a new instance of [`Storage`] backed by the file at the given path and creates + /// it doesn't exist. + pub(super) fn new(path: impl AsRef) -> eyre::Result { + reth_fs_util::create_dir_all(&path)?; + + Ok(Self { path: path.as_ref().to_path_buf() }) + } + + fn file_path(&self, id: u64) -> PathBuf { + self.path.join(format!("{id}.wal")) + } + + fn parse_filename(filename: &str) -> eyre::Result { + filename + .strip_suffix(".wal") + .and_then(|s| s.parse().ok()) + .ok_or_eyre(format!("failed to parse file name: {filename}")) + } + + /// Removes notification for the given file ID from the storage. + #[instrument(target = "exex::wal::storage", skip(self))] + fn remove_notification(&self, file_id: u64) { + match reth_fs_util::remove_file(self.file_path(file_id)) { + Ok(()) => debug!("Notification was removed from the storage"), + Err(err) => debug!(?err, "Failed to remove notification from the storage"), + } + } + + /// Returns the range of file IDs in the storage. + /// + /// If there are no files in the storage, returns `None`. + pub(super) fn files_range(&self) -> eyre::Result>> { + let mut min_id = None; + let mut max_id = None; + + for entry in reth_fs_util::read_dir(&self.path)? { + let entry = entry?; + let file_name = entry.file_name(); + let file_id = Self::parse_filename(&file_name.to_string_lossy())?; + + min_id = min_id.map_or(Some(file_id), |min_id: u64| Some(min_id.min(file_id))); + max_id = max_id.map_or(Some(file_id), |max_id: u64| Some(max_id.max(file_id))); + } + + Ok(min_id.zip(max_id).map(|(min_id, max_id)| min_id..=max_id)) + } + + /// Removes notifications from the storage according to the given range. + /// + /// # Returns + /// + /// Number of removed notifications. + pub(super) fn remove_notifications(&self, range: RangeInclusive) -> eyre::Result { + for id in range.clone() { + self.remove_notification(id); + } + + Ok(range.count()) + } + + /// Removes notifications from the storage according to the given range. + /// + /// # Returns + /// + /// Notifications that were removed. + pub(super) fn take_notifications( + &self, + range: RangeInclusive, + ) -> eyre::Result> { + let notifications = self.iter_notifications(range).collect::>>()?; + + for (id, _) in ¬ifications { + self.remove_notification(*id); + } + + Ok(notifications.into_iter().map(|(_, notification)| notification).collect()) + } + + pub(super) fn iter_notifications( + &self, + range: RangeInclusive, + ) -> impl Iterator> + '_ { + range.map(move |id| self.read_notification(id).map(|notification| (id, notification))) + } + + /// Reads the notification from the file with the given id. + pub(super) fn read_notification(&self, file_id: u64) -> eyre::Result { + debug!(?file_id, "Reading notification from WAL"); + + let file_path = self.file_path(file_id); + let mut file = File::open(&file_path)?; + read_notification(&mut file) + } + + /// Writes the notification to the file with the given id. + pub(super) fn write_notification( + &self, + file_id: u64, + notification: &ExExNotification, + ) -> eyre::Result<()> { + debug!(?file_id, "Writing notification to WAL"); + + let file_path = self.file_path(file_id); + let mut file = File::create_new(&file_path)?; + write_notification(&mut file, notification)?; + + Ok(()) + } +} + +// TODO(alexey): use rmp-serde when Alloy and Reth serde issues are resolved + +fn write_notification(mut w: &mut impl Write, notification: &ExExNotification) -> eyre::Result<()> { + // rmp_serde::encode::write(w, notification)?; + serde_json::to_writer(&mut w, notification)?; + w.flush()?; + Ok(()) +} + +fn read_notification(r: &mut impl Read) -> eyre::Result { + // Ok(rmp_serde::from_read(r)?) + Ok(serde_json::from_reader(r)?) +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use eyre::OptionExt; + use reth_exex_types::ExExNotification; + use reth_provider::Chain; + use reth_testing_utils::generators::{self, random_block}; + + use super::Storage; + + #[test] + fn test_roundtrip() -> eyre::Result<()> { + let mut rng = generators::rng(); + + let temp_dir = tempfile::tempdir()?; + let storage = Storage::new(&temp_dir)?; + + let old_block = random_block(&mut rng, 0, Default::default()) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?; + let new_block = random_block(&mut rng, 0, Default::default()) + .seal_with_senders() + .ok_or_eyre("failed to recover senders")?; + + let notification = ExExNotification::ChainReorged { + new: Arc::new(Chain::new(vec![new_block], Default::default(), None)), + old: Arc::new(Chain::new(vec![old_block], Default::default(), None)), + }; + + // Do a round trip serialization and deserialization + let file_id = 0; + storage.write_notification(file_id, ¬ification)?; + let deserialized_notification = storage.read_notification(file_id)?; + assert_eq!(deserialized_notification, notification); + + Ok(()) + } +} diff --git a/testing/testing-utils/src/generators.rs b/testing/testing-utils/src/generators.rs index 334df3ef315a..6451197aa006 100644 --- a/testing/testing-utils/src/generators.rs +++ b/testing/testing-utils/src/generators.rs @@ -36,7 +36,7 @@ pub struct BlockParams { } /// Used to pass arguments for random block generation function in tests -#[derive(Debug, Default)] +#[derive(Debug)] pub struct BlockRangeParams { /// The parent hash of the block. pub parent: Option, @@ -50,6 +50,17 @@ pub struct BlockRangeParams { pub withdrawals_count: Option>, } +impl Default for BlockRangeParams { + fn default() -> Self { + Self { + parent: None, + tx_count: 0..u8::MAX / 2, + requests_count: None, + withdrawals_count: None, + } + } +} + /// Returns a random number generator that can be seeded using the `SEED` environment variable. /// /// If `SEED` is not set, a random seed is used.