diff --git a/Cargo.lock b/Cargo.lock index e3cb085d5189f..2908bdad15b57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10049,6 +10049,7 @@ dependencies = [ "pretty_assertions", "prost", "rand 0.8.5", + "rayon", "regex", "reqwest", "rustc_version_runtime", diff --git a/crates/turborepo-env/src/lib.rs b/crates/turborepo-env/src/lib.rs index f8c2ee706d336..bafa53e0d2844 100644 --- a/crates/turborepo-env/src/lib.rs +++ b/crates/turborepo-env/src/lib.rs @@ -13,6 +13,12 @@ use thiserror::Error; const DEFAULT_ENV_VARS: [&str; 1] = ["VERCEL_ANALYTICS_ID"]; +/// Environment mode after we've resolved the `Infer` variant +pub enum ResolvedEnvMode { + Loose, + Strict, +} + #[derive(Clone, Debug, Error)] pub enum Error { #[error("Failed to parse regex: {0}")] diff --git a/crates/turborepo-lib/Cargo.toml b/crates/turborepo-lib/Cargo.toml index 4ec579ce0b625..003dbac1226eb 100644 --- a/crates/turborepo-lib/Cargo.toml +++ b/crates/turborepo-lib/Cargo.toml @@ -96,6 +96,7 @@ lazy-regex = "2.5.0" node-semver = "2.1.0" num_cpus = "1.15.0" owo-colors.workspace = true +rayon = "1.7.0" regex.workspace = true tracing-appender = "0.2.2" tracing-chrome = { version = "0.7.1", optional = true } diff --git a/crates/turborepo-lib/src/engine/mod.rs b/crates/turborepo-lib/src/engine/mod.rs index ffef61fda0ee3..b213ef9a045f9 100644 --- a/crates/turborepo-lib/src/engine/mod.rs +++ b/crates/turborepo-lib/src/engine/mod.rs @@ -125,6 +125,14 @@ impl Engine { self.task_definitions.get(task_id) } + pub fn tasks(&self) -> impl Iterator { + self.task_graph.node_weights() + } + + pub fn task_definitions(&self) -> &HashMap, TaskDefinition> { + &self.task_definitions + } + pub fn validate( &self, package_graph: &PackageGraph, diff --git a/crates/turborepo-lib/src/framework.rs b/crates/turborepo-lib/src/framework.rs index 16149ef627e75..f727f5aa74230 100644 --- a/crates/turborepo-lib/src/framework.rs +++ b/crates/turborepo-lib/src/framework.rs @@ -21,6 +21,16 @@ pub struct Framework { dependency_match: Matcher, 
} +impl Framework { + pub fn slug(&self) -> &'static str { + self.slug + } + + pub fn env_wildcards(&self) -> &[&'static str] { + &self.env_wildcards + } +} + static FRAMEWORKS: OnceLock<[Framework; 12]> = OnceLock::new(); fn get_frameworks() -> &'static [Framework] { diff --git a/crates/turborepo-lib/src/hash/mod.rs b/crates/turborepo-lib/src/hash/mod.rs index 32fe73cafdc12..973545dae475b 100644 --- a/crates/turborepo-lib/src/hash/mod.rs +++ b/crates/turborepo-lib/src/hash/mod.rs @@ -10,10 +10,13 @@ use std::collections::HashMap; use capnp::message::{Builder, HeapAllocator}; pub use traits::TurboHash; +use turborepo_env::ResolvedEnvMode; -use crate::cli::EnvMode; +use crate::{cli::EnvMode, task_graph::TaskOutputs}; mod proto_capnp { + use turborepo_env::ResolvedEnvMode; + use crate::cli::EnvMode; include!(concat!(env!("OUT_DIR"), "/src/hash/proto_capnp.rs")); @@ -28,58 +31,54 @@ mod proto_capnp { } } - impl From for task_hashable::EnvMode { - fn from(value: EnvMode) -> Self { + impl From for task_hashable::EnvMode { + fn from(value: ResolvedEnvMode) -> Self { match value { - EnvMode::Infer => task_hashable::EnvMode::Infer, - EnvMode::Loose => task_hashable::EnvMode::Loose, - EnvMode::Strict => task_hashable::EnvMode::Strict, + ResolvedEnvMode::Loose => task_hashable::EnvMode::Loose, + ResolvedEnvMode::Strict => task_hashable::EnvMode::Strict, } } } } -struct TaskHashable { +pub struct TaskHashable<'a> { // hashes - global_hash: String, - task_dependency_hashes: Vec, - hash_of_files: String, - external_deps_hash: String, + pub(crate) global_hash: &'a str, + pub(crate) task_dependency_hashes: Vec, + pub(crate) hash_of_files: &'a str, + pub(crate) external_deps_hash: String, // task - package_dir: turbopath::RelativeUnixPathBuf, - task: String, - outputs: TaskOutputs, - pass_thru_args: Vec, + pub(crate) package_dir: turbopath::RelativeUnixPathBuf, + pub(crate) task: &'a str, + pub(crate) outputs: TaskOutputs, + pub(crate) pass_through_args: &'a [String], // env - 
env: Vec, - resolved_env_vars: EnvVarPairs, - pass_thru_env: Vec, - env_mode: EnvMode, - dot_env: Vec, + pub(crate) env: &'a [String], + pub(crate) resolved_env_vars: EnvVarPairs, + pub(crate) pass_through_env: &'a [String], + pub(crate) env_mode: ResolvedEnvMode, + pub(crate) dot_env: &'a [turbopath::RelativeUnixPathBuf], } #[derive(Debug)] -pub struct GlobalHashable { +pub struct GlobalHashable<'a> { pub global_cache_key: &'static str, pub global_file_hash_map: HashMap, pub root_external_dependencies_hash: String, - pub env: Vec, + pub env: &'a [String], pub resolved_env_vars: Vec, - pub pass_through_env: Vec, + pub pass_through_env: &'a [String], pub env_mode: EnvMode, pub framework_inference: bool, - pub dot_env: Vec, -} - -struct TaskOutputs { - inclusions: Vec, - exclusions: Vec, + pub dot_env: &'a [turbopath::RelativeUnixPathBuf], } pub struct LockFilePackages(pub Vec); -struct FileHashes(HashMap); + +#[derive(Debug, Clone)] +pub struct FileHashes(pub HashMap); impl From for Builder { fn from(value: TaskOutputs) -> Self { @@ -192,17 +191,17 @@ impl From for Builder { type EnvVarPairs = Vec; -impl From for Builder { +impl From> for Builder { fn from(task_hashable: TaskHashable) -> Self { let mut message = ::capnp::message::TypedBuilder::::new_default(); let mut builder = message.init_root(); - builder.set_global_hash(&task_hashable.global_hash); + builder.set_global_hash(task_hashable.global_hash); builder.set_package_dir(&task_hashable.package_dir.to_string()); - builder.set_hash_of_files(&task_hashable.hash_of_files); + builder.set_hash_of_files(task_hashable.hash_of_files); builder.set_external_deps_hash(&task_hashable.external_deps_hash); - builder.set_task(&task_hashable.task); + builder.set_task(task_hashable.task); builder.set_env_mode(task_hashable.env_mode.into()); { @@ -224,8 +223,8 @@ impl From for Builder { { let mut pass_thru_args_builder = builder .reborrow() - .init_pass_thru_args(task_hashable.pass_thru_args.len() as u32); - for (i, arg) in 
task_hashable.pass_thru_args.iter().enumerate() { + .init_pass_thru_args(task_hashable.pass_through_args.len() as u32); + for (i, arg) in task_hashable.pass_through_args.iter().enumerate() { pass_thru_args_builder.set(i as u32, arg); } } @@ -240,8 +239,8 @@ impl From for Builder { { let mut pass_thru_env_builder = builder .reborrow() - .init_pass_thru_env(task_hashable.pass_thru_env.len() as u32); - for (i, env) in task_hashable.pass_thru_env.iter().enumerate() { + .init_pass_thru_env(task_hashable.pass_through_env.len() as u32); + for (i, env) in task_hashable.pass_through_env.iter().enumerate() { pass_thru_env_builder.set(i as u32, env); } } @@ -281,7 +280,7 @@ impl From for Builder { } } -impl From for Builder { +impl<'a> From> for Builder { fn from(hashable: GlobalHashable) -> Self { let mut message = ::capnp::message::TypedBuilder::::new_default(); @@ -372,6 +371,7 @@ impl From for Builder { #[cfg(test)] mod test { use test_case::test_case; + use turborepo_env::ResolvedEnvMode; use turborepo_lockfiles::Package; use super::{ @@ -382,22 +382,22 @@ mod test { #[test] fn task_hashable() { let task_hashable = TaskHashable { - global_hash: "global_hash".to_string(), + global_hash: "global_hash", task_dependency_hashes: vec!["task_dependency_hash".to_string()], package_dir: turbopath::RelativeUnixPathBuf::new("package_dir").unwrap(), - hash_of_files: "hash_of_files".to_string(), + hash_of_files: "hash_of_files", external_deps_hash: "external_deps_hash".to_string(), - task: "task".to_string(), + task: "task", outputs: TaskOutputs { inclusions: vec!["inclusions".to_string()], exclusions: vec!["exclusions".to_string()], }, - pass_thru_args: vec!["pass_thru_args".to_string()], - env: vec!["env".to_string()], + pass_through_args: &["pass_thru_args".to_string()], + env: &["env".to_string()], resolved_env_vars: vec![], - pass_thru_env: vec!["pass_thru_env".to_string()], - env_mode: EnvMode::Infer, - dot_env: 
vec![turbopath::RelativeUnixPathBuf::new("dotenv".to_string()).unwrap()], + pass_through_env: &["pass_thru_env".to_string()], + env_mode: ResolvedEnvMode::Loose, + dot_env: &[turbopath::RelativeUnixPathBuf::new("dotenv".to_string()).unwrap()], }; assert_eq!(task_hashable.hash(), "ff765ee2f83bc034"); @@ -414,13 +414,13 @@ mod test { .into_iter() .collect(), root_external_dependencies_hash: "0000000000000000".to_string(), - env: vec!["env".to_string()], + env: &["env".to_string()], resolved_env_vars: vec![], - pass_through_env: vec!["pass_through_env".to_string()], + pass_through_env: &["pass_through_env".to_string()], env_mode: EnvMode::Infer, framework_inference: true, - dot_env: vec![turbopath::RelativeUnixPathBuf::new("dotenv".to_string()).unwrap()], + dot_env: &[turbopath::RelativeUnixPathBuf::new("dotenv".to_string()).unwrap()], }; assert_eq!(global_hash.hash(), "c0ddf8138bd686e8"); diff --git a/crates/turborepo-lib/src/hash/proto.capnp b/crates/turborepo-lib/src/hash/proto.capnp index b5887f0e9d734..4f6f4c00b9459 100644 --- a/crates/turborepo-lib/src/hash/proto.capnp +++ b/crates/turborepo-lib/src/hash/proto.capnp @@ -22,9 +22,8 @@ struct TaskHashable { dotEnv @12 :List(Text); enum EnvMode { - infer @0; - loose @1; - strict @2; + loose @0; + strict @1; } } diff --git a/crates/turborepo-lib/src/lib.rs b/crates/turborepo-lib/src/lib.rs index 1d92d0dc912b1..d7a698dd6b111 100644 --- a/crates/turborepo-lib/src/lib.rs +++ b/crates/turborepo-lib/src/lib.rs @@ -29,6 +29,7 @@ mod rewrite_json; mod run; mod shim; mod task_graph; +mod task_hash; mod tracing; use anyhow::Result; diff --git a/crates/turborepo-lib/src/opts.rs b/crates/turborepo-lib/src/opts.rs index 3d837ea2be78b..50e66bd253685 100644 --- a/crates/turborepo-lib/src/opts.rs +++ b/crates/turborepo-lib/src/opts.rs @@ -63,7 +63,7 @@ pub struct RunOpts<'a> { pub(crate) framework_inference: bool, profile: Option<&'a str>, continue_on_error: bool, - passthrough_args: &'a [String], + pub(crate) pass_through_args: 
&'a [String], pub(crate) only: bool, dry_run: bool, pub(crate) dry_run_json: bool, @@ -121,7 +121,7 @@ impl<'a> TryFrom<&'a RunArgs> for RunOpts<'a> { parallel: args.parallel, profile: args.profile.as_deref(), continue_on_error: args.continue_execution, - passthrough_args: args.pass_through_args.as_ref(), + pass_through_args: args.pass_through_args.as_ref(), only: args.only, no_daemon: args.no_daemon, single_package: args.single_package, diff --git a/crates/turborepo-lib/src/run/global_hash.rs b/crates/turborepo-lib/src/run/global_hash.rs index eddaefbc73996..d2b119b5ebb96 100644 --- a/crates/turborepo-lib/src/run/global_hash.rs +++ b/crates/turborepo-lib/src/run/global_hash.rs @@ -26,35 +26,35 @@ const GLOBAL_CACHE_KEY: &str = "You don't understand! I coulda had class. I coul enum GlobalHashError {} #[derive(Debug, Default)] -pub struct GlobalHashableInputs { +pub struct GlobalHashableInputs<'a> { global_cache_key: &'static str, global_file_hash_map: HashMap, root_external_dependencies_hash: String, - env: Vec, + env: &'a [String], // Only Option to allow #[derive(Default)] resolved_env_vars: Option, - pass_through_env: Vec, + pass_through_env: &'a [String], env_mode: EnvMode, framework_inference: bool, - dot_env: Vec, + dot_env: &'a [RelativeUnixPathBuf], } #[allow(clippy::too_many_arguments)] -pub fn get_global_hash_inputs( +pub fn get_global_hash_inputs<'a, L: ?Sized + Lockfile>( root_workspace: &WorkspaceInfo, root_path: &AbsoluteSystemPath, package_manager: &PackageManager, lockfile: Option<&L>, - global_file_dependencies: Vec, + global_file_dependencies: &'a [String], env_at_execution_start: &EnvironmentVariableMap, - global_env: Vec, - global_pass_through_env: Vec, + global_env: &'a [String], + global_pass_through_env: &'a [String], env_mode: EnvMode, framework_inference: bool, - dot_env: Vec, -) -> Result { + dot_env: &'a [RelativeUnixPathBuf], +) -> Result> { let global_hashable_env_vars = - get_global_hashable_env_vars(env_at_execution_start, 
&global_env)?; + get_global_hashable_env_vars(env_at_execution_start, global_env)?; debug!( "global hash env vars {:?}", @@ -68,7 +68,7 @@ pub fn get_global_hash_inputs( let files = globwalk::globwalk( root_path, - &global_file_dependencies, + global_file_dependencies, &globs.raw_exclusions, WalkType::All, )?; @@ -122,14 +122,14 @@ pub fn get_global_hash_inputs( }) } -impl GlobalHashableInputs { +impl<'a> GlobalHashableInputs<'a> { pub fn calculate_global_hash_from_inputs(mut self) -> String { match self.env_mode { EnvMode::Infer if !self.pass_through_env.is_empty() => { self.env_mode = EnvMode::Strict; } EnvMode::Loose => { - self.pass_through_env = Vec::new(); + self.pass_through_env = &[]; } _ => {} } diff --git a/crates/turborepo-lib/src/run/mod.rs b/crates/turborepo-lib/src/run/mod.rs index 81221d6a68f29..ce61eca6c179f 100644 --- a/crates/turborepo-lib/src/run/mod.rs +++ b/crates/turborepo-lib/src/run/mod.rs @@ -13,6 +13,7 @@ use std::{ use anyhow::{anyhow, Context as ErrorContext, Result}; pub use cache::{RunCache, TaskCache}; use itertools::Itertools; +use rayon::iter::ParallelBridge; use tracing::{debug, info}; use turbopath::AbsoluteSystemPathBuf; use turborepo_cache::{http::APIAuth, AsyncCache}; @@ -22,6 +23,7 @@ use turborepo_ui::ColorSelector; use self::task_id::TaskName; use crate::{ + cli::EnvMode, commands::CommandBase, config::TurboJson, daemon::DaemonConnector, @@ -32,6 +34,7 @@ use crate::{ package_json::PackageJson, run::global_hash::get_global_hash_inputs, task_graph::Visitor, + task_hash::PackageInputsHashes, }; #[derive(Debug)] @@ -205,13 +208,13 @@ impl Run { &self.base.repo_root, pkg_dep_graph.package_manager(), pkg_dep_graph.lockfile(), - root_turbo_json.global_deps, + &root_turbo_json.global_deps, &env_at_execution_start, - root_turbo_json.global_env, - root_turbo_json.global_pass_through_env, + &root_turbo_json.global_env, + &root_turbo_json.global_pass_through_env, opts.run_opts.env_mode, opts.run_opts.framework_inference, - 
root_turbo_json.global_dot_env, + &root_turbo_json.global_dot_env, )?; let global_hash = global_hash_inputs.calculate_global_hash_from_inputs(); @@ -229,10 +232,37 @@ impl Run { self.base.ui, )); + let mut global_env_mode = opts.run_opts.env_mode; + if matches!(global_env_mode, EnvMode::Infer) + && !root_turbo_json.global_pass_through_env.is_empty() + { + global_env_mode = EnvMode::Strict; + } + + let workspaces = pkg_dep_graph.workspaces().collect(); + let package_inputs_hashes = PackageInputsHashes::calculate_file_hashes( + scm, + engine.tasks().par_bridge(), + workspaces, + engine.task_definitions(), + &self.base.repo_root, + )?; + + debug!("package inputs hashes: {:?}", package_inputs_hashes); + let pkg_dep_graph = Arc::new(pkg_dep_graph); let engine = Arc::new(engine); - let visitor = Visitor::new(pkg_dep_graph, runcache, &opts); - visitor.visit(engine).await?; + let visitor = Visitor::new( + pkg_dep_graph.clone(), + runcache, + &opts, + package_inputs_hashes, + &env_at_execution_start, + &global_hash, + global_env_mode, + ); + + visitor.visit(engine.clone()).await?; Ok(()) } diff --git a/crates/turborepo-lib/src/task_graph/mod.rs b/crates/turborepo-lib/src/task_graph/mod.rs index 429e95998686d..c2c9852adccf4 100644 --- a/crates/turborepo-lib/src/task_graph/mod.rs +++ b/crates/turborepo-lib/src/task_graph/mod.rs @@ -77,11 +77,11 @@ pub struct TaskDefinition { pub(crate) cache: bool, // This field is custom-marshalled from `env` and `depends_on`` - env: Vec, + pub(crate) env: Vec, - pass_through_env: Vec, + pub(crate) pass_through_env: Vec, - dot_env: Vec, + pub(crate) dot_env: Vec, // TopologicalDependencies are tasks from package dependencies. // E.g. "build" is a topological dependency in: @@ -97,7 +97,7 @@ pub struct TaskDefinition { // Inputs indicate the list of files this Task depends on. If any of those files change // we can conclude that any cached outputs or logs for this Task should be invalidated. 
- inputs: Vec, + pub(crate) inputs: Vec, // OutputMode determines how we should log the output. pub(crate) output_mode: OutputLogsMode, diff --git a/crates/turborepo-lib/src/task_graph/visitor.rs b/crates/turborepo-lib/src/task_graph/visitor.rs index c17ffec32f005..bdaec35f459e4 100644 --- a/crates/turborepo-lib/src/task_graph/visitor.rs +++ b/crates/turborepo-lib/src/task_graph/visitor.rs @@ -3,8 +3,11 @@ use std::sync::{Arc, OnceLock}; use futures::{stream::FuturesUnordered, StreamExt}; use regex::Regex; use tokio::sync::mpsc; +use tracing::debug; +use turborepo_env::{EnvironmentVariableMap, ResolvedEnvMode}; use crate::{ + cli::EnvMode, engine::{Engine, ExecutionOptions}, opts::Opts, package_graph::{PackageGraph, WorkspaceName}, @@ -12,13 +15,17 @@ use crate::{ task_id::{self, TaskId}, RunCache, }, + task_hash, + task_hash::{PackageInputsHashes, TaskHasher}, }; // This holds the whole world pub struct Visitor<'a> { - package_graph: Arc, run_cache: Arc, + package_graph: Arc, opts: &'a Opts<'a>, + task_hasher: TaskHasher<'a>, + global_env_mode: EnvMode, } #[derive(Debug, thiserror::Error)] @@ -36,14 +43,33 @@ pub enum Error { MissingDefinition, #[error("error while executing engine: {0}")] Engine(#[from] crate::engine::ExecuteError), + #[error(transparent)] + TaskHash(#[from] task_hash::Error), } impl<'a> Visitor<'a> { - pub fn new(package_graph: Arc, run_cache: Arc, opts: &'a Opts) -> Self { + pub fn new( + package_graph: Arc, + run_cache: Arc, + opts: &'a Opts, + package_inputs_hashes: PackageInputsHashes, + env_at_execution_start: &'a EnvironmentVariableMap, + global_hash: &'a str, + global_env_mode: EnvMode, + ) -> Self { + let task_hasher = TaskHasher::new( + package_inputs_hashes, + opts, + env_at_execution_start, + global_hash, + ); + Self { - package_graph, run_cache, + package_graph, opts, + task_hasher, + global_env_mode, } } @@ -61,19 +87,26 @@ impl<'a> Visitor<'a> { while let Some(message) = node_stream.recv().await { let crate::engine::Message { info, 
callback } = message; let package_name = WorkspaceName::from(info.package()); - let package_json = self + let workspace_dir = + self.package_graph + .workspace_dir(&package_name) + .ok_or_else(|| Error::MissingPackage { + package_name: package_name.clone(), + task_id: info.clone(), + })?; + let workspace_info = self .package_graph - .package_json(&package_name) + .workspace_info(&package_name) .ok_or_else(|| Error::MissingPackage { package_name: package_name.clone(), task_id: info.clone(), })?; - let workspace_dir = self - .package_graph - .workspace_dir(&package_name) - .unwrap_or_else(|| panic!("no directory for workspace {package_name}")); - let command = package_json.scripts.get(info.task()).cloned(); + let command = workspace_info + .package_json + .scripts + .get(info.task()) + .cloned(); match command { Some(cmd) @@ -87,13 +120,38 @@ impl<'a> Visitor<'a> { _ => (), } - let task_def = engine + let task_definition = engine .task_definition(&info) .ok_or(Error::MissingDefinition)?; + let task_env_mode = match self.global_env_mode { + // Task env mode is only independent when global env mode is `infer`. + EnvMode::Infer if !task_definition.pass_through_env.is_empty() => { + ResolvedEnvMode::Strict + } + // If we're in infer mode we have just detected non-usage of strict env vars. + // But our behavior's actual meaning of this state is `loose`. + EnvMode::Infer => ResolvedEnvMode::Loose, + // Otherwise we just use the global env mode. 
+ EnvMode::Strict => ResolvedEnvMode::Strict, + EnvMode::Loose => ResolvedEnvMode::Loose, + }; + + let dependency_set = engine.dependencies(&info).ok_or(Error::MissingDefinition)?; + + let task_hash = self.task_hasher.calculate_task_hash( + &info, + task_definition, + task_env_mode, + workspace_info, + dependency_set, + )?; + + debug!("task {} hash is {}", info, task_hash); + let task_cache = self.run_cache - .task_cache(task_def, workspace_dir, info.clone(), "fake"); + .task_cache(task_definition, workspace_dir, info.clone(), &task_hash); tasks.push(tokio::spawn(async move { println!( diff --git a/crates/turborepo-lib/src/task_hash.rs b/crates/turborepo-lib/src/task_hash.rs new file mode 100644 index 0000000000000..940be3150a476 --- /dev/null +++ b/crates/turborepo-lib/src/task_hash.rs @@ -0,0 +1,346 @@ +use std::{ + collections::{HashMap, HashSet}, + sync::Mutex, +}; + +use rayon::prelude::*; +use thiserror::Error; +use tracing::debug; +use turbopath::{AbsoluteSystemPath, AnchoredSystemPath, AnchoredSystemPathBuf}; +use turborepo_env::{BySource, DetailedMap, EnvironmentVariableMap, ResolvedEnvMode}; +use turborepo_scm::SCM; + +use crate::{ + engine::TaskNode, + framework::infer_framework, + hash::{FileHashes, TaskHashable, TurboHash}, + opts::Opts, + package_graph::{WorkspaceInfo, WorkspaceName}, + run::task_id::{TaskId, ROOT_PKG_NAME}, + task_graph::TaskDefinition, +}; + +#[derive(Debug, Error)] +pub enum Error { + #[error("missing pipeline entry {0}")] + MissingPipelineEntry(TaskId<'static>), + #[error("missing package.json for {0}")] + MissingPackageJson(String), + #[error("cannot find package-file hash for {0}")] + MissingPackageFileHash(String), + #[error("missing hash for dependent task {0}")] + MissingDependencyTaskHash(String), + #[error("cannot acquire lock for task hash tracker")] + Mutex, + #[error(transparent)] + Scm(#[from] turborepo_scm::Error), + #[error(transparent)] + Env(#[from] turborepo_env::Error), + #[error(transparent)] + Regex(#[from] 
regex::Error), + #[error(transparent)] + Path(#[from] turbopath::PathError), +} + +impl TaskHashable<'_> { + fn calculate_task_hash(mut self) -> String { + if matches!(self.env_mode, ResolvedEnvMode::Loose) { + self.pass_through_env = &[]; + } + + self.hash() + } +} + +#[derive(Debug, Default)] +pub struct PackageInputsHashes { + hashes: HashMap, String>, + expanded_hashes: HashMap, FileHashes>, +} + +impl PackageInputsHashes { + pub fn calculate_file_hashes<'a>( + scm: SCM, + all_tasks: impl ParallelIterator, + workspaces: HashMap<&WorkspaceName, &WorkspaceInfo>, + task_definitions: &HashMap, TaskDefinition>, + repo_root: &AbsoluteSystemPath, + ) -> Result { + let (hashes, expanded_hashes): (HashMap<_, _>, HashMap<_, _>) = all_tasks + .filter_map(|task| { + let TaskNode::Task(task_id) = task else { + return None; + }; + + if task_id.package() == ROOT_PKG_NAME { + return None; + } + + let task_definition = match task_definitions + .get(task_id) + .ok_or_else(|| Error::MissingPipelineEntry(task_id.clone())) + { + Ok(def) => def, + Err(err) => return Some(Err(err)), + }; + + // TODO: Look into making WorkspaceName take a Cow + let workspace_name = WorkspaceName::Other(task_id.package().to_string()); + + let pkg = match workspaces + .get(&workspace_name) + .ok_or_else(|| Error::MissingPackageJson(workspace_name.to_string())) + { + Ok(pkg) => pkg, + Err(err) => return Some(Err(err)), + }; + + let package_path = pkg + .package_json_path + .parent() + .unwrap_or_else(|| AnchoredSystemPath::new("").unwrap()); + + let mut hash_object = match scm.get_package_file_hashes( + repo_root, + package_path, + &task_definition.inputs, + ) { + Ok(hash_object) => hash_object, + Err(err) => return Some(Err(err.into())), + }; + + if !task_definition.dot_env.is_empty() { + let absolute_package_path = repo_root.resolve(package_path); + let dot_env_object = match scm.hash_existing_of( + &absolute_package_path, + task_definition + .dot_env + .iter() + .map(|p| 
p.to_anchored_system_path_buf()), + ) { + Ok(dot_env_object) => dot_env_object, + Err(err) => return Some(Err(err.into())), + }; + + for (key, value) in dot_env_object { + hash_object.insert(key, value); + } + } + + let file_hashes = FileHashes(hash_object); + let hash = file_hashes.clone().hash(); + + Some(Ok(( + (task_id.clone(), hash), + (task_id.clone(), file_hashes), + ))) + }) + .collect::>()?; + + Ok(PackageInputsHashes { + hashes, + expanded_hashes, + }) + } +} + +#[derive(Default)] +pub struct TaskHashTracker { + package_task_env_vars: HashMap, DetailedMap>, + package_task_hashes: HashMap, String>, + package_task_framework: HashMap, String>, + package_task_outputs: HashMap, Vec>, +} + +/// Caches package-inputs hashes, and package-task hashes. +pub struct TaskHasher<'a> { + package_inputs_hashes: PackageInputsHashes, + opts: &'a Opts<'a>, + env_at_execution_start: &'a EnvironmentVariableMap, + global_hash: &'a str, + + task_hash_tracker: Mutex, +} + +impl<'a> TaskHasher<'a> { + pub fn new( + package_inputs_hashes: PackageInputsHashes, + opts: &'a Opts, + env_at_execution_start: &'a EnvironmentVariableMap, + global_hash: &'a str, + ) -> Self { + Self { + package_inputs_hashes, + opts, + env_at_execution_start, + global_hash, + task_hash_tracker: Mutex::new(TaskHashTracker::default()), + } + } + + pub fn calculate_task_hash( + &self, + task_id: &TaskId<'static>, + task_definition: &TaskDefinition, + task_env_mode: ResolvedEnvMode, + workspace: &WorkspaceInfo, + dependency_set: HashSet<&TaskNode>, + ) -> Result { + let do_framework_inference = self.opts.run_opts.framework_inference; + let is_monorepo = !self.opts.run_opts.single_package; + + let hash_of_files = self + .package_inputs_hashes + .hashes + .get(task_id) + .ok_or_else(|| Error::MissingPackageFileHash(task_id.to_string()))?; + let mut explicit_env_var_map = EnvironmentVariableMap::default(); + let mut all_env_var_map = EnvironmentVariableMap::default(); + let mut matching_env_var_map = 
EnvironmentVariableMap::default(); + + if do_framework_inference { + // See if we infer a framework + if let Some(framework) = infer_framework(workspace, is_monorepo) { + debug!("auto detected framework for {}", task_id.package()); + debug!( + "framework: {}, env_prefix: {:?}", + framework.slug(), + framework.env_wildcards() + ); + let mut computed_wildcards = framework + .env_wildcards() + .iter() + .map(|s| s.to_string()) + .collect::>(); + + if let Some(exclude_prefix) = + self.env_at_execution_start.get("TURBO_CI_VENDOR_ENV_KEY") + { + if !exclude_prefix.is_empty() { + let computed_exclude = format!("!{}*", exclude_prefix); + debug!( + "excluding environment variables matching wildcard {}", + computed_exclude + ); + computed_wildcards.push(computed_exclude); + } + } + + let inference_env_var_map = self + .env_at_execution_start + .from_wildcards(&computed_wildcards)?; + + let user_env_var_set = self + .env_at_execution_start + .wildcard_map_from_wildcards_unresolved(&task_definition.env)?; + + all_env_var_map.union(&user_env_var_set.inclusions); + all_env_var_map.union(&inference_env_var_map); + all_env_var_map.difference(&user_env_var_set.exclusions); + + explicit_env_var_map.union(&user_env_var_set.inclusions); + explicit_env_var_map.difference(&user_env_var_set.exclusions); + + matching_env_var_map.union(&inference_env_var_map); + matching_env_var_map.difference(&user_env_var_set.exclusions); + } else { + let all_env_var_map = self + .env_at_execution_start + .from_wildcards(&task_definition.env)?; + + explicit_env_var_map.union(&all_env_var_map); + } + } else { + all_env_var_map = self + .env_at_execution_start + .from_wildcards(&task_definition.env)?; + + explicit_env_var_map.union(&all_env_var_map); + } + + let env_vars = DetailedMap { + all: all_env_var_map, + by_source: BySource { + explicit: explicit_env_var_map, + matching: matching_env_var_map, + }, + }; + + let hashable_env_pairs = env_vars.all.to_hashable(); + let outputs = 
task_definition.hashable_outputs(task_id); + let task_dependency_hashes = self.calculate_dependency_hashes(dependency_set)?; + + debug!( + "task hash env vars for {}:{}\n vars: {:?}", + task_id.package(), + task_id.task(), + hashable_env_pairs + ); + + let task_hashable = TaskHashable { + global_hash: self.global_hash, + task_dependency_hashes, + package_dir: workspace.package_path().to_unix(), + hash_of_files, + external_deps_hash: workspace.get_external_deps_hash(), + task: task_id.task(), + outputs, + + pass_through_args: self.opts.run_opts.pass_through_args, + env: &task_definition.env, + resolved_env_vars: hashable_env_pairs, + pass_through_env: &task_definition.pass_through_env, + env_mode: task_env_mode, + dot_env: &task_definition.dot_env, + }; + let task_hash = task_hashable.hash(); + + let mut task_hash_tracker = self.task_hash_tracker.lock().map_err(|_| Error::Mutex)?; + task_hash_tracker + .package_task_env_vars + .insert(task_id.clone(), env_vars); + task_hash_tracker + .package_task_hashes + .insert(task_id.clone(), task_hash.clone()); + + Ok(task_hash) + } + + /// Gets the hashes of a task's dependencies. Because the visitor + /// receives the nodes in topological order, we know that all of + /// the dependencies have been processed before the current task. 
+ /// + /// # Arguments + /// + /// * `dependency_set`: The dependencies of the current task + /// + /// returns: Result, Error> + fn calculate_dependency_hashes( + &self, + dependency_set: HashSet<&TaskNode>, + ) -> Result, Error> { + let mut dependency_hash_set = HashSet::new(); + + for dependency_task in dependency_set { + let TaskNode::Task(dependency_task_id) = dependency_task else { + continue; + }; + + if dependency_task_id.package() == ROOT_PKG_NAME { + continue; + } + + let task_hash_tracker = self.task_hash_tracker.lock().map_err(|_| Error::Mutex)?; + let dependency_hash = task_hash_tracker + .package_task_hashes + .get(dependency_task_id) + .ok_or_else(|| Error::MissingDependencyTaskHash(dependency_task.to_string()))?; + dependency_hash_set.insert(dependency_hash.clone()); + } + + let mut dependency_hash_list = dependency_hash_set.into_iter().collect::>(); + dependency_hash_list.sort(); + + Ok(dependency_hash_list) + } +} diff --git a/crates/turborepo-paths/src/absolute_system_path.rs b/crates/turborepo-paths/src/absolute_system_path.rs index 08b6b12b21927..f1637027eee1b 100644 --- a/crates/turborepo-paths/src/absolute_system_path.rs +++ b/crates/turborepo-paths/src/absolute_system_path.rs @@ -450,8 +450,6 @@ impl<'a> TryFrom<&'a Path> for &'a AbsoluteSystemPath { #[cfg(test)] mod tests { - use std::str::FromStr; - use anyhow::Result; use tempdir::TempDir; use test_case::test_case; diff --git a/crates/turborepo-scm/src/manual.rs b/crates/turborepo-scm/src/manual.rs index f5c239e9856ab..905fbaf150099 100644 --- a/crates/turborepo-scm/src/manual.rs +++ b/crates/turborepo-scm/src/manual.rs @@ -4,7 +4,7 @@ use globwalk::fix_glob_pattern; use hex::ToHex; use ignore::WalkBuilder; use sha1::{Digest, Sha1}; -use turbopath::{AbsoluteSystemPath, AnchoredSystemPath, AnchoredSystemPathBuf, IntoUnix}; +use turbopath::{AbsoluteSystemPath, AnchoredSystemPath, IntoUnix}; use wax::{any, Glob, Pattern}; use crate::{package_deps::GitHashes, Error}; @@ -47,7 +47,7 @@ 
pub(crate) fn hash_files( pub(crate) fn get_package_file_hashes_from_processing_gitignore>( turbo_root: &AbsoluteSystemPath, - package_path: &AnchoredSystemPathBuf, + package_path: &AnchoredSystemPath, inputs: &[S], ) -> Result { let full_package_path = turbo_root.resolve(package_path); @@ -118,7 +118,9 @@ pub(crate) fn get_package_file_hashes_from_processing_gitignore>( #[cfg(test)] mod tests { use test_case::test_case; - use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPath, RelativeUnixPathBuf}; + use turbopath::{ + AbsoluteSystemPathBuf, AnchoredSystemPathBuf, RelativeUnixPath, RelativeUnixPathBuf, + }; use super::*; diff --git a/crates/turborepo-scm/src/package_deps.rs b/crates/turborepo-scm/src/package_deps.rs index dcc224e3a25b3..bff30ca495623 100644 --- a/crates/turborepo-scm/src/package_deps.rs +++ b/crates/turborepo-scm/src/package_deps.rs @@ -2,9 +2,7 @@ use std::collections::HashMap; use itertools::{Either, Itertools}; use tracing::debug; -use turbopath::{ - AbsoluteSystemPath, AnchoredSystemPath, AnchoredSystemPathBuf, PathError, RelativeUnixPathBuf, -}; +use turbopath::{AbsoluteSystemPath, AnchoredSystemPath, PathError, RelativeUnixPathBuf}; use crate::{hash_object::hash_objects, Error, Git, SCM}; @@ -27,7 +25,7 @@ impl SCM { pub fn get_package_file_hashes>( &self, turbo_root: &AbsoluteSystemPath, - package_path: &AnchoredSystemPathBuf, + package_path: &AnchoredSystemPath, inputs: &[S], ) -> Result { match self { @@ -79,7 +77,7 @@ impl Git { fn get_package_file_hashes>( &self, turbo_root: &AbsoluteSystemPath, - package_path: &AnchoredSystemPathBuf, + package_path: &AnchoredSystemPath, inputs: &[S], ) -> Result { if inputs.is_empty() { @@ -92,7 +90,7 @@ impl Git { fn get_package_file_hashes_from_index( &self, turbo_root: &AbsoluteSystemPath, - package_path: &AnchoredSystemPathBuf, + package_path: &AnchoredSystemPath, ) -> Result { let full_pkg_path = turbo_root.resolve(package_path); let git_to_pkg_path = self.root.anchor(&full_pkg_path)?; @@ -126,7 
+124,7 @@ impl Git { fn get_package_file_hashes_from_inputs>( &self, turbo_root: &AbsoluteSystemPath, - package_path: &AnchoredSystemPathBuf, + package_path: &AnchoredSystemPath, inputs: &[S], ) -> Result { let full_pkg_path = turbo_root.resolve(package_path); @@ -190,7 +188,7 @@ impl Git { mod tests { use std::{assert_matches::assert_matches, collections::HashMap, process::Command}; - use turbopath::{AbsoluteSystemPathBuf, RelativeUnixPathBuf}; + use turbopath::{AbsoluteSystemPathBuf, AnchoredSystemPathBuf, RelativeUnixPathBuf}; use super::*; use crate::{manual::get_package_file_hashes_from_processing_gitignore, SCM};