diff --git a/src/reactor/global.rs b/src/reactor/global.rs new file mode 100644 index 00000000000..69156d43591 --- /dev/null +++ b/src/reactor/global.rs @@ -0,0 +1,43 @@ +use std::io; +use std::thread; + +use reactor::{Reactor, Handle}; + +pub struct HelperThread { + thread: Option>, + reactor: Handle, +} + +impl HelperThread { + pub fn new() -> io::Result { + let reactor = Reactor::new()?; + let reactor_handle = reactor.handle().clone(); + let thread = thread::Builder::new().spawn(move || run(reactor))?; + + Ok(HelperThread { + thread: Some(thread), + reactor: reactor_handle, + }) + } + + pub fn handle(&self) -> &Handle { + &self.reactor + } + + pub fn forget(mut self) { + drop(self.thread.take()); + } +} + +impl Drop for HelperThread { + fn drop(&mut self) { + // TODO: kill the reactor thread and wait for it to exit, needs + // `Handle::wakeup` to be implemented in a future PR + } +} + +fn run(mut reactor: Reactor) { + loop { + reactor.turn(None); + } +} diff --git a/src/reactor/io_token.rs b/src/reactor/io_token.rs index 9306e8f69cc..2ff93ed58f3 100644 --- a/src/reactor/io_token.rs +++ b/src/reactor/io_token.rs @@ -29,7 +29,7 @@ impl IoToken { /// associated with has gone away, or if there is an error communicating /// with the event loop. pub fn new(source: &Evented, handle: &Handle) -> io::Result { - match handle.inner.upgrade() { + match handle.inner() { Some(inner) => { let token = try!(inner.add_source(source)); let handle = handle.clone(); @@ -61,7 +61,7 @@ impl IoToken { /// > rather the `ReadinessStream` type should be used instead. // TODO: this should really return a proper newtype/enum, not a usize pub fn take_readiness(&self) -> usize { - let inner = match self.handle.inner.upgrade() { + let inner = match self.handle.inner() { Some(inner) => inner, None => return 0, }; @@ -93,7 +93,7 @@ impl IoToken { /// This function will also panic if there is not a currently running future /// task. pub fn schedule_read(&self) -> io::Result<()> { - let inner = match self.handle.inner.upgrade() { + let inner = match self.handle.inner() { Some(inner) => inner, None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), }; @@ -126,7 +126,7 @@ impl IoToken { /// This function will also panic if there is not a currently running future /// task. pub fn schedule_write(&self) -> io::Result<()> { - let inner = match self.handle.inner.upgrade() { + let inner = match self.handle.inner() { Some(inner) => inner, None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), }; @@ -158,7 +158,7 @@ impl IoToken { /// with has gone away, or if there is an error communicating with the event /// loop. pub fn drop_source(&self) { - let inner = match self.handle.inner.upgrade() { + let inner = match self.handle.inner() { Some(inner) => inner, None => return, }; diff --git a/src/reactor/mod.rs b/src/reactor/mod.rs index 1d9167a2754..7954954fb32 100644 --- a/src/reactor/mod.rs +++ b/src/reactor/mod.rs @@ -22,8 +22,10 @@ use std::fmt; use std::io::{self, ErrorKind}; +use std::mem; +use std::sync::atomic::Ordering::{Relaxed, SeqCst}; +use std::sync::atomic::{AtomicUsize, ATOMIC_USIZE_INIT}; use std::sync::{Arc, Weak, RwLock}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration}; use futures::{Future, Async}; @@ -34,6 +36,7 @@ use mio::event::Evented; use slab::Slab; mod io_token; +mod global; mod poll_evented; pub use self::poll_evented::PollEvented; @@ -214,7 +217,7 @@ impl Reactor { let io_dispatch = self.inner.io_dispatch.read().unwrap(); if let Some(io) = io_dispatch.get(token) { - io.readiness.fetch_or(ready2usize(ready), Ordering::Relaxed); + io.readiness.fetch_or(ready2usize(ready), Relaxed); if ready.is_writable() { io.writer.notify(); } @@ -291,12 +294,113 @@ impl Inner { task.register(); - if sched.readiness.load(Ordering::SeqCst) & ready2usize(ready) != 0 { + if sched.readiness.load(SeqCst) & ready2usize(ready) != 0 { task.notify(); } } } +static HANDLE_FALLBACK: AtomicUsize = ATOMIC_USIZE_INIT; + +/// Error returned from `Handle::set_fallback`. +#[derive(Clone, Debug)] +pub struct SetDefaultError(()); + +impl Handle { + /// Configures the fallback handle to be returned from `Handle::default`. + /// + /// The `Handle::default()` function will by default lazily spin up a global + /// thread and run a reactor on this global thread. This behavior is not + /// always desirable in all applications, however, and sometimes a different + /// fallback reactor is desired. + /// + /// This function will attempt to globally alter the return value of + /// `Handle::default()` to return the `handle` specified rather than a + /// lazily initialized global thread. If successful then all future calls to + /// `Handle::default()` which would otherwise fall back to the global thread + /// will instead return a clone of the handle specified. + /// + /// # Errors + /// + /// This function may not always succeed in configuring the fallback handle. + /// If this function was previously called (or perhaps concurrently called + /// on many threads) only the *first* invocation of this function will + /// succeed. All other invocations will return an error. + /// + /// Additionally if the global reactor thread has already been initialized + /// then this function will also return an error. (aka if `Handle::default` + /// has been called previously in this program). + pub fn set_fallback(handle: Handle) -> Result<(), SetDefaultError> { + unsafe { + let val = handle.into_usize(); + match HANDLE_FALLBACK.compare_exchange(0, val, SeqCst, SeqCst) { + Ok(_) => Ok(()), + Err(_) => { + drop(Handle::from_usize(val)); + Err(SetDefaultError(())) + } + } + } + } + + fn into_usize(self) -> usize { + unsafe { + mem::transmute::, usize>(self.inner) + } + } + + unsafe fn from_usize(val: usize) -> Handle { + let inner = mem::transmute::>(val);; + Handle { inner } + } + + fn inner(&self) -> Option> { + self.inner.upgrade() + } +} + +impl Default for Handle { + fn default() -> Handle { + let mut fallback = HANDLE_FALLBACK.load(SeqCst); + + // If the fallback hasn't been previously initialized then let's spin + // up a helper thread and try to initialize with that. If we can't + // actually create a helper thread then we'll just return a "defunkt" + // handle which will return errors when I/O objects are attempted to be + // associated. + if fallback == 0 { + let helper = match global::HelperThread::new() { + Ok(helper) => helper, + Err(_) => return Handle { inner: Weak::new() }, + }; + + // If we successfully set ourselves as the actual fallback then we + // want to `forget` the helper thread to ensure that it persists + // globally. If we fail to set ourselves as the fallback that means + // that someone was racing with this call to `Handle::default`. + // They ended up winning so we'll destroy our helper thread (which + // shuts down the thread) and reload the fallback. + if Handle::set_fallback(helper.handle().clone()).is_ok() { + let ret = helper.handle().clone(); + helper.forget(); + return ret + } + fallback = HANDLE_FALLBACK.load(SeqCst); + } + + // At this point our fallback handle global was configured so we use + // its value to reify a handle, clone it, and then forget our reified + // handle as we don't actually have an owning reference to it. + assert!(fallback != 0); + unsafe { + let handle = Handle::from_usize(fallback); + let ret = handle.clone(); + drop(handle.into_usize()); + return ret + } + } +} + impl fmt::Debug for Handle { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { write!(f, "Handle") diff --git a/src/reactor/poll_evented.rs b/src/reactor/poll_evented.rs index 57e362c4317..809ea79292a 100644 --- a/src/reactor/poll_evented.rs +++ b/src/reactor/poll_evented.rs @@ -281,7 +281,7 @@ impl PollEvented { pub fn deregister(&self) -> io::Result<()> where E: Evented, { - let inner = match self.handle().inner.upgrade() { + let inner = match self.handle().inner() { Some(inner) => inner, None => return Ok(()), }; diff --git a/tests/global.rs b/tests/global.rs new file mode 100644 index 00000000000..7a90a8835e8 --- /dev/null +++ b/tests/global.rs @@ -0,0 +1,37 @@ +extern crate futures; +extern crate tokio; + +use std::thread; + +use futures::prelude::*; +use tokio::net::{TcpStream, TcpListener}; +use tokio::reactor::Handle; + +macro_rules! t { + ($e:expr) => (match $e { + Ok(e) => e, + Err(e) => panic!("{} failed with {:?}", stringify!($e), e), + }) +} + +#[test] +fn hammer() { + let threads = (0..10).map(|_| { + thread::spawn(|| { + let handle = Handle::default(); + let srv = t!(TcpListener::bind(&"127.0.0.1:0".parse().unwrap(), &handle)); + let addr = t!(srv.local_addr()); + let mine = TcpStream::connect(&addr, &handle); + let theirs = srv.incoming().into_future() + .map(|(s, _)| s.unwrap().0) + .map_err(|(s, _)| s); + let (mine, theirs) = t!(mine.join(theirs).wait()); + + assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); + assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); + }) + }).collect::>(); + for thread in threads { + thread.join().unwrap(); + } +}