Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Nested spawns on scope #4466

Closed
wants to merge 30 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
31f8e41
convert scope.spawn_local to scoped executor
hymm Apr 9, 2022
f02b982
copy test from #4343
Apr 11, 2022
c69dc7c
convert to using a concurrent queue for interior mutability
Apr 11, 2022
a08dc5b
remove optimization
Apr 12, 2022
4f46d10
remove if statement
Apr 12, 2022
b93d70e
check locality of scope_on_scope for nested spawns
Apr 12, 2022
eeb6aeb
update some comments
Apr 12, 2022
666f2d2
transmute scope ref to &'scope to get rid of Arc
Apr 12, 2022
27187d9
clarify safety comment
Apr 12, 2022
69fa59b
add a test for UB
Apr 12, 2022
069750f
copy lifetimes from std scope
Apr 13, 2022
ffc9688
add compile fail test
Apr 13, 2022
79c4b4e
clean up comments
Apr 13, 2022
ea2fcef
change single threaded task pool type signatures to match changes
Apr 14, 2022
b523732
fix doc comment
Apr 14, 2022
575ae94
change Arc to immutable reference
Apr 14, 2022
2aae929
add another compile fail test
hymm Apr 16, 2022
aa051ec
match lifetime of return value to lifetimes of futures
hymm Apr 18, 2022
97da059
rework executor code for performance
hymm Apr 22, 2022
624e282
add invariance of lifetimes to single threaded executor
hymm Apr 23, 2022
b2b6756
add some docs about the lifetimes
hymm Apr 23, 2022
4f38e53
fix doc link
hymm Apr 23, 2022
f60d41d
fix some issues from rebase
hymm Apr 25, 2022
0279dd1
run cargo fmt
hymm Apr 25, 2022
b48b056
fix new clippy error
hymm Jun 4, 2022
57bf8ca
revert change to executor
hymm Jun 10, 2022
2e6e438
fix rebase formatting issues
hymm Jun 10, 2022
243ae2f
change lifetimes to match std
hymm Apr 21, 2022
e4ad01d
fix rebase error
hymm Sep 14, 2022
a73f6e9
Merge remote-tracking branch 'origin/main' into pr/hymm/4466
cart Sep 27, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions crates/bevy_ecs/src/schedule/executor_parallel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ impl ParallelExecutor {
/// queues systems with no dependencies to run (or skip) at next opportunity.
fn prepare_systems<'scope>(
&mut self,
scope: &mut Scope<'scope, ()>,
scope: &Scope<'_, 'scope, ()>,
systems: &'scope mut [SystemContainer],
world: &'scope World,
) {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_local(task);
scope.spawn_on_scope(task);
}

#[cfg(test)]
Expand Down Expand Up @@ -271,7 +271,7 @@ impl ParallelExecutor {
if system_data.is_send {
scope.spawn(task);
} else {
scope.spawn_local(task);
scope.spawn_on_scope(task);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ futures-lite = "1.4.0"
async-executor = "1.3.0"
async-channel = "1.4.2"
once_cell = "1.7"
concurrent-queue = "1.2.2"

[target.'cfg(target_arch = "wasm32")'.dependencies]
wasm-bindgen-futures = "0.4"
Expand Down
48 changes: 30 additions & 18 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{
future::Future,
marker::PhantomData,
mem,
sync::{Arc, Mutex},
};
Expand Down Expand Up @@ -61,27 +62,34 @@ impl TaskPool {
/// to spawn tasks. This function will await the completion of all tasks before returning.
///
/// This is similar to `rayon::scope` and `crossbeam::scope`
pub fn scope<'scope, F, T>(&self, f: F) -> Vec<T>
pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
hymm marked this conversation as resolved.
Show resolved Hide resolved
where
F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
T: Send + 'static,
{
let executor = &async_executor::LocalExecutor::new();
let executor: &'scope async_executor::LocalExecutor<'scope> =
let executor: &'env async_executor::LocalExecutor<'env> =
unsafe { mem::transmute(executor) };

let results: Mutex<Vec<Arc<Mutex<Option<T>>>>> = Mutex::new(Vec::new());
let results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>> = unsafe { mem::transmute(&results) };

let mut scope = Scope {
executor,
results: Vec::new(),
results,
scope: PhantomData,
env: PhantomData,
};

f(&mut scope);
let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) };

f(scope_ref);

// Loop until all tasks are done
while executor.try_tick() {}

scope
.results
let results = scope.results.lock().unwrap();
results
.iter()
.map(|result| result.lock().unwrap().take().unwrap())
.collect()
Expand Down Expand Up @@ -127,32 +135,36 @@ impl FakeTask {
///
/// For more information, see [`TaskPool::scope`].
#[derive(Debug)]
pub struct Scope<'scope, T> {
executor: &'scope async_executor::LocalExecutor<'scope>,
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'env async_executor::LocalExecutor<'env>,
// Vector to gather results of all futures spawned during scope run
results: Vec<Arc<Mutex<Option<T>>>>,
results: &'env Mutex<Vec<Arc<Mutex<Option<T>>>>>,

// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
env: PhantomData<&'env mut &'env ()>,
}

impl<'scope, T: Send + 'scope> Scope<'scope, T> {
impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
///
/// On the single threaded task pool, it just calls [`Scope::spawn_local`].
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
self.spawn_local(f);
pub fn spawn<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
self.spawn_on_scope(f);
}

/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
/// the provided future. The results of the future will be returned as a part of
/// [`TaskPool::scope`]'s return value.
/// Spawns a scoped future that runs on the thread the scope called from. The
/// scope *must* outlive the provided future. The results of the future will be
/// returned as a part of [`TaskPool::scope`]'s return value.
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_local<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
pub fn spawn_on_scope<Fut: Future<Output = T> + 'env>(&self, f: Fut) {
let result = Arc::new(Mutex::new(None));
self.results.push(result.clone());
self.results.lock().unwrap().push(result.clone());
let f = async move {
result.lock().unwrap().replace(f.await);
};
Expand Down
Loading