Fix IO safety in unix.rs #101

Open
wants to merge 1 commit into
base: main
5 changes: 5 additions & 0 deletions Cargo.toml
@@ -43,3 +43,8 @@ harness = false
[[test]]
name = "helper"
path = "tests/helper.rs"

[[test]]
name = "spawn-after-drop"
path = "tests/spawn-after-drop.rs"
harness = false
126 changes: 70 additions & 56 deletions src/unix.rs
@@ -47,15 +47,15 @@ pub struct Acquired {

impl Client {
pub fn new(mut limit: usize) -> io::Result<Client> {
let client = unsafe { Client::mk()? };
let client = Client::mk()?;

// I don't think the character written here matters, but I could be
// wrong!
const BUFFER: [u8; 128] = [b'|'; 128];

let mut write = &client.write;

set_nonblocking(write.as_raw_fd(), true)?;
set_nonblocking(write.as_fd(), true)?;

while limit > 0 {
let n = limit.min(BUFFER.len());
@@ -64,39 +64,51 @@ impl Client {
limit -= n;
}

set_nonblocking(write.as_raw_fd(), false)?;
set_nonblocking(write.as_fd(), false)?;

Ok(client)
}

unsafe fn mk() -> io::Result<Client> {
let mut pipes = [0; 2];

fn mk() -> io::Result<Client> {
// Attempt atomically-create-with-cloexec if we can on Linux,
// detected by using the `syscall` function in `libc` to try to work
// with as many kernels/glibc implementations as possible.
#[cfg(target_os = "linux")]
{
static PIPE2_AVAILABLE: AtomicBool = AtomicBool::new(true);
if PIPE2_AVAILABLE.load(Ordering::SeqCst) {
match libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) {
if PIPE2_AVAILABLE.load(Ordering::Relaxed) {
let mut pipes = [0; 2];
match unsafe { libc::syscall(libc::SYS_pipe2, pipes.as_mut_ptr(), libc::O_CLOEXEC) }
{
-1 => {
let err = io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ENOSYS) {
PIPE2_AVAILABLE.store(false, Ordering::SeqCst);
PIPE2_AVAILABLE.store(false, Ordering::Relaxed);
} else {
return Err(err);
}
}
_ => return Ok(Client::from_fds(pipes[0], pipes[1])),
_ => unsafe {
return Ok(Client::from_fds(
OwnedFd::from_raw_fd(pipes[0]),
OwnedFd::from_raw_fd(pipes[1]),
));
},
}
}
}

cvt(libc::pipe(pipes.as_mut_ptr()))?;
drop(set_cloexec(pipes[0], true));
drop(set_cloexec(pipes[1], true));
Ok(Client::from_fds(pipes[0], pipes[1]))
let (read, write) = unsafe {
let mut pipes = [0; 2];
cvt(libc::pipe(pipes.as_mut_ptr()))?;
(
OwnedFd::from_raw_fd(pipes[0]),
OwnedFd::from_raw_fd(pipes[1]),
)
};
set_cloexec(read.as_fd(), true)?;
set_cloexec(write.as_fd(), true)?;
Ok(Client::from_fds(read, write))
}

pub(crate) unsafe fn open(s: &str, check_pipe: bool) -> Result<Client, FromEnvErrorInner> {
@@ -211,18 +223,21 @@ impl Client {
}

Ok(Some(Client {
read: clone_fd_and_set_cloexec(read)?,
write: clone_fd_and_set_cloexec(write)?,
read: clone_fd_and_set_cloexec(BorrowedFd::borrow_raw(read))?,
write: clone_fd_and_set_cloexec(BorrowedFd::borrow_raw(write))?,
creation_arg,
is_non_blocking: None,
}))
}

unsafe fn from_fds(read: c_int, write: c_int) -> Client {
fn from_fds(read: OwnedFd, write: OwnedFd) -> Client {
Client {
read: File::from_raw_fd(read),
write: File::from_raw_fd(write),
creation_arg: ClientCreationArg::Fds { read, write },
creation_arg: ClientCreationArg::Fds {
read: read.as_raw_fd(),
write: write.as_raw_fd(),
},
read: read.into(),
write: write.into(),
is_non_blocking: None,
}
}
@@ -304,7 +319,7 @@ impl Client {

if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
if !is_non_blocking.load(Ordering::Relaxed) {
set_nonblocking(fifo.as_raw_fd(), true)?;
set_nonblocking(fifo.as_fd(), true)?;
is_non_blocking.store(true, Ordering::Relaxed);
}
} else {
@@ -357,24 +372,30 @@ impl Client {
Ok(unsafe { len.assume_init() } as usize)
}

pub fn configure(&self, cmd: &mut Command) {
if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
// We `File::open`ed it when inheriting from environment,
// so no need to set cloexec for fifo.
return;
}
// Here we basically just want to say that in the child process
// we'll configure the read/write file descriptors to *not* be
// cloexec, so they're inherited across the exec and specified as
// integers through `string_arg` above.
let read = self.read.as_raw_fd();
let write = self.write.as_raw_fd();
unsafe {
cmd.pre_exec(move || {
set_cloexec(read, false)?;
set_cloexec(write, false)?;
Ok(())
});
pub fn configure(self: &Arc<Self>, cmd: &mut Command) {
match self.creation_arg {
ClientCreationArg::Fifo { .. } => {
// We `File::open`ed it when inheriting from environment,
// so no need to set cloexec for fifo.
}
ClientCreationArg::Fds { read, write } => {
// Here we basically just want to say that in the child process
// we'll configure the read/write file descriptors to *not* be
// cloexec, so they're inherited across the exec and specified as
// integers through `string_arg` above.
unsafe {
// Keep a reference to the jobserver alive in the closure so that
// the pipe FDs aren't closed, otherwise `set_cloexec` might end up
// targetting a completely unrelated file descriptor.
let arc = self.clone();
Comment on lines +387 to +390
Contributor:

This is awesome, now it will always work once Client::configure is called.

cmd.pre_exec(move || {
let _ = &arc;
set_cloexec(BorrowedFd::borrow_raw(read), false)?;
set_cloexec(BorrowedFd::borrow_raw(write), false)?;
Ok(())
});
}
}
}
}
}
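
The keep-alive pattern used in the new `configure` can be sketched in isolation. This is a minimal illustration using only the standard library, not the crate's code: `Resource`, `open_resource`, and `make_hook` are hypothetical names, and a plain `File` stands in for the pipe descriptors. The point it shows is that a `move` closure holding a clone of the `Arc` keeps the owned file open after the caller drops its own handle.

```rust
use std::fs::File;
use std::io;
use std::sync::Arc;

/// Hypothetical stand-in for the jobserver client: it only owns a file.
struct Resource {
    file: File,
}

/// Opens a file that is known to exist (the running executable).
fn open_resource() -> io::Result<Arc<Resource>> {
    let file = File::open(std::env::current_exe()?)?;
    Ok(Arc::new(Resource { file }))
}

/// Returns a hook that owns a strong reference to the resource, so the
/// underlying file stays open until the hook itself is dropped.
fn make_hook(keep_alive: Arc<Resource>) -> impl Fn() -> bool {
    // The closure uses `keep_alive`, so `move` transfers the Arc into it.
    move || keep_alive.file.metadata().is_ok()
}
```

A caller would pass `Arc::clone(&res)` to `make_hook` and could then drop `res`; the file is closed only once the returned hook is dropped as well. In the diff the closure does not otherwise touch the `Arc`, which is presumably why it contains `let _ = &arc;` to force the capture.
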
@@ -515,34 +536,32 @@ unsafe fn fd_check(fd: c_int, check_pipe: bool) -> Result<(), FromEnvErrorInner>
}
}

fn clone_fd_and_set_cloexec(fd: c_int) -> Result<File, FromEnvErrorInner> {
// Safety: fd is a valid fd dand it remains open until returns
unsafe { BorrowedFd::borrow_raw(fd) }
.try_clone_to_owned()
fn clone_fd_and_set_cloexec(fd: BorrowedFd<'_>) -> Result<File, FromEnvErrorInner> {
fd.try_clone_to_owned()
.map(File::from)
.map_err(|err| FromEnvErrorInner::CannotOpenFd(fd, err))
.map_err(|err| FromEnvErrorInner::CannotOpenFd(fd.as_raw_fd(), err))
}

fn set_cloexec(fd: c_int, set: bool) -> io::Result<()> {
fn set_cloexec(fd: BorrowedFd<'_>, set: bool) -> io::Result<()> {
unsafe {
let previous = cvt(libc::fcntl(fd, libc::F_GETFD))?;
let previous = cvt(libc::fcntl(fd.as_raw_fd(), libc::F_GETFD))?;
let new = if set {
previous | libc::FD_CLOEXEC
} else {
previous & !libc::FD_CLOEXEC
};
if new != previous {
cvt(libc::fcntl(fd, libc::F_SETFD, new))?;
cvt(libc::fcntl(fd.as_raw_fd(), libc::F_SETFD, new))?;
}
Ok(())
}
}

fn set_nonblocking(fd: c_int, set: bool) -> io::Result<()> {
fn set_nonblocking(fd: BorrowedFd<'_>, set: bool) -> io::Result<()> {
let status_flag = if set { libc::O_NONBLOCK } else { 0 };

unsafe {
cvt(libc::fcntl(fd, libc::F_SETFL, status_flag))?;
cvt(libc::fcntl(fd.as_raw_fd(), libc::F_SETFL, status_flag))?;
}

Ok(())
@@ -570,12 +589,7 @@ mod test {

use crate::{test::run_named_fifo_try_acquire_tests, Client};

use std::{
fs::File,
io::{self, Write},
os::unix::io::AsRawFd,
sync::Arc,
};
use std::{fs::File, io::Write, os::unix::io::AsRawFd, sync::Arc};

fn from_imp_client(imp: ClientImp) -> Client {
Client {
@@ -629,7 +643,7 @@ mod test {
#[cfg(not(target_os = "linux"))]
assert_eq!(
new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
io::ErrorKind::Unsupported
std::io::ErrorKind::Unsupported
);

#[cfg(target_os = "linux")]
2 changes: 1 addition & 1 deletion src/wasm.rs
@@ -75,7 +75,7 @@ impl Client {
Ok(*lock)
}

pub fn configure(&self, _cmd: &mut Command) {
pub fn configure(self: &Arc<Self>, _cmd: &mut Command) {
unreachable!();
}
}
2 changes: 1 addition & 1 deletion src/windows.rs
@@ -214,7 +214,7 @@ impl Client {
}
}

pub fn configure(&self, _cmd: &mut Command) {
pub fn configure(self: &Arc<Self>, _cmd: &mut Command) {
// nothing to do here, we gave the name of our semaphore to the
// child above
}
41 changes: 41 additions & 0 deletions tests/spawn-after-drop.rs
@@ -0,0 +1,41 @@
use std::{env, process::Command};

use jobserver::{Client, FromEnvErrorKind};

fn main() {
match env::args().skip(1).next().unwrap_or_default().as_str() {
"" => {
let me = env::current_exe().unwrap();
let mut cmd = Command::new(me);
let client = Client::new(1).unwrap();
client.configure(&mut cmd);
drop(client);
assert!(cmd.arg("from_env").status().unwrap().success());
}
"from_env" => {
let me = env::current_exe().unwrap();
let mut cmd = Command::new(me);
let client = unsafe {
match Client::from_env_ext(true).client {
// Its ok for a dropped jobservers path to no longer exist (e.g. on Windows).
Err(e) if matches!(e.kind(), FromEnvErrorKind::CannotOpenPath) => return,
Contributor:

IMHO it's kind of bad that this behaves differently on different platforms.

Author:

Unfortunately having the same behaviour would be difficult:

  • On Windows, the child process doesn't have any OS-level reference to the jobserver until it opens the semaphore. If the parent process's Client is dropped before Client::from_env_ext is called, Client::from_env_ext will fail, because semaphores are automatically deleted when no handles reference them.
  • On Linux (when using pipes, which is what jobserver-rs currently does when creating its own jobserver) the opposite is true: as long as the FDs are alive when the process is spawned (which this PR ensures), Client::from_env_ext will succeed in the child process even if the parent process's Client is dropped before the child process calls it.

The only possibility that comes to mind would be if on Windows the parent process lets the child process inherit a copy of the semaphore handle; this would mean that the semaphore would stick around for as long as that process was running. The difficult bit is ensuring only the desired processes inherit the handle and making sure the handle isn't closed before the command is spawned. This would require stdlib support, possibly involving PROC_THREAD_ATTRIBUTE_HANDLE_LIST and rust-lang/rust#114854.
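
The Linux case rests on each descriptor being an independent reference to the same open file. A rough single-process sketch of that property follows; it uses `BorrowedFd::try_clone_to_owned` (the same call the PR's `clone_fd_and_set_cloexec` uses) as a stand-in for inheritance across `exec`, which is an assumption made for illustration. The helper names and the temporary file name are hypothetical, and the code is Unix-only.

```rust
use std::fs::{self, File, OpenOptions};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::os::fd::{AsFd, BorrowedFd, OwnedFd};

/// Duplicates a borrowed descriptor into an independently owned one.
fn duplicate(fd: BorrowedFd<'_>) -> io::Result<OwnedFd> {
    fd.try_clone_to_owned()
}

/// Writes through one descriptor, closes it, then reads through a duplicate.
fn survives_original_close() -> io::Result<String> {
    // Hypothetical scratch file; any writable path would do.
    let path = std::env::temp_dir().join(format!("fd-dup-sketch-{}", std::process::id()));
    let mut original = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&path)?;
    original.write_all(b"token")?;

    let dup = duplicate(original.as_fd())?;
    drop(original); // closes only the original descriptor
    let _ = fs::remove_file(&path); // the duplicate still references the open file

    let mut file = File::from(dup);
    file.seek(SeekFrom::Start(0))?; // duplicates share one file offset
    let mut out = String::new();
    file.read_to_string(&mut out)?;
    Ok(out)
}
```

Calling `survives_original_close()` returns the bytes written before the original descriptor was closed, because the duplicate keeps the open file alive on its own.
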

@ChrisDenton, one more case to start supporting this.

I am happy to collaborate with @beetrees on this to stop processes from inheriting handles.

res => res.unwrap(),
}
};
client.configure(&mut cmd);
drop(client);
assert!(cmd.arg("use_it").status().unwrap().success());
}
"use_it" => {
let client = unsafe {
match Client::from_env_ext(true).client {
// See above.
Err(e) if matches!(e.kind(), FromEnvErrorKind::CannotOpenPath) => return,
res => res.unwrap(),
}
};
client.acquire().unwrap();
}
_ => unreachable!(),
}
}