Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Global TaskPool API improvements #10008

Merged
merged 5 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/benches/bevy_ecs/iteration/heavy_compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn heavy_compute(c: &mut Criterion) {
group.warm_up_time(std::time::Duration::from_millis(500));
group.measurement_time(std::time::Duration::from_secs(4));
group.bench_function("base", |b| {
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Half of the calls are with the default value, maybe add a .get_or_init_default method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My intuition is that a lot of these calls could just be replaced with calls to the panicking getter, but you would have to look at the context of each call to be sure, which is probably out of the scope of this PR.

Regardless, I don't think that a default initializer should be added, as TaskPoolOptions from bevy_core is the preferred way of initializing the global taskpools. We should be encouraging using that, and discouraging using TaskPool::default.

TaskPoolOptions could potentially be moved from bevy_core to bevy_tasks to facilitate using that instead of TaskPool::default, but that is definitely outside of the scope of this PR.


let mut world = World::default();

Expand Down
6 changes: 3 additions & 3 deletions crates/bevy_core/src/task_pool_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl TaskPoolOptions {
trace!("IO Threads: {}", io_threads);
remaining_threads = remaining_threads.saturating_sub(io_threads);

IoTaskPool::init(|| {
IoTaskPool::get_or_init(|| {
TaskPoolBuilder::default()
.num_threads(io_threads)
.thread_name("IO Task Pool".to_string())
Expand All @@ -124,7 +124,7 @@ impl TaskPoolOptions {
trace!("Async Compute Threads: {}", async_compute_threads);
remaining_threads = remaining_threads.saturating_sub(async_compute_threads);

AsyncComputeTaskPool::init(|| {
AsyncComputeTaskPool::get_or_init(|| {
TaskPoolBuilder::default()
.num_threads(async_compute_threads)
.thread_name("Async Compute Task Pool".to_string())
Expand All @@ -141,7 +141,7 @@ impl TaskPoolOptions {

trace!("Compute Threads: {}", compute_threads);

ComputeTaskPool::init(|| {
ComputeTaskPool::get_or_init(|| {
TaskPoolBuilder::default()
.num_threads(compute_threads)
.thread_name("Compute Task Pool".to_string())
Expand Down
4 changes: 2 additions & 2 deletions crates/bevy_ecs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ mod tests {

#[test]
fn par_for_each_dense() {
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);
let mut world = World::new();
let e1 = world.spawn(A(1)).id();
let e2 = world.spawn(A(2)).id();
Expand All @@ -423,7 +423,7 @@ mod tests {

#[test]
fn par_for_each_sparse() {
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);
let mut world = World::new();
let e1 = world.spawn(SparseStored(1)).id();
let e2 = world.spawn(SparseStored(2)).id();
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/schedule/executor/multi_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ impl SystemExecutor for MultiThreadedExecutor {
mut conditions,
} = SyncUnsafeSchedule::new(schedule);

ComputeTaskPool::init(TaskPool::default).scope_with_executor(
ComputeTaskPool::get_or_init(TaskPool::default).scope_with_executor(
false,
thread_executor,
|scope| {
Expand Down
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/schedule/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ mod tests {

let mut world = World::default();
let mut schedule = Schedule::default();
let thread_count = ComputeTaskPool::init(TaskPool::default).thread_num();
let thread_count = ComputeTaskPool::get_or_init(TaskPool::default).thread_num();

let barrier = Arc::new(Barrier::new(thread_count));

Expand Down
156 changes: 63 additions & 93 deletions crates/bevy_tasks/src/usages.rs
Original file line number Diff line number Diff line change
@@ -1,107 +1,77 @@
use super::TaskPool;
use std::{ops::Deref, sync::OnceLock};

static COMPUTE_TASK_POOL: OnceLock<ComputeTaskPool> = OnceLock::new();
static ASYNC_COMPUTE_TASK_POOL: OnceLock<AsyncComputeTaskPool> = OnceLock::new();
static IO_TASK_POOL: OnceLock<IoTaskPool> = OnceLock::new();

/// A newtype for a task pool for CPU-intensive work that must be completed to
/// deliver the next frame
///
/// See [`TaskPool`] documentation for details on Bevy tasks.
/// [`AsyncComputeTaskPool`] should be preferred if the work does not have to be
/// completed before the next frame.
#[derive(Debug)]
pub struct ComputeTaskPool(TaskPool);

impl ComputeTaskPool {
/// Initializes the global [`ComputeTaskPool`] instance.
pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self {
COMPUTE_TASK_POOL.get_or_init(|| Self(f()))
}

/// Gets the global [`ComputeTaskPool`] instance.
///
/// # Panics
/// Panics if no pool has been initialized yet.
pub fn get() -> &'static Self {
COMPUTE_TASK_POOL.get().expect(
"A ComputeTaskPool has not been initialized yet. Please call \
ComputeTaskPool::init beforehand.",
)
}
macro_rules! taskpool {
($(#[$attr:meta])* ($static:ident, $type:ident)) => {
static $static: OnceLock<$type> = OnceLock::new();

$(#[$attr])*
#[derive(Debug)]
pub struct $type(TaskPool);

impl $type {
#[doc = concat!(" Gets the global [`", stringify!($type), "`] instance, or initializes it with `f`.")]
pub fn get_or_init(f: impl FnOnce() -> TaskPool) -> &'static Self {
$static.get_or_init(|| Self(f()))
}

#[doc = concat!(" Attempts to get the global [`", stringify!($type), "`] instance, \
or returns `None` if it is not initialized.")]
pub fn try_get() -> Option<&'static Self> {
$static.get()
}

#[doc = concat!(" Gets the global [`", stringify!($type), "`] instance.")]
#[doc = ""]
#[doc = " # Panics"]
#[doc = " Panics if the global instance has not been initialized yet."]
pub fn get() -> &'static Self {
$static.get().expect(
concat!(
"The ",
stringify!($type),
" has not been initialized yet. Please call ",
stringify!($type),
"::get_or_init beforehand."
)
)
}
}

impl Deref for $type {
type Target = TaskPool;

fn deref(&self) -> &Self::Target {
&self.0
}
}
};
}

impl Deref for ComputeTaskPool {
type Target = TaskPool;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// A newtype for a task pool for CPU-intensive work that may span across multiple frames
///
/// See [`TaskPool`] documentation for details on Bevy tasks. Use [`ComputeTaskPool`] if
/// the work must be complete before advancing to the next frame.
#[derive(Debug)]
pub struct AsyncComputeTaskPool(TaskPool);

impl AsyncComputeTaskPool {
/// Initializes the global [`AsyncComputeTaskPool`] instance.
pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self {
ASYNC_COMPUTE_TASK_POOL.get_or_init(|| Self(f()))
}

/// Gets the global [`AsyncComputeTaskPool`] instance.
taskpool! {
/// A newtype for a task pool for CPU-intensive work that must be completed to
/// deliver the next frame
///
/// # Panics
/// Panics if no pool has been initialized yet.
pub fn get() -> &'static Self {
ASYNC_COMPUTE_TASK_POOL.get().expect(
"A AsyncComputeTaskPool has not been initialized yet. Please call \
AsyncComputeTaskPool::init beforehand.",
)
}
}

impl Deref for AsyncComputeTaskPool {
type Target = TaskPool;

fn deref(&self) -> &Self::Target {
&self.0
}
/// See [`TaskPool`] documentation for details on Bevy tasks.
/// [`AsyncComputeTaskPool`] should be preferred if the work does not have to be
/// completed before the next frame.
(COMPUTE_TASK_POOL, ComputeTaskPool)
}

/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a
/// "woken" state)
#[derive(Debug)]
pub struct IoTaskPool(TaskPool);

impl IoTaskPool {
/// Initializes the global [`IoTaskPool`] instance.
pub fn init(f: impl FnOnce() -> TaskPool) -> &'static Self {
IO_TASK_POOL.get_or_init(|| Self(f()))
}

/// Gets the global [`IoTaskPool`] instance.
taskpool! {
/// A newtype for a task pool for CPU-intensive work that may span across multiple frames
///
/// # Panics
/// Panics if no pool has been initialized yet.
pub fn get() -> &'static Self {
IO_TASK_POOL.get().expect(
"A IoTaskPool has not been initialized yet. Please call \
IoTaskPool::init beforehand.",
)
}
/// See [`TaskPool`] documentation for details on Bevy tasks.
/// Use [`ComputeTaskPool`] if the work must be complete before advancing to the next frame.
(ASYNC_COMPUTE_TASK_POOL, AsyncComputeTaskPool)
}

impl Deref for IoTaskPool {
type Target = TaskPool;

fn deref(&self) -> &Self::Target {
&self.0
}
taskpool! {
/// A newtype for a task pool for IO-intensive work (i.e. tasks that spend very little time in a
/// "woken" state)
///
/// See [`TaskPool`] documentation for details on Bevy tasks.
(IO_TASK_POOL, IoTaskPool)
}

/// A function used by `bevy_core` to tick the global tasks pools on the main thread.
Expand Down
10 changes: 5 additions & 5 deletions crates/bevy_transform/src/systems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ mod test {

#[test]
fn correct_parent_removed() {
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);
let mut world = World::default();
let offset_global_transform =
|offset| GlobalTransform::from(Transform::from_xyz(offset, offset, offset));
Expand Down Expand Up @@ -248,7 +248,7 @@ mod test {

#[test]
fn did_propagate() {
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);
let mut world = World::default();

let mut schedule = Schedule::default();
Expand Down Expand Up @@ -326,7 +326,7 @@ mod test {

#[test]
fn correct_children() {
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);
let mut world = World::default();

let mut schedule = Schedule::default();
Expand Down Expand Up @@ -404,7 +404,7 @@ mod test {
#[test]
fn correct_transforms_when_no_children() {
let mut app = App::new();
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);

app.add_systems(Update, (sync_simple_transforms, propagate_transforms));

Expand Down Expand Up @@ -446,7 +446,7 @@ mod test {
#[test]
#[should_panic]
fn panic_when_hierarchy_cycle() {
ComputeTaskPool::init(TaskPool::default);
ComputeTaskPool::get_or_init(TaskPool::default);
// We cannot directly edit Parent and Children, so we use a temp world to break
// the hierarchy's invariants.
let mut temp = World::new();
Expand Down