Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/kit/src/container_entrypoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ pub async fn run(opts: ContainerEntrypointOpts) -> Result<()> {
tokio::select! {
_ = signal_joinset.join_next() => {
debug!("Caught termination signal");
std::process::exit(0)
Ok(())
}
r = async {
match opts.command {
Expand Down
10 changes: 7 additions & 3 deletions crates/kit/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::ffi::OsString;

use cap_std_ext::cap_std::fs::Dir;
use clap::{Parser, Subcommand};
use color_eyre::{Report, Result};
use color_eyre::{eyre::Context as _, Report, Result};

mod arch;
mod boot_progress;
Expand Down Expand Up @@ -164,6 +164,10 @@ fn main() -> Result<(), Report> {
color_eyre::install()?;

let cli = Cli::parse();
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.context("Init tokio runtime")?;

match cli.command {
Commands::Hostexec(opts) => {
Expand All @@ -185,7 +189,6 @@ fn main() -> Result<(), Report> {
}
Commands::ContainerEntrypoint(opts) => {
// Create a tokio runtime for async container entrypoint operations
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(container_entrypoint::run(opts))?;
}
Commands::DebugInternals(opts) => match opts.command {
Expand All @@ -208,5 +211,6 @@ fn main() -> Result<(), Report> {
}
},
}
Ok(())
tracing::debug!("exiting");
std::process::exit(0)
}
18 changes: 8 additions & 10 deletions crates/kit/src/run_ephemeral.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1090,24 +1090,26 @@ Options=
let status_writer_clone = StatusWriter::new("/run/supervisor-status.json");

// Only enable systemd notification debugging if the systemd version supports it
let monitor = if systemd_version
if systemd_version
.map(|v| v.has_vmm_notify())
.unwrap_or_default()
{
let (piper, pipew) = rustix::pipe::pipe()?;
qemu_config.systemd_notify = Some(File::from(pipew));
debug!("Enabling systemd notification debugging");

let monitor = tokio::task::spawn(boot_progress::monitor_boot_progress(
// Run this in the background
let _ = tokio::task::spawn(boot_progress::monitor_boot_progress(
File::from(piper),
status_writer_clone,
));
Some(monitor)
} else {
debug!("systemd version does not support vmm.notify_socket",);
// For older systemd versions, write an unknown state
let _ = status_writer.update(SupervisorStatus { state: None });
None
status_writer.update(SupervisorStatus {
state: None,
running: true,
})?;
};

debug!("Starting QEMU with systemd debugging enabled");
Expand Down Expand Up @@ -1158,12 +1160,8 @@ Options=

drop(tmp_swapfile);

// Cancel the systemd monitor task if it exists
if let Some(monitor) = monitor {
monitor.abort();
}

debug!("QEMU completed successfully");
status_writer.finish()?;

Ok(())
}
28 changes: 18 additions & 10 deletions crates/kit/src/run_ephemeral_ssh.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use color_eyre::eyre::{eyre, Context as _};
use color_eyre::Result;
use indicatif::ProgressBar;
use std::os::unix::process::CommandExt;
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -36,18 +37,25 @@ pub fn wait_for_vm_ready(
);

// Use the new monitor-status subcommand for efficient inotify-based monitoring
let mut cmd = Command::new("podman")
.args([
"exec",
container_name,
"/var/lib/bcvk/entrypoint",
"monitor-status",
])
let mut cmd = Command::new("podman");
cmd.args([
"exec",
container_name,
"/var/lib/bcvk/entrypoint",
"monitor-status",
]);
unsafe {
cmd.pre_exec(|| {
rustix::process::set_parent_process_death_signal(Some(rustix::process::Signal::TERM))
.map_err(Into::into)
});
}
let mut child = cmd
.stdout(Stdio::piped())
.spawn()
.context("Failed to start status monitor")?;

let stdout = cmd.stdout.take().unwrap();
let stdout = child.stdout.take().unwrap();
let reader = std::io::BufReader::new(stdout);

// Read JSON lines from the monitor
Expand All @@ -64,7 +72,7 @@ pub fn wait_for_vm_ready(
progress.set_message("Ready");

// End the monitor
let _ = cmd.kill();
let _ = child.kill();
return Ok((true, progress));
}
SupervisorState::ReachedTarget(ref target) => {
Expand All @@ -82,7 +90,7 @@ pub fn wait_for_vm_ready(
}
}

let status = cmd.wait()?;
let status = child.wait()?;
Err(eyre!("Monitor process exited unexpectedly: {status:?}"))
}

Expand Down
28 changes: 15 additions & 13 deletions crates/kit/src/status_monitor.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use color_eyre::Result;
use notify::{Config, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
use std::io::Write;
use std::path::Path;
use std::sync::mpsc::{self, Receiver};
use std::time::Duration;
use tracing::{debug, warn};

use crate::supervisor_status::SupervisorStatus;

/// Monitor a status file for changes using inotify
pub fn monitor_status_file<P: AsRef<Path>>(
path: P,
timeout: Duration,
) -> Result<impl Iterator<Item = Result<SupervisorStatus>>> {
let path = path.as_ref();
let parent_dir = path.parent().unwrap_or(Path::new("/"));
Expand All @@ -33,7 +32,6 @@ pub fn monitor_status_file<P: AsRef<Path>>(
path: path.to_path_buf(),
receiver: rx,
_watcher: watcher,
timeout,
last_mtime: None,
})
}
Expand All @@ -42,7 +40,6 @@ struct StatusFileIterator {
path: std::path::PathBuf,
receiver: Receiver<notify::Result<notify::Event>>,
_watcher: RecommendedWatcher,
timeout: Duration,
last_mtime: Option<std::time::SystemTime>,
}

Expand All @@ -57,7 +54,7 @@ impl Iterator for StatusFileIterator {
}

// Wait for file system events with timeout
let event = self.receiver.recv_timeout(self.timeout).ok()?.ok()?;
let event = self.receiver.recv().ok()?.ok()?;
// Check if this event is for our target file
if self.is_relevant_event(&event) {
if let Some(status) = self.try_read_status_if_changed() {
Expand Down Expand Up @@ -108,19 +105,24 @@ impl StatusFileIterator {
/// Monitor status and stream updates to stdout as JSON lines
pub fn monitor_and_stream_status() -> Result<()> {
let path = "/run/supervisor-status.json";
let timeout = Duration::from_secs(60); // Default timeout

let monitor = monitor_status_file(path, timeout)?;
let monitor = monitor_status_file(path)?;

for status_result in monitor {
match status_result {
Ok(status) => {
// Output as JSON line - just stream every update
if let Ok(json) = serde_json::to_string(&status) {
println!("{}", json);
// Flush stdout to ensure immediate delivery
use std::io::Write;
std::io::stdout().flush().unwrap_or(());
// Output as JSON line - just stream every update. We don't panic
// or error on failure to write, just silently exit as we assume
// the caller intentionally dropped.
let mut stdout = std::io::stdout().lock();
if let Err(_) = serde_json::to_writer(&mut stdout, &status) {
return Ok(());
}
let _ = stdout.write(b"\n");
let _ = stdout.flush()?;
// Terminate stream when qemu exits
if !status.running {
return Ok(());
}
}
Err(e) => {
Expand Down
12 changes: 11 additions & 1 deletion crates/kit/src/supervisor_status.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use cap_std_ext::{cap_std, cap_std::fs::Dir, dirext::CapStdExtDirExt};
use color_eyre::Result;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::Path;
Expand All @@ -9,6 +10,8 @@ use std::path::Path;
pub struct SupervisorStatus {
/// Current state of the supervisor/VM
pub state: Option<SupervisorState>,
/// True if qemu is running
pub running: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
Expand Down Expand Up @@ -37,7 +40,7 @@ impl SupervisorStatus {
pub fn new(state: SupervisorState) -> Self {
Self {
state: Some(state),
..Default::default()
running: true,
}
}

Expand Down Expand Up @@ -80,4 +83,11 @@ impl StatusWriter {
pub fn update_state(&self, state: SupervisorState) -> color_eyre::Result<()> {
self.update(SupervisorStatus::new(state))
}

pub fn finish(self) -> Result<()> {
self.update(SupervisorStatus {
running: false,
..Default::default()
})
}
}
Loading