diff --git a/src/stream/chunks.rs b/src/stream/chunks.rs new file mode 100644 index 00000000000..56bcce427b1 --- /dev/null +++ b/src/stream/chunks.rs @@ -0,0 +1,62 @@ +use std::mem; +use std::prelude::v1::*; + +use {Async, Poll}; +use stream::{Stream, Fuse}; + +/// An adaptor that chunks up elements in a vector. +/// +/// This adaptor will buffer up a list of items in the stream and pass on the +/// vector used for buffering when a specified capacity has been reached. This is +/// created by the `Stream::chunks` method. +#[must_use = "streams do nothing unless polled"] +pub struct Chunks + where S: Stream +{ + capacity: usize, // TODO: Do we need this? Doesn't Vec::capacity() suffice? + items: Vec<::Item>, + stream: Fuse +} + +pub fn new(s: S, capacity: usize) -> Chunks + where S: Stream +{ + assert!(capacity > 0); + + Chunks { + capacity: capacity, + items: Vec::with_capacity(capacity), + stream: super::fuse::new(s), + } +} + +impl Stream for Chunks + where S: Stream +{ + type Item = Vec<::Item>; + type Error = ::Error; + + fn poll(&mut self) -> Poll, Self::Error> { + loop { + if let Some(item) = try_ready!(self.stream.poll()) { + // Push the item into the buffer and check whether it is + // full. If so, replace our buffer with a new and empty one + // and return the full one. + self.items.push(item); + if self.items.len() >= self.capacity { + let full_buf = mem::replace(&mut self.items, Vec::with_capacity(self.capacity)); + return Ok(Async::Ready(Some(full_buf))) + } + } else { + // Since the underlying stream ran out of values, return + // what we have buffered, if we have anything. + return if self.items.len() > 0 { + let full_buf = mem::replace(&mut self.items, Vec::new()); + Ok(Async::Ready(Some(full_buf))) + } else { + Ok(Async::Ready(None)) + } + } + } + } +} diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 7cc61f0ac0a..e9d80f13ed6 100755 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -64,12 +64,14 @@ if_std! { mod buffer_unordered; mod catch_unwind; mod channel; + mod chunks; mod collect; mod wait; pub use self::buffered::Buffered; pub use self::buffer_unordered::BufferUnordered; pub use self::catch_unwind::CatchUnwind; pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError}; + pub use self::chunks::Chunks; pub use self::collect::Collect; pub use self::wait::Wait; @@ -741,6 +743,20 @@ pub trait Stream { { peek::new(self) } + + /// An adaptor for chunking up items of the stream inside a vector. + /// + /// The vector will contain at most `capacity` elements, though can contain + /// less if the underlying stream ended and did not produce a multiple of + /// `capacity` elements. `capacity` must be greater than zero. + /// + /// Errors are passed through. + #[cfg(feature = "use_std")] + fn chunks(self, capacity: usize) -> Chunks + where Self: Sized + { + chunks::new(self, capacity) + } } impl<'a, S: ?Sized + Stream> Stream for &'a mut S { diff --git a/tests/stream.rs b/tests/stream.rs index d2186206a32..6475aa545cc 100644 --- a/tests/stream.rs +++ b/tests/stream.rs @@ -265,3 +265,16 @@ fn wait() { assert_eq!(list().wait().collect::, _>>(), Ok(vec![1, 2, 3])); } + +#[test] +fn chunks() { + assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]])); + assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]])); + assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]])); +} + +#[test] +#[should_panic] +fn chunks_panic_on_cap_zero() { + let _ = list().chunks(0); +} \ No newline at end of file