Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

io: make duplex stream cooperative (#4470) #4478

Merged
merged 6 commits into from
Feb 9, 2022
Merged

Conversation

GongLG
Copy link
Contributor

@GongLG GongLG commented Feb 7, 2022

Add coop checks on pipe poll_read and poll_write.

Fixes: #4470
Refs: #4291, #4300

Motivation

This attempts to fix #4470 DuplexStream does not participate in coop.

Solution

Similar to what's done in #4291. The change calls coop poll_proceed to decrement budge on poll_read and poll_write, then it calls made_progress to resume budget if made progress.

Testing

Without the changes, the added duplex_is_cooperative test would hang.
Ran cargo build, check and test with all-features. Ran cargo fmt.

Add coop checks on pipe poll_read and poll_write.

Fixes: tokio-rs#4470
Refs: tokio-rs#4291, tokio-rs#4300
@Darksonn Darksonn added A-tokio Area: The main tokio crate M-coop Module: tokio/coop M-io Module: tokio/io labels Feb 7, 2022
@@ -185,6 +185,7 @@ impl AsyncRead for Pipe {
cx: &mut task::Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
ready!(poll_proceed_and_make_progress(cx));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't make progress if it returns Poll::Pending.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea, looks like instead of calling make_progress always, we need to poll proceed at the start, and only call make_progress in the two branches below that would return Poll::Ready.

Comment on lines 247 to 253
cfg_coop! {
fn poll_proceed_and_make_progress(cx: &mut task::Context<'_>) -> Poll<()> {
let coop = ready!(crate::coop::poll_proceed(cx));
coop.made_progress();
Poll::Ready(())
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't doing this conditional thing in src/io/util/empty.rs. Why is it needed here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what you mean. It looks like it's in there:

cfg_coop! {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi! Thanks for the review!

Maybe I read it wrong but I thought this is added in https://github.com/tokio-rs/tokio/blob/master/tokio/src/io/util/empty.rs#L78-L90 so I thought we only want this under coop feature. I can change it real quick if this is not the plan.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, seems like I'm just blind.

@GongLG
Copy link
Contributor Author

GongLG commented Feb 8, 2022

Btw apparently I didn't run the build without coop cfg..I will fix the build.

Comment on lines 113 to 116
let buf = [3u8; 4096];
let _ = tx.write_all(&buf).await;
let mut buf = [0u8; 4096];
let _ = rx.read(&mut buf).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let buf = [3u8; 4096];
let _ = tx.write_all(&buf).await;
let mut buf = [0u8; 4096];
let _ = rx.read(&mut buf).await;
let buf = [3u8; 4096];
tx.write_all(&buf).await.unwrap();
let mut buf = [0u8; 4096];
rx.read(&mut buf).await.unwrap();


cfg_not_coop! {
fn poll_write(
mut self: Pin<&mut Self>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're getting a CI failure due to some warnings about these being unnecessary:

Suggested change
mut self: Pin<&mut Self>,
self: Pin<&mut Self>,

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah..yeah..I fixed it once because the warning showed up in local build, but I guess I messed up git merge so this isn't removed here. Thanks I will fix it.

Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

@Darksonn Darksonn merged commit fd4d2b0 into tokio-rs:master Feb 9, 2022
hawkw added a commit that referenced this pull request Feb 15, 2022
# 1.16.2 (February 15, 2022)

This release updates the minimum supported Rust version (MSRV) to 1.49,
the `mio` dependency to v0.8, and the (optional) `parking_lot`
dependency to v0.12. Additionally, it contains several bug fixes, as
well as internal refactoring and performance improvements.

### Fixed

- time: prevent panicking in `sleep` with large durations ([#4495])
- time: eliminate potential panics in `Instant` arithmetic on platforms
  where `Instant::now` is not monotonic ([#4461])
- io: fix `DuplexStream` not participating in cooperative yielding
  ([#4478])
- rt: fix potential double panic when dropping a `JoinHandle` ([#4430])

### Changed

- update minimum supported Rust version to 1.49 ([#4457])
- update `parking_lot` dependency to v0.12.0 ([#4459])
- update `mio` dependency to v0.8 ([#4449])
- rt: remove an unnecessary lock in the blocking pool ([#4436])
- rt: remove an unnecessary enum in the basic scheduler ([#4462])
- time: use bit manipulation instead of modulo to improve performance
  ([#4480])
- net: use `std::future::Ready` instead of our own `Ready` future
  ([#4271])
- replace deprecated `atomic::spin_loop_hint` with `hint::spin_loop`
  ([#4491])
- fix miri failures in intrusive linked lists ([#4397])

### Documented

- io: add an example for `tokio::process::ChildStdin` ([#4479])

### Unstable

The following changes only apply when building with `--cfg
tokio_unstable`:

- task: fix missing location information in `tracing` spans generated by
  `spawn_local` ([#4483])
- task: add `JoinSet` for managing sets of tasks ([#4335])
- metrics: fix compilation error on MIPS ([#4475])
- metrics: fix compilation error on arm32v7 ([#4453])

[#4495]: #4495
[#4461]: #4461
[#4478]: #4478
[#4430]: #4430
[#4457]: #4457
[#4459]: #4459
[#4449]: #4449
[#4462]: #4462
[#4436]: #4436
[#4480]: #4480
[#4271]: #4271
[#4491]: #4491
[#4397]: #4397
[#4479]: #4479
[#4483]: #4483
[#4335]: #4335
[#4475]: #4475
[#4453]: #4453
hawkw added a commit that referenced this pull request Feb 16, 2022
# 1.17.0 (February 16, 2022)

This release updates the minimum supported Rust version (MSRV) to 1.49,
the `mio` dependency to v0.8, and the (optional) `parking_lot`
dependency to v0.12. Additionally, it contains several bug fixes, as
well as internal refactoring and performance improvements.

### Fixed

- time: prevent panicking in `sleep` with large durations ([#4495])
- time: eliminate potential panics in `Instant` arithmetic on platforms
  where `Instant::now` is not monotonic ([#4461])
- io: fix `DuplexStream` not participating in cooperative yielding
  ([#4478])
- rt: fix potential double panic when dropping a `JoinHandle` ([#4430])

### Changed

- update minimum supported Rust version to 1.49 ([#4457])
- update `parking_lot` dependency to v0.12.0 ([#4459])
- update `mio` dependency to v0.8 ([#4449])
- rt: remove an unnecessary lock in the blocking pool ([#4436])
- rt: remove an unnecessary enum in the basic scheduler ([#4462])
- time: use bit manipulation instead of modulo to improve performance
  ([#4480])
- net: use `std::future::Ready` instead of our own `Ready` future
  ([#4271])
- replace deprecated `atomic::spin_loop_hint` with `hint::spin_loop`
  ([#4491])
- fix miri failures in intrusive linked lists ([#4397])

### Documented

- io: add an example for `tokio::process::ChildStdin` ([#4479])

### Unstable

The following changes only apply when building with `--cfg
tokio_unstable`:

- task: fix missing location information in `tracing` spans generated by
  `spawn_local` ([#4483])
- task: add `JoinSet` for managing sets of tasks ([#4335])
- metrics: fix compilation error on MIPS ([#4475])
- metrics: fix compilation error on arm32v7 ([#4453])

[#4495]: #4495
[#4461]: #4461
[#4478]: #4478
[#4430]: #4430
[#4457]: #4457
[#4459]: #4459
[#4449]: #4449
[#4462]: #4462
[#4436]: #4436
[#4480]: #4480
[#4271]: #4271
[#4491]: #4491
[#4397]: #4397
[#4479]: #4479
[#4483]: #4483
[#4335]: #4335
[#4475]: #4475
[#4453]: #4453
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate M-coop Module: tokio/coop M-io Module: tokio/io
Projects
None yet
Development

Successfully merging this pull request may close these issues.

DuplexStream does not participate in coop
3 participants