diff --git a/Cargo.lock b/Cargo.lock index 69e8f2c2407f..11b37fa60b72 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3395,8 +3395,10 @@ name = "rspack_util" version = "0.1.0" dependencies = [ "concat-string", + "dashmap", "once_cell", "regex", + "rustc-hash", "sugar_path", "swc_core", ] diff --git a/crates/rspack_core/src/compiler/compilation.rs b/crates/rspack_core/src/compiler/compilation.rs index c4e079f96469..b51b49f4647c 100644 --- a/crates/rspack_core/src/compiler/compilation.rs +++ b/crates/rspack_core/src/compiler/compilation.rs @@ -32,7 +32,7 @@ use super::{ use crate::{ build_chunk_graph::build_chunk_graph, cache::{use_code_splitting_cache, Cache, CodeSplittingCache}, - is_source_equal, + create_queue_handle, is_source_equal, tree_shaking::{optimizer, visitor::SymbolRef, BailoutFlag, OptimizeDependencyResult}, AddQueue, AddTask, AddTaskResult, AdditionalChunkRuntimeRequirementsArgs, AdditionalModuleRequirementsArgs, AsyncDependenciesBlock, BoxDependency, BoxModule, BuildQueue, @@ -44,7 +44,7 @@ use crate::{ FactorizeQueue, FactorizeTask, FactorizeTaskResult, Filename, Logger, Module, ModuleCreationCallback, ModuleFactory, ModuleGraph, ModuleIdentifier, ModuleProfile, PathData, ProcessAssetsArgs, ProcessDependenciesQueue, ProcessDependenciesResult, ProcessDependenciesTask, - RenderManifestArgs, Resolve, ResolverFactory, RuntimeGlobals, RuntimeModule, + QueueHandler, RenderManifestArgs, Resolve, ResolverFactory, RuntimeGlobals, RuntimeModule, RuntimeRequirementsInTreeArgs, RuntimeSpec, SharedPluginDriver, SourceType, Stats, TaskResult, WorkerTask, }; @@ -111,6 +111,8 @@ pub struct Compilation { pub build_dependencies: IndexSet>, pub side_effects_free_modules: IdentifierSet, pub module_item_map: IdentifierMap>, + + pub queue_handle: Option, } impl Compilation { @@ -172,6 +174,8 @@ impl Compilation { side_effects_free_modules: IdentifierSet::default(), module_item_map: IdentifierMap::default(), include_module_ids: IdentifierSet::default(), + + queue_handle: None, } } @@ -487,14 +491,19 @@ impl Compilation { let mut active_task_count = 0usize; let is_expected_shutdown = Arc::new(AtomicBool::new(false)); let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel::>(); + let mut factorize_queue = FactorizeQueue::new(); let mut add_queue = AddQueue::new(); let mut build_queue = BuildQueue::new(); let mut process_dependencies_queue = ProcessDependenciesQueue::new(); + let mut make_failed_dependencies: HashSet = HashSet::default(); let mut make_failed_module = HashSet::default(); let mut errored = None; + let (handler, mut processor) = create_queue_handle(); + self.queue_handle = Some(handler); + deps_builder .revoke_modules(&mut self.module_graph) .into_iter() @@ -545,6 +554,14 @@ impl Compilation { tokio::task::block_in_place(|| loop { let start = factorize_time.start(); + processor.try_process( + self, + &mut factorize_queue, + &mut add_queue, + &mut build_queue, + &mut process_dependencies_queue, + ); + while let Some(task) = factorize_queue.get_task() { tokio::spawn({ let result_tx = result_tx.clone(); @@ -687,6 +704,37 @@ impl Compilation { match result_rx.try_recv() { Ok(item) => { + if let Ok(item) = &item { + match item { + TaskResult::Factorize(result) => processor.flush_callback( + crate::TaskCategory::Factorize, + &result.dependency.to_string(), + result.module_graph_module.module_identifier, + self, + ), + TaskResult::Add(result) => { + let module = match result.as_ref() { + AddTaskResult::ModuleReused { module } => module.identifier(), + AddTaskResult::ModuleAdded { module, .. } => module.identifier(), + }; + + processor.flush_callback(crate::TaskCategory::Add, &module, module, self) + } + TaskResult::Build(result) => { + let id = result.module.identifier(); + processor.flush_callback(crate::TaskCategory::Build, &id, id, self); + } + TaskResult::ProcessDependencies(result) => { + processor.flush_callback( + crate::TaskCategory::ProcessDependencies, + &result.module_identifier, + result.module_identifier, + self, + ); + } + } + } + match item { Ok(TaskResult::Factorize(box task_result)) => { let FactorizeTaskResult { diff --git a/crates/rspack_core/src/compiler/queue.rs b/crates/rspack_core/src/compiler/queue.rs index 0f75f77f8f80..a8e84efce514 100644 --- a/crates/rspack_core/src/compiler/queue.rs +++ b/crates/rspack_core/src/compiler/queue.rs @@ -1,6 +1,8 @@ use std::sync::Arc; use rspack_error::{Diagnostic, IntoTWithDiagnosticArray, Result}; +use rustc_hash::FxHashMap; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ cache::Cache, BoxDependency, BuildContext, BuildResult, Compilation, CompilerContext, @@ -42,6 +44,30 @@ pub struct FactorizeTask { pub callback: Option, } +impl std::fmt::Debug for FactorizeTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("FactorizeTask") + .field( + "original_module_identifier", + &self.original_module_identifier, + ) + .field("original_module_context", &self.original_module_context) + .field("issuer", &self.issuer) + .field("dependency", &self.dependency) + .field("dependencies", &self.dependencies) + .field("is_entry", &self.is_entry) + .field("resolve_options", &self.resolve_options) + .field("resolver_factory", &self.resolver_factory) + .field("loader_resolver_factory", &self.loader_resolver_factory) + .field("options", &self.options) + .field("lazy_visit_modules", &self.lazy_visit_modules) + .field("plugin_driver", &self.plugin_driver) + .field("cache", &self.cache) + .field("current_profile", &self.current_profile) + .finish() + } +} + /// a struct temporarily used creating ExportsInfo #[derive(Debug)] pub struct ExportsInfoRelated { @@ -51,6 +77,7 @@ pub struct ExportsInfoRelated { } pub struct FactorizeTaskResult { + pub dependency: DependencyId, pub original_module_identifier: Option, pub factory_result: ModuleFactoryResult, pub module_graph_module: Box, @@ -89,6 +116,7 @@ impl WorkerTask for FactorizeTask { current_profile.mark_factory_start(); } let dependency = self.dependency; + let dep_id = *dependency.id(); let context = if let Some(context) = dependency.get_context() { context @@ -129,6 +157,7 @@ impl WorkerTask for FactorizeTask { } Ok(TaskResult::Factorize(Box::new(FactorizeTaskResult { + dependency: dep_id, is_entry: self.is_entry, original_module_identifier: self.original_module_identifier, from_cache: result.from_cache, @@ -159,6 +188,22 @@ pub struct AddTask { pub callback: Option, } +impl std::fmt::Debug for AddTask { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AddTask") + .field( + "original_module_identifier", + &self.original_module_identifier, + ) + .field("module", &self.module) + .field("module_graph_module", &self.module_graph_module) + .field("dependencies", &self.dependencies) + .field("is_entry", &self.is_entry) + .field("current_profile", &self.current_profile) + .finish() + } +} + #[derive(Debug)] pub enum AddTaskResult { ModuleReused { @@ -244,6 +289,7 @@ fn set_resolved_module( pub type AddQueue = WorkerQueue; +#[derive(Debug)] pub struct BuildTask { pub module: Box, pub resolver_factory: Arc, @@ -333,6 +379,7 @@ impl WorkerTask for BuildTask { pub type BuildQueue = WorkerQueue; +#[derive(Debug)] pub struct ProcessDependenciesTask { pub original_module_identifier: ModuleIdentifier, pub dependencies: Vec, @@ -399,3 +446,141 @@ impl CleanTask { pub type CleanQueue = WorkerQueue; pub type ModuleCreationCallback = Box; + +pub type QueueHandleCallback = Box; + +#[derive(Debug)] +pub enum QueueTask { + Factorize(Box), + Add(Box), + Build(Box), + ProcessDependencies(Box), + + Subscription(Box), +} + +#[derive(Debug, Copy, Clone)] +pub enum TaskCategory { + Factorize = 0, + Add = 1, + Build = 2, + ProcessDependencies = 3, +} + +pub struct Subscription { + category: TaskCategory, + key: String, + callback: QueueHandleCallback, +} + +impl std::fmt::Debug for Subscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Subscription") + .field("category", &self.category) + .finish() + } +} + +#[derive(Clone)] +pub struct QueueHandler { + sender: UnboundedSender, +} + +impl std::fmt::Debug for QueueHandler { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("QueueHandler") + .field("sender", &self.sender) + .finish() + } +} + +impl QueueHandler { + pub fn wait_for(&self, key: String, category: TaskCategory, callback: QueueHandleCallback) { + self + .sender + .send(QueueTask::Subscription(Box::new(Subscription { + category, + key, + callback, + }))) + .expect("failed to wait task"); + } +} + +pub struct QueueHandlerProcessor { + receiver: UnboundedReceiver, + callbacks: [FxHashMap>; 4], + finished: [FxHashMap; 4], +} + +impl QueueHandlerProcessor { + pub fn try_process( + &mut self, + compilation: &mut Compilation, + factorize_queue: &mut FactorizeQueue, + add_queue: &mut AddQueue, + build_queue: &mut BuildQueue, + process_dependencies_queue: &mut ProcessDependenciesQueue, + ) { + while let Ok(task) = self.receiver.try_recv() { + match task { + QueueTask::Factorize(task) => { + factorize_queue.add_task(*task); + } + QueueTask::Add(task) => { + add_queue.add_task(*task); + } + QueueTask::Build(task) => { + build_queue.add_task(*task); + } + QueueTask::ProcessDependencies(task) => { + process_dependencies_queue.add_task(*task); + } + QueueTask::Subscription(subscription) => { + let Subscription { + category, + key, + callback, + } = *subscription; + if let Some(module) = self.finished[category as usize].get(&key) { + // already finished + callback(*module, compilation); + } else { + self.callbacks[category as usize] + .entry(key) + .or_default() + .push(callback); + } + } + } + } + } + + pub fn flush_callback( + &mut self, + category: TaskCategory, + key: &str, + module: ModuleIdentifier, + compilation: &mut Compilation, + ) { + self.finished[category as usize].insert(key.into(), module); + if let Some(callbacks) = self.callbacks[category as usize].get_mut(key) { + while let Some(cb) = callbacks.pop() { + cb(module, compilation); + } + } + } +} + +pub fn create_queue_handle() -> (QueueHandler, QueueHandlerProcessor) { + let (tx, rx) = unbounded_channel(); + + ( + QueueHandler { sender: tx }, + QueueHandlerProcessor { + receiver: rx, + callbacks: Default::default(), + finished: Default::default(), + }, + ) +} diff --git a/crates/rspack_error/src/lib.rs b/crates/rspack_error/src/lib.rs index 93a1a8faaf7f..61385db55850 100644 --- a/crates/rspack_error/src/lib.rs +++ b/crates/rspack_error/src/lib.rs @@ -44,6 +44,10 @@ impl TWithDiagnosticArray { pub fn split_into_parts(self) -> (T, Vec) { (self.inner, self.diagnostic) } + + pub fn get(&self) -> &T { + &self.inner + } } impl Clone for TWithDiagnosticArray { diff --git a/crates/rspack_util/Cargo.toml b/crates/rspack_util/Cargo.toml index 25eb313ef0a0..856a57d7e117 100644 --- a/crates/rspack_util/Cargo.toml +++ b/crates/rspack_util/Cargo.toml @@ -9,8 +9,10 @@ version = "0.1.0" [dependencies] concat-string = { workspace = true } +dashmap = { workspace = true } once_cell = { workspace = true } regex = { workspace = true } +rustc-hash = { workspace = true } sugar_path = { workspace = true } swc_core = { workspace = true, features = ["ecma_ast"] } diff --git a/crates/rspack_util/src/fx_dashmap.rs b/crates/rspack_util/src/fx_dashmap.rs new file mode 100644 index 000000000000..d778e5e0a05a --- /dev/null +++ b/crates/rspack_util/src/fx_dashmap.rs @@ -0,0 +1,7 @@ +use std::hash::BuildHasherDefault; + +use dashmap::{DashMap, DashSet}; +use rustc_hash::FxHasher; + +pub type FxDashMap = DashMap>; +pub type FxDashSet = DashSet>; diff --git a/crates/rspack_util/src/lib.rs b/crates/rspack_util/src/lib.rs index fd252cce4eda..76cf7932513e 100644 --- a/crates/rspack_util/src/lib.rs +++ b/crates/rspack_util/src/lib.rs @@ -3,6 +3,7 @@ use std::future::Future; pub mod comparators; pub mod ext; +pub mod fx_dashmap; pub mod identifier; pub mod number_hash; pub mod swc;