Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rewrite the park/unpark mechanism #528

Merged
merged 4 commits into from Jan 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tokio-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,5 @@ keywords = ["futures", "tokio"]
categories = ["concurrency", "asynchronous"]

[dependencies]
crossbeam-utils = "0.6.2"
futures = "0.1.19"
1 change: 1 addition & 0 deletions tokio-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
//! [`Park`]: park/index.html
//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll

extern crate crossbeam_utils;
extern crate futures;

mod enter;
Expand Down
106 changes: 11 additions & 95 deletions tokio-executor/src/park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@

use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use crossbeam_utils::sync::{Parker, Unparker};

/// Block the current thread.
///
/// See [module documentation][mod] for more details.
Expand Down Expand Up @@ -161,26 +161,11 @@ pub struct ParkError {
/// Unblocks a thread that was blocked by `ParkThread`.
#[derive(Clone, Debug)]
pub struct UnparkThread {
inner: Arc<Inner>,
}

#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
inner: Unparker,
}

const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;

thread_local! {
static CURRENT_PARK_THREAD: Arc<Inner> = Arc::new(Inner {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
static CURRENT_PARKER: Parker = Parker::new();
}

// ===== impl ParkThread =====
Expand All @@ -198,9 +183,9 @@ impl ParkThread {

/// Get a reference to the `ParkThread` handle for this thread.
fn with_current<F, R>(&self, f: F) -> R
where F: FnOnce(&Arc<Inner>) -> R,
where F: FnOnce(&Parker) -> R,
{
CURRENT_PARK_THREAD.with(|inner| f(inner))
CURRENT_PARKER.with(|inner| f(inner))
}
}

Expand All @@ -209,16 +194,18 @@ impl Park for ParkThread {
type Error = ParkError;

fn unpark(&self) -> Self::Unpark {
let inner = self.with_current(|inner| inner.clone());
let inner = self.with_current(|inner| inner.unparker().clone());
UnparkThread { inner }
}

fn park(&mut self) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(None))
self.with_current(|inner| inner.park());
Ok(())
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.with_current(|inner| inner.park(Some(duration)))
self.with_current(|inner| inner.park_timeout(duration));
Ok(())
}
}

Expand All @@ -229,74 +216,3 @@ impl Unpark for UnparkThread {
self.inner.unpark();
}
}

// ===== impl Inner =====

impl Inner {
/// Park the current thread for at most `dur`.
fn park(&self, timeout: Option<Duration>) -> Result<(), ParkError> {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
NOTIFY => return Ok(()),
IDLE => {},
_ => unreachable!(),
}

// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();

// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, Ordering::SeqCst);
return Ok(());
}
IDLE => {},
_ => unreachable!(),
}

m = match timeout {
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
None => self.condvar.wait(m).unwrap(),
};

// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification
self.state.store(IDLE, Ordering::SeqCst);

// Explicitly drop the mutex guard. There is no real point in doing it
// except that I find it helpful to make it explicit where we want the
// mutex to unlock.
drop(m);

Ok(())
}

fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}

// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();

// Transition to NOTIFY
match self.state.swap(NOTIFY, Ordering::SeqCst) {
SLEEP => {}
NOTIFY => return,
IDLE => return,
_ => unreachable!(),
}

// Wakeup the sleeper
self.condvar.notify_one();
}
}
2 changes: 1 addition & 1 deletion tokio-threadpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ categories = ["concurrency", "asynchronous"]
tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
crossbeam-deque = "0.6.1"
crossbeam-utils = "0.6.0"
crossbeam-utils = "0.6.2"
num_cpus = "1.2"
rand = "0.6"
log = "0.4"
Expand Down
120 changes: 17 additions & 103 deletions tokio-threadpool/src/park/default_park.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,20 @@ use tokio_executor::park::{Park, Unpark};

use std::error::Error;
use std::fmt;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::time::Duration;

use crossbeam_utils::sync::{Parker, Unparker};

/// Parks the thread.
#[derive(Debug)]
pub struct DefaultPark {
inner: Arc<Inner>,
inner: Parker,
}

/// Unparks threads that were parked by `DefaultPark`.
#[derive(Debug)]
pub struct DefaultUnpark {
inner: Arc<Inner>,
inner: Unparker,
}

/// Error returned by [`ParkThread`]
Expand All @@ -29,40 +28,28 @@ pub struct ParkError {
_p: (),
}

#[derive(Debug)]
struct Inner {
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}

const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;

// ===== impl DefaultPark =====

impl DefaultPark {
/// Creates a new `DefaultPark` instance.
pub fn new() -> DefaultPark {
let inner = Arc::new(Inner {
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});

DefaultPark { inner }
DefaultPark {
inner: Parker::new(),
}
}

/// Unpark the thread without having to clone the unpark handle.
///
/// Named `notify` to avoid conflicting with the `unpark` fn.
pub(crate) fn notify(&self) {
self.inner.unpark();
self.inner.unparker().unpark();
}

pub(crate) fn park_sync(&self, duration: Option<Duration>) {
self.inner.park(duration);
match duration {
None => self.inner.park(),
Some(duration) => self.inner.park_timeout(duration),
}
}
}

Expand All @@ -71,17 +58,18 @@ impl Park for DefaultPark {
type Error = ParkError;

fn unpark(&self) -> Self::Unpark {
let inner = self.inner.clone();
DefaultUnpark { inner }
DefaultUnpark {
inner: self.inner.unparker().clone(),
}
}

fn park(&mut self) -> Result<(), Self::Error> {
self.inner.park(None);
self.inner.park();
Ok(())
}

fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
self.inner.park(Some(duration));
self.inner.park_timeout(duration);
Ok(())
}
}
Expand All @@ -94,80 +82,6 @@ impl Unpark for DefaultUnpark {
}
}

impl Inner {
/// Park the current thread for at most `dur`.
fn park(&self, timeout: Option<Duration>) {
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, SeqCst) {
NOTIFY => return,
IDLE => {},
_ => unreachable!(),
}

// If the duration is zero, then there is no need to actually block
if let Some(ref dur) = timeout {
if *dur == Duration::from_millis(0) {
return;
}
}

// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();

// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, SeqCst);
return;
}
IDLE => {},
_ => unreachable!(),
}

m = match timeout {
Some(timeout) => self.condvar.wait_timeout(m, timeout).unwrap().0,
None => self.condvar.wait(m).unwrap(),
};

// Transition back to idle. If the state has transitioned to `NOTIFY`,
// this will consume that notification.
self.state.store(IDLE, SeqCst);

// Explicitly drop the mutex guard. There is no real point in doing it
// except that I find it helpful to make it explicit where we want the
// mutex to unlock.
drop(m);
}

fn unpark(&self) {
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}

// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();

// Transition to NOTIFY
match self.state.swap(NOTIFY, SeqCst) {
SLEEP => {}
NOTIFY => return,
IDLE => return,
_ => unreachable!(),
}

// Wakeup the sleeper
self.condvar.notify_one();
}
}

// ===== impl ParkError =====

impl fmt::Display for ParkError {
Expand Down