Skip to content

Commit

Permalink
Merge pull request #100 from NobodyXu/fix
Browse files Browse the repository at this point in the history
Fix `Client::configure*` on unix
  • Loading branch information
weihanglo authored Jul 16, 2024
2 parents c0c2898 + 0119e16 commit a9900f3
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,7 @@ mod test {
client.try_acquire().unwrap().unwrap();
}

#[cfg(not(unix))]
#[cfg(windows)]
#[test]
fn test_try_acquire() {
let client = Client::new(0).unwrap();
Expand Down
224 changes: 131 additions & 93 deletions src/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::io::{self, Read, Write};
use std::mem;
use std::mem::MaybeUninit;
use std::os::unix::prelude::*;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::process::Command;
use std::ptr;
use std::sync::{
Expand All @@ -17,17 +17,27 @@ use std::thread::{self, Builder, JoinHandle};
use std::time::Duration;

#[derive(Debug)]
pub enum Client {
/// `--jobserver-auth=R,W`
Pipe { read: File, write: File },
/// `--jobserver-auth=fifo:PATH`
Fifo {
file: File,
path: PathBuf,
/// it can only go from false -> true but not the other way around, since that
/// could cause a race condition.
is_non_blocking: AtomicBool,
},
/// This preserves the `--jobserver-auth` type at creation time,
/// so auth type will be passed down to and inherit from sub-Make processes correctly.
///
/// See <https://github.com/rust-lang/jobserver-rs/issues/99> for details.
enum ClientCreationArg {
Fds { read: c_int, write: c_int },
Fifo(Box<Path>),
}

#[derive(Debug)]
pub struct Client {
read: File,
write: File,
creation_arg: ClientCreationArg,
/// It is set to `None` if the pipe is shared with other processes, so it
/// cannot support non-blocking mode.
///
/// If it is set to `Some`, then it can only go from
/// `Some(false)` -> `Some(true)` but not the other way around,
/// since that could cause a race condition.
is_non_blocking: Option<AtomicBool>,
}

#[derive(Debug)]
Expand All @@ -43,7 +53,7 @@ impl Client {
// wrong!
const BUFFER: [u8; 128] = [b'|'; 128];

let mut write = client.write();
let mut write = &client.write;

set_nonblocking(write.as_raw_fd(), true)?;

Expand Down Expand Up @@ -111,16 +121,24 @@ impl Client {
FromEnvErrorInner::CannotParse("expected a path after `fifo:`".to_string())
})?;
let path = Path::new(path_str);
let file = OpenOptions::new()
.read(true)
.write(true)
.open(path)
.map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))?;

Ok(Some(Client::Fifo {
file,
path: path.into(),
is_non_blocking: AtomicBool::new(false),

let open_file = || {
// Opening with read write is necessary, since opening with
// read-only or write-only could block the thread until another
// thread opens it with write-only or read-only (or RDWR)
// correspondingly.
OpenOptions::new()
.read(true)
.write(true)
.open(path)
.map_err(|err| FromEnvErrorInner::CannotOpenPath(path_str.to_string(), err))
};

Ok(Some(Client {
read: open_file()?,
write: open_file()?,
creation_arg: ClientCreationArg::Fifo(path.into()),
is_non_blocking: Some(AtomicBool::new(false)),
}))
}

Expand Down Expand Up @@ -148,6 +166,8 @@ impl Client {
return Err(FromEnvErrorInner::NegativeFd(write));
}

let creation_arg = ClientCreationArg::Fds { read, write };

// Ok so we've got two integers that look like file descriptors, but
// for extra sanity checking let's see if they actually look like
// valid files and instances of a pipe if feature enabled before we
Expand All @@ -174,40 +194,36 @@ impl Client {
//
// I tested this on macOS 14 and Linux 6.5.13
#[cfg(target_os = "linux")]
if let Ok(Some(jobserver)) =
Self::from_fifo(&format!("fifo:/dev/fd/{}", read.as_raw_fd()))
{
return Ok(Some(jobserver));
if let (Ok(read), Ok(write)) = (
File::open(format!("/dev/fd/{}", read)),
OpenOptions::new()
.write(true)
.open(format!("/dev/fd/{}", write)),
) {
return Ok(Some(Client {
read,
write,
creation_arg,
is_non_blocking: Some(AtomicBool::new(false)),
}));
}
}
}

Ok(Some(Client::Pipe {
Ok(Some(Client {
read: clone_fd_and_set_cloexec(read)?,
write: clone_fd_and_set_cloexec(write)?,
creation_arg,
is_non_blocking: None,
}))
}

unsafe fn from_fds(read: c_int, write: c_int) -> Client {
Client::Pipe {
Client {
read: File::from_raw_fd(read),
write: File::from_raw_fd(write),
}
}

/// Gets the read end of our jobserver client.
fn read(&self) -> &File {
match self {
Client::Pipe { read, .. } => read,
Client::Fifo { file, .. } => file,
}
}

/// Gets the write end of our jobserver client.
fn write(&self) -> &File {
match self {
Client::Pipe { write, .. } => write,
Client::Fifo { file, .. } => file,
creation_arg: ClientCreationArg::Fds { read, write },
is_non_blocking: None,
}
}

Expand Down Expand Up @@ -245,7 +261,7 @@ impl Client {
// to shut us down, so we otherwise punt all errors upwards.
unsafe {
let mut fd: libc::pollfd = mem::zeroed();
let mut read = self.read();
let mut read = &self.read;
fd.fd = read.as_raw_fd();
fd.events = libc::POLLIN;
loop {
Expand Down Expand Up @@ -284,19 +300,15 @@ impl Client {

pub fn try_acquire(&self) -> io::Result<Option<Acquired>> {
let mut buf = [0];
let mut fifo = &self.read;

let (mut fifo, is_non_blocking) = match self {
Self::Fifo {
file,
is_non_blocking,
..
} => (file, is_non_blocking),
_ => return Err(io::ErrorKind::Unsupported.into()),
};

if !is_non_blocking.load(Ordering::Relaxed) {
set_nonblocking(fifo.as_raw_fd(), true)?;
is_non_blocking.store(true, Ordering::Relaxed);
if let Some(is_non_blocking) = self.is_non_blocking.as_ref() {
if !is_non_blocking.load(Ordering::Relaxed) {
set_nonblocking(fifo.as_raw_fd(), true)?;
is_non_blocking.store(true, Ordering::Relaxed);
}
} else {
return Err(io::ErrorKind::Unsupported.into());
}

loop {
Expand All @@ -323,7 +335,7 @@ impl Client {
// always quickly release a token). If that turns out to not be the
// case we'll get an error anyway!
let byte = data.map(|d| d.byte).unwrap_or(b'+');
match self.write().write(&[byte])? {
match (&self.write).write(&[byte])? {
1 => Ok(()),
_ => Err(io::Error::new(
io::ErrorKind::Other,
Expand All @@ -333,31 +345,30 @@ impl Client {
}

pub fn string_arg(&self) -> String {
match self {
Client::Pipe { read, write } => format!("{},{}", read.as_raw_fd(), write.as_raw_fd()),
Client::Fifo { path, .. } => format!("fifo:{}", path.to_str().unwrap()),
match &self.creation_arg {
ClientCreationArg::Fifo(path) => format!("fifo:{}", path.display()),
ClientCreationArg::Fds { read, write } => format!("{},{}", read, write),
}
}

pub fn available(&self) -> io::Result<usize> {
let mut len = MaybeUninit::<c_int>::uninit();
cvt(unsafe { libc::ioctl(self.read().as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
cvt(unsafe { libc::ioctl(self.read.as_raw_fd(), libc::FIONREAD, len.as_mut_ptr()) })?;
Ok(unsafe { len.assume_init() } as usize)
}

pub fn configure(&self, cmd: &mut Command) {
match self {
if matches!(self.creation_arg, ClientCreationArg::Fifo { .. }) {
// We `File::open`ed it when inheriting from environment,
// so no need to set cloexec for fifo.
Client::Fifo { .. } => return,
Client::Pipe { .. } => {}
};
return;
}
// Here we basically just want to say that in the child process
// we'll configure the read/write file descriptors to *not* be
// cloexec, so they're inherited across the exec and specified as
// integers through `string_arg` above.
let read = self.read().as_raw_fd();
let write = self.write().as_raw_fd();
let read = self.read.as_raw_fd();
let write = self.write.as_raw_fd();
unsafe {
cmd.pre_exec(move || {
set_cloexec(read, false)?;
Expand Down Expand Up @@ -559,55 +570,82 @@ mod test {

use crate::{test::run_named_fifo_try_acquire_tests, Client};

use std::sync::Arc;
use std::{
fs::File,
io::{self, Write},
os::unix::io::AsRawFd,
sync::Arc,
};

fn from_imp_client(imp: ClientImp) -> Client {
Client {
inner: Arc::new(imp),
}
}

#[test]
fn test_try_acquire_named_fifo() {
fn new_client_from_fifo() -> (Client, String) {
let file = tempfile::NamedTempFile::new().unwrap();
let fifo_path = file.path().to_owned();
file.close().unwrap(); // Remove the NamedTempFile to create fifo

nix::unistd::mkfifo(&fifo_path, nix::sys::stat::Mode::S_IRWXU).unwrap();

let client = ClientImp::from_fifo(&format!("fifo:{}", fifo_path.to_str().unwrap()))
.unwrap()
.map(from_imp_client)
.unwrap();
let arg = format!("fifo:{}", fifo_path.to_str().unwrap());

run_named_fifo_try_acquire_tests(&client);
(
ClientImp::from_fifo(&arg)
.unwrap()
.map(from_imp_client)
.unwrap(),
arg,
)
}

#[cfg(not(target_os = "linux"))]
#[test]
fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
use std::{
fs::File,
io::{self, Write},
os::unix::io::AsRawFd,
};

fn new_client_from_pipe() -> (Client, String) {
let (read, write) = nix::unistd::pipe().unwrap();
let read = File::from(read);
let mut write = File::from(write);

write.write_all(b"1").unwrap();

let client = unsafe {
ClientImp::from_pipe(&format!("{},{}", read.as_raw_fd(), write.as_raw_fd()), true)
}
.unwrap()
.map(from_imp_client)
.unwrap();
let arg = format!("{},{}", read.as_raw_fd(), write.as_raw_fd());

(
unsafe { ClientImp::from_pipe(&arg, true) }
.unwrap()
.map(from_imp_client)
.unwrap(),
arg,
)
}

#[test]
fn test_try_acquire_named_fifo() {
run_named_fifo_try_acquire_tests(&new_client_from_fifo().0);
}

#[test]
fn test_try_acquire_annoymous_pipe_linux_specific_optimization() {
#[cfg(not(target_os = "linux"))]
assert_eq!(
client.try_acquire().unwrap_err().kind(),
new_client_from_pipe().0.try_acquire().unwrap_err().kind(),
io::ErrorKind::Unsupported
);

#[cfg(target_os = "linux")]
{
let client = new_client_from_pipe().0;
client.acquire().unwrap().drop_without_releasing();
run_named_fifo_try_acquire_tests(&client);
}
}

#[test]
fn test_string_arg() {
let (client, arg) = new_client_from_fifo();
assert_eq!(client.inner.string_arg(), arg);

let (client, arg) = new_client_from_pipe();
assert_eq!(client.inner.string_arg(), arg);
}
}

0 comments on commit a9900f3

Please sign in to comment.