diff --git a/crates/bevy_ecs/Cargo.toml b/crates/bevy_ecs/Cargo.toml
index 0f23192b313d33..f08c82658b4733 100644
--- a/crates/bevy_ecs/Cargo.toml
+++ b/crates/bevy_ecs/Cargo.toml
@@ -21,6 +21,7 @@
 bevy_utils = { path = "../bevy_utils", version = "0.9.0-dev" }
 bevy_ecs_macros = { path = "macros", version = "0.9.0-dev" }
 async-channel = "1.4"
+event-listener = "2.5"
 thread_local = "1.1.4"
 fixedbitset = "0.4"
 fxhash = "0.2"
diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs
index 887501a1fc87ff..96b272d03f2c83 100644
--- a/crates/bevy_ecs/src/schedule/executor_parallel.rs
+++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs
@@ -8,6 +8,7 @@ use async_channel::{Receiver, Sender};
 use bevy_tasks::{ComputeTaskPool, Scope, TaskPool};
 #[cfg(feature = "trace")]
 use bevy_utils::tracing::Instrument;
+use event_listener::Event;
 use fixedbitset::FixedBitSet;
 
 #[cfg(test)]
@@ -15,9 +16,7 @@ use scheduling_event::*;
 
 struct SystemSchedulingMetadata {
     /// Used to signal the system's task to start the system.
-    start_sender: Sender<()>,
-    /// Receives the signal to start the system.
-    start_receiver: Receiver<()>,
+    start: Event,
     /// Indices of systems that depend on this one, used to decrement their
     /// dependency counters when this system finishes.
     dependants: Vec<usize>,
@@ -56,7 +55,11 @@ impl Default for ParallelExecutor {
     fn default() -> Self {
-        let (finish_sender, finish_receiver) = async_channel::unbounded();
+        // A bounded channel avoids allocations when signaling and generally
+        // stays hotter in memory. It would take 128 systems finishing before
+        // the executor task runs again for it to overflow; if it does, the
+        // sending tasks simply suspend until there is capacity again.
+        let (finish_sender, finish_receiver) = async_channel::bounded(128);
         Self {
             system_metadata: Default::default(),
             finish_sender,
             finish_receiver,
@@ -84,10 +87,8 @@ impl ParallelSystemExecutor for ParallelExecutor {
         for container in systems {
             let dependencies_total = container.dependencies().len();
             let system = container.system();
-            let (start_sender, start_receiver) = async_channel::bounded(1);
             self.system_metadata.push(SystemSchedulingMetadata {
-                start_sender,
-                start_receiver,
+                start: Event::new(),
                 dependants: vec![],
                 dependencies_total,
                 dependencies_now: 0,
@@ -125,10 +126,13 @@
         ComputeTaskPool::init(TaskPool::default).scope(|scope| {
             self.prepare_systems(scope, systems, world);
+            if self.should_run.count_ones(..) == 0 {
+                return;
+            }
             let parallel_executor = async {
                 // All systems have been run if there are no queued or running systems.
                 while 0 != self.queued.count_ones(..) + self.running.count_ones(..) {
-                    self.process_queued_systems().await;
+                    self.process_queued_systems();
                     // Avoid deadlocking if no systems were actually started.
                     if self.running.count_ones(..) != 0 {
                         // Wait until at least one system has finished.
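
Note (not part of the patch): the swap from per-system `bounded(1)` channels to `event_listener::Event` relies on one property of the crate: a listener registered before a notification never misses it. That is what makes it safe to split registration (`listen()` in `prepare_systems`, below) from signaling (`notify` in `process_queued_systems`). A minimal standalone sketch of that guarantee; the thread and sleep are illustrative only:

```rust
use std::{sync::Arc, thread, time::Duration};

use event_listener::Event;

fn main() {
    let start = Arc::new(Event::new());

    // Register interest *before* the notifier runs, mirroring how the
    // executor calls `start.listen()` while preparing systems. A listener
    // created before the notification is guaranteed not to miss the wakeup.
    let listener = start.listen();

    let notifier = Arc::clone(&start);
    let signal = thread::spawn(move || {
        thread::sleep(Duration::from_millis(10));
        // Wake exactly one listener, like sending `()` on the old
        // `start_sender`, but with no channel buffer or per-send allocation.
        notifier.notify_additional(1);
    });

    // Blocks until the notification arrives.
    listener.wait();
    signal.join().unwrap();
}
```
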
@@ -166,34 +170,96 @@ impl ParallelExecutor {
         systems: &'scope mut [ParallelSystemContainer],
         world: &'scope World,
     ) {
+        // These are used as part of a unit test.
+        #[cfg(test)]
+        let mut started_systems = 0;
         #[cfg(feature = "trace")]
         let _span = bevy_utils::tracing::info_span!("prepare_systems").entered();
         self.should_run.clear();
         for (index, (system_data, system)) in
             self.system_metadata.iter_mut().zip(systems).enumerate()
         {
+            let should_run = system.should_run();
+            let can_start = should_run
+                && system_data.dependencies_total == 0
+                && Self::can_start_now(
+                    self.non_send_running,
+                    system_data,
+                    &self.active_archetype_component_access,
+                );
+
+            // Queue the system if it has no dependencies, otherwise reset its dependency counter.
+            if system_data.dependencies_total == 0 {
+                if !can_start {
+                    self.queued.insert(index);
+                }
+            } else {
+                system_data.dependencies_now = system_data.dependencies_total;
+            }
+
+            if !should_run {
+                continue;
+            }
+
             // Spawn the system task.
-            if system.should_run() {
-                self.should_run.set(index, true);
-                let start_receiver = system_data.start_receiver.clone();
-                let finish_sender = self.finish_sender.clone();
-                let system = system.system_mut();
-                #[cfg(feature = "trace")] // NB: outside the task to get the TLS current span
-                let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name());
+            self.should_run.insert(index);
+            let finish_sender = self.finish_sender.clone();
+            let system = system.system_mut();
+            #[cfg(feature = "trace")] // NB: outside the task to get the TLS current span
+            let system_span = bevy_utils::tracing::info_span!("system", name = &*system.name());
+            #[cfg(feature = "trace")]
+            let overhead_span =
+                bevy_utils::tracing::info_span!("system overhead", name = &*system.name());
+
+            let mut run = move || {
                 #[cfg(feature = "trace")]
-                let overhead_span =
-                    bevy_utils::tracing::info_span!("system overhead", name = &*system.name());
+                let _system_guard = system_span.enter();
+                // SAFETY: the executor prevents two systems with conflicting access from running simultaneously.
+                unsafe { system.run_unsafe((), world) };
+            };
+
+            if can_start {
                 let task = async move {
-                    start_receiver
-                        .recv()
+                    run();
+                    // This will never panic:
+                    // - The channel is never closed or dropped.
+                    // - Overflowing the bounded size will just suspend until
+                    //   there is capacity.
+                    finish_sender
+                        .send(index)
                         .await
                         .unwrap_or_else(|error| unreachable!("{}", error));
-                    #[cfg(feature = "trace")]
-                    let system_guard = system_span.enter();
-                    // SAFETY: the executor prevents two systems with conflicting access from running simultaneously.
-                    unsafe { system.run_unsafe((), world) };
-                    #[cfg(feature = "trace")]
-                    drop(system_guard);
+                };
+
+                #[cfg(feature = "trace")]
+                let task = task.instrument(overhead_span);
+                if system_data.is_send {
+                    scope.spawn(task);
+                } else {
+                    scope.spawn_local(task);
+                }
+
+                #[cfg(test)]
+                {
+                    started_systems += 1;
+                }
+
+                self.running.insert(index);
+                if !system_data.is_send {
+                    self.non_send_running = true;
+                }
+                // Add this system's access information to the active access information.
+                self.active_archetype_component_access
+                    .extend(&system_data.archetype_component_access);
+            } else {
+                let start_listener = system_data.start.listen();
+                let task = async move {
+                    start_listener.await;
+                    run();
+                    // This will never panic:
+                    // - The channel is never closed or dropped.
+                    // - Overflowing the bounded size will just suspend until
+                    //   there is capacity.
                     finish_sender
                         .send(index)
                         .await
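
Note (not part of the patch): the hunk above gives tasks two shapes. A system that can start immediately never creates a listener; a deferred system calls `start.listen()` up front and `await`s it inside the task. `EventListener` implements `Future<Output = ()>`, and a notification issued before the task is first polled is not lost, as this sketch shows (it assumes `futures-lite`, which is already in `bevy_tasks`' dependency tree, for `block_on`):

```rust
use event_listener::Event;
use futures_lite::future;

fn main() {
    let start = Event::new();

    // Deferred shape: register interest now, await later.
    let start_listener = start.listen();

    // The notification fires before anything has polled the listener...
    start.notify_additional_relaxed(1);

    // ...yet the future still completes, because the listener was created
    // before the notification. This is why the executor can notify from
    // synchronous code without racing the spawned task.
    future::block_on(async {
        start_listener.await;
    });
}
```
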
@@ -208,29 +274,33 @@
                     scope.spawn_local(task);
                 }
             }
-            // Queue the system if it has no dependencies, otherwise reset its dependency counter.
-            if system_data.dependencies_total == 0 {
-                self.queued.insert(index);
-            } else {
-                system_data.dependencies_now = system_data.dependencies_total;
-            }
+        }
+        #[cfg(test)]
+        if started_systems != 0 {
+            self.emit_event(SchedulingEvent::StartedSystems(started_systems));
+        }
     }
 
     /// Determines if the system with given index has no conflicts with already running systems.
-    fn can_start_now(&self, index: usize) -> bool {
-        let system_data = &self.system_metadata[index];
+    #[inline]
+    fn can_start_now(
+        non_send_running: bool,
+        system_data: &SystemSchedulingMetadata,
+        active_archetype_component_access: &Access<ArchetypeComponentId>,
+    ) -> bool {
         // Non-send systems are considered conflicting with each other.
-        (!self.non_send_running || system_data.is_send)
+        (!non_send_running || system_data.is_send)
             && system_data
                 .archetype_component_access
-                .is_compatible(&self.active_archetype_component_access)
+                .is_compatible(active_archetype_component_access)
     }
 
     /// Starts all non-conflicting queued systems, moves them from `queued` to `running`,
     /// adds their access information to active access information;
     /// processes queued systems that shouldn't run this iteration as completed immediately.
-    async fn process_queued_systems(&mut self) {
+    fn process_queued_systems(&mut self) {
+        // These are used as part of a unit test, as in `prepare_systems`.
+        // Removing them will cause the test to fail.
         #[cfg(test)]
         let mut started_systems = 0;
         for index in self.queued.ones() {
@@ -239,17 +309,17 @@
             let system_metadata = &self.system_metadata[index];
             if !self.should_run[index] {
                 self.dependants_scratch.extend(&system_metadata.dependants);
-            } else if self.can_start_now(index) {
+            } else if Self::can_start_now(
+                self.non_send_running,
+                system_metadata,
+                &self.active_archetype_component_access,
+            ) {
                 #[cfg(test)]
                 {
                     started_systems += 1;
                 }
-                system_metadata
-                    .start_sender
-                    .send(())
-                    .await
-                    .unwrap_or_else(|error| unreachable!("{}", error));
-                self.running.set(index, true);
+                system_metadata.start.notify_additional_relaxed(1);
+                self.running.insert(index);
                 if !system_metadata.is_send {
                     self.non_send_running = true;
                 }
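
Note (not part of the patch): on the choice of `notify_additional_relaxed(1)` above. Plain `notify(1)` only tops the count of pending notifications up to one, so back-to-back calls could coalesce into a single wakeup; `notify_additional(1)` always wakes one more listener. The `_relaxed` variant also skips the `SeqCst` fence that the non-relaxed methods emit, which is acceptable when the notification is a pure wakeup rather than a means of publishing data to the woken task. A standalone sketch of the difference:

```rust
use event_listener::Event;

fn main() {
    let event = Event::new();
    let first = event.listen();
    let second = event.listen();

    // With `notify(1)` the second call would be a no-op while one
    // notification is still pending; `notify_additional` always wakes one
    // *more* listener. `_relaxed` skips the sequentially consistent fence,
    // which is fine when, as in the executor, the notification itself
    // carries no data.
    event.notify_additional_relaxed(1);
    event.notify_additional_relaxed(1);

    // Both listeners have a pending notification, so neither wait blocks.
    first.wait();
    second.wait();
}
```
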