@@ -9,26 +9,37 @@ pub struct Thread(task_queue::JoinHandle);
9
9
10
10
pub const DEFAULT_MIN_STACK_SIZE : usize = 4096 ;
11
11
12
+ pub use self :: task_queue:: JoinNotifier ;
13
+
12
14
mod task_queue {
13
- use crate :: sync :: mpsc ;
15
+ use super :: wait_notify ;
14
16
use crate :: sync:: { Mutex , MutexGuard , Once } ;
15
17
16
- pub type JoinHandle = mpsc:: Receiver < ( ) > ;
18
+ pub type JoinHandle = wait_notify:: Waiter ;
19
+
20
+ pub struct JoinNotifier ( Option < wait_notify:: Notifier > ) ;
21
+
22
+ impl Drop for JoinNotifier {
23
+ fn drop ( & mut self ) {
24
+ self . 0 . take ( ) . unwrap ( ) . notify ( ) ;
25
+ }
26
+ }
17
27
18
28
pub ( super ) struct Task {
19
29
p : Box < dyn FnOnce ( ) > ,
20
- done : mpsc :: Sender < ( ) > ,
30
+ done : JoinNotifier ,
21
31
}
22
32
23
33
impl Task {
24
34
pub ( super ) fn new ( p : Box < dyn FnOnce ( ) > ) -> ( Task , JoinHandle ) {
25
- let ( done, recv) = mpsc:: channel ( ) ;
35
+ let ( done, recv) = wait_notify:: new ( ) ;
36
+ let done = JoinNotifier ( Some ( done) ) ;
26
37
( Task { p, done } , recv)
27
38
}
28
39
29
- pub ( super ) fn run ( self ) {
40
+ pub ( super ) fn run ( self ) -> JoinNotifier {
30
41
( self . p ) ( ) ;
31
- let _ = self . done . send ( ( ) ) ;
42
+ self . done
32
43
}
33
44
}
34
45
@@ -47,6 +58,48 @@ mod task_queue {
47
58
}
48
59
}
49
60
61
+ /// This module provides a synchronization primitive that does not use thread
62
+ /// local variables. This is needed for signaling that a thread has finished
63
+ /// execution. The signal is sent once all TLS destructors have finished at
64
+ /// which point no new thread locals should be created.
65
+ pub mod wait_notify {
66
+ use super :: super :: waitqueue:: { SpinMutex , WaitQueue , WaitVariable } ;
67
+ use crate :: sync:: Arc ;
68
+
69
+ pub struct Notifier ( Arc < SpinMutex < WaitVariable < bool > > > ) ;
70
+
71
+ impl Notifier {
72
+ /// Notify the waiter. The waiter is either notified right away (if
73
+ /// currently blocked in `Waiter::wait()`) or later when it calls the
74
+ /// `Waiter::wait()` method.
75
+ pub fn notify ( self ) {
76
+ let mut guard = self . 0 . lock ( ) ;
77
+ * guard. lock_var_mut ( ) = true ;
78
+ let _ = WaitQueue :: notify_one ( guard) ;
79
+ }
80
+ }
81
+
82
+ pub struct Waiter ( Arc < SpinMutex < WaitVariable < bool > > > ) ;
83
+
84
+ impl Waiter {
85
+ /// Wait for a notification. If `Notifier::notify()` has already been
86
+ /// called, this will return immediately, otherwise the current thread
87
+ /// is blocked until notified.
88
+ pub fn wait ( self ) {
89
+ let guard = self . 0 . lock ( ) ;
90
+ if * guard. lock_var ( ) {
91
+ return ;
92
+ }
93
+ WaitQueue :: wait ( guard, || { } ) ;
94
+ }
95
+ }
96
+
97
+ pub fn new ( ) -> ( Notifier , Waiter ) {
98
+ let inner = Arc :: new ( SpinMutex :: new ( WaitVariable :: new ( false ) ) ) ;
99
+ ( Notifier ( inner. clone ( ) ) , Waiter ( inner) )
100
+ }
101
+ }
102
+
50
103
impl Thread {
51
104
// unsafe: see thread::Builder::spawn_unchecked for safety requirements
52
105
pub unsafe fn new ( _stack : usize , p : Box < dyn FnOnce ( ) > ) -> io:: Result < Thread > {
@@ -57,7 +110,7 @@ impl Thread {
57
110
Ok ( Thread ( handle) )
58
111
}
59
112
60
- pub ( super ) fn entry ( ) {
113
+ pub ( super ) fn entry ( ) -> JoinNotifier {
61
114
let mut pending_tasks = task_queue:: lock ( ) ;
62
115
let task = rtunwrap ! ( Some , pending_tasks. pop( ) ) ;
63
116
drop ( pending_tasks) ; // make sure to not hold the task queue lock longer than necessary
@@ -78,7 +131,7 @@ impl Thread {
78
131
}
79
132
80
133
pub fn join ( self ) {
81
- let _ = self . 0 . recv ( ) ;
134
+ self . 0 . wait ( ) ;
82
135
}
83
136
}
84
137
0 commit comments