From 75dc819b2f5984e9a600da088213d5b1645af6e5 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 13:26:06 +0200 Subject: [PATCH 1/7] feat(io): implement Read::take --- src/io/read/mod.rs | 42 +++++++++ src/io/read/take.rs | 214 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 256 insertions(+) create mode 100644 src/io/read/take.rs diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index 5a2276e32..53fc94bd9 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -3,6 +3,7 @@ mod read_exact; mod read_to_end; mod read_to_string; mod read_vectored; +mod take; use read::ReadFuture; use read_exact::ReadExactFuture; @@ -261,6 +262,47 @@ extension_trait! { { ReadExactFuture { reader: self, buf } } + + #[doc = r#" + Creates an adaptor which will read at most `limit` bytes from it. + + This function returns a new instance of `Read` which will read at most + `limit` bytes, after which it will always return EOF ([`Ok(0)`]). Any + read errors will not count towards the number of bytes read and future + calls to [`read()`] may succeed. + + # Examples + + [`File`]s implement `Read`: + + [`File`]: ../fs/struct.File.html + [`Ok(0)`]: ../../std/result/enum.Result.html#variant.Ok + [`read()`]: tymethod.read + + ```no_run + use async_std::io::prelude::*; + use async_std::fs::File; + + fn main() -> std::io::Result<()> { + async_std::task::block_on(async { + let f = File::open("foo.txt").await?; + let mut buffer = [0; 5]; + + // read at most five bytes + let mut handle = f.take(5); + + handle.read(&mut buffer).await?; + Ok(()) + }) + } + ``` + "#] + fn take(self, limit: u64) -> take::Take + where + Self: Sized, + { + take::Take { inner: self, limit } + } } impl Read for Box { diff --git a/src/io/read/take.rs b/src/io/read/take.rs new file mode 100644 index 000000000..1dfe61a2d --- /dev/null +++ b/src/io/read/take.rs @@ -0,0 +1,214 @@ +use std::cmp; +use std::pin::Pin; + +use crate::io::{self, Read}; +use crate::task::{Context, Poll}; + +/// Reader adaptor which limits the bytes read from an underlying reader. +/// +/// This struct is generally created by calling [`take`] on a reader. +/// Please see the documentation of [`take`] for more details. +/// +/// [`take`]: trait.Read.html#method.take +#[derive(Debug)] +pub struct Take { + pub(crate) inner: T, + pub(crate) limit: u64, +} + +impl Take { + /// Returns the number of bytes that can be read before this instance will + /// return EOF. + /// + /// # Note + /// + /// This instance may reach `EOF` after reading fewer bytes than indicated by + /// this method if the underlying [`Read`] instance reaches EOF. + /// + /// [`Read`]: trait.Read.html + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let f = File::open("foo.txt").await?; + /// + /// // read at most five bytes + /// let handle = f.take(5); + /// + /// println!("limit: {}", handle.limit()); + /// Ok(()) + /// }) } + /// ``` + pub fn limit(&self) -> u64 { + self.limit + } + + /// Sets the number of bytes that can be read before this instance will + /// return EOF. This is the same as constructing a new `Take` instance, so + /// the amount of bytes read and the previous limit value don't matter when + /// calling this method. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let f = File::open("foo.txt").await?; + /// + /// // read at most five bytes + /// let mut handle = f.take(5); + /// handle.set_limit(10); + /// + /// assert_eq!(handle.limit(), 10); + /// Ok(()) + /// }) } + /// ``` + pub fn set_limit(&mut self, limit: u64) { + self.limit = limit; + } + + /// Consumes the `Take`, returning the wrapped reader. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let file = File::open("foo.txt").await?; + /// + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; + /// + /// let file = handle.into_inner(); + /// Ok(()) + /// }) } + /// ``` + pub fn into_inner(self) -> T { + self.inner + } + + /// Gets a reference to the underlying reader. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let file = File::open("foo.txt").await?; + /// + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; + /// + /// let file = handle.get_ref(); + /// Ok(()) + /// }) } + /// ``` + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Gets a mutable reference to the underlying reader. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying reader as doing so may corrupt the internal limit of this + /// `Take`. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let file = File::open("foo.txt").await?; + /// + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; + /// + /// let file = handle.get_mut(); + /// Ok(()) + /// }) } + /// ``` + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } +} + +impl Read for Take { + /// Attempt to read from the `AsyncRead` into `buf`. + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + let Self { inner, limit } = &mut *self; + take_read_internal(Pin::new(inner), cx, buf, limit) + } +} + +pub fn take_read_internal( + mut rd: Pin<&mut R>, + cx: &mut Context<'_>, + buf: &mut [u8], + limit: &mut u64, +) -> Poll> { + // Don't call into inner reader at all at EOF because it may still block + if *limit == 0 { + return Poll::Ready(Ok(0)); + } + + let max = cmp::min(buf.len() as u64, *limit) as usize; + + match futures_core::ready!(rd.as_mut().poll_read(cx, &mut buf[..max])) { + Ok(n) => { + *limit -= n as u64; + Poll::Ready(Ok(n)) + } + Err(e) => Poll::Ready(Err(e)), + } +} + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + use crate::task; + + #[test] + fn test_take_basics() -> std::io::Result<()> { + let source: io::Cursor> = io::Cursor::new(vec![0, 1, 2, 3, 4, 5, 6, 7, 8]); + + task::block_on(async move { + let mut buffer = [0u8; 5]; + + // read at most five bytes + let mut handle = source.take(5); + + handle.read(&mut buffer).await?; + assert_eq!(buffer, [0, 1, 2, 3, 4]); + + // check that the we are actually at the end + assert_eq!(handle.read(&mut buffer).await.unwrap(), 0); + + Ok(()) + }) + } +} From f751ebb8c4992a803c8a7f72c1ea347a8556a2fb Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 13:37:14 +0200 Subject: [PATCH 2/7] feat(io): implement Read::by_ref --- src/io/read/mod.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index 53fc94bd9..59e9bfba3 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -303,6 +303,44 @@ extension_trait! { { take::Take { inner: self, limit } } + + #[doc = r#" + Creates a "by reference" adaptor for this instance of `Read`. + + The returned adaptor also implements `Read` and will simply borrow this + current reader. + + # Examples + + [`File`][file]s implement `Read`: + + [file]: ../fs/struct.File.html + + ```no_run + use async_std::io; + use async_std::prelude::*; + use async_std::fs::File; + + fn main() -> io::Result<()> { async_std::task::block_on(async { + let mut f = File::open("foo.txt").await?; + let mut buffer = Vec::new(); + let mut other_buffer = Vec::new(); + + { + let reference = f.by_ref(); + + // read at most 5 bytes + reference.take(5).read_to_end(&mut buffer).await?; + + } // drop our &mut reference so we can use f again + + // original file still usable, read the rest + f.read_to_end(&mut other_buffer).await?; + Ok(()) + }) } + ``` + "#] + fn by_ref(&mut self) -> &mut Self where Self: Sized { self } } impl Read for Box { @@ -349,3 +387,31 @@ extension_trait! { } } } + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + + #[test] + fn test_read_by_ref() -> io::Result<()> { + crate::task::block_on(async { + let mut f = io::Cursor::new(vec![0u8, 1, 2, 3, 4, 5, 6, 7, 8]); + let mut buffer = Vec::new(); + let mut other_buffer = Vec::new(); + + { + let reference = f.by_ref(); + + // read at most 5 bytes + assert_eq!(reference.take(5).read_to_end(&mut buffer).await?, 5); + assert_eq!(&buffer, &[0, 1, 2, 3, 4]) + } // drop our &mut reference so we can use f again + + // original file still usable, read the rest + assert_eq!(f.read_to_end(&mut other_buffer).await?, 4); + assert_eq!(&other_buffer, &[5, 6, 7, 8]); + Ok(()) + }) + } +} From e681e297ef49b9bdf16d41bf931bdf23186a839d Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 14:58:14 +0200 Subject: [PATCH 3/7] feat(io): implement Read::bytes --- src/io/read/bytes.rs | 60 ++++++++++++++++++++++++++++++++++++++++++++ src/io/read/mod.rs | 36 ++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 src/io/read/bytes.rs diff --git a/src/io/read/bytes.rs b/src/io/read/bytes.rs new file mode 100644 index 000000000..a45ee1650 --- /dev/null +++ b/src/io/read/bytes.rs @@ -0,0 +1,60 @@ +use std::pin::Pin; + +use crate::io::{self, Read}; +use crate::stream::stream::Stream; +use crate::task::{Context, Poll}; + +/// An iterator over `u8` values of a reader. +/// +/// This struct is generally created by calling [`bytes`] on a reader. +/// Please see the documentation of [`bytes`] for more details. +/// +/// [`bytes`]: trait.Read.html#method.bytes +#[derive(Debug)] +pub struct Bytes { + pub(crate) inner: T, +} + +impl Stream for Bytes { + type Item = io::Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut byte = 0; + + let rd = Pin::new(&mut self.inner); + + match futures_core::ready!(rd.poll_read(cx, std::slice::from_mut(&mut byte))) { + Ok(0) => Poll::Ready(None), + Ok(..) => Poll::Ready(Some(Ok(byte))), + Err(ref e) if e.kind() == io::ErrorKind::Interrupted => Poll::Pending, + Err(e) => Poll::Ready(Some(Err(e))), + } + } +} + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + use crate::task; + + #[test] + fn test_bytes_basics() -> std::io::Result<()> { + task::block_on(async move { + let raw: Vec = vec![0, 1, 2, 3, 4, 5, 6, 7, 8]; + let source: io::Cursor> = io::Cursor::new(raw.clone()); + + let mut s = source.bytes(); + + // TODO(@dignifiedquire): Use collect, once it is stable. + let mut result = Vec::new(); + while let Some(byte) = s.next().await { + result.push(byte?); + } + + assert_eq!(result, raw); + + Ok(()) + }) + } +} diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index 59e9bfba3..c6b5bad06 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -1,3 +1,4 @@ +mod bytes; mod read; mod read_exact; mod read_to_end; @@ -341,6 +342,41 @@ extension_trait! { ``` "#] fn by_ref(&mut self) -> &mut Self where Self: Sized { self } + + + #[doc=r#" + Transforms this `Read` instance to a `Stream` over its bytes. + + The returned type implements `Stream` where the `Item` is + `Result`. + The yielded item is `Ok` if a byte was successfully read and `Err` + otherwise. EOF is mapped to returning `None` from this iterator. + + # Examples + + [`File`][file]s implement `Read`: + + [file]: ../fs/struct.File.html + + ```no_run + use async_std::io; + use async_std::prelude::*; + use async_std::fs::File; + + fn main() -> io::Result<()> { async_std::task::block_on(async { + let f = File::open("foo.txt").await?; + let mut s = f.bytes(); + + while let Some(byte) = s.next().await { + println!("{}", byte.unwrap()); + } + Ok(()) + }) } + ``` + "#] + fn bytes(self) -> bytes::Bytes where Self: Sized { + bytes::Bytes { inner: self } + } } impl Read for Box { From d9aec105a1a5b3b18463bab32cf36d2fcd57a00a Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 16:30:38 +0200 Subject: [PATCH 4/7] feat(io): implement Read::chain --- src/io/read/chain.rs | 199 +++++++++++++++++++++++++++++++++++++++++++ src/io/read/mod.rs | 40 ++++++++- 2 files changed, 238 insertions(+), 1 deletion(-) create mode 100644 src/io/read/chain.rs diff --git a/src/io/read/chain.rs b/src/io/read/chain.rs new file mode 100644 index 000000000..c05a43cc0 --- /dev/null +++ b/src/io/read/chain.rs @@ -0,0 +1,199 @@ +use crate::io::IoSliceMut; +use std::fmt; +use std::pin::Pin; + +use crate::io::{self, BufRead, Read}; +use crate::task::{Context, Poll}; + +/// Adaptor to chain together two readers. +/// +/// This struct is generally created by calling [`chain`] on a reader. +/// Please see the documentation of [`chain`] for more details. +/// +/// [`chain`]: trait.Read.html#method.chain +pub struct Chain { + pub(crate) first: T, + pub(crate) second: U, + pub(crate) done_first: bool, +} + +impl Chain { + /// Consumes the `Chain`, returning the wrapped readers. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.into_inner(); + /// Ok(()) + /// }) } + /// ``` + pub fn into_inner(self) -> (T, U) { + (self.first, self.second) + } + + /// Gets references to the underlying readers in this `Chain`. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_ref(); + /// Ok(()) + /// }) } + /// ``` + pub fn get_ref(&self) -> (&T, &U) { + (&self.first, &self.second) + } + + /// Gets mutable references to the underlying readers in this `Chain`. + /// + /// Care should be taken to avoid modifying the internal I/O state of the + /// underlying readers as doing so may corrupt the internal state of this + /// `Chain`. + /// + /// # Examples + /// + /// ```no_run + /// use async_std::io; + /// use async_std::prelude::*; + /// use async_std::fs::File; + /// + /// fn main() -> io::Result<()> { async_std::task::block_on(async { + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; + /// + /// let mut chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_mut(); + /// Ok(()) + /// }) } + /// ``` + pub fn get_mut(&mut self) -> (&mut T, &mut U) { + (&mut self.first, &mut self.second) + } +} + +impl fmt::Debug for Chain { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Chain") + .field("t", &self.first) + .field("u", &self.second) + .finish() + } +} + +impl Read for Chain { + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + if !self.done_first { + let rd = Pin::new(&mut self.first); + + match futures_core::ready!(rd.poll_read(cx, buf)) { + Ok(0) if !buf.is_empty() => self.done_first = true, + Ok(n) => return Poll::Ready(Ok(n)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let rd = Pin::new(&mut self.second); + rd.poll_read(cx, buf) + } + + fn poll_read_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + if !self.done_first { + let rd = Pin::new(&mut self.first); + + match futures_core::ready!(rd.poll_read_vectored(cx, bufs)) { + Ok(0) if !bufs.is_empty() => self.done_first = true, + Ok(n) => return Poll::Ready(Ok(n)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let rd = Pin::new(&mut self.second); + rd.poll_read_vectored(cx, bufs) + } +} + +impl BufRead for Chain { + fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // FIXME: how to make this compile? + + // let Self { + // first, + // second, + // done_first + // } = &mut *self; + + // if !*done_first { + // let rd = Pin::new(first); + + // match futures_core::ready!(rd.poll_fill_buf(cx)) { + // Ok(buf) if buf.is_empty() => { *done_first = true; } + // Ok(buf) => return Poll::Ready(Ok(buf)), + // Err(err) => return Poll::Ready(Err(err)), + // } + // } + + // let rd = Pin::new(second); + // rd.poll_fill_buf(cx) + unimplemented!() + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + if !self.done_first { + let rd = Pin::new(&mut self.first); + rd.consume(amt) + } else { + let rd = Pin::new(&mut self.second); + rd.consume(amt) + } + } +} + +#[cfg(test)] +mod tests { + use crate::io; + use crate::prelude::*; + use crate::task; + + #[test] + fn test_chain_basics() -> std::io::Result<()> { + let source1: io::Cursor> = io::Cursor::new(vec![0, 1, 2]); + let source2: io::Cursor> = io::Cursor::new(vec![3, 4, 5]); + + task::block_on(async move { + let mut buffer = Vec::new(); + + let mut source = source1.chain(source2); + + assert_eq!(6, source.read_to_end(&mut buffer).await?); + assert_eq!(buffer, vec![0, 1, 2, 3, 4, 5]); + + Ok(()) + }) + } +} diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index c6b5bad06..6b8ad71c7 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -1,4 +1,5 @@ mod bytes; +mod chain; mod read; mod read_exact; mod read_to_end; @@ -344,7 +345,7 @@ extension_trait! { fn by_ref(&mut self) -> &mut Self where Self: Sized { self } - #[doc=r#" + #[doc = r#" Transforms this `Read` instance to a `Stream` over its bytes. The returned type implements `Stream` where the `Item` is @@ -377,6 +378,43 @@ extension_trait! { fn bytes(self) -> bytes::Bytes where Self: Sized { bytes::Bytes { inner: self } } + + #[doc = r#" + Creates an adaptor which will chain this stream with another. + + The returned `Read` instance will first read all bytes from this object + until EOF is encountered. Afterwards the output is equivalent to the + output of `next`. + + # Examples + + [`File`][file]s implement `Read`: + + [file]: ../fs/struct.File.html + + ```no_run + use async_std::io; + use async_std::prelude::*; + use async_std::fs::File; + + fn main() -> io::Result<()> { async_std::task::block_on(async { + let f1 = File::open("foo.txt").await?; + let f2 = File::open("bar.txt").await?; + + let mut handle = f1.chain(f2); + let mut buffer = String::new(); + + // read the value into a String. We could use any Read method here, + // this is just one example. + handle.read_to_string(&mut buffer).await?; + Ok(()) + }) } + ``` + "#] + fn chain(self, next: R) -> chain::Chain where Self: Sized { + chain::Chain { first: self, second: next, done_first: false } + } + } impl Read for Box { From dc6c8fb1318677e0b2ab70e211f6c4fc14a77be2 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 16:36:55 +0200 Subject: [PATCH 5/7] feat(io): add stub for BufRead for Take --- src/io/read/take.rs | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/src/io/read/take.rs b/src/io/read/take.rs index 1dfe61a2d..cd7e40dd9 100644 --- a/src/io/read/take.rs +++ b/src/io/read/take.rs @@ -1,7 +1,7 @@ use std::cmp; use std::pin::Pin; -use crate::io::{self, Read}; +use crate::io::{self, BufRead, Read}; use crate::task::{Context, Poll}; /// Reader adaptor which limits the bytes read from an underlying reader. @@ -186,6 +186,41 @@ pub fn take_read_internal( } } +impl BufRead for Take { + fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + // FIXME: how to get this to compile? + unimplemented!(); + + // let Self { + // inner, + // limit, + // } = &mut *self; + + // if *limit == 0 { + // return Poll::Ready(Ok(&[])); + // } + + // let rd = Pin::new(inner); + + // match futures_core::ready!(rd.poll_fill_buf(cx)) { + // Ok(buf) => { + // let cap = cmp::min(buf.len() as u64, *limit) as usize; + // Poll::Ready(Ok(&buf[..cap])) + // } + // Err(e) => Poll::Ready(Err(e)), + // } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + // Don't let callers reset the limit by passing an overlarge value + let amt = cmp::min(amt as u64, self.limit) as usize; + self.limit -= amt as u64; + + let rd = Pin::new(&mut self.inner); + rd.consume(amt); + } +} + #[cfg(test)] mod tests { use crate::io; From a1aa3f823d68d8df0be8a767ac54ec6272ba0ad0 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 18:59:30 +0200 Subject: [PATCH 6/7] finish BufRead --- src/io/read/chain.rs | 42 ++++++++++++++++++++---------------------- src/io/read/take.rs | 37 +++++++++++++++---------------------- 2 files changed, 35 insertions(+), 44 deletions(-) diff --git a/src/io/read/chain.rs b/src/io/read/chain.rs index c05a43cc0..e29b9bcb3 100644 --- a/src/io/read/chain.rs +++ b/src/io/read/chain.rs @@ -139,28 +139,26 @@ impl Read for Chain { } impl BufRead for Chain { - fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // FIXME: how to make this compile? - - // let Self { - // first, - // second, - // done_first - // } = &mut *self; - - // if !*done_first { - // let rd = Pin::new(first); - - // match futures_core::ready!(rd.poll_fill_buf(cx)) { - // Ok(buf) if buf.is_empty() => { *done_first = true; } - // Ok(buf) => return Poll::Ready(Ok(buf)), - // Err(err) => return Poll::Ready(Err(err)), - // } - // } - - // let rd = Pin::new(second); - // rd.poll_fill_buf(cx) - unimplemented!() + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { + first, + second, + done_first, + } = unsafe { self.get_unchecked_mut() }; + + if !*done_first { + let first = unsafe { Pin::new_unchecked(first) }; + match futures_core::ready!(first.poll_fill_buf(cx)) { + Ok(buf) if buf.is_empty() => { + *done_first = true; + } + Ok(buf) => return Poll::Ready(Ok(buf)), + Err(err) => return Poll::Ready(Err(err)), + } + } + + let second = unsafe { Pin::new_unchecked(second) }; + second.poll_fill_buf(cx) } fn consume(mut self: Pin<&mut Self>, amt: usize) { diff --git a/src/io/read/take.rs b/src/io/read/take.rs index cd7e40dd9..b63f76d4a 100644 --- a/src/io/read/take.rs +++ b/src/io/read/take.rs @@ -187,28 +187,21 @@ pub fn take_read_internal( } impl BufRead for Take { - fn poll_fill_buf(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - // FIXME: how to get this to compile? - unimplemented!(); - - // let Self { - // inner, - // limit, - // } = &mut *self; - - // if *limit == 0 { - // return Poll::Ready(Ok(&[])); - // } - - // let rd = Pin::new(inner); - - // match futures_core::ready!(rd.poll_fill_buf(cx)) { - // Ok(buf) => { - // let cap = cmp::min(buf.len() as u64, *limit) as usize; - // Poll::Ready(Ok(&buf[..cap])) - // } - // Err(e) => Poll::Ready(Err(e)), - // } + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let Self { inner, limit } = unsafe { self.get_unchecked_mut() }; + let inner = unsafe { Pin::new_unchecked(inner) }; + + if *limit == 0 { + return Poll::Ready(Ok(&[])); + } + + match futures_core::ready!(inner.poll_fill_buf(cx)) { + Ok(buf) => { + let cap = cmp::min(buf.len() as u64, *limit) as usize; + Poll::Ready(Ok(&buf[..cap])) + } + Err(e) => Poll::Ready(Err(e)), + } } fn consume(mut self: Pin<&mut Self>, amt: usize) { From 064b44f695af7abf3e8af41d36d7757b015bb6c2 Mon Sep 17 00:00:00 2001 From: dignifiedquire Date: Fri, 27 Sep 2019 18:49:23 -0600 Subject: [PATCH 7/7] apply cr --- src/io/read/bytes.rs | 2 +- src/io/read/chain.rs | 48 ++++++++++++------------ src/io/read/mod.rs | 87 ++++++++++++++++++++++--------------------- src/io/read/take.rs | 88 ++++++++++++++++++++++---------------------- 4 files changed, 112 insertions(+), 113 deletions(-) diff --git a/src/io/read/bytes.rs b/src/io/read/bytes.rs index a45ee1650..422452433 100644 --- a/src/io/read/bytes.rs +++ b/src/io/read/bytes.rs @@ -4,7 +4,7 @@ use crate::io::{self, Read}; use crate::stream::stream::Stream; use crate::task::{Context, Poll}; -/// An iterator over `u8` values of a reader. +/// A stream over `u8` values of a reader. /// /// This struct is generally created by calling [`bytes`] on a reader. /// Please see the documentation of [`bytes`] for more details. diff --git a/src/io/read/chain.rs b/src/io/read/chain.rs index e29b9bcb3..09517ccad 100644 --- a/src/io/read/chain.rs +++ b/src/io/read/chain.rs @@ -23,18 +23,18 @@ impl Chain { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let foo_file = File::open("foo.txt").await?; - /// let bar_file = File::open("bar.txt").await?; + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; /// - /// let chain = foo_file.chain(bar_file); - /// let (foo_file, bar_file) = chain.into_inner(); - /// Ok(()) - /// }) } + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.into_inner(); + /// # + /// # Ok(()) }) } /// ``` pub fn into_inner(self) -> (T, U) { (self.first, self.second) @@ -45,18 +45,18 @@ impl Chain { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let foo_file = File::open("foo.txt").await?; - /// let bar_file = File::open("bar.txt").await?; + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; /// - /// let chain = foo_file.chain(bar_file); - /// let (foo_file, bar_file) = chain.get_ref(); - /// Ok(()) - /// }) } + /// let chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_ref(); + /// # + /// # Ok(()) }) } /// ``` pub fn get_ref(&self) -> (&T, &U) { (&self.first, &self.second) @@ -71,18 +71,18 @@ impl Chain { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let foo_file = File::open("foo.txt").await?; - /// let bar_file = File::open("bar.txt").await?; + /// let foo_file = File::open("foo.txt").await?; + /// let bar_file = File::open("bar.txt").await?; /// - /// let mut chain = foo_file.chain(bar_file); - /// let (foo_file, bar_file) = chain.get_mut(); - /// Ok(()) - /// }) } + /// let mut chain = foo_file.chain(bar_file); + /// let (foo_file, bar_file) = chain.get_mut(); + /// # + /// # Ok(()) }) } /// ``` pub fn get_mut(&mut self) -> (&mut T, &mut U) { (&mut self.first, &mut self.second) diff --git a/src/io/read/mod.rs b/src/io/read/mod.rs index 6b8ad71c7..9c81f9599 100644 --- a/src/io/read/mod.rs +++ b/src/io/read/mod.rs @@ -282,21 +282,20 @@ extension_trait! { [`read()`]: tymethod.read ```no_run + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # use async_std::io::prelude::*; use async_std::fs::File; - fn main() -> std::io::Result<()> { - async_std::task::block_on(async { - let f = File::open("foo.txt").await?; - let mut buffer = [0; 5]; + let f = File::open("foo.txt").await?; + let mut buffer = [0; 5]; - // read at most five bytes - let mut handle = f.take(5); + // read at most five bytes + let mut handle = f.take(5); - handle.read(&mut buffer).await?; - Ok(()) - }) - } + handle.read(&mut buffer).await?; + # + # Ok(()) }) } ``` "#] fn take(self, limit: u64) -> take::Take @@ -319,27 +318,27 @@ extension_trait! { [file]: ../fs/struct.File.html ```no_run - use async_std::io; + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # use async_std::prelude::*; use async_std::fs::File; - fn main() -> io::Result<()> { async_std::task::block_on(async { - let mut f = File::open("foo.txt").await?; - let mut buffer = Vec::new(); - let mut other_buffer = Vec::new(); + let mut f = File::open("foo.txt").await?; + let mut buffer = Vec::new(); + let mut other_buffer = Vec::new(); - { - let reference = f.by_ref(); + { + let reference = f.by_ref(); - // read at most 5 bytes - reference.take(5).read_to_end(&mut buffer).await?; + // read at most 5 bytes + reference.take(5).read_to_end(&mut buffer).await?; - } // drop our &mut reference so we can use f again + } // drop our &mut reference so we can use f again - // original file still usable, read the rest - f.read_to_end(&mut other_buffer).await?; - Ok(()) - }) } + // original file still usable, read the rest + f.read_to_end(&mut other_buffer).await?; + # + # Ok(()) }) } ``` "#] fn by_ref(&mut self) -> &mut Self where Self: Sized { self } @@ -360,19 +359,19 @@ extension_trait! { [file]: ../fs/struct.File.html ```no_run - use async_std::io; + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # use async_std::prelude::*; use async_std::fs::File; - fn main() -> io::Result<()> { async_std::task::block_on(async { - let f = File::open("foo.txt").await?; - let mut s = f.bytes(); + let f = File::open("foo.txt").await?; + let mut s = f.bytes(); - while let Some(byte) = s.next().await { - println!("{}", byte.unwrap()); - } - Ok(()) - }) } + while let Some(byte) = s.next().await { + println!("{}", byte.unwrap()); + } + # + # Ok(()) }) } ``` "#] fn bytes(self) -> bytes::Bytes where Self: Sized { @@ -393,22 +392,22 @@ extension_trait! { [file]: ../fs/struct.File.html ```no_run - use async_std::io; + # fn main() -> std::io::Result<()> { async_std::task::block_on(async { + # use async_std::prelude::*; use async_std::fs::File; - fn main() -> io::Result<()> { async_std::task::block_on(async { - let f1 = File::open("foo.txt").await?; - let f2 = File::open("bar.txt").await?; + let f1 = File::open("foo.txt").await?; + let f2 = File::open("bar.txt").await?; - let mut handle = f1.chain(f2); - let mut buffer = String::new(); + let mut handle = f1.chain(f2); + let mut buffer = String::new(); - // read the value into a String. We could use any Read method here, - // this is just one example. - handle.read_to_string(&mut buffer).await?; - Ok(()) - }) } + // read the value into a String. We could use any Read method here, + // this is just one example. + handle.read_to_string(&mut buffer).await?; + # + # Ok(()) }) } ``` "#] fn chain(self, next: R) -> chain::Chain where Self: Sized { diff --git a/src/io/read/take.rs b/src/io/read/take.rs index b63f76d4a..def4e2405 100644 --- a/src/io/read/take.rs +++ b/src/io/read/take.rs @@ -30,19 +30,19 @@ impl Take { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let f = File::open("foo.txt").await?; + /// let f = File::open("foo.txt").await?; /// - /// // read at most five bytes - /// let handle = f.take(5); + /// // read at most five bytes + /// let handle = f.take(5); /// - /// println!("limit: {}", handle.limit()); - /// Ok(()) - /// }) } + /// println!("limit: {}", handle.limit()); + /// # + /// # Ok(()) }) } /// ``` pub fn limit(&self) -> u64 { self.limit @@ -56,20 +56,20 @@ impl Take { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let f = File::open("foo.txt").await?; + /// let f = File::open("foo.txt").await?; /// - /// // read at most five bytes - /// let mut handle = f.take(5); - /// handle.set_limit(10); + /// // read at most five bytes + /// let mut handle = f.take(5); + /// handle.set_limit(10); /// - /// assert_eq!(handle.limit(), 10); - /// Ok(()) - /// }) } + /// assert_eq!(handle.limit(), 10); + /// # + /// # Ok(()) }) } /// ``` pub fn set_limit(&mut self, limit: u64) { self.limit = limit; @@ -80,20 +80,20 @@ impl Take { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let file = File::open("foo.txt").await?; + /// let file = File::open("foo.txt").await?; /// - /// let mut buffer = [0; 5]; - /// let mut handle = file.take(5); - /// handle.read(&mut buffer).await?; + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; /// - /// let file = handle.into_inner(); - /// Ok(()) - /// }) } + /// let file = handle.into_inner(); + /// # + /// # Ok(()) }) } /// ``` pub fn into_inner(self) -> T { self.inner @@ -104,20 +104,20 @@ impl Take { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let file = File::open("foo.txt").await?; + /// let file = File::open("foo.txt").await?; /// - /// let mut buffer = [0; 5]; - /// let mut handle = file.take(5); - /// handle.read(&mut buffer).await?; + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; /// - /// let file = handle.get_ref(); - /// Ok(()) - /// }) } + /// let file = handle.get_ref(); + /// # + /// # Ok(()) }) } /// ``` pub fn get_ref(&self) -> &T { &self.inner @@ -132,20 +132,20 @@ impl Take { /// # Examples /// /// ```no_run - /// use async_std::io; + /// # fn main() -> async_std::io::Result<()> { async_std::task::block_on(async { + /// # /// use async_std::prelude::*; /// use async_std::fs::File; /// - /// fn main() -> io::Result<()> { async_std::task::block_on(async { - /// let file = File::open("foo.txt").await?; + /// let file = File::open("foo.txt").await?; /// - /// let mut buffer = [0; 5]; - /// let mut handle = file.take(5); - /// handle.read(&mut buffer).await?; + /// let mut buffer = [0; 5]; + /// let mut handle = file.take(5); + /// handle.read(&mut buffer).await?; /// - /// let file = handle.get_mut(); - /// Ok(()) - /// }) } + /// let file = handle.get_mut(); + /// # + /// # Ok(()) }) } /// ``` pub fn get_mut(&mut self) -> &mut T { &mut self.inner