From 7159da859f03f756e0b47ba0df868c9b68b68aa7 Mon Sep 17 00:00:00 2001 From: taoky Date: Mon, 20 Jan 2025 03:26:14 +0800 Subject: [PATCH] scx_loader: Avoid race condition with start_scheduler When RunnerMessage::Switch is called frequently, start_scheduler could encounter a race condition in which more than one tokio async task starts at the same time, corrupting the child_id state. To resolve this issue, the JoinHandle returned by start_scheduler is recorded, and instead of using child_id to send SIGINT, a cancellation token from tokio-util is used: stop_scheduler cancels it to tell the async task to kill its child. This ensures that only one task at a time is managing the spawned scheduler process. --- Cargo.lock | 1 + rust/scx_loader/Cargo.toml | 1 + rust/scx_loader/src/main.rs | 140 +++++++++++++++++------------------- 3 files changed, 69 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 17eec3cf8..51a319d06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2145,6 +2145,7 @@ dependencies = [ "serde", "sysinfo", "tokio", + "tokio-util", "toml", "zbus", "zvariant", diff --git a/rust/scx_loader/Cargo.toml b/rust/scx_loader/Cargo.toml index 929b88b7c..bd48abd3b 100644 --- a/rust/scx_loader/Cargo.toml +++ b/rust/scx_loader/Cargo.toml @@ -16,6 +16,7 @@ nix = { features = ["process", "signal"], default-features = false, version = "0 serde = { version = "1.0", features = ["derive"] } sysinfo = "0.31.4" tokio = { version = "1.39", features = ["macros", "sync", "rt-multi-thread", "process"] } +tokio-util = "0.7" toml = "0.8.19" zbus = { version = "5", features = ["tokio"], default-features = false } zvariant = "5.1" diff --git a/rust/scx_loader/src/main.rs b/rust/scx_loader/src/main.rs index a1916f84f..fbe69cd63 100644 --- a/rust/scx_loader/src/main.rs +++ b/rust/scx_loader/src/main.rs @@ -10,9 +10,8 @@ mod logger; use scx_loader::dbus::LoaderClientProxy; use scx_loader::*; +use std::process::ExitStatus; use std::process::Stdio; -use 
std::sync::atomic::AtomicU32; -use std::sync::atomic::Ordering; use std::sync::Arc; use anyhow::Context; @@ -384,44 +383,45 @@ async fn worker_loop( } async fn handle_child_process(mut rx: tokio::sync::mpsc::Receiver) -> Result<()> { - let child_id = Arc::new(AtomicU32::new(0)); + let mut task: Option>>> = None; + let mut cancel_token = Arc::new(tokio_util::sync::CancellationToken::new()); while let Some(message) = rx.recv().await { match message { RunnerMessage::Switch((scx_sched, sched_args)) => { // stop the sched if its running - if let Err(stop_err) = stop_scheduler(child_id.clone()).await { - log::error!("Failed to stop previous scheduler: {stop_err}"); - } + stop_scheduler(&mut task, &mut cancel_token).await; // overwise start scheduler - if let Err(sched_err) = - start_scheduler(scx_sched, sched_args, child_id.clone()).await - { - log::error!("Scheduler exited with err: {sched_err}"); - } else { - log::debug!("Scheduler exited"); + match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await { + Ok(handle) => { + task = Some(handle); + log::debug!("Scheduler started"); + } + Err(err) => { + log::error!("Failed to start scheduler: {err}"); + } } } RunnerMessage::Start((scx_sched, sched_args)) => { // check if sched is running or not - if child_id.load(Ordering::Relaxed) != 0 { + if task.is_some() { log::error!("Scheduler wasn't finished yet. 
Stop already running scheduler!"); continue; } // overwise start scheduler - if let Err(sched_err) = - start_scheduler(scx_sched, sched_args, child_id.clone()).await - { - log::error!("Scheduler exited with err: {sched_err}"); - } else { - log::debug!("Scheduler exited"); + match start_scheduler(scx_sched, sched_args, cancel_token.clone()).await { + Ok(handle) => { + task = Some(handle); + log::debug!("Scheduler started"); + } + Err(err) => { + log::error!("Failed to start scheduler: {err}"); + } } } RunnerMessage::Stop => { - if let Err(stop_err) = stop_scheduler(child_id.clone()).await { - log::error!("Failed to stop scheduler: {stop_err}"); - } + stop_scheduler(&mut task, &mut cancel_token).await; } } } @@ -433,27 +433,44 @@ async fn handle_child_process(mut rx: tokio::sync::mpsc::Receiver async fn start_scheduler( scx_crate: SupportedSched, args: Vec, - child_id: Arc, -) -> Result<()> { + cancel_token: Arc, +) -> Result>>> { // Ensure the child process exit is handled correctly in the runtime - tokio::spawn(async move { + let handle = tokio::spawn(async move { let mut retries = 0u32; let max_retries = 5u32; + let mut last_status: Option = None; + while retries < max_retries { - let child = spawn_scheduler(scx_crate.clone(), args.clone(), child_id.clone()).await; + let child = spawn_scheduler(scx_crate.clone(), args.clone()).await; let mut failed = false; if let Ok(mut child) = child { - let status = child - .wait() - .await - .expect("child process encountered an error"); - - if !status.success() { - failed = true; - } - log::debug!("Child process exited with status: {status:?}"); + tokio::select! 
{ + status = child.wait() => { + let status = status.expect("child process encountered an error"); + last_status = Some(status); + if !status.success() { + failed = true; + } + log::debug!("Child process exited with status: {status:?}"); + } + + _ = cancel_token.cancelled() => { + log::debug!("Received cancellation signal"); + // Send SIGINT + if let Some(child_id) = child.id() { + nix::sys::signal::kill( + nix::unistd::Pid::from_raw(child_id as i32), + nix::sys::signal::SIGINT, + ).context("Failed to send termination signal to the child")?; + } + let status = child.wait().await.expect("child process encountered an error"); + last_status = Some(status); + break; + } + }; } else { log::debug!("Failed to spawn child process"); failed = true; @@ -461,7 +478,6 @@ async fn start_scheduler( // retrying if failed, otherwise exit if !failed { - child_id.store(0, Ordering::Relaxed); break; } @@ -472,19 +488,16 @@ async fn start_scheduler( max_retries, ); } - child_id.store(0, Ordering::Relaxed); + + Ok(last_status) }); - Ok(()) + Ok(handle) } /// Starts the scheduler as a child process and returns child object to manage lifecycle by the /// caller. 
-async fn spawn_scheduler( - scx_crate: SupportedSched, - args: Vec, - child_id: Arc, -) -> Result { +async fn spawn_scheduler(scx_crate: SupportedSched, args: Vec) -> Result { let sched_bin_name: &str = scx_crate.into(); log::info!("starting {sched_bin_name} command"); @@ -500,38 +513,19 @@ async fn spawn_scheduler( // spawn process let child = cmd.spawn().expect("failed to spawn command"); - // NOTE: unsafe because the child might not exist, when we will try to stop it - // set child id - child_id.store( - child - .id() - .ok_or(anyhow::anyhow!("Failed to get child id"))?, - Ordering::Relaxed, - ); - Ok(child) } -async fn stop_scheduler(child_id: Arc) -> Result<()> { - // if child_proc is 0, then we assume the child process is terminated - let child_proc = child_id.load(Ordering::Relaxed); - if child_proc == 0 { - return Ok(()); +async fn stop_scheduler( + task: &mut Option>>>, + cancel_token: &mut Arc, +) { + if let Some(task) = task.take() { + log::debug!("Stopping already running scheduler.."); + cancel_token.cancel(); + let status = task.await; + log::debug!("Scheduler was stopped with status: {:?}", status); + // Create a new cancellation token + *cancel_token = Arc::new(tokio_util::sync::CancellationToken::new()); } - - // send SIGINT signal to child - log::debug!("Stopping already running scheduler.."); - nix::sys::signal::kill( - nix::unistd::Pid::from_raw(child_proc as i32), - nix::sys::signal::SIGINT, - ) - .context("Failed to send termination signal to the child")?; - - // Wait for the child process to exit and child_id to become 0 - while child_id.load(Ordering::Relaxed) != 0 { - tokio::time::sleep(Duration::from_secs(1)).await; - } - log::debug!("Scheduler was stopped"); - - Ok(()) }