Regression v9->v10: tasks storing wakers aren't dropped #30

Closed
loyd opened this issue Aug 3, 2024 · 1 comment · Fixed by #31

loyd commented Aug 3, 2024

#28 introduced an unexpected regression.

To support removing items from the slab (until #26), I store a waker inside the tasks added to StreamsUnordered. In that case the current implementation never decrements Header::reference_count to zero, so the task leaks and is never dropped.
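
The leak described above has the shape of a reference cycle. The following is a minimal sketch of that shape using only the standard library, not unicycle's actual internals: `Task`, `stored_waker`, and `owner_release_drops_task` are hypothetical names, and an `Arc<Task>` stands in for a waker that holds a strong reference to the task's shared header. Note that the `store_waker = true` branch leaks memory on purpose.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a reference-counted task.
struct Task {
    /// Stand-in for a stored `Waker`: a strong handle back to the same task.
    stored_waker: Mutex<Option<Arc<Task>>>,
    /// Set to `true` when the task is actually dropped.
    dropped: Arc<AtomicBool>,
}

impl Drop for Task {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

/// Returns whether the task was dropped once its owner released it.
fn owner_release_drops_task(store_waker: bool) -> bool {
    let dropped = Arc::new(AtomicBool::new(false));
    let task = Arc::new(Task {
        stored_waker: Mutex::new(None),
        dropped: dropped.clone(),
    });

    if store_waker {
        // The task now owns a strong reference to itself, so its count
        // can no longer reach zero from the outside.
        *task.stored_waker.lock().unwrap() = Some(task.clone());
    }

    // The owner (the container, in the real case) lets go of the task.
    drop(task);
    dropped.load(Ordering::SeqCst)
}
```

Calling `owner_release_drops_task(false)` reports that the task was dropped, while `owner_release_drops_task(true)` reports that it was not, which mirrors the failing assertion in the test below.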

A test to reproduce locally:

use std::{
    pin::Pin,
    sync::{atomic, Arc},
    task,
};

use unicycle::StreamsUnordered;

#[tokio::test]
async fn test_drop_with_stored_waker() {
    struct Testee {
        waker: Option<task::Waker>,
        dropped: Arc<atomic::AtomicBool>,
    }

    impl futures::Stream for Testee {
        type Item = u32;

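        fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<u32>> {
            println!("testee polled");
            // Keep a clone of the waker inside the task itself.
            // SAFETY: only a field is assigned; the value is never moved out of the pin.
            unsafe { self.get_unchecked_mut() }.waker = Some(cx.waker().clone());
            // Never complete, so the task is still pending when `streams` is dropped.
            task::Poll::Pending
        }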
    }

    impl Drop for Testee {
        fn drop(&mut self) {
            println!("testee dropped");
            self.dropped.store(true, atomic::Ordering::SeqCst);
        }
    }

    let mut streams = StreamsUnordered::new();

    let dropped = Arc::new(atomic::AtomicBool::new(false));
    streams.push(Testee {
        waker: None,
        dropped: dropped.clone(),
    });

    {
        let fut = streams.next();
        let res = futures::future::poll_immediate(fut).await;
        assert!(res.is_none());
    }

    drop(streams);
    assert!(dropped.load(atomic::Ordering::SeqCst));
}

loyd commented Aug 3, 2024

I'm sure it's wrong to delay dropping a task just because some related wakers still exist: in general, wakers can live indefinitely.
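
One way to picture the alternative is to let wakers keep only a shared header alive, while the container drops the task's payload eagerly when it is dropped itself. This is a minimal sketch of that idea under my own assumptions and not the change made in #31: `Header`, `Payload`, `Container`, and `container_drop_drops_payload` are hypothetical names, and only `std::task::Wake` and `Waker` are real standard-library APIs.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Wake, Waker};

/// Hypothetical payload (the user's stream) that records its own drop.
struct Payload {
    dropped: Arc<AtomicBool>,
    /// A waker stored by the payload itself, as in the reproduction.
    _waker: Option<Waker>,
}

impl Drop for Payload {
    fn drop(&mut self) {
        self.dropped.store(true, Ordering::SeqCst);
    }
}

/// Hypothetical shared header: wakers keep this alive, but not the payload.
struct Header {
    payload: Mutex<Option<Payload>>,
}

impl Wake for Header {
    fn wake(self: Arc<Self>) {
        // A real implementation would reschedule the task here.
        // Waking a task whose payload is already gone is a no-op.
    }
}

/// Hypothetical container owning the tasks.
struct Container {
    tasks: Vec<Arc<Header>>,
}

impl Drop for Container {
    fn drop(&mut self) {
        for header in &self.tasks {
            // Take the payload out (the lock is released at the end of this
            // statement) and drop it now, regardless of outstanding wakers.
            let payload = header.payload.lock().unwrap().take();
            drop(payload);
        }
    }
}

/// Returns whether dropping the container dropped the payload,
/// even though the payload stored a waker pointing at its own header.
fn container_drop_drops_payload() -> bool {
    let dropped = Arc::new(AtomicBool::new(false));
    let header = Arc::new(Header { payload: Mutex::new(None) });
    let waker = Waker::from(header.clone());
    *header.payload.lock().unwrap() = Some(Payload {
        dropped: dropped.clone(),
        _waker: Some(waker),
    });

    let container = Container { tasks: vec![header] };
    drop(container);
    dropped.load(Ordering::SeqCst)
}
```

In this sketch a waker that outlives the container only keeps the small header allocated; the stream, and anything it owns, is released as soon as the container goes away.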

loyd added a commit to loyd/unicycle that referenced this issue Aug 3, 2024
udoprog pushed a commit that referenced this issue Aug 3, 2024