diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index b99d5e57dab5..e8e24c09db02 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -115,6 +115,7 @@ impl ExExHandle { // I.e., the ExEx has already processed the notification. if finished_height.number >= new.tip().number { debug!( + target: "exex::manager", exex_id = %self.id, %notification_id, ?finished_height, @@ -135,6 +136,7 @@ impl ExExHandle { } debug!( + target: "exex::manager", exex_id = %self.id, %notification_id, "Reserving slot for notification" @@ -145,6 +147,7 @@ impl ExExHandle { } debug!( + target: "exex::manager", exex_id = %self.id, %notification_id, "Sending notification" @@ -327,7 +330,7 @@ where /// This function checks if all ExExes are on the canonical chain and finalizes the WAL if /// necessary. fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> { - debug!(header = ?finalized_header.num_hash(), "Received finalized header"); + debug!(target: "exex::manager", header = ?finalized_header.num_hash(), "Received finalized header"); // Check if all ExExes are on the canonical chain let exex_finished_heights = self @@ -368,9 +371,13 @@ where is_canonical.not().then_some((exex_id, num_hash)) }) .format_with(", ", |(exex_id, num_hash), f| { - f(&format_args!("{exex_id:?} = {num_hash:?}")) - }); + f(&format_args!("{exex_id} = {num_hash:?}")) + }) + // We need this because `debug!` uses the argument twice when formatting the final + // log message, but the result of `format_with` can only be used once + .to_string(); debug!( + target: "exex::manager", %unfinalized_exexes, "Not all ExExes are on the canonical chain, can't finalize the WAL" ); @@ -403,7 +410,7 @@ where // Handle incoming ExEx events for exex in &mut this.exex_handles { while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) { - debug!(exex_id = %exex.id, ?event, "Received event from ExEx"); + debug!(target: "exex::manager", exex_id = %exex.id, ?event, "Received event from ExEx"); exex.metrics.events_sent_total.increment(1); match event { ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height), @@ -424,10 +431,12 @@ where while this.buffer.len() < this.max_capacity { if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) { debug!( + target: "exex::manager", committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number), reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number), "Received new notification" ); + this.wal.commit(¬ification)?; this.push_notification(notification); continue } @@ -459,7 +468,7 @@ where } // Remove processed buffered notifications - debug!(%min_id, "Updating lowest notification id in buffer"); + debug!(target: "exex::manager", %min_id, "Updating lowest notification id in buffer"); this.buffer.retain(|&(id, _)| id >= min_id); this.min_id = min_id; @@ -602,7 +611,7 @@ mod tests { use super::*; use alloy_primitives::B256; use eyre::OptionExt; - use futures::StreamExt; + use futures::{FutureExt, StreamExt}; use rand::Rng; use reth_primitives::SealedBlockWithSenders; use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain}; @@ -1121,7 +1130,7 @@ mod tests { } #[tokio::test] - async fn test_exex_wal_finalize() -> eyre::Result<()> { + async fn test_exex_wal() -> eyre::Result<()> { reth_tracing::init_test_tracing(); let mut rng = generators::rng(); @@ -1141,12 +1150,11 @@ mod tests { let notification = ExExNotification::ChainCommitted { new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)), }; - wal.commit(¬ification)?; let (finalized_headers_tx, rx) = watch::channel(None); let finalized_header_stream = ForkChoiceStream::new(rx); - let (exex_handle, events_tx, _) = + let (exex_handle, events_tx, mut notifications) = ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle()); let mut exex_manager = std::pin::pin!(ExExManager::new( @@ -1159,7 +1167,13 @@ mod tests { let mut cx = Context::from_waker(futures::task::noop_waker_ref()); - assert!(exex_manager.as_mut().poll(&mut cx).is_pending()); + exex_manager.handle().send(notification.clone())?; + + assert!(exex_manager.as_mut().poll(&mut cx)?.is_pending()); + assert_eq!( + notifications.next().poll_unpin(&mut cx), + Poll::Ready(Some(notification.clone())) + ); assert_eq!( exex_manager.wal.iter_notifications()?.collect::>>()?, [notification.clone()] diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs index fc71716a5ea1..116dac95422b 100644 --- a/crates/exex/exex/src/notifications.rs +++ b/crates/exex/exex/src/notifications.rs @@ -223,21 +223,19 @@ where /// - ExEx is at the same block number as the node head (`node_head.number == /// exex_head.number`). Nothing to do. fn check_backfill(&mut self) -> eyre::Result<()> { - debug!(target: "exex::manager", "Synchronizing ExEx head"); - let backfill_job_factory = BackfillJobFactory::new(self.executor.clone(), self.provider.clone()); match self.exex_head.block.number.cmp(&self.node_head.number) { std::cmp::Ordering::Less => { // ExEx is behind the node head, start backfill - debug!(target: "exex::manager", "ExEx is behind the node head and on the canonical chain, starting backfill"); + debug!(target: "exex::notifications", "ExEx is behind the node head and on the canonical chain, starting backfill"); let backfill = backfill_job_factory .backfill(self.exex_head.block.number + 1..=self.node_head.number) .into_stream(); self.backfill_job = Some(backfill); } std::cmp::Ordering::Equal => { - debug!(target: "exex::manager", "ExEx is at the node head"); + debug!(target: "exex::notifications", "ExEx is at the node head"); } std::cmp::Ordering::Greater => { return Err(eyre::eyre!("ExEx is ahead of the node head")) diff --git a/crates/exex/exex/src/wal/mod.rs b/crates/exex/exex/src/wal/mod.rs index e8c9c6bc805e..94312bd71d0b 100644 --- a/crates/exex/exex/src/wal/mod.rs +++ b/crates/exex/exex/src/wal/mod.rs @@ -96,7 +96,7 @@ impl WalInner { } /// Fills the block cache with the notifications from the storage. - #[instrument(target = "exex::wal", skip(self))] + #[instrument(skip(self))] fn fill_block_cache(&mut self) -> eyre::Result<()> { let Some(files_range) = self.storage.files_range()? else { return Ok(()) }; self.next_file_id.store(files_range.end() + 1, Ordering::Relaxed); @@ -128,7 +128,7 @@ impl WalInner { Ok(()) } - #[instrument(target = "exex::wal", skip_all, fields( + #[instrument(skip_all, fields( reverted_block_range = ?notification.reverted_chain().as_ref().map(|chain| chain.range()), committed_block_range = ?notification.committed_chain().as_ref().map(|chain| chain.range()) ))] @@ -138,7 +138,7 @@ impl WalInner { let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); let size = self.storage.write_notification(file_id, notification)?; - debug!(?file_id, "Inserting notification blocks into the block cache"); + debug!(target: "exex::wal", ?file_id, "Inserting notification blocks into the block cache"); block_cache.insert_notification_blocks_with_file_id(file_id, notification); self.update_metrics(&block_cache, size as i64); @@ -146,19 +146,19 @@ impl WalInner { Ok(()) } - #[instrument(target = "exex::wal", skip(self))] + #[instrument(skip(self))] fn finalize(&self, to_block: BlockNumHash) -> eyre::Result<()> { let mut block_cache = self.block_cache.write(); let file_ids = block_cache.remove_before(to_block.number); // Remove notifications from the storage. if file_ids.is_empty() { - debug!("No notifications were finalized from the storage"); + debug!(target: "exex::wal", "No notifications were finalized from the storage"); return Ok(()) } let (removed_notifications, removed_size) = self.storage.remove_notifications(file_ids)?; - debug!(?removed_notifications, ?removed_size, "Storage was finalized"); + debug!(target: "exex::wal", ?removed_notifications, ?removed_size, "Storage was finalized"); self.update_metrics(&block_cache, -(removed_size as i64)); diff --git a/crates/exex/exex/src/wal/storage.rs b/crates/exex/exex/src/wal/storage.rs index 166a9bb4eb6b..af3a590e5860 100644 --- a/crates/exex/exex/src/wal/storage.rs +++ b/crates/exex/exex/src/wal/storage.rs @@ -44,18 +44,18 @@ impl Storage { /// # Returns /// /// The size of the file that was removed in bytes, if any. - #[instrument(target = "exex::wal::storage", skip(self))] + #[instrument(skip(self))] fn remove_notification(&self, file_id: u32) -> Option { let path = self.file_path(file_id); let size = path.metadata().ok()?.len(); match reth_fs_util::remove_file(self.file_path(file_id)) { Ok(()) => { - debug!("Notification was removed from the storage"); + debug!(target: "exex::wal::storage", "Notification was removed from the storage"); Some(size) } Err(err) => { - debug!(?err, "Failed to remove notification from the storage"); + debug!(target: "exex::wal::storage", ?err, "Failed to remove notification from the storage"); None } } @@ -108,31 +108,33 @@ impl Storage { ) -> impl Iterator> + '_ { range.map(move |id| { let (notification, size) = - self.read_notification(id)?.ok_or_eyre("notification not found")?; + self.read_notification(id)?.ok_or_eyre("notification {id} not found")?; Ok((id, size, notification)) }) } /// Reads the notification from the file with the given ID. - #[instrument(target = "exex::wal::storage", skip(self))] + #[instrument(skip(self))] pub(super) fn read_notification( &self, file_id: u32, ) -> eyre::Result> { let file_path = self.file_path(file_id); - debug!(?file_path, "Reading notification from WAL"); + debug!(target: "exex::wal::storage", ?file_path, "Reading notification from WAL"); let mut file = match File::open(&file_path) { Ok(file) => file, Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(None), - Err(err) => return Err(err.into()), + Err(err) => return Err(reth_fs_util::FsPathError::open(err, &file_path).into()), }; let size = file.metadata()?.len(); // Deserialize using the bincode- and msgpack-compatible serde wrapper let notification: reth_exex_types::serde_bincode_compat::ExExNotification<'_> = - rmp_serde::decode::from_read(&mut file)?; + rmp_serde::decode::from_read(&mut file).map_err(|err| { + eyre::eyre!("failed to decode notification from {file_path:?}: {err:?}") + })?; Ok(Some((notification.into(), size))) } @@ -142,14 +144,14 @@ impl Storage { /// # Returns /// /// The size of the file that was written in bytes. - #[instrument(target = "exex::wal::storage", skip(self, notification))] + #[instrument(skip(self, notification))] pub(super) fn write_notification( &self, file_id: u32, notification: &ExExNotification, ) -> eyre::Result { let file_path = self.file_path(file_id); - debug!(?file_path, "Writing notification to WAL"); + debug!(target: "exex::wal::storage", ?file_path, "Writing notification to WAL"); // Serialize using the bincode- and msgpack-compatible serde wrapper let notification =