Skip to content

Commit

Permalink
Merge pull request #7 from alexcrichton/testing
Browse files Browse the repository at this point in the history
Transmit AddSource errors and handle schedule()
  • Loading branch information
alexcrichton authored Jul 12, 2016
2 parents 5c0db44 + 3f66229 commit 7fddd51
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 36 deletions.
77 changes: 41 additions & 36 deletions mio/src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use mio;
use slab::Slab;
use futures::{Future, Tokens, Wake};

use slot::{self, Slot};

pub type Source = Arc<mio::Evented + Send + Sync>;

static NEXT_LOOP_ID: AtomicUsize = ATOMIC_USIZE_INIT;
Expand Down Expand Up @@ -68,7 +70,7 @@ impl Scheduled {
}

enum Message {
AddSource(Source, Arc<AtomicUsize>, Arc<Wake>),
AddSource(Source, Arc<Slot<io::Result<usize>>>),
DropSource(usize),
Schedule(usize, Direction, Arc<Wake>),
Deschedule(usize, Direction),
Expand Down Expand Up @@ -193,7 +195,7 @@ impl Loop {
rx_res.recv().unwrap()
}

fn add_source(&self, source: Source) -> usize {
fn add_source(&self, source: Source) -> io::Result<usize> {
let sched = Scheduled {
source: source,
reader: None,
Expand All @@ -203,7 +205,7 @@ impl Loop {
// TODO: handle out of space
let entry = dispatch.vacant_entry().unwrap();
register(&mut self.io.borrow_mut(), entry.index(), &sched);
entry.insert(sched).index()
Ok(entry.insert(sched).index())
}

fn drop_source(&self, token: usize) {
Expand Down Expand Up @@ -233,10 +235,10 @@ impl Loop {

fn notify(&self, msg: Message) {
match msg {
Message::AddSource(source, id, wake) => {
let tok = self.add_source(source);
id.store(tok, Ordering::Relaxed);
wake.wake(&Tokens::from_usize(ADD_SOURCE_TOKEN));
Message::AddSource(source, slot) => {
// This unwrap() should always be ok as we're the only producer
slot.try_produce(self.add_source(source))
.ok().expect("interference with try_produce");
}
Message::DropSource(tok) => self.drop_source(tok),
Message::Schedule(tok, dir, wake) => self.schedule(tok, dir, wake),
Expand Down Expand Up @@ -298,13 +300,12 @@ impl LoopHandle {
AddSource {
loop_handle: self.clone(),
source: Some(source),
id: Arc::new(AtomicUsize::new(0)),
scheduled: false,
result: None,
}
}

fn add_source_(&self, source: Source, id: Arc<AtomicUsize>, wake: Arc<Wake>) {
self.send(Message::AddSource(source, id, wake));
fn add_source_(&self, source: Source, slot: Arc<Slot<io::Result<usize>>>) {
self.send(Message::AddSource(source, slot));
}

/// Begin listening for events on an event loop.
Expand Down Expand Up @@ -363,43 +364,47 @@ const ADD_SOURCE_TOKEN: usize = 0;
pub struct AddSource {
loop_handle: LoopHandle,
source: Option<Source>,
id: Arc<AtomicUsize>,
scheduled: bool,
result: Option<(Arc<Slot<io::Result<usize>>>, slot::Token)>,
}

impl Future for AddSource {
type Item = usize;
type Error = io::Error; // TODO: integrate channel error?
type Error = io::Error;

fn poll(&mut self, tokens: &Tokens) -> Option<Result<usize, io::Error>> {
if self.scheduled {
if tokens.may_contain(&Tokens::from_usize(ADD_SOURCE_TOKEN)) {
let id = self.id.load(Ordering::Relaxed);
if id != 0 {
return Some(Ok(id))
match self.result {
Some((ref result, _)) => {
if tokens.may_contain(&Tokens::from_usize(ADD_SOURCE_TOKEN)) {
result.try_consume().ok()
} else {
None
}
}
} else {
if CURRENT_LOOP.is_set() {
let res = CURRENT_LOOP.with(|lp| {
if lp.id == self.loop_handle.id {
Some(lp.add_source(self.source.take().unwrap()))
} else {
None
}
});
if let Some(id) = res {
return Some(Ok(id));
}
None => {
let source = &mut self.source;
self.loop_handle.with_loop(|lp| {
lp.map(|lp| {
lp.add_source(source.take().unwrap())
})
})
}
}

None
}

fn schedule(&mut self, wake: Arc<Wake>) {
if self.scheduled { return; }
self.scheduled = true;
self.loop_handle.add_source_(self.source.take().unwrap(), self.id.clone(), wake);
if let Some((ref result, ref mut token)) = self.result {
result.cancel(*token);
*token = result.on_full(move |_| {
wake.wake(&Tokens::from_usize(ADD_SOURCE_TOKEN));
});
return
}

let result = Arc::new(Slot::new(None));
let token = result.on_full(move |_| {
wake.wake(&Tokens::from_usize(ADD_SOURCE_TOKEN));
});
self.result = Some((result.clone(), token));
self.loop_handle.add_source_(self.source.take().unwrap(), result);
}
}
4 changes: 4 additions & 0 deletions mio/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ use futures::stream::Stream;
mod readiness_stream;
mod event_loop;
mod tcp;
#[path = "../../src/slot.rs"]
mod slot;
#[path = "../../src/lock.rs"]
mod lock;

pub type IoFuture<T> = Future<Item=T, Error=io::Error>;
pub type IoStream<T> = Stream<Item=T, Error=io::Error>;
Expand Down
3 changes: 3 additions & 0 deletions src/slot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
//! a value. It is unlikely that this module will survive stabilization of this
//! library, so it is not recommended to rely on it.
#![allow(dead_code)] // imported in a few places

use std::sync::atomic::{AtomicUsize, Ordering};

use lock::Lock;
Expand Down Expand Up @@ -80,6 +82,7 @@ pub struct OnFullError(());
pub struct OnEmptyError(());

/// A `Token` represents a registered callback, and can be used to cancel the callback.
#[derive(Clone, Copy)]
pub struct Token(usize);

// Slot state: the lowest 3 bits are flags; the remaining bits are used to
Expand Down

0 comments on commit 7fddd51

Please sign in to comment.