[Merged by Bors] - Remove task_pool parameter from par_for_each(_mut) #4705

Closed
wants to merge 11 commits into from
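In short, this PR changes the par_for_each family so that systems no longer receive the task pool as a parameter; the pool is instead read from the World's ComputeTaskPool resource. A minimal before/after sketch of the same system (illustrative names, not taken from the diff; the two versions are alternatives and assume the usual bevy::prelude imports plus bevy::tasks::ComputeTaskPool):

// Before this change: the pool is fetched as a resource and passed in explicitly.
fn move_sprites(pool: Res<ComputeTaskPool>, mut sprites: Query<&mut Transform>) {
    sprites.par_for_each_mut(&pool, 32, |mut transform| {
        transform.translation.x += 1.0;
    });
}

// After this change: only the batch size and the closure are passed; the pool
// is looked up internally from the World's ComputeTaskPool resource.
fn move_sprites(mut sprites: Query<&mut Transform>) {
    sprites.par_for_each_mut(32, |mut transform| {
        transform.translation.x += 1.0;
    });
}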
8 changes: 4 additions & 4 deletions benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs
@@ -1,5 +1,5 @@
use bevy_ecs::prelude::*;
use bevy_tasks::TaskPool;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use glam::*;

#[derive(Component, Copy, Clone)]
@@ -29,8 +29,8 @@ impl Benchmark {
)
}));

fn sys(task_pool: Res<TaskPool>, mut query: Query<(&mut Position, &mut Transform)>) {
query.par_for_each_mut(&task_pool, 128, |(mut pos, mut mat)| {
fn sys(mut query: Query<(&mut Position, &mut Transform)>) {
query.par_for_each_mut(128, |(mut pos, mut mat)| {
for _ in 0..100 {
mat.0 = mat.0.inverse();
}
@@ -39,7 +39,7 @@ impl Benchmark {
});
}

world.insert_resource(TaskPool::default());
world.insert_resource(ComputeTaskPool(TaskPool::default()));
let mut system = IntoSystem::into_system(sys);
system.initialize(&mut world);
system.update_archetype_component_access(&world);
10 changes: 4 additions & 6 deletions crates/bevy_ecs/src/lib.rs
@@ -56,7 +56,7 @@ mod tests {
query::{Added, ChangeTrackers, Changed, FilteredAccess, With, Without, WorldQuery},
world::{Mut, World},
};
use bevy_tasks::TaskPool;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use std::{
any::TypeId,
sync::{
@@ -373,7 +373,7 @@ mod tests {
#[test]
fn par_for_each_dense() {
let mut world = World::new();
let task_pool = TaskPool::default();
world.insert_resource(ComputeTaskPool(TaskPool::default()));
let e1 = world.spawn().insert(A(1)).id();
let e2 = world.spawn().insert(A(2)).id();
let e3 = world.spawn().insert(A(3)).id();
@@ -382,7 +382,7 @@
let results = Arc::new(Mutex::new(Vec::new()));
world
.query::<(Entity, &A)>()
.par_for_each(&world, &task_pool, 2, |(e, &A(i))| {
.par_for_each(&world, 2, |(e, &A(i))| {
results.lock().unwrap().push((e, i));
});
results.lock().unwrap().sort();
@@ -395,8 +395,7 @@
#[test]
fn par_for_each_sparse() {
let mut world = World::new();

let task_pool = TaskPool::default();
world.insert_resource(ComputeTaskPool(TaskPool::default()));
let e1 = world.spawn().insert(SparseStored(1)).id();
let e2 = world.spawn().insert(SparseStored(2)).id();
let e3 = world.spawn().insert(SparseStored(3)).id();
@@ -405,7 +404,6 @@
let results = Arc::new(Mutex::new(Vec::new()));
world.query::<(Entity, &SparseStored)>().par_for_each(
&world,
&task_pool,
2,
|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)),
);
37 changes: 25 additions & 12 deletions crates/bevy_ecs/src/query/state.rs
@@ -10,15 +10,16 @@ use crate::{
storage::TableId,
world::{World, WorldId},
};
use bevy_tasks::TaskPool;
use bevy_tasks::{ComputeTaskPool, TaskPool};
use fixedbitset::FixedBitSet;
use std::fmt;
use std::{fmt, ops::Deref};

use super::{QueryFetch, QueryItem, ROQueryFetch, ROQueryItem};

/// Provides scoped access to a [`World`] state according to a given [`WorldQuery`] and query filter.
pub struct QueryState<Q: WorldQuery, F: WorldQuery = ()> {
world_id: WorldId,
task_pool: Option<TaskPool>,
pub(crate) archetype_generation: ArchetypeGeneration,
pub(crate) matched_tables: FixedBitSet,
pub(crate) matched_archetypes: FixedBitSet,
@@ -59,6 +60,9 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {

let mut state = Self {
world_id: world.id(),
task_pool: world
Contributor:
This clone is going to cause a subtle bug if anyone tries to replace the compute pool while the app is running. The par_for_each is going to ignore the pool being replaced and continue to use the original compute pool.

Member Author:
The more I think about this, the less such an operation makes sense. Even more reason to make TaskPools global and initialized once instead of allowing arbitrary reconfiguration mid execution.

Contributor:
agreed, but since this is one of the objections against global task pools, I'm not sure we can merge this without more clarity there.

Member Author (@james7132, May 20, 2022):
Thinking on this a bit more, this isn't too different from the status quo. You can already spawn a task, embed a clone of the TaskPool it's running on, and have it run forever. Replacing the resource would spawn a new TaskPool, but the former pool wouldn't be cleaned up.

Is this a problem? Yes. Is it one solvable within this PR? No. Does integrating this into ECS exacerbate the issue? Probably. Is it worth blocking this change on it? I don't think so. However, in turn, we should prioritize reviewing and merging #2250 or some variant of it to address this issue.

.get_resource::<ComputeTaskPool>()
.map(|task_pool| task_pool.deref().clone()),
archetype_generation: ArchetypeGeneration::initial(),
matched_table_ids: Vec::new(),
matched_archetype_ids: Vec::new(),
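To make the concern from the review thread above concrete, here is a small self-contained sketch (plain Rust stand-ins, not bevy code) of why a clone captured at QueryState construction time keeps pointing at the original pool even if the ComputeTaskPool resource is later replaced:

use std::sync::Arc;

// Stand-in for TaskPool, which is assumed to be cheaply clonable shared state.
#[derive(Clone)]
struct FakeTaskPool(Arc<&'static str>);

// Mirrors QueryState's `task_pool: Option<TaskPool>` field: cloned once, at creation.
struct FakeQueryState {
    task_pool: Option<FakeTaskPool>,
}

fn main() {
    let mut resource = FakeTaskPool(Arc::new("original pool"));
    let state = FakeQueryState {
        task_pool: Some(resource.clone()),
    };

    // "Replacing the resource" afterwards does not touch the cached clone.
    resource = FakeTaskPool(Arc::new("replacement pool"));

    assert_eq!(*resource.0, "replacement pool");
    assert_eq!(*state.task_pool.as_ref().unwrap().0, "original pool");
}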
@@ -683,15 +687,17 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
);
}

/// Runs `func` on each query result in parallel using the given `task_pool`.
/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries, see [`Self::par_for_each_mut`] for
/// write-queries.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
#[inline]
pub fn par_for_each<'w, FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
) {
@@ -700,7 +706,6 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
self.update_archetypes(world);
self.par_for_each_unchecked_manual::<ROQueryFetch<Q>, FN>(
world,
task_pool,
batch_size,
func,
world.last_change_tick(),
@@ -709,12 +714,14 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
}
}

/// Runs `func` on each query result in parallel using the given `task_pool`.
/// Runs `func` on each query result in parallel.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
#[inline]
pub fn par_for_each_mut<'w, FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w mut World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
) {
@@ -723,7 +730,6 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
self.update_archetypes(world);
self.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
world,
task_pool,
batch_size,
func,
world.last_change_tick(),
@@ -732,10 +738,13 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
}
}

/// Runs `func` on each query result in parallel using the given `task_pool`.
/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
///
/// # Safety
///
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
@@ -744,14 +753,12 @@
pub unsafe fn par_for_each_unchecked<'w, FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
&mut self,
world: &'w World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
) {
self.update_archetypes(world);
self.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
world,
task_pool,
batch_size,
func,
world.last_change_tick(),
@@ -827,6 +834,9 @@ impl<Q: WorldQuery, F: WorldQuery> QueryState<Q, F> {
/// the current change tick are given. This is faster than the equivalent
/// iter() method, but cannot be chained like a normal [`Iterator`].
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
///
/// # Safety
///
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
@@ -840,12 +850,15 @@
>(
&self,
world: &'w World,
task_pool: &TaskPool,
batch_size: usize,
func: FN,
last_change_tick: u32,
change_tick: u32,
) {
let task_pool = self
.task_pool
.clone()
.expect("Cannot iterate query in parallel. No ComputeTaskPool initialized.");
// NOTE: If you are changing query iteration code, remember to update the following places, where relevant:
// QueryIter, QueryIterationCursor, QueryState::for_each_unchecked_manual, QueryState::par_for_each_unchecked_manual
task_pool.scope(|scope| {
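As an aside on the `task_pool.deref().clone()` call and the `ComputeTaskPool(TaskPool::default())` constructions seen in this diff: ComputeTaskPool is a thin newtype over TaskPool that derefs to it, and cloning a TaskPool is assumed here to be cheap because the pool state lives behind shared references. A simplified stand-in, not the actual bevy_tasks definitions:

use std::ops::Deref;
use std::sync::Arc;

// Simplified stand-in for bevy_tasks::TaskPool: cloning only bumps a refcount.
#[derive(Clone)]
struct TaskPool {
    inner: Arc<()>,
}

// Simplified stand-in for bevy_tasks::ComputeTaskPool: a newtype that derefs
// to the wrapped TaskPool so it can be used wherever a &TaskPool is needed.
struct ComputeTaskPool(TaskPool);

impl Deref for ComputeTaskPool {
    type Target = TaskPool;
    fn deref(&self) -> &TaskPool {
        &self.0
    }
}

fn main() {
    let compute = ComputeTaskPool(TaskPool { inner: Arc::new(()) });
    // Mirrors `task_pool.deref().clone()` in QueryState::new.
    let cached: TaskPool = compute.deref().clone();
    assert_eq!(Arc::strong_count(&cached.inner), 2);
}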
22 changes: 13 additions & 9 deletions crates/bevy_ecs/src/system/query.rs
@@ -7,7 +7,6 @@ use crate::{
},
world::{Mut, World},
};
use bevy_tasks::TaskPool;
use std::{any::TypeId, fmt::Debug};

/// Provides scoped access to components in a [`World`].
@@ -493,7 +492,7 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
};
}

/// Runs `f` on each query result in parallel using the given [`TaskPool`].
/// Runs `f` on each query result in parallel using the [`World`]'s [`ComputeTaskPool`].
///
/// This can only be called for immutable data, see [`Self::par_for_each_mut`] for
/// mutable access.
@@ -502,21 +501,24 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
///
/// The items in the query get sorted into batches.
/// Internally, this function spawns a group of futures that each take on a `batch_size` sized section of the items (or less if the division is not perfect).
/// Then, the tasks in the [`TaskPool`] work through these futures.
/// Then, the tasks in the [`ComputeTaskPool`] work through these futures.
///
/// You can use this value to tune between maximum multithreading ability (many small batches) and minimum parallelization overhead (few big batches).
/// Rule of thumb: If the function body is (mostly) computationally expensive but there are not many items, a small batch size (=more batches) may help to even out the load.
/// If the body is computationally cheap and you have many items, a large batch size (=fewer batches) avoids spawning additional futures that don't help to even out the load.
///
/// # Arguments
///
///* `task_pool` - The [`TaskPool`] to use
///* `batch_size` - The number of batches to spawn
///* `f` - The function to run on each item in the query
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
///
/// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool
#[inline]
pub fn par_for_each<'this>(
&'this self,
task_pool: &TaskPool,
batch_size: usize,
f: impl Fn(ROQueryItem<'this, Q>) + Send + Sync + Clone,
) {
@@ -526,7 +528,6 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
self.state
.par_for_each_unchecked_manual::<ROQueryFetch<Q>, _>(
self.world,
task_pool,
batch_size,
f,
self.last_change_tick,
@@ -535,12 +536,16 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
};
}

/// Runs `f` on each query result in parallel using the given [`TaskPool`].
/// Runs `f` on each query result in parallel using the [`World`]'s [`ComputeTaskPool`].
/// See [`Self::par_for_each`] for more details.
///
/// # Panics
/// [`ComputeTaskPool`] is not stored as a resource in `world`.
///
/// [`ComputeTaskPool`]: bevy_tasks::prelude::ComputeTaskPool
#[inline]
pub fn par_for_each_mut<'a, FN: Fn(QueryItem<'a, Q>) + Send + Sync + Clone>(
&'a mut self,
task_pool: &TaskPool,
batch_size: usize,
f: FN,
) {
@@ -550,7 +555,6 @@ impl<'w, 's, Q: WorldQuery, F: WorldQuery> Query<'w, 's, Q, F> {
self.state
.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
self.world,
task_pool,
batch_size,
f,
self.last_change_tick,
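To illustrate the batch-size guidance in the par_for_each docs above, a short usage sketch against the post-PR API (the components and batch sizes are illustrative assumptions, and a ComputeTaskPool resource must already be present in the World):

use bevy_ecs::prelude::*;

#[derive(Component)]
struct Position(f32);

#[derive(Component)]
struct Workload(u64);

// Cheap per-item work over many entities: a large batch size keeps the
// number of spawned futures (and their scheduling overhead) low.
fn cheap_system(mut positions: Query<&mut Position>) {
    positions.par_for_each_mut(1024, |mut position| {
        position.0 += 1.0;
    });
}

// Expensive per-item work over few entities: a small batch size spreads the
// work across more tasks so the load evens out.
fn heavy_system(workloads: Query<&Workload>) {
    workloads.par_for_each(8, |workload| {
        let _sum: u64 = (0..workload.0).fold(0, |acc, i| acc.wrapping_add(i));
    });
}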
16 changes: 6 additions & 10 deletions examples/ecs/parallel_query.rs
@@ -1,4 +1,4 @@
use bevy::{prelude::*, tasks::prelude::*};
use bevy::prelude::*;
use rand::random;

#[derive(Component, Deref)]
@@ -21,26 +21,22 @@ fn spawn_system(mut commands: Commands, asset_server: Res<AssetServer>) {
}

// Move sprites according to their velocity
fn move_system(pool: Res<ComputeTaskPool>, mut sprites: Query<(&mut Transform, &Velocity)>) {
fn move_system(mut sprites: Query<(&mut Transform, &Velocity)>) {
// Compute the new location of each sprite in parallel on the
// ComputeTaskPool using batches of 32 sprites
//
// This example is only for demonstrative purposes. Using a
// This example is only for demonstrative purposes. Using a
// ParallelIterator for an inexpensive operation like addition on only 128
// elements will not typically be faster than just using a normal Iterator.
// See the ParallelIterator documentation for more information on when
// to use or not use ParallelIterator over a normal Iterator.
sprites.par_for_each_mut(&pool, 32, |(mut transform, velocity)| {
sprites.par_for_each_mut(32, |(mut transform, velocity)| {
transform.translation += velocity.extend(0.0);
});
}

// Bounce sprites outside the window
fn bounce_system(
pool: Res<ComputeTaskPool>,
windows: Res<Windows>,
mut sprites: Query<(&Transform, &mut Velocity)>,
) {
fn bounce_system(windows: Res<Windows>, mut sprites: Query<(&Transform, &mut Velocity)>) {
let window = windows.primary();
let width = window.width();
let height = window.height();
@@ -51,7 +47,7 @@ fn bounce_system(
sprites
// Batch size of 32 is chosen to limit the overhead of
// ParallelIterator, since negating a vector is very inexpensive.
.par_for_each_mut(&pool, 32, |(transform, mut v)| {
.par_for_each_mut(32, |(transform, mut v)| {
if !(left < transform.translation.x
&& transform.translation.x < right
&& bottom < transform.translation.y
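A practical note on the example change above: because the systems no longer declare the pool as a parameter, the ComputeTaskPool resource must exist in the World before these systems run. An App built with the default Bevy plugins normally sets this up already; in a bare World (as in the tests and benchmark earlier in this diff) it can be inserted manually, roughly like this:

use bevy_ecs::prelude::*;
use bevy_tasks::{ComputeTaskPool, TaskPool};

fn main() {
    let mut world = World::new();
    // Without this, par_for_each(_mut) panics with
    // "Cannot iterate query in parallel. No ComputeTaskPool initialized."
    world.insert_resource(ComputeTaskPool(TaskPool::default()));
}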