Skip to content

Commit

Permalink
Test if SyncStatus waits for the chain tip
Browse files Browse the repository at this point in the history
Test if waiting for the chain tip to be reached correctly finishes when
the chain tip is reached. This is done by sending recent sync lengths to
the `SyncStatus` instance, and checking that every time a separate
`SyncStatus` instance determines it has reached the tip the original
instance wakes up.
  • Loading branch information
jvff committed Aug 28, 2021
1 parent ff776ba commit 7217936
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 0 deletions.
3 changes: 3 additions & 0 deletions zebrad/src/components/sync/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ use tokio::sync::watch;

use super::RecentSyncLengths;

#[cfg(test)]
mod tests;

/// A helper type to determine if the synchronizer has likely reached the chain tip.
///
/// This type can be used as a handle, so cloning it is cheap.
Expand Down
140 changes: 140 additions & 0 deletions zebrad/src/components/sync/status/tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
use std::{env, sync::Arc, time::Duration};

use futures::{select, FutureExt};
use proptest::prelude::*;
use tokio::{sync::Semaphore, time::timeout};

use super::{super::RecentSyncLengths, SyncStatus};

/// The default number of test cases to run.
const DEFAULT_ASYNC_SYNCHRONIZED_TASKS_PROPTEST_CASES: u32 = 32;

/// The maximum time one test instance should run.
///
/// If the test exceeds this time it is considered to have failed.
const MAX_TEST_EXECUTION: Duration = Duration::from_secs(1);

/// The maximum time to wait for an event to be received.
///
/// If an event is not received in this time, it is considered that it will never be received.
const EVENT_TIMEOUT: Duration = Duration::from_millis(5);

proptest! {
#![proptest_config(
proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_ASYNC_SYNCHRONIZED_TASKS_PROPTEST_CASES))
)]

/// Test if the [`SyncStatus`] correctly waits until the chain tip is reached.
///
/// This is an asynchronous test with two concurrent tasks. The main task mocks chain sync
/// length updates and verifies if the other task was awakened by the update.
#[test]
fn waits_until_close_to_tip(sync_lengths in any::<Vec<usize>>()) {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.expect("Failed to create Tokio runtime");
let _guard = runtime.enter();

runtime.block_on(timeout(MAX_TEST_EXECUTION, root_task(sync_lengths)))??;

/// The root task that the runtime executes.
///
/// Spawns the two concurrent tasks, and sets up the synchronization channels between them.
async fn root_task(sync_lengths: Vec<usize>) -> Result<(), TestCaseError> {
let update_events = Arc::new(Semaphore::new(0));
let wake_events = Arc::new(Semaphore::new(0));

let (status, recent_sync_lengths) = SyncStatus::new();

let mut wait_task_handle = tokio::spawn(wait_task(
status.clone(),
update_events.clone(),
wake_events.clone(),
))
.fuse();

let mut main_task_handle = tokio::spawn(main_task(
sync_lengths,
status,
recent_sync_lengths,
update_events,
wake_events,
))
.fuse();

select! {
result = main_task_handle => result.expect("Failed to wait for main test task"),
result = wait_task_handle => result.expect("Failed to wait for wait test task"),
}
}

/// The main task.
///
/// 1. Applies each chain sync length update from the `sync_lengths` parameter.
/// 2. If necessary, notify the other task that an update was applied. This is to avoid
/// having the other task enter an infinite loop while it thinks it has reached the
/// chain tip.
/// 3. Waits to see if the other task sends a wake event, meaning that it awoke because it
/// was notified that it has reached the chain tip.
/// 4. Compares to see if the there was an awake event and if it was expected or not based
/// on whether the [`SyncStatus`] says that it's close to the tip.
async fn main_task(
sync_lengths: Vec<usize>,
status: SyncStatus,
mut recent_sync_lengths: RecentSyncLengths,
update_events: Arc<Semaphore>,
wake_events: Arc<Semaphore>,
) -> Result<(), TestCaseError> {
let mut needs_update_event = true;

for length in sync_lengths {
recent_sync_lengths.push_extend_tips_length(length);

if needs_update_event {
update_events.add_permits(1);
}

let awoke = match timeout(EVENT_TIMEOUT, wake_events.acquire()).await {
Ok(permit) => {
permit.forget();
true
}
Err(_) => false,
};

needs_update_event = awoke;

assert_eq!(status.is_close_to_tip(), awoke);
}

Ok(())
}

/// The helper task that repeatedly waits until the chain tip is close.
///
/// 1. Waits for an update event granting permission to run an iteration. This avoids
/// looping repeatedly while [`SyncStatus`] reports that it is close to the chain tip.
/// 2. Waits until [`SyncStatus`] reports that it is close to the chain tip.
/// 3. Notifies the main task that it awoke, i.e., that the [`SyncStatus`] has finished
/// wating until it was close to the chain tip.
async fn wait_task(
mut status: SyncStatus,
update_events: Arc<Semaphore>,
wake_events: Arc<Semaphore>,
) -> Result<(), TestCaseError> {
loop {
update_events.acquire().await.forget();

if status.wait_until_close_to_tip().await.is_err() {
return Ok(());
}

wake_events.add_permits(1);
}
}
}
}

0 comments on commit 7217936

Please sign in to comment.