diff --git a/futures-channel/src/mpsc/mod.rs b/futures-channel/src/mpsc/mod.rs index edbac7aa3..addc96983 100644 --- a/futures-channel/src/mpsc/mod.rs +++ b/futures-channel/src/mpsc/mod.rs @@ -616,6 +616,26 @@ impl BoundedSenderInner { self.poll_unparked(Some(cx)).map(Ok) } + /// Polls the channel to determine if it is empty. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(()` if there are no messages in the channel; + /// - `Poll::Pending` if there are messages in the channel. + #[cfg(feature = "sink")] + fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let state = decode_state(self.inner.state.load(SeqCst)); + if state.num_messages == 0 { + return Poll::Ready(()); + } + + // If there are messages in the channel, we must park the task unconditionally. + self.sender_task.lock().unwrap().task = Some(cx.waker().clone()); + Poll::Pending + } + /// Returns whether the senders send to the same receiver. fn same_receiver(&self, other: &Self) -> bool { Arc::ptr_eq(&self.inner, &other.inner) @@ -755,6 +775,24 @@ impl Sender { let ptr = self.0.as_ref().map(|inner| inner.ptr()); ptr.hash(hasher); } + + /// Polls the channel to determine if it is empty. + /// + /// # Return value + /// + /// This method returns: + /// + /// - `Poll::Ready(()` if there are no messages in the channel or the [`Receiver`] is disconnected. + /// - `Poll::Pending` if there are messages in the channel. + #[cfg(feature = "sink")] + pub(crate) fn poll_is_empty(&mut self, cx: &mut Context<'_>) -> Poll<()> { + let inner = match self.0.as_mut() { + None => return Poll::Ready(()), + Some(inner) => inner, + }; + + inner.poll_is_empty(cx) + } } impl UnboundedSender { diff --git a/futures-channel/src/mpsc/sink_impl.rs b/futures-channel/src/mpsc/sink_impl.rs index 1be20162c..bf6104697 100644 --- a/futures-channel/src/mpsc/sink_impl.rs +++ b/futures-channel/src/mpsc/sink_impl.rs @@ -15,13 +15,7 @@ impl Sink for Sender { } fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - match (*self).poll_ready(cx) { - Poll::Ready(Err(ref e)) if e.is_disconnected() => { - // If the receiver disconnected, we consider the sink to be flushed. - Poll::Ready(Ok(())) - } - x => x, - } + (*self).poll_is_empty(cx).map(Ok) } fn poll_close(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { diff --git a/futures-channel/tests/mpsc.rs b/futures-channel/tests/mpsc.rs index f18fc3d66..8e3e768d0 100644 --- a/futures-channel/tests/mpsc.rs +++ b/futures-channel/tests/mpsc.rs @@ -60,6 +60,37 @@ fn send_recv_no_buffer() { })); } +#[test] +fn sink_poll_flush() { + // Run on a task context + block_on(poll_fn(move |cx| { + let (tx, rx) = mpsc::channel::(2); + pin_mut!(tx, rx); + + assert!(tx.as_mut().poll_flush(cx).is_ready()); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + + // Send two messages, `poll_flush` should be pending after each of them. + assert!(tx.as_mut().start_send(1).is_ok()); + assert!(tx.as_mut().poll_flush(cx).is_pending()); + + assert!(tx.as_mut().start_send(2).is_ok()); + assert!(tx.as_mut().poll_flush(cx).is_pending()); + + // Take first message + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + assert!(tx.as_mut().poll_flush(cx).is_pending()); + + // Take second message + assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2))); + assert!(tx.as_mut().poll_ready(cx).is_ready()); + assert!(tx.as_mut().poll_flush(cx).is_ready()); + + Poll::Ready(()) + })); +} + #[test] fn send_shared_recv() { let (mut tx1, rx) = mpsc::channel::(16); diff --git a/futures/tests/async_await_macros.rs b/futures/tests/async_await_macros.rs index e87a5bf05..96f806fd2 100644 --- a/futures/tests/async_await_macros.rs +++ b/futures/tests/async_await_macros.rs @@ -90,8 +90,8 @@ fn select_streams() { _ = rx1.next() => panic!(), _ = rx2.next() => panic!(), default => { - tx1.send(2).await.unwrap(); - tx2.send(3).await.unwrap(); + tx1.feed(2).await.unwrap(); + tx2.feed(3).await.unwrap(); tx1_opt = Some(tx1); tx2_opt = Some(tx2); }