diff --git a/tokio-io/src/io/async_buf_read_ext.rs b/tokio-io/src/io/async_buf_read_ext.rs index 20a5bbf2af5..98d5d9b85da 100644 --- a/tokio-io/src/io/async_buf_read_ext.rs +++ b/tokio-io/src/io/async_buf_read_ext.rs @@ -1,6 +1,7 @@ use crate::io::lines::{lines, Lines}; use crate::io::read_line::{read_line, ReadLine}; use crate::io::read_until::{read_until, ReadUntil}; +use crate::io::split::{split, Split}; use crate::AsyncBufRead; /// An extension trait which adds utility methods to `AsyncBufRead` types. @@ -55,6 +56,30 @@ pub trait AsyncBufReadExt: AsyncBufRead { read_line(self, buf) } + /// Returns a stream of the contents of this reader split on the byte + /// `byte`. + /// + /// This method is the async equivalent to + /// [`BufRead::split`](std::io::BufRead::split). + /// + /// The stream returned from this function will yield instances of + /// [`io::Result`]`<`[`Vec`]`>`. Each vector returned will *not* have + /// the delimiter byte at the end. + /// + /// [`io::Result`]: std::io::Result + /// [`Vec`]: std::vec::Vec + /// + /// # Errors + /// + /// Each item of the stream has the same error semantics as + /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until). + fn split(self, byte: u8) -> Split + where + Self: Sized, + { + split(self, byte) + } + /// Returns a stream over the lines of this reader. /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines). /// diff --git a/tokio-io/src/io/mod.rs b/tokio-io/src/io/mod.rs index 522188a7f69..cadbec5576f 100644 --- a/tokio-io/src/io/mod.rs +++ b/tokio-io/src/io/mod.rs @@ -18,6 +18,7 @@ mod read_until; mod repeat; mod shutdown; mod sink; +mod split; mod take; mod write; mod write_all; diff --git a/tokio-io/src/io/split.rs b/tokio-io/src/io/split.rs new file mode 100644 index 00000000000..bc69e1ca9a2 --- /dev/null +++ b/tokio-io/src/io/split.rs @@ -0,0 +1,67 @@ +use super::read_until::read_until_internal; +use crate::AsyncBufRead; + +use futures_core::{ready, Stream}; +use pin_project::{pin_project, project}; +use std::io; +use std::mem; +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// Stream for the [`split`](crate::io::AsyncBufReadExt::split) method. +#[pin_project] +#[derive(Debug)] +#[must_use = "streams do nothing unless polled"] +pub struct Split { + #[pin] + reader: R, + buf: Vec, + delim: u8, + read: usize, +} + +pub(crate) fn split(reader: R, delim: u8) -> Split +where + R: AsyncBufRead, +{ + Split { + reader, + buf: Vec::new(), + delim, + read: 0, + } +} + +impl Stream for Split { + type Item = io::Result>; + + #[project] + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + #[project] + let Split { + reader, + buf, + delim, + read, + } = self.project(); + + let n = ready!(read_until_internal(reader, cx, *delim, buf, read))?; + if n == 0 && buf.is_empty() { + return Poll::Ready(None); + } + if buf.last() == Some(&delim) { + buf.pop(); + } + Poll::Ready(Some(Ok(mem::replace(buf, Vec::new())))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn assert_unpin() { + crate::is_unpin::>(); + } +}