diff --git a/src/concurrency/thread.rs b/src/concurrency/thread.rs index 1c404d419e..17d6ededbb 100644 --- a/src/concurrency/thread.rs +++ b/src/concurrency/thread.rs @@ -25,8 +25,8 @@ use crate::*; enum SchedulingAction { /// Execute step on the active thread. ExecuteStep, - /// Execute a timeout callback. - ExecuteTimeoutCallback, + /// Execute a callback on the active thread. + ExecuteCallback, /// Wait for a bit, until there is a timeout to be called. Sleep(Duration), } @@ -120,6 +120,9 @@ pub enum BlockReason { enum ThreadState<'tcx> { /// The thread is enabled and can be executed. Enabled, + /// The thread is enabled, but there is an unblock callback that needs to + /// be executed first. + Unblocked { kind: UnblockKind, callback: DynUnblockCallback<'tcx> }, /// The thread is blocked on something. Blocked { reason: BlockReason, timeout: Option, callback: DynUnblockCallback<'tcx> }, /// The thread has terminated its execution. We do not delete terminated @@ -131,6 +134,8 @@ impl<'tcx> std::fmt::Debug for ThreadState<'tcx> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::Enabled => write!(f, "Enabled"), + Self::Unblocked { kind, .. } => + f.debug_struct("Unblocked").field("kind", kind).finish(), Self::Blocked { reason, timeout, .. } => f.debug_struct("Blocked").field("reason", reason).field("timeout", timeout).finish(), Self::Terminated => write!(f, "Terminated"), @@ -654,61 +659,54 @@ impl<'tcx> ThreadManager<'tcx> { self.yield_active_thread = true; } - /// Get the wait time for the next timeout, or `None` if no timeout is pending. - fn next_callback_wait_time(&self, clock: &MonotonicClock) -> Option { - self.threads - .iter() - .filter_map(|t| { - match &t.state { - ThreadState::Blocked { timeout: Some(timeout), .. } => - Some(timeout.get_wait_time(clock)), - _ => None, + /// Unblocks all blocked threads whose timeout has occurred. Returns the + /// wait time until the earliest non-elapsed timeout. + fn process_timeouts(&mut self, clock: &MonotonicClock) -> Option { + let mut shortest_wait = None; + for thread in &mut self.threads { + match &thread.state { + ThreadState::Blocked { timeout: Some(timeout), .. } => { + let wait_time = timeout.get_wait_time(clock); + if wait_time == Duration::ZERO { + let ThreadState::Blocked { callback, .. } = + mem::replace(&mut thread.state, ThreadState::Enabled) + else { + unreachable!("just checked that the thread is blocked") + }; + // The timeout has occurred. Unblock the thread. + thread.state = + ThreadState::Unblocked { kind: UnblockKind::TimedOut, callback }; + } else { + shortest_wait = if shortest_wait.is_some_and(|t| t < wait_time) { + shortest_wait + } else { + Some(wait_time) + }; + } } - }) - .min() + _ => {} + } + } + + shortest_wait } } impl<'tcx> EvalContextPrivExt<'tcx> for MiriInterpCx<'tcx> {} trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { - /// Execute a timeout callback on the callback's thread. + /// Execute the callback scheduled for the currently active thread. #[inline] - fn run_timeout_callback(&mut self) -> InterpResult<'tcx> { + fn run_callback(&mut self) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let mut found_callback = None; - // Find a blocked thread that has timed out. - for (id, thread) in this.machine.threads.threads.iter_enumerated_mut() { - match &thread.state { - ThreadState::Blocked { timeout: Some(timeout), .. } - if timeout.get_wait_time(&this.machine.monotonic_clock) == Duration::ZERO => - { - let old_state = mem::replace(&mut thread.state, ThreadState::Enabled); - let ThreadState::Blocked { callback, .. } = old_state else { unreachable!() }; - found_callback = Some((id, callback)); - // Run the fallback (after the loop because borrow-checking). - break; - } - _ => {} - } - } - if let Some((thread, callback)) = found_callback { - // This back-and-forth with `set_active_thread` is here because of two - // design decisions: - // 1. Make the caller and not the callback responsible for changing - // thread. - // 2. Make the scheduler the only place that can change the active - // thread. - let old_thread = this.machine.threads.set_active_thread_id(thread); - callback.call(this, UnblockKind::TimedOut)?; - this.machine.threads.set_active_thread_id(old_thread); - } - // found_callback can remain None if the computer's clock - // was shifted after calling the scheduler and before the call - // to get_ready_callback (see issue - // https://github.com/rust-lang/miri/issues/1763). In this case, - // just do nothing, which effectively just returns to the - // scheduler. - interp_ok(()) + let active_thread = this.active_thread_mut(); + + let ThreadState::Unblocked { kind, callback } = + mem::replace(&mut active_thread.state, ThreadState::Enabled) + else { + bug!("tried to execute a callback on a thread that didn't have a callback scheduled"); + }; + + callback.call(this, kind) } #[inline] @@ -768,29 +766,31 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { let thread_manager = &mut this.machine.threads; let clock = &this.machine.monotonic_clock; let rng = this.machine.rng.get_mut(); - // This thread and the program can keep going. - if thread_manager.threads[thread_manager.active_thread].state.is_enabled() - && !thread_manager.yield_active_thread - { - // The currently active thread is still enabled, just continue with it. - return interp_ok(SchedulingAction::ExecuteStep); - } - // The active thread yielded or got terminated. Let's see if there are any timeouts to take - // care of. We do this *before* running any other thread, to ensure that timeouts "in the - // past" fire before any other thread can take an action. This ensures that for - // `pthread_cond_timedwait`, "an error is returned if [...] the absolute time specified by - // abstime has already been passed at the time of the call". - // - let potential_sleep_time = thread_manager.next_callback_wait_time(clock); - if potential_sleep_time == Some(Duration::ZERO) { - return interp_ok(SchedulingAction::ExecuteTimeoutCallback); + + // If no reschedule was requested, try to continue with the currently active thread. + if !thread_manager.yield_active_thread { + match thread_manager.threads[thread_manager.active_thread].state { + // The currently active thread is still enabled, just continue with it. + ThreadState::Enabled => return interp_ok(SchedulingAction::ExecuteStep), + // The currently active thread is enabled, but a callback needs to be run first. + ThreadState::Unblocked { .. } => + return interp_ok(SchedulingAction::ExecuteCallback), + _ => {} + } } - // No callbacks immediately scheduled, pick a regular thread to execute. - // The active thread blocked or yielded. So we go search for another enabled thread. - // We build the list of threads by starting with the threads after the current one, followed by - // the threads before the current one and then the current thread itself (i.e., this iterator acts - // like `threads.rotate_left(self.active_thread.index() + 1)`. This ensures that if we pick the first - // eligible thread, we do regular round-robin scheduling, and all threads get a chance to take a step. + + // The active thread is blocked or terminated. Let's see if there are + // any timeouts to take care of. This may unblock some threads, so we + // need to check before choosing the next thread to prevent deadlocks. + let potential_sleep_time = thread_manager.process_timeouts(clock); + + // Search for another enabled thread. We build the list of threads by + // starting with the threads after the current one, followed by the + // threads before the current one and then the current thread itself + // (i.e., this iterator acts like + // `threads.rotate_left(self.active_thread.index() + 1)`. + // This ensures that if we pick the first eligible thread, we do regular + // round-robin scheduling, and all threads get a chance to take a step. let mut threads_iter = thread_manager .threads .iter_enumerated() @@ -801,7 +801,14 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { .iter_enumerated() .take(thread_manager.active_thread.index() + 1), ) - .filter(|(_id, thread)| thread.state.is_enabled()); + .filter_map(|(id, thread)| { + match thread.state { + ThreadState::Enabled => Some((id, SchedulingAction::ExecuteStep)), + ThreadState::Unblocked { .. } => Some((id, SchedulingAction::ExecuteCallback)), + _ => None, + } + }); + // Pick a new thread, and switch to it. let new_thread = if thread_manager.fixed_scheduling { threads_iter.next() @@ -809,7 +816,10 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { threads_iter.choose(rng) }; - if let Some((id, _thread)) = new_thread { + // This completes the `yield`, if any was requested. + thread_manager.yield_active_thread = false; + + if let Some((id, action)) = new_thread { if thread_manager.active_thread != id { info!( "---------- Now executing on thread `{}` (previous: `{}`) ----------------------------------------", @@ -818,23 +828,20 @@ trait EvalContextPrivExt<'tcx>: MiriInterpCxExt<'tcx> { ); thread_manager.active_thread = id; } - } - // This completes the `yield`, if any was requested. - thread_manager.yield_active_thread = false; - if thread_manager.threads[thread_manager.active_thread].state.is_enabled() { - return interp_ok(SchedulingAction::ExecuteStep); - } - // We have not found a thread to execute. - if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) { - unreachable!("all threads terminated without the main thread terminating?!"); - } else if let Some(sleep_time) = potential_sleep_time { - // All threads are currently blocked, but we have unexecuted - // timeout_callbacks, which may unblock some of the threads. Hence, - // sleep until the first callback. - interp_ok(SchedulingAction::Sleep(sleep_time)) + interp_ok(action) } else { - throw_machine_stop!(TerminationInfo::GlobalDeadlock); + // We have not found a thread to execute. + if thread_manager.threads.iter().all(|thread| thread.state.is_terminated()) { + unreachable!("all threads terminated without the main thread terminating?!"); + } else if let Some(sleep_time) = potential_sleep_time { + // All threads are currently blocked, but we have unexecuted + // timeout_callbacks, which may unblock some of the threads. Hence, + // sleep until the first callback. + interp_ok(SchedulingAction::Sleep(sleep_time)) + } else { + throw_machine_stop!(TerminationInfo::GlobalDeadlock); + } } } } @@ -1068,22 +1075,20 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { /// Sanity-checks that the thread previously was blocked for the right reason. fn unblock_thread(&mut self, thread: ThreadId, reason: BlockReason) -> InterpResult<'tcx> { let this = self.eval_context_mut(); - let old_state = - mem::replace(&mut this.machine.threads.threads[thread].state, ThreadState::Enabled); - let callback = match old_state { - ThreadState::Blocked { reason: actual_reason, callback, .. } => { - assert_eq!( - reason, actual_reason, - "unblock_thread: thread was blocked for the wrong reason" - ); - callback - } - _ => panic!("unblock_thread: thread was not blocked"), + let state = &mut this.machine.threads.threads[thread].state; + let ThreadState::Blocked { reason: actual_reason, callback, .. } = + mem::replace(state, ThreadState::Enabled) + else { + panic!("unblock_thread: thread was not blocked") }; - // The callback must be executed in the previously blocked thread. - let old_thread = this.machine.threads.set_active_thread_id(thread); - callback.call(this, UnblockKind::Ready)?; - this.machine.threads.set_active_thread_id(old_thread); + + assert_eq!( + reason, actual_reason, + "unblock_thread: thread was blocked for the wrong reason" + ); + + *state = ThreadState::Unblocked { kind: UnblockKind::Ready, callback }; + interp_ok(()) } @@ -1300,8 +1305,8 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { } } } - SchedulingAction::ExecuteTimeoutCallback => { - this.run_timeout_callback()?; + SchedulingAction::ExecuteCallback => { + this.run_callback()?; } SchedulingAction::Sleep(duration) => { this.machine.monotonic_clock.sleep(duration); diff --git a/tests/pass-dep/concurrency/apple-futex.rs b/tests/pass-dep/concurrency/apple-futex.rs index a28f08c3bb..e1b3d28945 100644 --- a/tests/pass-dep/concurrency/apple-futex.rs +++ b/tests/pass-dep/concurrency/apple-futex.rs @@ -188,6 +188,9 @@ fn wait_wake_multiple() { 0 ); + // Allow the newly woken thread to execute. + thread::yield_now(); + assert_eq!( libc::os_sync_wake_by_address_any( ptr::from_ref(futex).cast_mut().cast(), @@ -197,6 +200,8 @@ fn wait_wake_multiple() { 0 ); + thread::yield_now(); + // Wake both remaining threads at the same time. assert_eq!( libc::os_sync_wake_by_address_all(