21
21
//! "wait flag").
22
22
//!
23
23
//! `unpark` changes the state to `NOTIFIED`. If the state was `PARKED`, the thread
24
- //! is or will be sleeping on the wait flag, so we raise it. Only the first thread
25
- //! to call `unpark` will raise the wait flag, so spurious wakeups are avoided
26
- //! (this is especially important for semaphores).
24
+ //! is or will be sleeping on the wait flag, so we raise it.
27
25
28
26
use crate :: pin:: Pin ;
29
27
use crate :: sync:: atomic:: AtomicI8 ;
30
- use crate :: sync:: atomic:: Ordering :: SeqCst ;
28
+ use crate :: sync:: atomic:: Ordering :: { Relaxed , SeqCst } ;
31
29
use crate :: sys:: wait_flag:: WaitFlag ;
32
30
use crate :: time:: Duration ;
33
31
@@ -49,39 +47,48 @@ impl Parker {
49
47
50
48
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
51
49
pub unsafe fn park ( self : Pin < & Self > ) {
52
- // The state values are chosen so that this subtraction changes
53
- // `NOTIFIED` to `EMPTY` and `EMPTY` to `PARKED`.
54
- let state = self . state . fetch_sub ( 1 , SeqCst ) ;
55
- match state {
56
- EMPTY => ( ) ,
50
+ match self . state . fetch_sub ( 1 , SeqCst ) {
51
+ // NOTIFIED => EMPTY
57
52
NOTIFIED => return ,
53
+ // EMPTY => PARKED
54
+ EMPTY => ( ) ,
58
55
_ => panic ! ( "inconsistent park state" ) ,
59
56
}
60
57
61
- self . wait_flag . wait ( ) ;
58
+ // Avoid waking up from spurious wakeups (these are quite likely, see below).
59
+ loop {
60
+ self . wait_flag . wait ( ) ;
62
61
63
- // We need to do a load here to use `Acquire` ordering.
64
- self . state . swap ( EMPTY , SeqCst ) ;
62
+ match self . state . compare_exchange ( NOTIFIED , EMPTY , SeqCst , Relaxed ) {
63
+ Ok ( _) => return ,
64
+ Err ( PARKED ) => ( ) ,
65
+ Err ( _) => panic ! ( "inconsistent park state" ) ,
66
+ }
67
+ }
65
68
}
66
69
67
70
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
68
71
pub unsafe fn park_timeout ( self : Pin < & Self > , dur : Duration ) {
69
- let state = self . state . fetch_sub ( 1 , SeqCst ) ;
70
- match state {
71
- EMPTY => ( ) ,
72
+ match self . state . fetch_sub ( 1 , SeqCst ) {
72
73
NOTIFIED => return ,
74
+ EMPTY => ( ) ,
73
75
_ => panic ! ( "inconsistent park state" ) ,
74
76
}
75
77
76
- let wakeup = self . wait_flag . wait_timeout ( dur) ;
77
- let state = self . state . swap ( EMPTY , SeqCst ) ;
78
- if state == NOTIFIED && !wakeup {
79
- // The token was made available after the wait timed out, but before
80
- // we reset the state, so we need to reset the wait flag to avoid
81
- // spurious wakeups. This wait has no timeout, but we know it will
82
- // return quickly, as the unparking thread will definitely raise the
83
- // flag if it has not already done so.
84
- self . wait_flag . wait ( ) ;
78
+ self . wait_flag . wait_timeout ( dur) ;
79
+
80
+ // Either a wakeup or a timeout occurred. Wakeups may be spurious, as there can be
81
+ // a race condition when `unpark` is performed between receiving the timeout and
82
+ // resetting the state, resulting in the eventflag being set unnecessarily. `park`
83
+ // is protected against this by looping until the token is actually given, but
84
+ // here we cannot easily tell.
85
+
86
+ // Use `swap` to provide acquire ordering (not strictly necessary, but all other
87
+ // implementations do).
88
+ match self . state . swap ( EMPTY , SeqCst ) {
89
+ NOTIFIED => ( ) ,
90
+ PARKED => ( ) ,
91
+ _ => panic ! ( "inconsistent park state" ) ,
85
92
}
86
93
}
87
94
0 commit comments