Skip to content

Commit

Permalink
introduce "tail tasks" concept
Browse files Browse the repository at this point in the history
[ci skip-build-wheels]
  • Loading branch information
Tom Dyas committed Sep 21, 2022
1 parent 5b68eb0 commit 35757b6
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 2 deletions.
13 changes: 11 additions & 2 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,11 @@ use async_trait::async_trait;
use concrete_time::{Duration, TimeSpan};
use deepsize::DeepSizeOf;
use fs::{DirectoryDigest, RelativePath, EMPTY_DIRECTORY_DIGEST};
use futures::future::try_join_all;
use futures::future::{try_join_all, BoxFuture};
use futures::try_join;
use hashing::Digest;
use itertools::Itertools;
use parking_lot::Mutex;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -847,6 +848,7 @@ pub struct Context {
workunit_store: WorkunitStore,
build_id: String,
run_id: RunId,
tail_tasks: Arc<Mutex<Vec<BoxFuture<'static, ()>>>>,
}

impl Default for Context {
Expand All @@ -855,16 +857,23 @@ impl Default for Context {
workunit_store: WorkunitStore::new(false, log::Level::Debug),
build_id: String::default(),
run_id: RunId(0),
tail_tasks: Arc::default(),
}
}
}

impl Context {
pub fn new(workunit_store: WorkunitStore, build_id: String, run_id: RunId) -> Context {
pub fn new(
workunit_store: WorkunitStore,
build_id: String,
run_id: RunId,
tail_tasks: Arc<Mutex<Vec<BoxFuture<'static, ()>>>>,
) -> Context {
Context {
workunit_store,
build_id,
run_id,
tail_tasks,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -895,6 +895,7 @@ async fn sends_headers() {
workunit_store: WorkunitStore::new(false, log::Level::Debug),
build_id: String::from("marmosets"),
run_id: RunId(0),
..Context::default()
};
command_runner
.run(context, &mut workunit, execute_request)
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ impl ExecuteProcess {
context.session.workunit_store(),
context.session.build_id().to_string(),
context.session.run_id(),
context.session.tail_tasks(),
);

let res = command_runner
Expand Down
22 changes: 22 additions & 0 deletions src/rust/engine/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::time::{Duration, Instant};

use deepsize::DeepSizeOf;
use futures::{future, FutureExt};
use futures_core::future::BoxFuture;
use log::debug;
use tokio::time;

Expand Down Expand Up @@ -321,6 +322,10 @@ impl Scheduler {
refresh_delay = time::sleep(Self::refresh_delay(interval, deadline)).boxed();
}
res = &mut execution_task => {
// Wait for tail tasks to complete.
let tail_tasks = session.tail_tasks().lock().drain(..).collect();
Self::wait_for_tail_tasks(tail_tasks).await;

// Completed successfully.
break Ok(Self::execute_record_results(&request.roots, session, res));
}
Expand All @@ -331,6 +336,23 @@ impl Scheduler {
})
}

async fn wait_for_tail_tasks(tasks: Vec<BoxFuture<'static, ()>>) {
if !tasks.is_empty() {
log::trace!("waiting for {} tail tasks to complete", tasks.len());

let joined_tail_tasks_fut = futures::future::join_all(tasks);
let timeout_fut = time::timeout(Duration::from_secs(5), joined_tail_tasks_fut);
match timeout_fut.await {
Ok(_) => {
log::trace!("tail tasks completed successfully");
}
Err(_) => {
log::trace!("tail tasks failed to complete within timeout");
}
}
}
}

fn refresh_delay(refresh_interval: Duration, deadline: Option<Instant>) -> Duration {
deadline
.and_then(|deadline| deadline.checked_duration_since(Instant::now()))
Expand Down
11 changes: 11 additions & 0 deletions src/rust/engine/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::python::{Failure, Value};

use async_latch::AsyncLatch;
use futures::future::{self, AbortHandle, Abortable, FutureExt};
use futures_core::future::BoxFuture;
use graph::LastObserved;
use log::warn;
use parking_lot::Mutex;
Expand Down Expand Up @@ -89,6 +90,8 @@ struct SessionState {
// entire Session, but in some cases (in particular, a `--loop`) the caller wants to retain the
// same Session while still observing new values for uncacheable rules like Goals.
run_id: AtomicU32,
/// Tasks to await at the "tail" of the session.
tail_tasks: Arc<Mutex<Vec<BoxFuture<'static, ()>>>>,
}

///
Expand Down Expand Up @@ -183,6 +186,7 @@ impl Session {
workunit_store,
session_values: Mutex::new(session_values),
run_id: AtomicU32::new(run_id.0),
tail_tasks: Arc::default(),
}),
})
}
Expand Down Expand Up @@ -366,6 +370,13 @@ impl Session {
}
}
}

/// Returns a Vec of futures representing an asynchronous "tail" task that should not block
/// individual nodes in the build graph but should block (up to a configurable timeout)
/// ending this `Session`.
pub fn tail_tasks(&self) -> Arc<Mutex<Vec<BoxFuture<'static, ()>>>> {
self.state.tail_tasks.clone()
}
}

///
Expand Down

0 comments on commit 35757b6

Please sign in to comment.