Skip to content

Commit

Permalink
feat(compilation): add queue handlers representing task_queue
Browse files Browse the repository at this point in the history
  • Loading branch information
JSerFeng committed Dec 19, 2023
1 parent caa3be2 commit 7f8321d
Show file tree
Hide file tree
Showing 7 changed files with 251 additions and 2 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 50 additions & 2 deletions crates/rspack_core/src/compiler/compilation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use super::{
use crate::{
build_chunk_graph::build_chunk_graph,
cache::{use_code_splitting_cache, Cache, CodeSplittingCache},
is_source_equal,
create_queue_handle, is_source_equal,
tree_shaking::{optimizer, visitor::SymbolRef, BailoutFlag, OptimizeDependencyResult},
AddQueue, AddTask, AddTaskResult, AdditionalChunkRuntimeRequirementsArgs,
AdditionalModuleRequirementsArgs, AsyncDependenciesBlock, BoxDependency, BoxModule, BuildQueue,
Expand All @@ -44,7 +44,7 @@ use crate::{
FactorizeQueue, FactorizeTask, FactorizeTaskResult, Filename, Logger, Module,
ModuleCreationCallback, ModuleFactory, ModuleGraph, ModuleIdentifier, ModuleProfile, PathData,
ProcessAssetsArgs, ProcessDependenciesQueue, ProcessDependenciesResult, ProcessDependenciesTask,
RenderManifestArgs, Resolve, ResolverFactory, RuntimeGlobals, RuntimeModule,
QueueHandler, RenderManifestArgs, Resolve, ResolverFactory, RuntimeGlobals, RuntimeModule,
RuntimeRequirementsInTreeArgs, RuntimeSpec, SharedPluginDriver, SourceType, Stats, TaskResult,
WorkerTask,
};
Expand Down Expand Up @@ -111,6 +111,8 @@ pub struct Compilation {
pub build_dependencies: IndexSet<PathBuf, BuildHasherDefault<FxHasher>>,
pub side_effects_free_modules: IdentifierSet,
pub module_item_map: IdentifierMap<Vec<ModuleItem>>,

pub queue_handle: Option<QueueHandler>,
}

impl Compilation {
Expand Down Expand Up @@ -172,6 +174,8 @@ impl Compilation {
side_effects_free_modules: IdentifierSet::default(),
module_item_map: IdentifierMap::default(),
include_module_ids: IdentifierSet::default(),

queue_handle: None,
}
}

Expand Down Expand Up @@ -487,14 +491,19 @@ impl Compilation {
let mut active_task_count = 0usize;
let is_expected_shutdown = Arc::new(AtomicBool::new(false));
let (result_tx, mut result_rx) = tokio::sync::mpsc::unbounded_channel::<Result<TaskResult>>();

let mut factorize_queue = FactorizeQueue::new();
let mut add_queue = AddQueue::new();
let mut build_queue = BuildQueue::new();
let mut process_dependencies_queue = ProcessDependenciesQueue::new();

let mut make_failed_dependencies: HashSet<BuildDependency> = HashSet::default();
let mut make_failed_module = HashSet::default();
let mut errored = None;

let (handler, mut processor) = create_queue_handle();
self.queue_handle = Some(handler);

deps_builder
.revoke_modules(&mut self.module_graph)
.into_iter()
Expand Down Expand Up @@ -545,6 +554,14 @@ impl Compilation {

tokio::task::block_in_place(|| loop {
let start = factorize_time.start();
processor.try_process(
self,
&mut factorize_queue,
&mut add_queue,
&mut build_queue,
&mut process_dependencies_queue,
);

while let Some(task) = factorize_queue.get_task() {
tokio::spawn({
let result_tx = result_tx.clone();
Expand Down Expand Up @@ -687,6 +704,37 @@ impl Compilation {

match result_rx.try_recv() {
Ok(item) => {
if let Ok(item) = &item {
match item {
TaskResult::Factorize(result) => processor.flush_callback(
crate::TaskCategory::Factorize,
&result.dependency.to_string(),
result.module_graph_module.module_identifier,
self,
),
TaskResult::Add(result) => {
let module = match result.as_ref() {
AddTaskResult::ModuleReused { module } => module.identifier(),
AddTaskResult::ModuleAdded { module, .. } => module.identifier(),
};

processor.flush_callback(crate::TaskCategory::Add, &module, module, self)
}
TaskResult::Build(result) => {
let id = result.module.identifier();
processor.flush_callback(crate::TaskCategory::Build, &id, id, self);
}
TaskResult::ProcessDependencies(result) => {
processor.flush_callback(
crate::TaskCategory::ProcessDependencies,
&result.module_identifier,
result.module_identifier,
self,
);
}
}
}

match item {
Ok(TaskResult::Factorize(box task_result)) => {
let FactorizeTaskResult {
Expand Down
185 changes: 185 additions & 0 deletions crates/rspack_core/src/compiler/queue.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::sync::Arc;

use rspack_error::{Diagnostic, IntoTWithDiagnosticArray, Result};
use rustc_hash::FxHashMap;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};

use crate::{
cache::Cache, BoxDependency, BuildContext, BuildResult, Compilation, CompilerContext,
Expand Down Expand Up @@ -42,6 +44,30 @@ pub struct FactorizeTask {
pub callback: Option<ModuleCreationCallback>,
}

impl std::fmt::Debug for FactorizeTask {
  /// Manual `Debug` impl: the `callback` field is a boxed `FnOnce` and has no
  /// `Debug` impl, so it cannot be derived. `finish_non_exhaustive` prints a
  /// trailing `..` to make the omission visible in debug output.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.debug_struct("FactorizeTask")
      .field(
        "original_module_identifier",
        &self.original_module_identifier,
      )
      .field("original_module_context", &self.original_module_context)
      .field("issuer", &self.issuer)
      .field("dependency", &self.dependency)
      .field("dependencies", &self.dependencies)
      .field("is_entry", &self.is_entry)
      .field("resolve_options", &self.resolve_options)
      .field("resolver_factory", &self.resolver_factory)
      .field("loader_resolver_factory", &self.loader_resolver_factory)
      .field("options", &self.options)
      .field("lazy_visit_modules", &self.lazy_visit_modules)
      .field("plugin_driver", &self.plugin_driver)
      .field("cache", &self.cache)
      .field("current_profile", &self.current_profile)
      // `callback` is intentionally omitted (not `Debug`).
      .finish_non_exhaustive()
  }
}

/// a struct temporarily used creating ExportsInfo
#[derive(Debug)]
pub struct ExportsInfoRelated {
Expand All @@ -51,6 +77,7 @@ pub struct ExportsInfoRelated {
}

pub struct FactorizeTaskResult {
pub dependency: DependencyId,
pub original_module_identifier: Option<ModuleIdentifier>,
pub factory_result: ModuleFactoryResult,
pub module_graph_module: Box<ModuleGraphModule>,
Expand Down Expand Up @@ -89,6 +116,7 @@ impl WorkerTask for FactorizeTask {
current_profile.mark_factory_start();
}
let dependency = self.dependency;
let dep_id = *dependency.id();

let context = if let Some(context) = dependency.get_context() {
context
Expand Down Expand Up @@ -129,6 +157,7 @@ impl WorkerTask for FactorizeTask {
}

Ok(TaskResult::Factorize(Box::new(FactorizeTaskResult {
dependency: dep_id,
is_entry: self.is_entry,
original_module_identifier: self.original_module_identifier,
from_cache: result.from_cache,
Expand Down Expand Up @@ -159,6 +188,22 @@ pub struct AddTask {
pub callback: Option<ModuleCreationCallback>,
}

impl std::fmt::Debug for AddTask {
  /// Manual `Debug` impl: the `callback` field is a boxed `FnOnce` and has no
  /// `Debug` impl, so it cannot be derived. `finish_non_exhaustive` prints a
  /// trailing `..` to make the omission visible in debug output.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.debug_struct("AddTask")
      .field(
        "original_module_identifier",
        &self.original_module_identifier,
      )
      .field("module", &self.module)
      .field("module_graph_module", &self.module_graph_module)
      .field("dependencies", &self.dependencies)
      .field("is_entry", &self.is_entry)
      .field("current_profile", &self.current_profile)
      // `callback` is intentionally omitted (not `Debug`).
      .finish_non_exhaustive()
  }
}

#[derive(Debug)]
pub enum AddTaskResult {
ModuleReused {
Expand Down Expand Up @@ -244,6 +289,7 @@ fn set_resolved_module(

pub type AddQueue = WorkerQueue<AddTask>;

#[derive(Debug)]
pub struct BuildTask {
pub module: Box<dyn Module>,
pub resolver_factory: Arc<ResolverFactory>,
Expand Down Expand Up @@ -333,6 +379,7 @@ impl WorkerTask for BuildTask {

pub type BuildQueue = WorkerQueue<BuildTask>;

#[derive(Debug)]
pub struct ProcessDependenciesTask {
pub original_module_identifier: ModuleIdentifier,
pub dependencies: Vec<DependencyId>,
Expand Down Expand Up @@ -399,3 +446,141 @@ impl CleanTask {
pub type CleanQueue = WorkerQueue<CleanTask>;

// Invoked once a module has been created; receives the boxed module.
pub type ModuleCreationCallback = Box<dyn FnOnce(&BoxModule) + Send>;

// Invoked when a subscribed task finishes; receives the resulting module
// identifier and mutable access to the compilation.
pub type QueueHandleCallback = Box<dyn FnOnce(ModuleIdentifier, &mut Compilation) + Send + Sync>;

/// Messages sent through the queue-handle channel back to the main
/// compilation loop, where `QueueHandlerProcessor::try_process` forwards
/// each task variant into its matching worker queue.
#[derive(Debug)]
pub enum QueueTask {
  Factorize(Box<FactorizeTask>),
  Add(Box<AddTask>),
  Build(Box<BuildTask>),
  ProcessDependencies(Box<ProcessDependenciesTask>),

  /// Register a callback to run once a given task has finished.
  Subscription(Box<Subscription>),
}

/// Identifies which kind of task a subscription or result refers to.
/// The explicit discriminants are used (via `as usize`) to index the
/// per-category arrays in `QueueHandlerProcessor`.
#[derive(Debug, Copy, Clone)]
pub enum TaskCategory {
  Factorize = 0,
  Add = 1,
  Build = 2,
  ProcessDependencies = 3,
}

/// A request to invoke `callback` once the task identified by `key` within
/// `category` has finished.
pub struct Subscription {
  category: TaskCategory,
  // Task key: at the flush sites in compilation.rs this is the dependency id
  // rendered as a string for factorize results, and a module identifier for
  // the other categories.
  key: String,
  // Receives the resulting module identifier and the compilation.
  callback: QueueHandleCallback,
}

impl std::fmt::Debug for Subscription {
  /// Manual `Debug` impl: `callback` is a boxed `FnOnce` and has no `Debug`
  /// impl. Print the identifying fields (`category`, `key`) so a logged
  /// subscription is actually traceable, and mark the output non-exhaustive
  /// to signal the omitted callback.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.debug_struct("Subscription")
      .field("category", &self.category)
      .field("key", &self.key)
      .finish_non_exhaustive()
  }
}

/// Cloneable sending side of the queue channel. Stored on
/// `Compilation::queue_handle` so tasks and subscriptions can be submitted
/// while the processor drains them on the main loop.
#[derive(Clone)]
pub struct QueueHandler {
  sender: UnboundedSender<QueueTask>,
}

impl std::fmt::Debug for QueueHandler {
  // Manual impl mirroring what a derive would produce; only the channel
  // sender is printed.
  fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    f.debug_struct("QueueHandler")
      .field("sender", &self.sender)
      .finish()
  }
}

impl QueueHandler {
  /// Registers `callback` to be invoked once the task identified by `key`
  /// in `category` has finished, by forwarding a subscription message to
  /// the queue processor.
  ///
  /// Panics if the processor side of the channel has been dropped.
  pub fn wait_for(&self, key: String, category: TaskCategory, callback: QueueHandleCallback) {
    let subscription = Subscription {
      category,
      key,
      callback,
    };
    let message = QueueTask::Subscription(Box::new(subscription));
    self.sender.send(message).expect("failed to wait task");
  }
}

/// Receiving side of the queue channel, driven from the main compilation
/// loop: `try_process` drains incoming messages and `flush_callback` fires
/// pending subscriptions when a task completes. Both arrays are indexed by
/// `TaskCategory as usize`.
pub struct QueueHandlerProcessor {
  receiver: UnboundedReceiver<QueueTask>,
  // Pending callbacks per category, keyed by task key.
  callbacks: [FxHashMap<String, Vec<QueueHandleCallback>>; 4],
  // Already-finished tasks per category: task key -> resulting module id,
  // so late subscriptions can be answered immediately.
  finished: [FxHashMap<String, ModuleIdentifier>; 4],
}

impl QueueHandlerProcessor {
  /// Drains every message currently in the channel without blocking.
  /// Task messages are forwarded into their matching worker queue;
  /// subscription messages are either fired immediately (when the task
  /// already finished) or stored until `flush_callback` runs for that key.
  pub fn try_process(
    &mut self,
    compilation: &mut Compilation,
    factorize_queue: &mut FactorizeQueue,
    add_queue: &mut AddQueue,
    build_queue: &mut BuildQueue,
    process_dependencies_queue: &mut ProcessDependenciesQueue,
  ) {
    while let Ok(task) = self.receiver.try_recv() {
      match task {
        QueueTask::Factorize(task) => {
          factorize_queue.add_task(*task);
        }
        QueueTask::Add(task) => {
          add_queue.add_task(*task);
        }
        QueueTask::Build(task) => {
          build_queue.add_task(*task);
        }
        QueueTask::ProcessDependencies(task) => {
          process_dependencies_queue.add_task(*task);
        }
        QueueTask::Subscription(subscription) => {
          let Subscription {
            category,
            key,
            callback,
          } = *subscription;
          if let Some(module) = self.finished[category as usize].get(&key) {
            // The task finished before the subscription arrived: invoke the
            // callback right away.
            callback(*module, compilation);
          } else {
            self.callbacks[category as usize]
              .entry(key)
              .or_default()
              .push(callback);
          }
        }
      }
    }
  }

  /// Records that the task `key` in `category` finished producing `module`,
  /// then invokes and removes every callback waiting on it. Removing the
  /// map entry (rather than `get_mut` + pop) avoids permanently retaining an
  /// empty `Vec` per flushed key.
  pub fn flush_callback(
    &mut self,
    category: TaskCategory,
    key: &str,
    module: ModuleIdentifier,
    compilation: &mut Compilation,
  ) {
    self.finished[category as usize].insert(key.into(), module);
    if let Some(callbacks) = self.callbacks[category as usize].remove(key) {
      // Reverse iteration preserves the previous `while let Some(cb) =
      // callbacks.pop()` (LIFO) invocation order.
      for cb in callbacks.into_iter().rev() {
        cb(module, compilation);
      }
    }
  }
}

/// Creates a connected (handle, processor) pair over an unbounded channel.
/// The cloneable handle submits tasks/subscriptions; the processor drains
/// them on the main compilation loop.
pub fn create_queue_handle() -> (QueueHandler, QueueHandlerProcessor) {
  let (sender, receiver) = unbounded_channel();
  let handler = QueueHandler { sender };
  let processor = QueueHandlerProcessor {
    receiver,
    callbacks: Default::default(),
    finished: Default::default(),
  };
  (handler, processor)
}
4 changes: 4 additions & 0 deletions crates/rspack_error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ impl<T: std::fmt::Debug> TWithDiagnosticArray<T> {
pub fn split_into_parts(self) -> (T, Vec<Diagnostic>) {
(self.inner, self.diagnostic)
}

/// Returns a shared reference to the wrapped value, leaving the diagnostics
/// attached (unlike `split_into_parts`, which consumes `self`).
pub fn get(&self) -> &T {
  &self.inner
}
}

impl<T: Clone + std::fmt::Debug> Clone for TWithDiagnosticArray<T> {
Expand Down
2 changes: 2 additions & 0 deletions crates/rspack_util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ version = "0.1.0"

[dependencies]
concat-string = { workspace = true }
dashmap = { workspace = true }
once_cell = { workspace = true }
regex = { workspace = true }
rustc-hash = { workspace = true }
sugar_path = { workspace = true }

swc_core = { workspace = true, features = ["ecma_ast"] }
7 changes: 7 additions & 0 deletions crates/rspack_util/src/fx_dashmap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use std::hash::BuildHasherDefault;

use dashmap::{DashMap, DashSet};
use rustc_hash::FxHasher;

/// `DashMap` preconfigured with the fast, non-DoS-resistant `FxHasher`.
pub type FxDashMap<K, V> = DashMap<K, V, BuildHasherDefault<FxHasher>>;
/// `DashSet` preconfigured with the fast, non-DoS-resistant `FxHasher`.
pub type FxDashSet<V> = DashSet<V, BuildHasherDefault<FxHasher>>;
Loading

0 comments on commit 7f8321d

Please sign in to comment.