Skip to content

Commit

Permalink
io: make repeat and sink cooperative (#6254)
Browse files Browse the repository at this point in the history
  • Loading branch information
mox692 authored Dec 30, 2023
1 parent 5f7fe8f commit 581cd41
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 18 deletions.
15 changes: 1 addition & 14 deletions tokio/src/io/util/empty.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::io::util::poll_proceed_and_make_progress;
use crate::io::{AsyncBufRead, AsyncRead, AsyncWrite, ReadBuf};

use std::fmt;
Expand Down Expand Up @@ -138,20 +139,6 @@ impl fmt::Debug for Empty {
}
}

cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut Context<'_>) -> Poll<()> {
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(())
}
}

cfg_not_coop! {
fn poll_proceed_and_make_progress(_: &mut Context<'_>) -> Poll<()> {
Poll::Ready(())
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
14 changes: 14 additions & 0 deletions tokio/src/io/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,20 @@ cfg_io_util! {
// used by `BufReader` and `BufWriter`
// https://github.com/rust-lang/rust/blob/master/library/std/src/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;

cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut std::task::Context<'_>) -> std::task::Poll<()> {
let coop = ready!(crate::runtime::coop::poll_proceed(cx));
coop.made_progress();
std::task::Poll::Ready(())
}
}

cfg_not_coop! {
fn poll_proceed_and_make_progress(_: &mut std::task::Context<'_>) -> std::task::Poll<()> {
std::task::Poll::Ready(())
}
}
}

cfg_not_io_util! {
Expand Down
5 changes: 4 additions & 1 deletion tokio/src/io/util/repeat.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::io::util::poll_proceed_and_make_progress;
use crate::io::{AsyncRead, ReadBuf};

use std::io;
Expand Down Expand Up @@ -50,9 +51,11 @@ impl AsyncRead for Repeat {
#[inline]
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context<'_>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
// TODO: could be faster, but should we unsafe it?
while buf.remaining() != 0 {
buf.put_slice(&[self.byte]);
Expand Down
13 changes: 10 additions & 3 deletions tokio/src/io/util/sink.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::io::util::poll_proceed_and_make_progress;
use crate::io::AsyncWrite;

use std::fmt;
Expand Down Expand Up @@ -53,19 +54,25 @@ impl AsyncWrite for Sink {
#[inline]
fn poll_write(
self: Pin<&mut Self>,
_: &mut Context<'_>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(buf.len()))
}

#[inline]
fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(()))
}

#[inline]
fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
ready!(crate::trace::trace_leaf(cx));
ready!(poll_proceed_and_make_progress(cx));
Poll::Ready(Ok(()))
}
}
Expand Down
18 changes: 18 additions & 0 deletions tokio/tests/io_repeat.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full"))]

use tokio::io::AsyncReadExt;

#[tokio::test]
async fn repeat_poll_read_is_cooperative() {
tokio::select! {
biased;
_ = async {
loop {
let mut buf = [0u8; 4096];
tokio::io::repeat(0b101).read_exact(&mut buf).await.unwrap();
}
} => {},
_ = tokio::task::yield_now() => {}
}
}
44 changes: 44 additions & 0 deletions tokio/tests/io_sink.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full"))]

use tokio::io::AsyncWriteExt;

#[tokio::test]
async fn sink_poll_write_is_cooperative() {
tokio::select! {
biased;
_ = async {
loop {
let buf = vec![1, 2, 3];
tokio::io::sink().write_all(&buf).await.unwrap();
}
} => {},
_ = tokio::task::yield_now() => {}
}
}

#[tokio::test]
async fn sink_poll_flush_is_cooperative() {
tokio::select! {
biased;
_ = async {
loop {
tokio::io::sink().flush().await.unwrap();
}
} => {},
_ = tokio::task::yield_now() => {}
}
}

#[tokio::test]
async fn sink_poll_shutdown_is_cooperative() {
tokio::select! {
biased;
_ = async {
loop {
tokio::io::sink().shutdown().await.unwrap();
}
} => {},
_ = tokio::task::yield_now() => {}
}
}

0 comments on commit 581cd41

Please sign in to comment.