From 204d9608e35f7dc56e179d4539e931766fd88f28 Mon Sep 17 00:00:00 2001 From: James Duley Date: Tue, 11 Sep 2018 18:04:33 +0100 Subject: [PATCH 1/3] Fix `thread` `park`/`unpark` synchronization Previously the code below would not be guaranteed to exit when the first spawned thread took the `return, // already unparked` path because there was no write to synchronize with a read in `park`. ``` use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::{current, spawn, park}; static FLAG: AtomicBool = AtomicBool::new(false); fn main() { let thread_0 = current(); spawn(move || { FLAG.store(true, Ordering::Relaxed); thread_0.unpark(); }); let thread_0 = current(); spawn(move || { thread_0.unpark(); }); while !FLAG.load(Ordering::Relaxed) { park(); } } ``` --- src/libstd/thread/mod.rs | 29 +++++++++++------------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs index a7c7dbb1b4027..020ea09db2a2a 100644 --- a/src/libstd/thread/mod.rs +++ b/src/libstd/thread/mod.rs @@ -800,7 +800,7 @@ pub fn park() { match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} Err(NOTIFIED) => { - thread.inner.state.store(EMPTY, SeqCst); + thread.inner.state.swap(EMPTY, SeqCst); return; } // should consume this notification, so prohibit spurious wakeups in next park. Err(_) => panic!("inconsistent park state"), @@ -889,7 +889,7 @@ pub fn park_timeout(dur: Duration) { match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} Err(NOTIFIED) => { - thread.inner.state.store(EMPTY, SeqCst); + thread.inner.state.swap(EMPTY, SeqCst); return; } // should consume this notification, so prohibit spurious wakeups in next park. Err(_) => panic!("inconsistent park_timeout state"), @@ -1058,23 +1058,16 @@ impl Thread { /// [park]: fn.park.html #[stable(feature = "rust1", since = "1.0.0")] pub fn unpark(&self) { - loop { - match self.inner.state.compare_exchange(EMPTY, NOTIFIED, SeqCst, SeqCst) { - Ok(_) => return, // no one was waiting - Err(NOTIFIED) => return, // already unparked - Err(PARKED) => {} // gotta go wake someone up - _ => panic!("inconsistent state in unpark"), - } - - // Coordinate wakeup through the mutex and a condvar notification - let _lock = self.inner.lock.lock().unwrap(); - match self.inner.state.compare_exchange(PARKED, NOTIFIED, SeqCst, SeqCst) { - Ok(_) => return self.inner.cvar.notify_one(), - Err(NOTIFIED) => return, // a different thread unparked - Err(EMPTY) => {} // parked thread went away, try again - _ => panic!("inconsistent state in unpark"), - } + match self.inner.state.swap(NOTIFIED, SeqCst) { + EMPTY => return, // no one was waiting + NOTIFIED => return, // already unparked + PARKED => {} // gotta go wake someone up + _ => panic!("inconsistent state in unpark"), } + + // Coordinate wakeup through the mutex and a condvar notification + let _lock = self.inner.lock.lock().unwrap(); + self.inner.cvar.notify_one() } /// Gets the thread's unique identifier. From f8a78bdfdfffffe74f516eec4303d7f3d4ac879e Mon Sep 17 00:00:00 2001 From: James Duley Date: Fri, 14 Sep 2018 17:32:16 +0100 Subject: [PATCH 2/3] Add comments and assertion to `park`/`unpark` regarding the synchronization. --- src/libstd/thread/mod.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs index 020ea09db2a2a..1f03ee3e37deb 100644 --- a/src/libstd/thread/mod.rs +++ b/src/libstd/thread/mod.rs @@ -800,7 +800,11 @@ pub fn park() { match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} Err(NOTIFIED) => { - thread.inner.state.swap(EMPTY, SeqCst); + // We must read again here, even though we know it will be NOTIFY, + // to synchronize with an write in `unpark` that occurred since we + // last read. + let old = thread.inner.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); return; } // should consume this notification, so prohibit spurious wakeups in next park. Err(_) => panic!("inconsistent park state"), @@ -889,7 +893,11 @@ pub fn park_timeout(dur: Duration) { match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} Err(NOTIFIED) => { - thread.inner.state.swap(EMPTY, SeqCst); + // We must read again here, even though we know it will be NOTIFY, + // to synchronize with an write in `unpark` that occurred since we + // last read. + let old = thread.inner.state.swap(EMPTY, SeqCst); + assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); return; } // should consume this notification, so prohibit spurious wakeups in next park. Err(_) => panic!("inconsistent park_timeout state"), @@ -1058,6 +1066,8 @@ impl Thread { /// [park]: fn.park.html #[stable(feature = "rust1", since = "1.0.0")] pub fn unpark(&self) { + // We must unconditionally write NOTIFIED here to + // synchronize with a read in `park`. match self.inner.state.swap(NOTIFIED, SeqCst) { EMPTY => return, // no one was waiting NOTIFIED => return, // already unparked From a3b87058e7e4e590ac2c00215785e62aa5f7bec5 Mon Sep 17 00:00:00 2001 From: James Duley Date: Tue, 18 Sep 2018 17:51:49 +0100 Subject: [PATCH 3/3] Expand synchronization comments in `park`/`unpark` --- src/libstd/thread/mod.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/libstd/thread/mod.rs b/src/libstd/thread/mod.rs index 1f03ee3e37deb..3987ae83866e5 100644 --- a/src/libstd/thread/mod.rs +++ b/src/libstd/thread/mod.rs @@ -800,9 +800,12 @@ pub fn park() { match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} Err(NOTIFIED) => { - // We must read again here, even though we know it will be NOTIFY, - // to synchronize with an write in `unpark` that occurred since we - // last read. + // We must read here, even though we know it will be `NOTIFIED`. + // This is because `unpark` may have been called again since we read + // `NOTIFIED` in the `compare_exchange` above. We must perform an + // acquire operation that synchronizes with that `unpark` to observe + // any writes it made before the call to unpark. To do that we must + // read from the write it made to `state`. let old = thread.inner.state.swap(EMPTY, SeqCst); assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); return; @@ -893,9 +896,7 @@ pub fn park_timeout(dur: Duration) { match thread.inner.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { Ok(_) => {} Err(NOTIFIED) => { - // We must read again here, even though we know it will be NOTIFY, - // to synchronize with an write in `unpark` that occurred since we - // last read. + // We must read again here, see `park`. let old = thread.inner.state.swap(EMPTY, SeqCst); assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); return; @@ -1066,8 +1067,12 @@ impl Thread { /// [park]: fn.park.html #[stable(feature = "rust1", since = "1.0.0")] pub fn unpark(&self) { - // We must unconditionally write NOTIFIED here to - // synchronize with a read in `park`. + // To ensure the unparked thread will observe any writes we made + // before this call, we must perform a release operation that `park` + // can synchronize with. To do that we must write `NOTIFIED` even if + // `state` is already `NOTIFIED`. That is why this must be a swap + // rather than a compare-and-swap that returns if it reads `NOTIFIED` + // on failure. match self.inner.state.swap(NOTIFIED, SeqCst) { EMPTY => return, // no one was waiting NOTIFIED => return, // already unparked