Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

petri: use a single method to parse a stream of lines #893

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -8598,6 +8598,7 @@ dependencies = [
"mesh",
"mesh_rpc",
"nvme_resources",
"pal",
"pal_async",
"petri",
"petri_artifact_resolver_openvmm_known_paths",
Expand Down
4 changes: 2 additions & 2 deletions petri/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,8 +303,8 @@ impl<'a> MakeWriter<'a> for PetriWriter {
}
}

/// read from the serial reader and write entries to the log
pub async fn serial_log_task(
/// Logs lines from `reader` into `log_file`.
pub async fn log_stream(
log_file: PetriLogFile,
reader: impl AsyncRead + Unpin + Send + 'static,
) -> anyhow::Result<()> {
Expand Down
2 changes: 1 addition & 1 deletion petri/src/vm/hyperv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl PetriVmConfigHyperV {
diag_client::hyperv::ComPortAccessInfo::PortPipePath(&serial_pipe_path),
)
.await?;
crate::serial_log_task(serial_log_file, PolledPipe::new(&driver, serial)?).await
crate::log_stream(serial_log_file, PolledPipe::new(&driver, serial)?).await
}
}));

Expand Down
8 changes: 4 additions & 4 deletions petri/src/vm/openvmm/construct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl PetriVmConfigOpenVmm {

let SerialData {
mut emulated_serial_config,
serial_tasks,
serial_tasks: log_stream_tasks,
linux_direct_serial_agent,
} = setup.configure_serial(params.logger)?;

Expand Down Expand Up @@ -357,7 +357,7 @@ impl PetriVmConfigOpenVmm {
config,

resources: PetriVmResourcesOpenVmm {
serial_tasks,
log_stream_tasks,
firmware_event_recv,
shutdown_ic_send,
expected_boot_event,
Expand Down Expand Up @@ -424,7 +424,7 @@ impl PetriVmConfigSetupCore<'_> {
let (serial0_read, serial0_write) = serial0_host.split();
let serial0_task = self.driver.spawn(
"serial0-console",
crate::serial_log_task(serial0_log_file, serial0_read),
crate::log_stream(serial0_log_file, serial0_read),
);
serial_tasks.push(serial0_task);

Expand All @@ -434,7 +434,7 @@ impl PetriVmConfigSetupCore<'_> {
.context("failed to create serial2 stream")?;
let serial2_task = self.driver.spawn(
"serial2-openhcl",
crate::serial_log_task(logger.log_file("openhcl")?, serial2_host),
crate::log_stream(logger.log_file("openhcl")?, serial2_host),
);
serial_tasks.push(serial2_task);
serial2
Expand Down
2 changes: 1 addition & 1 deletion petri/src/vm/openvmm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ impl PetriVmConfig for PetriVmConfigOpenVmm {

/// Various channels and resources used to interact with the VM while it is running.
struct PetriVmResourcesOpenVmm {
serial_tasks: Vec<Task<anyhow::Result<()>>>,
log_stream_tasks: Vec<Task<anyhow::Result<()>>>,
firmware_event_recv: Receiver<FirmwareEvent>,
shutdown_ic_send: Sender<ShutdownRpc>,
expected_boot_event: Option<FirmwareEvent>,
Expand Down
2 changes: 1 addition & 1 deletion petri/src/vm/openvmm/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ impl PetriVmOpenVmm {
self.inner.mesh.shutdown().await;

tracing::info!("Mesh shutdown, waiting for logging tasks");
for t in self.inner.resources.serial_tasks {
for t in self.inner.resources.log_stream_tasks {
t.await?;
}

Expand Down
37 changes: 15 additions & 22 deletions petri/src/vm/openvmm/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

use super::PetriVmConfigOpenVmm;
use super::PetriVmOpenVmm;
use super::PetriVmResourcesOpenVmm;
use crate::worker::Worker;
use crate::Firmware;
use crate::PetriLogFile;
Expand All @@ -19,6 +20,7 @@ use image::ColorType;
use mesh_process::Mesh;
use mesh_process::ProcessConfig;
use mesh_worker::WorkerHost;
use pal_async::pipe::PolledPipe;
use pal_async::task::Spawn;
use pal_async::task::Task;
use pal_async::timer::PolledTimer;
Expand All @@ -27,9 +29,7 @@ use petri_artifacts_common::tags::MachineArch;
use petri_artifacts_common::tags::OsFlavor;
use pipette_client::PipetteClient;
use scsidisk_resources::SimpleScsiDiskHandle;
use std::io::BufRead;
use std::io::Write;
use std::path::Path;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -45,7 +45,7 @@ impl PetriVmConfigOpenVmm {
arch,
mut config,

resources,
mut resources,

openvmm_log_file,

Expand All @@ -71,7 +71,7 @@ impl PetriVmConfigOpenVmm {

let mesh = Mesh::new("petri_mesh".to_string())?;

let host = Self::openvmm_host(&mesh, openvmm_log_file, resources.openvmm_path.as_ref())
let host = Self::openvmm_host(&mut resources, &mesh, openvmm_log_file)
.await
.context("failed to create host process")?;
let (worker, halt_notif) = Worker::launch(&host, config)
Expand Down Expand Up @@ -362,34 +362,27 @@ impl PetriVmConfigOpenVmm {
}

async fn openvmm_host(
resources: &mut PetriVmResourcesOpenVmm,
mesh: &Mesh,
log_file: PetriLogFile,
path: &Path,
) -> anyhow::Result<WorkerHost> {
// Copy the child's stderr to this process's, since internally this is
// wrapped by the test harness.
let (stderr_read, stderr_write) = pal::pipe_pair()?;
std::thread::spawn(move || {
let read = std::io::BufReader::new(stderr_read);
for line in read.lines() {
match line {
Ok(line) => {
log_file.write_entry(line);
}
Err(err) => {
tracing::warn!(
error = &err as &dyn std::error::Error,
"error reading hvlite stderr"
);
}
}
}
});
let task = resources.driver.spawn(
"serial log",
crate::log_stream(
log_file,
PolledPipe::new(&resources.driver, stderr_read)
.context("failed to create polled pipe")?,
),
);
resources.log_stream_tasks.push(task);

let (host, runner) = mesh_worker::worker_host();
mesh.launch_host(
ProcessConfig::new("vmm")
.process_name(path)
.process_name(&resources.openvmm_path)
.stderr(Some(stderr_write)),
hvlite_defs::entrypoint::MeshHostParams { runner },
)
Expand Down
3 changes: 2 additions & 1 deletion support/pal/pal_async/src/windows/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ pub struct PolledPipe {
impl PolledPipe {
/// Configures a pipe file for polled use.
///
/// Due to platform limitations, this will fail for unidirectional pipes and unbuffered pipes.
/// Due to platform limitations, this will fail for unidirectional pipes on
/// older versions of Windows and on unbuffered pipes.
pub fn new(driver: &(impl ?Sized + Driver), file: File) -> io::Result<Self> {
let message_mode = file.get_pipe_state()? & PIPE_READMODE_MESSAGE != 0;
Self::new_internal(driver, file, message_mode)
Expand Down
43 changes: 30 additions & 13 deletions support/pal/src/windows/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,19 +314,30 @@ impl PipeExt for File {
};
unsafe {
let mut iosb = zeroed();
chk_status(NtFsControlFile(
self.as_raw_handle(),
null_mut(),
None,
null_mut(),
&mut iosb,
FSCTL_PIPE_EVENT_SELECT,
std::ptr::from_mut::<FILE_PIPE_EVENT_SELECT_BUFFER>(&mut input)
.cast::<std::ffi::c_void>(),
size_of_val(&input) as u32,
null_mut(),
0,
))?;
let mut status = !0;
// Newer versions of Windows support FSCTL_PIPE_EVENT_SELECT, which
// works on unidirectional pipes. Older versions require
// FSCTL_PIPE_EVENT_SELECT_OLD, which only works on bidirectional
// pipes.
for fsctl in [FSCTL_PIPE_EVENT_SELECT, FSCTL_PIPE_EVENT_SELECT_OLD] {
status = NtFsControlFile(
self.as_raw_handle(),
null_mut(),
None,
null_mut(),
&mut iosb,
fsctl,
std::ptr::from_mut::<FILE_PIPE_EVENT_SELECT_BUFFER>(&mut input)
.cast::<std::ffi::c_void>(),
size_of_val(&input) as u32,
null_mut(),
0,
);
if status != winapi::shared::ntstatus::STATUS_NOT_SUPPORTED {
break;
}
}
chk_status(status)?;
}
Ok(())
}
Expand Down Expand Up @@ -394,6 +405,12 @@ const fn ctl_code(device_type: u32, function: u32, method: u32, access: u32) ->
}

const FSCTL_PIPE_EVENT_SELECT: u32 = ctl_code(
winioctl::FILE_DEVICE_NAMED_PIPE,
3071,
winioctl::METHOD_BUFFERED,
winioctl::FILE_ANY_ACCESS,
);
const FSCTL_PIPE_EVENT_SELECT_OLD: u32 = ctl_code(
winioctl::FILE_DEVICE_NAMED_PIPE,
3071,
winioctl::METHOD_BUFFERED,
Expand Down
1 change: 1 addition & 0 deletions vmm_tests/vmm_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ vmm_core_defs.workspace = true

guid.workspace = true
mesh.workspace = true
pal.workspace = true
pal_async.workspace = true
unix_socket.workspace = true

Expand Down
50 changes: 22 additions & 28 deletions vmm_tests/vmm_tests/tests/tests/ttrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@
use anyhow::Context;
use guid::Guid;
use hvlite_ttrpc_vmservice as vmservice;
use pal_async::pipe::PolledPipe;
use pal_async::socket::PolledSocket;
use pal_async::task::Spawn;
use pal_async::DefaultPool;
use petri::ResolvedArtifact;
use petri_artifacts_vmm_test::artifacts;
use std::io::BufRead;
use std::io::BufReader;
use std::io::Read;
use std::process::Stdio;
use unix_socket::UnixStream;
Expand All @@ -35,32 +36,31 @@ fn test_ttrpc_interface(

tracing::info!(socket_path = %socket_path.display(), "launching hvlite with ttrpc");

let (stderr_read, stderr_write) = pal::pipe_pair()?;
let mut child = std::process::Command::new(openvmm)
.arg("--ttrpc")
.arg(&socket_path)
.stdin(Stdio::null())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stderr(stderr_write)
.spawn()?;

// Wait for stdout to close.
let mut stdout = child.stdout.take().context("failed to take stdout")?;
let mut b = [0];
assert_eq!(stdout.read(&mut b)?, 0);

// Copy the child's stderr to this process's, since internally this is
// wrapped by the test harness.
let stderr = child.stderr.take().context("failed to take stderr")?;
let stderr_log = params.logger.log_file("stderr").unwrap();
std::thread::spawn(move || {
let stderr = BufReader::new(stderr);
for line in stderr.lines() {
stderr_log.write_entry(line.unwrap());
}
});
DefaultPool::run_with(|driver| async {
let driver = driver;
let _stderr_task = driver.spawn(
"stderr",
petri::log_stream(
params.logger.log_file("stderr").unwrap(),
PolledPipe::new(&driver, stderr_read).unwrap(),
),
);

let ttrpc_path = socket_path.clone();
DefaultPool::run_with(|driver| async move {
let ttrpc_path = socket_path.clone();
let client = mesh_rpc::Client::new(
&driver,
mesh_rpc::client::UnixDialier::new(driver.clone(), ttrpc_path),
Expand Down Expand Up @@ -108,19 +108,13 @@ fn test_ttrpc_interface(

let com1 = UnixStream::connect(&com1_path).unwrap();

let com1_log = params.logger.log_file("linux").unwrap();
std::thread::spawn(move || {
let read = BufReader::new(com1);
for line in read.lines() {
match line {
Ok(line) => com1_log.write_entry(line),
Err(e) => tracing::error!(
error = &e as &dyn std::error::Error,
"failed to read from com1"
),
}
}
});
let _com1_task = driver.spawn(
"com1",
petri::log_stream(
params.logger.log_file("linux").unwrap(),
PolledSocket::new(&driver, com1).unwrap(),
),
);

assert_eq!(
client
Expand Down