Skip to content

Commit

Permalink
Merge pull request rust-lang#199 from tmiasko/stream-take
Browse files Browse the repository at this point in the history
Pass through all errors in both Stream::take and Stream::skip until stream is done.
  • Loading branch information
alexcrichton authored Oct 10, 2016
2 parents 9e56f99 + 7b7a758 commit f9870f2
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 7 deletions.
13 changes: 12 additions & 1 deletion src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,10 +587,16 @@ pub trait Stream {
for_each::new(self, f)
}

/// Creates a new stream of at most `amt` items.
/// Creates a new stream of at most `amt` items of the underlying stream.
///
/// Once `amt` items have been yielded from this stream then it will always
/// return that the stream is done.
///
/// # Errors
///
/// Any errors yielded from underlying stream, before the desired amount of
/// items is reached, are passed through and do not affect the total number
/// of items taken.
fn take(self, amt: u64) -> Take<Self>
where Self: Sized
{
Expand All @@ -601,6 +607,11 @@ pub trait Stream {
///
/// Once `amt` items have been skipped from this stream then it will always
/// return the remaining items on this stream.
///
/// # Errors
///
/// All errors yielded from underlying stream are passed through and do not
/// affect the total number of items skipped.
fn skip(self, amt: u64) -> Skip<Self>
where Self: Sized
{
Expand Down
11 changes: 5 additions & 6 deletions src/stream/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,12 @@ impl<S> Stream for Take<S>
if self.remaining == 0 {
Ok(Async::Ready(None))
} else {
match self.stream.poll() {
e @ Ok(Async::Ready(Some(_))) | e @ Err(_) => {
self.remaining -= 1;
e
}
other => other,
let next = try_ready!(self.stream.poll());
match next {
Some(_) => self.remaining -= 1,
None => self.remaining = 0,
}
Ok(Async::Ready(next))
}
}
}
27 changes: 27 additions & 0 deletions tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,18 @@ fn skip() {
assert_done(|| list().skip(2).collect(), Ok(vec![3]));
}

#[test]
fn skip_passes_errors_through() {
let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Ok(5)])
.skip(1)
.wait();
assert_eq!(s.next(), Some(Err(1)));
assert_eq!(s.next(), Some(Err(2)));
assert_eq!(s.next(), Some(Ok(4)));
assert_eq!(s.next(), Some(Ok(5)));
assert_eq!(s.next(), None);
}

#[test]
fn skip_while() {
assert_done(|| list().skip_while(|e| Ok(*e % 2 == 1)).collect(),
Expand All @@ -100,6 +112,21 @@ fn take() {
assert_done(|| list().take(2).collect(), Ok(vec![1, 2]));
}

#[test]
fn take_passes_errors_through() {
let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)])
.take(1)
.wait();
assert_eq!(s.next(), Some(Err(1)));
assert_eq!(s.next(), Some(Err(2)));
assert_eq!(s.next(), Some(Ok(3)));
assert_eq!(s.next(), None);

let mut s = iter(vec![Ok(1), Err(2)]).take(1).wait();
assert_eq!(s.next(), Some(Ok(1)));
assert_eq!(s.next(), None);
}

#[test]
fn peekable() {
assert_done(|| list().peekable().collect(), Ok(vec![1, 2, 3]));
Expand Down

0 comments on commit f9870f2

Please sign in to comment.