From b02e8c773e02944d0ac21ee563a34211c4229b9f Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Wed, 29 Jul 2020 23:14:36 +0300 Subject: [PATCH 1/7] Add ReaderStream --- tokio/src/io/mod.rs | 1 + tokio/src/io/util/mod.rs | 3 ++ tokio/src/io/util/reader_stream.rs | 77 ++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+) create mode 100644 tokio/src/io/util/reader_stream.rs diff --git a/tokio/src/io/mod.rs b/tokio/src/io/mod.rs index 9e0e063195c..37da942ff3d 100644 --- a/tokio/src/io/mod.rs +++ b/tokio/src/io/mod.rs @@ -236,6 +236,7 @@ cfg_io_util! { cfg_stream! { pub use util::{stream_reader, StreamReader}; + pub use util::{reader_stream, ReaderStream}; } } diff --git a/tokio/src/io/util/mod.rs b/tokio/src/io/util/mod.rs index 609ff2386a6..782a02a8f74 100644 --- a/tokio/src/io/util/mod.rs +++ b/tokio/src/io/util/mod.rs @@ -66,6 +66,9 @@ cfg_io_util! { cfg_stream! { mod stream_reader; pub use stream_reader::{stream_reader, StreamReader}; + + mod reader_stream; + pub use reader_stream::{reader_stream, ReaderStream}; } mod take; diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs new file mode 100644 index 00000000000..475d2bde959 --- /dev/null +++ b/tokio/src/io/util/reader_stream.rs @@ -0,0 +1,77 @@ +use crate::io::AsyncRead; +use crate::stream::Stream; +use bytes::{Bytes, BytesMut}; +use pin_project_lite::pin_project; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Convert async reader into stream of Result + /// + /// This type can be created using the [`reader_stream`](crate::io::reader_stream) function + #[derive(Debug)] + #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] + #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] + pub struct ReaderStream { + // reader itself + #[pin] + reader: R, + // Working buffer, used to optimize allocations. + // # Capacity behavior + // Initially `buf` is empty. Also it's getting smaller and smaller + // during polls (because it's chunks are returned to stream user). + // But when it's capacity reaches 0, it is growed. + buf: BytesMut, + } +} + +/// Convert `AsyncRead` into stream of byte chunks. +/// # Example +/// ``` +/// # #[tokio::main] +/// # async fn main() -> std::io::Result<()> { +/// use tokio::stream::StreamExt; +/// let data = b"hello, world!"; +/// let mut stream = tokio::io::reader_stream(data as &[u8]); +/// let mut stream_contents = bytes::BytesMut::new(); +/// while let Some(chunk) = stream.next().await { +/// stream_contents.extend_from_slice(chunk?.as_ref()); +/// } +/// assert_eq!(stream_contents.as_ref(), data); +/// # Ok(()) +/// # } +/// ``` +pub fn reader_stream(reader: R) -> ReaderStream +where + R: AsyncRead, +{ + ReaderStream { + reader, + buf: BytesMut::new(), + } +} + +const CAPACITY: usize = 4096; + +impl Stream for ReaderStream +where + R: AsyncRead, +{ + type Item = std::io::Result; + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + if this.buf.capacity() == 0 { + this.buf.reserve(CAPACITY); + } + // if we have something in our buf, let's return it + match this.reader.poll_read_buf(cx, &mut this.buf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Ready(Ok(0)) => Poll::Ready(None), + Poll::Ready(Ok(n)) => { + let chunk = this.buf.split_to(n); + Poll::Ready(Some(Ok(chunk.freeze()))) + } + } + } +} From 1c27682294b06e6cc3675595c58abb972135ece8 Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Fri, 31 Jul 2020 23:03:56 +0300 Subject: [PATCH 2/7] improve --- tokio/src/io/util/reader_stream.rs | 35 +++++++++++++++++++++--------- 1 file changed, 25 insertions(+), 10 deletions(-) diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs index 475d2bde959..1a9389dee98 100644 --- a/tokio/src/io/util/reader_stream.rs +++ b/tokio/src/io/util/reader_stream.rs @@ -6,7 +6,9 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Convert async reader into stream of Result + /// Convert an [`AsyncRead`](crate::io::AsyncRead) implementor into a + /// [`Stream`](crate::stream::Stream) of Result<[`Bytes`](bytes::Bytes), io::Error>. After first error it will + /// stop. /// /// This type can be created using the [`reader_stream`](crate::io::reader_stream) function #[derive(Debug)] @@ -22,22 +24,28 @@ pin_project! { // during polls (because it's chunks are returned to stream user). // But when it's capacity reaches 0, it is growed. buf: BytesMut, + // true if we had error reading from the `reader` in the past + had_error: bool, } } -/// Convert `AsyncRead` into stream of byte chunks. +/// Convert an [`AsyncRead`](crate::io::AsyncRead) implementor into a +/// [`Stream`](crate::stream::Stream) of Result<[`Bytes`](bytes::Bytes), io::Error>. +/// /// # Example +/// /// ``` /// # #[tokio::main] /// # async fn main() -> std::io::Result<()> { -/// use tokio::stream::StreamExt; -/// let data = b"hello, world!"; -/// let mut stream = tokio::io::reader_stream(data as &[u8]); -/// let mut stream_contents = bytes::BytesMut::new(); +/// use tokio::stream::StreamExt; +/// +/// let data: &[u8] = b"hello, world!"; +/// let mut stream = tokio::io::reader_stream(data); +/// let mut stream_contents = Vec::new(); /// while let Some(chunk) = stream.next().await { /// stream_contents.extend_from_slice(chunk?.as_ref()); /// } -/// assert_eq!(stream_contents.as_ref(), data); +/// assert_eq!(stream_contents, data); /// # Ok(()) /// # } /// ``` @@ -48,6 +56,7 @@ where ReaderStream { reader, buf: BytesMut::new(), + had_error: false, } } @@ -60,16 +69,22 @@ where type Item = std::io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); + if *this.had_error { + return Poll::Ready(None); + } if this.buf.capacity() == 0 { this.buf.reserve(CAPACITY); } // if we have something in our buf, let's return it match this.reader.poll_read_buf(cx, &mut this.buf) { Poll::Pending => Poll::Pending, - Poll::Ready(Err(err)) => Poll::Ready(Some(Err(err))), + Poll::Ready(Err(err)) => { + *this.had_error = true; + Poll::Ready(Some(Err(err))) + } Poll::Ready(Ok(0)) => Poll::Ready(None), - Poll::Ready(Ok(n)) => { - let chunk = this.buf.split_to(n); + Poll::Ready(Ok(_)) => { + let chunk = this.buf.split(); Poll::Ready(Some(Ok(chunk.freeze()))) } } From 15c9bdcf8ed49c1a42cff4d838023513fb977e69 Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Sun, 9 Aug 2020 16:05:24 +0300 Subject: [PATCH 3/7] store reader as option --- tokio/src/io/util/reader_stream.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs index 1a9389dee98..348909312cc 100644 --- a/tokio/src/io/util/reader_stream.rs +++ b/tokio/src/io/util/reader_stream.rs @@ -15,17 +15,16 @@ pin_project! { #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct ReaderStream { - // reader itself + // reader itself. + // None if we had error reading from the `reader` in the past. #[pin] - reader: R, + reader: Option, // Working buffer, used to optimize allocations. // # Capacity behavior // Initially `buf` is empty. Also it's getting smaller and smaller // during polls (because it's chunks are returned to stream user). // But when it's capacity reaches 0, it is growed. buf: BytesMut, - // true if we had error reading from the `reader` in the past - had_error: bool, } } @@ -54,9 +53,8 @@ where R: AsyncRead, { ReaderStream { - reader, + reader: Some(reader), buf: BytesMut::new(), - had_error: false, } } @@ -69,17 +67,17 @@ where type Item = std::io::Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut this = self.project(); - if *this.had_error { - return Poll::Ready(None); - } + let reader = match this.reader.as_pin_mut() { + Some(r) => r, + None => return Poll::Ready(None), + }; if this.buf.capacity() == 0 { this.buf.reserve(CAPACITY); } - // if we have something in our buf, let's return it - match this.reader.poll_read_buf(cx, &mut this.buf) { + match reader.poll_read_buf(cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { - *this.had_error = true; + this.reader = unsafe { Pin::new_unchecked(&mut None) }; Poll::Ready(Some(Err(err))) } Poll::Ready(Ok(0)) => Poll::Ready(None), From 32b5575edc070791ff347adc4ec6986cdcbdf0c4 Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Sun, 9 Aug 2020 16:33:31 +0300 Subject: [PATCH 4/7] address review --- tokio/src/io/util/reader_stream.rs | 6 +-- tokio/tests/io_reader_stream.rs | 62 ++++++++++++++++++++++++++++++ 2 files changed, 65 insertions(+), 3 deletions(-) create mode 100644 tokio/tests/io_reader_stream.rs diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs index 348909312cc..a48e2c49528 100644 --- a/tokio/src/io/util/reader_stream.rs +++ b/tokio/src/io/util/reader_stream.rs @@ -65,8 +65,8 @@ where R: AsyncRead, { type Item = std::io::Result; - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.as_mut().project(); let reader = match this.reader.as_pin_mut() { Some(r) => r, None => return Poll::Ready(None), @@ -77,7 +77,7 @@ where match reader.poll_read_buf(cx, &mut this.buf) { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => { - this.reader = unsafe { Pin::new_unchecked(&mut None) }; + self.project().reader.set(None); Poll::Ready(Some(Err(err))) } Poll::Ready(Ok(0)) => Poll::Ready(None), diff --git a/tokio/tests/io_reader_stream.rs b/tokio/tests/io_reader_stream.rs new file mode 100644 index 00000000000..33e7435b687 --- /dev/null +++ b/tokio/tests/io_reader_stream.rs @@ -0,0 +1,62 @@ +#![warn(rust_2018_idioms)] +#![cfg(feature = "full")] + +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncRead; +use tokio::stream::StreamExt; + +/// produces at most `remaining` zeros, that returns error +struct Reader { + remaining: usize, +} + +impl AsyncRead for Reader { + fn poll_read( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let this = Pin::into_inner(self); + assert_ne!(buf.len(), 0); + if this.remaining > 0 { + let n = std::cmp::min(this.remaining, buf.len()); + for x in &mut buf[..n] { + *x = 0; + } + this.remaining -= n; + Poll::Ready(Ok(n)) + } else { + Poll::Ready(Err(std::io::Error::from_raw_os_error(22))) + } + } +} + +#[tokio::test] +async fn correct_behavior_on_errors() { + let reader = Reader { remaining: 100 }; + let mut stream = tokio::io::reader_stream(reader); + let mut zeros_received = 0; + let mut had_error = false; + loop { + let item = stream.next().await.unwrap(); + match item { + Ok(bytes) => { + let bytes = &*bytes; + for byte in bytes { + assert_eq!(*byte, 0); + zeros_received += 1; + } + } + Err(_) => { + assert!(!had_error); + had_error = true; + break; + } + } + } + + assert!(had_error); + assert_eq!(zeros_received, 100); + assert!(stream.next().await.is_none()); +} From f58b6b9dea2e9b63bc0d20c6f8df139712f078f9 Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Sun, 9 Aug 2020 16:37:43 +0300 Subject: [PATCH 5/7] docs --- tokio/src/io/util/reader_stream.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs index a48e2c49528..3a11cbbcb5a 100644 --- a/tokio/src/io/util/reader_stream.rs +++ b/tokio/src/io/util/reader_stream.rs @@ -28,8 +28,8 @@ pin_project! { } } -/// Convert an [`AsyncRead`](crate::io::AsyncRead) implementor into a -/// [`Stream`](crate::stream::Stream) of Result<[`Bytes`](bytes::Bytes), io::Error>. +/// Convert an [`AsyncRead`] implementor into a +/// [`Stream`] of Result<[`Bytes`], std::io::Error>. /// /// # Example /// @@ -47,7 +47,11 @@ pin_project! { /// assert_eq!(stream_contents, data); /// # Ok(()) /// # } +/// /// ``` +/// [`AsyncRead`]: crate::io::AsyncRead +/// [`Stream`]: crate::stream::Stream +/// [`Bytes`]: bytes::Bytes pub fn reader_stream(reader: R) -> ReaderStream where R: AsyncRead, From 3e71c3af80ce1b1479375a8db9adf5f91040ef96 Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Sun, 9 Aug 2020 19:36:03 +0300 Subject: [PATCH 6/7] address review --- tokio/src/io/util/reader_stream.rs | 27 +++++++++++++++++++-------- tokio/tests/io_reader_stream.rs | 8 +++++--- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs index 3a11cbbcb5a..879f1c233f3 100644 --- a/tokio/src/io/util/reader_stream.rs +++ b/tokio/src/io/util/reader_stream.rs @@ -6,23 +6,31 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Convert an [`AsyncRead`](crate::io::AsyncRead) implementor into a - /// [`Stream`](crate::stream::Stream) of Result<[`Bytes`](bytes::Bytes), io::Error>. After first error it will - /// stop. + /// Convert an [`AsyncRead`] implementor into a + /// [`Stream`] of Result<[`Bytes`], std::io::Error>. + /// After first error it will stop. + /// Additionally, this stream is fused: after it returns None at some + /// moment, it is guaranteed that further `next()`, `poll_next()` and + /// similar functions will instantly return None. /// - /// This type can be created using the [`reader_stream`](crate::io::reader_stream) function + /// This type can be created using the [`reader_stream`] function + /// + /// [`AsyncRead`]: crate::io::AsyncRead + /// [`Stream`]: crate::stream::Stream + /// [`Bytes`]: bytes::Bytes + /// [`reader_stream`]: crate::io::reader_stream #[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))] pub struct ReaderStream { - // reader itself. + // Reader itself. // None if we had error reading from the `reader` in the past. #[pin] reader: Option, // Working buffer, used to optimize allocations. // # Capacity behavior // Initially `buf` is empty. Also it's getting smaller and smaller - // during polls (because it's chunks are returned to stream user). + // during polls (because its chunks are returned to stream user). // But when it's capacity reaches 0, it is growed. buf: BytesMut, } @@ -47,8 +55,8 @@ pin_project! { /// assert_eq!(stream_contents, data); /// # Ok(()) /// # } -/// /// ``` +/// /// [`AsyncRead`]: crate::io::AsyncRead /// [`Stream`]: crate::stream::Stream /// [`Bytes`]: bytes::Bytes @@ -84,7 +92,10 @@ where self.project().reader.set(None); Poll::Ready(Some(Err(err))) } - Poll::Ready(Ok(0)) => Poll::Ready(None), + Poll::Ready(Ok(0)) => { + self.project().reader.set(None); + Poll::Ready(None) + }, Poll::Ready(Ok(_)) => { let chunk = this.buf.split(); Poll::Ready(Some(Ok(chunk.freeze()))) diff --git a/tokio/tests/io_reader_stream.rs b/tokio/tests/io_reader_stream.rs index 33e7435b687..6546a0ef4da 100644 --- a/tokio/tests/io_reader_stream.rs +++ b/tokio/tests/io_reader_stream.rs @@ -6,7 +6,8 @@ use std::task::{Context, Poll}; use tokio::io::AsyncRead; use tokio::stream::StreamExt; -/// produces at most `remaining` zeros, that returns error +/// produces at most `remaining` zeros, that returns error. +/// each time it reads at most 31 byte. struct Reader { remaining: usize, } @@ -21,6 +22,7 @@ impl AsyncRead for Reader { assert_ne!(buf.len(), 0); if this.remaining > 0 { let n = std::cmp::min(this.remaining, buf.len()); + let n = std::cmp::min(n, 31); for x in &mut buf[..n] { *x = 0; } @@ -34,7 +36,7 @@ impl AsyncRead for Reader { #[tokio::test] async fn correct_behavior_on_errors() { - let reader = Reader { remaining: 100 }; + let reader = Reader { remaining: 8000 }; let mut stream = tokio::io::reader_stream(reader); let mut zeros_received = 0; let mut had_error = false; @@ -57,6 +59,6 @@ async fn correct_behavior_on_errors() { } assert!(had_error); - assert_eq!(zeros_received, 100); + assert_eq!(zeros_received, 8000); assert!(stream.next().await.is_none()); } From 6e6d3bc93c72c12b3235d04c182b25514ed42496 Mon Sep 17 00:00:00 2001 From: Mikail Bagishov Date: Sun, 9 Aug 2020 20:34:25 +0300 Subject: [PATCH 7/7] fmt --- tokio/src/io/util/reader_stream.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/io/util/reader_stream.rs b/tokio/src/io/util/reader_stream.rs index 879f1c233f3..51651cede4d 100644 --- a/tokio/src/io/util/reader_stream.rs +++ b/tokio/src/io/util/reader_stream.rs @@ -95,7 +95,7 @@ where Poll::Ready(Ok(0)) => { self.project().reader.set(None); Poll::Ready(None) - }, + } Poll::Ready(Ok(_)) => { let chunk = this.buf.split(); Poll::Ready(Some(Ok(chunk.freeze())))