From f1899c0b15d14e97485b1eed714e7aed6a69524f Mon Sep 17 00:00:00 2001 From: Thomas Coratger <60488569+tcoratger@users.noreply.github.com> Date: Tue, 20 Aug 2024 00:08:47 -0700 Subject: [PATCH] exex: add unit tests for exex manager (#10380) --- crates/exex/exex/Cargo.toml | 1 + crates/exex/exex/src/manager.rs | 304 +++++++++++++++++++++++++++++++- 2 files changed, 299 insertions(+), 6 deletions(-) diff --git a/crates/exex/exex/Cargo.toml b/crates/exex/exex/Cargo.toml index 03400f78cd26e..1ad906e6f0103 100644 --- a/crates/exex/exex/Cargo.toml +++ b/crates/exex/exex/Cargo.toml @@ -47,6 +47,7 @@ reth-evm-ethereum.workspace = true reth-node-api.workspace = true reth-provider = { workspace = true, features = ["test-utils"] } reth-testing-utils.workspace = true +reth-primitives-traits = { workspace = true, features = ["test-utils"] } secp256k1.workspace = true diff --git a/crates/exex/exex/src/manager.rs b/crates/exex/exex/src/manager.rs index ce4994c34b038..0772d1b78bd5c 100644 --- a/crates/exex/exex/src/manager.rs +++ b/crates/exex/exex/src/manager.rs @@ -432,7 +432,7 @@ impl ExExManagerHandle { /// If this returns `false`, the owner of the handle should **NOT** send new notifications over /// the channel until the manager is ready again, as this can lead to unbounded memory growth. pub fn has_capacity(&self) -> bool { - self.current_capacity.load(Ordering::Relaxed) > 0 + self.capacity() > 0 } /// Returns `true` if there are `ExEx`'s installed in the node. @@ -481,18 +481,310 @@ impl Clone for ExExManagerHandle { #[cfg(test)] mod tests { + use super::*; + use reth_primitives::{SealedBlockWithSenders, B256}; + use reth_provider::Chain; + + #[tokio::test] + async fn test_delivers_events() { + let (mut exex_handle, event_tx, mut _notification_rx) = + ExExHandle::new("test_exex".to_string()); + + // Send an event and check that it's delivered correctly + event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); + let received_event = exex_handle.receiver.recv().await.unwrap(); + assert_eq!(received_event, ExExEvent::FinishedHeight(42)); + } + + #[tokio::test] + async fn test_has_exexs() { + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + + assert!(!ExExManager::new(vec![], 0).handle.has_exexs()); + + assert!(ExExManager::new(vec![exex_handle_1], 0).handle.has_exexs()); + } + + #[tokio::test] + async fn test_has_capacity() { + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + + assert!(!ExExManager::new(vec![], 0).handle.has_capacity()); + + assert!(ExExManager::new(vec![exex_handle_1], 10).handle.has_capacity()); + } + + #[test] + fn test_push_notification() { + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); + + // Create a mock ExExManager and add the exex_handle to it + let mut exex_manager = ExExManager::new(vec![exex_handle], 10); + + // Define the notification for testing + let mut block1 = SealedBlockWithSenders::default(); + block1.block.header.set_hash(B256::new([0x01; 32])); + block1.block.header.set_block_number(10); + + let notification1 = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())), + }; + + // Push the first notification + exex_manager.push_notification(notification1.clone()); + + // Verify the buffer contains the notification with the correct ID + assert_eq!(exex_manager.buffer.len(), 1); + assert_eq!(exex_manager.buffer.front().unwrap().0, 0); + assert_eq!(exex_manager.buffer.front().unwrap().1, notification1); + assert_eq!(exex_manager.next_id, 1); + + // Push another notification + let mut block2 = SealedBlockWithSenders::default(); + block2.block.header.set_hash(B256::new([0x02; 32])); + block2.block.header.set_block_number(20); + + let notification2 = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())), + }; + + exex_manager.push_notification(notification2.clone()); + + // Verify the buffer contains both notifications with correct IDs + assert_eq!(exex_manager.buffer.len(), 2); + assert_eq!(exex_manager.buffer.front().unwrap().0, 0); + assert_eq!(exex_manager.buffer.front().unwrap().1, notification1); + assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1); + assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2); + assert_eq!(exex_manager.next_id, 2); + } + + #[test] + fn test_update_capacity() { + let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string()); + + // Create a mock ExExManager and add the exex_handle to it + let max_capacity = 5; + let mut exex_manager = ExExManager::new(vec![exex_handle], max_capacity); + + // Push some notifications to fill part of the buffer + let mut block1 = SealedBlockWithSenders::default(); + block1.block.header.set_hash(B256::new([0x01; 32])); + block1.block.header.set_block_number(10); + + let notification1 = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())), + }; + + exex_manager.push_notification(notification1.clone()); + exex_manager.push_notification(notification1); + + // Update capacity + exex_manager.update_capacity(); + + // Verify current capacity and metrics + assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2); + + // Clear the buffer and update capacity + exex_manager.buffer.clear(); + exex_manager.update_capacity(); + + // Verify current capacity + assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity); + } + #[tokio::test] - async fn delivers_events() {} + async fn test_updates_block_height() { + let (exex_handle, event_tx, mut _notification_rx) = + ExExHandle::new("test_exex".to_string()); + + // Check initial block height + assert!(exex_handle.finished_height.is_none()); + + // Update the block height via an event + event_tx.send(ExExEvent::FinishedHeight(42)).unwrap(); + + // Create a mock ExExManager and add the exex_handle to it + let exex_manager = ExExManager::new(vec![exex_handle], 10); + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + // Pin the ExExManager to call the poll method + let mut pinned_manager = std::pin::pin!(exex_manager); + let _ = pinned_manager.as_mut().poll(&mut cx); + + // Check that the block height was updated + let updated_exex_handle = &pinned_manager.exex_handles[0]; + assert_eq!(updated_exex_handle.finished_height, Some(42)); + } + + #[tokio::test] + async fn test_exex_manager_capacity() { + let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string()); + + // Create an ExExManager with a small max capacity + let max_capacity = 2; + let mut exex_manager = ExExManager::new(vec![exex_handle_1], max_capacity); + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + // Setup a notification + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![Default::default()], + Default::default(), + Default::default(), + )), + }; + + // Send notifications to go over the max capacity + exex_manager.handle.exex_tx.send(notification.clone()).unwrap(); + exex_manager.handle.exex_tx.send(notification.clone()).unwrap(); + exex_manager.handle.exex_tx.send(notification).unwrap(); + + // Pin the ExExManager to call the poll method + let mut pinned_manager = std::pin::pin!(exex_manager); + + // Before polling, the next notification ID should be 0 and the buffer should be empty + assert_eq!(pinned_manager.next_id, 0); + assert_eq!(pinned_manager.buffer.len(), 0); + + let _ = pinned_manager.as_mut().poll(&mut cx); + + // After polling, the next notification ID and buffer size should be updated + assert_eq!(pinned_manager.next_id, 2); + assert_eq!(pinned_manager.buffer.len(), 2); + } #[tokio::test] - async fn capacity() {} + async fn exex_handle_new() { + let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + + // Check initial state + assert_eq!(exex_handle.id, "test_exex"); + assert_eq!(exex_handle.next_notification_id, 0); + + // Setup two blocks for the chain commit notification + let mut block1 = SealedBlockWithSenders::default(); + block1.block.header.set_hash(B256::new([0x01; 32])); + block1.block.header.set_block_number(10); + + let mut block2 = SealedBlockWithSenders::default(); + block2.block.header.set_hash(B256::new([0x02; 32])); + block2.block.header.set_block_number(11); + + // Setup a notification + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new( + vec![block1.clone(), block2.clone()], + Default::default(), + Default::default(), + )), + }; + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + // Send a notification and ensure it's received correctly + match exex_handle.send(&mut cx, &(22, notification.clone())) { + Poll::Ready(Ok(())) => { + let received_notification = notification_rx.recv().await.unwrap(); + assert_eq!(received_notification, notification); + } + Poll::Pending => panic!("Notification send is pending"), + Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e), + } + + // Ensure the notification ID was incremented + assert_eq!(exex_handle.next_notification_id, 23); + } #[tokio::test] - async fn updates_block_height() {} + async fn test_notification_if_finished_height_gt_chain_tip() { + let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + + // Set finished_height to a value higher than the block tip + exex_handle.finished_height = Some(15); + + let mut block1 = SealedBlockWithSenders::default(); + block1.block.header.set_hash(B256::new([0x01; 32])); + block1.block.header.set_block_number(10); + + let notification = ExExNotification::ChainCommitted { + new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())), + }; + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + // Send the notification + match exex_handle.send(&mut cx, &(22, notification)) { + Poll::Ready(Ok(())) => { + // The notification should be skipped, so nothing should be sent. + // Check that the receiver channel is indeed empty + assert!(notification_rx.try_recv().is_err(), "Receiver channel should be empty"); + } + Poll::Pending | Poll::Ready(Err(_)) => { + panic!("Notification should not be pending or fail"); + } + } + + // Ensure the notification ID was still incremented + assert_eq!(exex_handle.next_notification_id, 23); + } #[tokio::test] - async fn slow_exex() {} + async fn test_sends_chain_reorged_notification() { + let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + + let notification = ExExNotification::ChainReorged { + old: Arc::new(Chain::default()), + new: Arc::new(Chain::default()), + }; + + // Even if the finished height is higher than the tip of the new chain, the reorg + // notification should be received + exex_handle.finished_height = Some(u64::MAX); + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + // Send the notification + match exex_handle.send(&mut cx, &(22, notification.clone())) { + Poll::Ready(Ok(())) => { + let received_notification = notification_rx.recv().await.unwrap(); + assert_eq!(received_notification, notification); + } + Poll::Pending | Poll::Ready(Err(_)) => { + panic!("Notification should not be pending or fail") + } + } + + // Ensure the notification ID was incremented + assert_eq!(exex_handle.next_notification_id, 23); + } #[tokio::test] - async fn is_ready() {} + async fn test_sends_chain_reverted_notification() { + let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string()); + + let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) }; + + // Even if the finished height is higher than the tip of the new chain, the reorg + // notification should be received + exex_handle.finished_height = Some(u64::MAX); + + let mut cx = Context::from_waker(futures::task::noop_waker_ref()); + + // Send the notification + match exex_handle.send(&mut cx, &(22, notification.clone())) { + Poll::Ready(Ok(())) => { + let received_notification = notification_rx.recv().await.unwrap(); + assert_eq!(received_notification, notification); + } + Poll::Pending | Poll::Ready(Err(_)) => { + panic!("Notification should not be pending or fail") + } + } + + // Ensure the notification ID was incremented + assert_eq!(exex_handle.next_notification_id, 23); + } }