From d38afed525593ccc9921f6e6f08e38346e102567 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Fri, 25 Sep 2020 13:20:53 +0300 Subject: [PATCH 1/3] io: add `copy_buf` Signed-off-by: Zahari Dichev --- tokio/src/io/mod.rs | 4 +- tokio/src/io/util/copy_buf.rs | 101 ++++++++++++++++++++++++++++++++++ tokio/src/io/util/mod.rs | 3 + 3 files changed, 106 insertions(+), 2 deletions(-) create mode 100644 tokio/src/io/util/copy_buf.rs diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 7eba6d14972..ea19cf70ca8 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -236,8 +236,8 @@ cfg_io_util! { pub(crate) mod util; pub use util::{ - copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, - BufReader, BufStream, BufWriter, DuplexStream, Copy, Empty, Lines, Repeat, Sink, Split, Take, + copy, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, + BufReader, BufStream, BufWriter, DuplexStream, Copy, CopyBuf, Empty, Lines, Repeat, Sink, Split, Take, }; } diff --git a/tokio/src/io/util/copy_buf.rs b/tokio/src/io/util/copy_buf.rs new file mode 100644 index 00000000000..4ebfe0993ec --- /dev/null +++ b/tokio/src/io/util/copy_buf.rs @@ -0,0 +1,101 @@ +use crate::io::{AsyncBufRead, AsyncWrite}; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +cfg_io_util! { + /// A future that asynchronously copies the entire contents of a reader into a + /// writer. + /// + /// This struct is generally created by calling [`copy_buf`][copy_buf]. Please + /// see the documentation of `copy_buf()` for more details. + /// + /// [copy_buf]: copy_buf() + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + reader: &'a mut R, + writer: &'a mut W, + amt: u64, + } + + /// Asynchronously copies the entire contents of a reader into a writer. + /// + /// This function returns a future that will continuously read data from + /// `reader` and then write it into `writer` in a streaming fashion until + /// `reader` returns EOF. + /// + /// On success, the total number of bytes that were copied from `reader` to + /// `writer` is returned. + /// + /// + /// # Errors + /// + /// The returned future will finish with an error will return an error + /// immediately if any call to `poll_fill_buf` or `poll_write` returns an + /// error. + /// + /// # Examples + /// + /// ``` + /// use tokio::io; + /// + /// # async fn dox() -> std::io::Result<()> { + /// let mut reader: &[u8] = b"hello"; + /// let mut writer: Vec = vec![]; + /// + /// io::copy_buf(&mut reader, &mut writer).await?; + /// + /// assert_eq!(&b"hello"[..], &writer[..]); + /// # Ok(()) + /// # } + /// ``` + pub fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> CopyBuf<'a, R, W> + where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, + { + CopyBuf { + reader, + writer, + amt: 0, + } + } +} + +impl Future for CopyBuf<'_, R, W> +where + R: AsyncBufRead + Unpin + ?Sized, + W: AsyncWrite + Unpin + ?Sized, +{ + type Output = std::io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + let me = &mut *self; + let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?; + if buffer.is_empty() { + ready!(Pin::new(&mut self.writer).poll_flush(cx))?; + return Poll::Ready(Ok(self.amt)); + } + + let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?; + if i == 0 { + return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into())); + } + self.amt += i as u64; + Pin::new(&mut *self.reader).consume(i); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + use std::marker::PhantomPinned; + crate::is_unpin::>(); + } +} diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 52dab990c62..a32b7af8fc1 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -27,6 +27,9 @@ cfg_io_util! { mod copy; pub use copy::{copy, Copy}; + mod copy_buf; + pub use copy_buf::{copy_buf, CopyBuf}; + mod empty; pub use empty::{empty, Empty}; From 7e47c45624e3a5f1052f0a576e0b3253b48a77ee Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Mon, 5 Oct 2020 08:14:34 +0000 Subject: [PATCH 2/3] Address feedback --- tokio/src/io/util/copy_buf.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/util/copy_buf.rs b/tokio/src/io/util/copy_buf.rs index 4ebfe0993ec..9afe200c953 100644 --- a/tokio/src/io/util/copy_buf.rs +++ b/tokio/src/io/util/copy_buf.rs @@ -17,7 +17,7 @@ cfg_io_util! { reader: &'a mut R, writer: &'a mut W, amt: u64, - } + } /// Asynchronously copies the entire contents of a reader into a writer. /// @@ -46,7 +46,7 @@ cfg_io_util! { /// /// io::copy_buf(&mut reader, &mut writer).await?; /// - /// assert_eq!(&b"hello"[..], &writer[..]); + /// assert_eq!(b"hello", &writer[..]); /// # Ok(()) /// # } /// ``` From 15ca2f80173ddc3ed1e9fcd1e53302d6f8abc0eb Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 15 Oct 2020 11:44:31 +0000 Subject: [PATCH 3/3] make CopyBuf private --- tokio/src/io/mod.rs | 2 +- tokio/src/io/util/copy_buf.rs | 9 +++++---- tokio/src/io/util/mod.rs | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 20a70577b7b..9191bbcd19e 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -235,7 +235,7 @@ cfg_io_util! { pub(crate) mod util; pub use util::{ copy, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, - BufReader, BufStream, BufWriter, DuplexStream, Copy, CopyBuf, Empty, Lines, Repeat, Sink, Split, Take, + BufReader, BufStream, BufWriter, DuplexStream, Empty, Lines, Repeat, Sink, Split, Take, }; } diff --git a/tokio/src/io/util/copy_buf.rs b/tokio/src/io/util/copy_buf.rs index 9afe200c953..6831580b407 100644 --- a/tokio/src/io/util/copy_buf.rs +++ b/tokio/src/io/util/copy_buf.rs @@ -1,5 +1,6 @@ use crate::io::{AsyncBufRead, AsyncWrite}; use std::future::Future; +use std::io; use std::pin::Pin; use std::task::{Context, Poll}; @@ -13,7 +14,7 @@ cfg_io_util! { /// [copy_buf]: copy_buf() #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> { + struct CopyBuf<'a, R: ?Sized, W: ?Sized> { reader: &'a mut R, writer: &'a mut W, amt: u64, @@ -50,7 +51,7 @@ cfg_io_util! { /// # Ok(()) /// # } /// ``` - pub fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> CopyBuf<'a, R, W> + pub async fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> io::Result where R: AsyncBufRead + Unpin + ?Sized, W: AsyncWrite + Unpin + ?Sized, @@ -59,7 +60,7 @@ cfg_io_util! { reader, writer, amt: 0, - } + }.await } } @@ -68,7 +69,7 @@ where R: AsyncBufRead + Unpin + ?Sized, W: AsyncWrite + Unpin + ?Sized, { - type Output = std::io::Result; + type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 3e29fa395aa..c945be0d1b3 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -28,7 +28,7 @@ cfg_io_util! { pub use copy::copy; mod copy_buf; - pub use copy_buf::{copy_buf, CopyBuf}; + pub use copy_buf::copy_buf; mod empty; pub use empty::{empty, Empty};