From 4c9c6c135e2f316e34ade90ee99b9c28b5fe3eda Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Sun, 28 Apr 2019 01:25:55 +0900 Subject: [PATCH] Add AsyncBufReadExt::read_line --- futures-util/src/io/mod.rs | 64 +++++++++++++++++++- futures-util/src/io/read_line.rs | 55 ++++++++++++++++++ futures-util/src/io/read_until.rs | 2 +- futures/src/lib.rs | 4 +- futures/tests/io_read_line.rs | 97 +++++++++++++++++++++++++++++++ 5 files changed, 216 insertions(+), 6 deletions(-) create mode 100644 futures-util/src/io/read_line.rs create mode 100644 futures/tests/io_read_line.rs diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index b8fa599e3a..75821de1e9 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -30,9 +30,8 @@ pub use self::read::Read; mod read_exact; pub use self::read_exact::ReadExact; -// TODO -// mod read_line; -// pub use self::read_line::ReadLine; +mod read_line; +pub use self::read_line::ReadLine; mod read_to_end; pub use self::read_to_end::ReadToEnd; @@ -403,6 +402,65 @@ pub trait AsyncBufReadExt: AsyncBufRead { { ReadUntil::new(self, byte, buf) } + + /// Creates a future which will read all the bytes associated with this I/O + /// object into `buf` until a newline (the 0xA byte) or EOF is reached, + /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line). + /// + /// This function will read bytes from the underlying stream until the + /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes + /// up to, and including, the delimiter (if found) will be appended to + /// `buf`. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Errors + /// + /// This function has the same error semantics as [`read_until`] and will + /// also return an error if the read bytes are not valid UTF-8. If an I/O + /// error is encountered then `buf` may contain some bytes already read in + /// the event that all data read so far was valid UTF-8. + /// + /// [`read_until`]: AsyncBufReadExt::read_until + /// + /// # Examples + /// + /// ``` + /// #![feature(async_await, await_macro)] + /// # futures::executor::block_on(async { + /// use futures::io::AsyncBufReadExt; + /// use std::io::Cursor; + /// + /// let mut cursor = Cursor::new(b"foo\nbar"); + /// let mut buf = String::new(); + /// + /// // cursor is at 'f' + /// let num_bytes = await!(cursor.read_line(&mut buf))?; + /// assert_eq!(num_bytes, 4); + /// assert_eq!(buf, "foo\n"); + /// buf.clear(); + /// + /// // cursor is at 'b' + /// let num_bytes = await!(cursor.read_line(&mut buf))?; + /// assert_eq!(num_bytes, 3); + /// assert_eq!(buf, "bar"); + /// buf.clear(); + /// + /// // cursor is at EOF + /// let num_bytes = await!(cursor.read_line(&mut buf))?; + /// assert_eq!(num_bytes, 0); + /// assert_eq!(buf, ""); + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> + where Self: Unpin, + { + ReadLine::new(self, buf) + } } impl AsyncBufReadExt for R {} diff --git a/futures-util/src/io/read_line.rs b/futures-util/src/io/read_line.rs new file mode 100644 index 0000000000..b23ade8c3a --- /dev/null +++ b/futures-util/src/io/read_line.rs @@ -0,0 +1,55 @@ +use futures_core::future::Future; +use futures_core::task::{Context, Poll}; +use futures_io::AsyncBufRead; +use std::io; +use std::pin::Pin; +use std::str; +use super::read_until::read_until_internal; + +/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method. +#[derive(Debug)] +pub struct ReadLine<'a, R: ?Sized + Unpin> { + reader: &'a mut R, + buf: &'a mut String, + read: usize, +} + +impl Unpin for ReadLine<'_, R> {} + +impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self { + Self { reader, buf, read: 0 } + } +} + +struct Guard<'a> { buf: &'a mut Vec, len: usize } + +impl Drop for Guard<'_> { + fn drop(&mut self) { + unsafe { self.buf.set_len(self.len); } + } +} + +impl Future for ReadLine<'_, R> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf, read } = &mut *self; + unsafe { + // safety: https://doc.rust-lang.org/src/std/io/mod.rs.html#310 + let mut g = Guard { len: buf.len(), buf: buf.as_mut_vec() }; + let poll = read_until_internal(Pin::new(reader), b'\n', g.buf, read, cx); + if str::from_utf8(&g.buf[g.len..]).is_err() { + let err = |_| Err(io::Error::new(io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8")); + match poll { + Poll::Ready(ret) => Poll::Ready(ret.and_then(err)), + Poll::Pending => Poll::Ready(err(0)) + } + } else { + g.len = g.buf.len(); + poll + } + } + } +} diff --git a/futures-util/src/io/read_until.rs b/futures-util/src/io/read_until.rs index 2da81aca98..d924f02494 100644 --- a/futures-util/src/io/read_until.rs +++ b/futures-util/src/io/read_until.rs @@ -22,7 +22,7 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> { } } -fn read_until_internal( +pub(super) fn read_until_internal( mut reader: Pin<&mut R>, byte: u8, buf: &mut Vec, diff --git a/futures/src/lib.rs b/futures/src/lib.rs index 0a73700df4..373184b194 100644 --- a/futures/src/lib.rs +++ b/futures/src/lib.rs @@ -276,8 +276,8 @@ pub mod io { }; pub use futures_util::io::{ AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo, - Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil, - Seek, Window, WriteAll, WriteHalf, + Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd, + ReadUntil, Seek, Window, WriteAll, WriteHalf, }; } diff --git a/futures/tests/io_read_line.rs b/futures/tests/io_read_line.rs new file mode 100644 index 0000000000..798b2b8af2 --- /dev/null +++ b/futures/tests/io_read_line.rs @@ -0,0 +1,97 @@ +use futures::executor::block_on; +use futures::future::Future; +use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt}; +use futures::task::{Context, Poll}; +use futures_test::task::noop_context; +use std::cmp; +use std::io::{self, Cursor}; +use std::pin::Pin; + +#[test] +fn read_line() { + let mut buf = Cursor::new(&b"12"[..]); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = Cursor::new(&b"12\n\n"[..]); + let mut v = String::new(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +} + +fn run(mut f: F) -> F::Output { + let mut cx = noop_context(); + loop { + if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) { + return x; + } + } +} + +struct MaybePending<'a> { + inner: &'a [u8], + ready: bool, +} + +impl<'a> MaybePending<'a> { + fn new(inner: &'a [u8]) -> Self { + Self { inner, ready: false } + } +} + +impl AsyncRead for MaybePending<'_> { + fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8]) + -> Poll> + { + unimplemented!() + } +} + +impl AsyncBufRead for MaybePending<'_> { + fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>) + -> Poll> + { + if self.ready { + self.ready = false; + if self.inner.is_empty() { return Poll::Ready(Ok(&[])) } + let len = cmp::min(2, self.inner.len()); + Poll::Ready(Ok(&self.inner[0..len])) + } else { + self.ready = true; + Poll::Pending + } + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + self.inner = &self.inner[amt..]; + } +} + +#[test] +fn maybe_ready() { + let mut buf = MaybePending::new(&b"12"[..]); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2); + assert_eq!(v, "12"); + + let mut buf = MaybePending::new(&b"12\n\n"[..]); + let mut v = String::new(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3); + assert_eq!(v, "12\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1); + assert_eq!(v, "\n"); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); + v.clear(); + assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0); + assert_eq!(v, ""); +}