|
1 | 1 | use crate::cell::{Cell, UnsafeCell};
|
2 |
| -use crate::sync::atomic::{AtomicBool, Ordering}; |
3 |
| -use crate::sync::mpsc::{self, channel, Sender}; |
| 2 | +use crate::sync::atomic::{AtomicU8, Ordering}; |
| 3 | +use crate::sync::mpsc::{channel, Sender}; |
4 | 4 | use crate::thread::{self, LocalKey};
|
5 | 5 | use crate::thread_local;
|
6 | 6 |
|
@@ -217,46 +217,100 @@ fn dtors_in_dtors_in_dtors_const_init() {
|
217 | 217 | // thread::yield_now and running the test several times.
|
218 | 218 | #[test]
|
219 | 219 | fn join_orders_after_tls_destructors() {
|
220 |
| - static THREAD2_LAUNCHED: AtomicBool = AtomicBool::new(false); |
| 220 | + // We emulate a synchronous MPSC rendezvous channel using only atomics and |
| 221 | + // thread::yield_now. We can't use std::mpsc as the implementation itself |
| 222 | + // may rely on thread locals. |
| 223 | + // |
| 224 | + // The basic state machine for an SPSC rendezvous channel is: |
| 225 | + // FRESH -> THREAD1_WAITING -> MAIN_THREAD_RENDEZVOUS |
| 226 | + // where the first transition is done by the “receiving” thread and the 2nd |
| 227 | + // transition is done by the “sending” thread. |
| 228 | + // |
| 229 | + // We add an additional state `THREAD2_LAUNCHED` between `FRESH` and |
| 230 | + // `THREAD1_WAITING` to block until all threads are actually running. |
| 231 | + // |
| 232 | + // A thread that joins on the “receiving” thread completion should never |
| 233 | + // observe the channel in the `THREAD1_WAITING` state. If this does occur, |
| 234 | + // we switch to the “poison” state `THREAD2_JOINED` and panic all around. |
| 235 | + // (This is equivalent to “sending” from an alternate producer thread.) |
| 236 | + const FRESH: u8 = 0; |
| 237 | + const THREAD2_LAUNCHED: u8 = 1; |
| 238 | + const THREAD1_WAITING: u8 = 2; |
| 239 | + const MAIN_THREAD_RENDEZVOUS: u8 = 3; |
| 240 | + const THREAD2_JOINED: u8 = 4; |
| 241 | + static SYNC_STATE: AtomicU8 = AtomicU8::new(FRESH); |
221 | 242 |
|
222 | 243 | for _ in 0..10 {
|
223 |
| - let (tx, rx) = mpsc::sync_channel(0); |
224 |
| - THREAD2_LAUNCHED.store(false, Ordering::SeqCst); |
| 244 | + SYNC_STATE.store(FRESH, Ordering::SeqCst); |
| 245 | + |
| 246 | + let jh = thread::Builder::new() |
| 247 | + .name("thread1".into()) |
| 248 | + .spawn(move || { |
| 249 | + struct TlDrop; |
| 250 | + |
| 251 | + impl Drop for TlDrop { |
| 252 | + fn drop(&mut self) { |
| 253 | + loop { |
| 254 | + match SYNC_STATE.load(Ordering::SeqCst) { |
| 255 | + FRESH => thread::yield_now(), |
| 256 | + THREAD2_LAUNCHED => break, |
| 257 | + v => unreachable!("sync state: {}", v), |
| 258 | + } |
| 259 | + } |
| 260 | + let mut sync_state = SYNC_STATE.swap(THREAD1_WAITING, Ordering::SeqCst); |
| 261 | + loop { |
| 262 | + match sync_state { |
| 263 | + THREAD2_LAUNCHED | THREAD1_WAITING => thread::yield_now(), |
| 264 | + MAIN_THREAD_RENDEZVOUS => break, |
| 265 | + THREAD2_JOINED => panic!( |
| 266 | + "Thread 1 still running after thread 2 joined on thread 1" |
| 267 | + ), |
| 268 | + v => unreachable!("sync state: {}", v), |
| 269 | + } |
| 270 | + sync_state = SYNC_STATE.load(Ordering::SeqCst); |
| 271 | + } |
| 272 | + } |
| 273 | + } |
225 | 274 |
|
226 |
| - let jh = thread::spawn(move || { |
227 |
| - struct RecvOnDrop(Cell<Option<mpsc::Receiver<()>>>); |
| 275 | + thread_local! { |
| 276 | + static TL_DROP: TlDrop = TlDrop; |
| 277 | + } |
228 | 278 |
|
229 |
| - impl Drop for RecvOnDrop { |
230 |
| - fn drop(&mut self) { |
231 |
| - let rx = self.0.take().unwrap(); |
232 |
| - while !THREAD2_LAUNCHED.load(Ordering::SeqCst) { |
233 |
| - thread::yield_now(); |
| 279 | + TL_DROP.with(|_| {}) |
| 280 | + }) |
| 281 | + .unwrap(); |
| 282 | + |
| 283 | + let jh2 = thread::Builder::new() |
| 284 | + .name("thread2".into()) |
| 285 | + .spawn(move || { |
| 286 | + assert_eq!(SYNC_STATE.swap(THREAD2_LAUNCHED, Ordering::SeqCst), FRESH); |
| 287 | + jh.join().unwrap(); |
| 288 | + match SYNC_STATE.swap(THREAD2_JOINED, Ordering::SeqCst) { |
| 289 | + MAIN_THREAD_RENDEZVOUS => return, |
| 290 | + THREAD2_LAUNCHED | THREAD1_WAITING => { |
| 291 | + panic!("Thread 2 running after thread 1 join before main thread rendezvous") |
234 | 292 | }
|
235 |
| - rx.recv().unwrap(); |
| 293 | + v => unreachable!("sync state: {:?}", v), |
236 | 294 | }
|
| 295 | + }) |
| 296 | + .unwrap(); |
| 297 | + |
| 298 | + loop { |
| 299 | + match SYNC_STATE.compare_exchange_weak( |
| 300 | + THREAD1_WAITING, |
| 301 | + MAIN_THREAD_RENDEZVOUS, |
| 302 | + Ordering::SeqCst, |
| 303 | + Ordering::SeqCst, |
| 304 | + ) { |
| 305 | + Ok(_) => break, |
| 306 | + Err(FRESH) => thread::yield_now(), |
| 307 | + Err(THREAD2_LAUNCHED) => thread::yield_now(), |
| 308 | + Err(THREAD2_JOINED) => { |
| 309 | + panic!("Main thread rendezvous after thread 2 joined thread 1") |
| 310 | + } |
| 311 | + v => unreachable!("sync state: {:?}", v), |
237 | 312 | }
|
238 |
| - |
239 |
| - thread_local! { |
240 |
| - static TL_RX: RecvOnDrop = RecvOnDrop(Cell::new(None)); |
241 |
| - } |
242 |
| - |
243 |
| - TL_RX.with(|v| v.0.set(Some(rx))) |
244 |
| - }); |
245 |
| - |
246 |
| - let tx_clone = tx.clone(); |
247 |
| - let jh2 = thread::spawn(move || { |
248 |
| - THREAD2_LAUNCHED.store(true, Ordering::SeqCst); |
249 |
| - jh.join().unwrap(); |
250 |
| - tx_clone.send(()).expect_err( |
251 |
| - "Expecting channel to be closed because thread 1 TLS destructors must've run", |
252 |
| - ); |
253 |
| - }); |
254 |
| - |
255 |
| - while !THREAD2_LAUNCHED.load(Ordering::SeqCst) { |
256 |
| - thread::yield_now(); |
257 | 313 | }
|
258 |
| - thread::yield_now(); |
259 |
| - tx.send(()).expect("Expecting channel to be live because thread 2 must block on join"); |
260 | 314 | jh2.join().unwrap();
|
261 | 315 | }
|
262 | 316 | }
|
0 commit comments