From c9e75785c84a441199992ed38e49aeba2f061a24 Mon Sep 17 00:00:00 2001 From: Vincent Palancher Date: Thu, 14 Mar 2024 10:23:59 +0100 Subject: [PATCH] sync: remove `'static` bound on `impl Sink for PollSender` (#6397) In PR #5665, the `'static` bound has been removed on values sent into `PollSender`. One of this bound was remaining on the `PollSender` implementation of `Sink`. This patch removes it and adds some tests on the `Sink` interface for `PollSender`. --- tokio-util/src/sync/mpsc.rs | 2 +- tokio-util/tests/mpsc.rs | 89 ++++++++++++++++++++++++++++++++++++- 2 files changed, 89 insertions(+), 2 deletions(-) diff --git a/tokio-util/src/sync/mpsc.rs b/tokio-util/src/sync/mpsc.rs index fd48c72582b..8e2ff814622 100644 --- a/tokio-util/src/sync/mpsc.rs +++ b/tokio-util/src/sync/mpsc.rs @@ -303,7 +303,7 @@ impl Clone for PollSender { } } -impl Sink for PollSender { +impl Sink for PollSender { type Error = PollSendError; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio-util/tests/mpsc.rs b/tokio-util/tests/mpsc.rs index 545a580318d..cf4dcd55f63 100644 --- a/tokio-util/tests/mpsc.rs +++ b/tokio-util/tests/mpsc.rs @@ -1,7 +1,10 @@ use futures::future::poll_fn; +use futures::sink::SinkExt; use tokio::sync::mpsc::channel; use tokio_test::task::spawn; -use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok}; +use tokio_test::{ + assert_ok, assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok, +}; use tokio_util::sync::PollSender; #[tokio::test] @@ -260,3 +263,87 @@ fn start_send_panics_when_acquiring() { assert_pending!(reserve.poll()); send.send_item(2).unwrap(); } + +#[test] +fn sink_send_then_flush() { + let (send, mut recv) = channel(1); + let mut send = PollSender::new(send); + + let mut recv_task = spawn(recv.recv()); + assert_pending!(recv_task.poll()); + + let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx))); + assert_ready_ok!(ready.poll()); + assert_ok!(send.start_send_unpin(())); + + let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx))); + assert_pending!(ready.poll()); + + let mut flush = spawn(poll_fn(|cx| send.poll_flush_unpin(cx))); + assert_ready_ok!(flush.poll()); + + // Flushing does not mean that the sender becomes ready. + let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx))); + assert_pending!(ready.poll()); + + assert_ready_eq!(recv_task.poll(), Some(())); + assert!(ready.is_woken()); + assert_ready_ok!(ready.poll()); +} + +#[test] +fn sink_send_then_close() { + let (send, mut recv) = channel(1); + let mut send = PollSender::new(send); + + let mut recv_task = spawn(recv.recv()); + assert_pending!(recv_task.poll()); + + let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx))); + assert_ready_ok!(ready.poll()); + assert_ok!(send.start_send_unpin(1)); + + let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx))); + assert_pending!(ready.poll()); + + assert!(recv_task.is_woken()); + assert_ready_eq!(recv_task.poll(), Some(1)); + + assert!(ready.is_woken()); + assert_ready_ok!(ready.poll()); + + drop(recv_task); + let mut recv_task = spawn(recv.recv()); + assert_pending!(recv_task.poll()); + assert_ok!(send.start_send_unpin(2)); + + let mut close = spawn(poll_fn(|cx| send.poll_close_unpin(cx))); + assert_ready_ok!(close.poll()); + + assert!(recv_task.is_woken()); + assert_ready_eq!(recv_task.poll(), Some(2)); + + drop(recv_task); + let mut recv_task = spawn(recv.recv()); + assert_ready_eq!(recv_task.poll(), None); +} + +#[test] +fn sink_send_ref() { + let data = "data".to_owned(); + let (send, mut recv) = channel(1); + let mut send = PollSender::new(send); + + let mut recv_task = spawn(recv.recv()); + assert_pending!(recv_task.poll()); + + let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx))); + assert_ready_ok!(ready.poll()); + + assert_ok!(send.start_send_unpin(data.as_str())); + + let mut flush = spawn(poll_fn(|cx| send.poll_flush_unpin(cx))); + assert_ready_ok!(flush.poll()); + + assert_ready_eq!(recv_task.poll(), Some("data")); +}