Skip to content

Commit

Permalink
sync: remove 'static bound on impl Sink for PollSender (#6397)
Browse files Browse the repository at this point in the history
In PR #5665, the `'static` bound has been removed on values sent into
`PollSender`. One of this bound was remaining on the `PollSender`
implementation of `Sink`. This patch removes it and adds some tests on
the `Sink` interface for `PollSender`.
  • Loading branch information
nyibbang authored Mar 14, 2024
1 parent 3141ed6 commit c9e7578
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 2 deletions.
2 changes: 1 addition & 1 deletion tokio-util/src/sync/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl<T> Clone for PollSender<T> {
}
}

impl<T: Send + 'static> Sink<T> for PollSender<T> {
impl<T: Send> Sink<T> for PollSender<T> {
type Error = PollSendError<T>;

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Expand Down
89 changes: 88 additions & 1 deletion tokio-util/tests/mpsc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use futures::future::poll_fn;
use futures::sink::SinkExt;
use tokio::sync::mpsc::channel;
use tokio_test::task::spawn;
use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok};
use tokio_test::{
assert_ok, assert_pending, assert_ready, assert_ready_eq, assert_ready_err, assert_ready_ok,
};
use tokio_util::sync::PollSender;

#[tokio::test]
Expand Down Expand Up @@ -260,3 +263,87 @@ fn start_send_panics_when_acquiring() {
assert_pending!(reserve.poll());
send.send_item(2).unwrap();
}

#[test]
fn sink_send_then_flush() {
let (send, mut recv) = channel(1);
let mut send = PollSender::new(send);

let mut recv_task = spawn(recv.recv());
assert_pending!(recv_task.poll());

let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
assert_ready_ok!(ready.poll());
assert_ok!(send.start_send_unpin(()));

let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
assert_pending!(ready.poll());

let mut flush = spawn(poll_fn(|cx| send.poll_flush_unpin(cx)));
assert_ready_ok!(flush.poll());

// Flushing does not mean that the sender becomes ready.
let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
assert_pending!(ready.poll());

assert_ready_eq!(recv_task.poll(), Some(()));
assert!(ready.is_woken());
assert_ready_ok!(ready.poll());
}

#[test]
fn sink_send_then_close() {
let (send, mut recv) = channel(1);
let mut send = PollSender::new(send);

let mut recv_task = spawn(recv.recv());
assert_pending!(recv_task.poll());

let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
assert_ready_ok!(ready.poll());
assert_ok!(send.start_send_unpin(1));

let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
assert_pending!(ready.poll());

assert!(recv_task.is_woken());
assert_ready_eq!(recv_task.poll(), Some(1));

assert!(ready.is_woken());
assert_ready_ok!(ready.poll());

drop(recv_task);
let mut recv_task = spawn(recv.recv());
assert_pending!(recv_task.poll());
assert_ok!(send.start_send_unpin(2));

let mut close = spawn(poll_fn(|cx| send.poll_close_unpin(cx)));
assert_ready_ok!(close.poll());

assert!(recv_task.is_woken());
assert_ready_eq!(recv_task.poll(), Some(2));

drop(recv_task);
let mut recv_task = spawn(recv.recv());
assert_ready_eq!(recv_task.poll(), None);
}

#[test]
fn sink_send_ref() {
let data = "data".to_owned();
let (send, mut recv) = channel(1);
let mut send = PollSender::new(send);

let mut recv_task = spawn(recv.recv());
assert_pending!(recv_task.poll());

let mut ready = spawn(poll_fn(|cx| send.poll_ready_unpin(cx)));
assert_ready_ok!(ready.poll());

assert_ok!(send.start_send_unpin(data.as_str()));

let mut flush = spawn(poll_fn(|cx| send.poll_flush_unpin(cx)));
assert_ready_ok!(flush.poll());

assert_ready_eq!(recv_task.poll(), Some("data"));
}

0 comments on commit c9e7578

Please sign in to comment.