diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml index b286cc6ea42..56ac43e6d20 100644 --- a/tokio-executor/Cargo.toml +++ b/tokio-executor/Cargo.toml @@ -19,4 +19,5 @@ keywords = ["futures", "tokio"] categories = ["concurrency", "asynchronous"] [dependencies] +crossbeam-utils = "0.6.2" futures = "0.1.19" diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs index 550d47d9320..c1925ba86a7 100644 --- a/tokio-executor/src/lib.rs +++ b/tokio-executor/src/lib.rs @@ -35,6 +35,7 @@ //! [`Park`]: park/index.html //! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll +extern crate crossbeam_utils; extern crate futures; mod enter; diff --git a/tokio-executor/src/park.rs b/tokio-executor/src/park.rs index 4f278fdd3af..2b392cd7711 100644 --- a/tokio-executor/src/park.rs +++ b/tokio-executor/src/park.rs @@ -46,10 +46,10 @@ use std::marker::PhantomData; use std::rc::Rc; -use std::sync::{Arc, Mutex, Condvar}; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; +use crossbeam_utils::sync::{Parker, Unparker}; + /// Block the current thread. /// /// See [module documentation][mod] for more details. @@ -161,26 +161,11 @@ pub struct ParkError { /// Unblocks a thread that was blocked by `ParkThread`. #[derive(Clone, Debug)] pub struct UnparkThread { - inner: Arc, -} - -#[derive(Debug)] -struct Inner { - state: AtomicUsize, - mutex: Mutex<()>, - condvar: Condvar, + inner: Unparker, } -const IDLE: usize = 0; -const NOTIFY: usize = 1; -const SLEEP: usize = 2; - thread_local! { - static CURRENT_PARK_THREAD: Arc = Arc::new(Inner { - state: AtomicUsize::new(IDLE), - mutex: Mutex::new(()), - condvar: Condvar::new(), - }); + static CURRENT_PARKER: Parker = Parker::new(); } // ===== impl ParkThread ===== @@ -198,9 +183,9 @@ impl ParkThread { /// Get a reference to the `ParkThread` handle for this thread. fn with_current(&self, f: F) -> R - where F: FnOnce(&Arc) -> R, + where F: FnOnce(&Parker) -> R, { - CURRENT_PARK_THREAD.with(|inner| f(inner)) + CURRENT_PARKER.with(|inner| f(inner)) } } @@ -209,16 +194,18 @@ impl Park for ParkThread { type Error = ParkError; fn unpark(&self) -> Self::Unpark { - let inner = self.with_current(|inner| inner.clone()); + let inner = self.with_current(|inner| inner.unparker().clone()); UnparkThread { inner } } fn park(&mut self) -> Result<(), Self::Error> { - self.with_current(|inner| inner.park(None)) + self.with_current(|inner| inner.park()); + Ok(()) } fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.with_current(|inner| inner.park(Some(duration))) + self.with_current(|inner| inner.park_timeout(duration)); + Ok(()) } } @@ -229,74 +216,3 @@ impl Unpark for UnparkThread { self.inner.unpark(); } } - -// ===== impl Inner ===== - -impl Inner { - /// Park the current thread for at most `dur`. - fn park(&self, timeout: Option) -> Result<(), ParkError> { - // If currently notified, then we skip sleeping. This is checked outside - // of the lock to avoid acquiring a mutex if not necessary. - match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) { - NOTIFY => return Ok(()), - IDLE => {}, - _ => unreachable!(), - } - - // The state is currently idle, so obtain the lock and then try to - // transition to a sleeping state. - let mut m = self.mutex.lock().unwrap(); - - // Transition to sleeping - match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) { - NOTIFY => { - // Notified before we could sleep, consume the notification and - // exit - self.state.store(IDLE, Ordering::SeqCst); - return Ok(()); - } - IDLE => {}, - _ => unreachable!(), - } - - m = match timeout { - Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0, - None => self.condvar.wait(m).unwrap(), - }; - - // Transition back to idle. If the state has transitioned to `NOTIFY`, - // this will consume that notification - self.state.store(IDLE, Ordering::SeqCst); - - // Explicitly drop the mutex guard. There is no real point in doing it - // except that I find it helpful to make it explicit where we want the - // mutex to unlock. - drop(m); - - Ok(()) - } - - fn unpark(&self) { - // First, try transitioning from IDLE -> NOTIFY, this does not require a - // lock. - match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) { - IDLE | NOTIFY => return, - SLEEP => {} - _ => unreachable!(), - } - - // The other half is sleeping, this requires a lock - let _m = self.mutex.lock().unwrap(); - - // Transition to NOTIFY - match self.state.swap(NOTIFY, Ordering::SeqCst) { - SLEEP => {} - NOTIFY => return, - IDLE => return, - _ => unreachable!(), - } - - // Wakeup the sleeper - self.condvar.notify_one(); - } -} diff --git a/tokio-threadpool/Cargo.toml b/tokio-threadpool/Cargo.toml index 21875ea11d7..c32c48a6635 100644 --- a/tokio-threadpool/Cargo.toml +++ b/tokio-threadpool/Cargo.toml @@ -21,7 +21,7 @@ categories = ["concurrency", "asynchronous"] tokio-executor = { version = "0.1.2", path = "../tokio-executor" } futures = "0.1.19" crossbeam-deque = "0.6.1" -crossbeam-utils = "0.6.0" +crossbeam-utils = "0.6.2" num_cpus = "1.2" rand = "0.6" log = "0.4" diff --git a/tokio-threadpool/src/park/default_park.rs b/tokio-threadpool/src/park/default_park.rs index 4ccbe34da58..ecc22350f04 100644 --- a/tokio-threadpool/src/park/default_park.rs +++ b/tokio-threadpool/src/park/default_park.rs @@ -2,21 +2,20 @@ use tokio_executor::park::{Park, Unpark}; use std::error::Error; use std::fmt; -use std::sync::{Arc, Mutex, Condvar}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::SeqCst; use std::time::Duration; +use crossbeam_utils::sync::{Parker, Unparker}; + /// Parks the thread. #[derive(Debug)] pub struct DefaultPark { - inner: Arc, + inner: Parker, } /// Unparks threads that were parked by `DefaultPark`. #[derive(Debug)] pub struct DefaultUnpark { - inner: Arc, + inner: Unparker, } /// Error returned by [`ParkThread`] @@ -29,40 +28,28 @@ pub struct ParkError { _p: (), } -#[derive(Debug)] -struct Inner { - state: AtomicUsize, - mutex: Mutex<()>, - condvar: Condvar, -} - -const IDLE: usize = 0; -const NOTIFY: usize = 1; -const SLEEP: usize = 2; - // ===== impl DefaultPark ===== impl DefaultPark { /// Creates a new `DefaultPark` instance. pub fn new() -> DefaultPark { - let inner = Arc::new(Inner { - state: AtomicUsize::new(IDLE), - mutex: Mutex::new(()), - condvar: Condvar::new(), - }); - - DefaultPark { inner } + DefaultPark { + inner: Parker::new(), + } } /// Unpark the thread without having to clone the unpark handle. /// /// Named `notify` to avoid conflicting with the `unpark` fn. pub(crate) fn notify(&self) { - self.inner.unpark(); + self.inner.unparker().unpark(); } pub(crate) fn park_sync(&self, duration: Option) { - self.inner.park(duration); + match duration { + None => self.inner.park(), + Some(duration) => self.inner.park_timeout(duration), + } } } @@ -71,17 +58,18 @@ impl Park for DefaultPark { type Error = ParkError; fn unpark(&self) -> Self::Unpark { - let inner = self.inner.clone(); - DefaultUnpark { inner } + DefaultUnpark { + inner: self.inner.unparker().clone(), + } } fn park(&mut self) -> Result<(), Self::Error> { - self.inner.park(None); + self.inner.park(); Ok(()) } fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.inner.park(Some(duration)); + self.inner.park_timeout(duration); Ok(()) } } @@ -94,80 +82,6 @@ impl Unpark for DefaultUnpark { } } -impl Inner { - /// Park the current thread for at most `dur`. - fn park(&self, timeout: Option) { - // If currently notified, then we skip sleeping. This is checked outside - // of the lock to avoid acquiring a mutex if not necessary. - match self.state.compare_and_swap(NOTIFY, IDLE, SeqCst) { - NOTIFY => return, - IDLE => {}, - _ => unreachable!(), - } - - // If the duration is zero, then there is no need to actually block - if let Some(ref dur) = timeout { - if *dur == Duration::from_millis(0) { - return; - } - } - - // The state is currently idle, so obtain the lock and then try to - // transition to a sleeping state. - let mut m = self.mutex.lock().unwrap(); - - // Transition to sleeping - match self.state.compare_and_swap(IDLE, SLEEP, SeqCst) { - NOTIFY => { - // Notified before we could sleep, consume the notification and - // exit - self.state.store(IDLE, SeqCst); - return; - } - IDLE => {}, - _ => unreachable!(), - } - - m = match timeout { - Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0, - None => self.condvar.wait(m).unwrap(), - }; - - // Transition back to idle. If the state has transitioned to `NOTIFY`, - // this will consume that notification. - self.state.store(IDLE, SeqCst); - - // Explicitly drop the mutex guard. There is no real point in doing it - // except that I find it helpful to make it explicit where we want the - // mutex to unlock. - drop(m); - } - - fn unpark(&self) { - // First, try transitioning from IDLE -> NOTIFY, this does not require a - // lock. - match self.state.compare_and_swap(IDLE, NOTIFY, SeqCst) { - IDLE | NOTIFY => return, - SLEEP => {} - _ => unreachable!(), - } - - // The other half is sleeping, this requires a lock - let _m = self.mutex.lock().unwrap(); - - // Transition to NOTIFY - match self.state.swap(NOTIFY, SeqCst) { - SLEEP => {} - NOTIFY => return, - IDLE => return, - _ => unreachable!(), - } - - // Wakeup the sleeper - self.condvar.notify_one(); - } -} - // ===== impl ParkError ===== impl fmt::Display for ParkError {