From 0cc175338e750b1085b0b69b9d98179bc1368dbc Mon Sep 17 00:00:00 2001 From: Alexey Shekhirin Date: Fri, 20 Sep 2024 13:33:19 +0100 Subject: [PATCH] return blocks on revert --- clippy.toml | 2 +- crates/exex/exex/src/wal/cache.rs | 8 +++++ crates/exex/exex/src/wal/mod.rs | 54 ++++++++++++++++++----------- crates/exex/exex/src/wal/storage.rs | 38 ++++++++++++++++++-- 4 files changed, 78 insertions(+), 24 deletions(-) diff --git a/clippy.toml b/clippy.toml index b498158094f9b..cdfa4bc93a21e 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1,3 +1,3 @@ msrv = "1.81" too-large-for-stack = 128 -doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB"] +doc-valid-idents = ["P2P", "ExEx", "ExExes", "IPv4", "IPv6", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "WAL", "MessagePack"] diff --git a/crates/exex/exex/src/wal/cache.rs b/crates/exex/exex/src/wal/cache.rs index 7943dad7d7d4e..cf8e1fe77072b 100644 --- a/crates/exex/exex/src/wal/cache.rs +++ b/crates/exex/exex/src/wal/cache.rs @@ -3,6 +3,14 @@ use std::collections::VecDeque; use reth_exex_types::ExExNotification; use reth_primitives::BlockNumHash; +/// The block cache of the WAL. Acts as a FIFO queue with a specified maximum size. +/// +/// For each notification written to the WAL, there will be an entry per block written to +/// the cache with the same file offset as the notification in the file. I.e. for each +/// notification, there may be multiple blocks in the cache. +/// +/// This cache is needed to avoid walking the file every time we want to find a notification +/// corresponding to a block. #[derive(Debug)] pub(super) struct BlockCache { deque: VecDeque, diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index 6c44716d76dc9..ba51358ac9f39 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -18,19 +18,24 @@ use storage::Storage; /// = 48 megabytes. const MAX_CACHED_BLOCKS: usize = 1_000_000; +/// WAL is a write-ahead log (WAL) that stores the notifications sent to a particular ExEx. +/// +/// WAL is backed by a binary file represented by [`Storage`] and a block cache represented by +/// [`BlockCache`]. +/// +/// The expected mode of operation is as follows: +/// 1. On every new canonical chain notification, call [`Wal::commit`]. +/// 2. When ExEx is on a wrong fork, rollback the WAL using [`Wal::rollback`]. The caller is +/// expected to create reverts from the removed notifications and backfill the blocks between the +/// returned block and the given rollback block. After that, commit new notifications as usual +/// with [`Wal::commit`]. +/// 3. When the chain is finalized, call [`Wal::finalize`] to prevent the infinite growth of the +/// WAL. #[derive(Debug)] pub(crate) struct Wal { /// The underlying WAL storage backed by a file. storage: Storage, - /// The block cache of the WAL. Acts as a FIFO queue with a maximum size of - /// [`MAX_CACHED_BLOCKS`]. - /// - /// For each notification written to the WAL, there will be an entry per block written to - /// the cache with the same file offset as the notification in the [`Storage`]. I.e. for each - /// notification, there may be multiple blocks in the cache. - /// - /// This cache is needed only for convenience, so we can avoid walking the [`Storage`] every - /// time we want to find a notification corresponding to a block. + /// WAL block cache. See [`cache::BlockCache`] docs for more details. block_cache: BlockCache, } @@ -121,13 +126,13 @@ impl Wal { /// /// # Returns /// - /// The block number and hash of the lowest removed block. The caller is expected to backfill - /// the blocks between the returned block and the given `to_block`, if there's any. + /// 1. The block number and hash of the lowest removed block. + /// 2. The notifications that were removed. #[instrument(target = "exex::wal", skip(self))] pub(crate) fn rollback( &mut self, to_block: BlockNumHash, - ) -> eyre::Result> { + ) -> eyre::Result)>> { let mut truncate_to = None; let mut lowest_removed_block = None; loop { @@ -173,15 +178,17 @@ impl Wal { truncate_to = Some(block.file_offset); } - if let Some(truncate_to) = truncate_to { - self.storage.truncate_to_offset(truncate_to)?; + let result = if let Some(truncate_to) = truncate_to { + let removed_notifications = self.storage.truncate_to_offset(truncate_to)?; debug!(?truncate_to, "Truncated the storage"); + Some((lowest_removed_block.expect("qed"), removed_notifications)) } else { debug!("No blocks were truncated. Block cache was filled."); - } + None + }; self.fill_block_cache(u64::MAX)?; - Ok(lowest_removed_block) + Ok(result) } /// Finalizes the WAL to the given block, inclusive. @@ -388,12 +395,19 @@ mod tests { // Now, rollback to block 1 and verify that both the block cache and the storage are // empty. We expect the rollback to delete the first notification (commit block 0, 1), - // because we can't delete blocks partly from the notification. Additionally, check that - // the block that the rolled back to is the block with number 0. - let rolled_back_to = wal.rollback((blocks[1].number, blocks[1].hash()).into())?; + // because we can't delete blocks partly from the notification, and also the second + // notification (revert block 1). Additionally, check that the block that the rolled + // back to is the block with number 0. + let rollback_result = wal.rollback((blocks[1].number, blocks[1].hash()).into())?; assert_eq!(wal.block_cache.iter().copied().collect::>(), vec![]); assert_eq!(wal.storage.bytes_len()?, 0); - assert_eq!(rolled_back_to, Some((blocks[0].number, blocks[0].hash()).into())); + assert_eq!( + rollback_result, + Some(( + (blocks[0].number, blocks[0].hash()).into(), + vec![committed_notification_1.clone(), reverted_notification.clone()] + )) + ); // Commit notifications 1 and 2 again wal.commit(&committed_notification_1)?; diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index 043b8ec7e9df6..70abd26c793fa 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -7,6 +7,15 @@ use std::{ use reth_exex_types::ExExNotification; +/// The underlying WAL storage backed by a file. +/// +/// Each notification is written without any delimiters and structed as follows: +/// ```text +/// +--------------------------+----------------------------------+ +/// | little endian u32 length | MessagePack-encoded notification | +/// +--------------------------+----------------------------------+ +/// ``` +/// The length is the length of the MessagePack-encoded notification in bytes. #[derive(Debug)] pub(super) struct Storage { /// The path to the WAL file. @@ -73,9 +82,23 @@ impl Storage { } /// Truncates the underlying file to the given byte offset (exclusive). - pub(super) fn truncate_to_offset(&self, to_bytes_len: u64) -> eyre::Result<()> { + /// + /// # Returns + /// + /// Notifications that were removed. + pub(super) fn truncate_to_offset( + &mut self, + to_bytes_len: u64, + ) -> eyre::Result> { + let mut removed_notifications = Vec::new(); + self.for_each_notification_from_offset(to_bytes_len, |_, notification| { + removed_notifications.push(notification); + Ok(ControlFlow::Continue(())) + })?; + self.file.set_len(to_bytes_len)?; - Ok(()) + + Ok(removed_notifications) } /// Iterates over the notifications in the underlying file, decoding them and calling the @@ -83,9 +106,17 @@ impl Storage { /// Stops when the closure returns [`ControlFlow::Break`], or the end of the file is reached. pub(super) fn for_each_notification( &mut self, + f: impl FnMut(usize, ExExNotification) -> eyre::Result>, + ) -> eyre::Result<()> { + self.for_each_notification_from_offset(0, f) + } + + fn for_each_notification_from_offset( + &mut self, + file_offset: u64, mut f: impl FnMut(usize, ExExNotification) -> eyre::Result>, ) -> eyre::Result<()> { - self.file.seek(SeekFrom::Start(0))?; + self.file.seek(SeekFrom::Start(file_offset))?; let mut reader = BufReader::new(&self.file); loop { @@ -115,6 +146,7 @@ impl Storage { fn write_notification(w: &mut impl Write, notification: &ExExNotification) -> eyre::Result<()> { let data = rmp_serde::encode::to_vec(notification)?; + // Write the length of the notification as a u32 in little endian w.write_all(&(data.len() as u32).to_le_bytes())?; w.write_all(&data)?; w.flush()?;