Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ReaderStream #2714

Merged
merged 7 commits into from
Aug 23, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ cfg_io_util! {

cfg_stream! {
pub use util::{stream_reader, StreamReader};
pub use util::{reader_stream, ReaderStream};
}
}

Expand Down
3 changes: 3 additions & 0 deletions tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ cfg_io_util! {
cfg_stream! {
mod stream_reader;
pub use stream_reader::{stream_reader, StreamReader};

mod reader_stream;
pub use reader_stream::{reader_stream, ReaderStream};
}

mod take;
Expand Down
77 changes: 77 additions & 0 deletions tokio/src/io/util/reader_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use crate::io::AsyncRead;
use crate::stream::Stream;
use bytes::{Bytes, BytesMut};
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};

pin_project! {
/// Convert async reader into stream of Result<Bytes, io::Error>
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
///
/// This type can be created using the [`reader_stream`](crate::io::reader_stream) function
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct ReaderStream<R> {
// reader itself
#[pin]
reader: R,
// Working buffer, used to optimize allocations.
// # Capacity behavior
// Initially `buf` is empty. Also it's getting smaller and smaller
// during polls (because it's chunks are returned to stream user).
// But when it's capacity reaches 0, it is growed.
buf: BytesMut,
}
}

/// Convert `AsyncRead` into stream of byte chunks.
/// # Example
/// ```
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use tokio::stream::StreamExt;
/// let data = b"hello, world!";
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
/// let mut stream = tokio::io::reader_stream(data as &[u8]);
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
/// let mut stream_contents = bytes::BytesMut::new();
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
/// while let Some(chunk) = stream.next().await {
/// stream_contents.extend_from_slice(chunk?.as_ref());
/// }
/// assert_eq!(stream_contents.as_ref(), data);
/// # Ok(())
/// # }
/// ```
pub fn reader_stream<R>(reader: R) -> ReaderStream<R>
where
R: AsyncRead,
{
ReaderStream {
reader,
buf: BytesMut::new(),
}
}

const CAPACITY: usize = 4096;

impl<R> Stream for ReaderStream<R>
where
R: AsyncRead,
{
type Item = std::io::Result<Bytes>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if this.buf.capacity() == 0 {
this.buf.reserve(CAPACITY);
}
// if we have something in our buf, let's return it
match this.reader.poll_read_buf(cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))),
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Ok(0)) => Poll::Ready(None),
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Ok(n)) => {
let chunk = this.buf.split_to(n);
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
}