From f7464a40a1d798ce6ef29d5a5b32892247689737 Mon Sep 17 00:00:00 2001 From: Imbris Date: Wed, 2 Oct 2024 01:06:08 -0400 Subject: [PATCH] Leak internal queue in FuturesUnordered drop if dropping a future panics and avoid potential abort if we try to continue dropping futures and another one panics --- .github/workflows/ci.yml | 10 ++++- .../src/stream/futures_unordered/mod.rs | 39 +++++++------------ 2 files changed, 23 insertions(+), 26 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 744168c09..cc52ae682 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -246,11 +246,17 @@ jobs: - uses: taiki-e/checkout-action@v1 - name: Install Rust run: rustup toolchain install nightly --component miri && rustup default nightly - - run: cargo miri test --workspace --all-features + - run: cargo miri test --workspace --all-features -- --skip panic_on_drop_fut env: MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation RUSTDOCFLAGS: ${{ env.RUSTDOCFLAGS }} -Z randomize-layout RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout + # This test is expected to leak. + - run: cargo miri test --workspace --all-features --test stream_futures_unordered -- panic_on_drop_fut + env: + MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation -Zmiri-ignore-leaks + RUSTDOCFLAGS: ${{ env.RUSTDOCFLAGS }} -Z randomize-layout + RUSTFLAGS: ${{ env.RUSTFLAGS }} -Z randomize-layout san: name: cargo test -Z sanitizer=${{ matrix.sanitizer }} @@ -269,7 +275,7 @@ jobs: run: rustup toolchain install nightly --component rust-src && rustup default nightly # https://github.com/google/sanitizers/issues/1716 / https://github.com/actions/runner-images/issues/9491 - run: sudo sysctl vm.mmap_rnd_bits=28 - - run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests + - run: cargo -Z build-std test --workspace --all-features --target x86_64-unknown-linux-gnu --lib --tests -- --skip panic_on_drop_fut env: # TODO: Once `cfg(sanitize = "..")` is stable, replace # `cfg(futures_sanitizer)` with `cfg(sanitize = "..")` and remove diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index d4ee16937..913e260fd 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -576,33 +576,24 @@ impl Drop for FuturesUnordered { // Before the strong reference to the queue is dropped we need all // futures to be dropped. See note at the bottom of this method. // - // This ensures we drop all futures even in the case of one drop - // panicking and unwinding. - // - // If a second future panics, that will trigger an abort from panicking - // while panicking. - struct DropGuard<'a, Fut>(&'a mut FuturesUnordered); - impl DropGuard<'_, Fut> { - fn drop_futures(&mut self) { - // When a `FuturesUnordered` is dropped we want to drop all futures - // associated with it. At the same time though there may be tons of - // wakers flying around which contain `Task` references - // inside them. We'll let those naturally get deallocated. - while !self.0.head_all.get_mut().is_null() { - let head = *self.0.head_all.get_mut(); - let task = unsafe { self.0.unlink(head) }; - self.0.release_task(task); - } - } - } - impl Drop for DropGuard<'_, Fut> { + // If there is a panic before this completes, we leak the queue. + struct LeakQueueOnDrop<'a, Fut>(&'a mut FuturesUnordered); + impl Drop for LeakQueueOnDrop<'_, Fut> { fn drop(&mut self) { - self.drop_futures(); + mem::forget(Arc::clone(&self.0.ready_to_run_queue)); } } - let mut guard = DropGuard(self); - guard.drop_futures(); - mem::forget(guard); // no need to check head_all again + let guard = LeakQueueOnDrop(self); + // When a `FuturesUnordered` is dropped we want to drop all futures + // associated with it. At the same time though there may be tons of + // wakers flying around which contain `Task` references + // inside them. We'll let those naturally get deallocated. + while !guard.0.head_all.get_mut().is_null() { + let head = *guard.0.head_all.get_mut(); + let task = unsafe { guard.0.unlink(head) }; + guard.0.release_task(task); + } + mem::forget(guard); // safe to release strong reference to queue // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures