From cbb37f3bf7c2b3784e30d678eb7ee3b103e0c7db Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 21 Sep 2020 12:55:32 -0700 Subject: [PATCH 1/2] sync: fix missing notification during mpsc close When the mpsc channel receiver closes the channel, receiving should return `None` once all in-progress sends have completed. When a sender reserves capacity, this prevents the receiver from fully shutting down. Previously, when the sender, after reserving capacity, dropped without sending a message, the receiver was not notified. This results in blocking the shutdown process until all sender handles drop. This patch adds a receiver notification when the channel is both closed and all outstanding sends have completed. --- tokio/src/sync/mpsc/chan.rs | 23 ++++++++++++++++++----- tokio/src/sync/semaphore_ll.rs | 2 +- tokio/tests/sync_mpsc.rs | 22 ++++++++++++++++++++++ 3 files changed, 41 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 148ee3ad766..6659a43ceb5 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -75,10 +75,15 @@ pub(crate) trait Semaphore { /// The permit is dropped without a value being sent. In this case, the /// permit must be returned to the semaphore. - fn drop_permit(&self, permit: &mut Self::Permit); + /// + /// # Return + /// + /// Returns true if the permit was acquired. + fn drop_permit(&self, permit: &mut Self::Permit) -> bool; fn is_idle(&self) -> bool; + // Returns `true` if the receiver should be notified for shutdown. fn add_permit(&self); fn poll_acquire( @@ -192,7 +197,7 @@ where pub(crate) fn disarm(&mut self) { // TODO: should this error if not acquired? - self.inner.semaphore.drop_permit(&mut self.permit) + self.inner.semaphore.drop_permit(&mut self.permit); } /// Send a message and notify the receiver. @@ -234,7 +239,11 @@ where S: Semaphore, { fn drop(&mut self) { - self.inner.semaphore.drop_permit(&mut self.permit); + let notify = self.inner.semaphore.drop_permit(&mut self.permit); + + if notify && self.inner.semaphore.is_idle() { + self.inner.rx_waker.wake(); + } if self.inner.tx_count.fetch_sub(1, AcqRel) != 1 { return; @@ -424,8 +433,10 @@ impl Semaphore for (crate::sync::semaphore_ll::Semaphore, usize) { Permit::new() } - fn drop_permit(&self, permit: &mut Permit) { + fn drop_permit(&self, permit: &mut Permit) -> bool { + let ret = permit.is_acquired(); permit.release(1, &self.0); + ret } fn add_permit(&self) { @@ -477,7 +488,9 @@ impl Semaphore for AtomicUsize { fn new_permit() {} - fn drop_permit(&self, _permit: &mut ()) {} + fn drop_permit(&self, _permit: &mut ()) -> bool { + false + } fn add_permit(&self) { let prev = self.fetch_sub(2, Release); diff --git a/tokio/src/sync/semaphore_ll.rs b/tokio/src/sync/semaphore_ll.rs index 25d25ac88ab..f044095f8fc 100644 --- a/tokio/src/sync/semaphore_ll.rs +++ b/tokio/src/sync/semaphore_ll.rs @@ -910,7 +910,7 @@ impl Waiter { } /// Try to decrement the number of permits to acquire. This returns the - /// actual number of permits that were decremented. The delta betweeen `n` + /// actual number of permits that were decremented. The delta between `n` /// and the return has been assigned to the permit and the caller must /// assign these back to the semaphore. fn try_dec_permits_to_acquire(&self, n: usize) -> usize { diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index f02d90aa56d..f4966c31377 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -490,3 +490,25 @@ fn try_recv_unbounded() { _ => panic!(), } } + +#[test] +fn ready_close_cancel_bounded() { + use futures::future::poll_fn; + + let (mut tx, mut rx) = mpsc::channel::<()>(100); + let _tx2 = tx.clone(); + + { + let mut ready = task::spawn(async { poll_fn(|cx| tx.poll_ready(cx)).await }); + assert_ready_ok!(ready.poll()); + } + + rx.close(); + + let mut recv = task::spawn(async { rx.recv().await }); + assert_pending!(recv.poll()); + + drop(tx); + + assert!(recv.is_woken()); +} From 03cdc4a6daf31a3ded7c0bd1f1ae4410ef51307a Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Mon, 21 Sep 2020 13:39:28 -0700 Subject: [PATCH 2/2] remove incorrect comment --- tokio/src/sync/mpsc/chan.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 6659a43ceb5..0a53cda2038 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -83,7 +83,6 @@ pub(crate) trait Semaphore { fn is_idle(&self) -> bool; - // Returns `true` if the receiver should be notified for shutdown. fn add_permit(&self); fn poll_acquire(