Merge pull request rust-lang#211 from NeoLegends/buffer-combinator
Implement buffer combinator
alexcrichton authored Oct 19, 2016
2 parents c95e3c4 + 11f1d5e commit fccd991
Showing 3 changed files with 91 additions and 0 deletions.
62 changes: 62 additions & 0 deletions src/stream/chunks.rs
@@ -0,0 +1,62 @@
use std::mem;
use std::prelude::v1::*;

use {Async, Poll};
use stream::{Stream, Fuse};

/// An adaptor that chunks up elements in a vector.
///
/// This adaptor will buffer up a list of items in the stream and pass on the
/// vector used for buffering when a specified capacity has been reached. This is
/// created by the `Stream::chunks` method.
#[must_use = "streams do nothing unless polled"]
pub struct Chunks<S>
    where S: Stream
{
    capacity: usize, // TODO: Do we need this? Doesn't Vec::capacity() suffice?
    items: Vec<<S as Stream>::Item>,
    stream: Fuse<S>
}

pub fn new<S>(s: S, capacity: usize) -> Chunks<S>
    where S: Stream
{
    assert!(capacity > 0);

    Chunks {
        capacity: capacity,
        items: Vec::with_capacity(capacity),
        stream: super::fuse::new(s),
    }
}

impl<S> Stream for Chunks<S>
    where S: Stream
{
    type Item = Vec<<S as Stream>::Item>;
    type Error = <S as Stream>::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        loop {
            if let Some(item) = try_ready!(self.stream.poll()) {
                // Push the item into the buffer and check whether it is
                // full. If so, replace our buffer with a new and empty one
                // and return the full one.
                self.items.push(item);
                if self.items.len() >= self.capacity {
                    let full_buf = mem::replace(&mut self.items, Vec::with_capacity(self.capacity));
                    return Ok(Async::Ready(Some(full_buf)))
                }
            } else {
                // Since the underlying stream ran out of values, return
                // what we have buffered, if we have anything.
                return if self.items.len() > 0 {
                    let full_buf = mem::replace(&mut self.items, Vec::new());
                    Ok(Async::Ready(Some(full_buf)))
                } else {
                    Ok(Async::Ready(None))
                }
            }
        }
    }
}
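The key design choice in `poll` above is the `mem::replace` swap: when the buffer reaches `capacity`, the full `Vec` is handed to the caller whole and replaced with a fresh, pre-allocated one, so no cloning or draining is needed; when the fused stream ends, whatever is left in the buffer is flushed as a final, possibly shorter chunk. The following futures-free sketch (not part of this commit; `chunk_eagerly` is an illustrative name) mirrors that buffering strategy on a plain iterator:

use std::mem;

// Sketch only: the same buffer-and-swap strategy as Chunks::poll, applied
// eagerly to an iterator instead of lazily to a polled stream.
fn chunk_eagerly<I: IntoIterator>(input: I, capacity: usize) -> Vec<Vec<I::Item>> {
    assert!(capacity > 0);

    let mut chunks = Vec::new();
    let mut buf = Vec::with_capacity(capacity);
    for item in input {
        buf.push(item);
        if buf.len() >= capacity {
            // Swap the full buffer out for a fresh one, as in poll().
            chunks.push(mem::replace(&mut buf, Vec::with_capacity(capacity)));
        }
    }
    if !buf.is_empty() {
        // The input ended; flush the trailing partial chunk, mirroring the
        // end-of-stream branch above.
        chunks.push(buf);
    }
    chunks
}

fn main() {
    assert_eq!(chunk_eagerly(1..6, 2), vec![vec![1, 2], vec![3, 4], vec![5]]);
}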
16 changes: 16 additions & 0 deletions src/stream/mod.rs
@@ -64,12 +64,14 @@ if_std! {
    mod buffer_unordered;
    mod catch_unwind;
    mod channel;
    mod chunks;
    mod collect;
    mod wait;
    pub use self::buffered::Buffered;
    pub use self::buffer_unordered::BufferUnordered;
    pub use self::catch_unwind::CatchUnwind;
    pub use self::channel::{channel, Sender, Receiver, FutureSender, SendError};
    pub use self::chunks::Chunks;
    pub use self::collect::Collect;
    pub use self::wait::Wait;

@@ -741,6 +743,20 @@ pub trait Stream {
    {
        peek::new(self)
    }

    /// An adaptor for chunking up items of the stream inside a vector.
    ///
    /// The vector will contain at most `capacity` elements, though can contain
    /// less if the underlying stream ended and did not produce a multiple of
    /// `capacity` elements. `capacity` must be greater than zero.
    ///
    /// Errors are passed through.
    #[cfg(feature = "use_std")]
    fn chunks(self, capacity: usize) -> Chunks<Self>
        where Self: Sized
    {
        chunks::new(self, capacity)
    }
}

impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
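To illustrate the documented contract, here is a small usage sketch (not part of this commit) that mirrors the tests below: it assumes the `list()` helper from tests/stream.rs, which yields the items 1, 2, 3 and then ends, and uses the blocking `Wait` adaptor re-exported above.

// Sketch only: chunks of at most 2 items; the final chunk is shorter
// because the stream ended before filling it.
assert_eq!(list().chunks(2).wait().collect::<Result<Vec<_>, _>>(),
           Ok(vec![vec![1, 2], vec![3]]));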
13 changes: 13 additions & 0 deletions tests/stream.rs
@@ -265,3 +265,16 @@ fn wait() {
    assert_eq!(list().wait().collect::<Result<Vec<_>, _>>(),
               Ok(vec![1, 2, 3]));
}

#[test]
fn chunks() {
    assert_done(|| list().chunks(3).collect(), Ok(vec![vec![1, 2, 3]]));
    assert_done(|| list().chunks(1).collect(), Ok(vec![vec![1], vec![2], vec![3]]));
    assert_done(|| list().chunks(2).collect(), Ok(vec![vec![1, 2], vec![3]]));
}

#[test]
#[should_panic]
fn chunks_panic_on_cap_zero() {
    let _ = list().chunks(0);
}
