From e890f25201cd5cee17958f5333d47ccb548925ec Mon Sep 17 00:00:00 2001 From: Ben Schofield Date: Fri, 5 Jan 2024 10:36:30 -0800 Subject: [PATCH 1/2] Add a tracing warning when a thread blocks steps Add a warning to the sim when a given host or client blocks progress in a simulation run. This works by spawning a background thread for each run that periodically checks the steps taken by the simulation. If the number of steps is the same between checks then the thread adds the tracing info. --- src/sim.rs | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/src/sim.rs b/src/sim.rs index 6f2abbf..b854b49 100644 --- a/src/sim.rs +++ b/src/sim.rs @@ -5,7 +5,10 @@ use std::cell::RefCell; use std::future::Future; use std::net::IpAddr; use std::ops::DerefMut; -use std::sync::Arc; +use std::sync::mpsc::TryRecvError; +use std::sync::{Arc, mpsc}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; use std::time::UNIX_EPOCH; use tokio::time::Duration; use tracing::Level; @@ -30,7 +33,7 @@ pub struct Sim<'a> { /// Simulation elapsed time elapsed: Duration, - steps: usize, + steps: Arc, } impl<'a> Sim<'a> { @@ -46,7 +49,7 @@ impl<'a> Sim<'a> { rts: IndexMap::new(), since_epoch, elapsed: Duration::ZERO, - steps: 1, // bumped after each step + steps: Arc::new(1.into()), // bumped after each step } } @@ -309,10 +312,33 @@ impl<'a> Sim<'a> { /// Executes a simple event loop that calls [step](#method.step) each iteration, /// returning early if any host software errors. pub fn run(&mut self) -> Result { + let steps = self.steps.clone(); + let (tx, rx) = mpsc::channel(); + std::thread::spawn(move || { + let mut blocked = false; + loop { + let prev = steps.load(std::sync::atomic::Ordering::Relaxed); + // Exit if main thread has. + match rx.try_recv() { + Ok(_) | Err(TryRecvError::Disconnected) => break, + _ => {} + } + std::thread::sleep(Duration::from_secs(10)); + if steps.load(std::sync::atomic::Ordering::Relaxed) == prev { + if !blocked { + tracing::warn!("A task is blocking preventing simulation steps at step {}.", prev); + } + blocked = true; + } else { + blocked = false; + } + } + }); loop { let is_finished = self.step()?; if is_finished { + let _ = tx.send(()); return Ok(()); } } @@ -328,7 +354,7 @@ impl<'a> Sim<'a> { /// /// Returns whether or not all clients have completed. pub fn step(&mut self) -> Result { - tracing::debug!("step {}", self.steps); + tracing::debug!("step {}", self.steps.load(Relaxed)); let tick = self.config.tick; let mut is_finished = true; @@ -380,12 +406,12 @@ impl<'a> Sim<'a> { } self.elapsed += tick; - self.steps += 1; + let steps = self.steps.fetch_add(1, Relaxed) + 1; if self.elapsed > self.config.duration && !is_finished { return Err(format!( "Ran for duration: {:?} steps: {} without completing", - self.config.duration, self.steps, + self.config.duration, steps, ))?; } From 7c5c325b17cd95e9b304ea6a21f843a17242ee84 Mon Sep 17 00:00:00 2001 From: Ben Schofield Date: Wed, 10 Jan 2024 10:10:54 -0800 Subject: [PATCH 2/2] Cleanup early --- src/sim.rs | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/sim.rs b/src/sim.rs index b854b49..56549ae 100644 --- a/src/sim.rs +++ b/src/sim.rs @@ -5,7 +5,7 @@ use std::cell::RefCell; use std::future::Future; use std::net::IpAddr; use std::ops::DerefMut; -use std::sync::mpsc::TryRecvError; +use std::sync::mpsc::RecvTimeoutError; use std::sync::{Arc, mpsc}; use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering::Relaxed; @@ -313,17 +313,16 @@ impl<'a> Sim<'a> { /// returning early if any host software errors. pub fn run(&mut self) -> Result { let steps = self.steps.clone(); - let (tx, rx) = mpsc::channel(); + let (_tx, rx) = mpsc::channel::<()>(); std::thread::spawn(move || { let mut blocked = false; loop { let prev = steps.load(std::sync::atomic::Ordering::Relaxed); // Exit if main thread has. - match rx.try_recv() { - Ok(_) | Err(TryRecvError::Disconnected) => break, + match rx.recv_timeout(Duration::from_secs(10)) { + Ok(_) | Err(RecvTimeoutError::Disconnected) => break, _ => {} } - std::thread::sleep(Duration::from_secs(10)); if steps.load(std::sync::atomic::Ordering::Relaxed) == prev { if !blocked { tracing::warn!("A task is blocking preventing simulation steps at step {}.", prev); @@ -338,7 +337,6 @@ impl<'a> Sim<'a> { let is_finished = self.step()?; if is_finished { - let _ = tx.send(()); return Ok(()); } }