|
| 1 | +//! A wait-flag-based thread parker. |
| 2 | +//! |
| 3 | +//! Some operating systems provide low-level parking primitives like wait counts, |
| 4 | +//! event flags or semaphores which are not susceptible to race conditions (meaning |
| 5 | +//! the wakeup can occur before the wait operation). To implement the `std` thread |
| 6 | +//! parker on top of these primitives, we only have to ensure that parking is fast |
| 7 | +//! when the thread token is available, the atomic ordering guarantees are maintained |
| 8 | +//! and spurious wakeups are minimized. |
| 9 | +//! |
| 10 | +//! To achieve this, this parker uses an atomic variable with three states: `EMPTY`, |
| 11 | +//! `PARKED` and `NOTIFIED`: |
| 12 | +//! * `EMPTY` means the token has not been made available, but the thread is not |
| 13 | +//! currently waiting on it. |
| 14 | +//! * `PARKED` means the token is not available and the thread is parked. |
| 15 | +//! * `NOTIFIED` means the token is available. |
| 16 | +//! |
| 17 | +//! `park` and `park_timeout` change the state from `EMPTY` to `PARKED` and from |
| 18 | +//! `NOTIFIED` to `EMPTY`. If the state was `NOTIFIED`, the thread was unparked and |
| 19 | +//! execution can continue without calling into the OS. If the state was `EMPTY`, |
| 20 | +//! the token is not available and the thread waits on the primitive (here called |
| 21 | +//! "wait flag"). |
| 22 | +//! |
| 23 | +//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread |
| 24 | +//! is or will be sleeping on the wait flag, so we raise it. |
| 25 | +
|
| 26 | +use crate::pin::Pin; |
| 27 | +use crate::sync::atomic::AtomicI8; |
| 28 | +use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release}; |
| 29 | +use crate::sys::wait_flag::WaitFlag; |
| 30 | +use crate::time::Duration; |
| 31 | + |
| 32 | +const EMPTY: i8 = 0; |
| 33 | +const PARKED: i8 = -1; |
| 34 | +const NOTIFIED: i8 = 1; |
| 35 | + |
| 36 | +pub struct Parker { |
| 37 | + state: AtomicI8, |
| 38 | + wait_flag: WaitFlag, |
| 39 | +} |
| 40 | + |
| 41 | +impl Parker { |
| 42 | + /// Construct a parker for the current thread. The UNIX parker |
| 43 | + /// implementation requires this to happen in-place. |
| 44 | + pub unsafe fn new(parker: *mut Parker) { |
| 45 | + parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() }) |
| 46 | + } |
| 47 | + |
| 48 | + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. |
| 49 | + pub unsafe fn park(self: Pin<&Self>) { |
| 50 | + match self.state.fetch_sub(1, Acquire) { |
| 51 | + // NOTIFIED => EMPTY |
| 52 | + NOTIFIED => return, |
| 53 | + // EMPTY => PARKED |
| 54 | + EMPTY => (), |
| 55 | + _ => panic!("inconsistent park state"), |
| 56 | + } |
| 57 | + |
| 58 | + // Avoid waking up from spurious wakeups (these are quite likely, see below). |
| 59 | + loop { |
| 60 | + self.wait_flag.wait(); |
| 61 | + |
| 62 | + match self.state.compare_exchange(NOTIFIED, EMPTY, Acquire, Relaxed) { |
| 63 | + Ok(_) => return, |
| 64 | + Err(PARKED) => (), |
| 65 | + Err(_) => panic!("inconsistent park state"), |
| 66 | + } |
| 67 | + } |
| 68 | + } |
| 69 | + |
| 70 | + // This implementation doesn't require `unsafe` and `Pin`, but other implementations do. |
| 71 | + pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) { |
| 72 | + match self.state.fetch_sub(1, Acquire) { |
| 73 | + NOTIFIED => return, |
| 74 | + EMPTY => (), |
| 75 | + _ => panic!("inconsistent park state"), |
| 76 | + } |
| 77 | + |
| 78 | + self.wait_flag.wait_timeout(dur); |
| 79 | + |
| 80 | + // Either a wakeup or a timeout occurred. Wakeups may be spurious, as there can be |
| 81 | + // a race condition when `unpark` is performed between receiving the timeout and |
| 82 | + // resetting the state, resulting in the eventflag being set unnecessarily. `park` |
| 83 | + // is protected against this by looping until the token is actually given, but |
| 84 | + // here we cannot easily tell. |
| 85 | + |
| 86 | + // Use `swap` to provide acquire ordering. |
| 87 | + match self.state.swap(EMPTY, Acquire) { |
| 88 | + NOTIFIED => (), |
| 89 | + PARKED => (), |
| 90 | + _ => panic!("inconsistent park state"), |
| 91 | + } |
| 92 | + } |
| 93 | + |
| 94 | + // This implementation doesn't require `Pin`, but other implementations do. |
| 95 | + pub fn unpark(self: Pin<&Self>) { |
| 96 | + let state = self.state.swap(NOTIFIED, Release); |
| 97 | + |
| 98 | + if state == PARKED { |
| 99 | + self.wait_flag.raise(); |
| 100 | + } |
| 101 | + } |
| 102 | +} |
0 commit comments