Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Windows: Make stdin pipes synchronous #96441

Merged
merged 3 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions library/std/src/os/windows/io/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ impl OwnedHandle {
})?;
unsafe { Ok(Self::from_raw_handle(ret)) }
}

/// Allow child processes to inherit the handle.
pub(crate) fn set_inheritable(&self) -> io::Result<()> {
cvt(unsafe {
c::SetHandleInformation(
self.as_raw_handle(),
c::HANDLE_FLAG_INHERIT,
c::HANDLE_FLAG_INHERIT,
)
})?;
Ok(())
}
}

impl TryFrom<HandleOrInvalid> for OwnedHandle {
Expand Down
6 changes: 6 additions & 0 deletions library/std/src/sys/windows/c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1022,6 +1022,12 @@ extern "system" {
bWaitAll: BOOL,
dwMilliseconds: DWORD,
) -> DWORD;
pub fn CreatePipe(
hReadPipe: *mut HANDLE,
hWritePipe: *mut HANDLE,
lpPipeAttributes: *const SECURITY_ATTRIBUTES,
nSize: DWORD,
) -> BOOL;
pub fn CreateNamedPipeW(
lpName: LPCWSTR,
dwOpenMode: DWORD,
Expand Down
4 changes: 4 additions & 0 deletions library/std/src/sys/windows/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,10 @@ impl Handle {
Ok(Self(self.0.duplicate(access, inherit, options)?))
}

pub(crate) fn set_inheritable(&self) -> io::Result<()> {
self.0.set_inheritable()
}

/// Performs a synchronous read.
///
/// If the handle is opened for asynchronous I/O then this abort the process.
Expand Down
90 changes: 65 additions & 25 deletions library/std/src/sys/windows/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,56 @@ use crate::sys_common::IntoInner;
// Anonymous pipes
////////////////////////////////////////////////////////////////////////////////

pub struct AnonPipe {
inner: Handle,
// A 64kb pipe capacity is the same as a typical Linux default.
const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;

pub enum AnonPipe {
Sync(Handle),
Async(Handle),
}

impl IntoInner<Handle> for AnonPipe {
fn into_inner(self) -> Handle {
self.inner
match self {
Self::Sync(handle) => handle,
Self::Async(handle) => handle,
}
}
}

pub struct Pipes {
pub ours: AnonPipe,
pub theirs: AnonPipe,
}
impl Pipes {
/// Create a new pair of pipes where both pipes are synchronous.
///
/// These must not be used asynchronously.
pub fn new_synchronous(
ours_readable: bool,
their_handle_inheritable: bool,
) -> io::Result<Self> {
unsafe {
// If `CreatePipe` succeeds, these will be our pipes.
let mut read = ptr::null_mut();
let mut write = ptr::null_mut();

if c::CreatePipe(&mut read, &mut write, ptr::null(), PIPE_BUFFER_CAPACITY) == 0 {
Err(io::Error::last_os_error())
} else {
let (ours, theirs) = if ours_readable { (read, write) } else { (write, read) };
let ours = Handle::from_raw_handle(ours);
let theirs = Handle::from_raw_handle(theirs);

if their_handle_inheritable {
theirs.set_inheritable()?;
}

Ok(Pipes { ours: AnonPipe::Sync(ours), theirs: AnonPipe::Sync(theirs) })
}
}
}
}

/// Although this looks similar to `anon_pipe` in the Unix module it's actually
/// subtly different. Here we'll return two pipes in the `Pipes` return value,
Expand All @@ -53,9 +89,6 @@ pub struct Pipes {
/// with `OVERLAPPED` instances, but also works out ok if it's only ever used
/// once at a time (which we do indeed guarantee).
pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Result<Pipes> {
// A 64kb pipe capacity is the same as a typical Linux default.
const PIPE_BUFFER_CAPACITY: u32 = 64 * 1024;

// Note that we specifically do *not* use `CreatePipe` here because
// unfortunately the anonymous pipes returned do not support overlapped
// operations. Instead, we create a "hopefully unique" name and create a
Expand Down Expand Up @@ -156,12 +189,9 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
};
opts.security_attributes(&mut sa);
let theirs = File::open(Path::new(&name), &opts)?;
let theirs = AnonPipe { inner: theirs.into_inner() };
let theirs = AnonPipe::Sync(theirs.into_inner());

Ok(Pipes {
ours: AnonPipe { inner: ours },
theirs: AnonPipe { inner: theirs.into_inner() },
})
Ok(Pipes { ours: AnonPipe::Async(ours), theirs })
}
}

Expand All @@ -171,12 +201,12 @@ pub fn anon_pipe(ours_readable: bool, their_handle_inheritable: bool) -> io::Res
/// This is achieved by creating a new set of pipes and spawning a thread that
/// relays messages between the source and the synchronous pipe.
pub fn spawn_pipe_relay(
source: &AnonPipe,
source: &Handle,
ours_readable: bool,
their_handle_inheritable: bool,
) -> io::Result<AnonPipe> {
// We need this handle to live for the lifetime of the thread spawned below.
let source = source.duplicate()?;
let source = AnonPipe::Async(source.duplicate(0, true, c::DUPLICATE_SAME_ACCESS)?);

// create a new pair of anon pipes.
let Pipes { theirs, ours } = anon_pipe(ours_readable, their_handle_inheritable)?;
Expand Down Expand Up @@ -227,19 +257,24 @@ type AlertableIoFn = unsafe extern "system" fn(

impl AnonPipe {
pub fn handle(&self) -> &Handle {
&self.inner
match self {
Self::Async(ref handle) => handle,
Self::Sync(ref handle) => handle,
}
}
pub fn into_handle(self) -> Handle {
self.inner
}
fn duplicate(&self) -> io::Result<Self> {
self.inner.duplicate(0, false, c::DUPLICATE_SAME_ACCESS).map(|inner| AnonPipe { inner })
self.into_inner()
}

pub fn read(&self, buf: &mut [u8]) -> io::Result<usize> {
let result = unsafe {
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
match self {
Self::Sync(ref handle) => handle.read(buf),
Self::Async(_) => {
self.alertable_io_internal(c::ReadFileEx, buf.as_mut_ptr() as _, len)
}
}
};

match result {
Expand All @@ -253,28 +288,33 @@ impl AnonPipe {
}

pub fn read_vectored(&self, bufs: &mut [IoSliceMut<'_>]) -> io::Result<usize> {
self.inner.read_vectored(bufs)
io::default_read_vectored(|buf| self.read(buf), bufs)
}

#[inline]
pub fn is_read_vectored(&self) -> bool {
self.inner.is_read_vectored()
false
}

pub fn write(&self, buf: &[u8]) -> io::Result<usize> {
unsafe {
let len = crate::cmp::min(buf.len(), c::DWORD::MAX as usize) as c::DWORD;
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
match self {
Self::Sync(ref handle) => handle.write(buf),
Self::Async(_) => {
self.alertable_io_internal(c::WriteFileEx, buf.as_ptr() as _, len)
}
}
}
}

pub fn write_vectored(&self, bufs: &[IoSlice<'_>]) -> io::Result<usize> {
self.inner.write_vectored(bufs)
io::default_write_vectored(|buf| self.write(buf), bufs)
}

#[inline]
pub fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored()
false
}

/// Synchronizes asynchronous reads or writes using our anonymous pipe.
Expand Down Expand Up @@ -346,7 +386,7 @@ impl AnonPipe {

// Asynchronous read of the pipe.
// If successful, `callback` will be called once it completes.
let result = io(self.inner.as_handle(), buf, len, &mut overlapped, callback);
let result = io(self.handle().as_handle(), buf, len, &mut overlapped, callback);
if result == c::FALSE {
// We can return here because the call failed.
// After this we must not return until the I/O completes.
Expand Down
38 changes: 32 additions & 6 deletions library/std/src/sys/windows/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::sys::cvt;
use crate::sys::fs::{File, OpenOptions};
use crate::sys::handle::Handle;
use crate::sys::path;
use crate::sys::pipe::{self, AnonPipe};
use crate::sys::pipe::{self, AnonPipe, Pipes};
use crate::sys::stdio;
use crate::sys_common::mutex::StaticMutex;
use crate::sys_common::process::{CommandEnv, CommandEnvs};
Expand Down Expand Up @@ -173,7 +173,7 @@ pub enum Stdio {
Inherit,
Null,
MakePipe,
Pipe(AnonPipe),
AsyncPipe(Handle),
Handle(Handle),
}

Expand Down Expand Up @@ -527,13 +527,33 @@ impl Stdio {
},

Stdio::MakePipe => {
let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
let pipes = pipe::anon_pipe(ours_readable, true)?;
// Handles that are passed to a child process must be synchronous
// because they will be read synchronously (see #95759).
// Therefore we prefer to make both ends of a pipe synchronous
// just in case our end of the pipe is passed to another process.
//
// However, we may need to read from both the child's stdout and
// stderr simultaneously when waiting for output. This requires
// async reads so as to avoid blocking either pipe.
//
// The solution used here is to make handles synchronous
// except for our side of the stdout and sterr pipes.
// If our side of those pipes do end up being given to another
// process then we use a "pipe relay" to synchronize access
// (see `Stdio::AsyncPipe` below).
let pipes = if stdio_id == c::STD_INPUT_HANDLE {
// For stdin both sides of the pipe are synchronous.
Pipes::new_synchronous(false, true)?
} else {
// For stdout/stderr our side of the pipe is async and their side is synchronous.
pipe::anon_pipe(true, true)?
};
*pipe = Some(pipes.ours);
Ok(pipes.theirs.into_handle())
}

Stdio::Pipe(ref source) => {
Stdio::AsyncPipe(ref source) => {
// We need to synchronize asynchronous pipes by using a pipe relay.
let ours_readable = stdio_id != c::STD_INPUT_HANDLE;
pipe::spawn_pipe_relay(source, ours_readable, true).map(AnonPipe::into_handle)
}
Expand Down Expand Up @@ -562,7 +582,13 @@ impl Stdio {

impl From<AnonPipe> for Stdio {
fn from(pipe: AnonPipe) -> Stdio {
Stdio::Pipe(pipe)
// Note that it's very important we don't give async handles to child processes.
ChrisDenton marked this conversation as resolved.
Show resolved Hide resolved
// Therefore if the pipe is asynchronous we must have a way to turn it synchronous.
// See #95759.
match pipe {
AnonPipe::Sync(handle) => Stdio::Handle(handle),
AnonPipe::Async(handle) => Stdio::AsyncPipe(handle),
}
}
}

Expand Down