Skip to content

Commit

Permalink
io: use intrusive wait list for I/O driver (#2828)
Browse files Browse the repository at this point in the history
This refactors I/O registration in a few ways:

- Cleans up the cached readiness in `PollEvented`. This cache used to
  be helpful when readiness was a linked list of `*mut Node`s in
  `Registration`. Previous refactors have turned `Registration` into just
  an `AtomicUsize` holding the current readiness, so the cache is just
  extra work and complexity. Gone.
- Polling the `Registration` for readiness now gives a `ReadyEvent`,
  which includes the driver tick. This event must be passed back into
  `clear_readiness`, so that the readiness is only cleared from `Registration`
  if the tick hasn't changed. Previously, it was possible to clear the
  readiness even though another thread had *just* polled the driver and
  found the socket ready again.
- Registration now also contains an `async fn readiness`, which stores
  wakers in an instrusive linked list. This allows an unbounded number
  of tasks to register for readiness (previously, only 1 per direction (read
  and write)). By using the intrusive linked list, there is no concern of
  leaking the storage of the wakers, since they are stored inside the `async fn`
  and released when the future is dropped.
- Registration retains a `poll_readiness(Direction)` method, to support
  `AsyncRead` and `AsyncWrite`. They aren't able to use `async fn`s, and
  so there are 2 reserved slots for those methods.
- IO types where it makes sense to have multiple tasks waiting on them
  now take advantage of this new `async fn readiness`, such as `UdpSocket`
  and `UnixDatagram`.

Additionally, this makes the `io-driver` "feature" internal-only (no longer
documented, not part of public API), and adds a second internal-only
feature, `io-readiness`, to group together linked list part of registration
that is only used by some of the IO types.

After a bit of discussion, changing stream-based transports (like
`TcpStream`) to have `async fn read(&self)` is punted, since that
is likely too easy of a footgun to activate.

Refs: #2779, #2728
  • Loading branch information
seanmonstar authored Sep 23, 2020
1 parent f25f12d commit a055784
Show file tree
Hide file tree
Showing 38 changed files with 888 additions and 1,429 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ jobs:
run: cargo install cargo-hack

- name: check --each-feature
run: cargo hack check --all --each-feature -Z avoid-dev-deps
run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps

# Try with unstable feature flags
- name: check --each-feature --unstable
run: cargo hack check --all --each-feature -Z avoid-dev-deps
run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
env:
RUSTFLAGS: --cfg tokio_unstable -Dwarnings

Expand Down
8 changes: 3 additions & 5 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ mod udp {
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;

pub async fn connect(
Expand All @@ -114,16 +113,15 @@ mod udp {

let socket = UdpSocket::bind(&bind_addr).await?;
socket.connect(addr).await?;
let (mut r, mut w) = socket.split();

future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?;
future::try_join(send(stdin, &socket), recv(stdout, &socket)).await?;

Ok(())
}

async fn send(
mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
writer: &mut SendHalf,
writer: &UdpSocket,
) -> Result<(), io::Error> {
while let Some(item) = stdin.next().await {
let buf = item?;
Expand All @@ -135,7 +133,7 @@ mod udp {

async fn recv(
mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
reader: &mut RecvHalf,
reader: &UdpSocket,
) -> Result<(), io::Error> {
loop {
let mut buf = vec![0; 1024];
Expand Down
2 changes: 1 addition & 1 deletion examples/echo-udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct Server {
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
mut socket,
socket,
mut buf,
mut to_send,
} = self;
Expand Down
2 changes: 1 addition & 1 deletion examples/udp-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
.parse()?;

let mut socket = UdpSocket::bind(local_addr).await?;
let socket = UdpSocket::bind(local_addr).await?;
const MAX_DATAGRAM_SIZE: usize = 65_507;
socket.connect(&remote_addr).await?;
let data = get_stdin_data()?;
Expand Down
6 changes: 6 additions & 0 deletions examples/udp-codec.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
fn main() {}

// Disabled while future of UdpFramed is decided on.
// See https://github.com/tokio-rs/tokio/issues/2830
/*
//! This example leverages `BytesCodec` to create a UDP client and server which
//! speak a custom protocol.
//!
Expand Down Expand Up @@ -78,3 +83,4 @@ async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {
Ok(())
}
*/
3 changes: 1 addition & 2 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ publish = false
default = []

# Shorthand for enabling everything
full = ["codec", "udp", "compat", "io"]
full = ["codec", "compat", "io"]

compat = ["futures-io",]
codec = ["tokio/stream"]
udp = ["tokio/udp"]
io = []

[dependencies]
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ macro_rules! cfg_compat {
}
}

/*
macro_rules! cfg_udp {
($($item:item)*) => {
$(
Expand All @@ -27,6 +28,7 @@ macro_rules! cfg_udp {
)*
}
}
*/

macro_rules! cfg_io {
($($item:item)*) => {
Expand Down
5 changes: 5 additions & 0 deletions tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ cfg_codec! {
pub mod codec;
}

/*
Disabled due to removal of poll_ functions on UdpSocket.
See https://github.com/tokio-rs/tokio/issues/2830
cfg_udp! {
pub mod udp;
}
*/

cfg_compat! {
pub mod compat;
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/tests/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*
#![warn(rust_2018_idioms)]
use tokio::{net::UdpSocket, stream::StreamExt};
Expand Down Expand Up @@ -100,3 +101,4 @@ async fn send_framed_lines_codec() -> std::io::Result<()> {
Ok(())
}
*/
8 changes: 4 additions & 4 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ full = [
"blocking",
"dns",
"fs",
"io-driver",
"io-util",
"io-std",
"macros",
Expand All @@ -51,7 +50,8 @@ full = [
blocking = ["rt-core"]
dns = ["rt-core"]
fs = ["rt-core", "io-util"]
io-driver = ["mio", "lazy_static"]
io-driver = ["mio", "lazy_static"] # internal only
io-readiness = [] # internal only
io-util = ["memchr"]
# stdin, stdout, stderr
io-std = ["rt-core"]
Expand Down Expand Up @@ -85,8 +85,8 @@ sync = ["fnv"]
test-util = []
tcp = ["io-driver", "iovec"]
time = ["slab"]
udp = ["io-driver"]
uds = ["io-driver", "mio-uds", "libc"]
udp = ["io-driver", "io-readiness"]
uds = ["io-driver", "io-readiness", "mio-uds", "libc"]

[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
Expand Down
57 changes: 19 additions & 38 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;

/// I/O driver, backed by Mio
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`
tick: u16,
tick: u8,

/// Reuse the `mio::Events` value across calls to poll.
events: Option<mio::Events>,
Expand All @@ -40,6 +39,11 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

pub(crate) struct ReadyEvent {
tick: u8,
readiness: mio::Ready,
}

pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
Expand All @@ -57,6 +61,11 @@ pub(super) enum Direction {
Write,
}

enum Tick {
Set(u8),
Clear(u8),
}

// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
Expand Down Expand Up @@ -122,11 +131,11 @@ impl Driver {

fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u16 = 256;
const COMPACT_INTERVAL: u8 = 255;

self.tick = self.tick.wrapping_add(1);

if self.tick % COMPACT_INTERVAL == 0 {
if self.tick == COMPACT_INTERVAL {
self.resources.compact();
}

Expand Down Expand Up @@ -160,39 +169,22 @@ impl Driver {
}

fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;

let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

if io
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
.is_err()
{
let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
curr | ready.as_usize()
});
if set.is_err() {
// token no longer valid!
return;
}

if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
wr = io.writer.take_waker();
}

if !(ready & (!mio::Ready::writable())).is_empty() {
rd = io.reader.take_waker();
}

if let Some(w) = rd {
w.wake();
}

if let Some(w) = wr {
w.wake();
}
io.wake(ready);
}
}

Expand All @@ -202,8 +194,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.reader.wake();
io.writer.wake();
io.wake(mio::Ready::all());
})
}
}
Expand Down Expand Up @@ -310,16 +301,6 @@ impl Inner {
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
}

/// Registers interest in the I/O resource associated with `token`.
pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
Direction::Read => &io.reader,
Direction::Write => &io.writer,
};

waker.register(w);
}
}

impl Direction {
Expand Down
Loading

0 comments on commit a055784

Please sign in to comment.