Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: add copy_buf #2884

Merged
merged 4 commits into from
Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ cfg_io_util! {

pub(crate) mod util;
pub use util::{
copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Copy, Empty, Lines, Repeat, Sink, Split, Take,
copy, copy_buf, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt,
BufReader, BufStream, BufWriter, DuplexStream, Copy, CopyBuf, Empty, Lines, Repeat, Sink, Split, Take,
};
}

Expand Down
101 changes: 101 additions & 0 deletions tokio/src/io/util/copy_buf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use crate::io::{AsyncBufRead, AsyncWrite};
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

cfg_io_util! {
/// A future that asynchronously copies the entire contents of a reader into a
/// writer.
///
/// This struct is generally created by calling [`copy_buf`][copy_buf]. Please
/// see the documentation of `copy_buf()` for more details.
///
/// [copy_buf]: copy_buf()
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CopyBuf<'a, R: ?Sized, W: ?Sized> {
reader: &'a mut R,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: futures's copy_buf takes the reader by value to allow !Unpin reader. (This is only done in the reader as only the reader is exhausted by this operation.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, taking it by reference is consistent with Tokio's copy. In either case, !Unpin readers can still be used here if combined with tokio::pin!.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the other hand, taking it by reference is consistent with Tokio's copy.

Yeah, if copy_buf takes the reader by value, copy also needs to take the reader by value for consistency.

writer: &'a mut W,
amt: u64,
}

/// Asynchronously copies the entire contents of a reader into a writer.
///
/// This function returns a future that will continuously read data from
/// `reader` and then write it into `writer` in a streaming fashion until
/// `reader` returns EOF.
///
/// On success, the total number of bytes that were copied from `reader` to
/// `writer` is returned.
///
///
/// # Errors
///
/// The returned future will finish with an error will return an error
/// immediately if any call to `poll_fill_buf` or `poll_write` returns an
/// error.
///
/// # Examples
///
/// ```
/// use tokio::io;
///
/// # async fn dox() -> std::io::Result<()> {
/// let mut reader: &[u8] = b"hello";
/// let mut writer: Vec<u8> = vec![];
///
/// io::copy_buf(&mut reader, &mut writer).await?;
///
/// assert_eq!(b"hello", &writer[..]);
/// # Ok(())
/// # }
/// ```
pub fn copy_buf<'a, R, W>(reader: &'a mut R, writer: &'a mut W) -> CopyBuf<'a, R, W>
where
R: AsyncBufRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
{
CopyBuf {
reader,
writer,
amt: 0,
}
}
}

impl<R, W> Future for CopyBuf<'_, R, W>
where
R: AsyncBufRead + Unpin + ?Sized,
W: AsyncWrite + Unpin + ?Sized,
{
type Output = std::io::Result<u64>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
let me = &mut *self;
let buffer = ready!(Pin::new(&mut *me.reader).poll_fill_buf(cx))?;
if buffer.is_empty() {
ready!(Pin::new(&mut self.writer).poll_flush(cx))?;
return Poll::Ready(Ok(self.amt));
}

let i = ready!(Pin::new(&mut *me.writer).poll_write(cx, buffer))?;
if i == 0 {
return Poll::Ready(Err(std::io::ErrorKind::WriteZero.into()));
}
self.amt += i as u64;
Pin::new(&mut *self.reader).consume(i);
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn assert_unpin() {
use std::marker::PhantomPinned;
crate::is_unpin::<CopyBuf<'_, PhantomPinned, PhantomPinned>>();
}
}
3 changes: 3 additions & 0 deletions tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ cfg_io_util! {
mod copy;
pub use copy::{copy, Copy};

mod copy_buf;
pub use copy_buf::{copy_buf, CopyBuf};

mod empty;
pub use empty::{empty, Empty};

Expand Down