Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move StreamReader and ReaderStream into tokio_util #2788

Merged
merged 8 commits into from
Sep 8, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ publish = false
default = []

# Shorthand for enabling everything
full = ["codec", "udp", "compat"]
full = ["codec", "udp", "compat", "io"]

compat = ["futures-io",]
codec = ["tokio/stream"]
udp = ["tokio/udp"]
io = []

[dependencies]
tokio = { version = "0.3.0", path = "../tokio" }
Expand Down
10 changes: 10 additions & 0 deletions tokio-util/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,13 @@ macro_rules! cfg_udp {
)*
}
}

macro_rules! cfg_io {
($($item:item)*) => {
$(
#[cfg(feature = "io")]
#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
$item
)*
}
}
13 changes: 13 additions & 0 deletions tokio-util/src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
//! Helpers for IO related tasks.
//!
//! These types are often used in combination with hyper or reqwest, as they
//! allow converting between a hyper [`Body`] and [`AsyncRead`].
//!
//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html
//! [`AsyncRead`]: tokio::io::AsyncRead

mod reader_stream;
mod stream_reader;

pub use self::reader_stream::ReaderStream;
pub use self::stream_reader::StreamReader;
97 changes: 97 additions & 0 deletions tokio-util/src/io/reader_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use bytes::{Bytes, BytesMut};
use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncRead;

pin_project! {
/// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks.
///
/// This stream is fused. It performs the inverse operation of
/// [`StreamReader`].
///
/// # Example
///
/// ```
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
/// use tokio::stream::StreamExt;
/// use tokio_util::io::ReaderStream;
///
/// // Create a stream of data.
/// let data = b"hello, world!";
/// let mut stream = ReaderStream::new(&data[..]);
///
/// // Read all of the chunks into a vector.
/// let mut stream_contents = Vec::new();
/// while let Some(chunk) = stream.next().await {
/// stream_contents.extend_from_slice(&chunk?);
/// }
///
/// // Once the chunks are concatenated, we should have the
/// // original data.
/// assert_eq!(stream_contents, data);
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`StreamReader`]: crate::io::StreamReader
/// [`Stream`]: tokio::stream::Stream
#[derive(Debug)]
pub struct ReaderStream<R> {
// Reader itself.
//
// This value is `None` if the stream has terminated.
#[pin]
reader: Option<R>,
// Working buffer, used to optimize allocations.
buf: BytesMut,
}
}

impl<R: AsyncRead> ReaderStream<R> {
/// Convert an [`AsyncRead`] into a [`Stream`] with item type
/// `Result<Bytes, std::io::Error>`.
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: tokio::stream::Stream
pub fn new(reader: R) -> Self {
ReaderStream {
reader: Some(reader),
buf: BytesMut::new(),
}
}
}

const CAPACITY: usize = 4096;
Darksonn marked this conversation as resolved.
Show resolved Hide resolved

impl<R: AsyncRead> Stream for ReaderStream<R> {
type Item = std::io::Result<Bytes>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.as_mut().project();
let reader = match this.reader.as_pin_mut() {
Some(r) => r,
None => return Poll::Ready(None),
};
if this.buf.capacity() == 0 {
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
this.buf.reserve(CAPACITY);
}
match reader.poll_read_buf(cx, &mut this.buf) {
Poll::Pending => Poll::Pending,
Poll::Ready(Err(err)) => {
self.project().reader.set(None);
Poll::Ready(Some(Err(err)))
}
Poll::Ready(Ok(0)) => {
self.project().reader.set(None);
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
Poll::Ready(None)
}
Poll::Ready(Ok(_)) => {
let chunk = this.buf.split();
Poll::Ready(Some(Ok(chunk.freeze())))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,89 +1,87 @@
use crate::io::{AsyncBufRead, AsyncRead, ReadBuf};
use crate::stream::Stream;
use bytes::{Buf, BufMut};
use futures_core::stream::TryStream;
use pin_project_lite::pin_project;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf};

pin_project! {
/// Convert a stream of byte chunks into an [`AsyncRead`].
/// Convert a [`Stream`] of byte chunks into an [`AsyncRead`].
///
/// This type is usually created using the [`stream_reader`] function.
/// This type performs the inverse operation of [`ReaderStream`].
///
/// [`AsyncRead`]: crate::io::AsyncRead
/// [`stream_reader`]: crate::io::stream_reader
/// # Example
///
/// ```
/// use bytes::Bytes;
/// use tokio::io::{AsyncReadExt, Result};
/// use tokio_util::io::StreamReader;
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a stream from an iterator.
/// let stream = tokio::stream::iter(vec![
/// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])),
/// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])),
/// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])),
/// ]);
///
/// // Convert it to an AsyncRead.
/// let mut read = StreamReader::new(stream);
///
/// // Read five bytes from the stream.
/// let mut buf = [0; 5];
/// read.read_exact(&mut buf).await?;
/// assert_eq!(buf, [0, 1, 2, 3, 4]);
///
/// // Read the rest of the current chunk.
/// assert_eq!(read.read(&mut buf).await?, 3);
/// assert_eq!(&buf[..3], [5, 6, 7]);
///
/// // Read the next chunk.
/// assert_eq!(read.read(&mut buf).await?, 4);
/// assert_eq!(&buf[..4], [8, 9, 10, 11]);
///
/// // We have now reached the end.
/// assert_eq!(read.read(&mut buf).await?, 0);
///
/// # Ok(())
/// # }
/// ```
///
/// [`AsyncRead`]: tokio::io::AsyncRead
/// [`Stream`]: tokio::stream::Stream
/// [`ReaderStream`]: crate::io::ReaderStream
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub struct StreamReader<S, B> {
pub struct StreamReader<S: TryStream> {
#[pin]
inner: S,
chunk: Option<B>,
chunk: Option<S::Ok>,
}
}

/// Convert a stream of byte chunks into an [`AsyncRead`](crate::io::AsyncRead).
///
/// # Example
///
/// ```
/// use bytes::Bytes;
/// use tokio::io::{stream_reader, AsyncReadExt};
/// # #[tokio::main]
/// # async fn main() -> std::io::Result<()> {
///
/// // Create a stream from an iterator.
/// let stream = tokio::stream::iter(vec![
/// Ok(Bytes::from_static(&[0, 1, 2, 3])),
/// Ok(Bytes::from_static(&[4, 5, 6, 7])),
/// Ok(Bytes::from_static(&[8, 9, 10, 11])),
/// ]);
///
/// // Convert it to an AsyncRead.
/// let mut read = stream_reader(stream);
///
/// // Read five bytes from the stream.
/// let mut buf = [0; 5];
/// read.read_exact(&mut buf).await?;
/// assert_eq!(buf, [0, 1, 2, 3, 4]);
///
/// // Read the rest of the current chunk.
/// assert_eq!(read.read(&mut buf).await?, 3);
/// assert_eq!(&buf[..3], [5, 6, 7]);
///
/// // Read the next chunk.
/// assert_eq!(read.read(&mut buf).await?, 4);
/// assert_eq!(&buf[..4], [8, 9, 10, 11]);
///
/// // We have now reached the end.
/// assert_eq!(read.read(&mut buf).await?, 0);
///
/// # Ok(())
/// # }
/// ```
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
pub fn stream_reader<S, B>(stream: S) -> StreamReader<S, B>
where
S: Stream<Item = Result<B, io::Error>>,
B: Buf,
{
StreamReader::new(stream)
}

impl<S, B> StreamReader<S, B>
impl<S: TryStream> StreamReader<S>
where
S: Stream<Item = Result<B, io::Error>>,
B: Buf,
<S as TryStream>::Ok: Buf,
<S as TryStream>::Error: Into<std::io::Error>,
{
/// Convert the provided stream into an `AsyncRead`.
fn new(stream: S) -> Self {
/// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead).
///
/// The item should be a [`Result`] with the ok variant being something that
/// implements the [`Buf`] trait (e.g. `Vec<u8>` or `Bytes`). The error
/// should be convertible into an [io error].
///
/// [`Result`]: std::result::Result
/// [`Buf`]: bytes::Buf
/// [io error]: std::io::Error
pub fn new(stream: S) -> Self {
Self {
inner: stream,
chunk: None,
}
}

/// Do we have a chunk and is it non-empty?
fn has_chunk(self: Pin<&mut Self>) -> bool {
if let Some(chunk) = self.project().chunk {
Expand All @@ -94,10 +92,10 @@ where
}
}

impl<S, B> AsyncRead for StreamReader<S, B>
impl<S: TryStream> AsyncRead for StreamReader<S>
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
where
S: Stream<Item = Result<B, io::Error>>,
B: Buf,
<S as TryStream>::Ok: Buf,
<S as TryStream>::Error: Into<std::io::Error>,
{
fn poll_read(
mut self: Pin<&mut Self>,
Expand Down Expand Up @@ -144,10 +142,10 @@ where
}
}

impl<S, B> AsyncBufRead for StreamReader<S, B>
impl<S: TryStream> AsyncBufRead for StreamReader<S>
where
S: Stream<Item = Result<B, io::Error>>,
B: Buf,
<S as TryStream>::Ok: Buf,
<S as TryStream>::Error: Into<std::io::Error>,
{
fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>> {
loop {
Expand All @@ -156,12 +154,12 @@ where
let buf = self.project().chunk.as_ref().unwrap().bytes();
return Poll::Ready(Ok(buf));
} else {
match self.as_mut().project().inner.poll_next(cx) {
match self.as_mut().project().inner.try_poll_next(cx) {
Poll::Ready(Some(Ok(chunk))) => {
// Go around the loop in case the chunk is empty.
*self.as_mut().project().chunk = Some(chunk);
}
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)),
Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
Poll::Ready(None) => return Poll::Ready(Ok(&[])),
Poll::Pending => return Poll::Pending,
}
Expand Down
4 changes: 4 additions & 0 deletions tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ cfg_compat! {
pub mod compat;
}

cfg_io! {
pub mod io;
}

pub mod context;

pub mod sync;
Loading