Skip to content

Commit

Permalink
Enable sys::unix::pipe when using pipe based Waker
Browse files Browse the repository at this point in the history
This also adds a new function sys::unix::pipe::new_raw that creates a
pipe, returning the raw fds. This is used by the Waker implementation
and of course sys::unix::pipe::new (which is exposed as
mio::unix::pipe::new).
  • Loading branch information
Thomasdezeeuw committed Aug 21, 2023
1 parent e7a0685 commit 9f21ce1
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 75 deletions.
16 changes: 13 additions & 3 deletions src/sys/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,19 @@ cfg_os_poll! {
pub(crate) use self::selector::IoSourceState;
}

cfg_os_ext! {
pub(crate) mod pipe;
}
#[cfg(any(
// For the public `pipe` module, must match `cfg_os_ext` macro.
feature = "os-ext",
// For the `Waker` type based on a pipe.
mio_unsupported_force_waker_pipe,
target_os = "aix",
target_os = "dragonfly",
target_os = "illumos",
target_os = "netbsd",
target_os = "openbsd",
target_os = "redox",
))]
pub(crate) mod pipe;
}

cfg_not_os_poll! {
Expand Down
145 changes: 77 additions & 68 deletions src/sys/unix/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,81 @@
//!
//! See the [`new`] function for documentation.
use std::io;
use std::os::unix::io::RawFd;

pub(crate) fn new_raw() -> io::Result<[RawFd; 2]> {
let mut fds: [RawFd; 2] = [-1, -1];

#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "illumos",
target_os = "redox",
))]
unsafe {
if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
return Err(io::Error::last_os_error());
}
}

#[cfg(any(
target_os = "aix",
target_os = "ios",
target_os = "macos",
target_os = "tvos",
target_os = "watchos",
target_os = "espidf",
))]
unsafe {
// For platforms that don't have `pipe2(2)` we need to manually set the
// correct flags on the file descriptor.
if libc::pipe(fds.as_mut_ptr()) != 0 {
return Err(io::Error::last_os_error());
}

for fd in &fds {
if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
|| libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
{
let err = io::Error::last_os_error();
// Don't leak file descriptors. Can't handle closing error though.
let _ = libc::close(fds[0]);
let _ = libc::close(fds[1]);
return Err(err);
}
}
}

#[cfg(not(any(
target_os = "aix",
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "illumos",
target_os = "ios",
target_os = "linux",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd",
target_os = "redox",
target_os = "tvos",
target_os = "watchos",
target_os = "espidf",
)))]
compile_error!("unsupported target for `mio::unix::pipe`");

Ok(fds)
}

cfg_os_ext! {
use std::fs::File;
use std::io::{self, IoSlice, IoSliceMut, Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
use std::io::{IoSlice, IoSliceMut, Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd};
use std::process::{ChildStderr, ChildStdin, ChildStdout};

use crate::io_source::IoSource;
Expand Down Expand Up @@ -145,74 +217,10 @@ use crate::{event, Interest, Registry, Token};
/// # }
/// ```
pub fn new() -> io::Result<(Sender, Receiver)> {
let mut fds: [RawFd; 2] = [-1, -1];

#[cfg(any(
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd",
target_os = "illumos",
target_os = "redox",
))]
unsafe {
if libc::pipe2(fds.as_mut_ptr(), libc::O_CLOEXEC | libc::O_NONBLOCK) != 0 {
return Err(io::Error::last_os_error());
}
}

#[cfg(any(
target_os = "aix",
target_os = "ios",
target_os = "macos",
target_os = "tvos",
target_os = "watchos",
target_os = "espidf",
))]
unsafe {
// For platforms that don't have `pipe2(2)` we need to manually set the
// correct flags on the file descriptor.
if libc::pipe(fds.as_mut_ptr()) != 0 {
return Err(io::Error::last_os_error());
}

for fd in &fds {
if libc::fcntl(*fd, libc::F_SETFL, libc::O_NONBLOCK) != 0
|| libc::fcntl(*fd, libc::F_SETFD, libc::FD_CLOEXEC) != 0
{
let err = io::Error::last_os_error();
// Don't leak file descriptors. Can't handle closing error though.
let _ = libc::close(fds[0]);
let _ = libc::close(fds[1]);
return Err(err);
}
}
}

#[cfg(not(any(
target_os = "aix",
target_os = "android",
target_os = "dragonfly",
target_os = "freebsd",
target_os = "illumos",
target_os = "ios",
target_os = "linux",
target_os = "macos",
target_os = "netbsd",
target_os = "openbsd",
target_os = "redox",
target_os = "tvos",
target_os = "watchos",
target_os = "espidf",
)))]
compile_error!("unsupported target for `mio::unix::pipe`");

// SAFETY: we just initialised the `fds` above.
let fds = new_raw()?;
// SAFETY: `new_raw` initialised the `fds` above.
let r = unsafe { Receiver::from_raw_fd(fds[0]) };
let w = unsafe { Sender::from_raw_fd(fds[1]) };

Ok((w, r))
}

Expand Down Expand Up @@ -579,3 +587,4 @@ fn set_nonblocking(fd: RawFd, nonblocking: bool) -> io::Result<()> {

Ok(())
}
} // `cfg_os_ext!`.
8 changes: 4 additions & 4 deletions src/sys/unix/waker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ pub use self::kqueue::Waker;
target_os = "redox",
))]
mod pipe {
use crate::unix::pipe::new as new_pipe;
use crate::sys::unix::pipe;
use std::fs::File;
use std::io::{self, Read, Write};
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
Expand All @@ -224,9 +224,9 @@ mod pipe {

impl WakerInternal {
pub fn new() -> io::Result<WakerInternal> {
let (sender, receiver) = new_pipe()?;
let sender = unsafe { File::from_raw_fd(sender.as_raw_fd()) };
let receiver = unsafe { File::from_raw_fd(receiver.as_raw_fd()) };
let [sender, receiver] = pipe::new_raw()?;
let sender = unsafe { File::from_raw_fd(sender) };
let receiver = unsafe { File::from_raw_fd(receiver) };
Ok(WakerInternal { sender, receiver })
}

Expand Down

0 comments on commit 9f21ce1

Please sign in to comment.