diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c49e8ec83b..e7c851dfa5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,7 +36,7 @@ jobs: - windows-latest runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust # --no-self-update is necessary because the windows environment cannot self-update rustup.exe. run: rustup update nightly --no-self-update && rustup default nightly @@ -53,7 +53,7 @@ jobs: - aarch64-unknown-linux-gnu runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cross @@ -73,7 +73,7 @@ jobs: - 1.36 runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} # cargo does not support for --features/--no-default-features with workspace, so use cargo-hack instead. @@ -105,7 +105,7 @@ jobs: - 1.45 runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -136,7 +136,7 @@ jobs: - nightly runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update ${{ matrix.rust }} && rustup default ${{ matrix.rust }} - name: Install cargo-hack @@ -148,7 +148,7 @@ jobs: name: cargo build -Z minimal-versions runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack @@ -170,7 +170,7 @@ jobs: - thumbv6m-none-eabi runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: rustup target add ${{ matrix.target }} @@ -202,7 +202,7 @@ jobs: name: cargo bench runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: cargo bench --workspace @@ -212,7 +212,7 @@ jobs: name: cargo hack check --feature-powerset runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - name: Install cargo-hack @@ -237,7 +237,7 @@ jobs: contents: write pull-requests: write steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: ci/no_atomic_cas.sh @@ -270,7 +270,7 @@ jobs: name: cargo miri test runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - run: cargo miri test --workspace --all-features @@ -289,7 +289,7 @@ jobs: - thread runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: rustup component add rust-src @@ -304,7 +304,7 @@ jobs: name: cargo clippy runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup toolchain install nightly --component clippy && rustup default nightly - run: cargo clippy --workspace --all-features --all-targets @@ -313,7 +313,7 @@ jobs: name: cargo fmt runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update stable - run: cargo fmt --all -- --check @@ -322,7 +322,7 @@ jobs: name: cargo doc runs-on: ubuntu-latest steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v3 - name: Install Rust run: rustup update nightly && rustup default nightly - run: RUSTDOCFLAGS="-D warnings --cfg docsrs" cargo doc --workspace --no-deps --all-features diff --git a/futures-executor/src/enter.rs b/futures-executor/src/enter.rs index 5895a9efb6..cb58c30bb7 100644 --- a/futures-executor/src/enter.rs +++ b/futures-executor/src/enter.rs @@ -34,7 +34,7 @@ impl std::error::Error for EnterError {} /// executor. /// /// Executor implementations should call this function before beginning to -/// execute a tasks, and drop the returned [`Enter`](Enter) value after +/// execute a task, and drop the returned [`Enter`](Enter) value after /// completing task execution: /// /// ``` diff --git a/futures-executor/src/local_pool.rs b/futures-executor/src/local_pool.rs index bee96d8db9..9691060725 100644 --- a/futures-executor/src/local_pool.rs +++ b/futures-executor/src/local_pool.rs @@ -106,17 +106,9 @@ fn run_executor) -> Poll>(mut f: F) -> T { }) } -fn poll_executor) -> T>(mut f: F) -> T { - let _enter = enter().expect( - "cannot execute `LocalPool` executor from within \ - another executor", - ); - - CURRENT_THREAD_NOTIFY.with(|thread_notify| { - let waker = waker_ref(thread_notify); - let mut cx = Context::from_waker(&waker); - f(&mut cx) - }) +/// Check for a wakeup, but don't consume it. +fn woken() -> bool { + CURRENT_THREAD_NOTIFY.with(|thread_notify| thread_notify.unparked.load(Ordering::SeqCst)) } impl LocalPool { @@ -212,20 +204,26 @@ impl LocalPool { /// further use of one of the pool's run or poll methods. /// Though only one task will be completed, progress may be made on multiple tasks. pub fn try_run_one(&mut self) -> bool { - poll_executor(|ctx| { + run_executor(|cx| { loop { - let ret = self.poll_pool_once(ctx); - - // return if we have executed a future - if let Poll::Ready(Some(_)) = ret { - return true; + self.drain_incoming(); + + match self.pool.poll_next_unpin(cx) { + // Success! + Poll::Ready(Some(())) => return Poll::Ready(true), + // The pool was empty. + Poll::Ready(None) => return Poll::Ready(false), + Poll::Pending => (), } - // if there are no new incoming futures - // then there is no feature that can make progress - // and we can return without having completed a single future - if self.incoming.borrow().is_empty() { - return false; + if !self.incoming.borrow().is_empty() { + // New tasks were spawned; try again. + continue; + } else if woken() { + // The pool yielded to us, but there's more progress to be made. + return Poll::Pending; + } else { + return Poll::Ready(false); } } }) @@ -257,44 +255,52 @@ impl LocalPool { /// of the pool's run or poll methods. While the function is running, all tasks /// in the pool will try to make progress. pub fn run_until_stalled(&mut self) { - poll_executor(|ctx| { - let _ = self.poll_pool(ctx); + run_executor(|cx| match self.poll_pool(cx) { + // The pool is empty. + Poll::Ready(()) => Poll::Ready(()), + Poll::Pending => { + if woken() { + Poll::Pending + } else { + // We're stalled for now. + Poll::Ready(()) + } + } }); } - // Make maximal progress on the entire pool of spawned task, returning `Ready` - // if the pool is empty and `Pending` if no further progress can be made. + /// Poll `self.pool`, re-filling it with any newly-spawned tasks. + /// Repeat until either the pool is empty, or it returns `Pending`. + /// + /// Returns `Ready` if the pool was empty, and `Pending` otherwise. + /// + /// NOTE: the pool may call `wake`, so `Pending` doesn't necessarily + /// mean that the pool can't make progress. fn poll_pool(&mut self, cx: &mut Context<'_>) -> Poll<()> { - // state for the FuturesUnordered, which will never be used loop { - let ret = self.poll_pool_once(cx); + self.drain_incoming(); - // we queued up some new tasks; add them and poll again + let pool_ret = self.pool.poll_next_unpin(cx); + + // We queued up some new tasks; add them and poll again. if !self.incoming.borrow().is_empty() { continue; } - // no queued tasks; we may be done - match ret { - Poll::Pending => return Poll::Pending, + match pool_ret { + Poll::Ready(Some(())) => continue, Poll::Ready(None) => return Poll::Ready(()), - _ => {} + Poll::Pending => return Poll::Pending, } } } - // Try make minimal progress on the pool of spawned tasks - fn poll_pool_once(&mut self, cx: &mut Context<'_>) -> Poll> { - // empty the incoming queue of newly-spawned tasks - { - let mut incoming = self.incoming.borrow_mut(); - for task in incoming.drain(..) { - self.pool.push(task) - } + /// Empty the incoming queue of newly-spawned tasks. + fn drain_incoming(&mut self) { + let mut incoming = self.incoming.borrow_mut(); + for task in incoming.drain(..) { + self.pool.push(task) } - - // try to execute the next ready future - self.pool.poll_next_unpin(cx) } } diff --git a/futures-executor/tests/local_pool.rs b/futures-executor/tests/local_pool.rs index 8e5e27981d..6e908d2444 100644 --- a/futures-executor/tests/local_pool.rs +++ b/futures-executor/tests/local_pool.rs @@ -1,7 +1,7 @@ use futures::channel::oneshot; use futures::executor::LocalPool; use futures::future::{self, lazy, poll_fn, Future}; -use futures::task::{Context, LocalSpawn, Poll, Spawn, Waker}; +use futures::task::{Context, LocalSpawn, LocalSpawnExt, Poll, Spawn, SpawnExt, Waker}; use std::cell::{Cell, RefCell}; use std::pin::Pin; use std::rc::Rc; @@ -435,3 +435,65 @@ fn park_unpark_independence() { futures::executor::block_on(future) } + +struct SelfWaking { + wakeups_remaining: Rc>, +} + +impl Future for SelfWaking { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if *self.wakeups_remaining.borrow() != 0 { + *self.wakeups_remaining.borrow_mut() -= 1; + cx.waker().wake_by_ref(); + } + + Poll::Pending + } +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `run_until_stalled` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_run_until_stalled() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + // This should keep polling until there are no more wakeups. + pool.run_until_stalled(); + + assert_eq!(*wakeups_remaining.borrow(), 0); +} + +/// Regression test for https://github.com/rust-lang/futures-rs/pull/2593 +/// +/// The issue was that self-waking futures could cause `try_run_one` +/// to exit early, even when progress could still be made. +#[test] +fn self_waking_try_run_one() { + let wakeups_remaining = Rc::new(RefCell::new(10)); + + let mut pool = LocalPool::new(); + let spawner = pool.spawner(); + for _ in 0..3 { + let wakeups_remaining = Rc::clone(&wakeups_remaining); + spawner.spawn_local(SelfWaking { wakeups_remaining }).unwrap(); + } + + spawner.spawn(future::ready(())).unwrap(); + + // The `ready` future should complete. + assert!(pool.try_run_one()); + + // The self-waking futures are each polled once. + assert_eq!(*wakeups_remaining.borrow(), 7); +} diff --git a/futures-util/Cargo.toml b/futures-util/Cargo.toml index 1bcc4c083e..d95dbc60b6 100644 --- a/futures-util/Cargo.toml +++ b/futures-util/Cargo.toml @@ -41,7 +41,7 @@ memchr = { version = "2.2", optional = true } futures_01 = { version = "0.1.25", optional = true, package = "futures" } tokio-io = { version = "0.1.9", optional = true } pin-utils = "0.1.0" -pin-project-lite = "0.2.4" +pin-project-lite = "0.2.6" [dev-dependencies] futures = { path = "../futures", features = ["async-await", "thread-pool"] } diff --git a/futures-util/src/future/select.rs b/futures-util/src/future/select.rs index bd44f20f77..e693a30b00 100644 --- a/futures-util/src/future/select.rs +++ b/futures-util/src/future/select.rs @@ -100,16 +100,17 @@ where fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let (mut a, mut b) = self.inner.take().expect("cannot poll Select twice"); - match a.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Left((x, b))), - Poll::Pending => match b.poll_unpin(cx) { - Poll::Ready(x) => Poll::Ready(Either::Right((x, a))), - Poll::Pending => { - self.inner = Some((a, b)); - Poll::Pending - } - }, + + if let Poll::Ready(val) = a.poll_unpin(cx) { + return Poll::Ready(Either::Left((val, b))); + } + + if let Poll::Ready(val) = b.poll_unpin(cx) { + return Poll::Ready(Either::Right((val, a))); } + + self.inner = Some((a, b)); + Poll::Pending } } diff --git a/futures-util/src/future/try_future/mod.rs b/futures-util/src/future/try_future/mod.rs index fb3bdd8a02..e5bc700714 100644 --- a/futures-util/src/future/try_future/mod.rs +++ b/futures-util/src/future/try_future/mod.rs @@ -302,6 +302,9 @@ pub trait TryFutureExt: TryFuture { /// assert_eq!(future.await, Ok(1)); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn map_err(self, f: F) -> MapErr where F: FnOnce(Self::Error) -> E, @@ -332,6 +335,9 @@ pub trait TryFutureExt: TryFuture { /// let future_err_i32 = future_err_u8.err_into::(); /// # }); /// ``` + /// + /// [`join!`]: crate::join + /// [`select!`]: crate::select fn err_into(self) -> ErrInto where Self: Sized, diff --git a/futures-util/src/io/buf_reader.rs b/futures-util/src/io/buf_reader.rs index 2a7e5005cd..31ac183c47 100644 --- a/futures-util/src/io/buf_reader.rs +++ b/futures-util/src/io/buf_reader.rs @@ -74,8 +74,8 @@ impl BufReader { /// the buffer will not be flushed, allowing for more efficient seeks. /// This method does not return the location of the underlying reader, so the caller /// must track this information themselves if it is required. - pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeeKRelative<'_, R> { - SeeKRelative { inner: self, offset, first: true } + pub fn seek_relative(self: Pin<&mut Self>, offset: i64) -> SeekRelative<'_, R> { + SeekRelative { inner: self, offset, first: true } } /// Attempts to seek relative to the current position. If the new position lies within the buffer, @@ -232,13 +232,13 @@ impl AsyncSeek for BufReader { /// Future for the [`BufReader::seek_relative`](self::BufReader::seek_relative) method. #[derive(Debug)] #[must_use = "futures do nothing unless polled"] -pub struct SeeKRelative<'a, R> { +pub struct SeekRelative<'a, R> { inner: Pin<&'a mut BufReader>, offset: i64, first: bool, } -impl Future for SeeKRelative<'_, R> +impl Future for SeekRelative<'_, R> where R: AsyncRead + AsyncSeek, { diff --git a/futures-util/src/io/mod.rs b/futures-util/src/io/mod.rs index dc24a27b01..4f474f757d 100644 --- a/futures-util/src/io/mod.rs +++ b/futures-util/src/io/mod.rs @@ -46,7 +46,7 @@ mod allow_std; pub use self::allow_std::AllowStdIo; mod buf_reader; -pub use self::buf_reader::{BufReader, SeeKRelative}; +pub use self::buf_reader::{BufReader, SeekRelative}; mod buf_writer; pub use self::buf_writer::BufWriter; diff --git a/futures-util/src/stream/mod.rs b/futures-util/src/stream/mod.rs index 5a1f766aaa..5f9412a21e 100644 --- a/futures-util/src/stream/mod.rs +++ b/futures-util/src/stream/mod.rs @@ -100,10 +100,12 @@ mod poll_immediate; pub use self::poll_immediate::{poll_immediate, PollImmediate}; mod select; -pub use self::select::{select, Select}; +pub use self::select::{select, select_early_exit, Select}; mod select_with_strategy; -pub use self::select_with_strategy::{select_with_strategy, PollNext, SelectWithStrategy}; +pub use self::select_with_strategy::{ + select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy, +}; mod unfold; pub use self::unfold::{unfold, Unfold}; diff --git a/futures-util/src/stream/select.rs b/futures-util/src/stream/select.rs index 0c1e3af782..16cb21d806 100644 --- a/futures-util/src/stream/select.rs +++ b/futures-util/src/stream/select.rs @@ -1,20 +1,54 @@ use super::assert_stream; -use crate::stream::{select_with_strategy, PollNext, SelectWithStrategy}; +use crate::stream::{ + select_with_strategy, ClosedStreams, ExitStrategy, PollNext, SelectWithStrategy, +}; use core::pin::Pin; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; use pin_project_lite::pin_project; +type PollNextFn = fn(&mut PollNext) -> PollNext; + pin_project! { /// Stream for the [`select()`] function. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] - pub struct Select { + pub struct Select { #[pin] - inner: SelectWithStrategy PollNext, PollNext>, + inner: SelectWithStrategy, + } +} + +#[derive(Debug)] +pub struct ExitWhenBothFinished {} + +impl ExitStrategy for ExitWhenBothFinished { + #[inline] + fn is_terminated(closed_streams: ClosedStreams) -> bool { + match closed_streams { + ClosedStreams::Both => true, + _ => false, + } } } +#[derive(Debug)] +pub struct ExitWhenEitherFinished {} + +impl ExitStrategy for ExitWhenEitherFinished { + #[inline] + fn is_terminated(closed_streams: ClosedStreams) -> bool { + match closed_streams { + ClosedStreams::None => false, + _ => true, + } + } +} + +fn round_robin(last: &mut PollNext) -> PollNext { + last.toggle() +} + /// This function will attempt to pull items from both streams. Each /// stream will be polled in a round-robin fashion, and whenever a stream is /// ready to yield an item that item is yielded. @@ -44,21 +78,31 @@ pin_project! { /// } /// # }); /// ``` -pub fn select(stream1: St1, stream2: St2) -> Select +pub fn select(stream1: St1, stream2: St2) -> Select where St1: Stream, St2: Stream, { - fn round_robin(last: &mut PollNext) -> PollNext { - last.toggle() - } + assert_stream::(Select { + inner: select_with_strategy(stream1, stream2, round_robin), + }) +} +/// Same as `select`, but finishes when either stream finishes +pub fn select_early_exit( + stream1: St1, + stream2: St2, +) -> Select +where + St1: Stream, + St2: Stream, +{ assert_stream::(Select { inner: select_with_strategy(stream1, stream2, round_robin), }) } -impl Select { +impl Select { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { @@ -93,20 +137,22 @@ impl Select { } } -impl FusedStream for Select +impl FusedStream for Select where St1: Stream, St2: Stream, + Exit: ExitStrategy, { fn is_terminated(&self) -> bool { self.inner.is_terminated() } } -impl Stream for Select +impl Stream for Select where St1: Stream, St2: Stream, + Exit: ExitStrategy, { type Item = St1::Item; diff --git a/futures-util/src/stream/select_with_strategy.rs b/futures-util/src/stream/select_with_strategy.rs index 6ccb321aaf..de76b56c42 100644 --- a/futures-util/src/stream/select_with_strategy.rs +++ b/futures-util/src/stream/select_with_strategy.rs @@ -1,5 +1,5 @@ use super::assert_stream; -use crate::stream::{Fuse, StreamExt}; +use core::marker::PhantomData; use core::{fmt, pin::Pin}; use futures_core::stream::{FusedStream, Stream}; use futures_core::task::{Context, Poll}; @@ -19,13 +19,15 @@ impl PollNext { #[must_use] pub fn toggle(&mut self) -> Self { let old = *self; + *self = self.other(); + old + } + fn other(&self) -> PollNext { match self { - PollNext::Left => *self = PollNext::Right, - PollNext::Right => *self = PollNext::Left, + PollNext::Left => PollNext::Right, + PollNext::Right => PollNext::Left, } - - old } } @@ -35,16 +37,57 @@ impl Default for PollNext { } } +/// Which streams are known to have finished? +#[derive(PartialEq, Debug, Eq, Clone, Copy)] +pub enum ClosedStreams { + /// Neither stream has finished + None, + /// The left stream has finished + Left, + /// The right stream has finished + Right, + /// Both streams have finished + Both, +} + +/// A trait for chosing to close the stream before both streams +/// have finished. +pub trait ExitStrategy { + /// Chose whether this stream is terminated based on the termination state of its + /// substreams. + fn is_terminated(closed_streams: ClosedStreams) -> bool; +} + +impl ClosedStreams { + fn finish(&mut self, ps: PollNext) { + match (&self, ps) { + (ClosedStreams::None, PollNext::Left) => { + *self = ClosedStreams::Left; + } + (ClosedStreams::None, PollNext::Right) => { + *self = ClosedStreams::Right; + } + (ClosedStreams::Left, PollNext::Right) | (ClosedStreams::Right, PollNext::Left) => { + *self = ClosedStreams::Both; + } + _ => {} + } + } +} + pin_project! { /// Stream for the [`select_with_strategy()`] function. See function docs for details. #[must_use = "streams do nothing unless polled"] - pub struct SelectWithStrategy { + #[project = SelectWithStrategyProj] + pub struct SelectWithStrategy { #[pin] - stream1: Fuse, + stream1: St1, #[pin] - stream2: Fuse, + stream2: St2, + closed_streams: ClosedStreams, state: State, clos: Clos, + _marker: PhantomData, } } @@ -67,7 +110,8 @@ pin_project! { /// /// ```rust /// # futures::executor::block_on(async { -/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt }; +/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt, +/// ClosedStreams, ExitStrategy, SelectWithStrategy }; /// /// let left = repeat(1); /// let right = repeat(2); @@ -78,7 +122,18 @@ pin_project! { /// // use a function pointer instead of a closure. /// fn prio_left(_: &mut ()) -> PollNext { PollNext::Left } /// -/// let mut out = select_with_strategy(left, right, prio_left); +/// struct ExitWhenBothFinished {} +/// +/// impl ExitStrategy for ExitWhenBothFinished { +/// fn is_terminated(closed_streams: ClosedStreams) -> bool { +/// match closed_streams { +/// ClosedStreams::Both => true, +/// _ => false, +/// } +/// } +/// } +/// +/// let mut out: SelectWithStrategy<_, _, _, _, ExitWhenBothFinished> = select_with_strategy(left, right, prio_left); /// /// for _ in 0..100 { /// // Whenever we poll out, we will always get `1`. @@ -89,50 +144,103 @@ pin_project! { /// /// ### Round Robin /// This example shows how to select from both streams round robin. -/// Note: this special case is provided by [`futures-util::stream::select`]. +/// Note: these special cases are provided by [`futures-util::stream::select`]. /// /// ```rust /// # futures::executor::block_on(async { -/// use futures::stream::{ repeat, select_with_strategy, PollNext, StreamExt }; +/// use futures::stream::{ repeat, select_with_strategy, FusedStream, PollNext, StreamExt, +/// ClosedStreams, ExitStrategy, SelectWithStrategy }; /// -/// let left = repeat(1); -/// let right = repeat(2); +/// struct ExitWhenBothFinished {} /// -/// let rrobin = |last: &mut PollNext| last.toggle(); +/// impl ExitStrategy for ExitWhenBothFinished { +/// fn is_terminated(closed_streams: ClosedStreams) -> bool { +/// match closed_streams { +/// ClosedStreams::Both => true, +/// _ => false, +/// } +/// } +/// } /// -/// let mut out = select_with_strategy(left, right, rrobin); +/// struct ExitWhenEitherFinished {} /// -/// for _ in 0..100 { -/// // We should be alternating now. -/// assert_eq!(1, out.select_next_some().await); -/// assert_eq!(2, out.select_next_some().await); +/// impl ExitStrategy for ExitWhenEitherFinished { +/// fn is_terminated(closed_streams: ClosedStreams) -> bool { +/// match closed_streams { +/// ClosedStreams::None => false, +/// _ => true, +/// } +/// } +/// } +/// +/// // Finishes when both streams finish +/// { +/// let left = repeat(1).take(10); +/// let right = repeat(2); +/// +/// let rrobin = |last: &mut PollNext| last.toggle(); +/// +/// let mut out: SelectWithStrategy<_, _, _, _, ExitWhenBothFinished> = select_with_strategy(left, right, rrobin); +/// +/// for _ in 0..10 { +/// // We should be alternating now. +/// assert_eq!(1, out.select_next_some().await); +/// assert_eq!(2, out.select_next_some().await); +/// } +/// for _ in 0..100 { +/// // First stream has finished +/// assert_eq!(2, out.select_next_some().await); +/// } +/// assert!(!out.is_terminated()); +/// } +/// +/// // Finishes when either stream finishes +/// { +/// let left = repeat(1).take(10); +/// let right = repeat(2); +/// +/// let rrobin = |last: &mut PollNext| last.toggle(); +/// +/// let mut out: SelectWithStrategy<_, _, _, _, ExitWhenEitherFinished> = select_with_strategy(left, right, rrobin); +/// +/// for _ in 0..10 { +/// // We should be alternating now. +/// assert_eq!(1, out.select_next_some().await); +/// assert_eq!(2, out.select_next_some().await); +/// } +/// assert_eq!(None, out.next().await); +/// assert!(out.is_terminated()); /// } /// # }); /// ``` -pub fn select_with_strategy( +/// +pub fn select_with_strategy( stream1: St1, stream2: St2, which: Clos, -) -> SelectWithStrategy +) -> SelectWithStrategy where St1: Stream, St2: Stream, Clos: FnMut(&mut State) -> PollNext, State: Default, + Exit: ExitStrategy, { assert_stream::(SelectWithStrategy { - stream1: stream1.fuse(), - stream2: stream2.fuse(), + stream1, + stream2, state: Default::default(), clos: which, + closed_streams: ClosedStreams::None, + _marker: PhantomData, }) } -impl SelectWithStrategy { +impl SelectWithStrategy { /// Acquires a reference to the underlying streams that this combinator is /// pulling from. pub fn get_ref(&self) -> (&St1, &St2) { - (self.stream1.get_ref(), self.stream2.get_ref()) + (&self.stream1, &self.stream2) } /// Acquires a mutable reference to the underlying streams that this @@ -141,7 +249,7 @@ impl SelectWithStrategy { /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> (&mut St1, &mut St2) { - (self.stream1.get_mut(), self.stream2.get_mut()) + (&mut self.stream1, &mut self.stream2) } /// Acquires a pinned mutable reference to the underlying streams that this @@ -151,7 +259,7 @@ impl SelectWithStrategy { /// stream which may otherwise confuse this combinator. pub fn get_pin_mut(self: Pin<&mut Self>) -> (Pin<&mut St1>, Pin<&mut St2>) { let this = self.project(); - (this.stream1.get_pin_mut(), this.stream2.get_pin_mut()) + (this.stream1, this.stream2) } /// Consumes this combinator, returning the underlying streams. @@ -159,62 +267,110 @@ impl SelectWithStrategy { /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> (St1, St2) { - (self.stream1.into_inner(), self.stream2.into_inner()) + (self.stream1, self.stream2) } } -impl FusedStream for SelectWithStrategy +impl FusedStream for SelectWithStrategy where St1: Stream, St2: Stream, Clos: FnMut(&mut State) -> PollNext, + Exit: ExitStrategy, { fn is_terminated(&self) -> bool { - self.stream1.is_terminated() && self.stream2.is_terminated() + Exit::is_terminated(self.closed_streams) } } -impl Stream for SelectWithStrategy +#[inline] +fn poll_side( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State, Exit>, + side: PollNext, + cx: &mut Context<'_>, +) -> Poll> where St1: Stream, St2: Stream, - Clos: FnMut(&mut State) -> PollNext, { - type Item = St1::Item; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - - match (this.clos)(this.state) { - PollNext::Left => poll_inner(this.stream1, this.stream2, cx), - PollNext::Right => poll_inner(this.stream2, this.stream1, cx), - } + match side { + PollNext::Left => select.stream1.as_mut().poll_next(cx), + PollNext::Right => select.stream2.as_mut().poll_next(cx), } } -fn poll_inner( - a: Pin<&mut St1>, - b: Pin<&mut St2>, +#[inline] +fn poll_inner( + select: &mut SelectWithStrategyProj<'_, St1, St2, Clos, State, Exit>, + side: PollNext, cx: &mut Context<'_>, ) -> Poll> where St1: Stream, St2: Stream, + Exit: ExitStrategy, { - let a_done = match a.poll_next(cx) { + match poll_side(select, side, cx) { Poll::Ready(Some(item)) => return Poll::Ready(Some(item)), - Poll::Ready(None) => true, - Poll::Pending => false, + Poll::Ready(None) => { + select.closed_streams.finish(side); + if Exit::is_terminated(*select.closed_streams) { + return Poll::Ready(None); + } + } + Poll::Pending => (), }; + let other = side.other(); + match poll_side(select, other, cx) { + Poll::Ready(None) => { + select.closed_streams.finish(other); + Poll::Ready(None) + } + a => a, + } +} + +impl Stream for SelectWithStrategy +where + St1: Stream, + St2: Stream, + Clos: FnMut(&mut State) -> PollNext, + Exit: ExitStrategy, +{ + type Item = St1::Item; - match b.poll_next(cx) { - Poll::Ready(Some(item)) => Poll::Ready(Some(item)), - Poll::Ready(None) if a_done => Poll::Ready(None), - Poll::Ready(None) | Poll::Pending => Poll::Pending, + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if self.is_terminated() { + return Poll::Ready(None); + } + + let mut this = self.project(); + + match this.closed_streams { + ClosedStreams::None => { + let next_side = (this.clos)(this.state); + poll_inner(&mut this, next_side, cx) + } + ClosedStreams::Left => match this.stream2.poll_next(cx) { + Poll::Ready(None) => { + *this.closed_streams = ClosedStreams::Both; + Poll::Ready(None) + } + a => a, + }, + ClosedStreams::Right => match this.stream1.poll_next(cx) { + Poll::Ready(None) => { + *this.closed_streams = ClosedStreams::Both; + Poll::Ready(None) + } + a => a, + }, + ClosedStreams::Both => Poll::Ready(None), + } } } -impl fmt::Debug for SelectWithStrategy +impl fmt::Debug for SelectWithStrategy where St1: fmt::Debug, St2: fmt::Debug, diff --git a/futures-util/src/stream/stream/flatten_unordered.rs b/futures-util/src/stream/stream/flatten_unordered.rs index 66ba4d0d55..88006cf235 100644 --- a/futures-util/src/stream/stream/flatten_unordered.rs +++ b/futures-util/src/stream/stream/flatten_unordered.rs @@ -56,14 +56,14 @@ impl SharedPollState { } /// Attempts to start polling, returning stored state in case of success. - /// Returns `None` if some waker is waking at the moment. + /// Returns `None` if either waker is waking at the moment or state is empty. fn start_polling( &self, ) -> Option<(u8, PollStateBomb<'_, impl FnOnce(&SharedPollState) -> u8>)> { let value = self .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - if value & WAKING == NONE { + if value & WAKING == NONE && value & NEED_TO_POLL_ALL != NONE { Some(POLLING) } else { None @@ -99,7 +99,7 @@ impl SharedPollState { }) .ok()?; - // Only start the waking process if we're not in the polling phase and the stream isn't woken already + // Only start the waking process if we're not in the polling/waking phase and the stream isn't woken already if value & (WOKEN | POLLING | WAKING) == NONE { let bomb = PollStateBomb::new(self, SharedPollState::stop_waking); @@ -135,9 +135,10 @@ impl SharedPollState { /// Toggles state to non-waking, allowing to start polling. fn stop_waking(&self) -> u8 { - self.state + let value = self + .state .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| { - let next_value = value & !WAKING; + let next_value = value & !WAKING | WOKEN; if next_value != value { Some(next_value) @@ -145,7 +146,10 @@ impl SharedPollState { None } }) - .unwrap_or_else(identity) + .unwrap_or_else(identity); + + debug_assert!(value & (WOKEN | POLLING | WAKING) == WAKING); + value } /// Resets current state allowing to poll the stream and wake up wakers. @@ -170,11 +174,6 @@ impl<'a, F: FnOnce(&SharedPollState) -> u8> PollStateBomb<'a, F> { fn deactivate(mut self) { self.drop.take(); } - - /// Manually fires the bomb, returning supplied state. - fn fire(mut self) -> Option { - self.drop.take().map(|drop| (drop)(self.state)) - } } impl u8> Drop for PollStateBomb<'_, F> { @@ -225,20 +224,17 @@ impl ArcWake for WrappedWaker { if let Some(inner_waker) = waker_opt.clone() { // Stop waking to allow polling stream - let poll_state_value = state_bomb.fire().unwrap(); + drop(state_bomb); - // We want to call waker only if the stream isn't woken yet - if poll_state_value & (WOKEN | WAKING) == WAKING { - // Wake up inner waker - inner_waker.wake(); - } + // Wake up inner waker + inner_waker.wake(); } } } } pin_project! { - /// Future which contains optional stream. + /// Future which polls optional inner stream. /// /// If it's `Some`, it will attempt to call `poll_next` on it, /// returning `Some((item, next_item_fut))` in case of `Poll::Ready(Some(...))` diff --git a/futures-util/src/stream/stream/mod.rs b/futures-util/src/stream/stream/mod.rs index 384634b004..30d08081f1 100644 --- a/futures-util/src/stream/stream/mod.rs +++ b/futures-util/src/stream/stream/mod.rs @@ -1839,6 +1839,8 @@ pub trait StreamExt: Stream { /// assert_eq!(total, 6); /// # }); /// ``` + /// + /// [`select!`]: crate::select fn select_next_some(&mut self) -> SelectNextSome<'_, Self> where Self: Unpin + FusedStream, diff --git a/futures-util/src/stream/try_stream/into_async_read.rs b/futures-util/src/stream/try_stream/into_async_read.rs index 914b277a02..ffbfc7eae9 100644 --- a/futures-util/src/stream/try_stream/into_async_read.rs +++ b/futures-util/src/stream/try_stream/into_async_read.rs @@ -1,30 +1,26 @@ -use crate::stream::TryStreamExt; use core::pin::Pin; use futures_core::ready; use futures_core::stream::TryStream; use futures_core::task::{Context, Poll}; use futures_io::{AsyncBufRead, AsyncRead, AsyncWrite}; +use pin_project_lite::pin_project; use std::cmp; use std::io::{Error, Result}; -/// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. -#[derive(Debug)] -#[must_use = "readers do nothing unless polled"] -#[cfg_attr(docsrs, doc(cfg(feature = "io")))] -pub struct IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ - stream: St, - state: ReadState, -} - -impl Unpin for IntoAsyncRead -where - St: TryStream + Unpin, - St::Ok: AsRef<[u8]>, -{ +pin_project! { + /// Reader for the [`into_async_read`](super::TryStreamExt::into_async_read) method. + #[derive(Debug)] + #[must_use = "readers do nothing unless polled"] + #[cfg_attr(docsrs, doc(cfg(feature = "io")))] + pub struct IntoAsyncRead + where + St: TryStream, + St::Ok: AsRef<[u8]>, + { + #[pin] + stream: St, + state: ReadState, + } } #[derive(Debug)] @@ -36,7 +32,7 @@ enum ReadState> { impl IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { pub(super) fn new(stream: St) -> Self { @@ -46,16 +42,18 @@ where impl AsyncRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { fn poll_read( - mut self: Pin<&mut Self>, + self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { + let mut this = self.project(); + loop { - match &mut self.state { + match this.state { ReadState::Ready { chunk, chunk_start } => { let chunk = chunk.as_ref(); let len = cmp::min(buf.len(), chunk.len() - *chunk_start); @@ -64,23 +62,23 @@ where *chunk_start += len; if chunk.len() == *chunk_start { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } return Poll::Ready(Ok(len)); } - ReadState::PendingChunk => match ready!(self.stream.try_poll_next_unpin(cx)) { + ReadState::PendingChunk => match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(0)); } }, @@ -94,51 +92,52 @@ where impl AsyncWrite for IntoAsyncRead where - St: TryStream + AsyncWrite + Unpin, + St: TryStream + AsyncWrite, St::Ok: AsRef<[u8]>, { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.stream).poll_write(cx, buf) + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + let this = self.project(); + this.stream.poll_write(cx, buf) } - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_flush(cx) + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_flush(cx) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.stream).poll_close(cx) + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.stream.poll_close(cx) } } impl AsyncBufRead for IntoAsyncRead where - St: TryStream + Unpin, + St: TryStream, St::Ok: AsRef<[u8]>, { - fn poll_fill_buf(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - while let ReadState::PendingChunk = self.state { - match ready!(self.stream.try_poll_next_unpin(cx)) { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + + while let ReadState::PendingChunk = this.state { + match ready!(this.stream.as_mut().try_poll_next(cx)) { Some(Ok(chunk)) => { if !chunk.as_ref().is_empty() { - self.state = ReadState::Ready { chunk, chunk_start: 0 }; + *this.state = ReadState::Ready { chunk, chunk_start: 0 }; } } Some(Err(err)) => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Err(err)); } None => { - self.state = ReadState::Eof; + *this.state = ReadState::Eof; return Poll::Ready(Ok(&[])); } } } - if let ReadState::Ready { ref chunk, chunk_start } = self.into_ref().get_ref().state { + if let &mut ReadState::Ready { ref chunk, chunk_start } = this.state { let chunk = chunk.as_ref(); return Poll::Ready(Ok(&chunk[chunk_start..])); } @@ -147,16 +146,18 @@ where Poll::Ready(Ok(&[])) } - fn consume(mut self: Pin<&mut Self>, amount: usize) { + fn consume(self: Pin<&mut Self>, amount: usize) { + let this = self.project(); + // https://github.com/rust-lang/futures-rs/pull/1556#discussion_r281644295 if amount == 0 { return; } - if let ReadState::Ready { chunk, chunk_start } = &mut self.state { + if let ReadState::Ready { chunk, chunk_start } = this.state { *chunk_start += amount; debug_assert!(*chunk_start <= chunk.as_ref().len()); if *chunk_start >= chunk.as_ref().len() { - self.state = ReadState::PendingChunk; + *this.state = ReadState::PendingChunk; } } else { debug_assert!(false, "Attempted to consume from IntoAsyncRead without chunk"); diff --git a/futures-util/src/stream/try_stream/mod.rs b/futures-util/src/stream/try_stream/mod.rs index 0df1218a10..dcfdc70a3b 100644 --- a/futures-util/src/stream/try_stream/mod.rs +++ b/futures-util/src/stream/try_stream/mod.rs @@ -985,12 +985,7 @@ pub trait TryStreamExt: TryStream { Compat::new(self) } - /// Adapter that converts this stream into an [`AsyncRead`](crate::io::AsyncRead). - /// - /// Note that because `into_async_read` moves the stream, the [`Stream`](futures_core::stream::Stream) type must be - /// [`Unpin`]. If you want to use `into_async_read` with a [`!Unpin`](Unpin) stream, you'll - /// first have to pin the stream. This can be done by boxing the stream using [`Box::pin`] - /// or pinning it to the stack using the `pin_mut!` macro from the `pin_utils` crate. + /// Adapter that converts this stream into an [`AsyncBufRead`](crate::io::AsyncBufRead). /// /// This method is only available when the `std` feature of this /// library is activated, and it is activated by default. @@ -1002,12 +997,12 @@ pub trait TryStreamExt: TryStream { /// use futures::stream::{self, TryStreamExt}; /// use futures::io::AsyncReadExt; /// - /// let stream = stream::iter(vec![Ok(vec![1, 2, 3, 4, 5])]); + /// let stream = stream::iter([Ok(vec![1, 2, 3]), Ok(vec![4, 5])]); /// let mut reader = stream.into_async_read(); - /// let mut buf = Vec::new(); /// - /// assert!(reader.read_to_end(&mut buf).await.is_ok()); - /// assert_eq!(buf, &[1, 2, 3, 4, 5]); + /// let mut buf = Vec::new(); + /// reader.read_to_end(&mut buf).await.unwrap(); + /// assert_eq!(buf, [1, 2, 3, 4, 5]); /// # }) /// ``` #[cfg(feature = "io")] @@ -1015,7 +1010,7 @@ pub trait TryStreamExt: TryStream { #[cfg(feature = "std")] fn into_async_read(self) -> IntoAsyncRead where - Self: Sized + TryStreamExt + Unpin, + Self: Sized + TryStreamExt, Self::Ok: AsRef<[u8]>, { crate::io::assert_read(IntoAsyncRead::new(self)) diff --git a/futures-util/src/stream/try_stream/try_flatten_unordered.rs b/futures-util/src/stream/try_stream/try_flatten_unordered.rs index aaad910bf0..e21b514023 100644 --- a/futures-util/src/stream/try_stream/try_flatten_unordered.rs +++ b/futures-util/src/stream/try_stream/try_flatten_unordered.rs @@ -27,7 +27,7 @@ delegate_all!( St: TryStream, St::Ok: TryStream, St::Ok: Unpin, - ::Error: From + ::Error: From ); pin_project! { @@ -40,7 +40,7 @@ pin_project! { St: TryStream, St::Ok: TryStream, St::Ok: Unpin, - ::Error: From + ::Error: From { #[pin] stream: St, @@ -121,10 +121,10 @@ where // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] -impl Sink for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams +impl Sink for TryStreamOfTryStreamsIntoHomogeneousStreamOfTryStreams where St: TryStream + Sink, - St::Ok: Stream> + Unpin, + St::Ok: TryStream + Unpin, ::Error: From<::Error>, { type Error = >::Error; diff --git a/futures/tests/auto_traits.rs b/futures/tests/auto_traits.rs index 558e0bf1d6..99b5fb5af5 100644 --- a/futures/tests/auto_traits.rs +++ b/futures/tests/auto_traits.rs @@ -817,11 +817,11 @@ pub mod io { assert_impl!(Seek<'_, ()>: Unpin); assert_not_impl!(Seek<'_, PhantomPinned>: Unpin); - assert_impl!(SeeKRelative<'_, ()>: Send); - assert_not_impl!(SeeKRelative<'_, *const ()>: Send); - assert_impl!(SeeKRelative<'_, ()>: Sync); - assert_not_impl!(SeeKRelative<'_, *const ()>: Sync); - assert_impl!(SeeKRelative<'_, PhantomPinned>: Unpin); + assert_impl!(SeekRelative<'_, ()>: Send); + assert_not_impl!(SeekRelative<'_, *const ()>: Send); + assert_impl!(SeekRelative<'_, ()>: Sync); + assert_not_impl!(SeekRelative<'_, *const ()>: Sync); + assert_impl!(SeekRelative<'_, PhantomPinned>: Unpin); assert_impl!(Sink: Send); assert_impl!(Sink: Sync); @@ -1536,15 +1536,15 @@ pub mod stream { assert_not_impl!(Scan: Unpin); assert_not_impl!(Scan: Unpin); - assert_impl!(Select<(), ()>: Send); - assert_not_impl!(Select<*const (), ()>: Send); - assert_not_impl!(Select<(), *const ()>: Send); - assert_impl!(Select<(), ()>: Sync); - assert_not_impl!(Select<*const (), ()>: Sync); - assert_not_impl!(Select<(), *const ()>: Sync); - assert_impl!(Select<(), ()>: Unpin); - assert_not_impl!(Select: Unpin); - assert_not_impl!(Select<(), PhantomPinned>: Unpin); + assert_impl!(Select<(), (), ()>: Send); + assert_not_impl!(Select<*const (), (), ()>: Send); + assert_not_impl!(Select<(), *const (), ()>: Send); + assert_impl!(Select<(), (), ()>: Sync); + assert_not_impl!(Select<*const (), (), ()>: Sync); + assert_not_impl!(Select<(), *const (), ()>: Sync); + assert_impl!(Select<(), (), ()>: Unpin); + assert_not_impl!(Select: Unpin); + assert_not_impl!(Select<(), PhantomPinned, ()>: Unpin); assert_impl!(SelectAll<()>: Send); assert_not_impl!(SelectAll<*const ()>: Send); diff --git a/futures/tests/macro-tests/src/main.rs b/futures/tests/macro-tests/src/main.rs index fef24a99e3..c89397ddd3 100644 --- a/futures/tests/macro-tests/src/main.rs +++ b/futures/tests/macro-tests/src/main.rs @@ -4,7 +4,7 @@ fn main() { use futures04::{executor::block_on, future}; // join! macro - let _ = block_on(async { + block_on(async { let _ = futures04::join!(async {}, async {}); let _ = macro_reexport::join!(async {}, async {}); let _ = macro_reexport::join2!(async {}, async {}); @@ -19,48 +19,48 @@ fn main() { }); // select! macro - let _ = block_on(async { + block_on(async { let mut a = future::ready(()); let mut b = future::pending::<()>(); - let _ = futures04::select! { + futures04::select! { _ = a => {}, _ = b => unreachable!(), }; let mut a = future::ready(()); let mut b = future::pending::<()>(); - let _ = macro_reexport::select! { + macro_reexport::select! { _ = a => {}, _ = b => unreachable!(), }; let mut a = future::ready(()); let mut b = future::pending::<()>(); - let _ = macro_reexport::select2! { + macro_reexport::select2! { _ = a => {}, _ = b => unreachable!(), }; }); // select_biased! macro - let _ = block_on(async { + block_on(async { let mut a = future::ready(()); let mut b = future::pending::<()>(); - let _ = futures04::select_biased! { + futures04::select_biased! { _ = a => {}, _ = b => unreachable!(), }; let mut a = future::ready(()); let mut b = future::pending::<()>(); - let _ = macro_reexport::select_biased! { + macro_reexport::select_biased! { _ = a => {}, _ = b => unreachable!(), }; let mut a = future::ready(()); let mut b = future::pending::<()>(); - let _ = macro_reexport::select_biased2! { + macro_reexport::select_biased2! { _ = a => {}, _ = b => unreachable!(), }; diff --git a/futures/tests/stream.rs b/futures/tests/stream.rs index 6781a102d2..0c02179361 100644 --- a/futures/tests/stream.rs +++ b/futures/tests/stream.rs @@ -25,6 +25,22 @@ fn select() { select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5, 6]); } +#[test] +fn select_early_exit() { + fn select_and_compare(a: Vec, b: Vec, expected: Vec) { + let a = stream::iter(a); + let b = stream::iter(b); + let vec = block_on(stream::select_early_exit(a, b).collect::>()); + assert_eq!(vec, expected); + } + + select_and_compare(vec![1, 2, 3], vec![4, 5, 6], vec![1, 4, 2, 5, 3, 6]); + select_and_compare(vec![], vec![4, 5], vec![]); + select_and_compare(vec![4, 5], vec![], vec![4]); + select_and_compare(vec![1, 2, 3], vec![4, 5], vec![1, 4, 2, 5, 3]); + select_and_compare(vec![1, 2], vec![4, 5, 6], vec![1, 4, 2, 5]); +} + #[test] fn flat_map() { block_on(async {