From 31f8e41b6d0cf378cb07918a67b8300ef675c8be Mon Sep 17 00:00:00 2001 From: Mike Hsu Date: Sat, 9 Apr 2022 14:15:53 -0700 Subject: [PATCH 01/29] convert scope.spawn_local to scoped executor Co-authored-by: TheRawMeatball --- .../src/schedule/executor_parallel.rs | 2 +- crates/bevy_tasks/src/task_pool.rs | 117 +++++++++--------- 2 files changed, 59 insertions(+), 60 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 96b272d03f2c8..db30f51738773 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -271,7 +271,7 @@ impl ParallelExecutor { if system_data.is_send { scope.spawn(task); } else { - scope.spawn_local(task); + scope.spawn_on_scope(task); } } } diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ff4d5c6309792..c7cdbc87dd981 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -145,64 +145,63 @@ impl TaskPool { F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { - TaskPool::LOCAL_EXECUTOR.with(|local_executor| { - // SAFETY: This function blocks until all futures complete, so this future must return - // before this function returns. However, rust has no way of knowing - // this so we must convert to 'static here to appease the compiler as it is unable to - // validate safety. - let executor: &async_executor::Executor = &*self.executor; - let executor: &'scope async_executor::Executor = unsafe { mem::transmute(executor) }; - let local_executor: &'scope async_executor::LocalExecutor = - unsafe { mem::transmute(local_executor) }; - let mut scope = Scope { - executor, - local_executor, - spawned: Vec::new(), - }; + // SAFETY: This function blocks until all futures complete, so this future must return + // before this function returns. However, rust has no way of knowing + // this so we must convert to 'static here to appease the compiler as it is unable to + // validate safety. + let executor: &async_executor::Executor = &*self.executor; + let executor: &'scope async_executor::Executor = unsafe { mem::transmute(executor) }; + let task_scope_executor = &async_executor::Executor::default(); + let task_scope_executor: &'scope async_executor::Executor = + unsafe { mem::transmute(task_scope_executor) }; + let mut scope = Scope { + executor, + task_scope_executor, + spawned: Vec::new(), + }; + + f(&mut scope); + + if scope.spawned.is_empty() { + Vec::default() + } else if scope.spawned.len() == 1 { + vec![future::block_on(&mut scope.spawned[0])] + } else { + let fut = async move { + let mut results = Vec::with_capacity(scope.spawned.len()); + for task in scope.spawned { + results.push(task.await); + } - f(&mut scope); - - if scope.spawned.is_empty() { - Vec::default() - } else if scope.spawned.len() == 1 { - vec![future::block_on(&mut scope.spawned[0])] - } else { - let fut = async move { - let mut results = Vec::with_capacity(scope.spawned.len()); - for task in scope.spawned { - results.push(task.await); - } + results + }; - results + // Pin the futures on the stack. + pin!(fut); + + // SAFETY: This function blocks until all futures complete, so we do not read/write + // the data from futures outside of the 'scope lifetime. However, + // rust has no way of knowing this so we must convert to 'static + // here to appease the compiler as it is unable to validate safety. + let fut: Pin<&mut (dyn Future> + 'static + Send)> = fut; + let fut: Pin<&'static mut (dyn Future> + 'static + Send)> = + unsafe { mem::transmute(fut) }; + + // The thread that calls scope() will participate in driving tasks in the pool + // forward until the tasks that are spawned by this scope() call + // complete. (If the caller of scope() happens to be a thread in + // this thread pool, and we only have one thread in the pool, then + // simply calling future::block_on(spawned) would deadlock.) + let mut spawned = task_scope_executor.spawn(fut); + loop { + if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { + break result; }; - // Pin the futures on the stack. - pin!(fut); - - // SAFETY: This function blocks until all futures complete, so we do not read/write - // the data from futures outside of the 'scope lifetime. However, - // rust has no way of knowing this so we must convert to 'static - // here to appease the compiler as it is unable to validate safety. - let fut: Pin<&mut (dyn Future>)> = fut; - let fut: Pin<&'static mut (dyn Future> + 'static)> = - unsafe { mem::transmute(fut) }; - - // The thread that calls scope() will participate in driving tasks in the pool - // forward until the tasks that are spawned by this scope() call - // complete. (If the caller of scope() happens to be a thread in - // this thread pool, and we only have one thread in the pool, then - // simply calling future::block_on(spawned) would deadlock.) - let mut spawned = local_executor.spawn(fut); - loop { - if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { - break result; - }; - - self.executor.try_tick(); - local_executor.try_tick(); - } + self.executor.try_tick(); + task_scope_executor.try_tick(); } - }) + } } /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be @@ -256,7 +255,7 @@ impl Drop for TaskPool { #[derive(Debug)] pub struct Scope<'scope, T> { executor: &'scope async_executor::Executor<'scope>, - local_executor: &'scope async_executor::LocalExecutor<'scope>, + task_scope_executor: &'scope async_executor::Executor<'scope>, spawned: Vec>, } @@ -280,8 +279,8 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// [`Scope::spawn`] instead, unless the provided future is not `Send`. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_local + 'scope>(&mut self, f: Fut) { - let task = self.local_executor.spawn(f); + pub fn spawn_on_scope + 'scope + Send>(&mut self, f: Fut) { + let task = self.task_scope_executor.spawn(f); self.spawned.push(task); } } @@ -327,7 +326,7 @@ mod tests { } #[test] - fn test_mixed_spawn_local_and_spawn() { + fn test_mixed_spawn_on_scope_and_spawn() { let pool = TaskPool::new(); let foo = Box::new(42); @@ -350,7 +349,7 @@ mod tests { }); } else { let count_clone = local_count.clone(); - scope.spawn_local(async move { + scope.spawn_on_scope(async move { if *foo != 42 { panic!("not 42!?!?") } else { @@ -391,7 +390,7 @@ mod tests { }); let spawner = std::thread::current().id(); let inner_count_clone = count_clone.clone(); - scope.spawn_local(async move { + scope.spawn_on_scope(async move { inner_count_clone.fetch_add(1, Ordering::Release); if std::thread::current().id() != spawner { // NOTE: This check is using an atomic rather than simply panicing the From f02b9823cdcb0b727574c262201bee2c1c70e109 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 12:53:00 -0700 Subject: [PATCH 02/29] copy test from #4343 Co-authored-by: MiniaczQ --- crates/bevy_tasks/src/task_pool.rs | 39 ++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index c7cdbc87dd981..104f49f7797ab 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -406,4 +406,43 @@ mod tests { assert!(!thread_check_failed.load(Ordering::Acquire)); assert_eq!(count.load(Ordering::Acquire), 200); } + + #[test] + fn test_nested_spawn() { + let pool = TaskPool::new(); + + let foo = Box::new(42); + let foo = &*foo; + + let count = Arc::new(AtomicI32::new(0)); + + let outputs: Vec = pool.scope(|scope| { + for _ in 0..10 { + let count_clone = count.clone(); + let scope = scope.clone(); + scope.clone().spawn(async move { + for _ in 0..10 { + let count_clone_clone = count_clone.clone(); + scope.spawn(async move { + if *foo != 42 { + panic!("not 42!?!?") + } else { + count_clone_clone.fetch_add(1, Ordering::Relaxed); + *foo + } + }); + } + *foo + }); + } + }).collect(); + + for output in &outputs { + assert_eq!(*output, 42); + } + + // the inner loop runs 100 times and the outer one runs 10. 100 + 10 + assert_eq!(outputs.len(), 110); + assert_eq!(count.load(Ordering::Relaxed), 100); + } } From c69dc7c9a0aff354e1d05a2f6681c0318e3195b5 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 14:38:41 -0700 Subject: [PATCH 03/29] convert to using a concurrent queue for interior mutability --- .../src/schedule/executor_parallel.rs | 6 ++- crates/bevy_tasks/Cargo.toml | 1 + crates/bevy_tasks/src/task_pool.rs | 40 ++++++++++++------- 3 files changed, 30 insertions(+), 17 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index db30f51738773..6a9c952fa9eab 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ archetype::ArchetypeComponentId, query::Access, @@ -125,7 +127,7 @@ impl ParallelSystemExecutor for ParallelExecutor { } ComputeTaskPool::init(TaskPool::default).scope(|scope| { - self.prepare_systems(scope, systems, world); + self.prepare_systems(scope.clone(), systems, world); if self.should_run.count_ones(..) == 0 { return; } @@ -166,7 +168,7 @@ impl ParallelExecutor { /// queues systems with no dependencies to run (or skip) at next opportunity. fn prepare_systems<'scope>( &mut self, - scope: &mut Scope<'scope, ()>, + scope: Arc>, systems: &'scope mut [ParallelSystemContainer], world: &'scope World, ) { diff --git a/crates/bevy_tasks/Cargo.toml b/crates/bevy_tasks/Cargo.toml index 2fc20552bf1ae..0d273bf855c44 100644 --- a/crates/bevy_tasks/Cargo.toml +++ b/crates/bevy_tasks/Cargo.toml @@ -15,6 +15,7 @@ async-executor = "1.3.0" async-channel = "1.4.2" num_cpus = "1" once_cell = "1.7" +concurrent-queue = "1.2.2" [target.'cfg(target_arch = "wasm32")'.dependencies] wasm-bindgen-futures = "0.4" diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 104f49f7797ab..ac064464a89f2 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -6,6 +6,7 @@ use std::{ thread::{self, JoinHandle}, }; +use concurrent_queue::ConcurrentQueue; use futures_lite::{future, pin}; use crate::Task; @@ -142,7 +143,7 @@ impl TaskPool { /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'scope, F, T>(&self, f: F) -> Vec where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + F: FnOnce(Arc>) + 'scope + Send, T: Send + 'static, { // SAFETY: This function blocks until all futures complete, so this future must return @@ -154,22 +155,27 @@ impl TaskPool { let task_scope_executor = &async_executor::Executor::default(); let task_scope_executor: &'scope async_executor::Executor = unsafe { mem::transmute(task_scope_executor) }; - let mut scope = Scope { + + let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); + // TODO: figure out if this is safe + let spawned_ref: &'scope ConcurrentQueue> = + unsafe { mem::transmute(&spawned) }; + let scope = Scope { executor, task_scope_executor, - spawned: Vec::new(), + spawned: spawned_ref, }; - f(&mut scope); + f(Arc::new(scope)); - if scope.spawned.is_empty() { + if spawned.is_empty() { Vec::default() - } else if scope.spawned.len() == 1 { - vec![future::block_on(&mut scope.spawned[0])] + } else if spawned.len() == 1 { + vec![future::block_on(&mut spawned.pop().unwrap())] } else { let fut = async move { - let mut results = Vec::with_capacity(scope.spawned.len()); - for task in scope.spawned { + let mut results = Vec::with_capacity(spawned.len()); + while let Ok(task) = spawned.pop() { results.push(task.await); } @@ -256,7 +262,7 @@ impl Drop for TaskPool { pub struct Scope<'scope, T> { executor: &'scope async_executor::Executor<'scope>, task_scope_executor: &'scope async_executor::Executor<'scope>, - spawned: Vec>, + spawned: &'scope ConcurrentQueue>, } impl<'scope, T: Send + 'scope> Scope<'scope, T> { @@ -268,9 +274,11 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// instead. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'scope + Send>(&mut self, f: Fut) { + pub fn spawn + 'scope + Send>(&self, f: Fut) { let task = self.executor.spawn(f); - self.spawned.push(task); + // ConcurrentQueue only errors when closed or full, but we never + // close and use an unbouded queue, so it is safe to unwrap + self.spawned.push(task).unwrap(); } /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive @@ -279,9 +287,11 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// [`Scope::spawn`] instead, unless the provided future is not `Send`. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_scope + 'scope + Send>(&mut self, f: Fut) { + pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { let task = self.task_scope_executor.spawn(f); - self.spawned.push(task); + // ConcurrentQueue only errors when closed or full, but we never + // close and use an unbouded queue, so it is safe to unwrap + self.spawned.push(task).unwrap(); } } @@ -435,7 +445,7 @@ mod tests { *foo }); } - }).collect(); + }); for output in &outputs { assert_eq!(*output, 42); From a08dc5b9803d8e07c75a4bafe3faba8a94a655b6 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 17:34:23 -0700 Subject: [PATCH 04/29] remove optimization not correct as a single task can spawn more tasks --- crates/bevy_tasks/src/task_pool.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ac064464a89f2..1b519c8d45f40 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -170,8 +170,6 @@ impl TaskPool { if spawned.is_empty() { Vec::default() - } else if spawned.len() == 1 { - vec![future::block_on(&mut spawned.pop().unwrap())] } else { let fut = async move { let mut results = Vec::with_capacity(spawned.len()); From 4f46d10e8dbbca7b5c7ca57a133a9fe2c99059bc Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 19:10:18 -0700 Subject: [PATCH 05/29] remove if statement better perf on more than one spawn, at cost to one spawn or zero spawns being slower --- crates/bevy_tasks/src/task_pool.rs | 64 ++++++++++++++---------------- 1 file changed, 30 insertions(+), 34 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 1b519c8d45f40..5c09220d8ffd2 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -168,43 +168,39 @@ impl TaskPool { f(Arc::new(scope)); - if spawned.is_empty() { - Vec::default() - } else { - let fut = async move { - let mut results = Vec::with_capacity(spawned.len()); - while let Ok(task) = spawned.pop() { - results.push(task.await); - } + let fut = async move { + let mut results = Vec::with_capacity(spawned.len()); + while let Ok(task) = spawned.pop() { + results.push(task.await); + } - results - }; + results + }; - // Pin the futures on the stack. - pin!(fut); - - // SAFETY: This function blocks until all futures complete, so we do not read/write - // the data from futures outside of the 'scope lifetime. However, - // rust has no way of knowing this so we must convert to 'static - // here to appease the compiler as it is unable to validate safety. - let fut: Pin<&mut (dyn Future> + 'static + Send)> = fut; - let fut: Pin<&'static mut (dyn Future> + 'static + Send)> = - unsafe { mem::transmute(fut) }; - - // The thread that calls scope() will participate in driving tasks in the pool - // forward until the tasks that are spawned by this scope() call - // complete. (If the caller of scope() happens to be a thread in - // this thread pool, and we only have one thread in the pool, then - // simply calling future::block_on(spawned) would deadlock.) - let mut spawned = task_scope_executor.spawn(fut); - loop { - if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { - break result; - }; + // Pin the futures on the stack. + pin!(fut); + + // SAFETY: This function blocks until all futures complete, so we do not read/write + // the data from futures outside of the 'scope lifetime. However, + // rust has no way of knowing this so we must convert to 'static + // here to appease the compiler as it is unable to validate safety. + let fut: Pin<&mut (dyn Future> + 'static + Send)> = fut; + let fut: Pin<&'static mut (dyn Future> + 'static + Send)> = + unsafe { mem::transmute(fut) }; + + // The thread that calls scope() will participate in driving tasks in the pool + // forward until the tasks that are spawned by this scope() call + // complete. (If the caller of scope() happens to be a thread in + // this thread pool, and we only have one thread in the pool, then + // simply calling future::block_on(spawned) would deadlock.) + let mut spawned = task_scope_executor.spawn(fut); + loop { + if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { + break result; + }; - self.executor.try_tick(); - task_scope_executor.try_tick(); - } + self.executor.try_tick(); + task_scope_executor.try_tick(); } } From b93d70ed37390c30ffbfd2cd7f32bef777a00e40 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 19:18:39 -0700 Subject: [PATCH 06/29] check locality of scope_on_scope for nested spawns --- crates/bevy_tasks/src/task_pool.rs | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 5c09220d8ffd2..d63e62f9c43f5 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -449,4 +449,42 @@ mod tests { assert_eq!(outputs.len(), 110); assert_eq!(count.load(Ordering::Relaxed), 100); } + + #[test] + fn test_nested_locality() { + let pool = Arc::new(TaskPool::new()); + let count = Arc::new(AtomicI32::new(0)); + let barrier = Arc::new(Barrier::new(101)); + let thread_check_failed = Arc::new(AtomicBool::new(false)); + + for _ in 0..100 { + let inner_barrier = barrier.clone(); + let count_clone = count.clone(); + let inner_pool = pool.clone(); + let inner_thread_check_failed = thread_check_failed.clone(); + std::thread::spawn(move || { + inner_pool.scope(|scope| { + let spawner = std::thread::current().id(); + let inner_count_clone = count_clone.clone(); + scope.clone().spawn(async move { + inner_count_clone.fetch_add(1, Ordering::Release); + + // spawning on the scope from another thread runs the futures on the scope's thread + scope.spawn_on_scope(async move { + inner_count_clone.fetch_add(1, Ordering::Release); + if std::thread::current().id() != spawner { + // NOTE: This check is using an atomic rather than simply panicing the + // thread to avoid deadlocking the barrier on failure + inner_thread_check_failed.store(true, Ordering::Release); + } + }); + }); + }); + inner_barrier.wait(); + }); + } + barrier.wait(); + assert!(!thread_check_failed.load(Ordering::Acquire)); + assert_eq!(count.load(Ordering::Acquire), 200); + } } From eeb6aeb0c06f96d96dc20e08bb9e09f5a067db18 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 20:13:48 -0700 Subject: [PATCH 07/29] update some comments --- crates/bevy_tasks/src/task_pool.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index d63e62f9c43f5..443a280ac48ea 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -157,16 +157,19 @@ impl TaskPool { unsafe { mem::transmute(task_scope_executor) }; let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); - // TODO: figure out if this is safe let spawned_ref: &'scope ConcurrentQueue> = unsafe { mem::transmute(&spawned) }; - let scope = Scope { + // TODO: try to figure out if there's a way not to use an Arc here. + // It's needed because rust complains about "borrowed data escapes the closure", which + // makes sense since the futures run after the closure is run. It'd be nice if we could + // teach rust that it's ok since the future's return before 'scope is done. + let scope = Arc::new(Scope { executor, task_scope_executor, spawned: spawned_ref, - }; + }); - f(Arc::new(scope)); + f(scope); let fut = async move { let mut results = Vec::with_capacity(spawned.len()); @@ -275,10 +278,10 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { self.spawned.push(task).unwrap(); } - /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive + /// Spawns a scoped future onto the thread the scope is run on. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. Users should generally prefer to use - /// [`Scope::spawn`] instead, unless the provided future is not `Send`. + /// [`Scope::spawn`] instead, unless the provided future needs to run on the scope's thread. /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { From 666f2d22326a0a2d615dbb874ed0e03b0a5eccb7 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 20:47:06 -0700 Subject: [PATCH 08/29] transmute scope ref to &'scope to get rid of Arc --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 6 ++---- crates/bevy_tasks/src/task_pool.rs | 15 ++++++++------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 6a9c952fa9eab..6e90ba7d44a31 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use crate::{ archetype::ArchetypeComponentId, query::Access, @@ -127,7 +125,7 @@ impl ParallelSystemExecutor for ParallelExecutor { } ComputeTaskPool::init(TaskPool::default).scope(|scope| { - self.prepare_systems(scope.clone(), systems, world); + self.prepare_systems(scope, systems, world); if self.should_run.count_ones(..) == 0 { return; } @@ -168,7 +166,7 @@ impl ParallelExecutor { /// queues systems with no dependencies to run (or skip) at next opportunity. fn prepare_systems<'scope>( &mut self, - scope: Arc>, + scope: &Scope<'scope, ()>, systems: &'scope mut [ParallelSystemContainer], world: &'scope World, ) { diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 443a280ac48ea..565071f61e868 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -143,7 +143,7 @@ impl TaskPool { /// This is similar to `rayon::scope` and `crossbeam::scope` pub fn scope<'scope, F, T>(&self, f: F) -> Vec where - F: FnOnce(Arc>) + 'scope + Send, + F: FnOnce(&'scope Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { // SAFETY: This function blocks until all futures complete, so this future must return @@ -163,13 +163,15 @@ impl TaskPool { // It's needed because rust complains about "borrowed data escapes the closure", which // makes sense since the futures run after the closure is run. It'd be nice if we could // teach rust that it's ok since the future's return before 'scope is done. - let scope = Arc::new(Scope { + let scope = Scope { executor, task_scope_executor, spawned: spawned_ref, - }); + }; + + let scope_ref: &'scope Scope<'scope, T> = unsafe { mem::transmute(&scope) }; - f(scope); + f(scope_ref); let fut = async move { let mut results = Vec::with_capacity(spawned.len()); @@ -426,8 +428,7 @@ mod tests { let outputs: Vec = pool.scope(|scope| { for _ in 0..10 { let count_clone = count.clone(); - let scope = scope.clone(); - scope.clone().spawn(async move { + scope.spawn(async move { for _ in 0..10 { let count_clone_clone = count_clone.clone(); scope.spawn(async move { @@ -469,7 +470,7 @@ mod tests { inner_pool.scope(|scope| { let spawner = std::thread::current().id(); let inner_count_clone = count_clone.clone(); - scope.clone().spawn(async move { + scope.spawn(async move { inner_count_clone.fetch_add(1, Ordering::Release); // spawning on the scope from another thread runs the futures on the scope's thread From 27187d9e979785ef6cb4a08b2494f3e7e0366553 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 22:08:34 -0700 Subject: [PATCH 09/29] clarify safety comment --- crates/bevy_tasks/src/task_pool.rs | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 565071f61e868..991e47289b12e 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -146,23 +146,20 @@ impl TaskPool { F: FnOnce(&'scope Scope<'scope, T>) + 'scope + Send, T: Send + 'static, { - // SAFETY: This function blocks until all futures complete, so this future must return - // before this function returns. However, rust has no way of knowing - // this so we must convert to 'static here to appease the compiler as it is unable to - // validate safety. + // SAFETY: This safety comment applies to all references transmuted to 'scope. + // Any futures spawned with these references need to return before this function completes. + // This is guaranteed because we drive all the futures spawned onto the Scope + // to completion in this function. However, rust has no way of knowing this so we + // transmute the lifetimes to 'scope here to appease the compiler as it is unable to validate safety. let executor: &async_executor::Executor = &*self.executor; let executor: &'scope async_executor::Executor = unsafe { mem::transmute(executor) }; let task_scope_executor = &async_executor::Executor::default(); let task_scope_executor: &'scope async_executor::Executor = unsafe { mem::transmute(task_scope_executor) }; - let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); let spawned_ref: &'scope ConcurrentQueue> = unsafe { mem::transmute(&spawned) }; - // TODO: try to figure out if there's a way not to use an Arc here. - // It's needed because rust complains about "borrowed data escapes the closure", which - // makes sense since the futures run after the closure is run. It'd be nice if we could - // teach rust that it's ok since the future's return before 'scope is done. + let scope = Scope { executor, task_scope_executor, From 69fa59b2350d7f425d726b16170d85c773ac9f6b Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 11 Apr 2022 22:14:28 -0700 Subject: [PATCH 10/29] add a test for UB --- crates/bevy_tasks/src/task_pool.rs | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 991e47289b12e..fd4a6a0d57732 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -141,6 +141,7 @@ impl TaskPool { /// to spawn tasks. This function will await the completion of all tasks before returning. /// /// This is similar to `rayon::scope` and `crossbeam::scope` + // TODO: add compile fail test here for passing scope to a thread not spawned in the scope pub fn scope<'scope, F, T>(&self, f: F) -> Vec where F: FnOnce(&'scope Scope<'scope, T>) + 'scope + Send, @@ -488,4 +489,18 @@ mod tests { assert!(!thread_check_failed.load(Ordering::Acquire)); assert_eq!(count.load(Ordering::Acquire), 200); } + + #[test] + fn compile_fail() { + let pool = TaskPool::new(); + let foo = Box::new(42); + pool.scope(|scope| { + std::thread::spawn(move || { + // UB. This could spawn on the scope after `.scope` returns and the internal Scope is dropped. + scope.spawn(async move { + assert_eq!(*foo, 42); + }); + }); + }); + } } From 069750f4a90bda95c44aa1c1c1b60bf83f7a7db2 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Tue, 12 Apr 2022 20:37:42 -0700 Subject: [PATCH 11/29] copy lifetimes from std scope --- .../src/schedule/executor_parallel.rs | 2 +- crates/bevy_tasks/src/task_pool.rs | 57 ++++++++++--------- 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 6e90ba7d44a31..1f6405a88aafb 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -166,7 +166,7 @@ impl ParallelExecutor { /// queues systems with no dependencies to run (or skip) at next opportunity. fn prepare_systems<'scope>( &mut self, - scope: &Scope<'scope, ()>, + scope: &Scope<'_, 'scope, ()>, systems: &'scope mut [ParallelSystemContainer], world: &'scope World, ) { diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index fd4a6a0d57732..ea7a8de86732a 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -1,5 +1,6 @@ use std::{ future::Future, + marker::PhantomData, mem, pin::Pin, sync::Arc, @@ -142,9 +143,9 @@ impl TaskPool { /// /// This is similar to `rayon::scope` and `crossbeam::scope` // TODO: add compile fail test here for passing scope to a thread not spawned in the scope - pub fn scope<'scope, F, T>(&self, f: F) -> Vec + pub fn scope<'env, F, T>(&self, f: F) -> Vec where - F: FnOnce(&'scope Scope<'scope, T>) + 'scope + Send, + F: for<'scope> FnOnce(&'env Scope<'scope, 'env, T>), T: Send + 'static, { // SAFETY: This safety comment applies to all references transmuted to 'scope. @@ -153,21 +154,23 @@ impl TaskPool { // to completion in this function. However, rust has no way of knowing this so we // transmute the lifetimes to 'scope here to appease the compiler as it is unable to validate safety. let executor: &async_executor::Executor = &*self.executor; - let executor: &'scope async_executor::Executor = unsafe { mem::transmute(executor) }; + let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; let task_scope_executor = &async_executor::Executor::default(); - let task_scope_executor: &'scope async_executor::Executor = + let task_scope_executor: &'env async_executor::Executor = unsafe { mem::transmute(task_scope_executor) }; let spawned: ConcurrentQueue> = ConcurrentQueue::unbounded(); - let spawned_ref: &'scope ConcurrentQueue> = + let spawned_ref: &'env ConcurrentQueue> = unsafe { mem::transmute(&spawned) }; let scope = Scope { executor, task_scope_executor, spawned: spawned_ref, + scope: PhantomData, + env: PhantomData, }; - let scope_ref: &'scope Scope<'scope, T> = unsafe { mem::transmute(&scope) }; + let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) }; f(scope_ref); @@ -256,13 +259,15 @@ impl Drop for TaskPool { /// /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] -pub struct Scope<'scope, T> { - executor: &'scope async_executor::Executor<'scope>, - task_scope_executor: &'scope async_executor::Executor<'scope>, - spawned: &'scope ConcurrentQueue>, +pub struct Scope<'scope, 'env: 'scope, T> { + executor: &'env async_executor::Executor<'env>, + task_scope_executor: &'env async_executor::Executor<'env>, + spawned: &'env ConcurrentQueue>, + scope: PhantomData<&'scope mut &'scope ()>, + env: PhantomData<&'env mut &'env ()>, } -impl<'scope, T: Send + 'scope> Scope<'scope, T> { +impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// Spawns a scoped future onto the thread pool. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. @@ -271,7 +276,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// instead. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'scope + Send>(&self, f: Fut) { + pub fn spawn + 'env + Send>(&self, f: Fut) { let task = self.executor.spawn(f); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbouded queue, so it is safe to unwrap @@ -284,7 +289,7 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// [`Scope::spawn`] instead, unless the provided future needs to run on the scope's thread. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { + pub fn spawn_on_scope + 'env + Send>(&self, f: Fut) { let task = self.task_scope_executor.spawn(f); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbouded queue, so it is safe to unwrap @@ -490,17 +495,17 @@ mod tests { assert_eq!(count.load(Ordering::Acquire), 200); } - #[test] - fn compile_fail() { - let pool = TaskPool::new(); - let foo = Box::new(42); - pool.scope(|scope| { - std::thread::spawn(move || { - // UB. This could spawn on the scope after `.scope` returns and the internal Scope is dropped. - scope.spawn(async move { - assert_eq!(*foo, 42); - }); - }); - }); - } + // #[test] + // fn compile_fail() { + // let pool = TaskPool::new(); + // let foo = Box::new(42); + // pool.scope(|scope| { + // std::thread::spawn(move || { + // // UB. This could spawn on the scope after `.scope` returns and the internal Scope is dropped. + // scope.spawn(async move { + // assert_eq!(*foo, 42); + // }); + // }); + // }); + // } } From ffc9688c08d9e4a8ecaaa931b3e76e8ea840ae35 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Wed, 13 Apr 2022 11:16:29 -0700 Subject: [PATCH 12/29] add compile fail test --- crates/bevy_tasks/src/task_pool.rs | 31 +++++++++++++++--------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ea7a8de86732a..9b74cc4940c73 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -142,7 +142,22 @@ impl TaskPool { /// to spawn tasks. This function will await the completion of all tasks before returning. /// /// This is similar to `rayon::scope` and `crossbeam::scope` - // TODO: add compile fail test here for passing scope to a thread not spawned in the scope + /// + /// ```compile_fail + /// use bevy_tasks::TaskPool; + /// fn compile_fail() { + /// let pool = TaskPool::new(); + /// let foo = Box::new(42); + /// pool.scope(|scope| { + /// std::thread::spawn(move || { + /// // UB. This could spawn on the scope after `.scope` returns and the internal Scope is dropped. + /// scope.spawn(async move { + /// assert_eq!(*foo, 42); + /// }); + /// }); + /// }); + /// } + /// ``` pub fn scope<'env, F, T>(&self, f: F) -> Vec where F: for<'scope> FnOnce(&'env Scope<'scope, 'env, T>), @@ -494,18 +509,4 @@ mod tests { assert!(!thread_check_failed.load(Ordering::Acquire)); assert_eq!(count.load(Ordering::Acquire), 200); } - - // #[test] - // fn compile_fail() { - // let pool = TaskPool::new(); - // let foo = Box::new(42); - // pool.scope(|scope| { - // std::thread::spawn(move || { - // // UB. This could spawn on the scope after `.scope` returns and the internal Scope is dropped. - // scope.spawn(async move { - // assert_eq!(*foo, 42); - // }); - // }); - // }); - // } } From 79c4b4e17e6b7097842b4927874bdedc23a631cb Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Wed, 13 Apr 2022 11:49:55 -0700 Subject: [PATCH 13/29] clean up comments --- crates/bevy_tasks/src/task_pool.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 9b74cc4940c73..06f15c398da1e 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -163,11 +163,11 @@ impl TaskPool { F: for<'scope> FnOnce(&'env Scope<'scope, 'env, T>), T: Send + 'static, { - // SAFETY: This safety comment applies to all references transmuted to 'scope. + // SAFETY: This safety comment applies to all references transmuted to 'env. // Any futures spawned with these references need to return before this function completes. // This is guaranteed because we drive all the futures spawned onto the Scope // to completion in this function. However, rust has no way of knowing this so we - // transmute the lifetimes to 'scope here to appease the compiler as it is unable to validate safety. + // transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety. let executor: &async_executor::Executor = &*self.executor; let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) }; let task_scope_executor = &async_executor::Executor::default(); @@ -278,6 +278,7 @@ pub struct Scope<'scope, 'env: 'scope, T> { executor: &'env async_executor::Executor<'env>, task_scope_executor: &'env async_executor::Executor<'env>, spawned: &'env ConcurrentQueue>, + // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, } From ea2fcef32d6ab5ece70bc177751053aeff9f9f1c Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Wed, 13 Apr 2022 20:17:11 -0700 Subject: [PATCH 14/29] change single threaded task pool type signatures to match changes --- .../src/single_threaded_task_pool.rs | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 757b711d999d3..764c5a028dfba 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -1,5 +1,6 @@ use std::{ future::Future, + marker::PhantomData, mem, sync::{Arc, Mutex}, }; @@ -61,27 +62,30 @@ impl TaskPool { /// to spawn tasks. This function will await the completion of all tasks before returning. /// /// This is similar to `rayon::scope` and `crossbeam::scope` - pub fn scope<'scope, F, T>(&self, f: F) -> Vec + pub fn scope<'env, F, T>(&self, f: F) -> Vec where - F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send, + F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>), T: Send + 'static, { let executor = &async_executor::LocalExecutor::new(); - let executor: &'scope async_executor::LocalExecutor<'scope> = + let executor: &'env async_executor::LocalExecutor<'env> = unsafe { mem::transmute(executor) }; let mut scope = Scope { executor, - results: Vec::new(), + results: Arc::new(Mutex::new(Vec::new())), + scope: PhantomData, }; - f(&mut scope); + let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) }; + + f(scope_ref); // Loop until all tasks are done while executor.try_tick() {} - scope - .results + let results = scope.results.lock().unwrap(); + results .iter() .map(|result| result.lock().unwrap().take().unwrap()) .collect() @@ -127,12 +131,14 @@ impl FakeTask { /// /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] -pub struct Scope<'scope, T> { - executor: &'scope async_executor::LocalExecutor<'scope>, +pub struct Scope<'scope, 'env: 'scope, T> { + executor: &'env async_executor::LocalExecutor<'env>, // Vector to gather results of all futures spawned during scope run - results: Vec>>>, + results: Arc>>>>>, + scope: PhantomData<&'scope ()>, } +impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of @@ -141,8 +147,8 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// On the single threaded task pool, it just calls [`Scope::spawn_local`]. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'scope + Send>(&mut self, f: Fut) { - self.spawn_local(f); + pub fn spawn + 'env>(&self, f: Fut) { + self.spawn_on_scope(f); } /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive @@ -150,9 +156,9 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_local + 'scope>(&mut self, f: Fut) { + pub fn spawn_on_scope + 'env>(&self, f: Fut) { let result = Arc::new(Mutex::new(None)); - self.results.push(result.clone()); + self.results.lock().unwrap().push(result.clone()); let f = async move { result.lock().unwrap().replace(f.await); }; From b523732a01ca94dbfac445a16dc696f7878c0a78 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Wed, 13 Apr 2022 20:27:46 -0700 Subject: [PATCH 15/29] fix doc comment --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 06f15c398da1e..ea0f09bf151f0 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -288,7 +288,7 @@ impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. /// - /// If the provided future is non-`Send`, [`Scope::spawn_local`] should be used + /// For futures that should run on the thread `scope` is called on [`Scope::spawn_on_scope`] should be used /// instead. /// /// For more information, see [`TaskPool::scope`]. From 575ae9470f6bb7692e7111dcd50770546f87903b Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Thu, 14 Apr 2022 10:02:31 -0700 Subject: [PATCH 16/29] change Arc to immutable reference --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 764c5a028dfba..991ac8e63fb6c 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -71,9 +71,12 @@ impl TaskPool { let executor: &'env async_executor::LocalExecutor<'env> = unsafe { mem::transmute(executor) }; + let results: Mutex>>>> = Mutex::new(Vec::new()); + let results: &'env Mutex>>>> = unsafe { mem::transmute(&results) }; + let mut scope = Scope { executor, - results: Arc::new(Mutex::new(Vec::new())), + results, scope: PhantomData, }; @@ -134,7 +137,7 @@ impl FakeTask { pub struct Scope<'scope, 'env: 'scope, T> { executor: &'env async_executor::LocalExecutor<'env>, // Vector to gather results of all futures spawned during scope run - results: Arc>>>>>, + results: &'env Mutex>>>>, scope: PhantomData<&'scope ()>, } From 2aae929c3b6c896c6b537b171cede57b8376c810 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Sat, 16 Apr 2022 14:19:52 -0700 Subject: [PATCH 17/29] add another compile fail test --- crates/bevy_tasks/src/task_pool.rs | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index ea0f09bf151f0..77b737231b505 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -145,7 +145,7 @@ impl TaskPool { /// /// ```compile_fail /// use bevy_tasks::TaskPool; - /// fn compile_fail() { + /// fn scope_escapes_closure() { /// let pool = TaskPool::new(); /// let foo = Box::new(42); /// pool.scope(|scope| { @@ -158,6 +158,20 @@ impl TaskPool { /// }); /// } /// ``` + /// + /// ```compile_fail + /// use bevy_tasks::TaskPool; + /// fn cannot_borrow_from_closure() { + /// let pool = TaskPool::new(); + /// pool.scope(|scope| { + /// let x = 1; + /// let y = &x; + /// scope.spawn(async move { + /// assert_eq!(*y, 1); + /// }); + /// }); + /// } + /// pub fn scope<'env, F, T>(&self, f: F) -> Vec where F: for<'scope> FnOnce(&'env Scope<'scope, 'env, T>), From aa051ec9d00e45f194774baab9ea4fef0c6cc674 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 18 Apr 2022 14:46:16 -0700 Subject: [PATCH 18/29] match lifetime of return value to lifetimes of futures --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 77b737231b505..2144989d2f59c 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -297,7 +297,7 @@ pub struct Scope<'scope, 'env: 'scope, T> { env: PhantomData<&'env mut &'env ()>, } -impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { +impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// Spawns a scoped future onto the thread pool. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. From 97da059a72619d8a76c0383dc03792d797c53d9a Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Thu, 21 Apr 2022 19:27:13 -0700 Subject: [PATCH 19/29] rework executor code for performance --- crates/bevy_tasks/src/task_pool.rs | 50 +++++++++++------------------- 1 file changed, 18 insertions(+), 32 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 2144989d2f59c..2fbd613fec151 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,13 +2,12 @@ use std::{ future::Future, marker::PhantomData, mem, - pin::Pin, sync::Arc, thread::{self, JoinHandle}, }; use concurrent_queue::ConcurrentQueue; -use futures_lite::{future, pin}; +use futures_lite::{future, FutureExt}; use crate::Task; @@ -203,40 +202,27 @@ impl TaskPool { f(scope_ref); - let fut = async move { - let mut results = Vec::with_capacity(spawned.len()); - while let Ok(task) = spawned.pop() { - results.push(task.await); - } + future::block_on(async move { + let get_results = async move { + let mut results = Vec::with_capacity(spawned.len()); + while let Ok(task) = spawned.pop() { + results.push(task.await); + } - results - }; + results + }; - // Pin the futures on the stack. - pin!(fut); - - // SAFETY: This function blocks until all futures complete, so we do not read/write - // the data from futures outside of the 'scope lifetime. However, - // rust has no way of knowing this so we must convert to 'static - // here to appease the compiler as it is unable to validate safety. - let fut: Pin<&mut (dyn Future> + 'static + Send)> = fut; - let fut: Pin<&'static mut (dyn Future> + 'static + Send)> = - unsafe { mem::transmute(fut) }; - - // The thread that calls scope() will participate in driving tasks in the pool - // forward until the tasks that are spawned by this scope() call - // complete. (If the caller of scope() happens to be a thread in - // this thread pool, and we only have one thread in the pool, then - // simply calling future::block_on(spawned) would deadlock.) - let mut spawned = task_scope_executor.spawn(fut); - loop { - if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { - break result; + let tick_forever = async move { + loop { + self.executor.try_tick(); + task_scope_executor.try_tick(); + + future::yield_now().await + } }; - self.executor.try_tick(); - task_scope_executor.try_tick(); - } + get_results.or(tick_forever).await + }) } /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be From 624e2825cd5f254b2c8c632a577747b0946af3dd Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Fri, 22 Apr 2022 19:55:58 -0700 Subject: [PATCH 20/29] add invariance of lifetimes to single threaded executor --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 991ac8e63fb6c..838c949cc89f7 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -78,6 +78,7 @@ impl TaskPool { executor, results, scope: PhantomData, + env: PhantomData, }; let scope_ref: &'env mut Scope<'_, 'env, T> = unsafe { mem::transmute(&mut scope) }; @@ -138,7 +139,10 @@ pub struct Scope<'scope, 'env: 'scope, T> { executor: &'env async_executor::LocalExecutor<'env>, // Vector to gather results of all futures spawned during scope run results: &'env Mutex>>>>, - scope: PhantomData<&'scope ()>, + + // make `Scope` invariant over 'scope and 'env + scope: PhantomData<&'scope mut &'scope ()>, + env: PhantomData<&'env mut &'env ()>, } impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { From b2b675637a8c1c9b41f99b061d33c91e07331b94 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Fri, 22 Apr 2022 20:21:16 -0700 Subject: [PATCH 21/29] add some docs about the lifetimes --- crates/bevy_tasks/src/task_pool.rs | 39 ++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 2fbd613fec151..b0e94faf4eaaa 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -142,6 +142,45 @@ impl TaskPool { /// /// This is similar to `rayon::scope` and `crossbeam::scope` /// + /// # Example + /// + /// ``` + /// use bevy_tasks::TaskPool; + /// + /// let pool = TaskPool::new(); + /// let mut x = 0; + /// let results = pool.scope(|s| { + /// s.spawn(async { + /// // you can borrow the spawner inside a task and spawn tasks from within the task + /// s.spawn(async { + /// // borrow x and mutate it. + /// x = 2; + /// // return a value from the task + /// 1 + /// }); + /// // return some other value from the first task + /// 0 + /// }); + /// }); + /// + /// // results are returned in the order the tasks are spawned in. + /// // Note: the ordering may become non-deterministic if you spawn from within tasks. + /// // the ordering is only guaranteed when tasks are spawned directly from the main closure. + /// assert_eq!(&results[..], &[0, 1]); + /// // can access x after scope runs + /// assert_eq!(x, 2); + /// ``` + /// + /// # Lifetimes + /// + /// The [`TaskPool::Scope`] object takes two lifetimes: `'scope` and `'env`. + /// + /// The `'scope` lifetime represents the lifetime of the scope. That is the time during + /// which the provided closure and tasks that are spawned into the scope are run. + /// + /// The `'env` lifetime represents the lifetime of whatever is borrowed by the scope. + /// Thus this lifetime must outlive `'scope`. + /// /// ```compile_fail /// use bevy_tasks::TaskPool; /// fn scope_escapes_closure() { From 4f38e5338fdef433ff3751bac2fd7046e9aa668a Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Fri, 22 Apr 2022 20:39:59 -0700 Subject: [PATCH 22/29] fix doc link --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index b0e94faf4eaaa..be2b47b88c564 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -173,7 +173,7 @@ impl TaskPool { /// /// # Lifetimes /// - /// The [`TaskPool::Scope`] object takes two lifetimes: `'scope` and `'env`. + /// The [`Scope`] object takes two lifetimes: `'scope` and `'env`. /// /// The `'scope` lifetime represents the lifetime of the scope. That is the time during /// which the provided closure and tasks that are spawned into the scope are run. From f60d41d3957a78bec36553eb3c74fc8b65e2a4b5 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Sun, 24 Apr 2022 20:04:25 -0700 Subject: [PATCH 23/29] fix some issues from rebase --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index 838c949cc89f7..af5b9ad774303 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -146,7 +146,6 @@ pub struct Scope<'scope, 'env: 'scope, T> { } impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { -impl<'scope, T: Send + 'scope> Scope<'scope, T> { /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. @@ -158,9 +157,9 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> { self.spawn_on_scope(f); } - /// Spawns a scoped future onto the thread-local executor. The scope *must* outlive - /// the provided future. The results of the future will be returned as a part of - /// [`TaskPool::scope`]'s return value. + /// Spawns a scoped future that runs on the thread the scope called from. The + /// scope *must* outlive the provided future. The results of the future will be + /// returned as a part of [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. pub fn spawn_on_scope + 'env>(&self, f: Fut) { From 0279dd1ca7fc5bf9803c5d9c04a22e1fd01f6c95 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Mon, 25 Apr 2022 10:32:59 -0700 Subject: [PATCH 24/29] run cargo fmt --- crates/bevy_tasks/src/single_threaded_task_pool.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/bevy_tasks/src/single_threaded_task_pool.rs b/crates/bevy_tasks/src/single_threaded_task_pool.rs index af5b9ad774303..8fa37f4f2361b 100644 --- a/crates/bevy_tasks/src/single_threaded_task_pool.rs +++ b/crates/bevy_tasks/src/single_threaded_task_pool.rs @@ -157,8 +157,8 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { self.spawn_on_scope(f); } - /// Spawns a scoped future that runs on the thread the scope called from. The - /// scope *must* outlive the provided future. The results of the future will be + /// Spawns a scoped future that runs on the thread the scope called from. The + /// scope *must* outlive the provided future. The results of the future will be /// returned as a part of [`TaskPool::scope`]'s return value. /// /// For more information, see [`TaskPool::scope`]. From b48b0566a64c86189e6bc30ca0812be9d8c38c66 Mon Sep 17 00:00:00 2001 From: Mike Hsu Date: Sat, 4 Jun 2022 12:12:27 -0700 Subject: [PATCH 25/29] fix new clippy error --- crates/bevy_tasks/src/task_pool.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index be2b47b88c564..1f9701615f3e5 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -256,7 +256,7 @@ impl TaskPool { self.executor.try_tick(); task_scope_executor.try_tick(); - future::yield_now().await + future::yield_now().await; } }; From 57bf8caeaefe80e83fd945553cab20634a50b49d Mon Sep 17 00:00:00 2001 From: Mike Hsu Date: Thu, 9 Jun 2022 22:37:46 -0700 Subject: [PATCH 26/29] revert change to executor --- crates/bevy_tasks/src/task_pool.rs | 46 +++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 14 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 1f9701615f3e5..edaec1ca29e33 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -3,11 +3,11 @@ use std::{ marker::PhantomData, mem, sync::Arc, - thread::{self, JoinHandle}, + thread::{self, JoinHandle}, pin::Pin, }; use concurrent_queue::ConcurrentQueue; -use futures_lite::{future, FutureExt}; +use futures_lite::{future, pin}; use crate::Task; @@ -241,7 +241,9 @@ impl TaskPool { f(scope_ref); - future::block_on(async move { + if spawned.is_empty() { + Vec::new() + } else { let get_results = async move { let mut results = Vec::with_capacity(spawned.len()); while let Ok(task) = spawned.pop() { @@ -251,17 +253,33 @@ impl TaskPool { results }; - let tick_forever = async move { - loop { - self.executor.try_tick(); - task_scope_executor.try_tick(); - - future::yield_now().await; - } - }; - - get_results.or(tick_forever).await - }) + // Pin the futures on the stack. + pin!(get_results); + + // SAFETY: This function blocks until all futures complete, so we do not read/write + // the data from futures outside of the 'scope lifetime. However, + // rust has no way of knowing this so we must convert to 'static + // here to appease the compiler as it is unable to validate safety. + let get_results: Pin<&mut (dyn Future> + 'static + Send)> = get_results; + let get_results: Pin<&'static mut (dyn Future> + 'static + Send)> = + unsafe { mem::transmute(get_results) }; + + // The thread that calls scope() will participate in driving tasks in the pool + // forward until the tasks that are spawned by this scope() call + // complete. (If the caller of scope() happens to be a thread in + // this thread pool, and we only have one thread in the pool, then + // simply calling future::block_on(spawned) would deadlock.) + let mut spawned = task_scope_executor.spawn(get_results); + + loop { + if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { + break result; + }; + + self.executor.try_tick(); + task_scope_executor.try_tick(); + } + } } /// Spawns a static future onto the thread pool. The returned Task is a future. It can also be From 2e6e438681e7fe61a6bc453d0f34eaa062d708b9 Mon Sep 17 00:00:00 2001 From: Mike Hsu Date: Fri, 10 Jun 2022 10:19:36 -0700 Subject: [PATCH 27/29] fix rebase formatting issues --- crates/bevy_tasks/src/task_pool.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index edaec1ca29e33..86e737b207335 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -2,8 +2,9 @@ use std::{ future::Future, marker::PhantomData, mem, + pin::Pin, sync::Arc, - thread::{self, JoinHandle}, pin::Pin, + thread::{self, JoinHandle}, }; use concurrent_queue::ConcurrentQueue; @@ -253,7 +254,7 @@ impl TaskPool { results }; - // Pin the futures on the stack. + // Pin the futures on the stack. pin!(get_results); // SAFETY: This function blocks until all futures complete, so we do not read/write @@ -275,7 +276,7 @@ impl TaskPool { if let Some(result) = future::block_on(future::poll_once(&mut spawned)) { break result; }; - + self.executor.try_tick(); task_scope_executor.try_tick(); } From 243ae2f8c43b26a277e5d81ff253e109a96f6a1b Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Wed, 20 Apr 2022 19:37:34 -0700 Subject: [PATCH 28/29] change lifetimes to match std --- crates/bevy_tasks/src/task_pool.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/bevy_tasks/src/task_pool.rs b/crates/bevy_tasks/src/task_pool.rs index 86e737b207335..82d4e3fff8689 100644 --- a/crates/bevy_tasks/src/task_pool.rs +++ b/crates/bevy_tasks/src/task_pool.rs @@ -213,7 +213,7 @@ impl TaskPool { /// pub fn scope<'env, F, T>(&self, f: F) -> Vec where - F: for<'scope> FnOnce(&'env Scope<'scope, 'env, T>), + F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>), T: Send + 'static, { // SAFETY: This safety comment applies to all references transmuted to 'env. @@ -333,15 +333,15 @@ impl Drop for TaskPool { /// For more information, see [`TaskPool::scope`]. #[derive(Debug)] pub struct Scope<'scope, 'env: 'scope, T> { - executor: &'env async_executor::Executor<'env>, - task_scope_executor: &'env async_executor::Executor<'env>, - spawned: &'env ConcurrentQueue>, + executor: &'scope async_executor::Executor<'scope>, + task_scope_executor: &'scope async_executor::Executor<'scope>, + spawned: &'scope ConcurrentQueue>, // make `Scope` invariant over 'scope and 'env scope: PhantomData<&'scope mut &'scope ()>, env: PhantomData<&'env mut &'env ()>, } -impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { +impl<'scope, 'env, T: Send + 'scope> Scope<'scope, 'env, T> { /// Spawns a scoped future onto the thread pool. The scope *must* outlive /// the provided future. The results of the future will be returned as a part of /// [`TaskPool::scope`]'s return value. @@ -350,7 +350,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// instead. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn + 'env + Send>(&self, f: Fut) { + pub fn spawn + 'scope + Send>(&self, f: Fut) { let task = self.executor.spawn(f); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbouded queue, so it is safe to unwrap @@ -363,7 +363,7 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> { /// [`Scope::spawn`] instead, unless the provided future needs to run on the scope's thread. /// /// For more information, see [`TaskPool::scope`]. - pub fn spawn_on_scope + 'env + Send>(&self, f: Fut) { + pub fn spawn_on_scope + 'scope + Send>(&self, f: Fut) { let task = self.task_scope_executor.spawn(f); // ConcurrentQueue only errors when closed or full, but we never // close and use an unbouded queue, so it is safe to unwrap From e4ad01d8fa67a83e532025e827a4bb5b0f170753 Mon Sep 17 00:00:00 2001 From: Michael Hsu Date: Wed, 14 Sep 2022 14:48:29 -0700 Subject: [PATCH 29/29] fix rebase error --- crates/bevy_ecs/src/schedule/executor_parallel.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bevy_ecs/src/schedule/executor_parallel.rs b/crates/bevy_ecs/src/schedule/executor_parallel.rs index 1f6405a88aafb..0f3c13689acce 100644 --- a/crates/bevy_ecs/src/schedule/executor_parallel.rs +++ b/crates/bevy_ecs/src/schedule/executor_parallel.rs @@ -236,7 +236,7 @@ impl ParallelExecutor { if system_data.is_send { scope.spawn(task); } else { - scope.spawn_local(task); + scope.spawn_on_scope(task); } #[cfg(test)]