Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions crates/bevy_tasks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ crossbeam-queue = { version = "0.3", default-features = false, features = [
] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
pin-project = { version = "1" }
futures-channel = { version = "0.3", default-features = false }
pin-project = "1"
async-channel = "2.3.0"

[target.'cfg(not(all(target_has_atomic = "8", target_has_atomic = "16", target_has_atomic = "32", target_has_atomic = "64", target_has_atomic = "ptr")))'.dependencies]
async-task = { version = "4.4.0", default-features = false, features = [
Expand All @@ -72,6 +72,7 @@ atomic-waker = { version = "1", default-features = false, features = [
futures-lite = { version = "2.0.1", default-features = false, features = [
"std",
] }
async-channel = "2.3.0"

[lints]
workspace = true
Expand Down
34 changes: 14 additions & 20 deletions crates/bevy_tasks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,32 +71,35 @@ use alloc::boxed::Box;
/// An owned and dynamically typed Future used when you can't statically type your result or need to add some indirection.
pub type BoxedFuture<'a, T> = core::pin::Pin<Box<dyn ConditionalSendFuture<Output = T> + 'a>>;

// Modules
mod executor;
pub mod futures;
mod iter;
mod slice;
mod task;
mod usages;

cfg::async_executor! {
if {} else {
mod edge_executor;
}
}

mod executor;

mod slice;
// Exports
pub use iter::ParallelIterator;
pub use slice::{ParallelSlice, ParallelSliceMut};
pub use task::Task;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

cfg::web! {
if {
#[path = "wasm_task.rs"]
mod task;
} else {
mod task;
pub use futures_lite;
pub use futures_lite::future::poll_once;

cfg::web! {
if {} else {
pub use usages::tick_global_task_pools_on_main_thread;
}
}

pub use task::Task;

cfg::multi_threaded! {
if {
mod task_pool;
Expand All @@ -111,10 +114,6 @@ cfg::multi_threaded! {
}
}

mod usages;
pub use futures_lite::future::poll_once;
pub use usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool};

cfg::switch! {
cfg::async_io => {
pub use async_io::block_on;
Expand Down Expand Up @@ -147,11 +146,6 @@ cfg::switch! {
}
}

mod iter;
pub use iter::ParallelIterator;

pub use futures_lite;

/// The tasks prelude.
///
/// This includes the most common types in this crate, re-exported for your convenience.
Expand Down
136 changes: 89 additions & 47 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,25 @@
use alloc::{string::String, vec::Vec};
use bevy_platform::sync::Arc;
use core::{cell::RefCell, future::Future, marker::PhantomData, mem};
use core::{cell::{RefCell, Cell}, future::Future, marker::PhantomData, mem};

use crate::Task;
use crate::executor::LocalExecutor;
use crate::{block_on, Task};

crate::cfg::std! {
if {
use std::thread_local;
use crate::executor::LocalExecutor;

use crate::executor::LocalExecutor as Executor;

thread_local! {
static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };
static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
}

type ScopeResult<T> = alloc::rc::Rc<RefCell<Option<T>>>;
} else {
use bevy_platform::sync::{Mutex, PoisonError};
use crate::executor::Executor as LocalExecutor;

static LOCAL_EXECUTOR: LocalExecutor<'static> = const { LocalExecutor::new() };
// Because we do not have thread-locals without std, we cannot use LocalExecutor here.
use crate::executor::Executor;

type ScopeResult<T> = Arc<Mutex<Option<T>>>;
static LOCAL_EXECUTOR: Executor<'static> = const { Executor::new() };
}
}

Expand Down Expand Up @@ -111,7 +110,7 @@ impl TaskPool {
/// This is similar to `rayon::scope` and `crossbeam::scope`
pub fn scope<'env, F, T>(&self, f: F) -> Vec<T>
where
F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
T: Send + 'static,
{
self.scope_with_executor(false, None, f)
Expand All @@ -130,7 +129,7 @@ impl TaskPool {
f: F,
) -> Vec<T>
where
F: for<'scope> FnOnce(&'env mut Scope<'scope, 'env, T>),
F: for<'scope> FnOnce(&'scope mut Scope<'scope, 'env, T>),
T: Send + 'static,
{
// SAFETY: This safety comment applies to all references transmuted to 'env.
Expand All @@ -141,17 +140,22 @@ impl TaskPool {
// Any usages of the references passed into `Scope` must be accessed through
// the transmuted reference for the rest of this function.

let executor = &LocalExecutor::new();
let executor = LocalExecutor::new();
// SAFETY: As above, all futures must complete in this function so we can change the lifetime
let executor_ref: &'env LocalExecutor<'env> = unsafe { mem::transmute(&executor) };

let results: RefCell<Vec<Option<T>>> = RefCell::new(Vec::new());
// SAFETY: As above, all futures must complete in this function so we can change the lifetime
let executor: &'env LocalExecutor<'env> = unsafe { mem::transmute(executor) };
let results_ref: &'env RefCell<Vec<Option<T>>> = unsafe { mem::transmute(&results) };

let results: RefCell<Vec<ScopeResult<T>>> = RefCell::new(Vec::new());
let pending_tasks: Cell<usize> = Cell::new(0);
// SAFETY: As above, all futures must complete in this function so we can change the lifetime
let results: &'env RefCell<Vec<ScopeResult<T>>> = unsafe { mem::transmute(&results) };
let pending_tasks: &'env Cell<usize> = unsafe { mem::transmute(&pending_tasks) };

let mut scope = Scope {
executor,
results,
executor_ref,
pending_tasks,
results_ref,
scope: PhantomData,
env: PhantomData,
};
Expand All @@ -161,21 +165,17 @@ impl TaskPool {

f(scope_ref);

// Loop until all tasks are done
while executor.try_tick() {}
// Wait until the scope is complete
block_on(executor.run(async {
while pending_tasks.get() != 0 {
futures_lite::future::yield_now().await;
}
}));

let results = scope.results.borrow();
results
.iter()
.map(|result| crate::cfg::switch! {{
crate::cfg::std => {
result.borrow_mut().take().unwrap()
}
_ => {
let mut lock = result.lock().unwrap_or_else(PoisonError::into_inner);
lock.take().unwrap()
}
}})
.take()
.into_iter()
.map(|result| result.unwrap())
.collect()
}

Expand Down Expand Up @@ -239,7 +239,7 @@ impl TaskPool {
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&LocalExecutor) -> R,
F: FnOnce(&Executor) -> R,
{
crate::cfg::switch! {{
crate::cfg::std => {
Expand All @@ -257,9 +257,11 @@ impl TaskPool {
/// For more information, see [`TaskPool::scope`].
#[derive(Debug)]
pub struct Scope<'scope, 'env: 'scope, T> {
executor: &'scope LocalExecutor<'scope>,
executor_ref: &'scope LocalExecutor<'scope>,
// The number of pending tasks spawned on the scope
pending_tasks: &'scope Cell<usize>,
// Vector to gather results of all futures spawned during scope run
results: &'env RefCell<Vec<ScopeResult<T>>>,
results_ref: &'env RefCell<Vec<Option<T>>>,

// make `Scope` invariant over 'scope and 'env
scope: PhantomData<&'scope mut &'scope ()>,
Expand Down Expand Up @@ -295,21 +297,32 @@ impl<'scope, 'env, T: Send + 'env> Scope<'scope, 'env, T> {
///
/// For more information, see [`TaskPool::scope`].
pub fn spawn_on_scope<Fut: Future<Output = T> + 'scope + MaybeSend>(&self, f: Fut) {
let result = ScopeResult::<T>::default();
self.results.borrow_mut().push(result.clone());
// increment the number of pending tasks
let pending_tasks = self.pending_tasks;
pending_tasks.update(|i| i + 1);

// add a spot to keep the result, and record the index
let results_ref = self.results_ref;
let mut results = results_ref.borrow_mut();
let task_number = results.len();
results.push(None);
drop(results);

// create the job closure
let f = async move {
let temp_result = f.await;

crate::cfg::std! {
if {
result.borrow_mut().replace(temp_result);
} else {
let mut lock = result.lock().unwrap_or_else(PoisonError::into_inner);
*lock = Some(temp_result);
}
}
let result = f.await;

// store the result in the allocated slot
let mut results = results_ref.borrow_mut();
results[task_number] = Some(result);
drop(results);

// decrement the pending tasks count
pending_tasks.update(|i| i - 1);
};
self.executor.spawn(f).detach();

// spawn the job itself
self.executor_ref.spawn(f).detach();
}
}

Expand All @@ -328,3 +341,32 @@ crate::cfg::std! {
impl<T: Sync> MaybeSync for T {}
}
}

#[cfg(test)]
mod test {
use std::{time, thread};

use super::*;

/// This test creates a scope with a single task that goes to sleep for a
/// nontrivial amount of time. At one point, the scope would (incorrectly)
/// return early under these conditions, causing a crash.
///
/// The correct behavior is for the scope to block until the receiver is
/// woken by the external thread.
#[test]
fn scoped_spawn() {
let (sender, recever) = async_channel::unbounded();
let task_pool = TaskPool {};
let thread = thread::spawn(move || {
let duration = time::Duration::from_millis(50);
thread::sleep(duration);
let _ = sender.send(0);
});
task_pool.scope(|scope| {
scope.spawn(async {
recever.recv().await
});
});
}
}
Loading
Loading