From 1c96d9b4555794b7a258a70f1dc90809b1850703 Mon Sep 17 00:00:00 2001 From: Mike Date: Thu, 19 Jan 2023 23:45:46 +0000 Subject: [PATCH] Pipelined Rendering (#6503) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Objective - Implement pipelined rendering - Fixes #5082 - Fixes #4718 ## User Facing Description Bevy now implements piplelined rendering! Pipelined rendering allows the app logic and rendering logic to run on different threads leading to large gains in performance. ![image](https://user-images.githubusercontent.com/2180432/202049871-3c00b801-58ab-448f-93fd-471e30aba55f.png) *tracy capture of many_foxes example* To use pipelined rendering, you just need to add the `PipelinedRenderingPlugin`. If you're using `DefaultPlugins` then it will automatically be added for you on all platforms except wasm. Bevy does not currently support multithreading on wasm which is needed for this feature to work. If you aren't using `DefaultPlugins` you can add the plugin manually. ```rust use bevy::prelude::*; use bevy::render::pipelined_rendering::PipelinedRenderingPlugin; fn main() { App::new() // whatever other plugins you need .add_plugin(RenderPlugin) // needs to be added after RenderPlugin .add_plugin(PipelinedRenderingPlugin) .run(); } ``` If for some reason pipelined rendering needs to be removed. You can also disable the plugin the normal way. ```rust use bevy::prelude::*; use bevy::render::pipelined_rendering::PipelinedRenderingPlugin; fn main() { App::new.add_plugins(DefaultPlugins.build().disable::()); } ``` ### A setup function was added to plugins A optional plugin lifecycle function was added to the `Plugin trait`. This function is called after all plugins have been built, but before the app runner is called. This allows for some final setup to be done. In the case of pipelined rendering, the function removes the sub app from the main app and sends it to the render thread. ```rust struct MyPlugin; impl Plugin for MyPlugin { fn build(&self, app: &mut App) { } // optional function fn setup(&self, app: &mut App) { // do some final setup before runner is called } } ``` ### A Stage for Frame Pacing In the `RenderExtractApp` there is a stage labelled `BeforeIoAfterRenderStart` that systems can be added to. The specific use case for this stage is for a frame pacing system that can delay the start of main app processing in render bound apps to reduce input latency i.e. "frame pacing". This is not currently built into bevy, but exists as `bevy` ```text |-------------------------------------------------------------------| | | BeforeIoAfterRenderStart | winit events | main schedule | | extract |---------------------------------------------------------| | | extract commands | rendering schedule | |-------------------------------------------------------------------| ``` ### Small API additions * `Schedule::remove_stage` * `App::insert_sub_app` * `App::remove_sub_app` * `TaskPool::scope_with_executor` ## Problems and Solutions ### Moving render app to another thread Most of the hard bits for this were done with the render redo. This PR just sends the render app back and forth through channels which seems to work ok. I originally experimented with using a scope to run the render task. It was cuter, but that approach didn't allow render to start before i/o processing. So I switched to using channels. There is much complexity in the coordination that needs to be done, but it's worth it. By moving rendering during i/o processing the frame times should be much more consistent in render bound apps. See https://github.com/bevyengine/bevy/issues/4691. ### Unsoundness with Sending World with NonSend resources Dropping !Send things on threads other than the thread they were spawned on is considered unsound. The render world doesn't have any nonsend resources. So if we tell the users to "pretty please don't spawn nonsend resource on the render world", we can avoid this problem. More seriously there is this https://github.com/bevyengine/bevy/pull/6534 pr, which patches the unsoundness by aborting the app if a nonsend resource is dropped on the wrong thread. ~~That PR should probably be merged before this one.~~ For a longer term solution we have this discussion going https://github.com/bevyengine/bevy/discussions/6552. ### NonSend Systems in render world The render world doesn't have any !Send resources, but it does have a non send system. While Window is Send, winit does have some API's that can only be accessed on the main thread. `prepare_windows` in the render schedule thus needs to be scheduled on the main thread. Currently we run nonsend systems by running them on the thread the TaskPool::scope runs on. When we move render to another thread this no longer works. To fix this, a new `scope_with_executor` method was added that takes a optional `TheadExecutor` that can only be ticked on the thread it was initialized on. The render world then holds a `MainThreadExecutor` resource which can be passed to the scope in the parallel executor that it uses to spawn it's non send systems on. ### Scopes executors between render and main should not share tasks Since the render world and the app world share the `ComputeTaskPool`. Because `scope` has executors for the ComputeTaskPool a system from the main world could run on the render thread or a render system could run on the main thread. This can cause performance problems because it can delay a stage from finishing. See https://github.com/bevyengine/bevy/pull/6503#issuecomment-1309791442 for more details. To avoid this problem, `TaskPool::scope` has been changed to not tick the ComputeTaskPool when it's used by the parallel executor. In the future when we move closer to the 1 thread to 1 logical core model we may want to overprovide threads, because the render and main app threads don't do much when executing the schedule. ## Performance My machine is Windows 11, AMD Ryzen 5600x, RX 6600 ### Examples #### This PR with pipelining vs Main > Note that these were run on an older version of main and the performance profile has probably changed due to optimizations Seeing a perf gain from 29% on many lights to 7% on many sprites.   | percent |   |   | Diff |   |   | Main |   |   | PR |   |   -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- tracy frame time | mean | median | sigma | mean | median | sigma | mean | median | sigma | mean | median | sigma many foxes | 27.01% | 27.34% | -47.09% | 1.58 | 1.55 | -1.78 | 5.85 | 5.67 | 3.78 | 4.27 | 4.12 | 5.56 many lights | 29.35% | 29.94% | -10.84% | 3.02 | 3.03 | -0.57 | 10.29 | 10.12 | 5.26 | 7.27 | 7.09 | 5.83 many animated sprites | 13.97% | 15.69% | 14.20% | 3.79 | 4.17 | 1.41 | 27.12 | 26.57 | 9.93 | 23.33 | 22.4 | 8.52 3d scene | 25.79% | 26.78% | 7.46% | 0.49 | 0.49 | 0.15 | 1.9 | 1.83 | 2.01 | 1.41 | 1.34 | 1.86 many cubes | 11.97% | 11.28% | 14.51% | 1.93 | 1.78 | 1.31 | 16.13 | 15.78 | 9.03 | 14.2 | 14 | 7.72 many sprites | 7.14% | 9.42% | -85.42% | 1.72 | 2.23 | -6.15 | 24.09 | 23.68 | 7.2 | 22.37 | 21.45 | 13.35 #### This PR with pipelining disabled vs Main Mostly regressions here. I don't think this should be a problem as users that are disabling pipelined rendering are probably running single threaded and not using the parallel executor. The regression is probably mostly due to the switch to use `async_executor::run` instead of `try_tick` and also having one less thread to run systems on. I'll do a writeup on why switching to `run` causes regressions, so we can try to eventually fix it. Using try_tick causes issues when pipeline rendering is enable as seen [here](https://github.com/bevyengine/bevy/pull/6503#issuecomment-1380803518)   | percent |   |   | Diff |   |   | Main |   |   | PR no pipelining |   |   -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- | -- tracy frame time | mean | median | sigma | mean | median | sigma | mean | median | sigma | mean | median | sigma many foxes | -3.72% | -4.42% | -1.07% | -0.21 | -0.24 | -0.04 | 5.64 | 5.43 | 3.74 | 5.85 | 5.67 | 3.78 many lights | 0.29% | -0.30% | 4.75% | 0.03 | -0.03 | 0.25 | 10.29 | 10.12 | 5.26 | 10.26 | 10.15 | 5.01 many animated sprites | 0.22% | 1.81% | -2.72% | 0.06 | 0.48 | -0.27 | 27.12 | 26.57 | 9.93 | 27.06 | 26.09 | 10.2 3d scene | -15.79% | -14.75% | -31.34% | -0.3 | -0.27 | -0.63 | 1.9 | 1.83 | 2.01 | 2.2 | 2.1 | 2.64 many cubes | -2.85% | -3.30% | 0.00% | -0.46 | -0.52 | 0 | 16.13 | 15.78 | 9.03 | 16.59 | 16.3 | 9.03 many sprites | 2.49% | 2.41% | 0.69% | 0.6 | 0.57 | 0.05 | 24.09 | 23.68 | 7.2 | 23.49 | 23.11 | 7.15 ### Benchmarks Mostly the same except empty_systems has got a touch slower. The maybe_pipelining+1 column has the compute task pool with an extra thread over default added. This is because pipelining loses one thread over main to execute systems on, since the main thread no longer runs normal systems.
Click Me ```text group main maybe-pipelining+1 ----- ------------------------- ------------------ busy_systems/01x_entities_03_systems 1.07 30.7±1.32µs ? ?/sec 1.00 28.6±1.35µs ? ?/sec busy_systems/01x_entities_06_systems 1.10 52.1±1.10µs ? ?/sec 1.00 47.2±1.08µs ? ?/sec busy_systems/01x_entities_09_systems 1.00 74.6±1.36µs ? ?/sec 1.00 75.0±1.93µs ? ?/sec busy_systems/01x_entities_12_systems 1.03 100.6±6.68µs ? ?/sec 1.00 98.0±1.46µs ? ?/sec busy_systems/01x_entities_15_systems 1.11 128.5±3.53µs ? ?/sec 1.00 115.5±1.02µs ? ?/sec busy_systems/02x_entities_03_systems 1.16 50.4±2.56µs ? ?/sec 1.00 43.5±3.00µs ? ?/sec busy_systems/02x_entities_06_systems 1.00 87.1±1.27µs ? ?/sec 1.05 91.5±7.15µs ? ?/sec busy_systems/02x_entities_09_systems 1.04 139.9±6.37µs ? ?/sec 1.00 134.0±1.06µs ? ?/sec busy_systems/02x_entities_12_systems 1.05 179.2±3.47µs ? ?/sec 1.00 170.1±3.17µs ? ?/sec busy_systems/02x_entities_15_systems 1.01 219.6±3.75µs ? ?/sec 1.00 218.1±2.55µs ? ?/sec busy_systems/03x_entities_03_systems 1.10 70.6±2.33µs ? ?/sec 1.00 64.3±0.69µs ? ?/sec busy_systems/03x_entities_06_systems 1.02 130.2±3.11µs ? ?/sec 1.00 128.0±1.34µs ? ?/sec busy_systems/03x_entities_09_systems 1.00 195.0±10.11µs ? ?/sec 1.00 194.8±1.41µs ? ?/sec busy_systems/03x_entities_12_systems 1.01 261.7±4.05µs ? ?/sec 1.00 259.8±4.11µs ? ?/sec busy_systems/03x_entities_15_systems 1.00 318.0±3.04µs ? ?/sec 1.06 338.3±20.25µs ? ?/sec busy_systems/04x_entities_03_systems 1.00 82.9±0.63µs ? ?/sec 1.02 84.3±0.63µs ? ?/sec busy_systems/04x_entities_06_systems 1.01 181.7±3.65µs ? ?/sec 1.00 179.8±1.76µs ? ?/sec busy_systems/04x_entities_09_systems 1.04 265.0±4.68µs ? ?/sec 1.00 255.3±1.98µs ? ?/sec busy_systems/04x_entities_12_systems 1.00 335.9±3.00µs ? ?/sec 1.05 352.6±15.84µs ? ?/sec busy_systems/04x_entities_15_systems 1.00 418.6±10.26µs ? ?/sec 1.08 450.2±39.58µs ? ?/sec busy_systems/05x_entities_03_systems 1.07 114.3±0.95µs ? ?/sec 1.00 106.9±1.52µs ? ?/sec busy_systems/05x_entities_06_systems 1.08 229.8±2.90µs ? ?/sec 1.00 212.3±4.18µs ? ?/sec busy_systems/05x_entities_09_systems 1.03 329.3±1.99µs ? ?/sec 1.00 319.2±2.43µs ? ?/sec busy_systems/05x_entities_12_systems 1.06 454.7±6.77µs ? ?/sec 1.00 430.1±3.58µs ? ?/sec busy_systems/05x_entities_15_systems 1.03 554.6±6.15µs ? ?/sec 1.00 538.4±23.87µs ? ?/sec contrived/01x_entities_03_systems 1.00 14.0±0.15µs ? ?/sec 1.08 15.1±0.21µs ? ?/sec contrived/01x_entities_06_systems 1.04 28.5±0.37µs ? ?/sec 1.00 27.4±0.44µs ? ?/sec contrived/01x_entities_09_systems 1.00 41.5±4.38µs ? ?/sec 1.02 42.2±2.24µs ? ?/sec contrived/01x_entities_12_systems 1.06 55.9±1.49µs ? ?/sec 1.00 52.6±1.36µs ? ?/sec contrived/01x_entities_15_systems 1.02 68.0±2.00µs ? ?/sec 1.00 66.5±0.78µs ? ?/sec contrived/02x_entities_03_systems 1.03 25.2±0.38µs ? ?/sec 1.00 24.6±0.52µs ? ?/sec contrived/02x_entities_06_systems 1.00 46.3±0.49µs ? ?/sec 1.04 48.1±4.13µs ? ?/sec contrived/02x_entities_09_systems 1.02 70.4±0.99µs ? ?/sec 1.00 68.8±1.04µs ? ?/sec contrived/02x_entities_12_systems 1.06 96.8±1.49µs ? ?/sec 1.00 91.5±0.93µs ? ?/sec contrived/02x_entities_15_systems 1.02 116.2±0.95µs ? ?/sec 1.00 114.2±1.42µs ? ?/sec contrived/03x_entities_03_systems 1.00 33.2±0.38µs ? ?/sec 1.01 33.6±0.45µs ? ?/sec contrived/03x_entities_06_systems 1.00 62.4±0.73µs ? ?/sec 1.01 63.3±1.05µs ? ?/sec contrived/03x_entities_09_systems 1.02 96.4±0.85µs ? ?/sec 1.00 94.8±3.02µs ? ?/sec contrived/03x_entities_12_systems 1.01 126.3±4.67µs ? ?/sec 1.00 125.6±2.27µs ? ?/sec contrived/03x_entities_15_systems 1.03 160.2±9.37µs ? ?/sec 1.00 156.0±1.53µs ? ?/sec contrived/04x_entities_03_systems 1.02 41.4±3.39µs ? ?/sec 1.00 40.5±0.52µs ? ?/sec contrived/04x_entities_06_systems 1.00 78.9±1.61µs ? ?/sec 1.02 80.3±1.06µs ? ?/sec contrived/04x_entities_09_systems 1.02 121.8±3.97µs ? ?/sec 1.00 119.2±1.46µs ? ?/sec contrived/04x_entities_12_systems 1.00 157.8±1.48µs ? ?/sec 1.01 160.1±1.72µs ? ?/sec contrived/04x_entities_15_systems 1.00 197.9±1.47µs ? ?/sec 1.08 214.2±34.61µs ? ?/sec contrived/05x_entities_03_systems 1.00 49.1±0.33µs ? ?/sec 1.01 49.7±0.75µs ? ?/sec contrived/05x_entities_06_systems 1.00 95.0±0.93µs ? ?/sec 1.00 94.6±0.94µs ? ?/sec contrived/05x_entities_09_systems 1.01 143.2±1.68µs ? ?/sec 1.00 142.2±2.00µs ? ?/sec contrived/05x_entities_12_systems 1.00 191.8±2.03µs ? ?/sec 1.01 192.7±7.88µs ? ?/sec contrived/05x_entities_15_systems 1.02 239.7±3.71µs ? ?/sec 1.00 235.8±4.11µs ? ?/sec empty_systems/000_systems 1.01 47.8±0.67ns ? ?/sec 1.00 47.5±2.02ns ? ?/sec empty_systems/001_systems 1.00 1743.2±126.14ns ? ?/sec 1.01 1761.1±70.10ns ? ?/sec empty_systems/002_systems 1.01 2.2±0.04µs ? ?/sec 1.00 2.2±0.02µs ? ?/sec empty_systems/003_systems 1.02 2.7±0.09µs ? ?/sec 1.00 2.7±0.16µs ? ?/sec empty_systems/004_systems 1.00 3.1±0.11µs ? ?/sec 1.00 3.1±0.24µs ? ?/sec empty_systems/005_systems 1.00 3.5±0.05µs ? ?/sec 1.11 3.9±0.70µs ? ?/sec empty_systems/010_systems 1.00 5.5±0.12µs ? ?/sec 1.03 5.7±0.17µs ? ?/sec empty_systems/015_systems 1.00 7.9±0.19µs ? ?/sec 1.06 8.4±0.16µs ? ?/sec empty_systems/020_systems 1.00 10.4±1.25µs ? ?/sec 1.02 10.6±0.18µs ? ?/sec empty_systems/025_systems 1.00 12.4±0.39µs ? ?/sec 1.14 14.1±1.07µs ? ?/sec empty_systems/030_systems 1.00 15.1±0.39µs ? ?/sec 1.05 15.8±0.62µs ? ?/sec empty_systems/035_systems 1.00 16.9±0.47µs ? ?/sec 1.07 18.0±0.37µs ? ?/sec empty_systems/040_systems 1.00 19.3±0.41µs ? ?/sec 1.05 20.3±0.39µs ? ?/sec empty_systems/045_systems 1.00 22.4±1.67µs ? ?/sec 1.02 22.9±0.51µs ? ?/sec empty_systems/050_systems 1.00 24.4±1.67µs ? ?/sec 1.01 24.7±0.40µs ? ?/sec empty_systems/055_systems 1.05 28.6±5.27µs ? ?/sec 1.00 27.2±0.70µs ? ?/sec empty_systems/060_systems 1.02 29.9±1.64µs ? ?/sec 1.00 29.3±0.66µs ? ?/sec empty_systems/065_systems 1.02 32.7±3.15µs ? ?/sec 1.00 32.1±0.98µs ? ?/sec empty_systems/070_systems 1.00 33.0±1.42µs ? ?/sec 1.03 34.1±1.44µs ? ?/sec empty_systems/075_systems 1.00 34.8±0.89µs ? ?/sec 1.04 36.2±0.70µs ? ?/sec empty_systems/080_systems 1.00 37.0±1.82µs ? ?/sec 1.05 38.7±1.37µs ? ?/sec empty_systems/085_systems 1.00 38.7±0.76µs ? ?/sec 1.05 40.8±0.83µs ? ?/sec empty_systems/090_systems 1.00 41.5±1.09µs ? ?/sec 1.04 43.2±0.82µs ? ?/sec empty_systems/095_systems 1.00 43.6±1.10µs ? ?/sec 1.04 45.2±0.99µs ? ?/sec empty_systems/100_systems 1.00 46.7±2.27µs ? ?/sec 1.03 48.1±1.25µs ? ?/sec ```
## Migration Guide ### App `runner` and SubApp `extract` functions are now required to be Send This was changed to enable pipelined rendering. If this breaks your use case please report it as these new bounds might be able to be relaxed. ## ToDo * [x] redo benchmarking * [x] reinvestigate the perf of the try_tick -> run change for task pool scope --- crates/bevy_app/src/app.rs | 85 ++++++-- .../src/schedule/executor_parallel.rs | 84 +++++--- crates/bevy_internal/src/default_plugins.rs | 6 + crates/bevy_render/Cargo.toml | 2 + crates/bevy_render/src/lib.rs | 24 ++- crates/bevy_render/src/pipelined_rendering.rs | 155 ++++++++++++++ crates/bevy_tasks/src/lib.rs | 2 +- .../src/single_threaded_task_pool.rs | 32 +++ crates/bevy_tasks/src/task_pool.rs | 197 +++++++++++++----- 9 files changed, 488 insertions(+), 99 deletions(-) create mode 100644 crates/bevy_render/src/pipelined_rendering.rs diff --git a/crates/bevy_app/src/app.rs b/crates/bevy_app/src/app.rs index a8db6bfc042f16..66f38cbd44ee22 100644 --- a/crates/bevy_app/src/app.rs +++ b/crates/bevy_app/src/app.rs @@ -67,7 +67,7 @@ pub struct App { /// the application's event loop and advancing the [`Schedule`]. /// Typically, it is not configured manually, but set by one of Bevy's built-in plugins. /// See `bevy::winit::WinitPlugin` and [`ScheduleRunnerPlugin`](crate::schedule_runner::ScheduleRunnerPlugin). - pub runner: Box, + pub runner: Box, // Send bound is required to make App Send /// A container of [`Stage`]s set to be run in a linear order. pub schedule: Schedule, sub_apps: HashMap, @@ -87,10 +87,55 @@ impl Debug for App { } } -/// Each `SubApp` has its own [`Schedule`] and [`World`], enabling a separation of concerns. -struct SubApp { - app: App, - extract: Box, +/// A [`SubApp`] contains its own [`Schedule`] and [`World`] separate from the main [`App`]. +/// This is useful for situations where data and data processing should be kept completely separate +/// from the main application. The primary use of this feature in bevy is to enable pipelined rendering. +/// +/// # Example +/// +/// ```rust +/// # use bevy_app::{App, AppLabel}; +/// # use bevy_ecs::prelude::*; +/// +/// #[derive(Resource, Default)] +/// struct Val(pub i32); +/// +/// #[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, AppLabel)] +/// struct ExampleApp; +/// +/// #[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)] +/// struct ExampleStage; +/// +/// let mut app = App::empty(); +/// // initialize the main app with a value of 0; +/// app.insert_resource(Val(10)); +/// +/// // create a app with a resource and a single stage +/// let mut sub_app = App::empty(); +/// sub_app.insert_resource(Val(100)); +/// let mut example_stage = SystemStage::single_threaded(); +/// example_stage.add_system(|counter: Res| { +/// // since we assigned the value from the main world in extract +/// // we see that value instead of 100 +/// assert_eq!(counter.0, 10); +/// }); +/// sub_app.add_stage(ExampleStage, example_stage); +/// +/// // add the sub_app to the app +/// app.add_sub_app(ExampleApp, sub_app, |main_world, sub_app| { +/// sub_app.world.resource_mut::().0 = main_world.resource::().0; +/// }); +/// +/// // This will run the schedules once, since we're using the default runner +/// app.run(); +/// ``` +pub struct SubApp { + /// The [`SubApp`]'s instance of [`App`] + pub app: App, + + /// A function that allows access to both the [`SubApp`] [`World`] and the main [`App`]. This is + /// useful for moving data between the sub app and the main app. + pub extract: Box, } impl SubApp { @@ -161,11 +206,14 @@ impl App { /// /// See [`add_sub_app`](Self::add_sub_app) and [`run_once`](Schedule::run_once) for more details. pub fn update(&mut self) { - #[cfg(feature = "trace")] - let _bevy_frame_update_span = info_span!("frame").entered(); - self.schedule.run(&mut self.world); - - for sub_app in self.sub_apps.values_mut() { + { + #[cfg(feature = "trace")] + let _bevy_frame_update_span = info_span!("main app").entered(); + self.schedule.run(&mut self.world); + } + for (_label, sub_app) in self.sub_apps.iter_mut() { + #[cfg(feature = "trace")] + let _sub_app_span = info_span!("sub app", name = ?_label).entered(); sub_app.extract(&mut self.world); sub_app.run(); } @@ -850,7 +898,7 @@ impl App { /// App::new() /// .set_runner(my_runner); /// ``` - pub fn set_runner(&mut self, run_fn: impl Fn(App) + 'static) -> &mut Self { + pub fn set_runner(&mut self, run_fn: impl Fn(App) + 'static + Send) -> &mut Self { self.runner = Box::new(run_fn); self } @@ -1035,14 +1083,15 @@ impl App { /// Adds an [`App`] as a child of the current one. /// - /// The provided function `sub_app_runner` is called by the [`update`](Self::update) method. The [`World`] + /// The provided function `extract` is normally called by the [`update`](Self::update) method. + /// After extract is called, the [`Schedule`] of the sub app is run. The [`World`] /// parameter represents the main app world, while the [`App`] parameter is just a mutable /// reference to the `SubApp` itself. pub fn add_sub_app( &mut self, label: impl AppLabel, app: App, - extract: impl Fn(&mut World, &mut App) + 'static, + extract: impl Fn(&mut World, &mut App) + 'static + Send, ) -> &mut Self { self.sub_apps.insert( label.as_label(), @@ -1088,6 +1137,16 @@ impl App { } } + /// Inserts an existing sub app into the app + pub fn insert_sub_app(&mut self, label: impl AppLabel, sub_app: SubApp) { + self.sub_apps.insert(label.as_label(), sub_app); + } + + /// Removes a sub app from the app. Returns [`None`] if the label doesn't exist. + pub fn remove_sub_app(&mut self, label: impl AppLabel) -> Option { + self.sub_apps.remove(&label.as_label()) + } + /// Retrieves a `SubApp` inside this [`App`] with the given label, if it exists. Otherwise returns /// an [`Err`] containing the given label. pub fn get_sub_app(&self, label: impl AppLabel) -> Result<&App, impl AppLabel> { diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 68dd1f1ea798dc..0db9627633ba1e 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -1,11 +1,15 @@ +use std::sync::Arc; + +use crate as bevy_ecs; use crate::{ archetype::ArchetypeComponentId, query::Access, schedule::{ParallelSystemExecutor, SystemContainer}, + system::Resource, world::World, }; use async_channel::{Receiver, Sender}; -use bevy_tasks::{ComputeTaskPool, Scope, TaskPool}; +use bevy_tasks::{ComputeTaskPool, Scope, TaskPool, ThreadExecutor}; #[cfg(feature = "trace")] use bevy_utils::tracing::Instrument; use event_listener::Event; @@ -14,6 +18,16 @@ use fixedbitset::FixedBitSet; #[cfg(test)] use scheduling_event::*; +/// New-typed [`ThreadExecutor`] [`Resource`] that is used to run systems on the main thread +#[derive(Resource, Default, Clone)] +pub struct MainThreadExecutor(pub Arc>); + +impl MainThreadExecutor { + pub fn new() -> Self { + MainThreadExecutor(Arc::new(ThreadExecutor::new())) + } +} + struct SystemSchedulingMetadata { /// Used to signal the system's task to start the system. start: Event, @@ -124,40 +138,46 @@ impl ParallelSystemExecutor for ParallelExecutor { } } - ComputeTaskPool::init(TaskPool::default).scope(|scope| { - self.prepare_systems(scope, systems, world); - if self.should_run.count_ones(..) == 0 { - return; - } - let parallel_executor = async { - // All systems have been ran if there are no queued or running systems. - while 0 != self.queued.count_ones(..) + self.running.count_ones(..) { - self.process_queued_systems(); - // Avoid deadlocking if no systems were actually started. - if self.running.count_ones(..) != 0 { - // Wait until at least one system has finished. - let index = self - .finish_receiver - .recv() - .await - .unwrap_or_else(|error| unreachable!("{}", error)); - self.process_finished_system(index); - // Gather other systems than may have finished. - while let Ok(index) = self.finish_receiver.try_recv() { + let thread_executor = world.get_resource::().map(|e| &*e.0); + + ComputeTaskPool::init(TaskPool::default).scope_with_executor( + false, + thread_executor, + |scope| { + self.prepare_systems(scope, systems, world); + if self.should_run.count_ones(..) == 0 { + return; + } + let parallel_executor = async { + // All systems have been ran if there are no queued or running systems. + while 0 != self.queued.count_ones(..) + self.running.count_ones(..) { + self.process_queued_systems(); + // Avoid deadlocking if no systems were actually started. + if self.running.count_ones(..) != 0 { + // Wait until at least one system has finished. + let index = self + .finish_receiver + .recv() + .await + .unwrap_or_else(|error| unreachable!("{}", error)); self.process_finished_system(index); + // Gather other systems than may have finished. + while let Ok(index) = self.finish_receiver.try_recv() { + self.process_finished_system(index); + } + // At least one system has finished, so active access is outdated. + self.rebuild_active_access(); } - // At least one system has finished, so active access is outdated. - self.rebuild_active_access(); + self.update_counters_and_queue_systems(); } - self.update_counters_and_queue_systems(); - } - }; - #[cfg(feature = "trace")] - let span = bevy_utils::tracing::info_span!("parallel executor"); - #[cfg(feature = "trace")] - let parallel_executor = parallel_executor.instrument(span); - scope.spawn(parallel_executor); - }); + }; + #[cfg(feature = "trace")] + let span = bevy_utils::tracing::info_span!("parallel executor"); + #[cfg(feature = "trace")] + let parallel_executor = parallel_executor.instrument(span); + scope.spawn(parallel_executor); + }, + ); } } diff --git a/crates/bevy_internal/src/default_plugins.rs b/crates/bevy_internal/src/default_plugins.rs index 1c8ee63d53aee4..7bd49c761dc059 100644 --- a/crates/bevy_internal/src/default_plugins.rs +++ b/crates/bevy_internal/src/default_plugins.rs @@ -68,6 +68,12 @@ impl PluginGroup for DefaultPlugins { // NOTE: Load this after renderer initialization so that it knows about the supported // compressed texture formats .add(bevy_render::texture::ImagePlugin::default()); + + #[cfg(not(target_arch = "wasm32"))] + { + group = group + .add(bevy_render::pipelined_rendering::PipelinedRenderingPlugin::default()); + } } #[cfg(feature = "bevy_core_pipeline")] diff --git a/crates/bevy_render/Cargo.toml b/crates/bevy_render/Cargo.toml index 8fca30904838eb..b66739e2b8cabc 100644 --- a/crates/bevy_render/Cargo.toml +++ b/crates/bevy_render/Cargo.toml @@ -44,6 +44,7 @@ bevy_time = { path = "../bevy_time", version = "0.9.0" } bevy_transform = { path = "../bevy_transform", version = "0.9.0" } bevy_window = { path = "../bevy_window", version = "0.9.0" } bevy_utils = { path = "../bevy_utils", version = "0.9.0" } +bevy_tasks = { path = "../bevy_tasks", version = "0.9.0" } # rendering image = { version = "0.24", default-features = false } @@ -75,3 +76,4 @@ basis-universal = { version = "0.2.0", optional = true } encase = { version = "0.4", features = ["glam"] } # For wgpu profiling using tracing. Use `RUST_LOG=info` to also capture the wgpu spans. profiling = { version = "1", features = ["profile-with-tracing"], optional = true } +async-channel = "1.4" diff --git a/crates/bevy_render/src/lib.rs b/crates/bevy_render/src/lib.rs index 2cb1868939fd2f..0eca1967fb9161 100644 --- a/crates/bevy_render/src/lib.rs +++ b/crates/bevy_render/src/lib.rs @@ -10,6 +10,7 @@ mod extract_param; pub mod extract_resource; pub mod globals; pub mod mesh; +pub mod pipelined_rendering; pub mod primitives; pub mod render_asset; pub mod render_graph; @@ -72,6 +73,9 @@ pub enum RenderStage { /// running the next frame while rendering the current frame. Extract, + /// A stage for applying the commands from the [`Extract`] stage + ExtractCommands, + /// Prepare render resources from the extracted data for the GPU. Prepare, @@ -191,8 +195,14 @@ impl Plugin for RenderPlugin { // after access to the main world is removed // See also https://github.com/bevyengine/bevy/issues/5082 extract_stage.set_apply_buffers(false); + + // This stage applies the commands from the extract stage while the render schedule + // is running in parallel with the main app. + let mut extract_commands_stage = SystemStage::parallel(); + extract_commands_stage.add_system(apply_extract_commands.at_start()); render_app .add_stage(RenderStage::Extract, extract_stage) + .add_stage(RenderStage::ExtractCommands, extract_commands_stage) .add_stage(RenderStage::Prepare, SystemStage::parallel()) .add_stage(RenderStage::Queue, SystemStage::parallel()) .add_stage(RenderStage::PhaseSort, SystemStage::parallel()) @@ -223,7 +233,7 @@ impl Plugin for RenderPlugin { app.add_sub_app(RenderApp, render_app, move |app_world, render_app| { #[cfg(feature = "trace")] - let _render_span = bevy_utils::tracing::info_span!("renderer subapp").entered(); + let _render_span = bevy_utils::tracing::info_span!("extract main app to render subapp").entered(); { #[cfg(feature = "trace")] let _stage_span = @@ -309,10 +319,12 @@ fn extract(app_world: &mut World, render_app: &mut App) { let inserted_world = render_world.remove_resource::().unwrap(); let scratch_world = std::mem::replace(app_world, inserted_world.0); app_world.insert_resource(ScratchMainWorld(scratch_world)); - - // Note: We apply buffers (read, Commands) after the `MainWorld` has been removed from the render app's world - // so that in future, pipelining will be able to do this too without any code relying on it. - // see - extract_stage.0.apply_buffers(render_world); }); } + +// system for render app to apply the extract commands +fn apply_extract_commands(world: &mut World) { + world.resource_scope(|world, mut extract_stage: Mut| { + extract_stage.0.apply_buffers(world); + }); +} diff --git a/crates/bevy_render/src/pipelined_rendering.rs b/crates/bevy_render/src/pipelined_rendering.rs new file mode 100644 index 00000000000000..63c0a0cfb74b63 --- /dev/null +++ b/crates/bevy_render/src/pipelined_rendering.rs @@ -0,0 +1,155 @@ +use async_channel::{Receiver, Sender}; + +use bevy_app::{App, AppLabel, Plugin, SubApp}; +use bevy_ecs::{ + schedule::{MainThreadExecutor, StageLabel, SystemStage}, + system::Resource, + world::{Mut, World}, +}; +use bevy_tasks::ComputeTaskPool; + +use crate::RenderApp; + +/// A Label for the sub app that runs the parts of pipelined rendering that need to run on the main thread. +#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, AppLabel)] +pub struct RenderExtractApp; + +/// Labels for stages in the [`RenderExtractApp`] sub app. These will run after rendering has started. +#[derive(Debug, Hash, PartialEq, Eq, Clone, StageLabel)] +pub enum RenderExtractStage { + /// When pipelined rendering is enabled this stage runs after the render schedule starts, but + /// before I/O processing and the main app schedule. This can be useful for something like + /// frame pacing. + BeforeIoAfterRenderStart, +} + +/// Channel to send the render app from the main thread to the rendering thread +#[derive(Resource)] +pub struct MainToRenderAppSender(pub Sender); + +/// Channel to send the render app from the render thread to the main thread +#[derive(Resource)] +pub struct RenderToMainAppReceiver(pub Receiver); + +/// The [`PipelinedRenderingPlugin`] can be added to your application to enable pipelined rendering. +/// This moves rendering into a different thread, so that the Nth frame's rendering can +/// be run at the same time as the N + 1 frame's simulation. +/// +/// ```text +/// |--------------------|--------------------|--------------------|--------------------| +/// | simulation thread | frame 1 simulation | frame 2 simulation | frame 3 simulation | +/// |--------------------|--------------------|--------------------|--------------------| +/// | rendering thread | | frame 1 rendering | frame 2 rendering | +/// |--------------------|--------------------|--------------------|--------------------| +/// ``` +/// +/// The plugin is dependent on the [`crate::RenderApp`] added by [`crate::RenderPlugin`] and so must +/// be added after that plugin. If it is not added after, the plugin will do nothing. +/// +/// A single frame of execution looks something like below +/// +/// ```text +/// |-------------------------------------------------------------------| +/// | | BeforeIoAfterRenderStart | winit events | main schedule | +/// | extract |---------------------------------------------------------| +/// | | extract commands | rendering schedule | +/// |-------------------------------------------------------------------| +/// ``` +/// +/// - `extract` is the stage where data is copied from the main world to the render world. +/// This is run on the main app's thread. +/// - On the render thread, we first apply the `extract commands`. This is not run during extract, so the +/// main schedule can start sooner. +/// - Then the `rendering schedule` is run. See [`crate::RenderStage`] for the available stages. +/// - In parallel to the rendering thread we first run the [`RenderExtractStage::BeforeIoAfterRenderStart`] stage. By +/// default this stage is empty. But is useful if you need something to run before I/O processing. +/// - Next all the `winit events` are processed. +/// - And finally the `main app schedule` is run. +/// - Once both the `main app schedule` and the `render schedule` are finished running, `extract` is run again. +#[derive(Default)] +pub struct PipelinedRenderingPlugin; + +impl Plugin for PipelinedRenderingPlugin { + fn build(&self, app: &mut App) { + // Don't add RenderExtractApp if RenderApp isn't initialized. + if app.get_sub_app(RenderApp).is_err() { + return; + } + app.insert_resource(MainThreadExecutor::new()); + + let mut sub_app = App::empty(); + sub_app.add_stage( + RenderExtractStage::BeforeIoAfterRenderStart, + SystemStage::parallel(), + ); + app.add_sub_app(RenderExtractApp, sub_app, update_rendering); + } + + // Sets up the render thread and inserts resources into the main app used for controlling the render thread. + fn setup(&self, app: &mut App) { + // skip setting up when headless + if app.get_sub_app(RenderExtractApp).is_err() { + return; + } + + let (app_to_render_sender, app_to_render_receiver) = async_channel::bounded::(1); + let (render_to_app_sender, render_to_app_receiver) = async_channel::bounded::(1); + + let mut render_app = app + .remove_sub_app(RenderApp) + .expect("Unable to get RenderApp. Another plugin may have removed the RenderApp before PipelinedRenderingPlugin"); + + // clone main thread executor to render world + let executor = app.world.get_resource::().unwrap(); + render_app.app.world.insert_resource(executor.clone()); + + render_to_app_sender.send_blocking(render_app).unwrap(); + + app.insert_resource(MainToRenderAppSender(app_to_render_sender)); + app.insert_resource(RenderToMainAppReceiver(render_to_app_receiver)); + + std::thread::spawn(move || { + #[cfg(feature = "trace")] + let _span = bevy_utils::tracing::info_span!("render thread").entered(); + + loop { + // run a scope here to allow main world to use this thread while it's waiting for the render app + let mut render_app = ComputeTaskPool::get() + .scope(|s| { + s.spawn(async { app_to_render_receiver.recv().await.unwrap() }); + }) + .pop() + .unwrap(); + + #[cfg(feature = "trace")] + let _sub_app_span = + bevy_utils::tracing::info_span!("sub app", name = ?RenderApp).entered(); + render_app.run(); + render_to_app_sender.send_blocking(render_app).unwrap(); + } + }); + } +} + +// This function waits for the rendering world to be received, +// runs extract, and then sends the rendering world back to the render thread. +fn update_rendering(app_world: &mut World, _sub_app: &mut App) { + app_world.resource_scope(|world, main_thread_executor: Mut| { + // we use a scope here to run any main thread tasks that the render world still needs to run + // while we wait for the render world to be received. + let mut render_app = ComputeTaskPool::get() + .scope_with_executor(true, Some(&*main_thread_executor.0), |s| { + s.spawn(async { + let receiver = world.get_resource::().unwrap(); + receiver.0.recv().await.unwrap() + }); + }) + .pop() + .unwrap(); + + render_app.extract(world); + + let sender = world.resource::(); + sender.0.send_blocking(render_app).unwrap(); + }); +} diff --git a/crates/bevy_tasks/src/lib.rs b/crates/bevy_tasks/src/lib.rs index ae9b0a3dbd8dc1..b5e340c2d65ecc 100644 --- a/crates/bevy_tasks/src/lib.rs +++ b/crates/bevy_tasks/src/lib.rs @@ -15,7 +15,7 @@ pub use task_pool::{Scope, TaskPool, TaskPoolBuilder}; #[cfg(target_arch = "wasm32")] mod single_threaded_task_pool; #[cfg(target_arch = "wasm32")] -pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder}; +pub use single_threaded_task_pool::{Scope, TaskPool, TaskPoolBuilder, ThreadExecutor}; mod usages; #[cfg(not(target_arch = "wasm32"))] diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 8fa37f4f2361be..9b77d8fd3bb2cb 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -9,6 +9,20 @@ use std::{ #[derive(Debug, Default, Clone)] pub struct TaskPoolBuilder {} +/// This is a dummy struct for wasm support to provide the same api as with the multithreaded +/// task pool. In the case of the multithreaded task pool this struct is used to spawn +/// tasks on a specific thread. But the wasm task pool just calls +/// [`wasm_bindgen_futures::spawn_local`] for spawning which just runs tasks on the main thread +/// and so the [`ThreadExecutor`] does nothing. +#[derive(Default)] +pub struct ThreadExecutor<'a>(PhantomData<&'a ()>); +impl<'a> ThreadExecutor<'a> { + /// Creates a new `ThreadExecutor` + pub fn new() -> Self { + Self(PhantomData::default()) + } +} + impl TaskPoolBuilder { /// Creates a new TaskPoolBuilder instance pub fn new() -> Self { @@ -63,6 +77,24 @@ impl TaskPool { /// /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'env, F, T>(&self, f: F) -> Vec + where + F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), + T: Send + 'static, + { + self.scope_with_executor(false, None, f) + } + + /// Allows spawning non-`static futures on the thread pool. The function takes a callback, + /// passing a scope object into it. The scope object provided to the callback can be used + /// to spawn tasks. This function will await the completion of all tasks before returning. + /// + /// This is similar to `rayon::scope` and `crossbeam::scope` + pub fn scope_with_executor<'env, F, T>( + &self, + _tick_task_pool_executor: bool, + _thread_executor: Option<&ThreadExecutor>, + f: F, + ) -> Vec where F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), T: Send + 'static, diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 7bca59e7dba67c..250bfba91f72c5 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,15 +2,19 @@ use std::{ future::Future, marker::PhantomData, mem, + panic::AssertUnwindSafe, sync::Arc, thread::{self, JoinHandle}, }; use async_task::FallibleTask; use concurrent_queue::ConcurrentQueue; -use futures_lite::{future, pin, FutureExt}; +use futures_lite::{future, FutureExt}; -use crate::{thread_executor::ThreadExecutor, Task}; +use crate::{ + thread_executor::{ThreadExecutor, ThreadExecutorTicker}, + Task, +}; struct CallOnDrop(Option>); @@ -266,67 +270,166 @@ impl TaskPool { /// }); /// }); /// } - /// pub fn scope<'env, F, T>(&self, f: F) -> Vec where F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { - Self::THREAD_EXECUTOR.with(|thread_executor| { - // SAFETY: This safety comment applies to all references transmuted to 'env. - // Any futures spawned with these references need to return before this function completes. - // This is guaranteed because we drive all the futures spawned onto the Scope - // to completion in this function. However, rust has no way of knowing this so we - // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. - let executor: &async_executor::Executor = &self.executor; - let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; - let thread_executor: &'env ThreadExecutor<'env> = - unsafe { mem::transmute(thread_executor) }; - let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); - let spawned_ref: &'env ConcurrentQueue> = - unsafe { mem::transmute(&spawned) }; - - let scope = Scope { - executor, - thread_executor, - spawned: spawned_ref, - scope: PhantomData, - env: PhantomData, - }; - - let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) }; - - f(scope_ref); - - if spawned.is_empty() { - Vec::new() - } else { + Self::THREAD_EXECUTOR + .with(|thread_executor| self.scope_with_executor_inner(true, thread_executor, f)) + } + + /// This allows passing an external executor to spawn tasks on. When you pass an external executor + /// [`Scope::spawn_on_scope`] spawns is then run on the thread that [`ThreadExecutor`] is being ticked on. + /// If [`None`] is passed the scope will use a [`ThreadExecutor`] that is ticked on the current thread. + /// + /// When `tick_task_pool_executor` is set to `true`, the multithreaded task stealing executor is ticked on the scope + /// thread. Disabling this can be useful when finishing the scope is latency sensitive. Pulling tasks from + /// global excutor can run tasks unrelated to the scope and delay when the scope returns. + /// + /// See [`Self::scope`] for more details in general about how scopes work. + pub fn scope_with_executor<'env, F, T>( + &self, + tick_task_pool_executor: bool, + thread_executor: Option<&ThreadExecutor>, + f: F, + ) -> Vec + where + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), + T: Send + 'static, + { + // If a `thread_executor` is passed use that. Otherwise get the `thread_executor` stored + // in the `THREAD_EXECUTOR` thread local. + if let Some(thread_executor) = thread_executor { + self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f) + } else { + Self::THREAD_EXECUTOR.with(|thread_executor| { + self.scope_with_executor_inner(tick_task_pool_executor, thread_executor, f) + }) + } + } + + fn scope_with_executor_inner<'env, F, T>( + &self, + tick_task_pool_executor: bool, + thread_executor: &ThreadExecutor, + f: F, + ) -> Vec + where + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), + T: Send + 'static, + { + // SAFETY: This safety comment applies to all references transmuted to 'env. + // Any futures spawned with these references need to return before this function completes. + // This is guaranteed because we drive all the futures spawned onto the Scope + // to completion in this function. However, rust has no way of knowing this so we + // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. + let executor: &async_executor::Executor = &self.executor; + let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; + let thread_executor: &'env ThreadExecutor<'env> = + unsafe { mem::transmute(thread_executor) }; + let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); + let spawned_ref: &'env ConcurrentQueue> = + unsafe { mem::transmute(&spawned) }; + + let scope = Scope { + executor, + thread_executor, + spawned: spawned_ref, + scope: PhantomData, + env: PhantomData, + }; + + let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) }; + + f(scope_ref); + + if spawned.is_empty() { + Vec::new() + } else { + future::block_on(async move { let get_results = async { let mut results = Vec::with_capacity(spawned_ref.len()); while let Ok(task) = spawned_ref.pop() { results.push(task.await.unwrap()); } - results }; - // Pin the futures on the stack. - pin!(get_results); + let tick_task_pool_executor = tick_task_pool_executor || self.threads.is_empty(); + if let Some(thread_ticker) = thread_executor.ticker() { + if tick_task_pool_executor { + Self::execute_local_global(thread_ticker, executor, get_results).await + } else { + Self::execute_local(thread_ticker, get_results).await + } + } else if tick_task_pool_executor { + Self::execute_global(executor, get_results).await + } else { + get_results.await + } + }) + } + } + + #[inline] + async fn execute_local_global<'scope, 'ticker, T>( + thread_ticker: ThreadExecutorTicker<'scope, 'ticker>, + executor: &'scope async_executor::Executor<'scope>, + get_results: impl Future>, + ) -> Vec { + // we restart the executors if a task errors. if a scoped + // task errors it will panic the scope on the call to get_results + let execute_forever = async move { + loop { + let tick_forever = async { + loop { + thread_ticker.tick().await; + } + }; + // we don't care if it errors. If a scoped task errors it will propagate + // to get_results + let _result = AssertUnwindSafe(executor.run(tick_forever)) + .catch_unwind() + .await + .is_ok(); + } + }; + execute_forever.or(get_results).await + } - let thread_ticker = thread_executor.ticker().unwrap(); - loop { - if let Some(result) = future::block_on(future::poll_once(&mut get_results)) { - break result; - }; + #[inline] + async fn execute_local<'scope, 'ticker, T>( + thread_ticker: ThreadExecutorTicker<'scope, 'ticker>, + get_results: impl Future>, + ) -> Vec { + let execute_forever = async { + loop { + let tick_forever = async { + loop { + thread_ticker.tick().await; + } + }; + let _result = AssertUnwindSafe(tick_forever).catch_unwind().await.is_ok(); + } + }; + execute_forever.or(get_results).await + } - std::panic::catch_unwind(|| { - executor.try_tick(); - thread_ticker.try_tick(); - }) - .ok(); - } + #[inline] + async fn execute_global<'scope, T>( + executor: &'scope async_executor::Executor<'scope>, + get_results: impl Future>, + ) -> Vec { + let execute_forever = async { + loop { + let _result = AssertUnwindSafe(executor.run(std::future::pending::<()>())) + .catch_unwind() + .await + .is_ok(); } - }) + }; + execute_forever.or(get_results).await } /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be