Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[beta] backports #85588

Merged
merged 11 commits into from
May 23, 2021
Prev Previous commit
Next Next commit
Use atomics in join_orders_after_tls_destructors test
std::sync::mpsc uses thread locals and depending on the order TLS dtors
are run `rx.recv()` can panic when used in a TLS dtor.
  • Loading branch information
mzohreva authored and Mark-Simulacrum committed May 22, 2021
commit 5d1fdf44a996a71015a1e086f060e9ef36f987d6
122 changes: 88 additions & 34 deletions library/std/src/thread/local/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::cell::{Cell, UnsafeCell};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::mpsc::{self, channel, Sender};
use crate::sync::atomic::{AtomicU8, Ordering};
use crate::sync::mpsc::{channel, Sender};
use crate::thread::{self, LocalKey};
use crate::thread_local;

@@ -217,46 +217,100 @@ fn dtors_in_dtors_in_dtors_const_init() {
// thread::yield_now and running the test several times.
#[test]
fn join_orders_after_tls_destructors() {
static THREAD2_LAUNCHED: AtomicBool = AtomicBool::new(false);
// We emulate a synchronous MPSC rendezvous channel using only atomics and
// thread::yield_now. We can't use std::mpsc as the implementation itself
// may rely on thread locals.
//
// The basic state machine for an SPSC rendezvous channel is:
// FRESH -> THREAD1_WAITING -> MAIN_THREAD_RENDEZVOUS
// where the first transition is done by the “receiving” thread and the 2nd
// transition is done by the “sending” thread.
//
// We add an additional state `THREAD2_LAUNCHED` between `FRESH` and
// `THREAD1_WAITING` to block until all threads are actually running.
//
// A thread that joins on the “receiving” thread completion should never
// observe the channel in the `THREAD1_WAITING` state. If this does occur,
// we switch to the “poison” state `THREAD2_JOINED` and panic all around.
// (This is equivalent to “sending” from an alternate producer thread.)
const FRESH: u8 = 0;
const THREAD2_LAUNCHED: u8 = 1;
const THREAD1_WAITING: u8 = 2;
const MAIN_THREAD_RENDEZVOUS: u8 = 3;
const THREAD2_JOINED: u8 = 4;
static SYNC_STATE: AtomicU8 = AtomicU8::new(FRESH);

for _ in 0..10 {
let (tx, rx) = mpsc::sync_channel(0);
THREAD2_LAUNCHED.store(false, Ordering::SeqCst);
SYNC_STATE.store(FRESH, Ordering::SeqCst);

let jh = thread::Builder::new()
.name("thread1".into())
.spawn(move || {
struct TlDrop;

impl Drop for TlDrop {
fn drop(&mut self) {
loop {
match SYNC_STATE.load(Ordering::SeqCst) {
FRESH => thread::yield_now(),
THREAD2_LAUNCHED => break,
v => unreachable!("sync state: {}", v),
}
}
let mut sync_state = SYNC_STATE.swap(THREAD1_WAITING, Ordering::SeqCst);
loop {
match sync_state {
THREAD2_LAUNCHED | THREAD1_WAITING => thread::yield_now(),
MAIN_THREAD_RENDEZVOUS => break,
THREAD2_JOINED => panic!(
"Thread 1 still running after thread 2 joined on thread 1"
),
v => unreachable!("sync state: {}", v),
}
sync_state = SYNC_STATE.load(Ordering::SeqCst);
}
}
}

let jh = thread::spawn(move || {
struct RecvOnDrop(Cell<Option<mpsc::Receiver<()>>>);
thread_local! {
static TL_DROP: TlDrop = TlDrop;
}

impl Drop for RecvOnDrop {
fn drop(&mut self) {
let rx = self.0.take().unwrap();
while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
thread::yield_now();
TL_DROP.with(|_| {})
})
.unwrap();

let jh2 = thread::Builder::new()
.name("thread2".into())
.spawn(move || {
assert_eq!(SYNC_STATE.swap(THREAD2_LAUNCHED, Ordering::SeqCst), FRESH);
jh.join().unwrap();
match SYNC_STATE.swap(THREAD2_JOINED, Ordering::SeqCst) {
MAIN_THREAD_RENDEZVOUS => return,
THREAD2_LAUNCHED | THREAD1_WAITING => {
panic!("Thread 2 running after thread 1 join before main thread rendezvous")
}
rx.recv().unwrap();
v => unreachable!("sync state: {:?}", v),
}
})
.unwrap();

loop {
match SYNC_STATE.compare_exchange_weak(
THREAD1_WAITING,
MAIN_THREAD_RENDEZVOUS,
Ordering::SeqCst,
Ordering::SeqCst,
) {
Ok(_) => break,
Err(FRESH) => thread::yield_now(),
Err(THREAD2_LAUNCHED) => thread::yield_now(),
Err(THREAD2_JOINED) => {
panic!("Main thread rendezvous after thread 2 joined thread 1")
}
v => unreachable!("sync state: {:?}", v),
}

thread_local! {
static TL_RX: RecvOnDrop = RecvOnDrop(Cell::new(None));
}

TL_RX.with(|v| v.0.set(Some(rx)))
});

let tx_clone = tx.clone();
let jh2 = thread::spawn(move || {
THREAD2_LAUNCHED.store(true, Ordering::SeqCst);
jh.join().unwrap();
tx_clone.send(()).expect_err(
"Expecting channel to be closed because thread 1 TLS destructors must've run",
);
});

while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
thread::yield_now();
}
thread::yield_now();
tx.send(()).expect("Expecting channel to be live because thread 2 must block on join");
jh2.join().unwrap();
}
}