Skip to content

Commit

Permalink
Refactor IO registration using intrusive linked list
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Sep 10, 2020
1 parent be7462e commit 3dffee1
Show file tree
Hide file tree
Showing 21 changed files with 718 additions and 1,236 deletions.
58 changes: 20 additions & 38 deletions tokio/src/io/driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,13 @@ use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;

/// I/O driver, backed by Mio
pub(crate) struct Driver {
/// Tracks the number of times `turn` is called. It is safe for this to wrap
/// as it is mostly used to determine when to call `compact()`
tick: u16,
tick: u8,

/// Reuse the `mio::Events` value across calls to poll.
events: Option<mio::Events>,
Expand All @@ -40,6 +39,11 @@ pub(crate) struct Handle {
inner: Weak<Inner>,
}

pub(crate) struct ReadyEvent {
tick: u8,
readiness: mio::Ready,
}

pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
Expand All @@ -57,6 +61,11 @@ pub(super) enum Direction {
Write,
}

enum Tick {
Set(u8),
Clear(u8),
}

// TODO: Don't use a fake token. Instead, reserve a slot entry for the wakeup
// token.
const TOKEN_WAKEUP: mio::Token = mio::Token(1 << 31);
Expand Down Expand Up @@ -122,11 +131,11 @@ impl Driver {

fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
// How often to call `compact()` on the resource slab
const COMPACT_INTERVAL: u16 = 256;
const COMPACT_INTERVAL: u8 = 255;

self.tick = self.tick.wrapping_add(1);

if self.tick % COMPACT_INTERVAL == 0 {
if self.tick == COMPACT_INTERVAL {
self.resources.compact();
}

Expand Down Expand Up @@ -160,39 +169,23 @@ impl Driver {
}

fn dispatch(&mut self, token: mio::Token, ready: mio::Ready) {
let mut rd = None;
let mut wr = None;

let addr = slab::Address::from_usize(ADDRESS.unpack(token.0));

let io = match self.resources.get(addr) {
Some(io) => io,
None => return,
};

if io
.set_readiness(Some(token.0), |curr| curr | ready.as_usize())
.is_err()
{
let set = io
.set_readiness(Some(token.0), Tick::Set(self.tick), |curr| {
curr | ready.as_usize()
});
if set.is_err() {
// token no longer valid!
return;
}

if ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready) {
wr = io.writer.take_waker();
}

if !(ready & (!mio::Ready::writable())).is_empty() {
rd = io.reader.take_waker();
}

if let Some(w) = rd {
w.wake();
}

if let Some(w) = wr {
w.wake();
}
io.wake(ready);
}
}

Expand All @@ -202,8 +195,7 @@ impl Drop for Driver {
// If a task is waiting on the I/O resource, notify it. The task
// will then attempt to use the I/O resource and fail due to the
// driver being shutdown.
io.reader.wake();
io.writer.wake();
io.wake(mio::Ready::all());
})
}
}
Expand Down Expand Up @@ -310,16 +302,6 @@ impl Inner {
pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
self.io.deregister(source)
}

/// Registers interest in the I/O resource associated with `token`.
pub(super) fn register(&self, io: &slab::Ref<ScheduledIo>, dir: Direction, w: Waker) {
let waker = match dir {
Direction::Read => &io.reader,
Direction::Write => &io.writer,
};

waker.register(w);
}
}

impl Direction {
Expand Down
Loading

0 comments on commit 3dffee1

Please sign in to comment.