Skip to content

Commit

Permalink
Add AsyncBufReadExt::read_line
Browse files Browse the repository at this point in the history
  • Loading branch information
taiki-e committed Apr 27, 2019
1 parent 3872328 commit 4c9c6c1
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 6 deletions.
64 changes: 61 additions & 3 deletions futures-util/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ pub use self::read::Read;
mod read_exact;
pub use self::read_exact::ReadExact;

// TODO
// mod read_line;
// pub use self::read_line::ReadLine;
mod read_line;
pub use self::read_line::ReadLine;

mod read_to_end;
pub use self::read_to_end::ReadToEnd;
Expand Down Expand Up @@ -403,6 +402,65 @@ pub trait AsyncBufReadExt: AsyncBufRead {
{
ReadUntil::new(self, byte, buf)
}

/// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until a newline (the 0xA byte) or EOF is reached,
/// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
///
/// This function will read bytes from the underlying stream until the
/// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
/// up to, and including, the delimiter (if found) will be appended to
/// `buf`.
///
/// The returned future will resolve to the number of bytes read once the read
/// operation is completed.
///
/// In the case of an error the buffer and the object will be discarded, with
/// the error yielded.
///
/// # Errors
///
/// This function has the same error semantics as [`read_until`] and will
/// also return an error if the read bytes are not valid UTF-8. If an I/O
/// error is encountered then `buf` may contain some bytes already read in
/// the event that all data read so far was valid UTF-8.
///
/// [`read_until`]: AsyncBufReadExt::read_until
///
/// # Examples
///
/// ```
/// #![feature(async_await, await_macro)]
/// # futures::executor::block_on(async {
/// use futures::io::AsyncBufReadExt;
/// use std::io::Cursor;
///
/// let mut cursor = Cursor::new(b"foo\nbar");
/// let mut buf = String::new();
///
/// // cursor is at 'f'
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
/// assert_eq!(num_bytes, 4);
/// assert_eq!(buf, "foo\n");
/// buf.clear();
///
/// // cursor is at 'b'
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
/// assert_eq!(num_bytes, 3);
/// assert_eq!(buf, "bar");
/// buf.clear();
///
/// // cursor is at EOF
/// let num_bytes = await!(cursor.read_line(&mut buf))?;
/// assert_eq!(num_bytes, 0);
/// assert_eq!(buf, "");
/// # Ok::<(), Box<std::error::Error>>(()) }).unwrap();
/// ```
fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
where Self: Unpin,
{
ReadLine::new(self, buf)
}
}

impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
55 changes: 55 additions & 0 deletions futures-util/src/io/read_line.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use futures_core::future::Future;
use futures_core::task::{Context, Poll};
use futures_io::AsyncBufRead;
use std::io;
use std::pin::Pin;
use std::str;
use super::read_until::read_until_internal;

/// Future for the [`read_line`](super::AsyncBufReadExt::read_line) method.
#[derive(Debug)]
pub struct ReadLine<'a, R: ?Sized + Unpin> {
reader: &'a mut R,
buf: &'a mut String,
read: usize,
}

impl<R: ?Sized + Unpin> Unpin for ReadLine<'_, R> {}

impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadLine<'a, R> {
pub(super) fn new(reader: &'a mut R, buf: &'a mut String) -> Self {
Self { reader, buf, read: 0 }
}
}

struct Guard<'a> { buf: &'a mut Vec<u8>, len: usize }

impl Drop for Guard<'_> {
fn drop(&mut self) {
unsafe { self.buf.set_len(self.len); }
}
}

impl<R: AsyncBufRead + ?Sized + Unpin> Future for ReadLine<'_, R> {
type Output = io::Result<usize>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let Self { reader, buf, read } = &mut *self;
unsafe {
// safety: https://doc.rust-lang.org/src/std/io/mod.rs.html#310
let mut g = Guard { len: buf.len(), buf: buf.as_mut_vec() };
let poll = read_until_internal(Pin::new(reader), b'\n', g.buf, read, cx);
if str::from_utf8(&g.buf[g.len..]).is_err() {
let err = |_| Err(io::Error::new(io::ErrorKind::InvalidData,
"stream did not contain valid UTF-8"));
match poll {
Poll::Ready(ret) => Poll::Ready(ret.and_then(err)),
Poll::Pending => Poll::Ready(err(0))
}
} else {
g.len = g.buf.len();
poll
}
}
}
}
2 changes: 1 addition & 1 deletion futures-util/src/io/read_until.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl<'a, R: AsyncBufRead + ?Sized + Unpin> ReadUntil<'a, R> {
}
}

fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
pub(super) fn read_until_internal<R: AsyncBufRead + ?Sized + Unpin>(
mut reader: Pin<&mut R>,
byte: u8,
buf: &mut Vec<u8>,
Expand Down
4 changes: 2 additions & 2 deletions futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ pub mod io {
};
pub use futures_util::io::{
AsyncReadExt, AsyncWriteExt, AsyncSeekExt, AsyncBufReadExt, AllowStdIo,
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadToEnd, ReadUntil,
Seek, Window, WriteAll, WriteHalf,
Close, CopyInto, Flush, Read, ReadExact, ReadHalf, ReadLine, ReadToEnd,
ReadUntil, Seek, Window, WriteAll, WriteHalf,
};
}

Expand Down
97 changes: 97 additions & 0 deletions futures/tests/io_read_line.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
use futures::executor::block_on;
use futures::future::Future;
use futures::io::{AsyncRead, AsyncBufRead, AsyncBufReadExt};
use futures::task::{Context, Poll};
use futures_test::task::noop_context;
use std::cmp;
use std::io::{self, Cursor};
use std::pin::Pin;

#[test]
fn read_line() {
let mut buf = Cursor::new(&b"12"[..]);
let mut v = String::new();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 2);
assert_eq!(v, "12");

let mut buf = Cursor::new(&b"12\n\n"[..]);
let mut v = String::new();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 3);
assert_eq!(v, "12\n");
v.clear();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 1);
assert_eq!(v, "\n");
v.clear();
assert_eq!(block_on(buf.read_line(&mut v)).unwrap(), 0);
assert_eq!(v, "");
}

fn run<F: Future + Unpin>(mut f: F) -> F::Output {
let mut cx = noop_context();
loop {
if let Poll::Ready(x) = Pin::new(&mut f).poll(&mut cx) {
return x;
}
}
}

struct MaybePending<'a> {
inner: &'a [u8],
ready: bool,
}

impl<'a> MaybePending<'a> {
fn new(inner: &'a [u8]) -> Self {
Self { inner, ready: false }
}
}

impl AsyncRead for MaybePending<'_> {
fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, _: &mut [u8])
-> Poll<io::Result<usize>>
{
unimplemented!()
}
}

impl AsyncBufRead for MaybePending<'_> {
fn poll_fill_buf<'a>(mut self: Pin<&'a mut Self>, _: &mut Context<'_>)
-> Poll<io::Result<&'a [u8]>>
{
if self.ready {
self.ready = false;
if self.inner.is_empty() { return Poll::Ready(Ok(&[])) }
let len = cmp::min(2, self.inner.len());
Poll::Ready(Ok(&self.inner[0..len]))
} else {
self.ready = true;
Poll::Pending
}
}

fn consume(mut self: Pin<&mut Self>, amt: usize) {
self.inner = &self.inner[amt..];
}
}

#[test]
fn maybe_ready() {
let mut buf = MaybePending::new(&b"12"[..]);
let mut v = String::new();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 2);
assert_eq!(v, "12");

let mut buf = MaybePending::new(&b"12\n\n"[..]);
let mut v = String::new();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 3);
assert_eq!(v, "12\n");
v.clear();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 1);
assert_eq!(v, "\n");
v.clear();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
assert_eq!(v, "");
v.clear();
assert_eq!(run(buf.read_line(&mut v)).unwrap(), 0);
assert_eq!(v, "");
}

0 comments on commit 4c9c6c1

Please sign in to comment.