Skip to content

Commit

Permalink
Add interruptible tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
NicholasLYang committed Oct 7, 2024
1 parent 5faae0c commit b4e033d
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 45 deletions.
47 changes: 24 additions & 23 deletions crates/turborepo-lib/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub struct Engine<S = Built> {
task_definitions: HashMap<TaskId<'static>, TaskDefinition>,
task_locations: HashMap<TaskId<'static>, Spanned<()>>,
package_tasks: HashMap<PackageName, Vec<petgraph::graph::NodeIndex>>,
pub(crate) has_persistent_tasks: bool,
pub(crate) has_non_interruptible_tasks: bool,
}

impl Engine<Building> {
Expand All @@ -60,7 +60,7 @@ impl Engine<Building> {
task_definitions: HashMap::default(),
task_locations: HashMap::default(),
package_tasks: HashMap::default(),
has_persistent_tasks: false,
has_non_interruptible_tasks: false,
}
}

Expand All @@ -87,8 +87,8 @@ impl Engine<Building> {
task_id: TaskId<'static>,
definition: TaskDefinition,
) -> Option<TaskDefinition> {
if definition.persistent {
self.has_persistent_tasks = true;
if definition.persistent && !definition.interruptible {
self.has_non_interruptible_tasks = true;
}
self.task_definitions.insert(task_id, definition)
}
Expand All @@ -115,7 +115,7 @@ impl Engine<Building> {
task_definitions,
task_locations,
package_tasks,
has_persistent_tasks: has_persistent_task,
has_non_interruptible_tasks,
..
} = self;
Engine {
Expand All @@ -126,7 +126,7 @@ impl Engine<Building> {
task_definitions,
task_locations,
package_tasks,
has_persistent_tasks: has_persistent_task,
has_non_interruptible_tasks,
}
}
}
Expand Down Expand Up @@ -169,7 +169,7 @@ impl Engine<Built> {
.get(task)
.expect("task should have definition");

if def.persistent {
if def.persistent && !def.interruptible {
return None;
}
}
Expand Down Expand Up @@ -208,13 +208,13 @@ impl Engine<Built> {
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
// We've filtered out persistent tasks
has_persistent_tasks: false,
has_non_interruptible_tasks: false,
}
}

/// Creates an `Engine` with persistent tasks filtered out. Used in watch
/// mode to re-run the non-persistent tasks.
pub fn create_engine_without_persistent_tasks(&self) -> Engine<Built> {
/// Creates an `Engine` with only interruptible tasks, i.e. non-persistent
/// tasks and persistent tasks that are allowed to be interrupted
pub fn create_engine_for_interruptible_tasks(&self) -> Engine<Built> {
let new_graph = self.task_graph.filter_map(
|node_idx, node| match &self.task_graph[node_idx] {
TaskNode::Task(task) => {
Expand All @@ -223,7 +223,7 @@ impl Engine<Built> {
.get(task)
.expect("task should have definition");

if !def.persistent {
if !def.persistent || def.interruptible {
Some(node.clone())
} else {
None
Expand Down Expand Up @@ -260,12 +260,13 @@ impl Engine<Built> {
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
has_persistent_tasks: false,
has_non_interruptible_tasks: false,
}
}

/// Creates an `Engine` that is only the persistent tasks.
pub fn create_engine_for_persistent_tasks(&self) -> Engine<Built> {
/// Creates an `Engine` that is only the tasks that are not interruptible,
/// i.e. persistent and not allowed to be restarted
pub fn create_engine_for_non_interruptible_tasks(&self) -> Engine<Built> {
let mut new_graph = self.task_graph.filter_map(
|node_idx, node| match &self.task_graph[node_idx] {
TaskNode::Task(task) => {
Expand All @@ -274,7 +275,7 @@ impl Engine<Built> {
.get(task)
.expect("task should have definition");

if def.persistent {
if def.persistent && !def.interruptible {
Some(node.clone())
} else {
None
Expand Down Expand Up @@ -320,7 +321,7 @@ impl Engine<Built> {
task_definitions: self.task_definitions.clone(),
task_locations: self.task_locations.clone(),
package_tasks: self.package_tasks.clone(),
has_persistent_tasks: true,
has_non_interruptible_tasks: true,
}
}

Expand Down Expand Up @@ -711,20 +712,20 @@ mod test {

let engine = engine.seal();

let persistent_tasks_engine = engine.create_engine_for_persistent_tasks();
for node in persistent_tasks_engine.tasks() {
let non_interruptible_tasks_engine = engine.create_engine_for_non_interruptible_tasks();
for node in non_interruptible_tasks_engine.tasks() {
if let TaskNode::Task(task_id) = node {
let def = persistent_tasks_engine
let def = non_interruptible_tasks_engine
.task_definition(task_id)
.expect("task should have definition");
assert!(def.persistent, "task should be persistent");
}
}

let non_persistent_tasks_engine = engine.create_engine_without_persistent_tasks();
for node in non_persistent_tasks_engine.tasks() {
let interruptible_tasks_engine = engine.create_engine_for_interruptible_tasks();
for node in interruptible_tasks_engine.tasks() {
if let TaskNode::Task(task_id) = node {
let def = non_persistent_tasks_engine
let def = interruptible_tasks_engine
.task_definition(task_id)
.expect("task should have definition");
assert!(!def.persistent, "task should not be persistent");
Expand Down
12 changes: 6 additions & 6 deletions crates/turborepo-lib/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ type WuiResult = UIResult<WebUISender>;
type TuiResult = UIResult<TuiSender>;

impl Run {
fn has_persistent_tasks(&self) -> bool {
self.engine.has_persistent_tasks
fn has_non_interruptible_tasks(&self) -> bool {
self.engine.has_non_interruptible_tasks
}
fn print_run_prelude(&self) {
let targets_list = self.opts.run_opts.tasks.join(", ");
Expand Down Expand Up @@ -136,17 +136,17 @@ impl Run {
&self.root_turbo_json
}

pub fn create_run_for_persistent_tasks(&self) -> Self {
pub fn create_run_for_non_interruptible_tasks(&self) -> Self {
let mut new_run = self.clone();
let new_engine = new_run.engine.create_engine_for_persistent_tasks();
let new_engine = new_run.engine.create_engine_for_non_interruptible_tasks();
new_run.engine = Arc::new(new_engine);

new_run
}

pub fn create_run_without_persistent_tasks(&self) -> Self {
pub fn create_run_for_interruptible_tasks(&self) -> Self {
let mut new_run = self.clone();
let new_engine = new_run.engine.create_engine_without_persistent_tasks();
let new_engine = new_run.engine.create_engine_for_interruptible_tasks();
new_run.engine = Arc::new(new_engine);

new_run
Expand Down
3 changes: 3 additions & 0 deletions crates/turborepo-lib/src/run/summary/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ pub struct TaskSummaryTaskDefinition {
inputs: Vec<String>,
output_logs: OutputLogsMode,
persistent: bool,
interruptible: bool,
env: Vec<String>,
pass_through_env: Option<Vec<String>>,
interactive: bool,
Expand Down Expand Up @@ -280,6 +281,7 @@ impl From<TaskDefinition> for TaskSummaryTaskDefinition {
mut inputs,
output_logs,
persistent,
interruptible,
interactive,
env_mode,
} = value;
Expand Down Expand Up @@ -313,6 +315,7 @@ impl From<TaskDefinition> for TaskSummaryTaskDefinition {
inputs,
output_logs,
persistent,
interruptible,
interactive,
env,
pass_through_env,
Expand Down
57 changes: 43 additions & 14 deletions crates/turborepo-lib/src/run/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ impl ChangedPackages {
pub struct WatchClient {
run: Arc<Run>,
watched_packages: HashSet<PackageName>,
persistent_tasks_handle: Option<PersistentRunHandle>,
persistent_tasks_handle: Option<RunHandle>,
connector: DaemonConnector,
base: CommandBase,
telemetry: CommandEventBuilder,
Expand All @@ -57,7 +57,7 @@ pub struct WatchClient {
ui_handle: Option<JoinHandle<Result<(), turborepo_ui::Error>>>,
}

struct PersistentRunHandle {
struct RunHandle {
stopper: run::RunStopper,
run_task: JoinHandle<Result<i32, run::Error>>,
}
Expand Down Expand Up @@ -189,6 +189,7 @@ impl WatchClient {
};

let run_fut = async {
let mut run_handle: Option<RunHandle> = None;
loop {
notify_run.notified().await;
let some_changed_packages = {
Expand All @@ -197,8 +198,17 @@ impl WatchClient {
(!changed_packages_guard.is_empty())
.then(|| std::mem::take(changed_packages_guard.deref_mut()))
};

if let Some(changed_packages) = some_changed_packages {
self.execute_run(changed_packages).await?;
// Clean up currently running tasks
if let Some(RunHandle { stopper, run_task }) = run_handle.take() {
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to
// finish to ensure all messages are flushed.
let _ = run_task.await;
}
run_handle = Some(self.execute_run(changed_packages).await?);
}
}
};
Expand Down Expand Up @@ -251,7 +261,13 @@ impl WatchClient {
Ok(())
}

async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<i32, Error> {
/// Executes a run with the given changed packages. Splits the run into two
/// parts:
/// 1. The persistent tasks that are not allowed to be interrupted
/// 2. The non-persistent tasks and the persistent tasks that are allowed to
/// be interrupted
/// Returns a handle to the task running (2)

Check failure on line 269 in crates/turborepo-lib/src/run/watch.rs

View workflow job for this annotation

GitHub Actions / Rust lints

doc list item without indentation
async fn execute_run(&mut self, changed_packages: ChangedPackages) -> Result<RunHandle, Error> {
// Should we recover here?
trace!("handling run with changed packages: {changed_packages:?}");
match changed_packages {
Expand Down Expand Up @@ -303,7 +319,11 @@ impl WatchClient {
.map_err(|err| Error::UISend(err.to_string()))?;
}

Ok(run.run(self.ui_sender.clone(), true).await?)
let ui_sender = self.ui_sender.clone();
Ok(RunHandle {
stopper: run.stopper(),
run_task: tokio::spawn(async move { Ok(run.run(ui_sender, true).await?) }),

Check failure on line 325 in crates/turborepo-lib/src/run/watch.rs

View workflow job for this annotation

GitHub Actions / Rust lints

question mark operator is useless here
})
}
ChangedPackages::All => {
let mut args = self.base.args().clone();
Expand Down Expand Up @@ -339,9 +359,7 @@ impl WatchClient {
self.watched_packages = self.run.get_relevant_packages();

// Clean up currently running persistent tasks
if let Some(PersistentRunHandle { stopper, run_task }) =
self.persistent_tasks_handle.take()
{
if let Some(RunHandle { stopper, run_task }) = self.persistent_tasks_handle.take() {
// Shut down the tasks for the run
stopper.stop().await;
// Run should exit shortly after we stop all child tasks, wait for it to finish
Expand All @@ -355,27 +373,38 @@ impl WatchClient {
.map_err(|err| Error::UISend(err.to_string()))?;
}

if self.run.has_persistent_tasks() {
if self.run.has_non_interruptible_tasks() {
debug_assert!(
self.persistent_tasks_handle.is_none(),
"persistent handle should be empty before creating a new one"
);
let persistent_run = self.run.create_run_for_persistent_tasks();
let persistent_run = self.run.create_run_for_non_interruptible_tasks();
let ui_sender = self.ui_sender.clone();
// If we have persistent tasks, we run them on a separate thread
// since persistent tasks don't finish
self.persistent_tasks_handle = Some(PersistentRunHandle {
self.persistent_tasks_handle = Some(RunHandle {
stopper: persistent_run.stopper(),
run_task: tokio::spawn(
async move { persistent_run.run(ui_sender, true).await },
),
});

// But we still run the regular tasks blocking
let non_persistent_run = self.run.create_run_without_persistent_tasks();
Ok(non_persistent_run.run(self.ui_sender.clone(), true).await?)
let non_persistent_run = self.run.create_run_for_interruptible_tasks();
let ui_sender = self.ui_sender.clone();
Ok(RunHandle {
stopper: non_persistent_run.stopper(),
run_task: tokio::spawn(async move {
Ok(non_persistent_run.run(ui_sender, true).await?)

Check failure on line 398 in crates/turborepo-lib/src/run/watch.rs

View workflow job for this annotation

GitHub Actions / Rust lints

question mark operator is useless here
}),
})
} else {
Ok(self.run.run(self.ui_sender.clone(), true).await?)
let ui_sender = self.ui_sender.clone();
let run = self.run.clone();
Ok(RunHandle {
stopper: run.stopper(),
run_task: tokio::spawn(async move { Ok(run.run(ui_sender, true).await?) }),

Check failure on line 406 in crates/turborepo-lib/src/run/watch.rs

View workflow job for this annotation

GitHub Actions / Rust lints

question mark operator is useless here
})
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions crates/turborepo-lib/src/task_graph/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,14 @@ pub struct TaskDefinition {
pub(crate) output_logs: OutputLogsMode,

// Persistent indicates whether the Task is expected to exit or not
// Tasks marked Persistent do not exit (e.g. --watch mode or dev servers)
// Tasks marked Persistent do not exit (e.g. watch mode or dev servers)
pub persistent: bool,

// Interactive marks that a task can have it's stdin written to.
// Indicates whether a persistent task can be interrupted in the middle of execution
// by watch mode
pub interruptible: bool,

// Interactive marks that a task can have its stdin written to.
// Tasks that take stdin input cannot be cached as their outputs may depend on the
// input.
pub interactive: bool,
Expand All @@ -93,6 +97,7 @@ impl Default for TaskDefinition {
inputs: Default::default(),
output_logs: Default::default(),
persistent: Default::default(),
interruptible: Default::default(),
interactive: Default::default(),
env_mode: Default::default(),
}
Expand Down
4 changes: 4 additions & 0 deletions crates/turborepo-lib/src/turbo_json/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ pub struct RawTaskDefinition {
#[serde(skip_serializing_if = "Option::is_none")]
persistent: Option<Spanned<bool>>,
#[serde(skip_serializing_if = "Option::is_none")]
interruptible: Option<Spanned<bool>>,
#[serde(skip_serializing_if = "Option::is_none")]
outputs: Option<Vec<Spanned<UnescapedString>>>,
#[serde(skip_serializing_if = "Option::is_none")]
output_logs: Option<Spanned<OutputLogsMode>>,
Expand Down Expand Up @@ -257,6 +259,7 @@ impl RawTaskDefinition {
set_field!(self, other, inputs);
set_field!(self, other, output_logs);
set_field!(self, other, persistent);
set_field!(self, other, interruptible);
set_field!(self, other, env);
set_field!(self, other, pass_through_env);
set_field!(self, other, interactive);
Expand Down Expand Up @@ -408,6 +411,7 @@ impl TryFrom<RawTaskDefinition> for TaskDefinition {
pass_through_env,
output_logs: *raw_task.output_logs.unwrap_or_default(),
persistent: *raw_task.persistent.unwrap_or_default(),
interruptible: *raw_task.interruptible.unwrap_or_default(),
interactive,
env_mode: raw_task.env_mode,
})
Expand Down
2 changes: 2 additions & 0 deletions crates/turborepo-lib/src/turbo_json/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl WithMetadata for RawTaskDefinition {
self.inputs.add_text(text.clone());
self.pass_through_env.add_text(text.clone());
self.persistent.add_text(text.clone());
self.interruptible.add_text(text.clone());
self.outputs.add_text(text.clone());
self.output_logs.add_text(text.clone());
self.interactive.add_text(text);
Expand All @@ -160,6 +161,7 @@ impl WithMetadata for RawTaskDefinition {
self.inputs.add_path(path.clone());
self.pass_through_env.add_path(path.clone());
self.persistent.add_path(path.clone());
self.interruptible.add_path(path.clone());
self.outputs.add_path(path.clone());
self.output_logs.add_path(path.clone());
self.interactive.add_path(path);
Expand Down

0 comments on commit b4e033d

Please sign in to comment.