Skip to content

Commit

Permalink
New poll_immediate functions to immediately return from a poll (#2452)
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerhawkes authored and taiki-e committed Aug 28, 2021
1 parent b57860f commit ae422d8
Show file tree
Hide file tree
Showing 4 changed files with 212 additions and 0 deletions.
3 changes: 3 additions & 0 deletions futures-util/src/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ pub use self::option::OptionFuture;
mod poll_fn;
pub use self::poll_fn::{poll_fn, PollFn};

mod poll_immediate;
pub use self::poll_immediate::{poll_immediate, PollImmediate};

mod ready;
pub use self::ready::{err, ok, ready, Ready};

Expand Down
126 changes: 126 additions & 0 deletions futures-util/src/future/poll_immediate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use super::assert_future;
use core::pin::Pin;
use futures_core::task::{Context, Poll};
use futures_core::{FusedFuture, Future, Stream};
use pin_project_lite::pin_project;

pin_project! {
/// Future for the [`poll_immediate`](poll_immediate()) function.
///
/// It will never return [Poll::Pending](core::task::Poll::Pending)
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PollImmediate<T> {
#[pin]
future: Option<T>
}
}

impl<T, F> Future for PollImmediate<F>
where
F: Future<Output = T>,
{
type Output = Option<T>;

#[inline]
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
let mut this = self.project();
let inner =
this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
match inner.poll(cx) {
Poll::Ready(t) => {
this.future.set(None);
Poll::Ready(Some(t))
}
Poll::Pending => Poll::Ready(None),
}
}
}

impl<T: Future> FusedFuture for PollImmediate<T> {
fn is_terminated(&self) -> bool {
self.future.is_none()
}
}

/// A [Stream](crate::stream::Stream) implementation that can be polled repeatedly until the future is done.
/// The stream will never return [Poll::Pending](core::task::Poll::Pending)
/// so polling it in a tight loop is worse than using a blocking synchronous function.
/// ```
/// # futures::executor::block_on(async {
/// use futures::task::Poll;
/// use futures::{StreamExt, future, pin_mut};
/// use future::FusedFuture;
///
/// let f = async { 1_u32 };
/// pin_mut!(f);
/// let mut r = future::poll_immediate(f);
/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
///
/// let f = async {futures::pending!(); 42_u8};
/// pin_mut!(f);
/// let mut p = future::poll_immediate(f);
/// assert_eq!(p.next().await, Some(Poll::Pending));
/// assert!(!p.is_terminated());
/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
/// assert!(p.is_terminated());
/// assert_eq!(p.next().await, None);
/// # });
/// ```
impl<T, F> Stream for PollImmediate<F>
where
F: Future<Output = T>,
{
type Item = Poll<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match this.future.as_mut().as_pin_mut() {
// inner is gone, so we can signal that the stream is closed.
None => Poll::Ready(None),
Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
this.future.set(None);
t
}))),
}
}
}

/// Creates a future that is immediately ready with an Option of a value.
/// Specifically this means that [poll](core::future::Future::poll()) always returns [Poll::Ready](core::task::Poll::Ready).
///
/// # Caution
///
/// When consuming the future by this function, note the following:
///
/// - This function does not guarantee that the future will run to completion, so it is generally incompatible with passing the non-cancellation-safe future by value.
/// - Even if the future is cancellation-safe, creating and dropping new futures frequently may lead to performance problems.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::future;
///
/// let r = future::poll_immediate(async { 1_u32 });
/// assert_eq!(r.await, Some(1));
///
/// let p = future::poll_immediate(future::pending::<i32>());
/// assert_eq!(p.await, None);
/// # });
/// ```
///
/// ### Reusing a future
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::{future, pin_mut};
/// let f = async {futures::pending!(); 42_u8};
/// pin_mut!(f);
/// assert_eq!(None, future::poll_immediate(&mut f).await);
/// assert_eq!(42, f.await);
/// # });
/// ```
pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
}
3 changes: 3 additions & 0 deletions futures-util/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ pub use self::pending::{pending, Pending};
mod poll_fn;
pub use self::poll_fn::{poll_fn, PollFn};

mod poll_immediate;
pub use self::poll_immediate::{poll_immediate, PollImmediate};

mod select;
pub use self::select::{select, Select};

Expand Down
80 changes: 80 additions & 0 deletions futures-util/src/stream/poll_immediate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use core::pin::Pin;
use futures_core::task::{Context, Poll};
use futures_core::Stream;
use pin_project_lite::pin_project;

pin_project! {
/// Stream for the [poll_immediate](poll_immediate()) function.
///
/// It will never return [Poll::Pending](core::task::Poll::Pending)
#[derive(Debug, Clone)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct PollImmediate<S> {
#[pin]
stream: Option<S>
}
}

impl<T, S> Stream for PollImmediate<S>
where
S: Stream<Item = T>,
{
type Item = Poll<T>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let stream = match this.stream.as_mut().as_pin_mut() {
// inner is gone, so we can continue to signal that the stream is closed.
None => return Poll::Ready(None),
Some(inner) => inner,
};

match stream.poll_next(cx) {
Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
Poll::Ready(None) => {
this.stream.set(None);
Poll::Ready(None)
}
Poll::Pending => Poll::Ready(Some(Poll::Pending)),
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint)
}
}

impl<S: Stream> super::FusedStream for PollImmediate<S> {
fn is_terminated(&self) -> bool {
self.stream.is_none()
}
}

/// Creates a new stream that always immediately returns [Poll::Ready](core::task::Poll::Ready) when awaiting it.
///
/// This is useful when immediacy is more important than waiting for the next item to be ready.
///
/// # Examples
///
/// ```
/// # futures::executor::block_on(async {
/// use futures::stream::{self, StreamExt};
/// use futures::task::Poll;
///
/// let mut r = stream::poll_immediate(Box::pin(stream::iter(1_u32..3)));
/// assert_eq!(r.next().await, Some(Poll::Ready(1)));
/// assert_eq!(r.next().await, Some(Poll::Ready(2)));
/// assert_eq!(r.next().await, None);
///
/// let mut p = stream::poll_immediate(Box::pin(stream::once(async {
/// futures::pending!();
/// 42_u8
/// })));
/// assert_eq!(p.next().await, Some(Poll::Pending));
/// assert_eq!(p.next().await, Some(Poll::Ready(42)));
/// assert_eq!(p.next().await, None);
/// # });
/// ```
pub fn poll_immediate<S: Stream>(s: S) -> PollImmediate<S> {
super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate { stream: Some(s) })
}

0 comments on commit ae422d8

Please sign in to comment.