Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 110 additions & 105 deletions src/concurrency/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use crate::*;
enum SchedulingAction {
/// Execute step on the active thread.
ExecuteStep,
/// Execute a timeout callback.
ExecuteTimeoutCallback,
/// Execute a callback on the active thread.
ExecuteCallback,
/// Wait for a bit, until there is a timeout to be called.
Sleep(Duration),
}
Expand Down Expand Up @@ -120,6 +120,9 @@ pub enum BlockReason {
enum ThreadState<'tcx> {
/// The thread is enabled and can be executed.
Enabled,
/// The thread is enabled, but there is an unblock callback that needs to
/// be executed first.
Unblocked { kind: UnblockKind, callback: DynUnblockCallback<'tcx> },
/// The thread is blocked on something.
Blocked { reason: BlockReason, timeout: Option<Timeout>, callback: DynUnblockCallback<'tcx> },
/// The thread has terminated its execution. We do not delete terminated
Expand All @@ -131,6 +134,8 @@ impl<'tcx> std::fmt::Debug for ThreadState<'tcx> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Enabled => write!(f, "Enabled"),
Self::Unblocked { kind, .. } =>
f.debug_struct("Unblocked").field("kind", kind).finish(),
Self::Blocked { reason, timeout, .. } =>
f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(),
Self::Terminated => write!(f, "Terminated"),
Expand Down Expand Up @@ -654,61 +659,54 @@ impl<'tcx> ThreadManager<'tcx> {
self.yield_active_thread = true;
}

/// Get the wait time for the next timeout, or `None` if no timeout is pending.
fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option<Duration> {
self.threads
.iter()
.filter_map(|t| {
match &t.state {
ThreadState::Blocked { timeout: Some(timeout), .. } =>
Some(timeout.get_wait_time(clock)),
_ => None,
/// Unblocks all blocked threads whose timeout has occurred. Returns the
/// wait time until the earliest non-elapsed timeout.
fn process_timeouts(&mut self, clock: &MonotonicClock) -> Option<Duration> {
let mut shortest_wait = None;
for thread in &mut self.threads {
match &thread.state {
ThreadState::Blocked { timeout: Some(timeout), .. } => {
let wait_time = timeout.get_wait_time(clock);
if wait_time == Duration::ZERO {
let ThreadState::Blocked { callback, .. } =
mem::replace(&mut thread.state, ThreadState::Enabled)
else {
unreachable!("just checked that the thread is blocked")
};
// The timeout has occurred. Unblock the thread.
thread.state =
ThreadState::Unblocked { kind: UnblockKind::TimedOut, callback };
} else {
shortest_wait = if shortest_wait.is_some_and(|t| t < wait_time) {
shortest_wait
} else {
Some(wait_time)
};
}
}
})
.min()
_ => {}
}
}

shortest_wait
}
}

impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {}
trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
/// Execute a timeout callback on the callback's thread.
/// Execute the callback scheduled for the currently active thread.
#[inline]
fn run_timeout_callback(&mut self) -> InterpResult<'tcx> {
fn run_callback(&mut self) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let mut found_callback = None;
// Find a blocked thread that has timed out.
for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() {
match &thread.state {
ThreadState::Blocked { timeout: Some(timeout), .. }
if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO =>
{
let old_state = mem::replace(&mut thread.state, ThreadState::Enabled);
let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() };
found_callback = Some((id, callback));
// Run the fallback (after the loop because borrow-checking).
break;
}
_ => {}
}
}
if let Some((thread, callback)) = found_callback {
// This back-and-forth with `set_active_thread` is here because of two
// design decisions:
// 1. Make the caller and not the callback responsible for changing
// thread.
// 2. Make the scheduler the only place that can change the active
// thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.call(this, UnblockKind::TimedOut)?;
this.machine.threads.set_active_thread_id(old_thread);
}
// found_callback can remain None if the computer's clock
// was shifted after calling the scheduler and before the call
// to get_ready_callback (see issue
// https://github.com/rust-lang/miri/issues/1763). In this case,
// just do nothing, which effectively just returns to the
// scheduler.
interp_ok(())
let active_thread = this.active_thread_mut();

let ThreadState::Unblocked { kind, callback } =
mem::replace(&mut active_thread.state, ThreadState::Enabled)
else {
bug!("tried to execute a callback on a thread that didn't have a callback scheduled");
};

callback.call(this, kind)
}

#[inline]
Expand Down Expand Up @@ -768,29 +766,31 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
let thread_manager = &mut this.machine.threads;
let clock = &this.machine.monotonic_clock;
let rng = this.machine.rng.get_mut();
// This thread and the program can keep going.
if thread_manager.threads[thread_manager.active_thread].state.is_enabled()
&& !thread_manager.yield_active_thread
{
// The currently active thread is still enabled, just continue with it.
return interp_ok(SchedulingAction::ExecuteStep);
}
// The active thread yielded or got terminated. Let's see if there are any timeouts to take
// care of. We do this *before* running any other thread, to ensure that timeouts "in the
// past" fire before any other thread can take an action. This ensures that for
// `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by
// abstime has already been passed at the time of the call".
Comment on lines -781 to -782
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That has nothing to do with the timing of the callback – pthread_cond_timedwait is allowed to yield even if the time has passed already. The important thing is that the timeout checks happen before selecting the next thread, otherwise you could get a deadlock.

// <https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_cond_timedwait.html>
let potential_sleep_time = thread_manager.next_callback_wait_time(clock);
if potential_sleep_time == Some(Duration::ZERO) {
return interp_ok(SchedulingAction::ExecuteTimeoutCallback);

// If no reschedule was requested, try to continue with the currently active thread.
if !thread_manager.yield_active_thread {
match thread_manager.threads[thread_manager.active_thread].state {
// The currently active thread is still enabled, just continue with it.
ThreadState::Enabled => return interp_ok(SchedulingAction::ExecuteStep),
// The currently active thread is enabled, but a callback needs to be run first.
ThreadState::Unblocked { .. } =>
return interp_ok(SchedulingAction::ExecuteCallback),
_ => {}
}
}
// No callbacks immediately scheduled, pick a regular thread to execute.
// The active thread blocked or yielded. So we go search for another enabled thread.
// We build the list of threads by starting with the threads after the current one, followed by
// the threads before the current one and then the current thread itself (i.e., this iterator acts
// like `threads.rotate_left(self.active_thread.index() + 1)`. This ensures that if we pick the first
// eligible thread, we do regular round-robin scheduling, and all threads get a chance to take a step.

// The active thread is blocked or terminated. Let's see if there are
// any timeouts to take care of. This may unblock some threads, so we
// need to check before choosing the next thread to prevent deadlocks.
let potential_sleep_time = thread_manager.process_timeouts(clock);

// Search for another enabled thread. We build the list of threads by
// starting with the threads after the current one, followed by the
// threads before the current one and then the current thread itself
// (i.e., this iterator acts like
// `threads.rotate_left(self.active_thread.index() + 1)`.
// This ensures that if we pick the first eligible thread, we do regular
// round-robin scheduling, and all threads get a chance to take a step.
let mut threads_iter = thread_manager
.threads
.iter_enumerated()
Expand All @@ -801,15 +801,25 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
.iter_enumerated()
.take(thread_manager.active_thread.index() + 1),
)
.filter(|(_id, thread)| thread.state.is_enabled());
.filter_map(|(id, thread)| {
match thread.state {
ThreadState::Enabled => Some((id, SchedulingAction::ExecuteStep)),
ThreadState::Unblocked { .. } => Some((id, SchedulingAction::ExecuteCallback)),
_ => None,
}
});

// Pick a new thread, and switch to it.
let new_thread = if thread_manager.fixed_scheduling {
threads_iter.next()
} else {
threads_iter.choose(rng)
};

if let Some((id, _thread)) = new_thread {
// This completes the `yield`, if any was requested.
thread_manager.yield_active_thread = false;

if let Some((id, action)) = new_thread {
if thread_manager.active_thread != id {
info!(
"---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------",
Expand All @@ -818,23 +828,20 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> {
);
thread_manager.active_thread = id;
}
}
// This completes the `yield`, if any was requested.
thread_manager.yield_active_thread = false;

if thread_manager.threads[thread_manager.active_thread].state.is_enabled() {
return interp_ok(SchedulingAction::ExecuteStep);
}
// We have not found a thread to execute.
if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
unreachable!("all threads terminated without the main thread terminating?!");
} else if let Some(sleep_time) = potential_sleep_time {
// All threads are currently blocked, but we have unexecuted
// timeout_callbacks, which may unblock some of the threads. Hence,
// sleep until the first callback.
interp_ok(SchedulingAction::Sleep(sleep_time))
interp_ok(action)
} else {
throw_machine_stop!(TerminationInfo::GlobalDeadlock);
// We have not found a thread to execute.
if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) {
unreachable!("all threads terminated without the main thread terminating?!");
} else if let Some(sleep_time) = potential_sleep_time {
// All threads are currently blocked, but we have unexecuted
// timeout_callbacks, which may unblock some of the threads. Hence,
// sleep until the first callback.
interp_ok(SchedulingAction::Sleep(sleep_time))
} else {
throw_machine_stop!(TerminationInfo::GlobalDeadlock);
}
}
}
}
Expand Down Expand Up @@ -1068,22 +1075,20 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
/// Sanity-checks that the thread previously was blocked for the right reason.
fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> {
let this = self.eval_context_mut();
let old_state =
mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled);
let callback = match old_state {
ThreadState::Blocked { reason: actual_reason, callback, .. } => {
assert_eq!(
reason, actual_reason,
"unblock_thread: thread was blocked for the wrong reason"
);
callback
}
_ => panic!("unblock_thread: thread was not blocked"),
let state = &mut this.machine.threads.threads[thread].state;
let ThreadState::Blocked { reason: actual_reason, callback, .. } =
mem::replace(state, ThreadState::Enabled)
else {
panic!("unblock_thread: thread was not blocked")
};
// The callback must be executed in the previously blocked thread.
let old_thread = this.machine.threads.set_active_thread_id(thread);
callback.call(this, UnblockKind::Ready)?;
this.machine.threads.set_active_thread_id(old_thread);

assert_eq!(
reason, actual_reason,
"unblock_thread: thread was blocked for the wrong reason"
);

*state = ThreadState::Unblocked { kind: UnblockKind::Ready, callback };

interp_ok(())
}

Expand Down Expand Up @@ -1300,8 +1305,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
}
}
}
SchedulingAction::ExecuteTimeoutCallback => {
this.run_timeout_callback()?;
SchedulingAction::ExecuteCallback => {
this.run_callback()?;
}
SchedulingAction::Sleep(duration) => {
this.machine.monotonic_clock.sleep(duration);
Expand Down
5 changes: 5 additions & 0 deletions tests/pass-dep/concurrency/apple-futex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ fn wait_wake_multiple() {
0
);

// Allow the newly woken thread to execute.
thread::yield_now();

assert_eq!(
libc::os_sync_wake_by_address_any(
ptr::from_ref(futex).cast_mut().cast(),
Expand All @@ -197,6 +200,8 @@ fn wait_wake_multiple() {
0
);

thread::yield_now();

// Wake both remaining threads at the same time.
assert_eq!(
libc::os_sync_wake_by_address_all(
Expand Down
Loading