Skip to content

Commit 710c357

Browse files
committed
Add StreamExt::cycle
1 parent 7667251 commit 710c357

File tree

5 files changed

+107
-4
lines changed

5 files changed

+107
-4
lines changed

futures-util/src/stream/iter.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use futures_core::stream::Stream;
33
use futures_core::task::{Context, Poll};
44

55
/// Stream for the [`iter`] function.
6-
#[derive(Debug)]
6+
#[derive(Debug, Clone)]
77
#[must_use = "streams do nothing unless polled"]
88
pub struct Iter<I> {
99
iter: I,

futures-util/src/stream/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ pub use futures_core::stream::{FusedStream, Stream, TryStream};
1313
#[allow(clippy::module_inception)]
1414
mod stream;
1515
pub use self::stream::{
16-
Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach, Fuse,
17-
Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
16+
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten, Fold, ForEach,
17+
Fuse, Inspect, Map, Next, Peek, Peekable, Scan, SelectNextSome, Skip, SkipWhile, StreamExt,
1818
StreamFuture, Take, TakeWhile, TakeUntil, Then, Zip,
1919
};
2020

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use core::pin::Pin;
2+
use core::usize;
3+
use futures_core::stream::{FusedStream, Stream};
4+
use futures_core::task::{Context, Poll};
5+
use pin_project::pin_project;
6+
7+
/// Stream for the [`cycle`](super::StreamExt::cycle) method.
8+
#[pin_project]
9+
#[derive(Debug)]
10+
#[must_use = "streams do nothing unless polled"]
11+
pub struct Cycle<St> {
12+
orig: St,
13+
#[pin]
14+
stream: St,
15+
}
16+
17+
impl<St> Cycle<St>
18+
where
19+
St: Clone + Stream,
20+
{
21+
pub(super) fn new(stream: St) -> Self {
22+
Self {
23+
orig: stream.clone(),
24+
stream,
25+
}
26+
}
27+
}
28+
29+
impl<St> Stream for Cycle<St>
30+
where
31+
St: Clone + Stream,
32+
{
33+
type Item = St::Item;
34+
35+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
36+
let mut this = self.project();
37+
38+
match ready!(this.stream.as_mut().poll_next(cx)) {
39+
None => {
40+
this.stream.set(this.orig.clone());
41+
this.stream.poll_next(cx)
42+
}
43+
item => Poll::Ready(item),
44+
}
45+
}
46+
47+
fn size_hint(&self) -> (usize, Option<usize>) {
48+
// the cycle stream is either empty or infinite
49+
match self.orig.size_hint() {
50+
size @ (0, Some(0)) => size,
51+
(0, _) => (0, None),
52+
_ => (usize::max_value(), None),
53+
}
54+
}
55+
}
56+
57+
impl<St> FusedStream for Cycle<St>
58+
where
59+
St: Clone + Stream,
60+
{
61+
fn is_terminated(&self) -> bool {
62+
// the cycle stream is either empty or infinite
63+
if let (0, Some(0)) = self.size_hint() {
64+
true
65+
} else {
66+
false
67+
}
68+
}
69+
}

futures-util/src/stream/stream/mod.rs

+34
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ mod concat;
3333
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
3434
pub use self::concat::Concat;
3535

36+
mod cycle;
37+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
38+
pub use self::cycle::Cycle;
39+
3640
mod enumerate;
3741
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
3842
pub use self::enumerate::Enumerate;
@@ -513,6 +517,36 @@ pub trait StreamExt: Stream {
513517
assert_future::<Self::Item, _>(Concat::new(self))
514518
}
515519

520+
/// Repeats a stream endlessly.
521+
///
522+
/// The stream never terminates. Note that you likely want to avoid
523+
/// usage of `collect` or such on the returned stream as it will exhaust
524+
/// available memory as it tries to just fill up all RAM.
525+
///
526+
/// # Examples
527+
///
528+
/// ```
529+
/// # futures::executor::block_on(async {
530+
/// use futures::stream::{self, StreamExt};
531+
/// let a = [1, 2, 3];
532+
/// let mut s = stream::iter(a.iter()).cycle();
533+
///
534+
/// assert_eq!(s.next().await, Some(&1));
535+
/// assert_eq!(s.next().await, Some(&2));
536+
/// assert_eq!(s.next().await, Some(&3));
537+
/// assert_eq!(s.next().await, Some(&1));
538+
/// assert_eq!(s.next().await, Some(&2));
539+
/// assert_eq!(s.next().await, Some(&3));
540+
/// assert_eq!(s.next().await, Some(&1));
541+
/// # });
542+
/// ```
543+
fn cycle(self) -> Cycle<Self>
544+
where
545+
Self: Sized + Clone,
546+
{
547+
assert_stream::<Self::Item, _>(Cycle::new(self))
548+
}
549+
516550
/// Execute an accumulating asynchronous computation over a stream,
517551
/// collecting all the values into one final result.
518552
///

futures/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ pub mod stream {
456456
try_unfold, TryUnfold,
457457

458458
StreamExt,
459-
Chain, Collect, Concat, Enumerate, Filter, FilterMap, FlatMap, Flatten,
459+
Chain, Collect, Concat, Cycle, Enumerate, Filter, FilterMap, FlatMap, Flatten,
460460
Fold, Forward, ForEach, Fuse, StreamFuture, Inspect, Map, Next,
461461
SelectNextSome, Peek, Peekable, Scan, Skip, SkipWhile, Take, TakeUntil,
462462
TakeWhile, Then, Zip,

0 commit comments

Comments
 (0)