From 4bdbf59fe5c1fd3409aa027d0ec0ea50065b39bb Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Sun, 4 Mar 2018 21:17:08 -0700 Subject: [PATCH 1/4] Change sys::aio::lio_listio to sys::aio::LioCb::listio The new LioCb structure allows us to control the exact arguments passed to lio_listio, guaranteeing that each call gets a unique storage location for the list argument. This prevents clients from misusing lio_listio in a way that causes events to get dropped from a kqueue Fixes #870 --- CHANGELOG.md | 4 ++ src/sys/aio.rs | 166 ++++++++++++++++++++++++++++--------------- test/sys/test_aio.rs | 89 +++++++++++++---------- 3 files changed, 163 insertions(+), 96 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7c7b9d24fd..c8731bd268 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ This project adheres to [Semantic Versioning](http://semver.org/). ### Added - Added `SO_MARK` on Linux. - ([#873](https://github.com/nix-rust/nix/pull/873)) +- Added `sys::aio::LioCb` as a wrapper for `libc::lio_listio`. + ([#872](https://github.com/nix-rust/nix/pull/872)) - Added `getsid` in `::nix::unistd` ([#850](https://github.com/nix-rust/nix/pull/850)) - Added `alarm`. ([#830](https://github.com/nix-rust/nix/pull/830)) @@ -31,6 +33,8 @@ This project adheres to [Semantic Versioning](http://semver.org/). ([#837](https://github.com/nix-rust/nix/pull/837)) ### Removed +- Removed `sys::aio::lio_listio`. Use `sys::aio::LioCb::listio` instead. + ([#872](https://github.com/nix-rust/nix/pull/872)) ## [0.10.0] 2018-01-26 diff --git a/src/sys/aio.rs b/src/sys/aio.rs index fe8e9ed2cb..745ce71bb7 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -965,63 +965,6 @@ pub fn aio_suspend(list: &[&AioCb], timeout: Option) -> Result<()> { }).map(drop) } - -/// Submits multiple asynchronous I/O requests with a single system call. -/// -/// They are not guaranteed to complete atomically, and the order in which the -/// requests are carried out is not specified. 
Reads, writes, and fsyncs may be -/// freely mixed. -/// -/// This function is useful for reducing the context-switch overhead of -/// submitting many AIO operations. It can also be used with -/// `LioMode::LIO_WAIT` to block on the result of several independent -/// operations. Used that way, it is often useful in programs that otherwise -/// make little use of AIO. -/// -/// # Examples -/// -/// Use `lio_listio` to submit an aio operation and wait for its completion. In -/// this case, there is no need to use `aio_suspend` to wait or `AioCb#error` to -/// poll. -/// -/// ``` -/// # extern crate tempfile; -/// # extern crate nix; -/// # use nix::sys::aio::*; -/// # use nix::sys::signal::SigevNotify; -/// # use std::os::unix::io::AsRawFd; -/// # use tempfile::tempfile; -/// # fn main() { -/// const WBUF: &[u8] = b"abcdef123456"; -/// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), -/// 2, //offset -/// WBUF, -/// 0, //priority -/// SigevNotify::SigevNone, -/// LioOpcode::LIO_WRITE); -/// lio_listio(LioMode::LIO_WAIT, -/// &[&mut aiocb], -/// SigevNotify::SigevNone).unwrap(); -/// assert_eq!(aiocb.aio_return().unwrap() as usize, WBUF.len()); -/// # } -/// ``` -/// -/// # References -/// -/// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) -#[cfg(not(any(target_os = "ios", target_os = "macos")))] -pub fn lio_listio(mode: LioMode, list: &[&mut AioCb], - sigev_notify: SigevNotify) -> Result<()> { - let sigev = SigEvent::new(sigev_notify); - let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; - let plist = list as *const [&mut AioCb] as *const [*mut libc::aiocb]; - let p = plist as *const *mut libc::aiocb; - Errno::result(unsafe { - libc::lio_listio(mode as i32, p, list.len() as i32, sigevp) - }).map(drop) -} - impl<'a> Debug for AioCb<'a> { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_struct("AioCb") @@ -1045,3 +988,112 @@ impl<'a> Drop for AioCb<'a> { 
assert!(!self.in_progress, "Dropped an in-progress AioCb"); } } + +/// LIO Control Block. +/// +/// The basic structure used to issue multiple AIO operations simultaneously. +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +pub struct LioCb<'a> { + /// A collection of [`AioCb`]s. All of these will be issued simultaneously + /// by the [`listio`] method. + /// + /// [`AioCb`]: struct.AioCb.html + /// [`listio`]: #method.listio + pub aiocbs: Vec<AioCb<'a>>, + + /// The actual list passed to `libc::lio_listio`. + /// + /// It must live for as long as any of the operations are still being + /// processed, because the aio subsystem uses its address as a unique + /// identifier. + list: Vec<*mut libc::aiocb> +} + +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +impl<'a> LioCb<'a> { + /// Initialize an empty `LioCb` + pub fn with_capacity(capacity: usize) -> LioCb<'a> { + LioCb { + aiocbs: Vec::with_capacity(capacity), + list: Vec::with_capacity(capacity) + } + } + + /// Submits multiple asynchronous I/O requests with a single system call. + /// + /// They are not guaranteed to complete atomically, and the order in which + /// the requests are carried out is not specified. Reads, writes, and + /// fsyncs may be freely mixed. + /// + /// This function is useful for reducing the context-switch overhead of + /// submitting many AIO operations. It can also be used with + /// `LioMode::LIO_WAIT` to block on the result of several independent + /// operations. Used that way, it is often useful in programs that + /// otherwise make little use of AIO. + /// + /// # Examples + /// + /// Use `listio` to submit an aio operation and wait for its completion. In + /// this case, there is no need to use `aio_suspend` to wait or + /// `AioCb#error` to poll.
+ /// + /// ``` + /// # extern crate tempfile; + /// # extern crate nix; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// # fn main() { + /// const WBUF: &[u8] = b"abcdef123456"; + /// let mut f = tempfile().unwrap(); + /// let mut liocb = LioCb::with_capacity(1); + /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE)); + /// liocb.listio(LioMode::LIO_WAIT, + /// SigevNotify::SigevNone).unwrap(); + /// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len()); + /// # } + /// ``` + /// + /// # References + /// + /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + pub fn listio(&mut self, mode: LioMode, + sigev_notify: SigevNotify) -> Result<()> { + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + self.list.clear(); + for a in self.aiocbs.iter_mut() { + self.list.push(a as *mut AioCb<'a> + as *mut libc::aiocb); + } + let p = self.list.as_ptr(); + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) + }).map(|_| ()) + } +} + +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +impl<'a> Debug for LioCb<'a> { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + fmt.debug_struct("LioCb") + .field("aiocbs", &self.aiocbs) + .finish() + } +} + +#[cfg(not(any(target_os = "ios", target_os = "macos")))] +impl<'a> From<Vec<AioCb<'a>>> for LioCb<'a> { + fn from(src: Vec<AioCb<'a>>) -> LioCb<'a> { + LioCb { + list: Vec::with_capacity(src.capacity()), + aiocbs: src, + } + } +} diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs index f88fc2688d..f11c7ef939 100644 --- a/test/sys/test_aio.rs +++ b/test/sys/test_aio.rs @@ -505,12 +505,12 @@ fn test_write_sigev_signal() { assert!(rbuf == EXPECT); } -// Test lio_listio with LIO_WAIT,
so all AIO ops should be complete by the time -// lio_listio returns. +// Test LioCb::listio with LIO_WAIT, so all AIO ops should be complete by the +// time listio returns. #[test] #[cfg(not(any(target_os = "ios", target_os = "macos")))] #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_lio_listio_wait() { +fn test_liocb_listio_wait() { const INITIAL: &[u8] = b"abcdef123456"; const WBUF: &[u8] = b"CDEF"; let mut rbuf = vec![0; 4]; @@ -522,24 +522,27 @@ fn test_lio_listio_wait() { f.write_all(INITIAL).unwrap(); { - let mut wcb = AioCb::from_slice( f.as_raw_fd(), + let wcb = AioCb::from_slice( f.as_raw_fd(), 2, //offset WBUF, 0, //priority SigevNotify::SigevNone, LioOpcode::LIO_WRITE); - let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), + let rcb = AioCb::from_mut_slice( f.as_raw_fd(), 8, //offset &mut rbuf, 0, //priority SigevNotify::SigevNone, LioOpcode::LIO_READ); - let err = lio_listio(LioMode::LIO_WAIT, &[&mut wcb, &mut rcb], SigevNotify::SigevNone); - err.expect("lio_listio failed"); - - assert!(wcb.aio_return().unwrap() as usize == WBUF.len()); - assert!(rcb.aio_return().unwrap() as usize == rlen); + let mut liocb = LioCb::with_capacity(2); + liocb.aiocbs.push(wcb); + liocb.aiocbs.push(rcb); + let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); + err.expect("lio_listio"); + + assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len()); + assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen); } assert!(rbuf.deref().deref() == b"3456"); @@ -549,12 +552,12 @@ fn test_lio_listio_wait() { assert!(rbuf2 == EXPECT); } -// Test lio_listio with LIO_NOWAIT and no SigEvent, so we must use some other +// Test LioCb::listio with LIO_NOWAIT and no SigEvent, so we must use some other // mechanism to check for the individual AioCb's completion. 
#[test] #[cfg(not(any(target_os = "ios", target_os = "macos")))] #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_lio_listio_nowait() { +fn test_liocb_listio_nowait() { const INITIAL: &[u8] = b"abcdef123456"; const WBUF: &[u8] = b"CDEF"; let mut rbuf = vec![0; 4]; @@ -566,26 +569,29 @@ fn test_lio_listio_nowait() { f.write_all(INITIAL).unwrap(); { - let mut wcb = AioCb::from_slice( f.as_raw_fd(), + let wcb = AioCb::from_slice( f.as_raw_fd(), 2, //offset WBUF, 0, //priority SigevNotify::SigevNone, LioOpcode::LIO_WRITE); - let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), + let rcb = AioCb::from_mut_slice( f.as_raw_fd(), 8, //offset &mut rbuf, 0, //priority SigevNotify::SigevNone, LioOpcode::LIO_READ); - let err = lio_listio(LioMode::LIO_NOWAIT, &[&mut wcb, &mut rcb], SigevNotify::SigevNone); - err.expect("lio_listio failed"); - - poll_aio(&mut wcb).unwrap(); - poll_aio(&mut rcb).unwrap(); - assert!(wcb.aio_return().unwrap() as usize == WBUF.len()); - assert!(rcb.aio_return().unwrap() as usize == rlen); + let mut liocb = LioCb::with_capacity(2); + liocb.aiocbs.push(wcb); + liocb.aiocbs.push(rcb); + let err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); + err.expect("lio_listio"); + + poll_aio(&mut liocb.aiocbs[0]).unwrap(); + poll_aio(&mut liocb.aiocbs[1]).unwrap(); + assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len()); + assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen); } assert!(rbuf.deref().deref() == b"3456"); @@ -595,13 +601,13 @@ fn test_lio_listio_nowait() { assert!(rbuf2 == EXPECT); } -// Test lio_listio with LIO_NOWAIT and a SigEvent to indicate when all AioCb's -// are complete. +// Test LioCb::listio with LIO_NOWAIT and a SigEvent to indicate when all +// AioCb's are complete. // FIXME: This test is ignored on mips/mips64 because of failures in qemu in CI. 
#[test] #[cfg(not(any(target_os = "ios", target_os = "macos")))] #[cfg_attr(any(target_arch = "mips", target_arch = "mips64", target_env = "musl"), ignore)] -fn test_lio_listio_signal() { +fn test_liocb_listio_signal() { #[allow(unused_variables)] let m = ::SIGNAL_MTX.lock().expect("Mutex got poisoned by another test"); const INITIAL: &[u8] = b"abcdef123456"; @@ -620,29 +626,32 @@ fn test_lio_listio_signal() { f.write_all(INITIAL).unwrap(); { - let mut wcb = AioCb::from_slice( f.as_raw_fd(), + let wcb = AioCb::from_slice( f.as_raw_fd(), 2, //offset WBUF, 0, //priority SigevNotify::SigevNone, LioOpcode::LIO_WRITE); - let mut rcb = AioCb::from_mut_slice( f.as_raw_fd(), + let rcb = AioCb::from_mut_slice( f.as_raw_fd(), 8, //offset &mut rbuf, 0, //priority SigevNotify::SigevNone, LioOpcode::LIO_READ); + let mut liocb = LioCb::with_capacity(2); + liocb.aiocbs.push(wcb); + liocb.aiocbs.push(rcb); SIGNALED.store(false, Ordering::Relaxed); unsafe { sigaction(Signal::SIGUSR2, &sa) }.unwrap(); - let err = lio_listio(LioMode::LIO_NOWAIT, &[&mut wcb, &mut rcb], sigev_notify); - err.expect("lio_listio failed"); + let err = liocb.listio(LioMode::LIO_NOWAIT, sigev_notify); + err.expect("lio_listio"); while !SIGNALED.load(Ordering::Relaxed) { thread::sleep(time::Duration::from_millis(10)); } - assert!(wcb.aio_return().unwrap() as usize == WBUF.len()); - assert!(rcb.aio_return().unwrap() as usize == rlen); + assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len()); + assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen); } assert!(rbuf.deref().deref() == b"3456"); @@ -652,22 +661,24 @@ fn test_lio_listio_signal() { assert!(rbuf2 == EXPECT); } -// Try to use lio_listio to read into an immutable buffer. It should fail +// Try to use LioCb::listio to read into an immutable buffer. 
It should fail // FIXME: This test fails to panic on Linux/musl #[test] #[cfg(not(any(target_os = "ios", target_os = "macos")))] #[should_panic(expected = "Can't read into an immutable buffer")] #[cfg_attr(target_env = "musl", ignore)] -fn test_lio_listio_read_immutable() { +fn test_liocb_listio_read_immutable() { let rbuf: &[u8] = b"abcd"; let f = tempfile().unwrap(); - let mut rcb = AioCb::from_slice( f.as_raw_fd(), - 2, //offset - rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_READ); - let _ = lio_listio(LioMode::LIO_NOWAIT, &[&mut rcb], SigevNotify::SigevNone); + let mut liocb = LioCb::from(vec![ + AioCb::from_slice( f.as_raw_fd(), + 2, //offset + rbuf, + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_READ) + ]); + let _ = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); } From 6703bc8a654f4a4cff352a1959c08217f878adb0 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Tue, 6 Mar 2018 19:12:30 -0700 Subject: [PATCH 2/4] Fix an annoying double panic A double panic can screw up the first panic's stack trace. Better not to assert! anything when the thread is already panicking. --- src/sys/aio.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 745ce71bb7..5cb3b81850 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -32,6 +32,7 @@ use std::marker::PhantomData; use std::mem; use std::ops::Deref; use std::ptr::{null, null_mut}; +use std::thread; use sys::signal::*; use sys::time::TimeSpec; @@ -985,7 +986,8 @@ impl<'a> Drop for AioCb<'a> { /// If the `AioCb` has no remaining state in the kernel, just drop it.
/// Otherwise, dropping constitutes a resource leak, which is an error fn drop(&mut self) { - assert!(!self.in_progress, "Dropped an in-progress AioCb"); + assert!(thread::panicking() || !self.in_progress, + "Dropped an in-progress AioCb"); } } From 4729935dec575d1d0353f3cdf7e318b4157d2cc3 Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Tue, 6 Mar 2018 19:28:16 -0700 Subject: [PATCH 3/4] Replace AioCb::from_bytes with more generic from_boxed_slice Supporting the bytes crate was unnecessarily specific. This change replaces from_bytes and from_bytes_mut with from_boxed_slice and from_boxed_mut_slice, which can work with anything that implements Borrow<[u8]> and BorrowMut<[u8]>, respectively. --- CHANGELOG.md | 5 + Cargo.toml | 8 +- src/lib.rs | 1 - src/sys/aio.rs | 309 +++++++++++++++++++++++++------------------ test/sys/test_aio.rs | 50 ++----- 5 files changed, 199 insertions(+), 174 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8731bd268..2991fbbeba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,8 @@ This project adheres to [Semantic Versioning](http://semver.org/). ### Added - Added `SO_MARK` on Linux. - ([#873](https://github.com/nix-rust/nix/pull/873)) +- Added safe support for nearly any buffer type in the `sys::aio` module. + ([#872](https://github.com/nix-rust/nix/pull/872)) - Added `sys::aio::LioCb` as a wrapper for `libc::lio_listio`. ([#872](https://github.com/nix-rust/nix/pull/872)) - Added `getsid` in `::nix::unistd` @@ -33,6 +35,9 @@ This project adheres to [Semantic Versioning](http://semver.org/). ([#837](https://github.com/nix-rust/nix/pull/837)) ### Removed +- Removed explicit support for the `bytes` crate from the `sys::aio` module. + See `sys::aio::AioCb::from_boxed_slice`'s examples for alternatives. + ([#872](https://github.com/nix-rust/nix/pull/872)) - Removed `sys::aio::lio_listio`. Use `sys::aio::LioCb::listio` instead.
([#872](https://github.com/nix-rust/nix/pull/872)) diff --git a/Cargo.toml b/Cargo.toml index 651c2184ce..41471de465 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,15 +17,13 @@ bitflags = "1.0" cfg-if = "0.1.0" void = "1.0.2" -[dependencies.bytes] -version = "0.4.5" -# Don't include the optional serde feature -default-features = false - [target.'cfg(target_os = "dragonfly")'.build-dependencies] gcc = "0.3" [dev-dependencies] +# The examples use a new feature of Bytes which should be available in 0.4.7 +# https://github.com/carllerche/bytes/pull/192 +bytes = { git = "https://github.com/carllerche/bytes", rev = "ae1b454" } lazy_static = "1" rand = "0.4" tempdir = "0.3" diff --git a/src/lib.rs b/src/lib.rs index 3873f4a18b..07e84c1272 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,6 @@ #![deny(missing_debug_implementations)] // External crates -extern crate bytes; #[macro_use] extern crate bitflags; #[macro_use] diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 5cb3b81850..8a958c8411 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -21,16 +21,15 @@ //! not support this for all filesystems and devices. use {Error, Result}; -use bytes::{Bytes, BytesMut}; use errno::Errno; use std::os::unix::io::RawFd; use libc::{c_void, off_t, size_t}; use libc; +use std::borrow::{Borrow, BorrowMut}; use std::fmt; use std::fmt::Debug; use std::marker::PhantomData; use std::mem; -use std::ops::Deref; use std::ptr::{null, null_mut}; use std::thread; use sys::signal::*; @@ -93,46 +92,38 @@ pub enum AioCancelStat { /// Owns (uniquely or shared) a memory buffer to keep it from `Drop`ing while /// the kernel has a pointer to it. -#[derive(Clone, Debug)] pub enum Buffer<'a> { /// No buffer to own. /// /// Used for operations like `aio_fsync` that have no data, or for unsafe /// operations that work with raw pointers. None, - /// Immutable shared ownership `Bytes` object - // Must use out-of-line allocation so the address of the data will be - // stable. 
`Bytes` and `BytesMut` sometimes dynamically allocate a buffer, - // and sometimes inline the data within the struct itself. - Bytes(Bytes), - /// Mutable uniquely owned `BytesMut` object - BytesMut(BytesMut), /// Keeps a reference to a slice - Phantom(PhantomData<&'a mut [u8]>) + Phantom(PhantomData<&'a mut [u8]>), + /// Generic thing that keeps a buffer from dropping + BoxedSlice(Box<Borrow<[u8]>>), + /// Generic thing that keeps a mutable buffer from dropping + BoxedMutSlice(Box<BorrowMut<[u8]>>), } -impl<'a> Buffer<'a> { - /// Return the inner `Bytes`, if any - pub fn bytes(&self) -> Option<&Bytes> { - match *self { - Buffer::Bytes(ref x) => Some(x), - _ => None - } - } - - /// Return the inner `BytesMut`, if any - pub fn bytes_mut(&self) -> Option<&BytesMut> { - match *self { - Buffer::BytesMut(ref x) => Some(x), - _ => None - } - } - - /// Is this `Buffer` `None`? - pub fn is_none(&self) -> bool { - match *self { - Buffer::None => true, - _ => false, +impl<'a> Debug for Buffer<'a> { + // Note: someday it may be possible to Derive Debug for a trait object, but + // not today. + // https://github.com/rust-lang/rust/issues/1563 + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + match self { + &Buffer::None => write!(fmt, "None"), + &Buffer::Phantom(p) => p.fmt(fmt), + &Buffer::BoxedSlice(ref bs) => { + let borrowed : &Borrow<[u8]> = bs.borrow(); + write!(fmt, "BoxedSlice({:?})", + borrowed as *const Borrow<[u8]>) + }, + &Buffer::BoxedMutSlice(ref bms) => { + let borrowed : &BorrowMut<[u8]> = bms.borrow(); + write!(fmt, "BoxedMutSlice({:?})", + borrowed as *const BorrowMut<[u8]>) + } } } } @@ -150,7 +141,7 @@ pub struct AioCb<'a> { /// Optionally keeps a reference to the data. /// /// Used to keep buffers from `Drop`'ing, and may be returned once the - /// `AioCb` is completed by `into_buffer`. + /// `AioCb` is completed by [`buffer`](#method.buffer). buffer: Buffer<'a> } @@ -166,6 +157,50 @@ impl<'a> AioCb<'a> { x } + /// Remove the inner boxed slice, if any, and return it.
+ /// + /// The returned value will be the argument that was passed to + /// `from_boxed_slice` when this `AioCb` was created. + /// + /// It is an error to call this method while the `AioCb` is still in + /// progress. + pub fn boxed_slice(&mut self) -> Option<Box<Borrow<[u8]>>> { + assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?"); + if let Buffer::BoxedSlice(_) = self.buffer { + let mut oldbuffer = Buffer::None; + mem::swap(&mut self.buffer, &mut oldbuffer); + if let Buffer::BoxedSlice(inner) = oldbuffer { + Some(inner) + } else { + unreachable!(); + } + } else { + None + } + } + + /// Remove the inner boxed mutable slice, if any, and return it. + /// + /// The returned value will be the argument that was passed to + /// `from_boxed_mut_slice` when this `AioCb` was created. + /// + /// It is an error to call this method while the `AioCb` is still in + /// progress. + pub fn boxed_mut_slice(&mut self) -> Option<Box<BorrowMut<[u8]>>> { + assert!(!self.in_progress, "Can't remove the buffer from an AioCb that's still in-progress. Did you forget to call aio_return?"); + if let Buffer::BoxedMutSlice(_) = self.buffer { + let mut oldbuffer = Buffer::None; + mem::swap(&mut self.buffer, &mut oldbuffer); + if let Buffer::BoxedMutSlice(inner) = oldbuffer { + Some(inner) + } else { + unreachable!(); + } + } else { + None + } + } + /// Returns the underlying file descriptor associated with the `AioCb` pub fn fd(&self) -> RawFd { self.aiocb.aio_fildes @@ -187,7 +222,7 @@ impl<'a> AioCb<'a> { /// # Examples /// /// Create an `AioCb` from a raw file descriptor and use it for an - /// [`fsync`](#method.from_bytes_mut) operation. + /// [`fsync`](#method.fsync) operation. /// /// ``` /// # extern crate tempfile; @@ -300,17 +335,19 @@ impl<'a> AioCb<'a> { } } - /// Constructs a new `AioCb` from a `Bytes` object. + /// The safest and most flexible way to create an `AioCb`.
/// - /// Unlike `from_slice`, this method returns a structure suitable for + /// Unlike [`from_slice`], this method returns a structure suitable for /// placement on the heap. It may be used for write operations, but not - /// read operations. + /// read operations. Unlike `from_ptr`, this method will ensure that the + /// buffer doesn't `drop` while the kernel is still processing it. Any + /// object that can be borrowed as a boxed slice will work. /// /// # Parameters /// /// * `fd`: File descriptor. Required for all aio functions. /// * `offs`: File offset - /// * `buf`: A shared memory buffer + /// * `buf`: A boxed slice-like object /// * `prio`: If POSIX Prioritized IO is supported, then the /// operation will be prioritized at the process's /// priority level minus `prio` @@ -322,15 +359,13 @@ impl<'a> AioCb<'a> { /// /// # Examples /// - /// Create an `AioCb` from a `Bytes` object and use it for writing. + /// Create an `AioCb` from a Vector and use it for writing /// /// ``` - /// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; - /// # use bytes::Bytes; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -338,11 +373,12 @@ impl<'a> AioCb<'a> { /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; /// # fn main() { - /// let wbuf = Bytes::from(&b"CDEF"[..]); + /// let wbuf = Box::new(Vec::from("CDEF")); + /// let expected_len = wbuf.len(); /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(), + /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), /// 2, //offset - /// wbuf.clone(), + /// wbuf, /// 0, //priority /// SigevNotify::SigevNone, /// LioOpcode::LIO_NOP); @@ -350,72 +386,103 @@ impl<'a> AioCb<'a> { /// while (aiocb.error() == Err(Error::from(Errno::EINPROGRESS))) { /// thread::sleep(time::Duration::from_millis(10)); /// } - /// 
assert_eq!(aiocb.aio_return().unwrap() as usize, wbuf.len()); + /// assert_eq!(aiocb.aio_return().unwrap() as usize, expected_len); + /// # } + /// ``` + /// + /// Create an `AioCb` from a `Bytes` object + /// + /// ``` + /// # extern crate bytes; + /// # extern crate tempfile; + /// # extern crate nix; + /// # use bytes::Bytes; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// # fn main() { + /// let wbuf = Box::new(Bytes::from(&b"CDEF"[..])); + /// let mut f = tempfile().unwrap(); + /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), + /// 2, //offset + /// wbuf, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_NOP); /// # } /// ``` - pub fn from_bytes(fd: RawFd, offs: off_t, buf: Bytes, + /// + /// If a library needs to work with buffers that aren't `Box`ed, it can + /// create a `Box`ed container for use with this method. Here's an example + /// using an un`Box`ed `Bytes` object. 
+ /// + /// ``` + /// # extern crate bytes; + /// # extern crate tempfile; + /// # extern crate nix; + /// # use bytes::Bytes; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::borrow::Borrow; + /// # use std::os::unix::io::AsRawFd; + /// # use tempfile::tempfile; + /// struct BytesContainer(Bytes); + /// impl Borrow<[u8]> for BytesContainer { + /// fn borrow(&self) -> &[u8] { + /// self.0.as_ref() + /// } + /// } + /// fn main() { + /// let wbuf = Bytes::from(&b"CDEF"[..]); + /// let boxed_wbuf = Box::new(BytesContainer(wbuf)); + /// let mut f = tempfile().unwrap(); + /// let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), + /// 2, //offset + /// boxed_wbuf, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_NOP); + /// } + /// ``` + /// + /// [`from_slice`]: #method.from_slice + pub fn from_boxed_slice(fd: RawFd, offs: off_t, buf: Box<Borrow<[u8]>>, prio: libc::c_int, sigev_notify: SigevNotify, opcode: LioOpcode) -> AioCb<'a> { - // Small BytesMuts are stored inline. Inline storage is a no-no, - // because we store a pointer to the buffer in the AioCb before - // returning the Buffer by move. If the buffer is too small, reallocate - // it to force out-of-line storage - // TODO: Add an is_inline() method to BytesMut, and a way to explicitly - // force out-of-line allocation.
- let buf2 = if buf.len() < 64 { - // Reallocate to force out-of-line allocation - let mut ool = Bytes::with_capacity(64); - ool.extend_from_slice(buf.deref()); - ool - } else { - buf - }; let mut a = AioCb::common_init(fd, prio, sigev_notify); + { + let borrowed : &Borrow<[u8]> = buf.borrow(); + let slice : &[u8] = borrowed.borrow(); + a.aio_nbytes = slice.len() as size_t; + a.aio_buf = slice.as_ptr() as *mut c_void; + } a.aio_offset = offs; - a.aio_nbytes = buf2.len() as size_t; - a.aio_buf = buf2.as_ptr() as *mut c_void; a.aio_lio_opcode = opcode as libc::c_int; AioCb { aiocb: a, mutable: false, in_progress: false, - buffer: Buffer::Bytes(buf2), + buffer: Buffer::BoxedSlice(buf), } } - /// Constructs a new `AioCb` from a `BytesMut` object. - /// - /// Unlike `from_mut_slice`, this method returns a structure suitable for - /// placement on the heap. It may be used for both reads and writes. - /// - /// # Parameters + /// The safest and most flexible way to create an `AioCb` for reading. /// - /// * `fd`: File descriptor. Required for all aio functions. - /// * `offs`: File offset - /// * `buf`: An owned memory buffer - /// * `prio`: If POSIX Prioritized IO is supported, then the - /// operation will be prioritized at the process's - /// priority level minus `prio` - /// * `sigev_notify`: Determines how you will be notified of event - /// completion. - /// * `opcode`: This field is only used for `lio_listio`. It - /// determines which operation to use for this individual - /// aiocb + /// Like [`from_boxed_slice`], but the slice is a mutable one. More + /// flexible than [`from_mut_slice`], because a wide range of objects can be + /// used. /// /// # Examples /// - /// Create an `AioCb` from a `BytesMut` and use it for reading. In this - /// example the `AioCb` is stack-allocated, so we could've used - /// `from_mut_slice` instead. 
+ /// Create an `AioCb` from a Vector and use it for reading /// /// ``` - /// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; - /// # use bytes::BytesMut; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -425,10 +492,10 @@ impl<'a> AioCb<'a> { /// # fn main() { /// const INITIAL: &[u8] = b"abcdef123456"; /// const LEN: usize = 4; - /// let rbuf = BytesMut::from(vec![0; LEN]); + /// let rbuf = Box::new(vec![0; LEN]); /// let mut f = tempfile().unwrap(); /// f.write_all(INITIAL).unwrap(); - /// let mut aiocb = AioCb::from_bytes_mut( f.as_raw_fd(), + /// let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(), /// 2, //offset /// rbuf, /// 0, //priority @@ -439,33 +506,33 @@ impl<'a> AioCb<'a> { /// thread::sleep(time::Duration::from_millis(10)); /// } /// assert_eq!(aiocb.aio_return().unwrap() as usize, LEN); - /// let buffer = aiocb.into_buffer(); + /// let mut buffer = aiocb.boxed_mut_slice().unwrap(); /// const EXPECT: &[u8] = b"cdef"; - /// assert_eq!(buffer.bytes_mut().unwrap(), EXPECT); + /// assert_eq!(buffer.borrow_mut(), EXPECT); /// # } /// ``` - pub fn from_bytes_mut(fd: RawFd, offs: off_t, buf: BytesMut, - prio: libc::c_int, sigev_notify: SigevNotify, - opcode: LioOpcode) -> AioCb<'a> { - let mut buf2 = if buf.len() < 64 { - // Reallocate to force out-of-line allocation - let mut ool = BytesMut::with_capacity(64); - ool.extend_from_slice(buf.deref()); - ool - } else { - buf - }; + /// + /// [`from_boxed_slice`]: #method.from_boxed_slice + /// [`from_mut_slice`]: #method.from_mut_slice + pub fn from_boxed_mut_slice(fd: RawFd, offs: off_t, + mut buf: Box<BorrowMut<[u8]>>, + prio: libc::c_int, sigev_notify: SigevNotify, + opcode: LioOpcode) -> AioCb<'a> { let mut a = AioCb::common_init(fd, prio, sigev_notify); + { + let borrowed : &mut BorrowMut<[u8]> = buf.borrow_mut(); + let slice : &mut [u8] = borrowed.borrow_mut(); + a.aio_nbytes =
slice.len() as size_t; + a.aio_buf = slice.as_mut_ptr() as *mut c_void; + } a.aio_offset = offs; - a.aio_nbytes = buf2.len() as size_t; - a.aio_buf = buf2.as_mut_ptr() as *mut c_void; a.aio_lio_opcode = opcode as libc::c_int; AioCb { aiocb: a, mutable: true, in_progress: false, - buffer: Buffer::BytesMut(buf2), + buffer: Buffer::BoxedMutSlice(buf), } } @@ -475,7 +542,7 @@ impl<'a> AioCb<'a> { /// placement on the heap. It may be used for both reads and writes. Due /// to its unsafety, this method is not recommended. It is most useful when /// heap allocation is required but for some reason the data cannot be - /// converted to a `BytesMut`. + /// wrapped in a `struct` that implements `BorrowMut<[u8]>` /// /// # Parameters /// @@ -519,7 +586,8 @@ impl<'a> AioCb<'a> { /// Unlike `from_slice`, this method returns a structure suitable for /// placement on the heap. Due to its unsafety, this method is not /// recommended. It is most useful when heap allocation is required but for - /// some reason the data cannot be converted to a `Bytes`. + /// some reason the data cannot be wrapped in a `struct` that implements + /// `Borrow<[u8]>` /// /// # Parameters /// @@ -624,19 +692,6 @@ impl<'a> AioCb<'a> { } } - /// Consumes the `aiocb` and returns its inner `Buffer`, if any. - /// - /// This method is especially useful when reading into a `BytesMut`, because - /// that type does not support shared ownership. - pub fn into_buffer(mut self) -> Buffer<'static> { - let buf = self.buffer(); - match buf { - Buffer::BytesMut(x) => Buffer::BytesMut(x), - Buffer::Bytes(x) => Buffer::Bytes(x), - _ => Buffer::None - } - } - fn common_init(fd: RawFd, prio: libc::c_int, sigev_notify: SigevNotify) -> libc::aiocb { // Use mem::zeroed instead of explicitly zeroing each field, because the @@ -670,12 +725,10 @@ impl<'a> AioCb<'a> { /// result. 
/// /// ``` - /// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; - /// # use bytes::Bytes; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -683,11 +736,11 @@ impl<'a> AioCb<'a> { /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; /// # fn main() { - /// let wbuf = Bytes::from(&b"CDEF"[..]); + /// let wbuf = b"CDEF"; /// let mut f = tempfile().unwrap(); - /// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(), + /// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), /// 2, //offset - /// wbuf.clone(), + /// &wbuf[..], /// 0, //priority /// SigevNotify::SigevNone, /// LioOpcode::LIO_NOP); @@ -871,12 +924,10 @@ impl<'a> AioCb<'a> { /// descriptor. /// /// ``` -/// # extern crate bytes; /// # extern crate tempfile; /// # extern crate nix; /// # use nix::errno::Errno; /// # use nix::Error; -/// # use bytes::Bytes; /// # use nix::sys::aio::*; /// # use nix::sys::signal::SigevNotify; /// # use std::{thread, time}; @@ -884,11 +935,11 @@ impl<'a> AioCb<'a> { /// # use std::os::unix::io::AsRawFd; /// # use tempfile::tempfile; /// # fn main() { -/// let wbuf = Bytes::from(&b"CDEF"[..]); +/// let wbuf = b"CDEF"; /// let mut f = tempfile().unwrap(); -/// let mut aiocb = AioCb::from_bytes( f.as_raw_fd(), +/// let mut aiocb = AioCb::from_slice( f.as_raw_fd(), /// 2, //offset -/// wbuf.clone(), +/// &wbuf[..], /// 0, //priority /// SigevNotify::SigevNone, /// LioOpcode::LIO_NOP); diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs index f11c7ef939..0f7d71e5c1 100644 --- a/test/sys/test_aio.rs +++ b/test/sys/test_aio.rs @@ -323,20 +323,21 @@ fn test_write() { assert!(rbuf == EXPECT); } -// Tests `AioCb::from_bytes` +// Tests `AioCb::from_boxed_slice` with `Bytes` #[test] #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] fn test_write_bytes() { const INITIAL: &[u8] = b"abcdef123456"; - let wbuf = 
Bytes::from(&b"CDEF"[..]); + let wbuf = Box::new(Bytes::from(&b"CDEF"[..])); let mut rbuf = Vec::new(); const EXPECT: &[u8] = b"abCDEF123456"; + let expected_len = wbuf.len(); let mut f = tempfile().unwrap(); f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_bytes( f.as_raw_fd(), + let mut aiocb = AioCb::from_boxed_slice( f.as_raw_fd(), 2, //offset - wbuf.clone(), + wbuf, 0, //priority SigevNotify::SigevNone, LioOpcode::LIO_NOP); @@ -344,7 +345,7 @@ fn test_write_bytes() { let err = poll_aio(&mut aiocb); assert!(err == Ok(())); - assert!(aiocb.aio_return().unwrap() as usize == wbuf.len()); + assert!(aiocb.aio_return().unwrap() as usize == expected_len); f.seek(SeekFrom::Start(0)).unwrap(); let len = f.read_to_end(&mut rbuf).unwrap(); @@ -352,46 +353,17 @@ fn test_write_bytes() { assert!(rbuf == EXPECT); } -// Tests `AioCb::from_bytes_mut` -#[test] -#[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] -fn test_read_bytes_mut_big() { - const INITIAL: &[u8] = b"abcdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh12345678"; - // rbuf needs to be larger than 32 bytes (64 on 32-bit systems) so - // BytesMut::clone is implemented by reference. 
- let rbuf = BytesMut::from(vec![0; 70]); - const EXPECT: &[u8] = b"cdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh12345678abcdefgh"; - let mut f = tempfile().unwrap(); - f.write_all(INITIAL).unwrap(); - - let mut aiocb = AioCb::from_bytes_mut( f.as_raw_fd(), - 2, //offset - rbuf, - 0, //priority - SigevNotify::SigevNone, - LioOpcode::LIO_NOP); - aiocb.read().unwrap(); - - let err = poll_aio(&mut aiocb); - assert_eq!(err, Ok(())); - assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); - let buffer = aiocb.into_buffer(); - assert_eq!(buffer.bytes_mut().unwrap(), EXPECT); -} - -// Tests reallocation in `AioCb::from_bytes_mut` +// Tests `AioCb::from_boxed_mut_slice` with `BytesMut` #[test] #[cfg_attr(all(target_env = "musl", target_arch = "x86_64"), ignore)] fn test_read_bytes_mut_small() { const INITIAL: &[u8] = b"abcdef"; - // rbuf needs to be no more than 32 bytes (64 on 32-bit systems) so - // BytesMut::clone is implemented inline. - let rbuf = BytesMut::from(vec![0; 4]); + let rbuf = Box::new(BytesMut::from(vec![0; 4])); const EXPECT: &[u8] = b"cdef"; let mut f = tempfile().unwrap(); f.write_all(INITIAL).unwrap(); - let mut aiocb = AioCb::from_bytes_mut( f.as_raw_fd(), + let mut aiocb = AioCb::from_boxed_mut_slice( f.as_raw_fd(), 2, //offset rbuf, 0, //priority @@ -402,8 +374,8 @@ fn test_read_bytes_mut_small() { let err = poll_aio(&mut aiocb); assert_eq!(err, Ok(())); assert_eq!(aiocb.aio_return().unwrap() as usize, EXPECT.len()); - let buffer = aiocb.into_buffer(); - assert_eq!(buffer.bytes_mut().unwrap(), EXPECT); + let buffer = aiocb.boxed_mut_slice().unwrap(); + assert_eq!(buffer.borrow(), EXPECT); } // Tests `AioCb::from_ptr` From 4b769a42d5b55367e5908058e8ca59c3c474cacf Mon Sep 17 00:00:00 2001 From: Alan Somers Date: Thu, 22 Mar 2018 09:44:54 -0600 Subject: [PATCH 4/4] Add LioCb::listio_resubmit It helps deal with errors like EAGAIN, which can result in a subset of an LioCb's operations being queued. 
The test is only enabled on FreeBSD, because it requires intimate knowledge of AIO system limits. --- Cargo.toml | 7 ++ src/sys/aio.rs | 160 ++++++++++++++++++++++++--- test/sys/test_aio.rs | 4 +- test/sys/test_lio_listio_resubmit.rs | 111 +++++++++++++++++++ 4 files changed, 267 insertions(+), 15 deletions(-) create mode 100644 test/sys/test_lio_listio_resubmit.rs diff --git a/Cargo.toml b/Cargo.toml index 41471de465..49a18d9cbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,6 +29,9 @@ rand = "0.4" tempdir = "0.3" tempfile = "2" +[target.'cfg(target_os = "freebsd")'.dev-dependencies] +sysctl = "0.1" + [[test]] name = "test" path = "test/test.rs" @@ -37,6 +40,10 @@ path = "test/test.rs" name = "test-aio-drop" path = "test/sys/test_aio_drop.rs" +[[test]] +name = "test-lio-listio-resubmit" +path = "test/sys/test_lio_listio_resubmit.rs" + [[test]] name = "test-mount" path = "test/test_mount.rs" diff --git a/src/sys/aio.rs b/src/sys/aio.rs index 8a958c8411..3d53982134 100644 --- a/src/sys/aio.rs +++ b/src/sys/aio.rs @@ -1,3 +1,4 @@ +// vim: tw=80 //! POSIX Asynchronous I/O //! //! The POSIX AIO interface is used for asynchronous I/O on files and disk-like @@ -31,8 +32,8 @@ use std::fmt::Debug; use std::marker::PhantomData; use std::mem; use std::ptr::{null, null_mut}; -use std::thread; use sys::signal::*; +use std::thread; use sys::time::TimeSpec; libc_enum! { @@ -111,15 +112,15 @@ impl<'a> Debug for Buffer<'a> { // not today. 
// https://github.com/rust-lang/rust/issues/1563 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { - match self { - &Buffer::None => write!(fmt, "None"), - &Buffer::Phantom(p) => p.fmt(fmt), - &Buffer::BoxedSlice(ref bs) => { + match *self { + Buffer::None => write!(fmt, "None"), + Buffer::Phantom(p) => p.fmt(fmt), + Buffer::BoxedSlice(ref bs) => { let borrowed : &Borrow<[u8]> = bs.borrow(); write!(fmt, "BoxedSlice({:?})", borrowed as *const Borrow<[u8]>) }, - &Buffer::BoxedMutSlice(ref bms) => { + Buffer::BoxedMutSlice(ref bms) => { let borrowed : &BorrowMut<[u8]> = bms.borrow(); write!(fmt, "BoxedMutSlice({:?})", borrowed as *const BorrowMut<[u8]>) @@ -265,7 +266,7 @@ impl<'a> AioCb<'a> { /// operations, but only if the borrow checker can guarantee that the slice /// will outlive the `AioCb`. That will usually be the case if the `AioCb` /// is stack-allocated. If the borrow checker gives you trouble, try using - /// [`from_bytes_mut`](#method.from_bytes_mut) instead. + /// [`from_boxed_mut_slice`](#method.from_boxed_mut_slice) instead. /// /// # Parameters /// @@ -1059,7 +1060,11 @@ pub struct LioCb<'a> { /// It must live for as long as any of the operations are still being /// processesed, because the aio subsystem uses its address as a unique /// identifier. - list: Vec<*mut libc::aiocb> + list: Vec<*mut libc::aiocb>, + + /// A partial set of results. This field will get populated by + /// `listio_resubmit` when an `LioCb` is resubmitted after an error + results: Vec>> } #[cfg(not(any(target_os = "ios", target_os = "macos")))] @@ -1068,7 +1073,8 @@ impl<'a> LioCb<'a> { pub fn with_capacity(capacity: usize) -> LioCb<'a> { LioCb { aiocbs: Vec::with_capacity(capacity), - list: Vec::with_capacity(capacity) + list: Vec::with_capacity(capacity), + results: Vec::with_capacity(capacity) } } @@ -1087,8 +1093,8 @@ impl<'a> LioCb<'a> { /// # Examples /// /// Use `listio` to submit an aio operation and wait for its completion. 
In - /// this case, there is no need to use `aio_suspend` to wait or - /// `AioCb#error` to poll. + /// this case, there is no need to use [`aio_suspend`] to wait or + /// [`AioCb::error`] to poll. /// /// ``` /// # extern crate tempfile; @@ -1109,19 +1115,23 @@ impl<'a> LioCb<'a> { /// LioOpcode::LIO_WRITE)); /// liocb.listio(LioMode::LIO_WAIT, /// SigevNotify::SigevNone).unwrap(); - /// assert_eq!(liocb.aiocbs[0].aio_return().unwrap() as usize, WBUF.len()); + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); /// # } /// ``` /// /// # References /// /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// + /// [`aio_suspend`]: fn.aio_suspend.html + /// [`AioCb::error`]: struct.AioCb.html#method.error pub fn listio(&mut self, mode: LioMode, sigev_notify: SigevNotify) -> Result<()> { let sigev = SigEvent::new(sigev_notify); let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; self.list.clear(); - for a in self.aiocbs.iter_mut() { + for a in &mut self.aiocbs { + a.in_progress = true; self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); } @@ -1130,6 +1140,129 @@ impl<'a> LioCb<'a> { libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) }).map(|_| ()) } + + /// Resubmits any incomplete operations with [`lio_listio`]. + /// + /// Sometimes, due to system resource limitations, an `lio_listio` call will + /// return `EIO`, or `EAGAIN`. Or, if a signal is received, it may return + /// `EINTR`. In any of these cases, only a subset of its constituent + /// operations will actually have been initiated. `listio_resubmit` will + /// resubmit any operations that are still uninitiated. + /// + /// After calling `listio_resubmit`, results should be collected by + /// [`LioCb::aio_return`]. 
+ /// + /// # Examples + /// ```no_run + /// # extern crate tempfile; + /// # extern crate nix; + /// # use nix::Error; + /// # use nix::errno::Errno; + /// # use nix::sys::aio::*; + /// # use nix::sys::signal::SigevNotify; + /// # use std::os::unix::io::AsRawFd; + /// # use std::{thread, time}; + /// # use tempfile::tempfile; + /// # fn main() { + /// const WBUF: &[u8] = b"abcdef123456"; + /// let mut f = tempfile().unwrap(); + /// let mut liocb = LioCb::with_capacity(1); + /// liocb.aiocbs.push(AioCb::from_slice( f.as_raw_fd(), + /// 2, //offset + /// WBUF, + /// 0, //priority + /// SigevNotify::SigevNone, + /// LioOpcode::LIO_WRITE)); + /// let mut err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// while err == Err(Error::Sys(Errno::EIO)) || + /// err == Err(Error::Sys(Errno::EAGAIN)) { + /// thread::sleep(time::Duration::from_millis(10)); + /// err = liocb.listio_resubmit(LioMode::LIO_WAIT, SigevNotify::SigevNone); + /// } + /// assert_eq!(liocb.aio_return(0).unwrap() as usize, WBUF.len()); + /// # } + /// ``` + /// + /// # References + /// + /// [`lio_listio`](http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html) + /// + /// [`lio_listio`]: http://pubs.opengroup.org/onlinepubs/9699919799/functions/lio_listio.html + /// [`LioCb::aio_return`]: struct.LioCb.html#method.aio_return + // Note: the addresses of any EINPROGRESS or EOK aiocbs _must_ not be + // changed by this method, because the kernel relies on their addresses + // being stable. + // Note: aiocbs that are Ok(()) must be finalized by aio_return, or else the + // sigev_notify will immediately refire. 
+ pub fn listio_resubmit(&mut self, mode:LioMode, + sigev_notify: SigevNotify) -> Result<()> { + let sigev = SigEvent::new(sigev_notify); + let sigevp = &mut sigev.sigevent() as *mut libc::sigevent; + self.list.clear(); + + while self.results.len() < self.aiocbs.len() { + self.results.push(None); + } + + for (i, a) in self.aiocbs.iter_mut().enumerate() { + if self.results[i].is_some() { + // Already collected final status for this operation + continue; + } + match a.error() { + Ok(()) => { + // aiocb is complete; collect its status and don't resubmit + self.results[i] = Some(a.aio_return()); + }, + Err(Error::Sys(Errno::EAGAIN)) => { + self.list.push(a as *mut AioCb<'a> as *mut libc::aiocb); + }, + Err(Error::Sys(Errno::EINPROGRESS)) => { + // aiocb was successfully queued; no need to do anything + () + }, + Err(Error::Sys(Errno::EINVAL)) => panic!( + "AioCb was never submitted, or already finalized"), + _ => unreachable!() + } + } + let p = self.list.as_ptr(); + Errno::result(unsafe { + libc::lio_listio(mode as i32, p, self.list.len() as i32, sigevp) + }).map(|_| ()) + } + + /// Collect final status for an individual `AioCb` submitted as part of an + /// `LioCb`. + /// + /// This is just like [`AioCb::aio_return`], except it takes into account + /// operations that were restarted by [`LioCb::listio_resubmit`] + /// + /// [`AioCb::aio_return`]: struct.AioCb.html#method.aio_return + /// [`LioCb::listio_resubmit`]: #method.listio_resubmit + pub fn aio_return(&mut self, i: usize) -> Result { + if i >= self.results.len() || self.results[i].is_none() { + self.aiocbs[i].aio_return() + } else { + self.results[i].unwrap() + } + } + + /// Retrieve error status of an individual `AioCb` submitted as part of an + /// `LioCb`.
+ /// + /// This is just like [`AioCb::error`], except it takes into account + /// operations that were restarted by [`LioCb::listio_resubmit`] + /// + /// [`AioCb::error`]: struct.AioCb.html#method.error + /// [`LioCb::listio_resubmit`]: #method.listio_resubmit + pub fn error(&mut self, i: usize) -> Result<()> { + if i >= self.results.len() || self.results[i].is_none() { + self.aiocbs[i].error() + } else { + Ok(()) + } + } } #[cfg(not(any(target_os = "ios", target_os = "macos")))] @@ -1146,6 +1279,7 @@ impl<'a> From>> for LioCb<'a> { fn from(src: Vec>) -> LioCb<'a> { LioCb { list: Vec::with_capacity(src.capacity()), + results: Vec::with_capacity(src.capacity()), aiocbs: src, } } diff --git a/test/sys/test_aio.rs b/test/sys/test_aio.rs index 0f7d71e5c1..48399fbdfa 100644 --- a/test/sys/test_aio.rs +++ b/test/sys/test_aio.rs @@ -513,8 +513,8 @@ fn test_liocb_listio_wait() { let err = liocb.listio(LioMode::LIO_WAIT, SigevNotify::SigevNone); err.expect("lio_listio"); - assert!(liocb.aiocbs[0].aio_return().unwrap() as usize == WBUF.len()); - assert!(liocb.aiocbs[1].aio_return().unwrap() as usize == rlen); + assert!(liocb.aio_return(0).unwrap() as usize == WBUF.len()); + assert!(liocb.aio_return(1).unwrap() as usize == rlen); } assert!(rbuf.deref().deref() == b"3456"); diff --git a/test/sys/test_lio_listio_resubmit.rs b/test/sys/test_lio_listio_resubmit.rs new file mode 100644 index 0000000000..19ee3facf8 --- /dev/null +++ b/test/sys/test_lio_listio_resubmit.rs @@ -0,0 +1,111 @@ +// vim: tw=80 + +// Annoyingly, Cargo is unable to conditionally build an entire test binary. 
So +// we must disable the test here rather than in Cargo.toml +#![cfg(target_os = "freebsd")] + +extern crate nix; +extern crate sysctl; +extern crate tempfile; + +use nix::Error; +use nix::errno::*; +use nix::libc::off_t; +use nix::sys::aio::*; +use nix::sys::signal::SigevNotify; +use nix::unistd::{SysconfVar, sysconf}; +use std::os::unix::io::AsRawFd; +use std::{thread, time}; +use sysctl::CtlValue; +use tempfile::tempfile; + +const BYTES_PER_OP: usize = 512; + +/// Attempt to collect final status for all of `liocb`'s operations, freeing +/// system resources +fn finish_liocb(liocb: &mut LioCb) { + for j in 0..liocb.aiocbs.len() { + loop { + let e = liocb.error(j); + match e { + Ok(()) => break, + Err(Error::Sys(Errno::EINPROGRESS)) => + thread::sleep(time::Duration::from_millis(10)), + Err(x) => panic!("aio_error({:?})", x) + } + } + assert_eq!(liocb.aio_return(j).unwrap(), BYTES_PER_OP as isize); + } +} + +// Deliberately exceed system resource limits, causing lio_listio to return EIO. +// This test must run in its own process since it deliberately uses all AIO +// resources. ATM it is only enabled on FreeBSD, because I don't know how to +// check system AIO limits on other operating systems. 
+#[test] +fn test_lio_listio_resubmit() { + let mut resubmit_count = 0; + + // Lookup system resource limits + let alm = sysconf(SysconfVar::AIO_LISTIO_MAX) + .expect("sysconf").unwrap() as usize; + let maqpp = if let CtlValue::Int(x) = sysctl::value( + "vfs.aio.max_aio_queue_per_proc").unwrap(){ + x as usize + } else { + panic!("unknown sysctl"); + }; + + // Find lio_listio sizes that satisfy the AIO_LISTIO_MAX constraint and also + // result in a final lio_listio call that can only partially be queued + let target_ops = maqpp + alm / 2; + let num_listios = (target_ops + alm - 3) / (alm - 2); + let ops_per_listio = (target_ops + num_listios - 1) / num_listios; + assert!((num_listios - 1) * ops_per_listio < maqpp, + "the last lio_listio won't make any progress; fix the algorithm"); + println!("Using {:?} LioCbs of {:?} operations apiece", num_listios, + ops_per_listio); + + let f = tempfile().unwrap(); + let buffer_set = (0..num_listios).map(|_| { + (0..ops_per_listio).map(|_| { + vec![0u8; BYTES_PER_OP] + }).collect::>() + }).collect::>(); + + let mut liocbs = (0..num_listios).map(|i| { + let mut liocb = LioCb::with_capacity(ops_per_listio); + for j in 0..ops_per_listio { + let offset = (BYTES_PER_OP * (i * ops_per_listio + j)) as off_t; + let wcb = AioCb::from_slice( f.as_raw_fd(), + offset, + &buffer_set[i][j][..], + 0, //priority + SigevNotify::SigevNone, + LioOpcode::LIO_WRITE); + liocb.aiocbs.push(wcb); + } + let mut err = liocb.listio(LioMode::LIO_NOWAIT, SigevNotify::SigevNone); + while err == Err(Error::Sys(Errno::EIO)) || + err == Err(Error::Sys(Errno::EAGAIN)) || + err == Err(Error::Sys(Errno::EINTR)) { + // + thread::sleep(time::Duration::from_millis(10)); + resubmit_count += 1; + err = liocb.listio_resubmit(LioMode::LIO_NOWAIT, + SigevNotify::SigevNone); + } + liocb + }).collect::>(); + + // Ensure that every AioCb completed + for liocb in liocbs.iter_mut() { + finish_liocb(liocb); + } + + if resubmit_count > 0 { + println!("Resubmitted {:?} times, 
test passed", resubmit_count); + } else { + println!("Never resubmitted. Test ambiguous"); + } +}