From 80aa2c992a340acdff4e764f03bf75580042a937 Mon Sep 17 00:00:00 2001 From: Tyler Hawkes Date: Thu, 17 Jun 2021 09:58:46 -0600 Subject: [PATCH 1/4] New poll_immediate(_reuse) functions to immediately return from a poll (#1) New poll_immediate(_unpin) functions to immediately return from a poll on futures and streams --- futures-util/src/future/mod.rs | 5 + futures-util/src/future/poll_immediate.rs | 201 ++++++++++++++++++++++ futures-util/src/stream/mod.rs | 3 + futures-util/src/stream/poll_immediate.rs | 80 +++++++++ 4 files changed, 289 insertions(+) create mode 100644 futures-util/src/future/poll_immediate.rs create mode 100644 futures-util/src/stream/poll_immediate.rs diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index 6ae23107f6..d4cf8fae71 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -68,6 +68,11 @@ pub use self::option::OptionFuture; mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; +mod poll_immediate; +pub use self::poll_immediate::{ + poll_immediate, poll_immediate_reuse, PollImmediate, PollImmediateReuse, +}; + mod ready; pub use self::ready::{err, ok, ready, Ready}; diff --git a/futures-util/src/future/poll_immediate.rs b/futures-util/src/future/poll_immediate.rs new file mode 100644 index 0000000000..5f3c7a104b --- /dev/null +++ b/futures-util/src/future/poll_immediate.rs @@ -0,0 +1,201 @@ +use super::assert_future; +use crate::FutureExt; +use core::pin::Pin; +use futures_core::task::{Context, Poll}; +use futures_core::{FusedFuture, Future, Stream}; + +/// Future for the [`poll_immediate`](poll_immediate()) function. +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PollImmediate(Option); + +impl Future for PollImmediate +where + F: Future, +{ + type Output = Option; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + // # Safety + // This is the only time that this future will ever be polled. + let inner = + unsafe { self.get_unchecked_mut().0.take().expect("PollOnce polled after completion") }; + crate::pin_mut!(inner); + match inner.poll(cx) { + Poll::Ready(t) => Poll::Ready(Some(t)), + Poll::Pending => Poll::Ready(None), + } + } +} + +impl FusedFuture for PollImmediate { + fn is_terminated(&self) -> bool { + self.0.is_none() + } +} + +/// Creates a stream that can be polled repeatedly until the future is done +/// ``` +/// # futures::executor::block_on(async { +/// use futures::task::Poll; +/// use futures::{StreamExt, future, pin_mut}; +/// use future::FusedFuture; +/// +/// let f = async { 1_u32 }; +/// pin_mut!(f); +/// let mut r = future::poll_immediate(f); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// let mut p = future::poll_immediate(f); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert!(!p.is_terminated()); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert!(p.is_terminated()); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +impl Stream for PollImmediate +where + F: Future, +{ + type Item = Poll; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + // # Safety + // We never move the inner value until it is done. We only get a reference to it. + let inner = &mut self.get_unchecked_mut().0; + let fut = match inner.as_mut() { + // inner is gone, so we can signal that the stream is closed. + None => return Poll::Ready(None), + Some(inner) => inner, + }; + let fut = Pin::new_unchecked(fut); + Poll::Ready(Some(fut.poll(cx).map(|t| { + // # Safety + // The inner option value is done, so we need to drop it. We do it without moving it + // by using drop in place. We then write over the value without trying to drop it first + // This should uphold all the safety requirements of `Pin` + std::ptr::drop_in_place(inner); + std::ptr::write(inner, None); + t + }))) + } + } +} + +/// Creates a future that is immediately ready with an Option of a value. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let r = future::poll_immediate(async { 1_u32 }); +/// assert_eq!(r.await, Some(1)); +/// +/// let p = future::poll_immediate(future::pending::()); +/// assert_eq!(p.await, None); +/// # }); +/// ``` +pub fn poll_immediate(f: F) -> PollImmediate { + assert_future::, PollImmediate>(PollImmediate(Some(f))) +} + +/// Future for the [`poll_immediate_reuse`](poll_immediate_reuse()) function. +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PollImmediateReuse(Option); + +impl Future for PollImmediateReuse +where + F: Future + Unpin, +{ + type Output = Result; + + #[inline] + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut inner = self.get_mut().0.take().expect("PollOnceReuse polled after completion"); + match inner.poll_unpin(cx) { + Poll::Ready(t) => Poll::Ready(Ok(t)), + Poll::Pending => Poll::Ready(Err(inner)), + } + } +} + +impl FusedFuture for PollImmediateReuse { + fn is_terminated(&self) -> bool { + self.0.is_none() + } +} + +/// Creates a stream that can be polled repeatedly until the future is done +/// ``` +/// # futures::executor::block_on(async { +/// use futures::task::Poll; +/// use futures::{StreamExt, future}; +/// use future::FusedFuture; +/// +/// let mut r = future::poll_immediate_reuse(future::ready(1_u32)); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// +/// let mut p = future::poll_immediate_reuse(Box::pin(async {futures::pending!(); 42_u8})); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert!(!p.is_terminated()); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert!(p.is_terminated()); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +impl Stream for PollImmediateReuse +where + F: Future + Unpin, +{ + type Item = Poll; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let inner = &mut self.get_mut().0; + let fut = match inner.as_mut() { + // inner is gone, so we can signal that the stream is closed. + None => return Poll::Ready(None), + Some(inner) => inner, + }; + let fut = Pin::new(fut); + Poll::Ready(Some(fut.poll(cx).map(|t| { + *inner = None; + t + }))) + } +} + +/// Creates a future that is immediately ready with a Result of a value or the future. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::future; +/// +/// let r = future::poll_immediate_reuse(future::ready(1_i32)); +/// assert_eq!(r.await.unwrap(), 1); +/// +/// // futures::pending!() returns pending once and then evaluates to `()` +/// let p = future::poll_immediate_reuse(Box::pin(async { +/// futures::pending!(); +/// 42_u8 +/// })); +/// match p.await { +/// Ok(_) => unreachable!(), +/// Err(e) => { +/// assert_eq!(e.await, 42); +/// } +/// } +/// # }); +/// ``` +pub fn poll_immediate_reuse(f: F) -> PollImmediateReuse { + assert_future::, PollImmediateReuse>(PollImmediateReuse(Some(f))) +} diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index f0db571413..3b2d4c6b2a 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -89,6 +89,9 @@ pub use self::pending::{pending, Pending}; mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; +mod poll_immediate; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; + mod select; pub use self::select::{select, Select}; diff --git a/futures-util/src/stream/poll_immediate.rs b/futures-util/src/stream/poll_immediate.rs new file mode 100644 index 0000000000..d945ea493b --- /dev/null +++ b/futures-util/src/stream/poll_immediate.rs @@ -0,0 +1,80 @@ +use futures_core::task::{Context, Poll}; +use futures_core::Stream; +use std::pin::Pin; + +/// Stream for the [`poll_immediate`](poll_immediate()) function. +#[derive(Debug, Clone)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PollImmediate(Option); + +impl Stream for PollImmediate +where + S: Stream, +{ + type Item = Poll; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + unsafe { + // # Safety + // We never move the inner value until it is done. We only get a reference to it. + let inner = &mut self.get_unchecked_mut().0; + let fut = match inner.as_mut() { + // inner is gone, so we can continue to signal that the stream is closed. + None => return Poll::Ready(None), + Some(inner) => inner, + }; + let stream = Pin::new_unchecked(fut); + match stream.poll_next(cx) { + Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))), + Poll::Ready(None) => { + // # Safety + // The inner stream is done, so we need to drop it. We do it without moving it + // by using drop in place. We then write over the value without trying to drop it first + // This should uphold all the safety requirements of `Pin` + std::ptr::drop_in_place(inner); + std::ptr::write(inner, None); + Poll::Ready(None) + } + Poll::Pending => Poll::Ready(Some(Poll::Pending)), + } + } + } + + fn size_hint(&self) -> (usize, Option) { + self.0.as_ref().map_or((0, Some(0)), Stream::size_hint) + } +} + +impl super::FusedStream for PollImmediate { + fn is_terminated(&self) -> bool { + self.0.is_none() + } +} + +/// Creates a new stream that never blocks when awaiting it. This is useful +/// when immediacy is more important than waiting for the next item to be ready +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use futures::stream::{self, StreamExt}; +/// use futures::task::Poll; +/// +/// let mut r = stream::poll_immediate(Box::pin(stream::iter(1_u32..3))); +/// assert_eq!(r.next().await, Some(Poll::Ready(1))); +/// assert_eq!(r.next().await, Some(Poll::Ready(2))); +/// assert_eq!(r.next().await, None); +/// +/// let mut p = stream::poll_immediate(Box::pin(stream::once(async { +/// futures::pending!(); +/// 42_u8 +/// }))); +/// assert_eq!(p.next().await, Some(Poll::Pending)); +/// assert_eq!(p.next().await, Some(Poll::Ready(42))); +/// assert_eq!(p.next().await, None); +/// # }); +/// ``` +pub fn poll_immediate(s: S) -> PollImmediate { + super::assert_stream::, PollImmediate>(PollImmediate(Some(s))) +} From 559f709d73d3d90ac8adce97d72aad7fd7a179a3 Mon Sep 17 00:00:00 2001 From: Tyler Hawkes Date: Fri, 18 Jun 2021 17:32:01 -0600 Subject: [PATCH 2/4] Removed all unsafe code. Fixing build failures. --- futures-util/src/future/poll_immediate.rs | 57 +++++++++++----------- futures-util/src/stream/poll_immediate.rs | 59 +++++++++++------------ 2 files changed, 55 insertions(+), 61 deletions(-) diff --git a/futures-util/src/future/poll_immediate.rs b/futures-util/src/future/poll_immediate.rs index 5f3c7a104b..463c6e888c 100644 --- a/futures-util/src/future/poll_immediate.rs +++ b/futures-util/src/future/poll_immediate.rs @@ -3,11 +3,17 @@ use crate::FutureExt; use core::pin::Pin; use futures_core::task::{Context, Poll}; use futures_core::{FusedFuture, Future, Stream}; +use pin_project_lite::pin_project; -/// Future for the [`poll_immediate`](poll_immediate()) function. -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PollImmediate(Option); +pin_project! { + /// Future for the [`poll_immediate`](poll_immediate()) function. + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate { + #[pin] + future: Option + } +} impl Future for PollImmediate where @@ -17,13 +23,14 @@ where #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // # Safety - // This is the only time that this future will ever be polled. + let mut this = self.project(); let inner = - unsafe { self.get_unchecked_mut().0.take().expect("PollOnce polled after completion") }; - crate::pin_mut!(inner); + this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion"); match inner.poll(cx) { - Poll::Ready(t) => Poll::Ready(Some(t)), + Poll::Ready(t) => { + this.future.set(None); + Poll::Ready(Some(t)) + } Poll::Pending => Poll::Ready(None), } } @@ -31,7 +38,7 @@ where impl FusedFuture for PollImmediate { fn is_terminated(&self) -> bool { - self.0.is_none() + self.future.is_none() } } @@ -64,25 +71,14 @@ where type Item = Poll; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - // # Safety - // We never move the inner value until it is done. We only get a reference to it. - let inner = &mut self.get_unchecked_mut().0; - let fut = match inner.as_mut() { - // inner is gone, so we can signal that the stream is closed. - None => return Poll::Ready(None), - Some(inner) => inner, - }; - let fut = Pin::new_unchecked(fut); - Poll::Ready(Some(fut.poll(cx).map(|t| { - // # Safety - // The inner option value is done, so we need to drop it. We do it without moving it - // by using drop in place. We then write over the value without trying to drop it first - // This should uphold all the safety requirements of `Pin` - std::ptr::drop_in_place(inner); - std::ptr::write(inner, None); + let mut this = self.project(); + match this.future.as_mut().as_pin_mut() { + // inner is gone, so we can signal that the stream is closed. + None => Poll::Ready(None), + Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| { + this.future.set(None); t - }))) + }))), } } } @@ -103,7 +99,7 @@ where /// # }); /// ``` pub fn poll_immediate(f: F) -> PollImmediate { - assert_future::, PollImmediate>(PollImmediate(Some(f))) + assert_future::, PollImmediate>(PollImmediate { future: Some(f) }) } /// Future for the [`poll_immediate_reuse`](poll_immediate_reuse()) function. @@ -119,7 +115,8 @@ where #[inline] fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut inner = self.get_mut().0.take().expect("PollOnceReuse polled after completion"); + let mut inner = + self.get_mut().0.take().expect("PollImmediateReuse polled after completion"); match inner.poll_unpin(cx) { Poll::Ready(t) => Poll::Ready(Ok(t)), Poll::Pending => Poll::Ready(Err(inner)), diff --git a/futures-util/src/stream/poll_immediate.rs b/futures-util/src/stream/poll_immediate.rs index d945ea493b..1bd4d4cd9e 100644 --- a/futures-util/src/stream/poll_immediate.rs +++ b/futures-util/src/stream/poll_immediate.rs @@ -1,53 +1,50 @@ +use core::pin::Pin; use futures_core::task::{Context, Poll}; use futures_core::Stream; -use std::pin::Pin; +use pin_project_lite::pin_project; -/// Stream for the [`poll_immediate`](poll_immediate()) function. -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PollImmediate(Option); +pin_project! { + /// Stream for the [`poll_immediate`](poll_immediate()) function. + #[derive(Debug, Clone)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PollImmediate { + #[pin] + stream: Option + } +} impl Stream for PollImmediate where - S: Stream, + S: Stream, { type Item = Poll; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - unsafe { - // # Safety - // We never move the inner value until it is done. We only get a reference to it. - let inner = &mut self.get_unchecked_mut().0; - let fut = match inner.as_mut() { - // inner is gone, so we can continue to signal that the stream is closed. - None => return Poll::Ready(None), - Some(inner) => inner, - }; - let stream = Pin::new_unchecked(fut); - match stream.poll_next(cx) { - Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))), - Poll::Ready(None) => { - // # Safety - // The inner stream is done, so we need to drop it. We do it without moving it - // by using drop in place. We then write over the value without trying to drop it first - // This should uphold all the safety requirements of `Pin` - std::ptr::drop_in_place(inner); - std::ptr::write(inner, None); - Poll::Ready(None) - } - Poll::Pending => Poll::Ready(Some(Poll::Pending)), + let mut this = self.project(); + let stream = match this.stream.as_mut().as_pin_mut() { + // inner is gone, so we can continue to signal that the stream is closed. + None => return Poll::Ready(None), + Some(inner) => inner, + }; + + match stream.poll_next(cx) { + Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))), + Poll::Ready(None) => { + this.stream.set(None); + Poll::Ready(None) } + Poll::Pending => Poll::Ready(Some(Poll::Pending)), } } fn size_hint(&self) -> (usize, Option) { - self.0.as_ref().map_or((0, Some(0)), Stream::size_hint) + self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint) } } impl super::FusedStream for PollImmediate { fn is_terminated(&self) -> bool { - self.0.is_none() + self.stream.is_none() } } @@ -76,5 +73,5 @@ impl super::FusedStream for PollImmediate { /// # }); /// ``` pub fn poll_immediate(s: S) -> PollImmediate { - super::assert_stream::, PollImmediate>(PollImmediate(Some(s))) + super::assert_stream::, PollImmediate>(PollImmediate { stream: Some(s) }) } From a444ac02231849119ffc43aabedfdae1a9c3b518 Mon Sep 17 00:00:00 2001 From: Tyler Hawkes Date: Thu, 24 Jun 2021 15:32:15 -0600 Subject: [PATCH 3/4] Making docs better and adding another example for how to reuse a future. Removing the poll_immediate_reuse function. --- futures-util/src/future/mod.rs | 4 +- futures-util/src/future/poll_immediate.rs | 114 ++++------------------ futures-util/src/stream/poll_immediate.rs | 9 +- 3 files changed, 28 insertions(+), 99 deletions(-) diff --git a/futures-util/src/future/mod.rs b/futures-util/src/future/mod.rs index d4cf8fae71..e3838b7ad2 100644 --- a/futures-util/src/future/mod.rs +++ b/futures-util/src/future/mod.rs @@ -69,9 +69,7 @@ mod poll_fn; pub use self::poll_fn::{poll_fn, PollFn}; mod poll_immediate; -pub use self::poll_immediate::{ - poll_immediate, poll_immediate_reuse, PollImmediate, PollImmediateReuse, -}; +pub use self::poll_immediate::{poll_immediate, PollImmediate}; mod ready; pub use self::ready::{err, ok, ready, Ready}; diff --git a/futures-util/src/future/poll_immediate.rs b/futures-util/src/future/poll_immediate.rs index 463c6e888c..922719a19e 100644 --- a/futures-util/src/future/poll_immediate.rs +++ b/futures-util/src/future/poll_immediate.rs @@ -1,5 +1,4 @@ use super::assert_future; -use crate::FutureExt; use core::pin::Pin; use futures_core::task::{Context, Poll}; use futures_core::{FusedFuture, Future, Stream}; @@ -7,6 +6,8 @@ use pin_project_lite::pin_project; pin_project! { /// Future for the [`poll_immediate`](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) #[derive(Debug, Clone)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct PollImmediate { @@ -42,7 +43,9 @@ impl FusedFuture for PollImmediate { } } -/// Creates a stream that can be polled repeatedly until the future is done +/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done. +/// The stream will never return [Poll::Pending](core::task::Poll::Pending) +/// so polling it in a tight loop is worse than using a blocking synchronous function. /// ``` /// # futures::executor::block_on(async { /// use futures::task::Poll; @@ -84,6 +87,14 @@ where } /// Creates a future that is immediately ready with an Option of a value. +/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready). +/// +/// # Caution +/// +/// Some futures expect to run until completion. If the future passed to this function isn't some sort of `&mut Future` then it will +/// be dropped and can cause performance problems when creating and dropping futures continuously. In some cases this is fine like +/// when asking for the [next()](crate::stream::StreamExt::next()) value of a stream and dropping the [Next](crate::stream::Next) future +/// doesn't change any state in the stream. /// /// # Examples /// @@ -98,101 +109,18 @@ where /// assert_eq!(p.await, None); /// # }); /// ``` -pub fn poll_immediate(f: F) -> PollImmediate { - assert_future::, PollImmediate>(PollImmediate { future: Some(f) }) -} - -/// Future for the [`poll_immediate_reuse`](poll_immediate_reuse()) function. -#[derive(Debug, Clone)] -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct PollImmediateReuse(Option); - -impl Future for PollImmediateReuse -where - F: Future + Unpin, -{ - type Output = Result; - - #[inline] - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut inner = - self.get_mut().0.take().expect("PollImmediateReuse polled after completion"); - match inner.poll_unpin(cx) { - Poll::Ready(t) => Poll::Ready(Ok(t)), - Poll::Pending => Poll::Ready(Err(inner)), - } - } -} - -impl FusedFuture for PollImmediateReuse { - fn is_terminated(&self) -> bool { - self.0.is_none() - } -} - -/// Creates a stream that can be polled repeatedly until the future is done -/// ``` -/// # futures::executor::block_on(async { -/// use futures::task::Poll; -/// use futures::{StreamExt, future}; -/// use future::FusedFuture; -/// -/// let mut r = future::poll_immediate_reuse(future::ready(1_u32)); -/// assert_eq!(r.next().await, Some(Poll::Ready(1))); -/// -/// let mut p = future::poll_immediate_reuse(Box::pin(async {futures::pending!(); 42_u8})); -/// assert_eq!(p.next().await, Some(Poll::Pending)); -/// assert!(!p.is_terminated()); -/// assert_eq!(p.next().await, Some(Poll::Ready(42))); -/// assert!(p.is_terminated()); -/// assert_eq!(p.next().await, None); -/// # }); -/// ``` -impl Stream for PollImmediateReuse -where - F: Future + Unpin, -{ - type Item = Poll; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let inner = &mut self.get_mut().0; - let fut = match inner.as_mut() { - // inner is gone, so we can signal that the stream is closed. - None => return Poll::Ready(None), - Some(inner) => inner, - }; - let fut = Pin::new(fut); - Poll::Ready(Some(fut.poll(cx).map(|t| { - *inner = None; - t - }))) - } -} - -/// Creates a future that is immediately ready with a Result of a value or the future. /// -/// # Examples +/// ### Reusing a future /// /// ``` /// # futures::executor::block_on(async { -/// use futures::future; -/// -/// let r = future::poll_immediate_reuse(future::ready(1_i32)); -/// assert_eq!(r.await.unwrap(), 1); -/// -/// // futures::pending!() returns pending once and then evaluates to `()` -/// let p = future::poll_immediate_reuse(Box::pin(async { -/// futures::pending!(); -/// 42_u8 -/// })); -/// match p.await { -/// Ok(_) => unreachable!(), -/// Err(e) => { -/// assert_eq!(e.await, 42); -/// } -/// } +/// use futures::{future, pin_mut}; +/// let f = async {futures::pending!(); 42_u8}; +/// pin_mut!(f); +/// assert_eq!(None, future::poll_immediate(&mut f).await); +/// assert_eq!(42, f.await); /// # }); /// ``` -pub fn poll_immediate_reuse(f: F) -> PollImmediateReuse { - assert_future::, PollImmediateReuse>(PollImmediateReuse(Some(f))) +pub fn poll_immediate(f: F) -> PollImmediate { + assert_future::, PollImmediate>(PollImmediate { future: Some(f) }) } diff --git a/futures-util/src/stream/poll_immediate.rs b/futures-util/src/stream/poll_immediate.rs index 1bd4d4cd9e..c7e8a5b3c6 100644 --- a/futures-util/src/stream/poll_immediate.rs +++ b/futures-util/src/stream/poll_immediate.rs @@ -4,7 +4,9 @@ use futures_core::Stream; use pin_project_lite::pin_project; pin_project! { - /// Stream for the [`poll_immediate`](poll_immediate()) function. + /// Stream for the [poll_immediate](poll_immediate()) function. + /// + /// It will never return [Poll::Pending](core::task::Poll::Pending) #[derive(Debug, Clone)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct PollImmediate { @@ -48,8 +50,9 @@ impl super::FusedStream for PollImmediate { } } -/// Creates a new stream that never blocks when awaiting it. This is useful -/// when immediacy is more important than waiting for the next item to be ready +/// Creates a new stream that always immediately returns [Poll::Ready](core::task::Poll::Ready) when awaiting it. +/// +/// This is useful when immediacy is more important than waiting for the next item to be ready. /// /// # Examples /// From 4514f919076e5da437fadc12971ea80174d40d8b Mon Sep 17 00:00:00 2001 From: Tyler Hawkes Date: Thu, 29 Jul 2021 09:24:38 -0600 Subject: [PATCH 4/4] Update futures-util/src/future/poll_immediate.rs Co-authored-by: Taiki Endo --- futures-util/src/future/poll_immediate.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/futures-util/src/future/poll_immediate.rs b/futures-util/src/future/poll_immediate.rs index 922719a19e..5ae555c73e 100644 --- a/futures-util/src/future/poll_immediate.rs +++ b/futures-util/src/future/poll_immediate.rs @@ -91,10 +91,10 @@ where /// /// # Caution /// -/// Some futures expect to run until completion. If the future passed to this function isn't some sort of `&mut Future` then it will -/// be dropped and can cause performance problems when creating and dropping futures continuously. In some cases this is fine like -/// when asking for the [next()](crate::stream::StreamExt::next()) value of a stream and dropping the [Next](crate::stream::Next) future -/// doesn't change any state in the stream. +/// When consuming the future by this function, note the following: +/// +/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value. +/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems. /// /// # Examples ///