Skip to content

Commit

Permalink
exex: add unit tests for exex manager (#10380)
Browse files Browse the repository at this point in the history
  • Loading branch information
tcoratger authored and fgimenez committed Aug 20, 2024
1 parent fa0a61e commit f1899c0
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 6 deletions.
1 change: 1 addition & 0 deletions crates/exex/exex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ reth-evm-ethereum.workspace = true
reth-node-api.workspace = true
reth-provider = { workspace = true, features = ["test-utils"] }
reth-testing-utils.workspace = true
reth-primitives-traits = { workspace = true, features = ["test-utils"] }

secp256k1.workspace = true

Expand Down
304 changes: 298 additions & 6 deletions crates/exex/exex/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ impl ExExManagerHandle {
/// If this returns `false`, the owner of the handle should **NOT** send new notifications over
/// the channel until the manager is ready again, as this can lead to unbounded memory growth.
pub fn has_capacity(&self) -> bool {
self.current_capacity.load(Ordering::Relaxed) > 0
self.capacity() > 0
}

/// Returns `true` if there are `ExEx`'s installed in the node.
Expand Down Expand Up @@ -481,18 +481,310 @@ impl Clone for ExExManagerHandle {

#[cfg(test)]
mod tests {
use super::*;
use reth_primitives::{SealedBlockWithSenders, B256};
use reth_provider::Chain;

#[tokio::test]
async fn test_delivers_events() {
let (mut exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string());

// Send an event and check that it's delivered correctly
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();
let received_event = exex_handle.receiver.recv().await.unwrap();
assert_eq!(received_event, ExExEvent::FinishedHeight(42));
}

#[tokio::test]
async fn test_has_exexs() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string());

assert!(!ExExManager::new(vec![], 0).handle.has_exexs());

assert!(ExExManager::new(vec![exex_handle_1], 0).handle.has_exexs());
}

#[tokio::test]
async fn test_has_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string());

assert!(!ExExManager::new(vec![], 0).handle.has_capacity());

assert!(ExExManager::new(vec![exex_handle_1], 10).handle.has_capacity());
}

#[test]
fn test_push_notification() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string());

// Create a mock ExExManager and add the exex_handle to it
let mut exex_manager = ExExManager::new(vec![exex_handle], 10);

// Define the notification for testing
let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);

let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};

// Push the first notification
exex_manager.push_notification(notification1.clone());

// Verify the buffer contains the notification with the correct ID
assert_eq!(exex_manager.buffer.len(), 1);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
assert_eq!(exex_manager.next_id, 1);

// Push another notification
let mut block2 = SealedBlockWithSenders::default();
block2.block.header.set_hash(B256::new([0x02; 32]));
block2.block.header.set_block_number(20);

let notification2 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block2.clone()], Default::default(), Default::default())),
};

exex_manager.push_notification(notification2.clone());

// Verify the buffer contains both notifications with correct IDs
assert_eq!(exex_manager.buffer.len(), 2);
assert_eq!(exex_manager.buffer.front().unwrap().0, 0);
assert_eq!(exex_manager.buffer.front().unwrap().1, notification1);
assert_eq!(exex_manager.buffer.get(1).unwrap().0, 1);
assert_eq!(exex_manager.buffer.get(1).unwrap().1, notification2);
assert_eq!(exex_manager.next_id, 2);
}

#[test]
fn test_update_capacity() {
let (exex_handle, _, _) = ExExHandle::new("test_exex".to_string());

// Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5;
let mut exex_manager = ExExManager::new(vec![exex_handle], max_capacity);

// Push some notifications to fill part of the buffer
let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);

let notification1 = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};

exex_manager.push_notification(notification1.clone());
exex_manager.push_notification(notification1);

// Update capacity
exex_manager.update_capacity();

// Verify current capacity and metrics
assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity - 2);

// Clear the buffer and update capacity
exex_manager.buffer.clear();
exex_manager.update_capacity();

// Verify current capacity
assert_eq!(exex_manager.current_capacity.load(Ordering::Relaxed), max_capacity);
}

#[tokio::test]
async fn delivers_events() {}
async fn test_updates_block_height() {
let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string());

// Check initial block height
assert!(exex_handle.finished_height.is_none());

// Update the block height via an event
event_tx.send(ExExEvent::FinishedHeight(42)).unwrap();

// Create a mock ExExManager and add the exex_handle to it
let exex_manager = ExExManager::new(vec![exex_handle], 10);

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

// Pin the ExExManager to call the poll method
let mut pinned_manager = std::pin::pin!(exex_manager);
let _ = pinned_manager.as_mut().poll(&mut cx);

// Check that the block height was updated
let updated_exex_handle = &pinned_manager.exex_handles[0];
assert_eq!(updated_exex_handle.finished_height, Some(42));
}

#[tokio::test]
async fn test_exex_manager_capacity() {
let (exex_handle_1, _, _) = ExExHandle::new("test_exex_1".to_string());

// Create an ExExManager with a small max capacity
let max_capacity = 2;
let mut exex_manager = ExExManager::new(vec![exex_handle_1], max_capacity);

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

// Setup a notification
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![Default::default()],
Default::default(),
Default::default(),
)),
};

// Send notifications to go over the max capacity
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification.clone()).unwrap();
exex_manager.handle.exex_tx.send(notification).unwrap();

// Pin the ExExManager to call the poll method
let mut pinned_manager = std::pin::pin!(exex_manager);

// Before polling, the next notification ID should be 0 and the buffer should be empty
assert_eq!(pinned_manager.next_id, 0);
assert_eq!(pinned_manager.buffer.len(), 0);

let _ = pinned_manager.as_mut().poll(&mut cx);

// After polling, the next notification ID and buffer size should be updated
assert_eq!(pinned_manager.next_id, 2);
assert_eq!(pinned_manager.buffer.len(), 2);
}

#[tokio::test]
async fn capacity() {}
async fn exex_handle_new() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());

// Check initial state
assert_eq!(exex_handle.id, "test_exex");
assert_eq!(exex_handle.next_notification_id, 0);

// Setup two blocks for the chain commit notification
let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);

let mut block2 = SealedBlockWithSenders::default();
block2.block.header.set_hash(B256::new([0x02; 32]));
block2.block.header.set_block_number(11);

// Setup a notification
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(
vec![block1.clone(), block2.clone()],
Default::default(),
Default::default(),
)),
};

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

// Send a notification and ensure it's received correctly
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending => panic!("Notification send is pending"),
Poll::Ready(Err(e)) => panic!("Failed to send notification: {:?}", e),
}

// Ensure the notification ID was incremented
assert_eq!(exex_handle.next_notification_id, 23);
}

#[tokio::test]
async fn updates_block_height() {}
async fn test_notification_if_finished_height_gt_chain_tip() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());

// Set finished_height to a value higher than the block tip
exex_handle.finished_height = Some(15);

let mut block1 = SealedBlockWithSenders::default();
block1.block.header.set_hash(B256::new([0x01; 32]));
block1.block.header.set_block_number(10);

let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block1.clone()], Default::default(), Default::default())),
};

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

// Send the notification
match exex_handle.send(&mut cx, &(22, notification)) {
Poll::Ready(Ok(())) => {
// The notification should be skipped, so nothing should be sent.
// Check that the receiver channel is indeed empty
assert!(notification_rx.try_recv().is_err(), "Receiver channel should be empty");
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail");
}
}

// Ensure the notification ID was still incremented
assert_eq!(exex_handle.next_notification_id, 23);
}

#[tokio::test]
async fn slow_exex() {}
async fn test_sends_chain_reorged_notification() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());

let notification = ExExNotification::ChainReorged {
old: Arc::new(Chain::default()),
new: Arc::new(Chain::default()),
};

// Even if the finished height is higher than the tip of the new chain, the reorg
// notification should be received
exex_handle.finished_height = Some(u64::MAX);

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
}
}

// Ensure the notification ID was incremented
assert_eq!(exex_handle.next_notification_id, 23);
}

#[tokio::test]
async fn is_ready() {}
async fn test_sends_chain_reverted_notification() {
let (mut exex_handle, _, mut notification_rx) = ExExHandle::new("test_exex".to_string());

let notification = ExExNotification::ChainReverted { old: Arc::new(Chain::default()) };

// Even if the finished height is higher than the tip of the new chain, the reorg
// notification should be received
exex_handle.finished_height = Some(u64::MAX);

let mut cx = Context::from_waker(futures::task::noop_waker_ref());

// Send the notification
match exex_handle.send(&mut cx, &(22, notification.clone())) {
Poll::Ready(Ok(())) => {
let received_notification = notification_rx.recv().await.unwrap();
assert_eq!(received_notification, notification);
}
Poll::Pending | Poll::Ready(Err(_)) => {
panic!("Notification should not be pending or fail")
}
}

// Ensure the notification ID was incremented
assert_eq!(exex_handle.next_notification_id, 23);
}
}

0 comments on commit f1899c0

Please sign in to comment.