diff --git a/test/src/main.rs b/test/src/main.rs index 4b538f6094..d6a8187e70 100644 --- a/test/src/main.rs +++ b/test/src/main.rs @@ -148,6 +148,7 @@ fn main() { let mut test_results = Vec::new(); let mut worker_running = worker_count; let mut done_specs = 0; + let mut started_sequential = false; while worker_running > 0 { if max_time > 0 && start_time.elapsed().as_secs() > max_time { // shutdown, specs running to long @@ -155,6 +156,11 @@ fn main() { break; } + if worker_running == 1 && !started_sequential { + started_sequential = true; + workers.start_sequencial() + } + let msg = match notify_rx.recv_timeout(Duration::from_secs(5)) { Ok(msg) => msg, Err(err) => { @@ -590,9 +596,9 @@ fn all_specs() -> Vec> { Box::new(CheckVmVersion2), Box::new(CheckVmBExtension), Box::new(RandomlyKill), + Box::new(SyncChurn), ]; specs.shuffle(&mut thread_rng()); - specs.insert(0, Box::new(SyncChurn) as Box); specs } diff --git a/test/src/worker.rs b/test/src/worker.rs index 4fec1ccd3f..88ae61624f 100644 --- a/test/src/worker.rs +++ b/test/src/worker.rs @@ -13,6 +13,7 @@ use std::time::Instant; #[derive(PartialEq, Eq)] pub enum Command { Shutdown, + StartSequencial, } /// Notify from worker @@ -45,6 +46,9 @@ pub struct Worker { inbox: Receiver, outbox: Sender, start_port: Arc, + + sequencial_tasks: Arc>>>, + sequencial_worker: bool, } impl Clone for Worker { @@ -54,13 +58,18 @@ impl Clone for Worker { inbox: self.inbox.clone(), outbox: self.outbox.clone(), start_port: Arc::clone(&self.start_port), + sequencial_tasks: Arc::clone(&self.sequencial_tasks), + sequencial_worker: self.sequencial_worker, } } } +const SEQUENCIAL_TASKS: &[&str] = &["RandomlyKill", "SyncChurn"]; + impl Worker { pub fn new( tasks: Arc>>>, + sequencial_tasks: Arc>>>, inbox: Receiver, outbox: Sender, start_port: Arc, @@ -70,12 +79,16 @@ impl Worker { inbox, outbox, start_port, + sequencial_tasks, + sequencial_worker: false, } } /// start handle tasks pub fn start(self) -> JoinHandle<()> { thread::spawn(move || { + let mut start_sequencial_task = false; + loop { let msg = match self.inbox.try_recv() { Ok(msg) => Some(msg), @@ -88,20 +101,54 @@ impl Worker { } }; // check command - if Some(Command::Shutdown) == msg { - self.outbox.send(Notify::Stop).unwrap(); - return; + match msg { + Some(Command::StartSequencial) => { + start_sequencial_task = true; + } + Some(Command::Shutdown) => { + self.outbox.send(Notify::Stop).unwrap(); + return; + } + _ => {} } + // pick a spec to run - let spec = match self.tasks.lock().pop() { - Some(spec) => spec, + + let task = self.tasks.lock().pop(); + match task { + Some(spec) => { + // if spec.name() is RandomlyKill or SyncChurn, then push it to sequencial_tasks + if SEQUENCIAL_TASKS.contains(&spec.name()) { + info!("push {} to sequencial_tasks", spec.name()); + self.sequencial_tasks.lock().push(spec); + } else { + self.run_spec(spec.as_ref(), 0); + } + } None => { - self.outbox.send(Notify::Stop).unwrap(); - return; + if self.sequencial_worker { + info!("sequencial worker is waiting for command"); + if start_sequencial_task { + match self.sequencial_tasks.lock().pop() { + Some(spec) => { + self.run_spec(spec.as_ref(), 0); + } + None => { + info!("sequencial worker has no task to run"); + self.outbox.send(Notify::Stop).unwrap(); + return; + } + }; + } else { + info!("sequencial worker is waiting for parallel workers finish"); + std::thread::sleep(std::time::Duration::from_secs(1)); + } + } else { + self.outbox.send(Notify::Stop).unwrap(); + return; + } } }; - - self.run_spec(spec.as_ref(), 0); } }) } @@ -176,13 +223,17 @@ impl Workers { start_port: u16, ) -> Self { let start_port = Arc::new(AtomicU16::new(start_port)); + + let sequencial_tasks = Arc::new(Mutex::new(Vec::new())); let workers: Vec<_> = (0..count) .map({ let tasks = Arc::clone(&tasks); + let sequencial_tasks = Arc::clone(&sequencial_tasks); move |_| { let (command_tx, command_rx) = unbounded(); let worker = Worker::new( Arc::clone(&tasks), + Arc::clone(&sequencial_tasks), command_rx, outbox.clone(), Arc::clone(&start_port), @@ -200,6 +251,8 @@ impl Workers { /// start all workers pub fn start(&mut self) { + self.workers.first_mut().unwrap().1.sequencial_worker = true; + let mut join_handles = Vec::new(); for w in self.workers.iter_mut() { let h = w.1.clone().start(); @@ -208,6 +261,20 @@ impl Workers { self.join_handles.replace(join_handles); } + pub fn start_sequencial(&mut self) { + if let Err(err) = self + .workers + .first() + .unwrap() + .0 + .send(Command::StartSequencial) + { + error!("start sequencial worker failed, error: {}", err); + } else { + info!("start sequencial worker success") + } + } + /// shutdown all workers, must call join_all after this. pub fn shutdown(&mut self) { if self.is_shutdown {