Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start adding a global event loop #57

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions src/reactor/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::io;
use std::thread;

use reactor::{Reactor, Handle};

pub struct HelperThread {
thread: Option<thread::JoinHandle<()>>,
reactor: Handle,
}

impl HelperThread {
pub fn new() -> io::Result<HelperThread> {
let reactor = Reactor::new()?;
let reactor_handle = reactor.handle().clone();
let thread = thread::Builder::new().spawn(move || run(reactor))?;

Ok(HelperThread {
thread: Some(thread),
reactor: reactor_handle,
})
}

pub fn handle(&self) -> &Handle {
&self.reactor
}

pub fn forget(mut self) {
drop(self.thread.take());
}
}

impl Drop for HelperThread {
fn drop(&mut self) {
// TODO: kill the reactor thread and wait for it to exit, needs
// `Handle::wakeup` to be implemented in a future PR
}
}

fn run(mut reactor: Reactor) {
loop {
reactor.turn(None);
}
}
10 changes: 5 additions & 5 deletions src/reactor/io_token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl IoToken {
/// associated with has gone away, or if there is an error communicating
/// with the event loop.
pub fn new(source: &Evented, handle: &Handle) -> io::Result<IoToken> {
match handle.inner.upgrade() {
match handle.inner() {
Some(inner) => {
let token = try!(inner.add_source(source));
let handle = handle.clone();
Expand Down Expand Up @@ -61,7 +61,7 @@ impl IoToken {
/// > rather the `ReadinessStream` type should be used instead.
// TODO: this should really return a proper newtype/enum, not a usize
pub fn take_readiness(&self) -> usize {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return 0,
};
Expand Down Expand Up @@ -93,7 +93,7 @@ impl IoToken {
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_read(&self) -> io::Result<()> {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
Expand Down Expand Up @@ -126,7 +126,7 @@ impl IoToken {
/// This function will also panic if there is not a currently running future
/// task.
pub fn schedule_write(&self) -> io::Result<()> {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
Expand Down Expand Up @@ -158,7 +158,7 @@ impl IoToken {
/// with has gone away, or if there is an error communicating with the event
/// loop.
pub fn drop_source(&self) {
let inner = match self.handle.inner.upgrade() {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
Expand Down
110 changes: 107 additions & 3 deletions src/reactor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@

use std::fmt;
use std::io::{self, ErrorKind};
use std::mem;
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT};
use std::sync::{Arc, Weak, RwLock};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration};

use futures::{Future, Async};
Expand All @@ -34,6 +36,7 @@ use mio::event::Evented;
use slab::Slab;

mod io_token;
mod global;

mod poll_evented;
pub use self::poll_evented::PollEvented;
Expand Down Expand Up @@ -214,7 +217,7 @@ impl Reactor {
let io_dispatch = self.inner.io_dispatch.read().unwrap();

if let Some(io) = io_dispatch.get(token) {
io.readiness.fetch_or(ready2usize(ready), Ordering::Relaxed);
io.readiness.fetch_or(ready2usize(ready), Relaxed);
if ready.is_writable() {
io.writer.notify();
}
Expand Down Expand Up @@ -291,12 +294,113 @@ impl Inner {

task.register();

if sched.readiness.load(Ordering::SeqCst) & ready2usize(ready) != 0 {
if sched.readiness.load(SeqCst) & ready2usize(ready) != 0 {
task.notify();
}
}
}

static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT;

/// Error returned from `Handle::set_fallback`.
#[derive(Clone, Debug)]
pub struct SetDefaultError(());

impl Handle {
/// Configures the fallback handle to be returned from `Handle::default`.
///
/// The `Handle::default()` function will by default lazily spin up a global
/// thread and run a reactor on this global thread. This behavior is not
/// always desirable in all applications, however, and sometimes a different
/// fallback reactor is desired.
///
/// This function will attempt to globally alter the return value of
/// `Handle::default()` to return the `handle` specified rather than a
/// lazily initialized global thread. If successful then all future calls to
/// `Handle::default()` which would otherwise fall back to the global thread
/// will instead return a clone of the handle specified.
///
/// # Errors
///
/// This function may not always succeed in configuring the fallback handle.
/// If this function was previously called (or perhaps concurrently called
/// on many threads) only the *first* invocation of this function will
/// succeed. All other invocations will return an error.
///
/// Additionally if the global reactor thread has already been initialized
/// then this function will also return an error. (aka if `Handle::default`
/// has been called previously in this program).
pub fn set_fallback(handle: Handle) -> Result<(), SetDefaultError> {
unsafe {
let val = handle.into_usize();
match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) {
Ok(_) => Ok(()),
Err(_) => {
drop(Handle::from_usize(val));
Err(SetDefaultError(()))
}
}
}
}

fn into_usize(self) -> usize {
unsafe {
mem::transmute::<Weak<Inner>, usize>(self.inner)
}
}

unsafe fn from_usize(val: usize) -> Handle {
let inner = mem::transmute::<usize, Weak<Inner>>(val);;
Handle { inner }
}

fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
}

impl Default for Handle {
fn default() -> Handle {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the default handle point to the global reactor? For new users this might be bit confusing, and for users not using a global event loop this might point to the wrong event loop.

Would impl Handle { fn global() -> Handle } (so we can call Handle::global) be a better approach?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC this was debated at some length on the RFC, but eventually this may not actually return a global handle in the sense that you'll be able to override the return value of Handle::default within a particular program scope.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok.

let mut fallback = HANDLE_FALLBACK.load(SeqCst);

// If the fallback hasn't been previously initialized then let's spin
// up a helper thread and try to initialize with that. If we can't
// actually create a helper thread then we'll just return a "defunkt"
// handle which will return errors when I/O objects are attempted to be
// associated.
if fallback == 0 {
let helper = match global::HelperThread::new() {
Ok(helper) => helper,
Err(_) => return Handle { inner: Weak::new() },
};

// If we successfully set ourselves as the actual fallback then we
// want to `forget` the helper thread to ensure that it persists
// globally. If we fail to set ourselves as the fallback that means
// that someone was racing with this call to `Handle::default`.
// They ended up winning so we'll destroy our helper thread (which
// shuts down the thread) and reload the fallback.
if Handle::set_fallback(helper.handle().clone()).is_ok() {
let ret = helper.handle().clone();
helper.forget();
return ret
}
fallback = HANDLE_FALLBACK.load(SeqCst);
}

// At this point our fallback handle global was configured so we use
// its value to reify a handle, clone it, and then forget our reified
// handle as we don't actually have an owning reference to it.
assert!(fallback != 0);
unsafe {
let handle = Handle::from_usize(fallback);
let ret = handle.clone();
drop(handle.into_usize());
return ret
}
}
}

impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "Handle")
Expand Down
2 changes: 1 addition & 1 deletion src/reactor/poll_evented.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ impl<E> PollEvented<E> {
pub fn deregister(&self) -> io::Result<()>
where E: Evented,
{
let inner = match self.handle().inner.upgrade() {
let inner = match self.handle().inner() {
Some(inner) => inner,
None => return Ok(()),
};
Expand Down
37 changes: 37 additions & 0 deletions tests/global.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
extern crate futures;
extern crate tokio;

use std::thread;

use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
use tokio::reactor::Handle;

macro_rules! t {
($e:expr) => (match $e {
Ok(e) => e,
Err(e) => panic!("{} failed with {:?}", stringify!($e), e),
})
}

#[test]
fn hammer() {
let threads = (0..10).map(|_| {
thread::spawn(|| {
let handle = Handle::default();
let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle));
let addr = t!(srv.local_addr());
let mine = TcpStream::connect(&addr, &handle);
let theirs = srv.incoming().into_future()
.map(|(s, _)| s.unwrap().0)
.map_err(|(s, _)| s);
let (mine, theirs) = t!(mine.join(theirs).wait());

assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));
})
}).collect::<Vec<_>>();
for thread in threads {
thread.join().unwrap();
}
}