Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make Miri's scheduler proper round-robin #2197

Merged
merged 2 commits into from
Jun 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,16 +518,26 @@ impl<'mir, 'tcx: 'mir> ThreadManager<'mir, 'tcx> {
return Ok(SchedulingAction::ExecuteTimeoutCallback);
}
// No callbacks scheduled, pick a regular thread to execute.
// We need to pick a new thread for execution.
for (id, thread) in self.threads.iter_enumerated() {
// The active thread blocked or yielded. So we go search for another enabled thread.
// Curcially, we start searching at the current active thread ID, rather than at 0, since we
// want to avoid always scheduling threads 0 and 1 without ever making progress in thread 2.
//
// `skip(N)` means we start iterating at thread N, so we skip 1 more to start just *after*
// the active thread. Then after that we look at `take(N)`, i.e., the threads *before* the
// active thread.
let threads = self
.threads
.iter_enumerated()
.skip(self.active_thread.index() + 1)
.chain(self.threads.iter_enumerated().take(self.active_thread.index()));
for (id, thread) in threads {
debug_assert_ne!(self.active_thread, id);
if thread.state == ThreadState::Enabled {
if !self.yield_active_thread || id != self.active_thread {
self.active_thread = id;
if let Some(data_race) = data_race {
data_race.thread_set_active(self.active_thread);
}
break;
self.active_thread = id;
if let Some(data_race) = data_race {
data_race.thread_set_active(self.active_thread);
}
break;
}
}
self.yield_active_thread = false;
Expand Down
82 changes: 82 additions & 0 deletions tests/pass/concurrency/spin_loops.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// ignore-windows: Concurrency on Windows is not supported yet.

use std::thread;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc;
use std::cell::Cell;

/// When a thread yields, Miri's scheduler used to pick the thread with the lowest ID
/// that can run. IDs are assigned in thread creation order.
/// This means we could make 2 threads infinitely ping-pong with each other while
/// really there is a 3rd thread that we should schedule to make progress.
fn two_player_ping_pong() {
static FLAG: AtomicUsize = AtomicUsize::new(0);

let waiter1 = thread::spawn(|| {
while FLAG.load(Ordering::Acquire) == 0 {
// spin and wait
thread::yield_now();
}
});
let waiter2 = thread::spawn(|| {
while FLAG.load(Ordering::Acquire) == 0 {
// spin and wait
thread::yield_now();
}
});
let progress = thread::spawn(|| {
FLAG.store(1, Ordering::Release);
});
// The first `join` blocks the main thread and thus takes it out of the equation.
waiter1.join().unwrap();
waiter2.join().unwrap();
progress.join().unwrap();
}

/// Based on a test by @jethrogb.
fn launcher() {
static THREAD2_LAUNCHED: AtomicBool = AtomicBool::new(false);

for _ in 0..10 {
let (tx, rx) = mpsc::sync_channel(0);
THREAD2_LAUNCHED.store(false, Ordering::SeqCst);

let jh = thread::spawn(move || {
struct RecvOnDrop(Cell<Option<mpsc::Receiver<()>>>);

impl Drop for RecvOnDrop {
fn drop(&mut self) {
let rx = self.0.take().unwrap();
while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
thread::yield_now();
}
rx.recv().unwrap();
}
}

let tl_rx: RecvOnDrop = RecvOnDrop(Cell::new(None));
tl_rx.0.set(Some(rx));
});

let tx_clone = tx.clone();
let jh2 = thread::spawn(move || {
THREAD2_LAUNCHED.store(true, Ordering::SeqCst);
jh.join().unwrap();
tx_clone.send(()).expect_err(
"Expecting channel to be closed because thread 1 TLS destructors must've run",
);
});

while !THREAD2_LAUNCHED.load(Ordering::SeqCst) {
thread::yield_now();
}
thread::yield_now();
tx.send(()).expect("Expecting channel to be live because thread 2 must block on join");
jh2.join().unwrap();
}
}

fn main() {
two_player_ping_pong();
launcher();
}
2 changes: 2 additions & 0 deletions tests/pass/concurrency/spin_loops.stderr
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
warning: thread support is experimental and incomplete: weak memory effects are not emulated.