Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve thread notify #597

Merged
merged 4 commits into from
Oct 25, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 63 additions & 11 deletions src/task_impl/std/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@ use std::fmt;
use std::marker::PhantomData;
use std::mem;
use std::ptr;
use std::sync::{Arc, Once, ONCE_INIT};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::sync::{Arc, Mutex, Condvar, Once, ONCE_INIT};
use std::sync::atomic::{AtomicUsize, Ordering};

use {Future, Stream, Sink, Poll, Async, StartSend, AsyncSink};
use super::core;
Expand Down Expand Up @@ -486,14 +485,20 @@ impl Unpark for RunInner {
// ===== ThreadNotify =====

struct ThreadNotify {
ready: AtomicBool,
thread: thread::Thread,
state: AtomicUsize,
mutex: Mutex<()>,
condvar: Condvar,
}

const IDLE: usize = 0;
const NOTIFY: usize = 1;
const SLEEP: usize = 2;

thread_local! {
static CURRENT_THREAD_NOTIFY: Arc<ThreadNotify> = Arc::new(ThreadNotify {
ready: AtomicBool::new(false),
thread: thread::current(),
state: AtomicUsize::new(IDLE),
mutex: Mutex::new(()),
condvar: Condvar::new(),
});
}

Expand All @@ -505,16 +510,63 @@ impl ThreadNotify {
}

fn park(&self) {
if !self.ready.swap(false, Ordering::SeqCst) {
thread::park();
// If currently notified, then we skip sleeping. This is checked outside
// of the lock to avoid acquiring a mutex if not necessary.
match self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
NOTIFY => return,
IDLE => {},
_ => unreachable!(),
}

// The state is currently idle, so obtain the lock and then try to
// transition to a sleeping state.
let mut m = self.mutex.lock().unwrap();

// Transition to sleeping
match self.state.compare_and_swap(IDLE, SLEEP, Ordering::SeqCst) {
NOTIFY => {
// Notified before we could sleep, consume the notification and
// exit
self.state.store(IDLE, Ordering::SeqCst);
return;
}
IDLE => {},
_ => unreachable!(),
}

// Loop until we've been notified
loop {
m = self.condvar.wait(m).unwrap();

// Transition back to idle, loop otherwise
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would this ever not be NOTIFY? Can this be just an atomic store and return?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

condvars can wakeup spuriously

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah right, I forgot that actual OS condvars can do that - Go's sync.Cond does not. Apologies!

if NOTIFY == self.state.compare_and_swap(NOTIFY, IDLE, Ordering::SeqCst) {
return;
}
}
}
}

impl Notify for ThreadNotify {
fn notify(&self, _unpark_id: usize) {
self.ready.store(true, Ordering::SeqCst);
self.thread.unpark()
// First, try transitioning from IDLE -> NOTIFY, this does not require a
// lock.
match self.state.compare_and_swap(IDLE, NOTIFY, Ordering::SeqCst) {
IDLE | NOTIFY => return,
SLEEP => {}
_ => unreachable!(),
}

// The other half is sleeping, this requires a lock
let _m = self.mutex.lock().unwrap();
Copy link

@twmb twmb Oct 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worthwhile to have a separate notifier_mutex? This would allow something like...

let _nm = match self.notifier_mutex.try_lock() {
    Ok(g) => g,
    Err(e) => match e {
        TryLockResult::Poisoned(e) => panic!(e),
        TryLockResult::WouldBlock => { return ; }
    }
}
let _m = self.mutex.lock().unwrap();
...

which would avoid simultaneous notifications sitting on a mutex to notify a thread that will only need the first (and would also help avoid [not eliminate] the scenario where the first notification woke the sleeping thread, which then consumes all events, and then gets falsely notified by the other notifications that hadn't hit yet).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh nvm - this would still require keeping one notifier on deck, which TryLock does not provide.


// Transition from SLEEP -> NOTIFY
match self.state.compare_and_swap(SLEEP, NOTIFY, Ordering::SeqCst) {
Copy link

@twmb twmb Oct 24, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this cas would just need to be a store with notifier_mutex.

[edit: nvm - it still needs to be a compare_and_swap, b/c a simultaneous notification that is slow to the try_lock could still fall in here after the parked thread awakens and swaps to idle]

SLEEP => {}
_ => return,
}

// Wakeup the sleeper
self.condvar.notify_one();
}
}

Expand Down