Skip to content

Fix thread park/unpark synchronization #54174

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Sep 19, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions src/libstd/thread/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,14 @@ pub fn park() {
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
thread.inner.state.store(EMPTY, SeqCst);
// We must read here, even though we know it will be `NOTIFIED`.
// This is because `unpark` may have been called again since we read
// `NOTIFIED` in the `compare_exchange` above. We must perform an
// acquire operation that synchronizes with that `unpark` to observe
// any writes it made before the call to unpark. To do that we must
// read from the write it made to `state`.
let old = thread.inner.state.swap(EMPTY, SeqCst);
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
} // should consume this notification, so prohibit spurious wakeups in next park.
Err(_) => panic!("inconsistent park state"),
Expand Down Expand Up @@ -889,7 +896,9 @@ pub fn park_timeout(dur: Duration) {
match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
Ok(_) => {}
Err(NOTIFIED) => {
thread.inner.state.store(EMPTY, SeqCst);
// We must read again here, see `park`.
let old = thread.inner.state.swap(EMPTY, SeqCst);
assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
return;
} // should consume this notification, so prohibit spurious wakeups in next park.
Err(_) => panic!("inconsistent park_timeout state"),
Expand Down Expand Up @@ -1058,23 +1067,22 @@ impl Thread {
/// [park]: fn.park.html
#[stable(feature = "rust1", since = "1.0.0")]
pub fn unpark(&self) {
loop {
match self.inner.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst) {
Ok(_) => return, // no one was waiting
Err(NOTIFIED) => return, // already unparked
Err(PARKED) => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}

// Coordinate wakeup through the mutex and a condvar notification
let _lock = self.inner.lock.lock().unwrap();
match self.inner.state.compare_exchange(PARKED, NOTIFIED, SeqCst, SeqCst) {
Ok(_) => return self.inner.cvar.notify_one(),
Err(NOTIFIED) => return, // a different thread unparked
Err(EMPTY) => {} // parked thread went away, try again
_ => panic!("inconsistent state in unpark"),
}
// To ensure the unparked thread will observe any writes we made
// before this call, we must perform a release operation that `park`
// can synchronize with. To do that we must write `NOTIFIED` even if
// `state` is already `NOTIFIED`. That is why this must be a swap
// rather than a compare-and-swap that returns if it reads `NOTIFIED`
// on failure.
match self.inner.state.swap(NOTIFIED, SeqCst) {
EMPTY => return, // no one was waiting
NOTIFIED => return, // already unparked
PARKED => {} // gotta go wake someone up
_ => panic!("inconsistent state in unpark"),
}

// Coordinate wakeup through the mutex and a condvar notification
let _lock = self.inner.lock.lock().unwrap();
self.inner.cvar.notify_one()
}

/// Gets the thread's unique identifier.
Expand Down