Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async WaitGroup #2880

Open
tisonkun opened this issue Aug 23, 2024 · 2 comments
Open

Async WaitGroup #2880

tisonkun opened this issue Aug 23, 2024 · 2 comments

Comments

@tisonkun
Copy link

This is a moved issue from rust-lang/libs-team#427 (comment).

Proposal

Problem statement

crossbeam provides a WaitGroup that is good for synchronizing a fork-join task.

However, it's a sync version, meaning that it's not suitable to use in async/await context.

I made a tidy and solid async WaitGroup implementation and wonder if it's suitable to contribute to the std lib (e.g., std::sync) or futures-util or anywhere that is (de-facto) standard so that the ecosystem can collaborate together instead of make one own each time.

Motivating examples or use cases

My motivating scenario is during the server starting, wait for all its components ready. For example,

let wg = WaitGroup::new();
let wg_clone = wg.clone();

// may repeat for different components
runtime.spawn(async move {
  // ... initializing ...
  drop(wg_clone);
  // accepting connections
});

wg.await;

Solution sketch

The implementation is tidy, and can be even inlined here:

#[derive(Clone)]
pub struct WaitGroup {
    inner: Arc<Inner>,
}

pub struct WaitGroupFuture {
    inner: Weak<Inner>,
}

impl WaitGroupFuture {
    /// Gets the number of active workers.
    pub fn workers(&self) -> usize {
        Weak::strong_count(&self.inner)
    }
}

struct Inner {
    waker: AtomicWaker,
}

impl Drop for Inner {
    fn drop(&mut self) {
        self.waker.wake();
    }
}

impl WaitGroup {
    pub fn new() -> Self {
        Self {
            inner: Arc::new(Inner {
                waker: AtomicWaker::new(),
            }),
        }
    }

    /// Gets the number of active workers.
    pub fn workers(&self) -> usize {
        Arc::strong_count(&self.inner) - 1
    }
}

impl Default for WaitGroup {
    fn default() -> Self {
        Self::new()
    }
}

impl IntoFuture for WaitGroup {
    type Output = ();

    type IntoFuture = WaitGroupFuture;

    fn into_future(self) -> Self::IntoFuture {
        WaitGroupFuture {
            inner: Arc::downgrade(&self.inner),
        }
    }
}

impl Future for WaitGroupFuture {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        match self.inner.upgrade() {
            Some(inner) => {
                inner.waker.register(cx.waker());
                Poll::Pending
            }
            None => Poll::Ready(()),
        }
    }
}

#[cfg(test)]
mod test {
    use pollster::FutureExt;

    use super::*;
    use crate::test_runtime;

    #[test]
    fn test_wait_group_match() {
        let wg = WaitGroup::new();

        for _ in 0..100 {
            let w = wg.clone();
            let _drop = test_runtime().spawn(async move {
                drop(w);
            });
        }

        wg.into_future().block_on();
    }

    #[test]
    fn test_wait_group_timeout() {
        let wg = WaitGroup::new();
        let _wg_clone = wg.clone();
        test_runtime().block_on(async move {
            tokio::select! {
                _ = wg => panic!("wait group should timeout"),
                _ = tokio::time::sleep(tokio::time::Duration::from_secs(1)) => {}
            }
        });
    }
}

Alternatives

The main argument is whether it should be in the std lib, or anywhere suitable for the most Rust developers to collaborate on.

Links and related work

This is somehow derivative from waitgroup-rs.

@taiki-e
Copy link
Member

taiki-e commented Sep 6, 2024

waker: AtomicWaker,

self.waker.wake();

This wakes up the last registered Waker, but isn't that insufficient for the purpose of WaitGroup? (I think it should wake up all waiters.)

Barrier in tokio and async-lock seem to handle it correctly.

@tisonkun
Copy link
Author

@taiki-e Thanks for your feedback!

So in general this direction and the WaitGroup idea is valid for future-rs?

For the wake up all implementation, I can take a closer look and we can discuss on implementation with a PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants