From bf748efdeb0dc4169965fcf4bd85f5e135cbdd7c Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Sun, 23 Aug 2020 18:47:20 +0300 Subject: [PATCH 1/7] io: add ReaderStream (#2714) --- tokio/src/io/mod.rs | 1 + tokio/src/io/util/mod.rs | 3 + tokio/src/io/util/reader_stream.rs | 105 +++++++++++++++++++++++++++++ tokio/tests/io_reader_stream.rs | 64 ++++++++++++++++++ 4 files changed, 173 insertions(+) create mode 100644 tokio/src/io/util/reader_stream.rs create mode 100644 tokio/tests/io_reader_stream.rs diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index c43f0e83140..4a787837e40 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -239,6 +239,7 @@ cfg_io_util! { cfg_stream! { pub use util::{stream_reader, StreamReader}; + pub use util::{reader_stream, ReaderStream}; } } diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 609ff2386a6..782a02a8f74 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -66,6 +66,9 @@ cfg_io_util! { cfg_stream! { mod stream_reader; pub use stream_reader::{stream_reader, StreamReader}; + + mod reader_stream; + pub use reader_stream::{reader_stream, ReaderStream}; } mod take; diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs new file mode 100644 index 00000000000..51651cede4d --- /dev/null +++ b/tokio/src/io/util/reader_stream.rs @@ -0,0 +1,105 @@ +use crate::io::AsyncRead; +use crate::stream::Stream; +use bytes::{Bytes, BytesMut}; +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Convert an [`AsyncRead`] implementor into a + /// [`Stream`] of Result<[`Bytes`], std::io::Error>. + /// After first error it will stop. + /// Additionally, this stream is fused: after it returns None at some + /// moment, it is guaranteed that further `next()`, `poll_next()` and + /// similar functions will instantly return None. + /// + /// This type can be created using the [`reader_stream`] function + /// + /// [`AsyncRead`]: crate::io::AsyncRead + /// [`Stream`]: crate::stream::Stream + /// [`Bytes`]: bytes::Bytes + /// [`reader_stream`]: crate::io::reader_stream + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct ReaderStream { + // Reader itself. + // None if we had error reading from the `reader` in the past. + #[pin] + reader: Option, + // Working buffer, used to optimize allocations. + // # Capacity behavior + // Initially `buf` is empty. Also it's getting smaller and smaller + // during polls (because its chunks are returned to stream user). + // But when it's capacity reaches 0, it is growed. + buf: BytesMut, + } +} + +/// Convert an [`AsyncRead`] implementor into a +/// [`Stream`] of Result<[`Bytes`], std::io::Error>. +/// +/// # Example +/// +/// ``` +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// use tokio::stream::StreamExt; +/// +/// let data: &[u8] = b"hello, world!"; +/// let mut stream = tokio::io::reader_stream(data); +/// let mut stream_contents = Vec::new(); +/// while let Some(chunk) = stream.next().await { +/// stream_contents.extend_from_slice(chunk?.as_ref()); +/// } +/// assert_eq!(stream_contents, data); +/// # Ok(()) +/// # } +/// ``` +/// +/// [`AsyncRead`]: crate::io::AsyncRead +/// [`Stream`]: crate::stream::Stream +/// [`Bytes`]: bytes::Bytes +pub fn reader_stream(reader: R) -> ReaderStream +where + R: AsyncRead, +{ + ReaderStream { + reader: Some(reader), + buf: BytesMut::new(), + } +} + +const CAPACITY: usize = 4096; + +impl Stream for ReaderStream +where + R: AsyncRead, +{ + type Item = std::io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; + if this.buf.capacity() == 0 { + this.buf.reserve(CAPACITY); + } + match reader.poll_read_buf(cx, &mut this.buf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + self.project().reader.set(None); + Poll::Ready(Some(Err(err))) + } + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + } + Poll::Ready(Ok(_)) => { + let chunk = this.buf.split(); + Poll::Ready(Some(Ok(chunk.freeze()))) + } + } + } +} diff --git a/tokio/tests/io_reader_stream.rs b/tokio/tests/io_reader_stream.rs new file mode 100644 index 00000000000..6546a0ef4da --- /dev/null +++ b/tokio/tests/io_reader_stream.rs @@ -0,0 +1,64 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead; +use tokio::stream::StreamExt; + +/// produces at most `remaining` zeros, that returns error. +/// each time it reads at most 31 byte. +struct Reader { + remaining: usize, +} + +impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = Pin::into_inner(self); + assert_ne!(buf.len(), 0); + if this.remaining > 0 { + let n = std::cmp::min(this.remaining, buf.len()); + let n = std::cmp::min(n, 31); + for x in &mut buf[..n] { + *x = 0; + } + this.remaining -= n; + Poll::Ready(Ok(n)) + } else { + Poll::Ready(Err(std::io::Error::from_raw_os_error(22))) + } + } +} + +#[tokio::test] +async fn correct_behavior_on_errors() { + let reader = Reader { remaining: 8000 }; + let mut stream = tokio::io::reader_stream(reader); + let mut zeros_received = 0; + let mut had_error = false; + loop { + let item = stream.next().await.unwrap(); + match item { + Ok(bytes) => { + let bytes = &*bytes; + for byte in bytes { + assert_eq!(*byte, 0); + zeros_received += 1; + } + } + Err(_) => { + assert!(!had_error); + had_error = true; + break; + } + } + } + + assert!(had_error); + assert_eq!(zeros_received, 8000); + assert!(stream.next().await.is_none()); +} From 6acad9f3ffdd271ab2abfd8dac99bb8bd27e8051 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sun, 23 Aug 2020 18:21:14 +0200 Subject: [PATCH 2/7] Move StreamReader and ReaderStream into tokio_util --- tokio-util/Cargo.toml | 3 +- tokio-util/src/cfg.rs | 10 ++ tokio-util/src/io/mod.rs | 13 ++ tokio-util/src/io/reader_stream.rs | 97 ++++++++++++ .../src/io}/stream_reader.rs | 142 +++++++++--------- tokio-util/src/lib.rs | 4 + tokio-util/tests/foo.rs | 33 ++++ .../tests/io_reader_stream.rs | 18 ++- {tokio => tokio-util}/tests/stream_reader.rs | 7 +- tokio/src/io/mod.rs | 5 - tokio/src/io/util/mod.rs | 8 - tokio/src/io/util/reader_stream.rs | 105 ------------- 12 files changed, 243 insertions(+), 202 deletions(-) create mode 100644 tokio-util/src/io/mod.rs create mode 100644 tokio-util/src/io/reader_stream.rs rename {tokio/src/io/util => tokio-util/src/io}/stream_reader.rs (53%) create mode 100644 tokio-util/tests/foo.rs rename {tokio => tokio-util}/tests/io_reader_stream.rs (76%) rename {tokio => tokio-util}/tests/stream_reader.rs (83%) delete mode 100644 tokio/src/io/util/reader_stream.rs diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index b47c9dfc21c..85b4e5929c4 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -25,11 +25,12 @@ publish = false default = [] # Shorthand for enabling everything -full = ["codec", "udp", "compat"] +full = ["codec", "udp", "compat", "io"] compat = ["futures-io",] codec = ["tokio/stream"] udp = ["tokio/udp"] +io = [] [dependencies] tokio = { version = "0.3.0", path = "../tokio" } diff --git a/tokio-util/src/cfg.rs b/tokio-util/src/cfg.rs index 27e8c66a433..2efa5f09aff 100644 --- a/tokio-util/src/cfg.rs +++ b/tokio-util/src/cfg.rs @@ -27,3 +27,13 @@ macro_rules! cfg_udp { )* } } + +macro_rules! cfg_io { + ($($item:item)*) => { + $( + #[cfg(feature = "io")] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + $item + )* + } +} diff --git a/tokio-util/src/io/mod.rs b/tokio-util/src/io/mod.rs new file mode 100644 index 00000000000..53066c4e444 --- /dev/null +++ b/tokio-util/src/io/mod.rs @@ -0,0 +1,13 @@ +//! Helpers for IO related tasks. +//! +//! These types are often used in combination with hyper or reqwest, as they +//! allow converting between a hyper [`Body`] and [`AsyncRead`]. +//! +//! [`Body`]: https://docs.rs/hyper/0.13/hyper/struct.Body.html +//! [`AsyncRead`]: tokio::io::AsyncRead + +mod reader_stream; +mod stream_reader; + +pub use self::reader_stream::ReaderStream; +pub use self::stream_reader::StreamReader; diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs new file mode 100644 index 00000000000..6f4f62bb13a --- /dev/null +++ b/tokio-util/src/io/reader_stream.rs @@ -0,0 +1,97 @@ +use bytes::{Bytes, BytesMut}; +use futures_core::stream::Stream; +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead; + +pin_project! { + /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks. + /// + /// This stream is fused. It performs the inverse operation of + /// [`StreamReader`]. + /// + /// # Example + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// use tokio::stream::StreamExt; + /// use tokio_util::io::ReaderStream; + /// + /// // Create a stream of data. + /// let data = b"hello, world!"; + /// let mut stream = ReaderStream::new(&data[..]); + /// + /// // Read all of the chunks into a vector. + /// let mut stream_contents = Vec::new(); + /// while let Some(chunk) = stream.next().await { + /// stream_contents.extend_from_slice(&chunk?); + /// } + /// + /// // Once the chunks are concatenated, we should have the + /// // original data. + /// assert_eq!(stream_contents, data); + /// # Ok(()) + /// # } + /// ``` + /// + /// [`AsyncRead`]: tokio::io::AsyncRead + /// [`StreamReader`]: crate::io::StreamReader + /// [`Stream`]: tokio::stream::Stream + #[derive(Debug)] + pub struct ReaderStream { + // Reader itself. + // + // This value is `None` if the stream has terminated. + #[pin] + reader: Option, + // Working buffer, used to optimize allocations. + buf: BytesMut, + } +} + +impl ReaderStream { + /// Convert an [`AsyncRead`] into a [`Stream`] with item type + /// `Result`. + /// + /// [`AsyncRead`]: tokio::io::AsyncRead + /// [`Stream`]: tokio::stream::Stream + pub fn new(reader: R) -> Self { + ReaderStream { + reader: Some(reader), + buf: BytesMut::new(), + } + } +} + +const CAPACITY: usize = 4096; + +impl Stream for ReaderStream { + type Item = std::io::Result; + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; + if this.buf.capacity() == 0 { + this.buf.reserve(CAPACITY); + } + match dbg!(reader.poll_read_buf(cx, &mut this.buf)) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => { + self.project().reader.set(None); + Poll::Ready(Some(Err(err))) + } + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + } + Poll::Ready(Ok(_)) => { + let chunk = this.buf.split(); + Poll::Ready(Some(Ok(chunk.freeze()))) + } + } + } +} diff --git a/tokio/src/io/util/stream_reader.rs b/tokio-util/src/io/stream_reader.rs similarity index 53% rename from tokio/src/io/util/stream_reader.rs rename to tokio-util/src/io/stream_reader.rs index 2471197a46e..ec3d0e9d9bf 100644 --- a/tokio/src/io/util/stream_reader.rs +++ b/tokio-util/src/io/stream_reader.rs @@ -1,89 +1,87 @@ -use crate::io::{AsyncBufRead, AsyncRead, ReadBuf}; -use crate::stream::Stream; use bytes::{Buf, BufMut}; +use futures_core::stream::TryStream; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use tokio::io::{AsyncBufRead, AsyncRead, ReadBuf}; pin_project! { - /// Convert a stream of byte chunks into an [`AsyncRead`]. + /// Convert a [`Stream`] of byte chunks into an [`AsyncRead`]. /// - /// This type is usually created using the [`stream_reader`] function. + /// This type performs the inverse operation of [`ReaderStream`]. /// - /// [`AsyncRead`]: crate::io::AsyncRead - /// [`stream_reader`]: crate::io::stream_reader + /// # Example + /// + /// ``` + /// use bytes::Bytes; + /// use tokio::io::{AsyncReadExt, Result}; + /// use tokio_util::io::StreamReader; + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// + /// // Create a stream from an iterator. + /// let stream = tokio::stream::iter(vec![ + /// Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), + /// Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), + /// Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), + /// ]); + /// + /// // Convert it to an AsyncRead. + /// let mut read = StreamReader::new(stream); + /// + /// // Read five bytes from the stream. + /// let mut buf = [0; 5]; + /// read.read_exact(&mut buf).await?; + /// assert_eq!(buf, [0, 1, 2, 3, 4]); + /// + /// // Read the rest of the current chunk. + /// assert_eq!(read.read(&mut buf).await?, 3); + /// assert_eq!(&buf[..3], [5, 6, 7]); + /// + /// // Read the next chunk. + /// assert_eq!(read.read(&mut buf).await?, 4); + /// assert_eq!(&buf[..4], [8, 9, 10, 11]); + /// + /// // We have now reached the end. + /// assert_eq!(read.read(&mut buf).await?, 0); + /// + /// # Ok(()) + /// # } + /// ``` + /// + /// [`AsyncRead`]: tokio::io::AsyncRead + /// [`Stream`]: tokio::stream::Stream + /// [`ReaderStream`]: crate::io::ReaderStream #[derive(Debug)] - #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] - #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] - pub struct StreamReader { + pub struct StreamReader { #[pin] inner: S, - chunk: Option, + chunk: Option, } } -/// Convert a stream of byte chunks into an [`AsyncRead`](crate::io::AsyncRead). -/// -/// # Example -/// -/// ``` -/// use bytes::Bytes; -/// use tokio::io::{stream_reader, AsyncReadExt}; -/// # #[tokio::main] -/// # async fn main() -> std::io::Result<()> { -/// -/// // Create a stream from an iterator. -/// let stream = tokio::stream::iter(vec![ -/// Ok(Bytes::from_static(&[0, 1, 2, 3])), -/// Ok(Bytes::from_static(&[4, 5, 6, 7])), -/// Ok(Bytes::from_static(&[8, 9, 10, 11])), -/// ]); -/// -/// // Convert it to an AsyncRead. -/// let mut read = stream_reader(stream); -/// -/// // Read five bytes from the stream. -/// let mut buf = [0; 5]; -/// read.read_exact(&mut buf).await?; -/// assert_eq!(buf, [0, 1, 2, 3, 4]); -/// -/// // Read the rest of the current chunk. -/// assert_eq!(read.read(&mut buf).await?, 3); -/// assert_eq!(&buf[..3], [5, 6, 7]); -/// -/// // Read the next chunk. -/// assert_eq!(read.read(&mut buf).await?, 4); -/// assert_eq!(&buf[..4], [8, 9, 10, 11]); -/// -/// // We have now reached the end. -/// assert_eq!(read.read(&mut buf).await?, 0); -/// -/// # Ok(()) -/// # } -/// ``` -#[cfg_attr(docsrs, doc(cfg(feature = "stream")))] -#[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] -pub fn stream_reader(stream: S) -> StreamReader -where - S: Stream>, - B: Buf, -{ - StreamReader::new(stream) -} - -impl StreamReader +impl StreamReader where - S: Stream>, - B: Buf, + ::Ok: Buf, + ::Error: Into, { - /// Convert the provided stream into an `AsyncRead`. - fn new(stream: S) -> Self { + /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead). + /// + /// The item should be a [`Result`] with the ok variant being something that + /// implements the [`Buf`] trait (e.g. `Vec` or `Bytes`). The error + /// should be convertible into an [io error]. + /// + /// [`Result`]: std::result::Result + /// [`Buf`]: bytes::Buf + /// [io error]: std::io::Error + pub fn new(stream: S) -> Self { Self { inner: stream, chunk: None, } } + /// Do we have a chunk and is it non-empty? fn has_chunk(self: Pin<&mut Self>) -> bool { if let Some(chunk) = self.project().chunk { @@ -94,10 +92,10 @@ where } } -impl AsyncRead for StreamReader +impl AsyncRead for StreamReader where - S: Stream>, - B: Buf, + ::Ok: Buf, + ::Error: Into, { fn poll_read( mut self: Pin<&mut Self>, @@ -144,10 +142,10 @@ where } } -impl AsyncBufRead for StreamReader +impl AsyncBufRead for StreamReader where - S: Stream>, - B: Buf, + ::Ok: Buf, + ::Error: Into, { fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -156,12 +154,12 @@ where let buf = self.project().chunk.as_ref().unwrap().bytes(); return Poll::Ready(Ok(buf)); } else { - match self.as_mut().project().inner.poll_next(cx) { + match self.as_mut().project().inner.try_poll_next(cx) { Poll::Ready(Some(Ok(chunk))) => { // Go around the loop in case the chunk is empty. *self.as_mut().project().chunk = Some(chunk); } - Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err)), + Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())), Poll::Ready(None) => return Poll::Ready(Ok(&[])), Poll::Pending => return Poll::Pending, } diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index ec69f59d04b..fa0a506ec9e 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -38,4 +38,8 @@ cfg_compat! { pub mod compat; } +cfg_io! { + pub mod io; +} + pub mod sync; diff --git a/tokio-util/tests/foo.rs b/tokio-util/tests/foo.rs new file mode 100644 index 00000000000..870605c18f9 --- /dev/null +++ b/tokio-util/tests/foo.rs @@ -0,0 +1,33 @@ +use bytes::Bytes; +use tokio::io::{AsyncReadExt, Result}; +use tokio_util::io::StreamReader; +#[tokio::main] +async fn main() -> std::io::Result<()> { + // Create a stream from an iterator. + let stream = tokio::stream::iter(vec![ + Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), + Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), + Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), + ]); + + // Convert it to an AsyncRead. + let mut read = StreamReader::new(stream); + + // Read five bytes from the stream. + let mut buf = [0; 5]; + read.read_exact(&mut buf[..]).await?; + assert_eq!(buf, [0, 1, 2, 3, 4]); + + // Read the rest of the current chunk. + assert_eq!(read.read(&mut buf).await?, 3); + assert_eq!(&buf[..3], [5, 6, 7]); + + // Read the next chunk. + assert_eq!(read.read(&mut buf).await?, 4); + assert_eq!(&buf[..4], [8, 9, 10, 11]); + + // We have now reached the end. + assert_eq!(read.read(&mut buf).await?, 0); + + Ok(()) +} diff --git a/tokio/tests/io_reader_stream.rs b/tokio-util/tests/io_reader_stream.rs similarity index 76% rename from tokio/tests/io_reader_stream.rs rename to tokio-util/tests/io_reader_stream.rs index 6546a0ef4da..bbc8180ba4a 100644 --- a/tokio/tests/io_reader_stream.rs +++ b/tokio-util/tests/io_reader_stream.rs @@ -3,7 +3,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::AsyncRead; +use tokio::io::{AsyncRead, ReadBuf}; use tokio::stream::StreamExt; /// produces at most `remaining` zeros, that returns error. @@ -16,18 +16,19 @@ impl AsyncRead for Reader { fn poll_read( self: Pin<&mut Self>, _cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { + buf: &mut ReadBuf<'_>, + ) -> Poll> { let this = Pin::into_inner(self); - assert_ne!(buf.len(), 0); + assert_ne!(buf.remaining(), 0); if this.remaining > 0 { - let n = std::cmp::min(this.remaining, buf.len()); + let n = std::cmp::min(this.remaining, buf.remaining()); let n = std::cmp::min(n, 31); - for x in &mut buf[..n] { + for x in &mut buf.initialize_unfilled_to(n)[..n] { *x = 0; } + buf.add_filled(n); this.remaining -= n; - Poll::Ready(Ok(n)) + Poll::Ready(Ok(())) } else { Poll::Ready(Err(std::io::Error::from_raw_os_error(22))) } @@ -37,11 +38,12 @@ impl AsyncRead for Reader { #[tokio::test] async fn correct_behavior_on_errors() { let reader = Reader { remaining: 8000 }; - let mut stream = tokio::io::reader_stream(reader); + let mut stream = tokio_util::io::ReaderStream::new(reader); let mut zeros_received = 0; let mut had_error = false; loop { let item = stream.next().await.unwrap(); + println!("{:?}", item); match item { Ok(bytes) => { let bytes = &*bytes; diff --git a/tokio/tests/stream_reader.rs b/tokio-util/tests/stream_reader.rs similarity index 83% rename from tokio/tests/stream_reader.rs rename to tokio-util/tests/stream_reader.rs index 8370df4dac7..eddebcebea9 100644 --- a/tokio/tests/stream_reader.rs +++ b/tokio-util/tests/stream_reader.rs @@ -2,13 +2,14 @@ #![cfg(feature = "full")] use bytes::Bytes; -use tokio::io::{stream_reader, AsyncReadExt}; +use tokio::io::AsyncReadExt; use tokio::stream::iter; +use tokio_util::io::StreamReader; #[tokio::test] async fn test_stream_reader() -> std::io::Result<()> { let stream = iter(vec![ - Ok(Bytes::from_static(&[])), + std::io::Result::Ok(Bytes::from_static(&[])), Ok(Bytes::from_static(&[0, 1, 2, 3])), Ok(Bytes::from_static(&[])), Ok(Bytes::from_static(&[4, 5, 6, 7])), @@ -17,7 +18,7 @@ async fn test_stream_reader() -> std::io::Result<()> { Ok(Bytes::from_static(&[])), ]); - let mut read = stream_reader(stream); + let mut read = StreamReader::new(stream); let mut buf = [0; 5]; read.read_exact(&mut buf).await?; diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 4a787837e40..c4b4d7d3401 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -236,11 +236,6 @@ cfg_io_util! { copy, duplex, empty, repeat, sink, AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, BufStream, BufWriter, DuplexStream, Copy, Empty, Lines, Repeat, Sink, Split, Take, }; - - cfg_stream! { - pub use util::{stream_reader, StreamReader}; - pub use util::{reader_stream, ReaderStream}; - } } cfg_not_io_util! { diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 782a02a8f74..1bd0a3f87b4 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -63,14 +63,6 @@ cfg_io_util! { mod split; pub use split::Split; - cfg_stream! { - mod stream_reader; - pub use stream_reader::{stream_reader, StreamReader}; - - mod reader_stream; - pub use reader_stream::{reader_stream, ReaderStream}; - } - mod take; pub use take::Take; diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs deleted file mode 100644 index 51651cede4d..00000000000 --- a/tokio/src/io/util/reader_stream.rs +++ /dev/null @@ -1,105 +0,0 @@ -use crate::io::AsyncRead; -use crate::stream::Stream; -use bytes::{Bytes, BytesMut}; -use pin_project_lite::pin_project; -use std::pin::Pin; -use std::task::{Context, Poll}; - -pin_project! { - /// Convert an [`AsyncRead`] implementor into a - /// [`Stream`] of Result<[`Bytes`], std::io::Error>. - /// After first error it will stop. - /// Additionally, this stream is fused: after it returns None at some - /// moment, it is guaranteed that further `next()`, `poll_next()` and - /// similar functions will instantly return None. - /// - /// This type can be created using the [`reader_stream`] function - /// - /// [`AsyncRead`]: crate::io::AsyncRead - /// [`Stream`]: crate::stream::Stream - /// [`Bytes`]: bytes::Bytes - /// [`reader_stream`]: crate::io::reader_stream - #[derive(Debug)] - #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] - #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] - pub struct ReaderStream { - // Reader itself. - // None if we had error reading from the `reader` in the past. - #[pin] - reader: Option, - // Working buffer, used to optimize allocations. - // # Capacity behavior - // Initially `buf` is empty. Also it's getting smaller and smaller - // during polls (because its chunks are returned to stream user). - // But when it's capacity reaches 0, it is growed. - buf: BytesMut, - } -} - -/// Convert an [`AsyncRead`] implementor into a -/// [`Stream`] of Result<[`Bytes`], std::io::Error>. -/// -/// # Example -/// -/// ``` -/// # #[tokio::main] -/// # async fn main() -> std::io::Result<()> { -/// use tokio::stream::StreamExt; -/// -/// let data: &[u8] = b"hello, world!"; -/// let mut stream = tokio::io::reader_stream(data); -/// let mut stream_contents = Vec::new(); -/// while let Some(chunk) = stream.next().await { -/// stream_contents.extend_from_slice(chunk?.as_ref()); -/// } -/// assert_eq!(stream_contents, data); -/// # Ok(()) -/// # } -/// ``` -/// -/// [`AsyncRead`]: crate::io::AsyncRead -/// [`Stream`]: crate::stream::Stream -/// [`Bytes`]: bytes::Bytes -pub fn reader_stream(reader: R) -> ReaderStream -where - R: AsyncRead, -{ - ReaderStream { - reader: Some(reader), - buf: BytesMut::new(), - } -} - -const CAPACITY: usize = 4096; - -impl Stream for ReaderStream -where - R: AsyncRead, -{ - type Item = std::io::Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.as_mut().project(); - let reader = match this.reader.as_pin_mut() { - Some(r) => r, - None => return Poll::Ready(None), - }; - if this.buf.capacity() == 0 { - this.buf.reserve(CAPACITY); - } - match reader.poll_read_buf(cx, &mut this.buf) { - Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => { - self.project().reader.set(None); - Poll::Ready(Some(Err(err))) - } - Poll::Ready(Ok(0)) => { - self.project().reader.set(None); - Poll::Ready(None) - } - Poll::Ready(Ok(_)) => { - let chunk = this.buf.split(); - Poll::Ready(Some(Ok(chunk.freeze()))) - } - } - } -} From 436dd223b01b954a4b128952bc280027fa02fe6d Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 28 Aug 2020 19:25:27 +0200 Subject: [PATCH 3/7] Fix error stuff --- tokio-util/tests/foo.rs | 33 ------------------- tokio-util/tests/io_reader_stream.rs | 1 - .../{stream_reader.rs => io_stream_reader.rs} | 1 - tokio-util/tests/sync_cancellation_token.rs | 2 ++ tokio-util/tests/udp.rs | 2 ++ 5 files changed, 4 insertions(+), 35 deletions(-) delete mode 100644 tokio-util/tests/foo.rs rename tokio-util/tests/{stream_reader.rs => io_stream_reader.rs} (97%) diff --git a/tokio-util/tests/foo.rs b/tokio-util/tests/foo.rs deleted file mode 100644 index 870605c18f9..00000000000 --- a/tokio-util/tests/foo.rs +++ /dev/null @@ -1,33 +0,0 @@ -use bytes::Bytes; -use tokio::io::{AsyncReadExt, Result}; -use tokio_util::io::StreamReader; -#[tokio::main] -async fn main() -> std::io::Result<()> { - // Create a stream from an iterator. - let stream = tokio::stream::iter(vec![ - Result::Ok(Bytes::from_static(&[0, 1, 2, 3])), - Result::Ok(Bytes::from_static(&[4, 5, 6, 7])), - Result::Ok(Bytes::from_static(&[8, 9, 10, 11])), - ]); - - // Convert it to an AsyncRead. - let mut read = StreamReader::new(stream); - - // Read five bytes from the stream. - let mut buf = [0; 5]; - read.read_exact(&mut buf[..]).await?; - assert_eq!(buf, [0, 1, 2, 3, 4]); - - // Read the rest of the current chunk. - assert_eq!(read.read(&mut buf).await?, 3); - assert_eq!(&buf[..3], [5, 6, 7]); - - // Read the next chunk. - assert_eq!(read.read(&mut buf).await?, 4); - assert_eq!(&buf[..4], [8, 9, 10, 11]); - - // We have now reached the end. - assert_eq!(read.read(&mut buf).await?, 0); - - Ok(()) -} diff --git a/tokio-util/tests/io_reader_stream.rs b/tokio-util/tests/io_reader_stream.rs index bbc8180ba4a..b906de097e2 100644 --- a/tokio-util/tests/io_reader_stream.rs +++ b/tokio-util/tests/io_reader_stream.rs @@ -1,5 +1,4 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "full")] use std::pin::Pin; use std::task::{Context, Poll}; diff --git a/tokio-util/tests/stream_reader.rs b/tokio-util/tests/io_stream_reader.rs similarity index 97% rename from tokio-util/tests/stream_reader.rs rename to tokio-util/tests/io_stream_reader.rs index eddebcebea9..b0ed1d2d046 100644 --- a/tokio-util/tests/stream_reader.rs +++ b/tokio-util/tests/io_stream_reader.rs @@ -1,5 +1,4 @@ #![warn(rust_2018_idioms)] -#![cfg(feature = "full")] use bytes::Bytes; use tokio::io::AsyncReadExt; diff --git a/tokio-util/tests/sync_cancellation_token.rs b/tokio-util/tests/sync_cancellation_token.rs index c65a6425fd9..438e5d5ef13 100644 --- a/tokio-util/tests/sync_cancellation_token.rs +++ b/tokio-util/tests/sync_cancellation_token.rs @@ -1,3 +1,5 @@ +#![warn(rust_2018_idioms)] + use tokio::pin; use tokio_util::sync::CancellationToken; diff --git a/tokio-util/tests/udp.rs b/tokio-util/tests/udp.rs index d0320beb185..4820ac72d00 100644 --- a/tokio-util/tests/udp.rs +++ b/tokio-util/tests/udp.rs @@ -1,3 +1,5 @@ +#![warn(rust_2018_idioms)] + use tokio::{net::UdpSocket, stream::StreamExt}; use tokio_util::codec::{Decoder, Encoder, LinesCodec}; use tokio_util::udp::UdpFramed; From de3a2c086ffd6947e51e8fe40ef3e9a7dec598e6 Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Fri, 28 Aug 2020 19:28:31 +0200 Subject: [PATCH 4/7] Add semicolon --- tokio-util/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/lib.rs b/tokio-util/src/lib.rs index 9e4b1674590..49733c6a4b4 100644 --- a/tokio-util/src/lib.rs +++ b/tokio-util/src/lib.rs @@ -42,6 +42,6 @@ cfg_io! { pub mod io; } -pub mod context +pub mod context; pub mod sync; From 1a1f8b2aa6057d41559f4589b424b226e6e8d7bd Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 31 Aug 2020 16:32:15 +0200 Subject: [PATCH 5/7] Remove dbg! --- tokio-util/src/io/reader_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index 6f4f62bb13a..a67754f7863 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -78,7 +78,7 @@ impl Stream for ReaderStream { if this.buf.capacity() == 0 { this.buf.reserve(CAPACITY); } - match dbg!(reader.poll_read_buf(cx, &mut this.buf)) { + match reader.poll_read_buf(cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { self.project().reader.set(None); From 117c7d5f00f29a1c6390e76eb8227c04403991eb Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Sat, 5 Sep 2020 20:55:12 +0200 Subject: [PATCH 6/7] Change StreamReader to use Stream --- tokio-util/src/io/stream_reader.rs | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/tokio-util/src/io/stream_reader.rs b/tokio-util/src/io/stream_reader.rs index ec3d0e9d9bf..5c3ab019eba 100644 --- a/tokio-util/src/io/stream_reader.rs +++ b/tokio-util/src/io/stream_reader.rs @@ -1,5 +1,5 @@ use bytes::{Buf, BufMut}; -use futures_core::stream::TryStream; +use futures_core::stream::Stream; use pin_project_lite::pin_project; use std::io; use std::pin::Pin; @@ -54,17 +54,18 @@ pin_project! { /// [`Stream`]: tokio::stream::Stream /// [`ReaderStream`]: crate::io::ReaderStream #[derive(Debug)] - pub struct StreamReader { + pub struct StreamReader { #[pin] inner: S, - chunk: Option, + chunk: Option, } } -impl StreamReader +impl StreamReader where - ::Ok: Buf, - ::Error: Into, + S: Stream>, + B: Buf, + E: Into, { /// Convert a stream of byte chunks into an [`AsyncRead`](tokio::io::AsyncRead). /// @@ -92,10 +93,11 @@ where } } -impl AsyncRead for StreamReader +impl AsyncRead for StreamReader where - ::Ok: Buf, - ::Error: Into, + S: Stream>, + B: Buf, + E: Into, { fn poll_read( mut self: Pin<&mut Self>, @@ -142,10 +144,11 @@ where } } -impl AsyncBufRead for StreamReader +impl AsyncBufRead for StreamReader where - ::Ok: Buf, - ::Error: Into, + S: Stream>, + B: Buf, + E: Into, { fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { @@ -154,7 +157,7 @@ where let buf = self.project().chunk.as_ref().unwrap().bytes(); return Poll::Ready(Ok(buf)); } else { - match self.as_mut().project().inner.try_poll_next(cx) { + match self.as_mut().project().inner.poll_next(cx) { Poll::Ready(Some(Ok(chunk))) => { // Go around the loop in case the chunk is empty. *self.as_mut().project().chunk = Some(chunk); From 4c0e002195a832ac16d3d9cc2cadddeca66572da Mon Sep 17 00:00:00 2001 From: Alice Ryhl Date: Mon, 7 Sep 2020 21:36:26 +0200 Subject: [PATCH 7/7] Move CAPACITY and add newlines --- tokio-util/src/io/reader_stream.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/io/reader_stream.rs b/tokio-util/src/io/reader_stream.rs index a67754f7863..bde7cceec5c 100644 --- a/tokio-util/src/io/reader_stream.rs +++ b/tokio-util/src/io/reader_stream.rs @@ -5,6 +5,8 @@ use std::pin::Pin; use std::task::{Context, Poll}; use tokio::io::AsyncRead; +const CAPACITY: usize = 4096; + pin_project! { /// Convert an [`AsyncRead`] into a [`Stream`] of byte chunks. /// @@ -65,19 +67,20 @@ impl ReaderStream { } } -const CAPACITY: usize = 4096; - impl Stream for ReaderStream { type Item = std::io::Result; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.as_mut().project(); + let reader = match this.reader.as_pin_mut() { Some(r) => r, None => return Poll::Ready(None), }; + if this.buf.capacity() == 0 { this.buf.reserve(CAPACITY); } + match reader.poll_read_buf(cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => {