From 272332b555630b4e06a8e64ea4e60dee205966c0 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Mon, 6 Oct 2025 23:34:41 -0700 Subject: [PATCH 1/3] Replace crossbeam_channel with async_channel for hot reloading. --- crates/bevy_asset/Cargo.toml | 4 +-- .../src/io/embedded/embedded_watcher.rs | 6 ++-- crates/bevy_asset/src/io/file/file_watcher.rs | 4 +-- crates/bevy_asset/src/io/source.rs | 29 +++++++++---------- crates/bevy_asset/src/lib.rs | 10 +++---- crates/bevy_asset/src/processor/mod.rs | 3 +- crates/bevy_asset/src/server/mod.rs | 4 +-- 7 files changed, 28 insertions(+), 32 deletions(-) diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index 85a3e52bbbaf9..dfe5890b5fb30 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -45,6 +45,7 @@ async-broadcast = { version = "0.7.2", default-features = false } async-fs = { version = "2.0", default-features = false } async-lock = { version = "3.0", default-features = false } bitflags = { version = "2.3", default-features = false } +async-channel = { version = "2", default-features = false } crossbeam-channel = { version = "0.5", default-features = false, features = [ "std", ] } @@ -91,9 +92,6 @@ notify-debouncer-full = { version = "0.5.0", default-features = false, optional ureq = { version = "3", optional = true, default-features = false } blocking = { version = "1.6", optional = true } -[dev-dependencies] -async-channel = "2" - [lints] workspace = true diff --git a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs index b2e8e7c3fadbe..454c23bf596f5 100644 --- a/crates/bevy_asset/src/io/embedded/embedded_watcher.rs +++ b/crates/bevy_asset/src/io/embedded/embedded_watcher.rs @@ -28,7 +28,7 @@ impl EmbeddedWatcher { pub fn new( dir: Dir, root_paths: Arc, PathBuf>>>, - sender: crossbeam_channel::Sender, + sender: async_channel::Sender, debounce_wait_time: Duration, ) -> Self { let root = get_base_path(); @@ -50,7 +50,7 @@ impl AssetWatcher for EmbeddedWatcher {} /// binary-embedded Rust source files. This will read the contents of changed files from the file system and overwrite /// the initial static bytes from the file embedded in the binary. pub(crate) struct EmbeddedEventHandler { - sender: crossbeam_channel::Sender, + sender: async_channel::Sender, root_paths: Arc, PathBuf>>>, root: PathBuf, dir: Dir, @@ -90,7 +90,7 @@ impl FilesystemEventHandler for EmbeddedEventHandler { } } self.last_event = Some(event.clone()); - self.sender.send(event).unwrap(); + self.sender.send_blocking(event).unwrap(); } } } diff --git a/crates/bevy_asset/src/io/file/file_watcher.rs b/crates/bevy_asset/src/io/file/file_watcher.rs index e70cf1665f274..ebe508f5ca02c 100644 --- a/crates/bevy_asset/src/io/file/file_watcher.rs +++ b/crates/bevy_asset/src/io/file/file_watcher.rs @@ -3,8 +3,8 @@ use crate::{ path::normalize_path, }; use alloc::borrow::ToOwned; +use async_channel::Sender; use core::time::Duration; -use crossbeam_channel::Sender; use notify_debouncer_full::{ new_debouncer, notify::{ @@ -269,7 +269,7 @@ impl FilesystemEventHandler for FileEventHandler { fn handle(&mut self, _absolute_paths: &[PathBuf], event: AssetSourceEvent) { if self.last_event.as_ref() != Some(&event) { self.last_event = Some(event.clone()); - self.sender.send(event).unwrap(); + self.sender.send_blocking(event).unwrap(); } } } diff --git a/crates/bevy_asset/src/io/source.rs b/crates/bevy_asset/src/io/source.rs index 8cca30b885345..9f25b3edc631c 100644 --- a/crates/bevy_asset/src/io/source.rs +++ b/crates/bevy_asset/src/io/source.rs @@ -125,7 +125,7 @@ pub struct AssetSourceBuilder { /// The [`AssetWatcher`] to use for unprocessed assets, if any. pub watcher: Option< Box< - dyn FnMut(crossbeam_channel::Sender) -> Option> + dyn FnMut(async_channel::Sender) -> Option> + Send + Sync, >, @@ -138,7 +138,7 @@ pub struct AssetSourceBuilder { /// The [`AssetWatcher`] to use for processed assets, if any. pub processed_watcher: Option< Box< - dyn FnMut(crossbeam_channel::Sender) -> Option> + dyn FnMut(async_channel::Sender) -> Option> + Send + Sync, >, @@ -174,7 +174,7 @@ impl AssetSourceBuilder { }; if watch { - let (sender, receiver) = crossbeam_channel::unbounded(); + let (sender, receiver) = async_channel::unbounded(); match self.watcher.as_mut().and_then(|w| w(sender)) { Some(w) => { source.watcher = Some(w); @@ -189,7 +189,7 @@ impl AssetSourceBuilder { } if watch_processed { - let (sender, receiver) = crossbeam_channel::unbounded(); + let (sender, receiver) = async_channel::unbounded(); match self.processed_watcher.as_mut().and_then(|w| w(sender)) { Some(w) => { source.processed_watcher = Some(w); @@ -226,7 +226,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct unprocessed [`AssetWatcher`] instances. pub fn with_watcher( mut self, - watcher: impl FnMut(crossbeam_channel::Sender) -> Option> + watcher: impl FnMut(async_channel::Sender) -> Option> + Send + Sync + 'static, @@ -256,7 +256,7 @@ impl AssetSourceBuilder { /// Will use the given `watcher` function to construct processed [`AssetWatcher`] instances. pub fn with_processed_watcher( mut self, - watcher: impl FnMut(crossbeam_channel::Sender) -> Option> + watcher: impl FnMut(async_channel::Sender) -> Option> + Send + Sync + 'static, @@ -377,8 +377,8 @@ pub struct AssetSource { processed_writer: Option>, watcher: Option>, processed_watcher: Option>, - event_receiver: Option>, - processed_event_receiver: Option>, + event_receiver: Option>, + processed_event_receiver: Option>, } impl AssetSource { @@ -429,15 +429,13 @@ impl AssetSource { /// Return's this source's unprocessed event receiver, if the source is currently watching for changes. #[inline] - pub fn event_receiver(&self) -> Option<&crossbeam_channel::Receiver> { + pub fn event_receiver(&self) -> Option<&async_channel::Receiver> { self.event_receiver.as_ref() } /// Return's this source's processed event receiver, if the source is currently watching for changes. #[inline] - pub fn processed_event_receiver( - &self, - ) -> Option<&crossbeam_channel::Receiver> { + pub fn processed_event_receiver(&self) -> Option<&async_channel::Receiver> { self.processed_event_receiver.as_ref() } @@ -517,10 +515,9 @@ impl AssetSource { pub fn get_default_watcher( path: String, file_debounce_wait_time: Duration, - ) -> impl FnMut(crossbeam_channel::Sender) -> Option> - + Send - + Sync { - move |sender: crossbeam_channel::Sender| { + ) -> impl FnMut(async_channel::Sender) -> Option> + Send + Sync + { + move |sender: async_channel::Sender| { #[cfg(all( feature = "file_watcher", not(target_arch = "wasm32"), diff --git a/crates/bevy_asset/src/lib.rs b/crates/bevy_asset/src/lib.rs index 03da015711faa..f0205c9cd9027 100644 --- a/crates/bevy_asset/src/lib.rs +++ b/crates/bevy_asset/src/lib.rs @@ -737,6 +737,7 @@ mod tests { vec, vec::Vec, }; + use async_channel::{Receiver, Sender}; use bevy_app::{App, TaskPoolPlugin, Update}; use bevy_diagnostic::{DiagnosticsPlugin, DiagnosticsStore}; use bevy_ecs::{ @@ -750,7 +751,6 @@ mod tests { }; use bevy_reflect::TypePath; use core::time::Duration; - use crossbeam_channel::Sender; use serde::{Deserialize, Serialize}; use std::path::{Path, PathBuf}; use thiserror::Error; @@ -2153,8 +2153,8 @@ mod tests { // we've selected the reader. The GatedReader blocks this process, so we need to wait until // we gate in the loader instead. struct GatedLoader { - in_loader_sender: async_channel::Sender<()>, - gate_receiver: async_channel::Receiver<()>, + in_loader_sender: Sender<()>, + gate_receiver: Receiver<()>, } impl AssetLoader for GatedLoader { @@ -2381,7 +2381,7 @@ mod tests { // Sending an asset event should result in the asset being reloaded - resulting in a // "Modified" message. source_events - .send(AssetSourceEvent::ModifiedAsset(PathBuf::from( + .send_blocking(AssetSourceEvent::ModifiedAsset(PathBuf::from( "abc.cool.ron", ))) .unwrap(); @@ -2436,7 +2436,7 @@ mod tests { )"#, ); source_events - .send(AssetSourceEvent::AddedAsset(PathBuf::from("abc.cool.ron"))) + .send_blocking(AssetSourceEvent::AddedAsset(PathBuf::from("abc.cool.ron"))) .unwrap(); run_app_until(&mut app, |world| { diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index dcc74bd2d74f0..1b4c3a075b138 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -274,7 +274,8 @@ impl AssetProcessor { for source in self.data.sources.iter_processed() { if let Some(receiver) = source.event_receiver() { - for event in receiver.try_iter() { + // TODO: Handle TryRecvError::Closed. + while let Ok(event) = receiver.try_recv() { if !started_processing { self.set_state(ProcessorState::Processing).await; started_processing = true; diff --git a/crates/bevy_asset/src/server/mod.rs b/crates/bevy_asset/src/server/mod.rs index 49ed00233fe6d..3ae2ee1af6971 100644 --- a/crates/bevy_asset/src/server/mod.rs +++ b/crates/bevy_asset/src/server/mod.rs @@ -1836,14 +1836,14 @@ pub fn handle_internal_asset_events(world: &mut World) { match server.data.mode { AssetServerMode::Unprocessed => { if let Some(receiver) = source.event_receiver() { - for event in receiver.try_iter() { + while let Ok(event) = receiver.try_recv() { handle_event(source.id(), event); } } } AssetServerMode::Processed => { if let Some(receiver) = source.processed_event_receiver() { - for event in receiver.try_iter() { + while let Ok(event) = receiver.try_recv() { handle_event(source.id(), event); } } From 4a1412e03e38cf25cc9efc13b1befa3a10cf0489 Mon Sep 17 00:00:00 2001 From: andriyDev Date: Mon, 6 Oct 2025 23:15:31 -0700 Subject: [PATCH 2/3] Allow using the asset processor in single-threaded mode. --- crates/bevy_asset/Cargo.toml | 3 + crates/bevy_asset/src/processor/mod.rs | 141 ++++++++++++----------- crates/bevy_asset/src/processor/tests.rs | 71 +++--------- 3 files changed, 97 insertions(+), 118 deletions(-) diff --git a/crates/bevy_asset/Cargo.toml b/crates/bevy_asset/Cargo.toml index dfe5890b5fb30..8d529b045d525 100644 --- a/crates/bevy_asset/Cargo.toml +++ b/crates/bevy_asset/Cargo.toml @@ -52,6 +52,9 @@ crossbeam-channel = { version = "0.5", default-features = false, features = [ downcast-rs = { version = "2", default-features = false } disqualified = { version = "1.0", default-features = false } either = { version = "1.13", default-features = false } +futures-util = { version = "0.3", default-features = false, features = [ + "alloc", +] } futures-io = { version = "0.3", default-features = false } futures-lite = { version = "2.0.1", default-features = false } blake3 = { version = "1.5", default-features = false } diff --git a/crates/bevy_asset/src/processor/mod.rs b/crates/bevy_asset/src/processor/mod.rs index 1b4c3a075b138..63222795a7b0e 100644 --- a/crates/bevy_asset/src/processor/mod.rs +++ b/crates/bevy_asset/src/processor/mod.rs @@ -40,6 +40,7 @@ mod log; mod process; +use bevy_tasks::BoxedFuture; pub use log::*; pub use process::*; @@ -62,9 +63,10 @@ use bevy_platform::{ collections::{HashMap, HashSet}, sync::{PoisonError, RwLock}, }; -use bevy_tasks::IoTaskPool; +use bevy_tasks::{poll_once, IoTaskPool}; use futures_io::ErrorKind; use futures_lite::{AsyncReadExt, AsyncWriteExt, StreamExt}; +use futures_util::{future::join_all, stream::select_all}; use std::{ path::{Path, PathBuf}, sync::Mutex, @@ -222,16 +224,13 @@ impl AssetProcessor { /// Starts the processor in a background thread. pub fn start(_processor: Res) { - #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] - error!("Cannot run AssetProcessor in single threaded mode (or Wasm) yet."); - #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] - { - let processor = _processor.clone(); - std::thread::spawn(move || { - processor.process_assets(); - bevy_tasks::block_on(processor.listen_for_source_change_events()); - }); - } + let processor = _processor.clone(); + IoTaskPool::get() + .spawn(async move { + processor.process_assets().await; + processor.listen_for_source_change_events().await; + }) + .detach(); } /// Processes all assets. This will: @@ -244,23 +243,20 @@ impl AssetProcessor { /// * For each asset in the unprocessed [`AssetReader`](crate::io::AssetReader), kick off a new /// "process job", which will process the asset /// (if the latest version of the asset has not been processed). - #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] - pub fn process_assets(&self) { + pub async fn process_assets(&self) { let start_time = std::time::Instant::now(); debug!("Processing Assets"); - IoTaskPool::get().scope(|scope| { - scope.spawn(async move { - self.initialize().await.unwrap(); - for source in self.sources().iter_processed() { - self.process_assets_internal(scope, source, PathBuf::from("")) - .await - .unwrap(); - } - }); - }); - // This must happen _after_ the scope resolves or it will happen "too early" - // Don't move this into the async scope above! process_assets is a blocking/sync function this is fine - bevy_tasks::block_on(self.finish_processing_assets()); + let mut tasks = vec![]; + self.initialize().await.unwrap(); + for source in self.sources().iter_processed() { + self.process_assets_internal(source, PathBuf::from(""), &mut tasks) + .await + .unwrap(); + } + + join_all(tasks).await; + + self.finish_processing_assets().await; let end_time = std::time::Instant::now(); debug!("Processing finished in {:?}", end_time - start_time); } @@ -269,26 +265,51 @@ impl AssetProcessor { // PERF: parallelize change event processing pub async fn listen_for_source_change_events(&self) { debug!("Listening for changes to source assets"); - loop { - let mut started_processing = false; - - for source in self.data.sources.iter_processed() { - if let Some(receiver) = source.event_receiver() { - // TODO: Handle TryRecvError::Closed. - while let Ok(event) = receiver.try_recv() { - if !started_processing { - self.set_state(ProcessorState::Processing).await; - started_processing = true; - } - self.handle_asset_source_event(source, event).await; + // Collect all the event receivers and select across all of them. + let mut source_receivers = vec![]; + for source in self.data.sources.iter_processed() { + let Some(receiver) = source.event_receiver() else { + continue; + }; + let source_id = source.id(); + let source_receiver = receiver + .clone() + .map(move |event| (source_id.clone(), event)); + source_receivers.push(Box::pin(source_receiver)); + } + let mut all_receiver = select_all(source_receivers); + + // We want to await an entry in the stream, but once we have one, we want to process all the + // events in the channels before sending the "finished processing" state. So await the next + // item, then keep getting the next entry in the stream until we have to sleep. + let mut next = None; + while let Some(mut item) = { + if next.is_none() { + next = Some(all_receiver.next()); + } + next.take().unwrap() + } + .await + { + self.set_state(ProcessorState::Processing).await; + + loop { + let (source_id, event) = item; + self.handle_asset_source_event(self.data.sources.get(source_id).unwrap(), event) + .await; + let mut next_next = all_receiver.next(); + item = match poll_once(&mut next_next).await { + None => { + next = Some(next_next); + break; } - } + Some(None) => return, + Some(Some(item)) => item, + }; } - if started_processing { - self.finish_processing_assets().await; - } + self.finish_processing_assets().await; } } @@ -448,16 +469,12 @@ impl AssetProcessor { "Folder {} was added. Attempting to re-process", AssetPath::from_path(&path).with_source(source.id()) ); - #[cfg(any(target_arch = "wasm32", not(feature = "multi_threaded")))] - error!("AddFolder event cannot be handled in single threaded mode (or Wasm) yet."); - #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] - IoTaskPool::get().scope(|scope| { - scope.spawn(async move { - self.process_assets_internal(scope, source, path) - .await - .unwrap(); - }); - }); + let mut tasks = vec![]; + self.process_assets_internal(source, path, &mut tasks) + .await + .unwrap(); + + join_all(tasks).await; } /// Responds to a removed meta event by reprocessing the asset at the given path. @@ -567,24 +584,23 @@ impl AssetProcessor { self.set_state(ProcessorState::Finished).await; } - #[cfg(all(not(target_arch = "wasm32"), feature = "multi_threaded"))] async fn process_assets_internal<'scope>( &'scope self, - scope: &'scope bevy_tasks::Scope<'scope, '_, ()>, source: &'scope AssetSource, path: PathBuf, + tasks: &mut Vec>, ) -> Result<(), AssetReaderError> { if source.reader().is_directory(&path).await? { let mut path_stream = source.reader().read_directory(&path).await?; while let Some(path) = path_stream.next().await { - Box::pin(self.process_assets_internal(scope, source, path)).await?; + Box::pin(self.process_assets_internal(source, path, tasks)).await?; } } else { // Files without extensions are skipped let processor = self.clone(); - scope.spawn(async move { + tasks.push(Box::pin(async move { processor.process_asset(source, path).await; - }); + })); } Ok(()) } @@ -661,13 +677,6 @@ impl AssetProcessor { /// This info will later be used to determine whether or not to re-process an asset /// /// This will validate transactions and recover failed transactions when necessary. - #[cfg_attr( - any(target_arch = "wasm32", not(feature = "multi_threaded")), - expect( - dead_code, - reason = "This function is only used when the `multi_threaded` feature is enabled, and when not on WASM." - ) - )] async fn initialize(&self) -> Result<(), InitializeError> { self.validate_transaction_log_and_recover().await; let mut asset_infos = self.data.asset_infos.write().await; @@ -962,6 +971,7 @@ impl AssetProcessor { } } } + // Note: this lock must remain alive until all processed asset and meta writes have finished (or failed) // See ProcessedAssetInfo::file_transaction_lock docs for more info let _transaction_lock = { @@ -1002,6 +1012,7 @@ impl AssetProcessor { new_processed_info.full_hash = full_hash; *processed_meta.processed_info_mut() = Some(new_processed_info.clone()); let meta_bytes = processed_meta.serialize(); + processed_writer .write_meta_bytes(path, &meta_bytes) .await @@ -1565,7 +1576,5 @@ pub enum SetTransactionLogFactoryError { AlreadyInUse, } -// The asset processor currently requires multi_threaded. -#[cfg(feature = "multi_threaded")] #[cfg(test)] mod tests; diff --git a/crates/bevy_asset/src/processor/tests.rs b/crates/bevy_asset/src/processor/tests.rs index 25208f759fe4c..3cf62817a2159 100644 --- a/crates/bevy_asset/src/processor/tests.rs +++ b/crates/bevy_asset/src/processor/tests.rs @@ -21,11 +21,11 @@ use crate::{ AssetSource, AssetSourceId, Reader, }, processor::{ - AssetProcessor, LoadTransformAndSave, LogEntry, ProcessorTransactionLog, + AssetProcessor, LoadTransformAndSave, LogEntry, ProcessorState, ProcessorTransactionLog, ProcessorTransactionLogFactory, }, saver::AssetSaver, - tests::{CoolText, CoolTextLoader, CoolTextRon, SubText}, + tests::{run_app_until, CoolText, CoolTextLoader, CoolTextRon, SubText}, transformer::{AssetTransformer, TransformedAsset}, Asset, AssetApp, AssetLoader, AssetMode, AssetPath, AssetPlugin, LoadContext, }; @@ -120,6 +120,18 @@ fn create_app_with_asset_processor() -> AppWithProcessor { } } +fn run_app_until_finished_processing(app: &mut App) { + run_app_until(app, |world| { + if bevy_tasks::block_on(world.resource::().get_state()) + == ProcessorState::Finished + { + Some(()) + } else { + None + } + }); +} + struct CoolTextSaver; impl AssetSaver for CoolTextSaver { @@ -211,16 +223,7 @@ fn no_meta_or_default_processor_copies_asset() { source_dir.insert_asset_text(path, source_asset); - // Start the app, which also starts the asset processor. - app.update(); - - // Wait for all processing to finish. - bevy_tasks::block_on( - app.world() - .resource::() - .data() - .wait_until_finished(), - ); + run_app_until_finished_processing(&mut app); let processed_asset = processed_dir.get_asset(path).unwrap(); let processed_asset = str::from_utf8(processed_asset.value()).unwrap(); @@ -266,16 +269,7 @@ fn asset_processor_transforms_asset_default_processor() { )"#, ); - // Start the app, which also starts the asset processor. - app.update(); - - // Wait for all processing to finish. - bevy_tasks::block_on( - app.world() - .resource::() - .data() - .wait_until_finished(), - ); + run_app_until_finished_processing(&mut app); let processed_asset = processed_dir.get_asset(path).unwrap(); let processed_asset = str::from_utf8(processed_asset.value()).unwrap(); @@ -334,16 +328,7 @@ fn asset_processor_transforms_asset_with_meta() { ), )"#); - // Start the app, which also starts the asset processor. - app.update(); - - // Wait for all processing to finish. - bevy_tasks::block_on( - app.world() - .resource::() - .data() - .wait_until_finished(), - ); + run_app_until_finished_processing(&mut app); let processed_asset = processed_dir.get_asset(path).unwrap(); let processed_asset = str::from_utf8(processed_asset.value()).unwrap(); @@ -536,16 +521,7 @@ fn asset_processor_loading_can_read_processed_assets() { )"#, ); - // Start the app, which also starts the asset processor. - app.update(); - - // Wait for all processing to finish. - bevy_tasks::block_on( - app.world() - .resource::() - .data() - .wait_until_finished(), - ); + run_app_until_finished_processing(&mut app); let processed_bsn = processed_dir.get_asset(bsn_path).unwrap(); let processed_bsn = str::from_utf8(processed_bsn.value()).unwrap(); @@ -700,16 +676,7 @@ fn asset_processor_loading_can_read_source_assets() { )"#, ); - // Start the app, which also starts the asset processor. - app.update(); - - // Wait for all processing to finish. - bevy_tasks::block_on( - app.world() - .resource::() - .data() - .wait_until_finished(), - ); + run_app_until_finished_processing(&mut app); // Sanity check that the two gltf files were actually processed. let processed_gltf_1 = processed_dir.get_asset(gltf_path_1).unwrap(); From 0b12215e81938ffa2da7d252cb78ea547e963d5c Mon Sep 17 00:00:00 2001 From: andriyDev Date: Tue, 21 Oct 2025 15:02:00 -0700 Subject: [PATCH 3/3] Create a migration guide. --- .../migration-guides/asset_watcher_async_sender.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 release-content/migration-guides/asset_watcher_async_sender.md diff --git a/release-content/migration-guides/asset_watcher_async_sender.md b/release-content/migration-guides/asset_watcher_async_sender.md new file mode 100644 index 0000000000000..60b1383e5c23f --- /dev/null +++ b/release-content/migration-guides/asset_watcher_async_sender.md @@ -0,0 +1,12 @@ +--- +title: AssetSources now give an `async_channel::Sender` instead of a `crossbeam_channel::Sender` +pull_requests: [] +--- + +Previously, when creating an asset source, `AssetSourceBuilder::with_watcher` would provide users +with a `crossbeam_channel::Sender`. Now, this has been changed to `async_channel::Sender`. + +If you were previously calling `sender.send(AssetSourceEvent::ModifiedAsset("hello".into()))`, now +it would be `sender.send_blocking(AssetSourceEvent::ModifiedAsset("hello".into()))`. These channels +are very comparable, so finding an analogous method between `crossbeam_channel` and `async_channel` +should be straight forward.