Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: port task graph validation #5703

Merged
merged 12 commits into from
Aug 15, 2023
112 changes: 111 additions & 1 deletion crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ use std::{
pub use builder::EngineBuilder;
use petgraph::Graph;

use crate::{run::task_id::TaskId, task_graph::TaskDefinition};
use crate::{
package_graph::{PackageGraph, WorkspaceName},
run::task_id::TaskId,
task_graph::TaskDefinition,
};

#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum TaskNode {
Expand Down Expand Up @@ -110,4 +114,110 @@ impl Engine<Built> {
.collect(),
)
}

pub fn validate(
&self,
package_graph: &PackageGraph,
concurrency: u32,
) -> Result<(), Vec<ValidateError>> {
// TODO(olszewski) once this is hooked up to a real run, we should
// see if using rayon to parallelize would provide a speedup
let (persistent_count, mut validation_errors) = self
.task_graph
.node_indices()
.map(|node_index| {
let TaskNode::Task(task_id) = self
.task_graph
.node_weight(node_index)
.expect("graph should contain weight for node index")
else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Didn't even know you could apply this syntax on enums like that. Neat.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I too learned that this was possible from @NicholasLYang and I've been in love ever since.

// No need to check the root node if that's where we are.
return Ok(false);
};
let is_persistent = self
.task_definitions
.get(task_id)
.map_or(false, |task_def| task_def.persistent);

for dep_index in self
.task_graph
.neighbors_directed(node_index, petgraph::Direction::Outgoing)
{
let TaskNode::Task(dep_id) = self
.task_graph
.node_weight(dep_index)
.expect("index comes from iterating the graph and must be present")
else {
panic!("{task_id} depends on root task node");
};

let task_definition = self.task_definitions.get(dep_id).ok_or_else(|| {
ValidateError::MissingTask {
task_id: dep_id.to_string(),
package_name: dep_id.package().to_string(),
}
})?;

let package_json = package_graph
.package_json(&WorkspaceName::from(dep_id.package()))
.ok_or_else(|| ValidateError::MissingPackageJson {
package: dep_id.package().to_string(),
})?;
if task_definition.persistent
&& package_json.scripts.contains_key(dep_id.task())
{
return Err(ValidateError::DependencyOnPersistentTask {
persistent_task: dep_id.to_string(),
dependant: task_id.to_string(),
});
}
}

Ok(is_persistent)
})
.fold((0, Vec::new()), |(mut count, mut errs), result| {
match result {
Ok(true) => count += 1,
Ok(false) => (),
Err(e) => errs.push(e),
}
(count, errs)
});

if persistent_count >= concurrency {
validation_errors.push(ValidateError::PersistentTasksExceedConcurrency {
Copy link
Contributor

@arlyon arlyon Aug 15, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this will address the current issue where persistent tasks count towards the concurrency limit despite not being targeted by filter / scoping rules? (sorry if this is obvious, I am not sure if this is validated before or after we calculate the subgraph)

To elaborate, if I have 100 packages with a persistent task in them and then run with --filter package1 it will complain about exceeding the limit.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's interesting, both in Go/Rust the task graph is constructed using only packages that are in scope. Didn't realize this was an issue. Either way this is meant to be the port of the Go behavior.

persistent_count,
concurrency,
})
}

match validation_errors.is_empty() {
true => Ok(()),
false => Err(validation_errors),
}
}
}

#[derive(Debug, thiserror::Error)]
pub enum ValidateError {
#[error("Cannot find task definition for {task_id} in package {package_name}")]
MissingTask {
task_id: String,
package_name: String,
},
#[error("Cannot find package {package}")]
MissingPackageJson { package: String },
#[error("\"{persistent_task}\" is a persistent task, \"{dependant}\" cannot depend on it")]
DependencyOnPersistentTask {
persistent_task: String,
dependant: String,
},
#[error(
"You have {persistent_count} persistent tasks, but `turbo` is configured for concurrency \
of {concurrency}. Set --concurrency to at least {persistent_count}"
)]
PersistentTasksExceedConcurrency {
persistent_count: u32,
concurrency: u32,
},
}
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/opts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@
#[derive(Debug)]
pub struct RunOpts<'a> {
pub(crate) tasks: &'a [String],
concurrency: u32,
pub(crate) concurrency: u32,
parallel: bool,

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (ubuntu, ubuntu-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (macos, macos-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Build Turborepo (windows, windows-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Go Integration Tests (ubuntu, ubuntu-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (macos, macos-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (windows, windows-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Turborepo E2E Tests (ubuntu, ubuntu-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (ubuntu, ubuntu-latest)

multiple fields are never read

Check warning on line 48 in crates/turborepo-lib/src/opts.rs

View workflow job for this annotation

GitHub Actions / Turborepo Examples (macos, macos-latest)

multiple fields are never read
pub(crate) env_mode: EnvMode,
// Whether or not to infer the framework for each workspace.
pub(crate) framework_inference: bool,
Expand Down
57 changes: 6 additions & 51 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ pub mod task_id;

use std::io::IsTerminal;

use anyhow::{Context as ErrorContext, Result};
use anyhow::{anyhow, Context as ErrorContext, Result};
use itertools::Itertools;
use tracing::{debug, info};
use turborepo_cache::{http::APIAuth, AsyncCache};
use turborepo_env::EnvironmentVariableMap;
use turborepo_scm::SCM;
use turborepo_ui::UI;

use self::task_id::TaskName;
use crate::{
Expand Down Expand Up @@ -142,7 +142,7 @@ impl Run {
)?;

info!("created cache");
let _engine = EngineBuilder::new(
let engine = EngineBuilder::new(
&self.base.repo_root,
&pkg_dep_graph,
opts.run_opts.single_package,
Expand All @@ -168,54 +168,9 @@ impl Run {
)
.build()?;

engine
.validate(&pkg_dep_graph, opts.run_opts.concurrency)
.map_err(|errors| anyhow!("Validation failed:\n{}", errors.into_iter().join("\n")))?;
Ok(())
}
}

#[cfg(test)]
mod test {

use anyhow::Result;
use tempfile::tempdir;
use turbopath::AbsoluteSystemPathBuf;
use turborepo_ui::UI;

use crate::{
cli::{Command, RunArgs},
commands::CommandBase,
get_version,
run::Run,
Args,
};

#[tokio::test]
async fn test_run() -> Result<()> {
let dir = tempdir()?;
let repo_root = AbsoluteSystemPathBuf::try_from(dir.path())?;
let mut args = Args::default();
// Daemon does not work with run stub yet
let run_args = RunArgs {
no_daemon: true,
pkg_inference_root: Some(["apps", "my-app"].join(std::path::MAIN_SEPARATOR_STR)),
..Default::default()
};
args.command = Some(Command::Run(Box::new(run_args)));

let ui = UI::infer();

// Add package.json
repo_root
.join_component("package.json")
.create_with_contents("{\"workspaces\": [\"apps/*\"]}")?;
repo_root
.join_component("package-lock.json")
.create_with_contents("")?;
repo_root
.join_component("turbo.json")
.create_with_contents("{}")?;

let base = CommandBase::new(args, repo_root, get_version(), ui)?;
let mut run = Run::new(base);
run.run().await
}
}
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/task_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ pub struct TaskDefinition {

// Persistent indicates whether the Task is expected to exit or not
// Tasks marked Persistent do not exit (e.g. --watch mode or dev servers)
persistent: bool,
pub persistent: bool,
}

impl BookkeepingTaskDefinition {
Expand Down
Loading