forked from rust-lang/rust
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rollup merge of rust-lang#102773 - joboet:apple_parker, r=thomcc
Use semaphores for thread parking on Apple platforms Currently we use a mutex-condvar pair for thread parking on Apple systems. Unfortunately, `pthread_cond_timedwait` uses the real-time clock for measuring time, which causes problems when the system time changes. The parking implementation in this PR uses a semaphore instead, which measures monotonic time by default, avoiding these issues. As a further benefit, this has the potential to improve performance a bit, since `unpark` does not need to wait for a lock to be released. Since the Mach semaphores are poorly documented (I could not find availability or stability guarantees for instance), this uses a [dispatch semaphore](https://developer.apple.com/documentation/dispatch/dispatch_semaphore?language=objc) instead. While it adds a layer of indirection (it uses Mach semaphores internally), the overhead is probably negligible. Tested on macOS 12.5. r? ``@thomcc``
- Loading branch information
Showing
3 changed files
with
165 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
//! Thread parking for Darwin-based systems. | ||
//! | ||
//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they | ||
//! cannot be used in `std` because they are non-public (their use will lead to | ||
//! rejection from the App Store) and because they are only available starting | ||
//! with macOS version 10.12, even though the minimum target version is 10.7. | ||
//! | ||
//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin | ||
//! supports semaphores, which allow us to implement the behaviour we need with | ||
//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore | ||
//! provided by libdispatch, as the underlying Mach semaphore is only dubiously | ||
//! public. | ||
|
||
use crate::pin::Pin; | ||
use crate::sync::atomic::{ | ||
AtomicI8, | ||
Ordering::{Acquire, Release}, | ||
}; | ||
use crate::time::Duration; | ||
|
||
type dispatch_semaphore_t = *mut crate::ffi::c_void; | ||
type dispatch_time_t = u64; | ||
|
||
const DISPATCH_TIME_NOW: dispatch_time_t = 0; | ||
const DISPATCH_TIME_FOREVER: dispatch_time_t = !0; | ||
|
||
// Contained in libSystem.dylib, which is linked by default. | ||
extern "C" { | ||
fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t; | ||
fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t; | ||
fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize; | ||
fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize; | ||
fn dispatch_release(object: *mut crate::ffi::c_void); | ||
} | ||
|
||
const EMPTY: i8 = 0; | ||
const NOTIFIED: i8 = 1; | ||
const PARKED: i8 = -1; | ||
|
||
pub struct Parker { | ||
semaphore: dispatch_semaphore_t, | ||
state: AtomicI8, | ||
} | ||
|
||
unsafe impl Sync for Parker {} | ||
unsafe impl Send for Parker {} | ||
|
||
impl Parker { | ||
pub unsafe fn new(parker: *mut Parker) { | ||
let semaphore = dispatch_semaphore_create(0); | ||
assert!( | ||
!semaphore.is_null(), | ||
"failed to create dispatch semaphore for thread synchronization" | ||
); | ||
parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) }) | ||
} | ||
|
||
// Does not need `Pin`, but other implementation do. | ||
pub unsafe fn park(self: Pin<&Self>) { | ||
// The semaphore counter must be zero at this point, because unparking | ||
// threads will not actually increase it until we signalled that we | ||
// are waiting. | ||
|
||
// Change NOTIFIED to EMPTY and EMPTY to PARKED. | ||
if self.state.fetch_sub(1, Acquire) == NOTIFIED { | ||
return; | ||
} | ||
|
||
// Another thread may increase the semaphore counter from this point on. | ||
// If it is faster than us, we will decrement it again immediately below. | ||
// If we are faster, we wait. | ||
|
||
// Ensure that the semaphore counter has actually been decremented, even | ||
// if the call timed out for some reason. | ||
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} | ||
|
||
// At this point, the semaphore counter is zero again. | ||
|
||
// We were definitely woken up, so we don't need to check the state. | ||
// Still, we need to reset the state using a swap to observe the state | ||
// change with acquire ordering. | ||
self.state.swap(EMPTY, Acquire); | ||
} | ||
|
||
// Does not need `Pin`, but other implementation do. | ||
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { | ||
if self.state.fetch_sub(1, Acquire) == NOTIFIED { | ||
return; | ||
} | ||
|
||
let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX); | ||
let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos); | ||
|
||
let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0; | ||
|
||
let state = self.state.swap(EMPTY, Acquire); | ||
if state == NOTIFIED && timeout { | ||
// If the state was NOTIFIED but semaphore_wait returned without | ||
// decrementing the count because of a timeout, it means another | ||
// thread is about to call semaphore_signal. We must wait for that | ||
// to happen to ensure the semaphore count is reset. | ||
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {} | ||
} else { | ||
// Either a timeout occurred and we reset the state before any thread | ||
// tried to wake us up, or we were woken up and reset the state, | ||
// making sure to observe the state change with acquire ordering. | ||
// Either way, the semaphore counter is now zero again. | ||
} | ||
} | ||
|
||
// Does not need `Pin`, but other implementation do. | ||
pub fn unpark(self: Pin<&Self>) { | ||
let state = self.state.swap(NOTIFIED, Release); | ||
if state == PARKED { | ||
unsafe { | ||
dispatch_semaphore_signal(self.semaphore); | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl Drop for Parker { | ||
fn drop(&mut self) { | ||
// SAFETY: | ||
// We always ensure that the semaphore count is reset, so this will | ||
// never cause an exception. | ||
unsafe { | ||
dispatch_release(self.semaphore); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters