diff --git a/Cargo.toml b/Cargo.toml index d72851b..b139e4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,11 +18,11 @@ exclude = ["/.*"] async-channel = "1.4.0" async-lock = "2.6.0" async-task = "4.0.2" -atomic-waker = "1.0.0" fastrand = "1.3.4" futures-io = { version = "0.3.28", default-features = false, features = ["std"] } futures-lite = { version = "1.11.0", default-features = false } log = "0.4.17" +piper = "0.2.0" [dev-dependencies] futures-lite = "1.11.0" diff --git a/src/lib.rs b/src/lib.rs index 15737ec..07de679 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -76,19 +76,18 @@ //! ``` #![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)] +#![forbid(unsafe_code)] use std::any::Any; use std::collections::VecDeque; use std::env; use std::fmt; use std::io::{self, Read, Seek, SeekFrom, Write}; -use std::mem; use std::num::NonZeroUsize; use std::panic; use std::pin::Pin; -use std::slice; -use std::sync::atomic::{self, AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, Condvar, Mutex, MutexGuard}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Condvar, Mutex, MutexGuard}; use std::task::{Context, Poll}; use std::thread; use std::time::Duration; @@ -96,9 +95,9 @@ use std::time::Duration; use async_channel::{bounded, Receiver}; use async_lock::OnceCell; use async_task::Runnable; -use atomic_waker::AtomicWaker; use futures_io::{AsyncRead, AsyncSeek, AsyncWrite}; use futures_lite::{future, prelude::*, ready}; +use piper::{pipe, Reader, Writer}; #[doc(no_inline)] pub use async_task::Task; @@ -752,7 +751,7 @@ impl AsyncRead for Unblock { // Copy bytes from the I/O handle into the pipe until the pipe is closed or // an error occurs. loop { - match future::poll_fn(|cx| writer.fill(cx, &mut io)).await { + match future::poll_fn(|cx| writer.poll_fill(cx, &mut io)).await { Ok(0) => return (Ok(()), io), Ok(_) => {} Err(err) => return (Err(err), io), @@ -767,7 +766,7 @@ impl AsyncRead for Unblock { // If reading, read bytes from the pipe. State::Reading(Some(reader), task) => { // Poll the pipe. - let n = ready!(reader.drain(cx, buf))?; + let n = ready!(reader.poll_drain(cx, buf))?; // If the pipe is closed, retrieve the I/O handle back from the blocking task. // This is not really a required step, but it's cleaner to drop the handle on @@ -820,7 +819,7 @@ impl AsyncWrite for Unblock { // Copy bytes from the pipe into the I/O handle until the pipe is closed or an // error occurs. Flush the I/O handle at the end. loop { - match future::poll_fn(|cx| reader.drain(cx, &mut io)).await { + match future::poll_fn(|cx| reader.poll_drain(cx, &mut io)).await { Ok(0) => return (io.flush(), io), Ok(_) => {} Err(err) => { @@ -836,7 +835,7 @@ impl AsyncWrite for Unblock { } // If writing, write more bytes into the pipe. - State::Writing(Some(writer), _) => return writer.fill(cx, buf), + State::Writing(Some(writer), _) => return writer.poll_fill(cx, buf), } } } @@ -915,365 +914,6 @@ impl AsyncSeek for Unblock { } } -/// Creates a bounded single-producer single-consumer pipe. -/// -/// A pipe is a ring buffer of `cap` bytes that can be asynchronously read from and written to. -/// -/// When the sender is dropped, remaining bytes in the pipe can still be read. After that, attempts -/// to read will result in `Ok(0)`, i.e. they will always 'successfully' read 0 bytes. -/// -/// When the receiver is dropped, the pipe is closed and no more bytes and be written into it. -/// Further writes will result in `Ok(0)`, i.e. they will always 'successfully' write 0 bytes. -fn pipe(cap: usize) -> (Reader, Writer) { - assert!(cap > 0, "capacity must be positive"); - assert!(cap.checked_mul(2).is_some(), "capacity is too large"); - - // Allocate the ring buffer. - let mut v = Vec::with_capacity(cap); - let buffer = v.as_mut_ptr(); - mem::forget(v); - - let inner = Arc::new(Pipe { - head: AtomicUsize::new(0), - tail: AtomicUsize::new(0), - reader: AtomicWaker::new(), - writer: AtomicWaker::new(), - closed: AtomicBool::new(false), - buffer, - cap, - }); - - let r = Reader { - inner: inner.clone(), - head: 0, - tail: 0, - }; - - let w = Writer { - inner, - head: 0, - tail: 0, - zeroed_until: 0, - }; - - (r, w) -} - -/// The reading side of a pipe. -struct Reader { - /// The inner ring buffer. - inner: Arc, - - /// The head index, moved by the reader, in the range `0..2*cap`. - /// - /// This index always matches `inner.head`. - head: usize, - - /// The tail index, moved by the writer, in the range `0..2*cap`. - /// - /// This index is a snapshot of `index.tail` that might become stale at any point. - tail: usize, -} - -/// The writing side of a pipe. -struct Writer { - /// The inner ring buffer. - inner: Arc, - - /// The head index, moved by the reader, in the range `0..2*cap`. - /// - /// This index is a snapshot of `index.head` that might become stale at any point. - head: usize, - - /// The tail index, moved by the writer, in the range `0..2*cap`. - /// - /// This index always matches `inner.tail`. - tail: usize, - - /// How many bytes at the beginning of the buffer have been zeroed. - /// - /// The pipe allocates an uninitialized buffer, and we must be careful about passing - /// uninitialized data to user code. Zeroing the buffer right after allocation would be too - /// expensive, so we zero it in smaller chunks as the writer makes progress. - zeroed_until: usize, -} - -unsafe impl Send for Reader {} -unsafe impl Send for Writer {} - -/// The inner ring buffer. -/// -/// Head and tail indices are in the range `0..2*cap`, even though they really map onto the -/// `0..cap` range. The distance between head and tail indices is never more than `cap`. -/// -/// The reason why indices are not in the range `0..cap` is because we need to distinguish between -/// the pipe being empty and being full. If head and tail were in `0..cap`, then `head == tail` -/// could mean the pipe is either empty or full, but we don't know which! -struct Pipe { - /// The head index, moved by the reader, in the range `0..2*cap`. - head: AtomicUsize, - - /// The tail index, moved by the writer, in the range `0..2*cap`. - tail: AtomicUsize, - - /// A waker representing the blocked reader. - reader: AtomicWaker, - - /// A waker representing the blocked writer. - writer: AtomicWaker, - - /// Set to `true` if the reader or writer was dropped. - closed: AtomicBool, - - /// The byte buffer. - buffer: *mut u8, - - /// The buffer capacity. - cap: usize, -} - -unsafe impl Sync for Pipe {} -unsafe impl Send for Pipe {} - -impl Drop for Pipe { - fn drop(&mut self) { - // Deallocate the byte buffer. - unsafe { - Vec::from_raw_parts(self.buffer, 0, self.cap); - } - } -} - -impl Drop for Reader { - fn drop(&mut self) { - // Dropping closes the pipe and then wakes the writer. - self.inner.closed.store(true, Ordering::SeqCst); - self.inner.writer.wake(); - } -} - -impl Drop for Writer { - fn drop(&mut self) { - // Dropping closes the pipe and then wakes the reader. - self.inner.closed.store(true, Ordering::SeqCst); - self.inner.reader.wake(); - } -} - -impl Reader { - /// Reads bytes from this reader and writes into blocking `dest`. - fn drain(&mut self, cx: &mut Context<'_>, mut dest: impl Write) -> Poll> { - let cap = self.inner.cap; - - // Calculates the distance between two indices. - let distance = |a: usize, b: usize| { - if a <= b { - b - a - } else { - 2 * cap - (a - b) - } - }; - - // If the pipe appears to be empty... - if distance(self.head, self.tail) == 0 { - // Reload the tail in case it's become stale. - self.tail = self.inner.tail.load(Ordering::Acquire); - - // If the pipe is now really empty... - if distance(self.head, self.tail) == 0 { - // Register the waker. - self.inner.reader.register(cx.waker()); - atomic::fence(Ordering::SeqCst); - - // Reload the tail after registering the waker. - self.tail = self.inner.tail.load(Ordering::Acquire); - - // If the pipe is still empty... - if distance(self.head, self.tail) == 0 { - // Check whether the pipe is closed or just empty. - if self.inner.closed.load(Ordering::Relaxed) { - return Poll::Ready(Ok(0)); - } else { - return Poll::Pending; - } - } - } - } - - // The pipe is not empty so remove the waker. - self.inner.reader.take(); - - // Yield with some small probability - this improves fairness. - ready!(maybe_yield(cx)); - - // Given an index in `0..2*cap`, returns the real index in `0..cap`. - let real_index = |i: usize| { - if i < cap { - i - } else { - i - cap - } - }; - - // Number of bytes read so far. - let mut count = 0; - - loop { - // Calculate how many bytes to read in this iteration. - let n = (128 * 1024) // Not too many bytes in one go - better to wake the writer soon! - .min(distance(self.head, self.tail)) // No more than bytes in the pipe. - .min(cap - real_index(self.head)); // Don't go past the buffer boundary. - - // Create a slice of data in the pipe buffer. - let pipe_slice = - unsafe { slice::from_raw_parts(self.inner.buffer.add(real_index(self.head)), n) }; - - // Copy bytes from the pipe buffer into `dest`. - let n = dest.write(pipe_slice)?; - count += n; - - // If pipe is empty or `dest` is full, return. - if n == 0 { - return Poll::Ready(Ok(count)); - } - - // Move the head forward. - if self.head + n < 2 * cap { - self.head += n; - } else { - self.head = 0; - } - - // Store the current head index. - self.inner.head.store(self.head, Ordering::Release); - - // Wake the writer because the pipe is not full. - self.inner.writer.wake(); - } - } -} - -impl Writer { - /// Reads bytes from blocking `src` and writes into this writer. - fn fill(&mut self, cx: &mut Context<'_>, mut src: impl Read) -> Poll> { - // Just a quick check if the pipe is closed, which is why a relaxed load is okay. - if self.inner.closed.load(Ordering::Relaxed) { - return Poll::Ready(Ok(0)); - } - - // Calculates the distance between two indices. - let cap = self.inner.cap; - let distance = |a: usize, b: usize| { - if a <= b { - b - a - } else { - 2 * cap - (a - b) - } - }; - - // If the pipe appears to be full... - if distance(self.head, self.tail) == cap { - // Reload the head in case it's become stale. - self.head = self.inner.head.load(Ordering::Acquire); - - // If the pipe is now really empty... - if distance(self.head, self.tail) == cap { - // Register the waker. - self.inner.writer.register(cx.waker()); - atomic::fence(Ordering::SeqCst); - - // Reload the head after registering the waker. - self.head = self.inner.head.load(Ordering::Acquire); - - // If the pipe is still full... - if distance(self.head, self.tail) == cap { - // Check whether the pipe is closed or just full. - if self.inner.closed.load(Ordering::Relaxed) { - return Poll::Ready(Ok(0)); - } else { - return Poll::Pending; - } - } - } - } - - // The pipe is not full so remove the waker. - self.inner.writer.take(); - - // Yield with some small probability - this improves fairness. - ready!(maybe_yield(cx)); - - // Given an index in `0..2*cap`, returns the real index in `0..cap`. - let real_index = |i: usize| { - if i < cap { - i - } else { - i - cap - } - }; - - // Number of bytes written so far. - let mut count = 0; - - loop { - // Calculate how many bytes to write in this iteration. - let n = (128 * 1024) // Not too many bytes in one go - better to wake the reader soon! - .min(self.zeroed_until * 2 + 4096) // Don't zero too many bytes when starting. - .min(cap - distance(self.head, self.tail)) // No more than space in the pipe. - .min(cap - real_index(self.tail)); // Don't go past the buffer boundary. - - // Create a slice of available space in the pipe buffer. - let pipe_slice_mut = unsafe { - let from = real_index(self.tail); - let to = from + n; - - // Make sure all bytes in the slice are initialized. - if self.zeroed_until < to { - self.inner - .buffer - .add(self.zeroed_until) - .write_bytes(0u8, to - self.zeroed_until); - self.zeroed_until = to; - } - - slice::from_raw_parts_mut(self.inner.buffer.add(from), n) - }; - - // Copy bytes from `src` into the piper buffer. - let n = src.read(pipe_slice_mut)?; - count += n; - - // If the pipe is full or closed, or `src` is empty, return. - if n == 0 || self.inner.closed.load(Ordering::Relaxed) { - return Poll::Ready(Ok(count)); - } - - // Move the tail forward. - if self.tail + n < 2 * cap { - self.tail += n; - } else { - self.tail = 0; - } - - // Store the current tail index. - self.inner.tail.store(self.tail, Ordering::Release); - - // Wake the reader because the pipe is not empty. - self.inner.reader.wake(); - } - } -} - -/// Yield with some small probability. -fn maybe_yield(cx: &mut Context<'_>) -> Poll<()> { - if fastrand::usize(..100) == 0 { - cx.waker().wake_by_ref(); - Poll::Pending - } else { - Poll::Ready(()) - } -} - #[cfg(test)] mod tests { use super::*;