From 848482d2bb5761cd8fab3ef0dd92b8241e75e3d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=8F=E4=B8=80?= Date: Wed, 14 Jun 2023 18:42:31 +0800 Subject: [PATCH] rt(threaded): adjust `transition_from_parked` behavior after introducing `disable_lifo_slot` feature (#5753) --- .../runtime/scheduler/multi_thread/queue.rs | 6 ++--- .../runtime/scheduler/multi_thread/worker.rs | 23 ++++++++++++++----- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/tokio/src/runtime/scheduler/multi_thread/queue.rs b/tokio/src/runtime/scheduler/multi_thread/queue.rs index 6444df88b8a..dd66fa2dde1 100644 --- a/tokio/src/runtime/scheduler/multi_thread/queue.rs +++ b/tokio/src/runtime/scheduler/multi_thread/queue.rs @@ -105,9 +105,9 @@ pub(crate) fn local() -> (Steal, Local) { } impl Local { - /// Returns true if the queue has entries that can be stolen. - pub(crate) fn is_stealable(&self) -> bool { - !self.inner.is_empty() + /// Returns the number of entries in the queue + pub(crate) fn len(&self) -> usize { + self.inner.len() as usize } /// How many tasks can be pushed into the queue diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 47ff86a5c3b..7fc335f5165 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -718,9 +718,7 @@ impl Context { // Place `park` back in `core` core.park = Some(park); - // If there are tasks available to steal, but this worker is not - // looking for tasks to steal, notify another worker. - if !core.is_searching && core.run_queue.is_stealable() { + if core.should_notify_others() { self.worker.handle.notify_parked_local(); } @@ -846,12 +844,25 @@ impl Core { worker.handle.transition_worker_from_searching(); } + fn has_tasks(&self) -> bool { + self.lifo_slot.is_some() || self.run_queue.has_tasks() + } + + fn should_notify_others(&self) -> bool { + // If there are tasks available to steal, but this worker is not + // looking for tasks to steal, notify another worker. + if self.is_searching { + return false; + } + self.lifo_slot.is_some() as usize + self.run_queue.len() > 1 + } + /// Prepares the worker state for parking. /// /// Returns true if the transition happened, false if there is work to do first. fn transition_to_parked(&mut self, worker: &Worker) -> bool { // Workers should not park if they have work to do - if self.lifo_slot.is_some() || self.run_queue.has_tasks() || self.is_traced { + if self.has_tasks() || self.is_traced { return false; } @@ -877,9 +888,9 @@ impl Core { /// Returns `true` if the transition happened. fn transition_from_parked(&mut self, worker: &Worker) -> bool { - // If a task is in the lifo slot, then we must unpark regardless of + // If a task is in the lifo slot/run queue, then we must unpark regardless of // being notified - if self.lifo_slot.is_some() { + if self.has_tasks() { // When a worker wakes, it should only transition to the "searching" // state when the wake originates from another worker *or* a new task // is pushed. We do *not* want the worker to transition to "searching"