diff --git a/src/sim.rs b/src/sim.rs index 6f2abbf..b854b49 100644 --- a/src/sim.rs +++ b/src/sim.rs @@ -5,7 +5,10 @@ use std::cell::RefCell; use std::future::Future; use std::net::IpAddr; use std::ops::DerefMut; -use std::sync::Arc; +use std::sync::mpsc::TryRecvError; +use std::sync::{Arc, mpsc}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; use std::time::UNIX_EPOCH; use tokio::time::Duration; use tracing::Level; @@ -30,7 +33,7 @@ pub struct Sim<'a> { /// Simulation elapsed time elapsed: Duration, - steps: usize, + steps: Arc, } impl<'a> Sim<'a> { @@ -46,7 +49,7 @@ impl<'a> Sim<'a> { rts: IndexMap::new(), since_epoch, elapsed: Duration::ZERO, - steps: 1, // bumped after each step + steps: Arc::new(1.into()), // bumped after each step } } @@ -309,10 +312,33 @@ impl<'a> Sim<'a> { /// Executes a simple event loop that calls [step](#method.step) each iteration, /// returning early if any host software errors. pub fn run(&mut self) -> Result { + let steps = self.steps.clone(); + let (tx, rx) = mpsc::channel(); + std::thread::spawn(move || { + let mut blocked = false; + loop { + let prev = steps.load(std::sync::atomic::Ordering::Relaxed); + // Exit if main thread has. + match rx.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => break, + _ => {} + } + std::thread::sleep(Duration::from_secs(10)); + if steps.load(std::sync::atomic::Ordering::Relaxed) == prev { + if !blocked { + tracing::warn!("A task is blocking preventing simulation steps at step {}.", prev); + } + blocked = true; + } else { + blocked = false; + } + } + }); loop { let is_finished = self.step()?; if is_finished { + let _ = tx.send(()); return Ok(()); } } @@ -328,7 +354,7 @@ impl<'a> Sim<'a> { /// /// Returns whether or not all clients have completed. pub fn step(&mut self) -> Result { - tracing::debug!("step {}", self.steps); + tracing::debug!("step {}", self.steps.load(Relaxed)); let tick = self.config.tick; let mut is_finished = true; @@ -380,12 +406,12 @@ impl<'a> Sim<'a> { } self.elapsed += tick; - self.steps += 1; + let steps = self.steps.fetch_add(1, Relaxed) + 1; if self.elapsed > self.config.duration && !is_finished { return Err(format!( "Ran for duration: {:?} steps: {} without completing", - self.config.duration, self.steps, + self.config.duration, steps, ))?; }