From 8905fd22eac97d6aaa769350c217f05602efb2a5 Mon Sep 17 00:00:00 2001 From: Stephan Dilly Date: Wed, 1 Sep 2021 23:02:08 +0200 Subject: [PATCH] make test more stable --- asyncgit/src/asyncjob/mod.rs | 52 +++++++++++++++++++++++++++++------- 1 file changed, 43 insertions(+), 9 deletions(-) diff --git a/asyncgit/src/asyncjob/mod.rs b/asyncgit/src/asyncjob/mod.rs index 161df802b4..6b8df38a6c 100644 --- a/asyncgit/src/asyncjob/mod.rs +++ b/asyncgit/src/asyncjob/mod.rs @@ -63,7 +63,9 @@ impl } } - /// spawns `task` if nothing is running currently, otherwise schedules as `next` overwriting if `next` was set before + /// spawns `task` if nothing is running currently, + /// otherwise schedules as `next` overwriting if `next` was set before. + /// return `true` if the new task gets started right away. pub fn spawn(&mut self, task: J) -> bool { self.schedule_next(task); self.check_for_job() @@ -129,23 +131,35 @@ mod test { use crossbeam_channel::unbounded; use pretty_assertions::assert_eq; use std::{ - sync::atomic::AtomicU32, thread::sleep, time::Duration, + sync::atomic::{AtomicBool, AtomicU32, Ordering}, + thread, + time::Duration, }; #[derive(Clone)] struct TestJob { v: Arc, + finish: Arc, value_to_add: u32, } impl AsyncJob for TestJob { fn run(&mut self) { - sleep(Duration::from_millis(100)); + self.finish.store(false, Ordering::Relaxed); - self.v.fetch_add( - self.value_to_add, - std::sync::atomic::Ordering::Relaxed, - ); + println!("[job] wait"); + + while self.finish.load(Ordering::Relaxed) { + std::thread::yield_now(); + } + + println!("[job] sleep"); + + thread::sleep(Duration::from_millis(100)); + + println!("[job] done sleeping"); + + self.v.fetch_add(self.value_to_add, Ordering::Relaxed); } } @@ -160,15 +174,20 @@ mod test { let task = TestJob { v: Arc::new(AtomicU32::new(1)), + finish: Arc::new(AtomicBool::new(false)), value_to_add: 1, }; assert!(job.spawn(task.clone())); - sleep(Duration::from_millis(1)); + thread::sleep(Duration::from_millis(10)); + for _ in 0..5 { + println!("spawn"); assert!(!job.spawn(task.clone())); } + task.finish.store(true, Ordering::Relaxed); + let _foo = receiver.recv().unwrap(); let _foo = receiver.recv().unwrap(); assert!(receiver.is_empty()); @@ -179,6 +198,12 @@ mod test { ); } + fn wait_for_job(job: &AsyncSingleJob) { + while job.is_pending() { + thread::sleep(Duration::from_millis(10)); + } + } + #[test] fn test_cancel() { let (sender, receiver) = unbounded(); @@ -188,17 +213,26 @@ mod test { let task = TestJob { v: Arc::new(AtomicU32::new(1)), + finish: Arc::new(AtomicBool::new(false)), value_to_add: 1, }; assert!(job.spawn(task.clone())); - sleep(Duration::from_millis(1)); + task.finish.store(true, Ordering::Relaxed); + thread::sleep(Duration::from_millis(10)); for _ in 0..5 { + println!("spawn"); assert!(!job.spawn(task.clone())); } + + println!("cancel"); assert!(job.cancel()); + task.finish.store(true, Ordering::Relaxed); + + wait_for_job(&job); + let _foo = receiver.recv().unwrap(); assert_eq!(