From 6d66369d6baff7884fb74f61f4425060af56aa8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 11 Oct 2023 17:52:33 +0800 Subject: [PATCH 1/8] Add unix pipe. --- compio-fs/Cargo.toml | 12 +- compio-fs/src/lib.rs | 3 + compio-fs/src/pipe.rs | 459 ++++++++++++++++++++++++++++++++++ compio/Cargo.toml | 4 + compio/examples/named_pipe.rs | 32 +++ 5 files changed, 506 insertions(+), 4 deletions(-) create mode 100644 compio-fs/src/pipe.rs diff --git a/compio-fs/Cargo.toml b/compio-fs/Cargo.toml index 2cf75b9c..ad6576fe 100644 --- a/compio-fs/Cargo.toml +++ b/compio-fs/Cargo.toml @@ -30,10 +30,6 @@ windows-sys = { version = "0.48", features = [ "Win32_System_SystemServices", ] } -# Windows specific dev dependencies -[target.'cfg(target_os = "windows")'.dev-dependencies] -windows-sys = { version = "0.48", features = ["Win32_Security_Authorization"] } - # Unix specific dependencies [target.'cfg(unix)'.dependencies] libc = "0.2" @@ -43,6 +39,14 @@ libc = "0.2" compio-runtime = { workspace = true, features = ["time"] } futures-util = "0.3" +# Windows specific dev dependencies +[target.'cfg(target_os = "windows")'.dev-dependencies] +windows-sys = { version = "0.48", features = ["Win32_Security_Authorization"] } + +# Unix specific dev dependencies +[target.'cfg(unix)'.dev-dependencies] +nix = { version = "0.27", features = ["fs"] } + [features] runtime = ["dep:compio-buf", "dep:compio-runtime"] diff --git a/compio-fs/src/lib.rs b/compio-fs/src/lib.rs index e956d5de..f49b350a 100644 --- a/compio-fs/src/lib.rs +++ b/compio-fs/src/lib.rs @@ -11,3 +11,6 @@ pub use open_options::*; #[cfg(target_os = "windows")] pub mod named_pipe; + +#[cfg(unix)] +pub mod pipe; diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs new file mode 100644 index 00000000..1df57d2a --- /dev/null +++ b/compio-fs/src/pipe.rs @@ -0,0 +1,459 @@ +//! Unix pipe types. + +use std::{io, os::unix::fs::FileTypeExt, path::Path}; + +use compio_driver::{impl_raw_fd, syscall, AsRawFd, FromRawFd}; +#[cfg(feature = "runtime")] +use { + compio_buf::{BufResult, IoBuf, IoBufMut}, + compio_runtime::impl_attachable, +}; + +use crate::File; + +/// Creates a pair of anonymous pipe. +/// +/// ``` +/// use compio_fs::pipe::pipe; +/// +/// # compio_runtime::block_on(async { +/// let (rx, tx) = pipe().unwrap(); +/// +/// tx.write_all("Hello world!").await.unwrap(); +/// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap(); +/// assert_eq!(&buf, b"Hello world!"); +/// # }); +/// ``` +pub fn pipe() -> io::Result<(Receiver, Sender)> { + let mut fds = [-1, -1]; + syscall!(pipe(fds.as_mut_ptr()))?; + let receiver = unsafe { Receiver::from_raw_fd(fds[0]) }; + let sender = unsafe { Sender::from_raw_fd(fds[1]) }; + Ok((receiver, sender)) +} + +/// Options and flags which can be used to configure how a FIFO file is opened. +/// +/// This builder allows configuring how to create a pipe end from a FIFO file. +/// Generally speaking, when using `OpenOptions`, you'll first call [`new`], +/// then chain calls to methods to set each option, then call either +/// [`open_receiver`] or [`open_sender`], passing the path of the FIFO file you +/// are trying to open. This will give you a [`io::Result`] with a pipe end +/// inside that you can further operate on. +/// +/// [`new`]: OpenOptions::new +/// [`open_receiver`]: OpenOptions::open_receiver +/// [`open_sender`]: OpenOptions::open_sender +/// +/// # Examples +/// +/// Opening a pair of pipe ends from a FIFO file: +/// +/// ```no_run +/// use compio_fs::pipe; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> std::io::Result<()> { +/// let rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// let tx = pipe::OpenOptions::new().open_sender(FIFO_NAME)?; +/// # Ok(()) +/// # } +/// ``` +/// +/// Opening a [`Sender`] on Linux when you are sure the file is a FIFO: +/// +/// ``` +/// use compio_fs::pipe; +/// use nix::{sys::stat::Mode, unistd::mkfifo}; +/// +/// // Our program has exclusive access to this path. +/// const FIFO_NAME: &str = "path/to/a/new/fifo"; +/// +/// # async fn dox() -> std::io::Result<()> { +/// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; +/// let tx = pipe::OpenOptions::new() +/// .read_write(true) +/// .unchecked(true) +/// .open_sender(FIFO_NAME)?; +/// # Ok(()) +/// # } +/// ``` +#[derive(Clone, Debug)] +pub struct OpenOptions { + #[cfg(target_os = "linux")] + read_write: bool, + unchecked: bool, +} + +impl OpenOptions { + /// Creates a blank new set of options ready for configuration. + /// + /// All options are initially set to `false`. + pub fn new() -> OpenOptions { + OpenOptions { + #[cfg(target_os = "linux")] + read_write: false, + unchecked: false, + } + } + + /// Sets the option for read-write access. + /// + /// This option, when true, will indicate that a FIFO file will be opened + /// in read-write access mode. This operation is not defined by the POSIX + /// standard and is only guaranteed to work on Linux. + /// + /// # Examples + /// + /// Opening a [`Sender`] even if there are no open reading ends: + /// + /// ``` + /// use compio_fs::pipe; + /// + /// let tx = pipe::OpenOptions::new() + /// .read_write(true) + /// .open_sender("path/to/a/fifo"); + /// ``` + /// + /// Opening a resilient [`Receiver`] i.e. a reading pipe end which will not + /// fail with [`UnexpectedEof`] during reading if all writing ends of the + /// pipe close the FIFO file. + /// + /// [`UnexpectedEof`]: std::io::ErrorKind::UnexpectedEof + /// + /// ``` + /// use compio_fs::pipe; + /// + /// let tx = pipe::OpenOptions::new() + /// .read_write(true) + /// .open_receiver("path/to/a/fifo"); + /// ``` + #[cfg(target_os = "linux")] + #[cfg_attr(docsrs, doc(cfg(target_os = "linux")))] + pub fn read_write(&mut self, value: bool) -> &mut Self { + self.read_write = value; + self + } + + /// Sets the option to skip the check for FIFO file type. + /// + /// By default, [`open_receiver`] and [`open_sender`] functions will check + /// if the opened file is a FIFO file. Set this option to `true` if you are + /// sure the file is a FIFO file. + /// + /// [`open_receiver`]: OpenOptions::open_receiver + /// [`open_sender`]: OpenOptions::open_sender + /// + /// # Examples + /// + /// ```no_run + /// use compio_fs::pipe; + /// use nix::{sys::stat::Mode, unistd::mkfifo}; + /// + /// // Our program has exclusive access to this path. + /// const FIFO_NAME: &str = "path/to/a/new/fifo"; + /// + /// # async fn dox() -> std::io::Result<()> { + /// mkfifo(FIFO_NAME, Mode::S_IRWXU)?; + /// let rx = pipe::OpenOptions::new() + /// .unchecked(true) + /// .open_receiver(FIFO_NAME)?; + /// # Ok(()) + /// # } + /// ``` + pub fn unchecked(&mut self, value: bool) -> &mut Self { + self.unchecked = value; + self + } + + /// Creates a [`Receiver`] from a FIFO file with the options specified by + /// `self`. + /// + /// This function will open the FIFO file at the specified path, possibly + /// check if it is a pipe, and associate the pipe with the default event + /// loop for reading. + /// + /// # Errors + /// + /// If the file type check fails, this function will fail with + /// `io::ErrorKind::InvalidInput`. This function may also fail with + /// other standard OS errors. + pub fn open_receiver>(&self, path: P) -> io::Result { + let file = self.open(path.as_ref(), PipeEnd::Receiver)?; + Receiver::from_file(file) + } + + /// Creates a [`Sender`] from a FIFO file with the options specified by + /// `self`. + /// + /// This function will open the FIFO file at the specified path, possibly + /// check if it is a pipe, and associate the pipe with the default event + /// loop for writing. + /// + /// # Errors + /// + /// If the file type check fails, this function will fail with + /// `io::ErrorKind::InvalidInput`. If the file is not opened in + /// read-write access mode and the file is not currently open for + /// reading, this function will fail with `ENXIO`. This function may + /// also fail with other standard OS errors. + pub fn open_sender>(&self, path: P) -> io::Result { + let file = self.open(path.as_ref(), PipeEnd::Sender)?; + Sender::from_file(file) + } + + fn open(&self, path: &Path, pipe_end: PipeEnd) -> io::Result { + let options = crate::OpenOptions::new() + .read(pipe_end == PipeEnd::Receiver) + .write(pipe_end == PipeEnd::Sender); + + #[cfg(target_os = "linux")] + let options = if self.read_write { + options.read(true).write(true) + } else { + options + }; + + let file = options.open(path)?; + + if !self.unchecked && !is_fifo(&file)? { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "not a pipe")); + } + + Ok(file) + } +} + +impl Default for OpenOptions { + fn default() -> OpenOptions { + OpenOptions::new() + } +} + +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +enum PipeEnd { + Sender, + Receiver, +} + +/// Writing end of a Unix pipe. +/// +/// It can be constructed from a FIFO file with [`OpenOptions::open_sender`]. +/// +/// Opening a named pipe for writing involves a few steps. +/// Call to [`OpenOptions::open_sender`] might fail with an error indicating +/// different things: +/// +/// * [`io::ErrorKind::NotFound`] - There is no file at the specified path. +/// * [`io::ErrorKind::InvalidInput`] - The file exists, but it is not a FIFO. +/// * [`ENXIO`] - The file is a FIFO, but no process has it open for reading. +/// Sleep for a while and try again. +/// * Other OS errors not specific to opening FIFO files. +/// +/// Opening a `Sender` from a FIFO file should look like this: +/// +/// ```no_run +/// use std::time::Duration; +/// +/// use compio_fs::pipe; +/// use compio_runtime::time; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> std::io::Result<()> { +/// // Wait for a reader to open the file. +/// let tx = loop { +/// match pipe::OpenOptions::new().open_sender(FIFO_NAME) { +/// Ok(tx) => break tx, +/// Err(e) if e.raw_os_error() == Some(libc::ENXIO) => {} +/// Err(e) => return Err(e.into()), +/// } +/// +/// time::sleep(Duration::from_millis(50)).await; +/// }; +/// # Ok(()) +/// # } +/// ``` +/// +/// On Linux, it is possible to create a `Sender` without waiting in a sleeping +/// loop. This is done by opening a named pipe in read-write access mode with +/// `OpenOptions::read_write`. This way, a `Sender` can at the same time hold +/// both a writing end and a reading end, and the latter allows to open a FIFO +/// without [`ENXIO`] error since the pipe is open for reading as well. +/// +/// `Sender` cannot be used to read from a pipe, so in practice the read access +/// is only used when a FIFO is opened. However, using a `Sender` in read-write +/// mode **may lead to lost data**, because written data will be dropped by the +/// system as soon as all pipe ends are closed. To avoid lost data you have to +/// make sure that a reading end has been opened before dropping a `Sender`. +/// +/// Note that using read-write access mode with FIFO files is not defined by +/// the POSIX standard and it is only guaranteed to work on Linux. +/// +/// ``` +/// use compio_fs::pipe; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() { +/// let mut tx = pipe::OpenOptions::new() +/// .read_write(true) +/// .open_sender(FIFO_NAME) +/// .unwrap(); +/// +/// // Asynchronously write to the pipe before a reader. +/// tx.write_all("hello world").await.unwrap(); +/// # } +/// ``` +/// +/// [`ENXIO`]: https://docs.rs/libc/latest/libc/constant.ENXIO.html +#[derive(Debug)] +pub struct Sender { + file: File, +} + +impl Sender { + pub(crate) fn from_file(file: File) -> io::Result { + if cfg!(not(all(target_os = "linux", feature = "io-uring"))) { + set_nonblocking(&file)?; + } + Ok(Sender { file }) + } + + /// Write a buffer into the pipe, returning how many bytes were written. + #[cfg(feature = "runtime")] + pub async fn write(&self, buffer: T) -> BufResult { + self.file.write_at(buffer, usize::MAX).await + } + + /// Write all bytes into the pipe. + #[cfg(feature = "runtime")] + pub async fn write_all(&self, buffer: T) -> BufResult { + self.file.write_all_at(buffer, usize::MAX).await + } +} + +impl_raw_fd!(Sender, file); + +#[cfg(feature = "runtime")] +impl_attachable!(Sender, file); + +/// Reading end of a Unix pipe. +/// +/// It can be constructed from a FIFO file with [`OpenOptions::open_receiver`]. +/// +/// # Examples +/// +/// Receiving messages from a named pipe in a loop: +/// +/// ```no_run +/// use std::io; +/// +/// use compio_buf::BufResult; +/// use compio_fs::pipe; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() -> io::Result<()> { +/// let mut rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// loop { +/// let mut msg = Vec::with_capacity(256); +/// let BufResult(res, msg) = rx.read_exact(msg).await; +/// match res { +/// Ok(_) => { /* handle the message */ } +/// Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { +/// // Writing end has been closed, we should reopen the pipe. +/// rx = pipe::OpenOptions::new().open_receiver(FIFO_NAME)?; +/// } +/// Err(e) => return Err(e.into()), +/// } +/// } +/// # } +/// ``` +/// +/// On Linux, you can use a `Receiver` in read-write access mode to implement +/// resilient reading from a named pipe. Unlike `Receiver` opened in read-only +/// mode, read from a pipe in read-write mode will not fail with `UnexpectedEof` +/// when the writing end is closed. This way, a `Receiver` can asynchronously +/// wait for the next writer to open the pipe. +/// +/// You should not use functions waiting for EOF such as [`read_to_end`] with +/// a `Receiver` in read-write access mode, since it **may wait forever**. +/// `Receiver` in this mode also holds an open writing end, which prevents +/// receiving EOF. +/// +/// To set the read-write access mode you can use `OpenOptions::read_write`. +/// Note that using read-write access mode with FIFO files is not defined by +/// the POSIX standard and it is only guaranteed to work on Linux. +/// +/// ``` +/// use compio_fs::pipe; +/// +/// const FIFO_NAME: &str = "path/to/a/fifo"; +/// +/// # async fn dox() { +/// let mut rx = pipe::OpenOptions::new() +/// .read_write(true) +/// .open_receiver(FIFO_NAME) +/// .unwrap(); +/// loop { +/// let mut msg = Vec::with_capacity(256); +/// rx.read_exact(msg).await.unwrap(); +/// // handle the message +/// } +/// # } +/// ``` +/// +/// [`read_to_end`]: crate::io::AsyncReadExt::read_to_end +#[derive(Debug)] +pub struct Receiver { + file: File, +} + +impl Receiver { + pub(crate) fn from_file(file: File) -> io::Result { + if cfg!(not(all(target_os = "linux", feature = "io-uring"))) { + set_nonblocking(&file)?; + } + Ok(Receiver { file }) + } + + /// Read some bytes from the pipe into the specified + /// buffer, returning how many bytes were read. + #[cfg(feature = "runtime")] + pub async fn read(&self, buffer: T) -> BufResult { + self.file.read_at(buffer, usize::MAX).await + } + + /// Read the exact number of bytes from the pipe. + #[cfg(feature = "runtime")] + pub async fn read_exact(&self, buffer: T) -> BufResult { + self.file.read_exact_at(buffer, usize::MAX).await + } +} + +impl_raw_fd!(Receiver, file); + +#[cfg(feature = "runtime")] +impl_attachable!(Receiver, file); + +/// Checks if file is a FIFO +fn is_fifo(file: &File) -> io::Result { + Ok(file.metadata()?.file_type().is_fifo()) +} + +/// Sets file's flags with O_NONBLOCK by fcntl. +fn set_nonblocking(file: &File) -> io::Result<()> { + let fd = file.as_raw_fd(); + + let current_flags = syscall!(fcntl(fd, libc::F_GETFL))?; + + let flags = current_flags | libc::O_NONBLOCK; + + if flags != current_flags { + syscall!(fcntl(fd, libc::F_SETFL, flags))?; + } + + Ok(()) +} diff --git a/compio/Cargo.toml b/compio/Cargo.toml index efacb84a..252a5f66 100644 --- a/compio/Cargo.toml +++ b/compio/Cargo.toml @@ -48,6 +48,10 @@ compio-buf = { workspace = true, features = ["arrayvec"] } tempfile = "3" tokio = { version = "1", features = ["fs", "io-util", "macros", "net", "rt"] } +# Unix specific dev dependencies +[target.'cfg(unix)'.dev-dependencies] +nix = { version = "0.27", features = ["fs"] } + [features] default = ["runtime", "io-uring"] io-uring = ["compio-driver/io-uring"] diff --git a/compio/examples/named_pipe.rs b/compio/examples/named_pipe.rs index f433f072..afefc928 100644 --- a/compio/examples/named_pipe.rs +++ b/compio/examples/named_pipe.rs @@ -21,6 +21,38 @@ async fn main() { let buffer = Vec::with_capacity(12); let read = client.read_exact(buffer); + let (BufResult(write, _), BufResult(read, buffer)) = futures_util::join!(write, read); + write.unwrap(); + read.unwrap(); + println!("{}", String::from_utf8(buffer).unwrap()); + } + #[cfg(unix)] + { + use compio::{buf::IntoInner, fs::pipe::OpenOptions, runtime::Unattached, BufResult}; + use nix::{sys::stat::Mode, unistd::mkfifo}; + use tempfile::tempdir; + + let dir = tempdir().unwrap(); + let file = dir.path().join("compio-named-pipe"); + + mkfifo(&file, Mode::S_IRWXU).unwrap(); + + let (rx, tx) = std::thread::scope(|s| { + let rx = s.spawn(|| { + Unattached::new(OpenOptions::new().open_receiver(&file).unwrap()).unwrap() + }); + let tx = s + .spawn(|| Unattached::new(OpenOptions::new().open_sender(&file).unwrap()).unwrap()); + ( + rx.join().unwrap().into_inner(), + tx.join().unwrap().into_inner(), + ) + }); + + let write = tx.write_all("Hello world!"); + let buffer = Vec::with_capacity(12); + let read = rx.read_exact(buffer); + let (BufResult(write, _), BufResult(read, buffer)) = futures_util::join!(write, read); write.unwrap(); read.unwrap(); From e541be288ea54619d53db14309f4087fbf4f1549 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 11 Oct 2023 20:29:26 +0800 Subject: [PATCH 2/8] Use os_pipe to add CLOEXEC flag. --- compio-fs/Cargo.toml | 1 + compio-fs/src/pipe.rs | 41 ++++++++++++----------------- compio-runtime/Cargo.toml | 1 + compio-runtime/src/event/eventfd.rs | 2 +- compio-runtime/src/event/pipe.rs | 11 +++----- 5 files changed, 24 insertions(+), 32 deletions(-) diff --git a/compio-fs/Cargo.toml b/compio-fs/Cargo.toml index ad6576fe..609307de 100644 --- a/compio-fs/Cargo.toml +++ b/compio-fs/Cargo.toml @@ -33,6 +33,7 @@ windows-sys = { version = "0.48", features = [ # Unix specific dependencies [target.'cfg(unix)'.dependencies] libc = "0.2" +os_pipe = "1" # Shared dev dependencies for all platforms [dev-dependencies] diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index 1df57d2a..4ead0767 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -2,7 +2,7 @@ use std::{io, os::unix::fs::FileTypeExt, path::Path}; -use compio_driver::{impl_raw_fd, syscall, AsRawFd, FromRawFd}; +use compio_driver::{impl_raw_fd, syscall, AsRawFd, FromRawFd, IntoRawFd}; #[cfg(feature = "runtime")] use { compio_buf::{BufResult, IoBuf, IoBufMut}, @@ -14,21 +14,20 @@ use crate::File; /// Creates a pair of anonymous pipe. /// /// ``` -/// use compio_fs::pipe::pipe; +/// use compio_fs::pipe::anon_pipe; /// /// # compio_runtime::block_on(async { -/// let (rx, tx) = pipe().unwrap(); +/// let (rx, tx) = anon_pipe().unwrap(); /// /// tx.write_all("Hello world!").await.unwrap(); /// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap(); /// assert_eq!(&buf, b"Hello world!"); /// # }); /// ``` -pub fn pipe() -> io::Result<(Receiver, Sender)> { - let mut fds = [-1, -1]; - syscall!(pipe(fds.as_mut_ptr()))?; - let receiver = unsafe { Receiver::from_raw_fd(fds[0]) }; - let sender = unsafe { Sender::from_raw_fd(fds[1]) }; +pub fn anon_pipe() -> io::Result<(Receiver, Sender)> { + let (receiver, sender) = os_pipe::pipe()?; + let receiver = Receiver::from_file(unsafe { File::from_raw_fd(receiver.into_raw_fd()) })?; + let sender = Sender::from_file(unsafe { File::from_raw_fd(sender.into_raw_fd()) })?; Ok((receiver, sender)) } @@ -315,9 +314,7 @@ pub struct Sender { impl Sender { pub(crate) fn from_file(file: File) -> io::Result { - if cfg!(not(all(target_os = "linux", feature = "io-uring"))) { - set_nonblocking(&file)?; - } + set_nonblocking(&file)?; Ok(Sender { file }) } @@ -413,9 +410,7 @@ pub struct Receiver { impl Receiver { pub(crate) fn from_file(file: File) -> io::Result { - if cfg!(not(all(target_os = "linux", feature = "io-uring"))) { - set_nonblocking(&file)?; - } + set_nonblocking(&file)?; Ok(Receiver { file }) } @@ -444,16 +439,14 @@ fn is_fifo(file: &File) -> io::Result { } /// Sets file's flags with O_NONBLOCK by fcntl. -fn set_nonblocking(file: &File) -> io::Result<()> { - let fd = file.as_raw_fd(); - - let current_flags = syscall!(fcntl(fd, libc::F_GETFL))?; - - let flags = current_flags | libc::O_NONBLOCK; - - if flags != current_flags { - syscall!(fcntl(fd, libc::F_SETFL, flags))?; +fn set_nonblocking(file: &impl AsRawFd) -> io::Result<()> { + if cfg!(not(all(target_os = "linux", feature = "io-uring"))) { + let fd = file.as_raw_fd(); + let current_flags = syscall!(fcntl(fd, libc::F_GETFL))?; + let flags = current_flags | libc::O_NONBLOCK; + if flags != current_flags { + syscall!(fcntl(fd, libc::F_SETFL, flags))?; + } } - Ok(()) } diff --git a/compio-runtime/Cargo.toml b/compio-runtime/Cargo.toml index 1e1b77d8..9f371057 100644 --- a/compio-runtime/Cargo.toml +++ b/compio-runtime/Cargo.toml @@ -44,6 +44,7 @@ windows-sys = { version = "0.48", features = ["Win32_System_IO"] } # Unix specific dependencies [target.'cfg(unix)'.dependencies] +os_pipe = "1" libc = "0.2" [features] diff --git a/compio-runtime/src/event/eventfd.rs b/compio-runtime/src/event/eventfd.rs index 0c07e90e..83748c3b 100644 --- a/compio-runtime/src/event/eventfd.rs +++ b/compio-runtime/src/event/eventfd.rs @@ -19,7 +19,7 @@ pub struct Event { impl Event { /// Create [`Event`]. pub fn new() -> io::Result { - let fd = syscall!(eventfd(0, 0))?; + let fd = syscall!(eventfd(0, libc::EFD_CLOEXEC))?; let fd = unsafe { OwnedFd::from_raw_fd(fd) }; Ok(Self { fd, diff --git a/compio-runtime/src/event/pipe.rs b/compio-runtime/src/event/pipe.rs index ccedf1c2..ef2a7bd4 100644 --- a/compio-runtime/src/event/pipe.rs +++ b/compio-runtime/src/event/pipe.rs @@ -1,6 +1,6 @@ use std::{ io, - os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, + os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, }; use compio_buf::{arrayvec::ArrayVec, BufResult}; @@ -20,14 +20,11 @@ pub struct Event { impl Event { /// Create [`Event`]. pub fn new() -> io::Result { - let mut fds = [-1, -1]; - syscall!(pipe(fds.as_mut_ptr()))?; - let receiver = unsafe { OwnedFd::from_raw_fd(fds[0]) }; - let sender = unsafe { OwnedFd::from_raw_fd(fds[1]) }; + let (receiver, sender) = os_pipe::pipe()?; + let receiver = unsafe { OwnedFd::from_raw_fd(receiver.into_raw_fd()) }; + let sender = unsafe { OwnedFd::from_raw_fd(sender.into_raw_fd()) }; - syscall!(fcntl(receiver.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?; syscall!(fcntl(receiver.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK))?; - syscall!(fcntl(sender.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?; Ok(Self { sender, receiver, From a9a6d8a5a993dd5d084d2d9cc0f23498493e7521 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 11 Oct 2023 20:34:43 +0800 Subject: [PATCH 3/8] Recognize usize::MAX as -1. --- compio-driver/src/iour/op.rs | 4 ++-- compio-driver/src/poll/op.rs | 15 ++++----------- compio-driver/src/unix/op.rs | 24 +++++++++++++++++++++--- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/compio-driver/src/iour/op.rs b/compio-driver/src/iour/op.rs index 155ad252..1c578d8f 100644 --- a/compio-driver/src/iour/op.rs +++ b/compio-driver/src/iour/op.rs @@ -21,7 +21,7 @@ impl OpCode for ReadAt { let fd = Fd(self.fd); let slice = self.buffer.as_uninit_slice(); opcode::Read::new(fd, slice.as_mut_ptr() as _, slice.len() as _) - .offset(self.offset as _) + .offset(self.offset()) .build() } } @@ -30,7 +30,7 @@ impl OpCode for WriteAt { fn create_entry(self: Pin<&mut Self>) -> Entry { let slice = self.buffer.as_slice(); opcode::Write::new(Fd(self.fd), slice.as_ptr(), slice.len() as _) - .offset(self.offset as _) + .offset(self.offset()) .build() } } diff --git a/compio-driver/src/poll/op.rs b/compio-driver/src/poll/op.rs index 8aa63437..10cadd44 100644 --- a/compio-driver/src/poll/op.rs +++ b/compio-driver/src/poll/op.rs @@ -20,7 +20,7 @@ impl OpCode for ReadAt { fd, slice.as_mut_ptr() as _, slice.len() as _, - self.offset as _ + self.offset() ))? as _)) } else { Ok(Decision::wait_readable(self.fd)) @@ -33,14 +33,7 @@ impl OpCode for ReadAt { let fd = self.fd; let slice = self.buffer.as_uninit_slice(); - syscall!( - break pread( - fd, - slice.as_mut_ptr() as _, - slice.len() as _, - self.offset as _ - ) - ) + syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, self.offset())) } } @@ -52,7 +45,7 @@ impl OpCode for WriteAt { self.fd, slice.as_ptr() as _, slice.len() as _, - self.offset as _ + self.offset() ))? as _)) } else { Ok(Decision::wait_writable(self.fd)) @@ -69,7 +62,7 @@ impl OpCode for WriteAt { self.fd, slice.as_ptr() as _, slice.len() as _, - self.offset as _ + self.offset() ) ) } diff --git a/compio-driver/src/unix/op.rs b/compio-driver/src/unix/op.rs index 8df6b7e2..e4e6e2af 100644 --- a/compio-driver/src/unix/op.rs +++ b/compio-driver/src/unix/op.rs @@ -4,9 +4,27 @@ use compio_buf::{IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut}; use libc::{sockaddr_storage, socklen_t}; use socket2::SockAddr; -#[cfg(doc)] -use crate::op::*; -use crate::RawFd; +use crate::{op::*, RawFd}; + +impl ReadAt { + pub(crate) fn offset(&self) -> u64 { + if self.offset == usize::MAX { + u64::MAX + } else { + self.offset as _ + } + } +} + +impl WriteAt { + pub(crate) fn offset(&self) -> u64 { + if self.offset == usize::MAX { + u64::MAX + } else { + self.offset as _ + } + } +} /// Accept a connection. pub struct Accept { From 9c8f92f4c6b493800c455506fcd54b3ee0139104 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 18 Oct 2023 11:45:21 +0800 Subject: [PATCH 4/8] Remove offset() --- compio-driver/src/iour/op.rs | 4 ++-- compio-driver/src/poll/op.rs | 15 +++++++++++---- compio-driver/src/unix/op.rs | 20 -------------------- 3 files changed, 13 insertions(+), 26 deletions(-) diff --git a/compio-driver/src/iour/op.rs b/compio-driver/src/iour/op.rs index 08497aaa..dd0c165b 100644 --- a/compio-driver/src/iour/op.rs +++ b/compio-driver/src/iour/op.rs @@ -19,7 +19,7 @@ impl OpCode for ReadAt { let fd = Fd(self.fd); let slice = self.buffer.as_uninit_slice(); opcode::Read::new(fd, slice.as_mut_ptr() as _, slice.len() as _) - .offset(self.offset()) + .offset(self.offset) .build() } } @@ -28,7 +28,7 @@ impl OpCode for WriteAt { fn create_entry(self: Pin<&mut Self>) -> Entry { let slice = self.buffer.as_slice(); opcode::Write::new(Fd(self.fd), slice.as_ptr(), slice.len() as _) - .offset(self.offset()) + .offset(self.offset) .build() } } diff --git a/compio-driver/src/poll/op.rs b/compio-driver/src/poll/op.rs index 818f1dea..b3a168cb 100644 --- a/compio-driver/src/poll/op.rs +++ b/compio-driver/src/poll/op.rs @@ -18,7 +18,7 @@ impl OpCode for ReadAt { fd, slice.as_mut_ptr() as _, slice.len() as _, - self.offset() + self.offset as _ ))? as _)) } else { Ok(Decision::wait_readable(self.fd)) @@ -31,7 +31,14 @@ impl OpCode for ReadAt { let fd = self.fd; let slice = self.buffer.as_uninit_slice(); - syscall!(break pread(fd, slice.as_mut_ptr() as _, slice.len() as _, self.offset())) + syscall!( + break pread( + fd, + slice.as_mut_ptr() as _, + slice.len() as _, + self.offset as _ + ) + ) } } @@ -43,7 +50,7 @@ impl OpCode for WriteAt { self.fd, slice.as_ptr() as _, slice.len() as _, - self.offset() + self.offset as _ ))? as _)) } else { Ok(Decision::wait_writable(self.fd)) @@ -60,7 +67,7 @@ impl OpCode for WriteAt { self.fd, slice.as_ptr() as _, slice.len() as _, - self.offset() + self.offset as _ ) ) } diff --git a/compio-driver/src/unix/op.rs b/compio-driver/src/unix/op.rs index 5eec0429..af28db90 100644 --- a/compio-driver/src/unix/op.rs +++ b/compio-driver/src/unix/op.rs @@ -6,26 +6,6 @@ use socket2::SockAddr; use crate::{op::*, RawFd}; -impl ReadAt { - pub(crate) fn offset(&self) -> u64 { - if self.offset == usize::MAX { - u64::MAX - } else { - self.offset as _ - } - } -} - -impl WriteAt { - pub(crate) fn offset(&self) -> u64 { - if self.offset == usize::MAX { - u64::MAX - } else { - self.offset as _ - } - } -} - /// Accept a connection. pub struct Accept { pub(crate) fd: RawFd, From f7cf29bf2ca334bd10bb8a3662ffe630eb6ea17f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 18 Oct 2023 11:56:21 +0800 Subject: [PATCH 5/8] Use IO traits in unix pipe. --- compio-driver/src/unix/op.rs | 4 ++- compio-fs/src/pipe.rs | 51 ++++++++++++++++++++--------------- compio/examples/named_pipe.rs | 15 ++++++----- 3 files changed, 40 insertions(+), 30 deletions(-) diff --git a/compio-driver/src/unix/op.rs b/compio-driver/src/unix/op.rs index af28db90..cade0395 100644 --- a/compio-driver/src/unix/op.rs +++ b/compio-driver/src/unix/op.rs @@ -4,7 +4,9 @@ use compio_buf::{ use libc::{sockaddr_storage, socklen_t}; use socket2::SockAddr; -use crate::{op::*, RawFd}; +#[cfg(doc)] +use crate::op::*; +use crate::RawFd; /// Accept a connection. pub struct Accept { diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index 4ead0767..519471c4 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -5,8 +5,10 @@ use std::{io, os::unix::fs::FileTypeExt, path::Path}; use compio_driver::{impl_raw_fd, syscall, AsRawFd, FromRawFd, IntoRawFd}; #[cfg(feature = "runtime")] use { - compio_buf::{BufResult, IoBuf, IoBufMut}, - compio_runtime::impl_attachable, + compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoBufMut}, + compio_driver::op::{BufResultExt, Recv, Send}, + compio_io::{AsyncRead, AsyncWrite}, + compio_runtime::{impl_attachable, submit, Attachable}, }; use crate::File; @@ -15,9 +17,10 @@ use crate::File; /// /// ``` /// use compio_fs::pipe::anon_pipe; +/// use compio_io::{AsyncReadExt, AsyncWriteExt}; /// /// # compio_runtime::block_on(async { -/// let (rx, tx) = anon_pipe().unwrap(); +/// let (mut rx, mut tx) = anon_pipe().unwrap(); /// /// tx.write_all("Hello world!").await.unwrap(); /// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap(); @@ -292,6 +295,7 @@ enum PipeEnd { /// /// ``` /// use compio_fs::pipe; +/// use compio_io::AsyncWriteExt; /// /// const FIFO_NAME: &str = "path/to/a/fifo"; /// @@ -317,17 +321,22 @@ impl Sender { set_nonblocking(&file)?; Ok(Sender { file }) } +} + +#[cfg(feature = "runtime")] +impl AsyncWrite for Sender { + async fn write(&mut self, buffer: T) -> BufResult { + let ((), buffer) = buf_try!(self.attach(), buffer); + let op = Send::new(self.as_raw_fd(), buffer); + submit(op).await.into_inner() + } - /// Write a buffer into the pipe, returning how many bytes were written. - #[cfg(feature = "runtime")] - pub async fn write(&self, buffer: T) -> BufResult { - self.file.write_at(buffer, usize::MAX).await + async fn flush(&mut self) -> io::Result<()> { + Ok(()) } - /// Write all bytes into the pipe. - #[cfg(feature = "runtime")] - pub async fn write_all(&self, buffer: T) -> BufResult { - self.file.write_all_at(buffer, usize::MAX).await + async fn shutdown(&mut self) -> io::Result<()> { + Ok(()) } } @@ -349,6 +358,7 @@ impl_attachable!(Sender, file); /// /// use compio_buf::BufResult; /// use compio_fs::pipe; +/// use compio_io::AsyncReadExt; /// /// const FIFO_NAME: &str = "path/to/a/fifo"; /// @@ -386,6 +396,7 @@ impl_attachable!(Sender, file); /// /// ``` /// use compio_fs::pipe; +/// use compio_io::AsyncReadExt; /// /// const FIFO_NAME: &str = "path/to/a/fifo"; /// @@ -413,18 +424,14 @@ impl Receiver { set_nonblocking(&file)?; Ok(Receiver { file }) } +} - /// Read some bytes from the pipe into the specified - /// buffer, returning how many bytes were read. - #[cfg(feature = "runtime")] - pub async fn read(&self, buffer: T) -> BufResult { - self.file.read_at(buffer, usize::MAX).await - } - - /// Read the exact number of bytes from the pipe. - #[cfg(feature = "runtime")] - pub async fn read_exact(&self, buffer: T) -> BufResult { - self.file.read_exact_at(buffer, usize::MAX).await +#[cfg(feature = "runtime")] +impl AsyncRead for Receiver { + async fn read(&mut self, buffer: B) -> BufResult { + let ((), buffer) = buf_try!(self.attach(), buffer); + let op = Recv::new(self.as_raw_fd(), buffer); + submit(op).await.into_inner().map_advanced() } } diff --git a/compio/examples/named_pipe.rs b/compio/examples/named_pipe.rs index b5359bad..b26ac2ba 100644 --- a/compio/examples/named_pipe.rs +++ b/compio/examples/named_pipe.rs @@ -1,12 +1,13 @@ +use compio::{ + io::{AsyncReadExt, AsyncWriteExt}, + BufResult, +}; + #[compio::main(crate = "compio")] async fn main() { #[cfg(windows)] { - use compio::{ - buf::BufResult, - fs::named_pipe::{ClientOptions, ServerOptions}, - io::{AsyncReadExt, AsyncWriteExt}, - }; + use compio::fs::named_pipe::{ClientOptions, ServerOptions}; const PIPE_NAME: &str = r"\\.\pipe\compio-named-pipe"; @@ -29,7 +30,7 @@ async fn main() { } #[cfg(unix)] { - use compio::{buf::IntoInner, fs::pipe::OpenOptions, runtime::Unattached, BufResult}; + use compio::{buf::IntoInner, fs::pipe::OpenOptions, runtime::Unattached}; use nix::{sys::stat::Mode, unistd::mkfifo}; use tempfile::tempdir; @@ -38,7 +39,7 @@ async fn main() { mkfifo(&file, Mode::S_IRWXU).unwrap(); - let (rx, tx) = std::thread::scope(|s| { + let (mut rx, mut tx) = std::thread::scope(|s| { let rx = s.spawn(|| { Unattached::new(OpenOptions::new().open_receiver(&file).unwrap()).unwrap() }); From 73d481176035ba8b203713290ef1ac5565db1930 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 18 Oct 2023 11:58:32 +0800 Subject: [PATCH 6/8] Rename anon_pipe to anonymous --- compio-fs/src/pipe.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index 519471c4..1552569a 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -16,18 +16,18 @@ use crate::File; /// Creates a pair of anonymous pipe. /// /// ``` -/// use compio_fs::pipe::anon_pipe; +/// use compio_fs::pipe::anonymous; /// use compio_io::{AsyncReadExt, AsyncWriteExt}; /// /// # compio_runtime::block_on(async { -/// let (mut rx, mut tx) = anon_pipe().unwrap(); +/// let (mut rx, mut tx) = anonymous().unwrap(); /// /// tx.write_all("Hello world!").await.unwrap(); /// let (_, buf) = rx.read_exact(Vec::with_capacity(12)).await.unwrap(); /// assert_eq!(&buf, b"Hello world!"); /// # }); /// ``` -pub fn anon_pipe() -> io::Result<(Receiver, Sender)> { +pub fn anonymous() -> io::Result<(Receiver, Sender)> { let (receiver, sender) = os_pipe::pipe()?; let receiver = Receiver::from_file(unsafe { File::from_raw_fd(receiver.into_raw_fd()) })?; let sender = Sender::from_file(unsafe { File::from_raw_fd(sender.into_raw_fd()) })?; From 589c3916ee925d2828d95ed7409a81f138997b51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 18 Oct 2023 12:08:01 +0800 Subject: [PATCH 7/8] Ignore read_write examples. --- compio-fs/src/pipe.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index 1552569a..c698ba92 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -65,7 +65,7 @@ pub fn anonymous() -> io::Result<(Receiver, Sender)> { /// /// Opening a [`Sender`] on Linux when you are sure the file is a FIFO: /// -/// ``` +/// ```ignore /// use compio_fs::pipe; /// use nix::{sys::stat::Mode, unistd::mkfifo}; /// @@ -293,7 +293,7 @@ enum PipeEnd { /// Note that using read-write access mode with FIFO files is not defined by /// the POSIX standard and it is only guaranteed to work on Linux. /// -/// ``` +/// ```ignore /// use compio_fs::pipe; /// use compio_io::AsyncWriteExt; /// @@ -394,7 +394,7 @@ impl_attachable!(Sender, file); /// Note that using read-write access mode with FIFO files is not defined by /// the POSIX standard and it is only guaranteed to work on Linux. /// -/// ``` +/// ```ignore /// use compio_fs::pipe; /// use compio_io::AsyncReadExt; /// From f0770ec6ac45244d73bae4a3624fc24423056ce2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Wed, 18 Oct 2023 21:33:53 +0800 Subject: [PATCH 8/8] Add readv & writev support for pipes. --- compio-fs/src/pipe.rs | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/compio-fs/src/pipe.rs b/compio-fs/src/pipe.rs index c698ba92..6708952a 100644 --- a/compio-fs/src/pipe.rs +++ b/compio-fs/src/pipe.rs @@ -6,7 +6,7 @@ use compio_driver::{impl_raw_fd, syscall, AsRawFd, FromRawFd, IntoRawFd}; #[cfg(feature = "runtime")] use { compio_buf::{buf_try, BufResult, IntoInner, IoBuf, IoBufMut}, - compio_driver::op::{BufResultExt, Recv, Send}, + compio_driver::op::{BufResultExt, Recv, RecvVectored, Send, SendVectored}, compio_io::{AsyncRead, AsyncWrite}, compio_runtime::{impl_attachable, submit, Attachable}, }; @@ -331,6 +331,15 @@ impl AsyncWrite for Sender { submit(op).await.into_inner() } + async fn write_vectored( + &mut self, + buffer: T, + ) -> BufResult { + let ((), buffer) = buf_try!(self.attach(), buffer); + let op = SendVectored::new(self.as_raw_fd(), buffer); + submit(op).await.into_inner() + } + async fn flush(&mut self) -> io::Result<()> { Ok(()) } @@ -433,6 +442,18 @@ impl AsyncRead for Receiver { let op = Recv::new(self.as_raw_fd(), buffer); submit(op).await.into_inner().map_advanced() } + + async fn read_vectored( + &mut self, + buffer: V, + ) -> BufResult + where + V: Unpin + 'static, + { + let ((), buffer) = buf_try!(self.attach(), buffer); + let op = RecvVectored::new(self.as_raw_fd(), buffer); + submit(op).await.into_inner().map_advanced() + } } impl_raw_fd!(Receiver, file);