Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test utility that verifies an AsyncWrite is closed correctly #2159

Merged
merged 3 commits into from
Sep 5, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions futures-test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ futures-task = { version = "0.3.5", path = "../futures-task", default-features =
futures-io = { version = "0.3.5", path = "../futures-io", default-features = false }
futures-util = { version = "0.3.5", path = "../futures-util", default-features = false }
futures-executor = { version = "0.3.5", path = "../futures-executor", default-features = false }
futures-sink = { version = "0.3.5", path = "../futures-sink", default-features = false }
pin-utils = { version = "0.1.0", default-features = false }
once_cell = { version = "1.3.1", default-features = false, features = ["std"], optional = true }
pin-project = "0.4.15"
Expand Down
40 changes: 40 additions & 0 deletions futures-test/src/io/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use futures_io::AsyncWrite;

pub use super::limited::Limited;
pub use crate::interleave_pending::InterleavePending;
pub use crate::track_closed::TrackClosed;

/// Additional combinators for testing async writers.
pub trait AsyncWriteTestExt: AsyncWrite {
Expand Down Expand Up @@ -80,6 +81,45 @@ pub trait AsyncWriteTestExt: AsyncWrite {
{
Limited::new(self, limit)
}

/// Track whether this stream has been closed and errors if it is used after closing.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{AsyncWriteExt, Cursor};
/// use futures_test::io::AsyncWriteTestExt;
///
/// let mut writer = Cursor::new(vec![0u8; 4]).track_closed();
///
/// writer.write_all(&[1, 2]).await?;
/// assert!(!writer.is_closed());
/// writer.close().await?;
/// assert!(writer.is_closed());
///
/// # Ok::<(), std::io::Error>(()) })?;
/// # Ok::<(), std::io::Error>(())
/// ```
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::io::{AsyncWriteExt, Cursor};
/// use futures_test::io::AsyncWriteTestExt;
///
/// let mut writer = Cursor::new(vec![0u8; 4]).track_closed();
///
/// writer.close().await?;
/// assert!(writer.write_all(&[1, 2]).await.is_err());
/// # Ok::<(), std::io::Error>(()) })?;
/// # Ok::<(), std::io::Error>(())
/// ```
fn track_closed(self) -> TrackClosed<Self>
where
Self: Sized,
{
TrackClosed::new(self)
}
}

impl<W> AsyncWriteTestExt for W where W: AsyncWrite {}
4 changes: 4 additions & 0 deletions futures-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ pub mod future;
#[cfg(feature = "std")]
pub mod stream;

#[cfg(feature = "std")]
pub mod sink;

#[cfg(feature = "std")]
pub mod io;

mod interleave_pending;
mod track_closed;
56 changes: 56 additions & 0 deletions futures-test/src/sink/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Additional combinators for testing sinks.

use futures_sink::Sink;

pub use crate::track_closed::TrackClosed;

/// Additional combinators for testing sinks.
pub trait SinkTestExt<Item>: Sink<Item> {
/// Track whether this sink has been closed and panics if it is used after closing.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::sink::{SinkExt, drain};
/// use futures_test::sink::SinkTestExt;
///
/// let mut sink = drain::<i32>().track_closed();
///
/// sink.send(1).await?;
/// assert!(!sink.is_closed());
/// sink.close().await?;
/// assert!(sink.is_closed());
///
/// # Ok::<(), std::convert::Infallible>(()) })?;
/// # Ok::<(), std::convert::Infallible>(())
/// ```
///
/// Note: Unlike [`AsyncWriteTestExt::track_closed`] when
/// used as a sink the adaptor will panic if closed too early as there's no easy way to
/// integrate as an error.
///
/// [`AsyncWriteTestExt::track_closed`]: crate::io::AsyncWriteTestExt::track_closed
///
/// ```
/// # futures::executor::block_on(async {
/// use std::panic::AssertUnwindSafe;
/// use futures::{sink::{SinkExt, drain}, future::FutureExt};
/// use futures_test::sink::SinkTestExt;
///
/// let mut sink = drain::<i32>().track_closed();
///
/// sink.close().await?;
/// assert!(AssertUnwindSafe(sink.send(1)).catch_unwind().await.is_err());
/// # Ok::<(), std::convert::Infallible>(()) })?;
/// # Ok::<(), std::convert::Infallible>(())
/// ```
fn track_closed(self) -> TrackClosed<Self>
where
Self: Sized,
{
TrackClosed::new(self)
}
}

impl<Item, W> SinkTestExt<Item> for W where W: Sink<Item> {}
146 changes: 146 additions & 0 deletions futures-test/src/track_closed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use futures_io::AsyncWrite;
use futures_sink::Sink;
use std::{
io::{self, IoSlice},
pin::Pin,
task::{Context, Poll},
};

/// Async wrapper that tracks whether it has been closed.
///
/// See the `track_closed` methods on:
/// * [`SinkTestExt`](crate::sink::SinkTestExt::track_closed)
/// * [`AsyncWriteTestExt`](crate::io::AsyncWriteTestExt::track_closed)
#[pin_project::pin_project]
#[derive(Debug)]
pub struct TrackClosed<T> {
#[pin]
inner: T,
closed: bool,
}

impl<T> TrackClosed<T> {
pub(crate) fn new(inner: T) -> TrackClosed<T> {
TrackClosed {
inner,
closed: false,
}
}

/// Check whether this object has been closed.
pub fn is_closed(&self) -> bool {
self.closed
}

/// Acquires a reference to the underlying object that this adaptor is
/// wrapping.
pub fn get_ref(&self) -> &T {
&self.inner
}

/// Acquires a mutable reference to the underlying object that this
/// adaptor is wrapping.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}

/// Acquires a pinned mutable reference to the underlying object that
/// this adaptor is wrapping.
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut T> {
self.project().inner
}

/// Consumes this adaptor returning the underlying object.
pub fn into_inner(self) -> T {
self.inner
}
}

impl<T: AsyncWrite> AsyncWrite for TrackClosed<T> {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Attempted to write after stream was closed",
)));
}
self.project().inner.poll_write(cx, buf)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Attempted to flush after stream was closed",
)));
}
assert!(!self.is_closed());
self.project().inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Attempted to close after stream was closed",
)));
}
let this = self.project();
match this.inner.poll_close(cx) {
Poll::Ready(Ok(())) => {
*this.closed = true;
Poll::Ready(Ok(()))
}
other => other,
}
}

fn poll_write_vectored(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::Other,
"Attempted to write after stream was closed",
)));
}
self.project().inner.poll_write_vectored(cx, bufs)
}
}

impl<Item, T: Sink<Item>> Sink<Item> for TrackClosed<T> {
type Error = T::Error;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
assert!(!self.is_closed());
self.project().inner.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: Item) -> Result<(), Self::Error> {
assert!(!self.is_closed());
self.project().inner.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
assert!(!self.is_closed());
self.project().inner.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
assert!(!self.is_closed());
let this = self.project();
match this.inner.poll_close(cx) {
Poll::Ready(Ok(())) => {
*this.closed = true;
Poll::Ready(Ok(()))
}
other => other,
}
}
}