From 96b2d928aa63de41a3e1a977af81b47cad553c79 Mon Sep 17 00:00:00 2001 From: Colin Walters Date: Fri, 19 Sep 2025 14:44:19 -0400 Subject: [PATCH] Fix task leakage after qemu exit The recent changes to ssh had broke container exits after qemu had finished, which broke to-disk etc. --- crates/kit/src/container_entrypoint.rs | 2 +- crates/kit/src/main.rs | 10 ++++++--- crates/kit/src/run_ephemeral.rs | 18 ++++++++--------- crates/kit/src/run_ephemeral_ssh.rs | 28 +++++++++++++++++--------- crates/kit/src/status_monitor.rs | 28 ++++++++++++++------------ crates/kit/src/supervisor_status.rs | 12 ++++++++++- 6 files changed, 60 insertions(+), 38 deletions(-) diff --git a/crates/kit/src/container_entrypoint.rs b/crates/kit/src/container_entrypoint.rs index c9479e1..9814905 100644 --- a/crates/kit/src/container_entrypoint.rs +++ b/crates/kit/src/container_entrypoint.rs @@ -102,7 +102,7 @@ pub async fn run(opts: ContainerEntrypointOpts) -> Result<()> { tokio::select! { _ = signal_joinset.join_next() => { debug!("Caught termination signal"); - std::process::exit(0) + Ok(()) } r = async { match opts.command { diff --git a/crates/kit/src/main.rs b/crates/kit/src/main.rs index 1070617..18bfb25 100644 --- a/crates/kit/src/main.rs +++ b/crates/kit/src/main.rs @@ -2,7 +2,7 @@ use std::ffi::OsString; use cap_std_ext::cap_std::fs::Dir; use clap::{Parser, Subcommand}; -use color_eyre::{Report, Result}; +use color_eyre::{eyre::Context as _, Report, Result}; mod arch; mod boot_progress; @@ -164,6 +164,10 @@ fn main() -> Result<(), Report> { color_eyre::install()?; let cli = Cli::parse(); + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .context("Init tokio runtime")?; match cli.command { Commands::Hostexec(opts) => { @@ -185,7 +189,6 @@ fn main() -> Result<(), Report> { } Commands::ContainerEntrypoint(opts) => { // Create a tokio runtime for async container entrypoint operations - let rt = tokio::runtime::Runtime::new()?; rt.block_on(container_entrypoint::run(opts))?; } Commands::DebugInternals(opts) => match opts.command { @@ -208,5 +211,6 @@ fn main() -> Result<(), Report> { } }, } - Ok(()) + tracing::debug!("exiting"); + std::process::exit(0) } diff --git a/crates/kit/src/run_ephemeral.rs b/crates/kit/src/run_ephemeral.rs index 30f3c95..a372c71 100644 --- a/crates/kit/src/run_ephemeral.rs +++ b/crates/kit/src/run_ephemeral.rs @@ -1090,7 +1090,7 @@ Options= let status_writer_clone = StatusWriter::new("/run/supervisor-status.json"); // Only enable systemd notification debugging if the systemd version supports it - let monitor = if systemd_version + if systemd_version .map(|v| v.has_vmm_notify()) .unwrap_or_default() { @@ -1098,16 +1098,18 @@ Options= qemu_config.systemd_notify = Some(File::from(pipew)); debug!("Enabling systemd notification debugging"); - let monitor = tokio::task::spawn(boot_progress::monitor_boot_progress( + // Run this in the background + let _ = tokio::task::spawn(boot_progress::monitor_boot_progress( File::from(piper), status_writer_clone, )); - Some(monitor) } else { debug!("systemd version does not support vmm.notify_socket",); // For older systemd versions, write an unknown state - let _ = status_writer.update(SupervisorStatus { state: None }); - None + status_writer.update(SupervisorStatus { + state: None, + running: true, + })?; }; debug!("Starting QEMU with systemd debugging enabled"); @@ -1158,12 +1160,8 @@ Options= drop(tmp_swapfile); - // Cancel the systemd monitor task if it exists - if let Some(monitor) = monitor { - monitor.abort(); - } - debug!("QEMU completed successfully"); + status_writer.finish()?; Ok(()) } diff --git a/crates/kit/src/run_ephemeral_ssh.rs b/crates/kit/src/run_ephemeral_ssh.rs index 3b3864a..3847e71 100644 --- a/crates/kit/src/run_ephemeral_ssh.rs +++ b/crates/kit/src/run_ephemeral_ssh.rs @@ -1,6 +1,7 @@ use color_eyre::eyre::{eyre, Context as _}; use color_eyre::Result; use indicatif::ProgressBar; +use std::os::unix::process::CommandExt; use std::process::{Command, Stdio}; use std::thread; use std::time::{Duration, Instant}; @@ -36,18 +37,25 @@ pub fn wait_for_vm_ready( ); // Use the new monitor-status subcommand for efficient inotify-based monitoring - let mut cmd = Command::new("podman") - .args([ - "exec", - container_name, - "/var/lib/bcvk/entrypoint", - "monitor-status", - ]) + let mut cmd = Command::new("podman"); + cmd.args([ + "exec", + container_name, + "/var/lib/bcvk/entrypoint", + "monitor-status", + ]); + unsafe { + cmd.pre_exec(|| { + rustix::process::set_parent_process_death_signal(Some(rustix::process::Signal::TERM)) + .map_err(Into::into) + }); + } + let mut child = cmd .stdout(Stdio::piped()) .spawn() .context("Failed to start status monitor")?; - let stdout = cmd.stdout.take().unwrap(); + let stdout = child.stdout.take().unwrap(); let reader = std::io::BufReader::new(stdout); // Read JSON lines from the monitor @@ -64,7 +72,7 @@ pub fn wait_for_vm_ready( progress.set_message("Ready"); // End the monitor - let _ = cmd.kill(); + let _ = child.kill(); return Ok((true, progress)); } SupervisorState::ReachedTarget(ref target) => { @@ -82,7 +90,7 @@ pub fn wait_for_vm_ready( } } - let status = cmd.wait()?; + let status = child.wait()?; Err(eyre!("Monitor process exited unexpectedly: {status:?}")) } diff --git a/crates/kit/src/status_monitor.rs b/crates/kit/src/status_monitor.rs index 3ff36f1..5bf5a3e 100644 --- a/crates/kit/src/status_monitor.rs +++ b/crates/kit/src/status_monitor.rs @@ -1,8 +1,8 @@ use color_eyre::Result; use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher}; +use std::io::Write; use std::path::Path; use std::sync::mpsc::{self, Receiver}; -use std::time::Duration; use tracing::{debug, warn}; use crate::supervisor_status::SupervisorStatus; @@ -10,7 +10,6 @@ use crate::supervisor_status::SupervisorStatus; /// Monitor a status file for changes using inotify pub fn monitor_status_file>( path: P, - timeout: Duration, ) -> Result>> { let path = path.as_ref(); let parent_dir = path.parent().unwrap_or(Path::new("/")); @@ -33,7 +32,6 @@ pub fn monitor_status_file>( path: path.to_path_buf(), receiver: rx, _watcher: watcher, - timeout, last_mtime: None, }) } @@ -42,7 +40,6 @@ struct StatusFileIterator { path: std::path::PathBuf, receiver: Receiver>, _watcher: RecommendedWatcher, - timeout: Duration, last_mtime: Option, } @@ -57,7 +54,7 @@ impl Iterator for StatusFileIterator { } // Wait for file system events with timeout - let event = self.receiver.recv_timeout(self.timeout).ok()?.ok()?; + let event = self.receiver.recv().ok()?.ok()?; // Check if this event is for our target file if self.is_relevant_event(&event) { if let Some(status) = self.try_read_status_if_changed() { @@ -108,19 +105,24 @@ impl StatusFileIterator { /// Monitor status and stream updates to stdout as JSON lines pub fn monitor_and_stream_status() -> Result<()> { let path = "/run/supervisor-status.json"; - let timeout = Duration::from_secs(60); // Default timeout - let monitor = monitor_status_file(path, timeout)?; + let monitor = monitor_status_file(path)?; for status_result in monitor { match status_result { Ok(status) => { - // Output as JSON line - just stream every update - if let Ok(json) = serde_json::to_string(&status) { - println!("{}", json); - // Flush stdout to ensure immediate delivery - use std::io::Write; - std::io::stdout().flush().unwrap_or(()); + // Output as JSON line - just stream every update. We don't panic + // or error on failure to write, just silently exit as we assume + // the caller intentionally dropped. + let mut stdout = std::io::stdout().lock(); + if let Err(_) = serde_json::to_writer(&mut stdout, &status) { + return Ok(()); + } + let _ = stdout.write(b"\n"); + let _ = stdout.flush()?; + // Terminate stream when qemu exits + if !status.running { + return Ok(()); } } Err(e) => { diff --git a/crates/kit/src/supervisor_status.rs b/crates/kit/src/supervisor_status.rs index d930957..fbe7a1f 100644 --- a/crates/kit/src/supervisor_status.rs +++ b/crates/kit/src/supervisor_status.rs @@ -1,4 +1,5 @@ use cap_std_ext::{cap_std, cap_std::fs::Dir, dirext::CapStdExtDirExt}; +use color_eyre::Result; use serde::{Deserialize, Serialize}; use std::fs; use std::path::Path; @@ -9,6 +10,8 @@ use std::path::Path; pub struct SupervisorStatus { /// Current state of the supervisor/VM pub state: Option, + /// True if qemu is running + pub running: bool, } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] @@ -37,7 +40,7 @@ impl SupervisorStatus { pub fn new(state: SupervisorState) -> Self { Self { state: Some(state), - ..Default::default() + running: true, } } @@ -80,4 +83,11 @@ impl StatusWriter { pub fn update_state(&self, state: SupervisorState) -> color_eyre::Result<()> { self.update(SupervisorStatus::new(state)) } + + pub fn finish(self) -> Result<()> { + self.update(SupervisorStatus { + running: false, + ..Default::default() + }) + } }