From 48ca0e96cfafee1823ea3d05892465d0d4aafb88 Mon Sep 17 00:00:00 2001 From: Mike Date: Mon, 24 Oct 2022 13:46:40 +0000 Subject: [PATCH] tick local executor (#6121) # Objective - #4466 broke local tasks running. - Fixes https://github.com/bevyengine/bevy/issues/6120 ## Solution - Add system for ticking local executors on main thread into bevy_core where the tasks pools are initialized. - Add ticking local executors into thread executors ## Changelog - tick all thread local executors in task pool. ## Notes - ~~Not 100% sure about this PR. Ticking the local executor for the main thread in scope feels a little kludgy as it requires users of bevy_tasks to be calling scope periodically for those tasks to make progress.~~ took this out in favor of a system that ticks the local executors. --- crates/bevy_core/Cargo.toml | 3 ++ crates/bevy_core/src/lib.rs | 51 ++++++++++++++++++++++++++++++ crates/bevy_tasks/src/lib.rs | 2 ++ crates/bevy_tasks/src/task_pool.rs | 33 ++++++++++++++++--- crates/bevy_tasks/src/usages.rs | 26 +++++++++++++++ 5 files changed, 111 insertions(+), 4 deletions(-) diff --git a/crates/bevy_core/Cargo.toml b/crates/bevy_core/Cargo.toml index 08bd7987c36f7..5c26103cb7a82 100644 --- a/crates/bevy_core/Cargo.toml +++ b/crates/bevy_core/Cargo.toml @@ -20,3 +20,6 @@ bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" } # other bytemuck = "1.5" + +[dev-dependencies] +crossbeam-channel = "0.5.0" diff --git a/crates/bevy_core/src/lib.rs b/crates/bevy_core/src/lib.rs index 58ece9d81f111..89ff2ca9deb01 100644 --- a/crates/bevy_core/src/lib.rs +++ b/crates/bevy_core/src/lib.rs @@ -22,6 +22,11 @@ use bevy_utils::{Duration, HashSet, Instant}; use std::borrow::Cow; use std::ops::Range; +#[cfg(not(target_arch = "wasm32"))] +use bevy_ecs::schedule::IntoSystemDescriptor; +#[cfg(not(target_arch = "wasm32"))] +use bevy_tasks::tick_global_task_pools_on_main_thread; + /// Adds core functionality to Apps. #[derive(Default)] pub struct CorePlugin; @@ -35,6 +40,13 @@ impl Plugin for CorePlugin { .unwrap_or_default() .create_default_pools(); + #[cfg(not(target_arch = "wasm32"))] + app.add_system_to_stage( + bevy_app::CoreStage::Last, + tick_global_task_pools_on_main_thread.at_end(), + ); + + app.register_type::().register_type::(); app.register_type::() .register_type::() .register_type::>() @@ -97,3 +109,42 @@ fn register_math_types(app: &mut App) { /// Wraps to 0 when it reaches the maximum u32 value #[derive(Default, Resource, Clone, Copy)] pub struct FrameCount(pub u32); + +#[cfg(test)] +mod tests { + use super::*; + use bevy_tasks::prelude::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; + + #[test] + fn runs_spawn_local_tasks() { + let mut app = App::new(); + app.add_plugin(CorePlugin); + + let (async_tx, async_rx) = crossbeam_channel::unbounded(); + AsyncComputeTaskPool::get() + .spawn_local(async move { + async_tx.send(()).unwrap(); + }) + .detach(); + + let (compute_tx, compute_rx) = crossbeam_channel::unbounded(); + ComputeTaskPool::get() + .spawn_local(async move { + compute_tx.send(()).unwrap(); + }) + .detach(); + + let (io_tx, io_rx) = crossbeam_channel::unbounded(); + IoTaskPool::get() + .spawn_local(async move { + io_tx.send(()).unwrap(); + }) + .detach(); + + app.run(); + + async_rx.try_recv().unwrap(); + compute_rx.try_recv().unwrap(); + io_rx.try_recv().unwrap(); + } +} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index 62c42a7717868..802f6c267b7cf 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -18,6 +18,8 @@ mod single_threaded_task_pool; pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder}; mod usages; +#[cfg(not(target_arch = "wasm32"))] +pub use usages::tick_global_task_pools_on_main_thread; pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool}; mod iter; diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 4165f3a941cc1..1ec2c9d4c514d 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -8,7 +8,7 @@ use std::{ }; use concurrent_queue::ConcurrentQueue; -use futures_lite::{future, pin}; +use futures_lite::{future, pin, FutureExt}; use crate::Task; @@ -117,9 +117,16 @@ impl TaskPool { thread_builder .spawn(move || { - let shutdown_future = ex.run(shutdown_rx.recv()); - // Use unwrap_err because we expect a Closed error - future::block_on(shutdown_future).unwrap_err(); + TaskPool::LOCAL_EXECUTOR.with(|local_executor| { + let tick_forever = async move { + loop { + local_executor.tick().await; + } + }; + let shutdown_future = ex.run(tick_forever.or(shutdown_rx.recv())); + // Use unwrap_err because we expect a Closed error + future::block_on(shutdown_future).unwrap_err(); + }); }) .expect("Failed to spawn thread.") }) @@ -314,6 +321,24 @@ impl TaskPool { { Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future))) } + + /// Runs a function with the local executor. Typically used to tick + /// the local executor on the main thread as it needs to share time with + /// other things. + /// + /// ```rust + /// use bevy_tasks::TaskPool; + /// + /// TaskPool::new().with_local_executor(|local_executor| { + /// local_executor.try_tick(); + /// }); + /// ``` + pub fn with_local_executor(&self, f: F) -> R + where + F: FnOnce(&async_executor::LocalExecutor) -> R, + { + Self::LOCAL_EXECUTOR.with(f) + } } impl Default for TaskPool { diff --git a/crates/bevy_tasks/src/usages.rs b/crates/bevy_tasks/src/usages.rs index 419d842f47168..1d0c83b271c2f 100644 --- a/crates/bevy_tasks/src/usages.rs +++ b/crates/bevy_tasks/src/usages.rs @@ -109,3 +109,29 @@ impl Deref for IoTaskPool { &self.0 } } + +/// Used by `bevy_core` to tick the global tasks pools on the main thread. +/// This will run a maximum of 100 local tasks per executor per call to this function. +#[cfg(not(target_arch = "wasm32"))] +pub fn tick_global_task_pools_on_main_thread() { + COMPUTE_TASK_POOL + .get() + .unwrap() + .with_local_executor(|compute_local_executor| { + ASYNC_COMPUTE_TASK_POOL + .get() + .unwrap() + .with_local_executor(|async_local_executor| { + IO_TASK_POOL + .get() + .unwrap() + .with_local_executor(|io_local_executor| { + for _ in 0..100 { + compute_local_executor.try_tick(); + async_local_executor.try_tick(); + io_local_executor.try_tick(); + } + }); + }); + }); +}