Skip to content

Commit 93e7ffa

Browse files
committed
Allow using the asset processor in single-threaded mode.
1 parent d3bb065 commit 93e7ffa

File tree

3 files changed

+97
-118
lines changed

3 files changed

+97
-118
lines changed

crates/bevy_asset/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ crossbeam-channel = { version = "0.5", default-features = false, features = [
5252
downcast-rs = { version = "2", default-features = false }
5353
disqualified = { version = "1.0", default-features = false }
5454
either = { version = "1.13", default-features = false }
55+
futures-util = { version = "0.3", default-features = false, features = [
56+
"alloc",
57+
] }
5558
futures-io = { version = "0.3", default-features = false }
5659
futures-lite = { version = "2.0.1", default-features = false }
5760
blake3 = { version = "1.5", default-features = false }

crates/bevy_asset/src/processor/mod.rs

Lines changed: 75 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
mod log;
4141
mod process;
4242

43+
use bevy_tasks::BoxedFuture;
4344
pub use log::*;
4445
pub use process::*;
4546

@@ -62,9 +63,10 @@ use bevy_platform::{
6263
collections::{HashMap, HashSet},
6364
sync::{PoisonError, RwLock},
6465
};
65-
use bevy_tasks::IoTaskPool;
66+
use bevy_tasks::{poll_once, IoTaskPool};
6667
use futures_io::ErrorKind;
6768
use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt};
69+
use futures_util::{future::join_all, stream::select_all};
6870
use std::{
6971
path::{Path, PathBuf},
7072
sync::Mutex,
@@ -222,16 +224,13 @@ impl AssetProcessor {
222224

223225
/// Starts the processor in a background thread.
224226
pub fn start(_processor: Res<Self>) {
225-
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
226-
error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet.");
227-
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
228-
{
229-
let processor = _processor.clone();
230-
std::thread::spawn(move || {
231-
processor.process_assets();
232-
bevy_tasks::block_on(processor.listen_for_source_change_events());
233-
});
234-
}
227+
let processor = _processor.clone();
228+
IoTaskPool::get()
229+
.spawn(async move {
230+
processor.process_assets().await;
231+
processor.listen_for_source_change_events().await;
232+
})
233+
.detach();
235234
}
236235

237236
/// Processes all assets. This will:
@@ -244,23 +243,20 @@ impl AssetProcessor {
244243
/// * For each asset in the unprocessed [`AssetReader`](crate::io::AssetReader), kick off a new
245244
/// "process job", which will process the asset
246245
/// (if the latest version of the asset has not been processed).
247-
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
248-
pub fn process_assets(&self) {
246+
pub async fn process_assets(&self) {
249247
let start_time = std::time::Instant::now();
250248
debug!("Processing Assets");
251-
IoTaskPool::get().scope(|scope| {
252-
scope.spawn(async move {
253-
self.initialize().await.unwrap();
254-
for source in self.sources().iter_processed() {
255-
self.process_assets_internal(scope, source, PathBuf::from(""))
256-
.await
257-
.unwrap();
258-
}
259-
});
260-
});
261-
// This must happen _after_ the scope resolves or it will happen "too early"
262-
// Don't move this into the async scope above! process_assets is a blocking/sync function this is fine
263-
bevy_tasks::block_on(self.finish_processing_assets());
249+
let mut tasks = vec![];
250+
self.initialize().await.unwrap();
251+
for source in self.sources().iter_processed() {
252+
self.process_assets_internal(source, PathBuf::from(""), &mut tasks)
253+
.await
254+
.unwrap();
255+
}
256+
257+
join_all(tasks).await;
258+
259+
self.finish_processing_assets().await;
264260
let end_time = std::time::Instant::now();
265261
debug!("Processing finished in {:?}", end_time - start_time);
266262
}
@@ -269,26 +265,51 @@ impl AssetProcessor {
269265
// PERF: parallelize change event processing
270266
pub async fn listen_for_source_change_events(&self) {
271267
debug!("Listening for changes to source assets");
272-
loop {
273-
let mut started_processing = false;
274-
275-
for source in self.data.sources.iter_processed() {
276-
if let Some(receiver) = source.event_receiver() {
277-
// TODO: Handle TryRecvError::Closed.
278-
while let Ok(event) = receiver.try_recv() {
279-
if !started_processing {
280-
self.set_state(ProcessorState::Processing).await;
281-
started_processing = true;
282-
}
283268

284-
self.handle_asset_source_event(source, event).await;
269+
// Collect all the event receivers and select across all of them.
270+
let mut source_receivers = vec![];
271+
for source in self.data.sources.iter_processed() {
272+
let Some(receiver) = source.event_receiver() else {
273+
continue;
274+
};
275+
let source_id = source.id();
276+
let source_receiver = receiver
277+
.clone()
278+
.map(move |event| (source_id.clone(), event));
279+
source_receivers.push(Box::pin(source_receiver));
280+
}
281+
let mut all_receiver = select_all(source_receivers);
282+
283+
// We want to await an entry in the stream, but once we have one, we want to process all the
284+
// events in the channels before sending the "finished processing" state. So await the next
285+
// item, then keep getting the next entry in the stream until we have to sleep.
286+
let mut next = None;
287+
while let Some(mut item) = {
288+
if next.is_none() {
289+
next = Some(all_receiver.next());
290+
}
291+
next.take().unwrap()
292+
}
293+
.await
294+
{
295+
self.set_state(ProcessorState::Processing).await;
296+
297+
loop {
298+
let (source_id, event) = item;
299+
self.handle_asset_source_event(self.data.sources.get(source_id).unwrap(), event)
300+
.await;
301+
let mut next_next = all_receiver.next();
302+
item = match poll_once(&mut next_next).await {
303+
None => {
304+
next = Some(next_next);
305+
break;
285306
}
286-
}
307+
Some(None) => return,
308+
Some(Some(item)) => item,
309+
};
287310
}
288311

289-
if started_processing {
290-
self.finish_processing_assets().await;
291-
}
312+
self.finish_processing_assets().await;
292313
}
293314
}
294315

@@ -448,16 +469,12 @@ impl AssetProcessor {
448469
"Folder {} was added. Attempting to re-process",
449470
AssetPath::from_path(&path).with_source(source.id())
450471
);
451-
#[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))]
452-
error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet.");
453-
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
454-
IoTaskPool::get().scope(|scope| {
455-
scope.spawn(async move {
456-
self.process_assets_internal(scope, source, path)
457-
.await
458-
.unwrap();
459-
});
460-
});
472+
let mut tasks = vec![];
473+
self.process_assets_internal(source, path, &mut tasks)
474+
.await
475+
.unwrap();
476+
477+
join_all(tasks).await;
461478
}
462479

463480
/// Responds to a removed meta event by reprocessing the asset at the given path.
@@ -567,24 +584,23 @@ impl AssetProcessor {
567584
self.set_state(ProcessorState::Finished).await;
568585
}
569586

570-
#[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))]
571587
async fn process_assets_internal<'scope>(
572588
&'scope self,
573-
scope: &'scope bevy_tasks::Scope<'scope, '_, ()>,
574589
source: &'scope AssetSource,
575590
path: PathBuf,
591+
tasks: &mut Vec<BoxedFuture<'scope, ()>>,
576592
) -> Result<(), AssetReaderError> {
577593
if source.reader().is_directory(&path).await? {
578594
let mut path_stream = source.reader().read_directory(&path).await?;
579595
while let Some(path) = path_stream.next().await {
580-
Box::pin(self.process_assets_internal(scope, source, path)).await?;
596+
Box::pin(self.process_assets_internal(source, path, tasks)).await?;
581597
}
582598
} else {
583599
// Files without extensions are skipped
584600
let processor = self.clone();
585-
scope.spawn(async move {
601+
tasks.push(Box::pin(async move {
586602
processor.process_asset(source, path).await;
587-
});
603+
}));
588604
}
589605
Ok(())
590606
}
@@ -661,13 +677,6 @@ impl AssetProcessor {
661677
/// This info will later be used to determine whether or not to re-process an asset
662678
///
663679
/// This will validate transactions and recover failed transactions when necessary.
664-
#[cfg_attr(
665-
any(target_arch = "wasm32", not(feature = "multi_threaded")),
666-
expect(
667-
dead_code,
668-
reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM."
669-
)
670-
)]
671680
async fn initialize(&self) -> Result<(), InitializeError> {
672681
self.validate_transaction_log_and_recover().await;
673682
let mut asset_infos = self.data.asset_infos.write().await;
@@ -962,6 +971,7 @@ impl AssetProcessor {
962971
}
963972
}
964973
}
974+
965975
// Note: this lock must remain alive until all processed asset and meta writes have finished (or failed)
966976
// See ProcessedAssetInfo::file_transaction_lock docs for more info
967977
let _transaction_lock = {
@@ -1002,6 +1012,7 @@ impl AssetProcessor {
10021012
new_processed_info.full_hash = full_hash;
10031013
*processed_meta.processed_info_mut() = Some(new_processed_info.clone());
10041014
let meta_bytes = processed_meta.serialize();
1015+
10051016
processed_writer
10061017
.write_meta_bytes(path, &meta_bytes)
10071018
.await
@@ -1565,7 +1576,5 @@ pub enum SetTransactionLogFactoryError {
15651576
AlreadyInUse,
15661577
}
15671578

1568-
// The asset processor currently requires multi_threaded.
1569-
#[cfg(feature = "multi_threaded")]
15701579
#[cfg(test)]
15711580
mod tests;

crates/bevy_asset/src/processor/tests.rs

Lines changed: 19 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ use crate::{
2121
AssetSource, AssetSourceId, Reader,
2222
},
2323
processor::{
24-
AssetProcessor, LoadTransformAndSave, LogEntry, ProcessorTransactionLog,
24+
AssetProcessor, LoadTransformAndSave, LogEntry, ProcessorState, ProcessorTransactionLog,
2525
ProcessorTransactionLogFactory,
2626
},
2727
saver::AssetSaver,
28-
tests::{CoolText, CoolTextLoader, CoolTextRon, SubText},
28+
tests::{run_app_until, CoolText, CoolTextLoader, CoolTextRon, SubText},
2929
transformer::{AssetTransformer, TransformedAsset},
3030
Asset, AssetApp, AssetLoader, AssetMode, AssetPath, AssetPlugin, LoadContext,
3131
};
@@ -120,6 +120,18 @@ fn create_app_with_asset_processor() -> AppWithProcessor {
120120
}
121121
}
122122

123+
fn run_app_until_finished_processing(app: &mut App) {
124+
run_app_until(app, |world| {
125+
if bevy_tasks::block_on(world.resource::<AssetProcessor>().get_state())
126+
== ProcessorState::Finished
127+
{
128+
Some(())
129+
} else {
130+
None
131+
}
132+
});
133+
}
134+
123135
struct CoolTextSaver;
124136

125137
impl AssetSaver for CoolTextSaver {
@@ -211,16 +223,7 @@ fn no_meta_or_default_processor_copies_asset() {
211223

212224
source_dir.insert_asset_text(path, source_asset);
213225

214-
// Start the app, which also starts the asset processor.
215-
app.update();
216-
217-
// Wait for all processing to finish.
218-
bevy_tasks::block_on(
219-
app.world()
220-
.resource::<AssetProcessor>()
221-
.data()
222-
.wait_until_finished(),
223-
);
226+
run_app_until_finished_processing(&mut app);
224227

225228
let processed_asset = processed_dir.get_asset(path).unwrap();
226229
let processed_asset = str::from_utf8(processed_asset.value()).unwrap();
@@ -266,16 +269,7 @@ fn asset_processor_transforms_asset_default_processor() {
266269
)"#,
267270
);
268271

269-
// Start the app, which also starts the asset processor.
270-
app.update();
271-
272-
// Wait for all processing to finish.
273-
bevy_tasks::block_on(
274-
app.world()
275-
.resource::<AssetProcessor>()
276-
.data()
277-
.wait_until_finished(),
278-
);
272+
run_app_until_finished_processing(&mut app);
279273

280274
let processed_asset = processed_dir.get_asset(path).unwrap();
281275
let processed_asset = str::from_utf8(processed_asset.value()).unwrap();
@@ -334,16 +328,7 @@ fn asset_processor_transforms_asset_with_meta() {
334328
),
335329
)"#);
336330

337-
// Start the app, which also starts the asset processor.
338-
app.update();
339-
340-
// Wait for all processing to finish.
341-
bevy_tasks::block_on(
342-
app.world()
343-
.resource::<AssetProcessor>()
344-
.data()
345-
.wait_until_finished(),
346-
);
331+
run_app_until_finished_processing(&mut app);
347332

348333
let processed_asset = processed_dir.get_asset(path).unwrap();
349334
let processed_asset = str::from_utf8(processed_asset.value()).unwrap();
@@ -536,16 +521,7 @@ fn asset_processor_loading_can_read_processed_assets() {
536521
)"#,
537522
);
538523

539-
// Start the app, which also starts the asset processor.
540-
app.update();
541-
542-
// Wait for all processing to finish.
543-
bevy_tasks::block_on(
544-
app.world()
545-
.resource::<AssetProcessor>()
546-
.data()
547-
.wait_until_finished(),
548-
);
524+
run_app_until_finished_processing(&mut app);
549525

550526
let processed_bsn = processed_dir.get_asset(bsn_path).unwrap();
551527
let processed_bsn = str::from_utf8(processed_bsn.value()).unwrap();
@@ -700,16 +676,7 @@ fn asset_processor_loading_can_read_source_assets() {
700676
)"#,
701677
);
702678

703-
// Start the app, which also starts the asset processor.
704-
app.update();
705-
706-
// Wait for all processing to finish.
707-
bevy_tasks::block_on(
708-
app.world()
709-
.resource::<AssetProcessor>()
710-
.data()
711-
.wait_until_finished(),
712-
);
679+
run_app_until_finished_processing(&mut app);
713680

714681
// Sanity check that the two gltf files were actually processed.
715682
let processed_gltf_1 = processed_dir.get_asset(gltf_path_1).unwrap();

0 commit comments

Comments
 (0)