Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor IO registration using intrusive linked list #2828

Merged
merged 13 commits into from
Sep 23, 2020
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,11 +150,11 @@ jobs:
run: cargo install cargo-hack

- name: check --each-feature
run: cargo hack check --all --each-feature -Z avoid-dev-deps
run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If only io-driver or io-readiness are enabled, it results in a bunch of unused warnings. The other option is to remove the features and tag all the related code with the cfg(any(..)) matrix of things that need the io-driver stuff.


# Try with unstable feature flags
- name: check --each-feature --unstable
run: cargo hack check --all --each-feature -Z avoid-dev-deps
run: cargo hack check --all --each-feature --skip io-driver,io-readiness -Z avoid-dev-deps
env:
RUSTFLAGS: --cfg tokio_unstable

Expand Down
8 changes: 3 additions & 5 deletions examples/connect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ mod udp {
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use tokio::net::udp::{RecvHalf, SendHalf};
use tokio::net::UdpSocket;

pub async fn connect(
Expand All @@ -114,16 +113,15 @@ mod udp {

let socket = UdpSocket::bind(&bind_addr).await?;
socket.connect(addr).await?;
let (mut r, mut w) = socket.split();

future::try_join(send(stdin, &mut w), recv(stdout, &mut r)).await?;
future::try_join(send(stdin, &socket), recv(stdout, &socket)).await?;

Ok(())
}

async fn send(
mut stdin: impl Stream<Item = Result<Bytes, io::Error>> + Unpin,
writer: &mut SendHalf,
writer: &UdpSocket,
) -> Result<(), io::Error> {
while let Some(item) = stdin.next().await {
let buf = item?;
Expand All @@ -135,7 +133,7 @@ mod udp {

async fn recv(
mut stdout: impl Sink<Bytes, Error = io::Error> + Unpin,
reader: &mut RecvHalf,
reader: &UdpSocket,
) -> Result<(), io::Error> {
loop {
let mut buf = vec![0; 1024];
Expand Down
2 changes: 1 addition & 1 deletion examples/echo-udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ struct Server {
impl Server {
async fn run(self) -> Result<(), io::Error> {
let Server {
mut socket,
socket,
mut buf,
mut to_send,
} = self;
Expand Down
2 changes: 1 addition & 1 deletion examples/udp-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
}
.parse()?;

let mut socket = UdpSocket::bind(local_addr).await?;
let socket = UdpSocket::bind(local_addr).await?;
const MAX_DATAGRAM_SIZE: usize = 65_507;
socket.connect(&remote_addr).await?;
let data = get_stdin_data()?;
Expand Down
6 changes: 6 additions & 0 deletions examples/udp-codec.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
fn main() {}

// Disabled while future of UdpFramed is decided on.
// See https://github.com/tokio-rs/tokio/issues/2830
/*
//! This example leverages `BytesCodec` to create a UDP client and server which
//! speak a custom protocol.
//!
Expand Down Expand Up @@ -78,3 +83,4 @@ async fn pong(socket: &mut UdpFramed<BytesCodec>) -> Result<(), io::Error> {

Ok(())
}
*/
3 changes: 1 addition & 2 deletions tokio-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ publish = false
default = []

# Shorthand for enabling everything
full = ["codec", "udp", "compat", "io"]
full = ["codec", "compat", "io"]

compat = ["futures-io",]
codec = ["tokio/stream"]
udp = ["tokio/udp"]
io = []

[dependencies]
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/src/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ macro_rules! cfg_compat {
}
}

/*
macro_rules! cfg_udp {
($($item:item)*) => {
$(
Expand All @@ -27,6 +28,7 @@ macro_rules! cfg_udp {
)*
}
}
*/

macro_rules! cfg_io {
($($item:item)*) => {
Expand Down
5 changes: 5 additions & 0 deletions tokio-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,14 @@ cfg_codec! {
pub mod codec;
}

/*
Disabled due to removal of poll_ functions on UdpSocket.
See https://github.com/tokio-rs/tokio/issues/2830
cfg_udp! {
pub mod udp;
}
*/

cfg_compat! {
pub mod compat;
Expand Down
2 changes: 2 additions & 0 deletions tokio-util/tests/udp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/*
#![warn(rust_2018_idioms)]
use tokio::{net::UdpSocket, stream::StreamExt};
Expand Down Expand Up @@ -100,3 +101,4 @@ async fn send_framed_lines_codec() -> std::io::Result<()> {
Ok(())
}
*/
8 changes: 4 additions & 4 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ full = [
"blocking",
"dns",
"fs",
"io-driver",
"io-util",
"io-std",
"macros",
Expand All @@ -51,7 +50,8 @@ full = [
blocking = ["rt-core"]
dns = ["rt-core"]
fs = ["rt-core", "io-util"]
io-driver = ["mio", "lazy_static"]
io-driver = ["mio", "lazy_static"] # internal only
io-readiness = [] # internal only
seanmonstar marked this conversation as resolved.
Show resolved Hide resolved
io-util = ["memchr"]
# stdin, stdout, stderr
io-std = ["rt-core"]
Expand Down Expand Up @@ -85,8 +85,8 @@ sync = ["fnv"]
test-util = []
tcp = ["io-driver", "iovec"]
time = ["slab"]
udp = ["io-driver"]
uds = ["io-driver", "mio-uds", "libc"]
udp = ["io-driver", "io-readiness"]
uds = ["io-driver", "io-readiness", "mio-uds", "libc"]

[dependencies]
tokio-macros = { version = "0.3.0", path = "../tokio-macros", optional = true }
Expand Down
57 changes: 19 additions & 38 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;

/// I/O driver, backed by Mio
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`
tick: u16,
tick: u8,

/// Reuse the `mio::Events` value across calls to poll.
events: Option<mio::Events>,
Expand All @@ -40,6 +39,11 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

pub(crate) struct ReadyEvent {
tick: u8,
readiness: mio::Ready,
}

pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
Expand All @@ -57,6 +61,11 @@ pub(super) enum Direction {
Write,
}

enum Tick {
Set(u8),
Clear(u8),
}

// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
Expand Down Expand Up @@ -122,11 +131,11 @@ impl Driver {

fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u16 = 256;
const COMPACT_INTERVAL: u8 = 255;

self.tick = self.tick.wrapping_add(1);

if self.tick % COMPACT_INTERVAL == 0 {
if self.tick == COMPACT_INTERVAL {
self.resources.compact();
}

Expand Down Expand Up @@ -160,39 +169,22 @@ impl Driver {
}

fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;

let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

if io
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
.is_err()
{
let set = io.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
curr | ready.as_usize()
});
if set.is_err() {
// token no longer valid!
return;
}

if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
wr = io.writer.take_waker();
}

if !(ready & (!mio::Ready::writable())).is_empty() {
rd = io.reader.take_waker();
}

if let Some(w) = rd {
w.wake();
}

if let Some(w) = wr {
w.wake();
}
io.wake(ready);
}
}

Expand All @@ -202,8 +194,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.reader.wake();
io.writer.wake();
io.wake(mio::Ready::all());
})
}
}
Expand Down Expand Up @@ -310,16 +301,6 @@ impl Inner {
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
}

/// Registers interest in the I/O resource associated with `token`.
pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
Direction::Read => &io.reader,
Direction::Write => &io.writer,
};

waker.register(w);
}
}

impl Direction {
Expand Down
Loading