diff --git a/src/stream/mod.rs b/src/stream/mod.rs index a08c82786..ec1c23bc8 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -27,7 +27,7 @@ pub use from_stream::FromStream; pub use into_stream::IntoStream; pub use once::{once, Once}; pub use repeat::{repeat, Repeat}; -pub use stream::{Scan, Stream, Take, Zip}; +pub use stream::{Fuse, Scan, Stream, Take, Zip}; mod double_ended_stream; mod empty; diff --git a/src/stream/stream/fuse.rs b/src/stream/stream/fuse.rs new file mode 100644 index 000000000..354193700 --- /dev/null +++ b/src/stream/stream/fuse.rs @@ -0,0 +1,33 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +/// A `Stream` that is permanently closed once a single call to `poll` results in +/// `Poll::Ready(None)`, returning `Poll::Ready(None)` for all future calls to `poll`. +#[derive(Clone, Debug)] +pub struct Fuse { + pub(crate) stream: S, + pub(crate) done: bool, +} + +impl Unpin for Fuse {} + +impl Fuse { + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(done: bool); +} + +impl futures_core::Stream for Fuse { + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.done { + Poll::Ready(None) + } else { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + if next.is_none() { + *self.as_mut().done() = true; + } + Poll::Ready(next) + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index ca83fbb8a..e13e2ebc7 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -28,6 +28,7 @@ mod filter_map; mod find; mod find_map; mod fold; +mod fuse; mod min_by; mod next; mod nth; @@ -35,6 +36,7 @@ mod scan; mod take; mod zip; +pub use fuse::Fuse; pub use scan::Scan; pub use take::Take; pub use zip::Zip; @@ -246,6 +248,35 @@ pub trait Stream { Enumerate::new(self) } + /// Transforms this `Stream` into a "fused" `Stream` such that after the first time `poll` + /// returns `Poll::Ready(None)`, all future calls to `poll` will also return + /// `Poll::Ready(None)`. + /// + /// # Examples + /// + /// ``` + /// # fn main() { async_std::task::block_on(async { + /// # + /// use async_std::prelude::*; + /// use async_std::stream; + /// + /// let mut s = stream::once(1).fuse(); + /// assert_eq!(s.next().await, Some(1)); + /// assert_eq!(s.next().await, None); + /// assert_eq!(s.next().await, None); + /// # + /// # }) } + /// ``` + fn fuse(self) -> Fuse + where + Self: Sized, + { + Fuse { + stream: self, + done: false, + } + } + /// Both filters and maps a stream. /// /// # Examples