Skip to content

Commit eb5863a

Browse files
authored
Rollup merge of rust-lang#102773 - joboet:apple_parker, r=thomcc
Use semaphores for thread parking on Apple platforms Currently we use a mutex-condvar pair for thread parking on Apple systems. Unfortunately, `pthread_cond_timedwait` uses the real-time clock for measuring time, which causes problems when the system time changes. The parking implementation in this PR uses a semaphore instead, which measures monotonic time by default, avoiding these issues. As a further benefit, this has the potential to improve performance a bit, since `unpark` does not need to wait for a lock to be released. Since the Mach semaphores are poorly documented (I could not find availability or stability guarantees for instance), this uses a [dispatch semaphore](https://developer.apple.com/documentation/dispatch/dispatch_semaphore?language=objc) instead. While it adds a layer of indirection (it uses Mach semaphores internally), the overhead is probably negligible. Tested on macOS 12.5. r? ````@thomcc````
2 parents 3654dc7 + c320ab9 commit eb5863a

File tree

3 files changed

+165
-1
lines changed

3 files changed

+165
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
//! Thread parking for Darwin-based systems.
2+
//!
3+
//! Darwin actually has futex syscalls (`__ulock_wait`/`__ulock_wake`), but they
4+
//! cannot be used in `std` because they are non-public (their use will lead to
5+
//! rejection from the App Store) and because they are only available starting
6+
//! with macOS version 10.12, even though the minimum target version is 10.7.
7+
//!
8+
//! Therefore, we need to look for other synchronization primitives. Luckily, Darwin
9+
//! supports semaphores, which allow us to implement the behaviour we need with
10+
//! only one primitive (as opposed to a mutex-condvar pair). We use the semaphore
11+
//! provided by libdispatch, as the underlying Mach semaphore is only dubiously
12+
//! public.
13+
14+
use crate::pin::Pin;
15+
use crate::sync::atomic::{
16+
AtomicI8,
17+
Ordering::{Acquire, Release},
18+
};
19+
use crate::time::Duration;
20+
21+
type dispatch_semaphore_t = *mut crate::ffi::c_void;
22+
type dispatch_time_t = u64;
23+
24+
const DISPATCH_TIME_NOW: dispatch_time_t = 0;
25+
const DISPATCH_TIME_FOREVER: dispatch_time_t = !0;
26+
27+
// Contained in libSystem.dylib, which is linked by default.
28+
extern "C" {
29+
fn dispatch_time(when: dispatch_time_t, delta: i64) -> dispatch_time_t;
30+
fn dispatch_semaphore_create(val: isize) -> dispatch_semaphore_t;
31+
fn dispatch_semaphore_wait(dsema: dispatch_semaphore_t, timeout: dispatch_time_t) -> isize;
32+
fn dispatch_semaphore_signal(dsema: dispatch_semaphore_t) -> isize;
33+
fn dispatch_release(object: *mut crate::ffi::c_void);
34+
}
35+
36+
const EMPTY: i8 = 0;
37+
const NOTIFIED: i8 = 1;
38+
const PARKED: i8 = -1;
39+
40+
pub struct Parker {
41+
semaphore: dispatch_semaphore_t,
42+
state: AtomicI8,
43+
}
44+
45+
unsafe impl Sync for Parker {}
46+
unsafe impl Send for Parker {}
47+
48+
impl Parker {
49+
pub unsafe fn new(parker: *mut Parker) {
50+
let semaphore = dispatch_semaphore_create(0);
51+
assert!(
52+
!semaphore.is_null(),
53+
"failed to create dispatch semaphore for thread synchronization"
54+
);
55+
parker.write(Parker { semaphore, state: AtomicI8::new(EMPTY) })
56+
}
57+
58+
// Does not need `Pin`, but other implementation do.
59+
pub unsafe fn park(self: Pin<&Self>) {
60+
// The semaphore counter must be zero at this point, because unparking
61+
// threads will not actually increase it until we signalled that we
62+
// are waiting.
63+
64+
// Change NOTIFIED to EMPTY and EMPTY to PARKED.
65+
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
66+
return;
67+
}
68+
69+
// Another thread may increase the semaphore counter from this point on.
70+
// If it is faster than us, we will decrement it again immediately below.
71+
// If we are faster, we wait.
72+
73+
// Ensure that the semaphore counter has actually been decremented, even
74+
// if the call timed out for some reason.
75+
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
76+
77+
// At this point, the semaphore counter is zero again.
78+
79+
// We were definitely woken up, so we don't need to check the state.
80+
// Still, we need to reset the state using a swap to observe the state
81+
// change with acquire ordering.
82+
self.state.swap(EMPTY, Acquire);
83+
}
84+
85+
// Does not need `Pin`, but other implementation do.
86+
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
87+
if self.state.fetch_sub(1, Acquire) == NOTIFIED {
88+
return;
89+
}
90+
91+
let nanos = dur.as_nanos().try_into().unwrap_or(i64::MAX);
92+
let timeout = dispatch_time(DISPATCH_TIME_NOW, nanos);
93+
94+
let timeout = dispatch_semaphore_wait(self.semaphore, timeout) != 0;
95+
96+
let state = self.state.swap(EMPTY, Acquire);
97+
if state == NOTIFIED && timeout {
98+
// If the state was NOTIFIED but semaphore_wait returned without
99+
// decrementing the count because of a timeout, it means another
100+
// thread is about to call semaphore_signal. We must wait for that
101+
// to happen to ensure the semaphore count is reset.
102+
while dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER) != 0 {}
103+
} else {
104+
// Either a timeout occurred and we reset the state before any thread
105+
// tried to wake us up, or we were woken up and reset the state,
106+
// making sure to observe the state change with acquire ordering.
107+
// Either way, the semaphore counter is now zero again.
108+
}
109+
}
110+
111+
// Does not need `Pin`, but other implementation do.
112+
pub fn unpark(self: Pin<&Self>) {
113+
let state = self.state.swap(NOTIFIED, Release);
114+
if state == PARKED {
115+
unsafe {
116+
dispatch_semaphore_signal(self.semaphore);
117+
}
118+
}
119+
}
120+
}
121+
122+
impl Drop for Parker {
123+
fn drop(&mut self) {
124+
// SAFETY:
125+
// We always ensure that the semaphore count is reset, so this will
126+
// never cause an exception.
127+
unsafe {
128+
dispatch_release(self.semaphore);
129+
}
130+
}
131+
}

library/std/src/sys/unix/thread_parker/mod.rs

+12-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,18 @@
1111
)))]
1212

1313
cfg_if::cfg_if! {
14-
if #[cfg(target_os = "netbsd")] {
14+
if #[cfg(all(
15+
any(
16+
target_os = "macos",
17+
target_os = "ios",
18+
target_os = "watchos",
19+
target_os = "tvos",
20+
),
21+
not(miri),
22+
))] {
23+
mod darwin;
24+
pub use darwin::Parker;
25+
} else if #[cfg(target_os = "netbsd")] {
1526
mod netbsd;
1627
pub use netbsd::Parker;
1728
} else {

library/std/src/thread/tests.rs

+22
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,28 @@ fn test_try_panic_any_message_unit_struct() {
244244
}
245245
}
246246

247+
#[test]
248+
fn test_park_unpark_before() {
249+
for _ in 0..10 {
250+
thread::current().unpark();
251+
thread::park();
252+
}
253+
}
254+
255+
#[test]
256+
fn test_park_unpark_called_other_thread() {
257+
for _ in 0..10 {
258+
let th = thread::current();
259+
260+
let _guard = thread::spawn(move || {
261+
super::sleep(Duration::from_millis(50));
262+
th.unpark();
263+
});
264+
265+
thread::park();
266+
}
267+
}
268+
247269
#[test]
248270
fn test_park_timeout_unpark_before() {
249271
for _ in 0..10 {

0 commit comments

Comments
 (0)