[Merged by Bors] - Basic adaptive batching for parallel query iteration #4777

Closed
wants to merge 61 commits into from
Closed
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
65bd41f
Remove task_pool parameter from par_for_each(_mut)
james7132 May 9, 2022
b110039
Fix benchmarks
james7132 May 9, 2022
cf10758
Embed task pool into QueryState
james7132 May 9, 2022
076db46
Remove the clone
james7132 May 9, 2022
12eefa1
Update docs
james7132 May 9, 2022
3c76af9
Merge branch 'main' into parallel-ergonomics
james7132 May 14, 2022
332e851
Update docs.
james7132 May 16, 2022
186fd50
Update docs.
james7132 May 16, 2022
88044d1
Update docs.
james7132 May 16, 2022
704cf61
Merge branch 'main' into parallel-ergonomics
james7132 May 16, 2022
cedd756
Merge branch 'main' into parallel-ergonomics
james7132 May 16, 2022
1f94913
Basic adaptive batching
james7132 May 17, 2022
5b3a730
Fix CI
james7132 May 17, 2022
8b365f4
Add par_iter impls
james7132 May 17, 2022
21d95cb
Merge branch 'parallel-ergonomics' into adaptive-batching
james7132 May 17, 2022
92af1f4
Fix CI
james7132 May 17, 2022
2309ab7
Add default batches per thread
james7132 May 21, 2022
f788a7a
Merge branch 'main' into adaptive-batching
james7132 May 30, 2022
6fd0cf1
Formatting
james7132 May 30, 2022
10904c1
Merge branch 'main' into adaptive-batching
james7132 Jun 15, 2022
ac4c524
Merge branch 'main' into adaptive-batching
james7132 Jun 16, 2022
0b8c1eb
Update example comments
james7132 Jun 20, 2022
3674f10
Add documentation comments to `bevy_window` (#4333)
arnavc52 Jun 16, 2022
08bf88b
bevy_render: Fix KTX2 UASTC format mapping (#4569)
superdump Jun 17, 2022
3674e19
update hashbrown to 0.12 (#5035)
mockersf Jun 17, 2022
619bdb9
WGSL: use correct syntax for matrix access (#5039)
mockersf Jun 18, 2022
65a6c9a
Implement `Eq` and `PartialEq` for `MouseScrollUnit` (#5048)
frewsxcv Jun 19, 2022
8959c2d
enable optional dependencies to stay optional (#5023)
mockersf Jun 20, 2022
9a8e5fd
gltf: do not import IoTaskPool in wasm (#5038)
mockersf Jun 20, 2022
03ffbe8
Physical viewport calculation fix (#5055)
aevyrie Jun 20, 2022
b89d878
Cleanups in diagnostics (#3871)
mockersf Jun 20, 2022
d3b997b
`bevy_reflect`: put `serialize` into external `ReflectSerialize` type…
jakobhellermann Jun 20, 2022
c5df0d6
Add benchmarks for schedule dependency resolution (#4961)
JoJoJet Jun 20, 2022
d5a5993
change panicking test to not run on global task pool (#4998)
hymm Jun 20, 2022
606635f
Add a `release_all` function to `Input`. (#5011)
Hoidigan Jun 20, 2022
5f6a290
Update `clap` to 3.2 in tools using `value_parser` (#5031)
mlodato517 Jun 20, 2022
ce10028
Fix redundant "have" in CONTRIBUTING (#5036)
mlodato517 Jun 20, 2022
ec9a481
Add `Input::reset_all` (#5015)
Hoidigan Jun 20, 2022
d74a318
Fix Nix section of linux_dependencies.md (#5050)
fluunke Jun 20, 2022
2381ba2
Fixed bevy_ui touch input (#4099)
ManevilleF Jun 20, 2022
d025d03
Improve entity and component API docs (#4767)
Nilirad Jun 21, 2022
4132b60
Change check_visibility to use thread-local queues instead of a chann…
james7132 Jun 21, 2022
34ae6ba
Mark mutable APIs under ECS storage as pub(crate) (#5065)
james7132 Jun 21, 2022
1eaee67
Callable PBR functions (#4939)
superdump Jun 21, 2022
f3eef7f
depend on dioxus(and bevy)-maintained fork of stretch (taffy) (#4716)
colepoirier Jun 21, 2022
4ab1465
Make the batch size more configurable
james7132 Jun 22, 2022
7bd1617
Allow reusing the same ParIter
james7132 Jun 22, 2022
8bc37a0
More complete docs
james7132 Jun 22, 2022
bc2c649
Merge branch 'main' into adaptive-batching
james7132 Jun 22, 2022
b57d547
More CI fixes
james7132 Jun 22, 2022
5556377
Merge branch 'main' into adaptive-batching
james7132 Nov 14, 2022
a82ff07
Fix CI
james7132 Nov 14, 2022
fd8fefa
Defer to for_each if there is zero or one threads
james7132 Nov 14, 2022
a751055
Merge branch 'main' into adaptive-batching
james7132 Dec 29, 2022
9111a00
Fix CI
james7132 Dec 29, 2022
266bfce
Merge branch 'main' into adaptive-batching
james7132 Jan 6, 2023
2cfcb16
Fix build
james7132 Jan 6, 2023
73e5dfc
Merge branch 'main' into adaptive-batching
james7132 Jan 18, 2023
acf2f5b
Formatting
james7132 Jan 18, 2023
015e201
Add documentation for BatchingStrategy
james7132 Jan 18, 2023
c6363ea
Merge branch 'main' into adaptive-batching
james7132 Jan 20, 2023
8 changes: 4 additions & 4 deletions benches/benches/bevy_ecs/ecs_bench_suite/heavy_compute.rs
@@ -1,5 +1,5 @@
 use bevy_ecs::prelude::*;
-use bevy_tasks::TaskPool;
+use bevy_tasks::{ComputeTaskPool, TaskPool};
 use glam::*;
 
 #[derive(Component, Copy, Clone)]
@@ -29,8 +29,8 @@ impl Benchmark {
 )
 }));
 
-fn sys(task_pool: Res<TaskPool>, mut query: Query<(&mut Position, &mut Transform)>) {
-query.par_for_each_mut(&task_pool, 128, |(mut pos, mut mat)| {
+fn sys(mut query: Query<(&mut Position, &mut Transform)>) {
+query.par_iter_mut().for_each_mut(|(mut pos, mut mat)| {
 for _ in 0..100 {
 mat.0 = mat.0.inverse();
 }
@@ -39,7 +39,7 @@ impl Benchmark {
 });
 }
 
-world.insert_resource(TaskPool::default());
+world.insert_resource(ComputeTaskPool(TaskPool::default()));
 let mut system = IntoSystem::into_system(sys);
 system.initialize(&mut world);
 system.update_archetype_component_access(&world);
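The benchmark change above shows the new call shape. As a rough, self-contained sketch of how an ordinary system could use the API this PR introduces (the `Position`/`Velocity` components and system names are made up for illustration; `Query::par_iter_mut` is assumed to return the `QueryParIter` builder added later in this diff; the `ComputeTaskPool` resource is assumed to already be inserted, as the scheduler normally does):

```rust
use bevy_ecs::prelude::*;

#[derive(Component)]
struct Position(f32);

#[derive(Component)]
struct Velocity(f32);

// With no explicit batch size, the iterator picks one adaptively from the
// largest matched table/archetype and the ComputeTaskPool's thread count.
fn integrate(mut query: Query<(&mut Position, &Velocity)>) {
    query.par_iter_mut().for_each_mut(|(mut pos, vel)| {
        pos.0 += vel.0;
    });
}

// A fixed batch size can still be requested per call, replacing the explicit
// batch-size argument that par_for_each_mut used to take.
fn integrate_with_fixed_batches(mut query: Query<(&mut Position, &Velocity)>) {
    query
        .par_iter_mut()
        .batch_size(128)
        .for_each_mut(|(mut pos, vel)| {
            pos.0 += vel.0;
        });
}
```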
20 changes: 9 additions & 11 deletions crates/bevy_ecs/src/lib.rs
@@ -59,7 +59,7 @@ mod tests {
 query::{Added, ChangeTrackers, Changed, FilteredAccess, With, Without, WorldQuery},
 world::{Mut, World},
 };
-use bevy_tasks::TaskPool;
+use bevy_tasks::{ComputeTaskPool, TaskPool};
 use std::{
 any::TypeId,
 sync::{
@@ -376,7 +376,7 @@ mod tests {
 #[test]
 fn par_for_each_dense() {
 let mut world = World::new();
-let task_pool = TaskPool::default();
+world.insert_resource(ComputeTaskPool(TaskPool::default()));
 let e1 = world.spawn().insert(A(1)).id();
 let e2 = world.spawn().insert(A(2)).id();
 let e3 = world.spawn().insert(A(3)).id();
@@ -385,7 +385,8 @@
 let results = Arc::new(Mutex::new(Vec::new()));
 world
 .query::<(Entity, &A)>()
-.par_for_each(&world, &task_pool, 2, |(e, &A(i))| {
+.par_iter(&world)
+.for_each(|(e, &A(i))| {
 results.lock().unwrap().push((e, i));
 });
 results.lock().unwrap().sort();
@@ -398,20 +399,17 @@
 #[test]
 fn par_for_each_sparse() {
 let mut world = World::new();
-
-let task_pool = TaskPool::default();
+world.insert_resource(ComputeTaskPool(TaskPool::default()));
 let e1 = world.spawn().insert(SparseStored(1)).id();
 let e2 = world.spawn().insert(SparseStored(2)).id();
 let e3 = world.spawn().insert(SparseStored(3)).id();
 let e4 = world.spawn().insert_bundle((SparseStored(4), A(1))).id();
 let e5 = world.spawn().insert_bundle((SparseStored(5), A(1))).id();
 let results = Arc::new(Mutex::new(Vec::new()));
-world.query::<(Entity, &SparseStored)>().par_for_each(
-&world,
-&task_pool,
-2,
-|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)),
-);
+world
+.query::<(Entity, &SparseStored)>()
+.par_iter(&world)
+.for_each(|(e, &SparseStored(i))| results.lock().unwrap().push((e, i)));
 results.lock().unwrap().sort();
 assert_eq!(
 &*results.lock().unwrap(),
2 changes: 2 additions & 0 deletions crates/bevy_ecs/src/query/mod.rs
@@ -2,12 +2,14 @@ mod access;
 mod fetch;
 mod filter;
 mod iter;
+mod par_iter;
 mod state;
 
 pub use access::*;
 pub use fetch::*;
 pub use filter::*;
 pub use iter::*;
+pub use par_iter::*;
 pub use state::*;
 
 #[allow(unreachable_code)]
135 changes: 135 additions & 0 deletions crates/bevy_ecs/src/query/par_iter.rs
@@ -0,0 +1,135 @@
use crate::world::World;

use super::{Fetch, QueryFetch, QueryItem, QueryState, ROQueryFetch, ROQueryItem, WorldQuery};

pub struct QueryParIter<'w, 's, Q: WorldQuery, QF: Fetch<'w, State = Q::State>, F: WorldQuery> {
pub(crate) world: &'w World,
pub(crate) state: &'s QueryState<Q, F>,
pub(crate) batch_size: Option<usize>,
pub(crate) marker_: std::marker::PhantomData<fn() -> QF>,
}

impl<'w, 's, Q: WorldQuery, QF, F: WorldQuery> QueryParIter<'w, 's, Q, QF, F>
where
QF: Fetch<'w, State = Q::State>,
{
pub fn batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = Some(batch_size);
self
}

/// Runs `func` on each query result in parallel.
///
/// This can only be called for read-only queries, see [`Self::for_each_mut`] for
/// write-queries.
///
/// # Panics
/// The [`ComputeTaskPool`] resource must be added to the `World` before using this method. If using this from a query
/// that is being initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub fn for_each<FN: Fn(ROQueryItem<'w, Q>) + Send + Sync + Clone>(self, func: FN) {
let batch_size = match self.batch_size.or_else(|| self.get_default_batch_size()) {
Some(batch_size) => batch_size.max(1),
None => return,
};
// SAFETY: query is read only
unsafe {
self.state
.par_for_each_unchecked_manual::<ROQueryFetch<Q>, FN>(
self.world,
batch_size,
func,
self.world.last_change_tick(),
self.world.read_change_tick(),
);
}
}

/// Runs `func` on each query result in parallel.
///
/// # Panics
/// The [`ComputeTaskPool`] resource must be added to the `World` before using this method. If using this from a query
/// that is being initialized and run from the ECS scheduler, this should never panic.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub fn for_each_mut<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(self, func: FN) {
let batch_size = match self.batch_size.or_else(|| self.get_default_batch_size()) {
Some(batch_size) => batch_size.max(1),
None => return,
};
// SAFETY: query has unique world access
unsafe {
self.state
.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
self.world,
batch_size,
func,
self.world.last_change_tick(),
self.world.read_change_tick(),
);
}
}

/// Runs `func` on each query result in parallel.
///
/// # Panics
/// The [`ComputeTaskPool`] resource must be added to the `World` before using this method. If using this from a query
/// that is being initialized and run from the ECS scheduler, this should never panic.
///
/// # Safety
///
/// This does not check for mutable query correctness. To be safe, make sure mutable queries
/// have unique access to the components they query.
///
/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
#[inline]
pub unsafe fn for_each_unchecked<FN: Fn(QueryItem<'w, Q>) + Send + Sync + Clone>(
self,
func: FN,
) {
let batch_size = match self.batch_size.or_else(|| self.get_default_batch_size()) {
Some(batch_size) => batch_size.max(1),
None => return,
};
self.state
.par_for_each_unchecked_manual::<QueryFetch<Q>, FN>(
self.world,
batch_size,
func,
self.world.last_change_tick(),
self.world.read_change_tick(),
);
}

fn get_default_batch_size(&self) -> Option<usize> {
let thread_count = self
.state
.task_pool
.as_ref()
.map(|pool| pool.thread_num())
.unwrap_or(0);
assert!(
thread_count > 0,
"Attempted to run parallel iteration over a query with an empty TaskPool"
);
let max_size = if QF::IS_DENSE && <QueryFetch<'static, F>>::IS_DENSE {
Review comment (Member): Can we use the new ExactSizeIterator APIs here? Do we want to?

Reply (james7132, author): Unfortunately ExactSizeIterator computes the sum of all table/archetype sizes; this computes the max of them instead.

let tables = &self.world.storages().tables;
self.state
.matched_table_ids
.iter()
.map(|id| tables[*id].len())
.max()
} else {
let archetypes = &self.world.archetypes();
self.state
.matched_archetype_ids
.iter()
.map(|id| archetypes[*id].len())
.max()
};
max_size.map(|max| max / thread_count)
Review comment (Contributor): I think max / thread_count is too coarse. Consider the case where we have one archetype: if for any reason we don't get all the threads, the task will take twice as long to complete, because one of the threads will have to do 2 batches instead of just one. We probably want some type of multiplier on the thread count so we get something like 4 or 5 batches per thread. It probably needs some testing to know what that number is.

Reply (james7132, author, May 21, 2022): Set one up at a default of 4 per thread for now. Ultimately it comes down to how much the overhead of scheduling a new task onto a thread costs. If that overhead is small, we can afford to be much more granular with how we schedule each one.

As I proposed on Discord, it might be a good idea to schedule one task per thread and just push the archetypes/batches through an SPMC channel. It'd likely be lighter weight than scheduling a new task per archetype or batch.

}
}
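As a rough, self-contained sketch of the adaptive default discussed in the review thread above (sizing batches off the largest matched table rather than the sum, then splitting further so each thread gets several batches). Note that the snapshot shown in this diff still divides by the thread count alone; the multiplier reflects the later "Add default batches per thread" commit described in the reply, and the names and constant here are purely illustrative:

```rust
// Illustrative default of 4 batches per thread, per the review discussion;
// the value actually used in the PR may differ.
const BATCHES_PER_THREAD: usize = 4;

/// Sketch of the adaptive default: size batches off the *largest* matched
/// table/archetype (not the sum of all of them), then split it so that each
/// thread gets several batches, which smooths out uneven thread availability.
fn default_batch_size(matched_table_lens: &[usize], thread_count: usize) -> Option<usize> {
    assert!(thread_count > 0, "parallel iteration needs a non-empty task pool");
    let max_len = matched_table_lens.iter().copied().max()?;
    Some((max_len / (thread_count * BATCHES_PER_THREAD)).max(1))
}

fn main() {
    // One big table of 10_000 entities on 8 threads gives batches of ~312,
    // i.e. 4 batches per thread instead of one oversized batch per thread.
    assert_eq!(default_batch_size(&[10_000, 64], 8), Some(312));
}
```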