Skip to content

Commit

Permalink
Increment the missing-digest backtracking level once per attempt (Che…
Browse files Browse the repository at this point in the history
…rry pick of #15889) (#15897)

Increment the missing-digest backtracking level once per attempt (#15889)

As described in #15867 (and reported in #15885 and #15888), backtracking for missing digests currently suffers from a race condition where if multiple consuming nodes observe a `MissingDigest`, the producing node may be restarted multiple times, leading to some backtracking levels being skipped.

To address this, the backtracking level (née "attempt") is recorded in the result value, and is incremented exactly once per observation of a `MissingDigest` for a particular node and level, rather than once per run of the producing node (which can be restarted for multiple reasons, including cancellation).

[ci skip-build-wheels]

Co-authored-by: Stu Hood <stuhood@gmail.com>
  • Loading branch information
Tom Dyas and stuhood authored Jun 22, 2022
1 parent d91c57d commit 55ef228
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 55 deletions.
86 changes: 49 additions & 37 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl Core {
.unwrap_or_else(|| leaf_runner.clone()),
full_store,
local_cache,
remoting_opts.cache_eager_fetch,
eager_fetch,
process_execution_metadata,
))
} else {
Expand Down Expand Up @@ -653,7 +653,7 @@ pub struct Context {
/// Presence in this map at process runtime indicates that the process is being retried, and that
/// there was something invalid or unusable about previous attempts. Successive attempts should
/// run in a different mode (skipping caches, etc) to attempt to produce a valid result.
backtrack_attempts: Arc<Mutex<HashMap<ExecuteProcess, usize>>>,
backtrack_levels: Arc<Mutex<HashMap<ExecuteProcess, usize>>>,
stats: Arc<Mutex<graph::Stats>>,
}

Expand All @@ -665,7 +665,7 @@ impl Context {
core,
session,
run_id,
backtrack_attempts: Arc::default(),
backtrack_levels: Arc::default(),
stats: Arc::default(),
}
}
Expand Down Expand Up @@ -704,37 +704,59 @@ impl Context {
return result;
};

// Locate the source(s) of this Digest.
// Locate the source(s) of this Digest and their backtracking levels.
// TODO: Currently needs a combination of `visit_live` and `invalidate_from_roots` because
// `invalidate_from_roots` cannot view `Node` results. This could lead to a race condition
// where a `Node` is invalidated multiple times, which might cause it to increment its attempt
// count multiple times. See https://github.com/pantsbuild/pants/issues/15867
let mut roots = HashSet::new();
// `invalidate_from_roots` cannot view `Node` results. Would be more efficient as a merged
// method.
let mut candidate_roots = Vec::new();
self.core.graph.visit_live(self, |k, v| match k {
NodeKey::ExecuteProcess(p) if v.digests().contains(&digest) => {
roots.insert(p.clone());
if let NodeOutput::ProcessResult(pr) = v {
candidate_roots.push((p.clone(), pr.backtrack_level));
}
}
_ => (),
});

if roots.is_empty() {
if candidate_roots.is_empty() {
// We did not identify any roots to invalidate: allow the Node to fail.
return result;
}

// Trigger backtrack attempts for the matched Nodes.
{
let mut backtrack_attempts = self.backtrack_attempts.lock();
for root in &roots {
let attempt = backtrack_attempts.entry((**root).clone()).or_insert(1);
let description = &root.process.description;
workunit.increment_counter(Metric::BacktrackAttempts, 1);
log::warn!(
"Making attempt {attempt} to backtrack and retry `{description}`, due to \
missing digest {digest:?}."
);
}
}
// Attempt to trigger backtrack attempts for the matched Nodes. It's possible that we are not
// the first consumer to notice that this Node needs to backtrack, so we only actually report
// that we're backtracking if the new level is an increase from the old level.
let roots = candidate_roots
.into_iter()
.filter_map(|(root, invalid_level)| {
let next_level = invalid_level + 1;
let maybe_new_level = {
let mut backtrack_levels = self.backtrack_levels.lock();
if let Some(old_backtrack_level) = backtrack_levels.get_mut(&root) {
if next_level > *old_backtrack_level {
*old_backtrack_level = next_level;
Some(next_level)
} else {
None
}
} else {
backtrack_levels.insert((*root).clone(), next_level);
Some(next_level)
}
};
if let Some(new_level) = maybe_new_level {
workunit.increment_counter(Metric::BacktrackAttempts, 1);
let description = &root.process.description;
log::warn!(
"Making attempt {new_level} to backtrack and retry `{description}`, due to \
missing digest {digest:?}."
);
Some(root)
} else {
None
}
})
.collect::<HashSet<_>>();

// Invalidate the matched roots.
self
Expand All @@ -753,22 +775,12 @@ impl Context {
}

///
/// Called before executing a process to determine whether it is backtracking, and if so, to
/// increment the attempt count.
/// Called before executing a process to determine whether it is backtracking.
///
/// A process which has not been marked backtracking will always return 0, regardless of the
/// number of calls to this method.
/// A process which has not been marked backtracking will always return 0.
///
pub fn maybe_start_backtracking(&self, node: &ExecuteProcess) -> usize {
let mut backtrack_attempts = self.backtrack_attempts.lock();
let entry: Option<&mut usize> = backtrack_attempts.get_mut(node);
if let Some(entry) = entry {
let attempt = *entry;
*entry += 1;
attempt
} else {
0
}
self.backtrack_levels.lock().get(node).cloned().unwrap_or(0)
}
}

Expand All @@ -790,7 +802,7 @@ impl NodeContext for Context {
core: self.core.clone(),
session: self.session.clone(),
run_id: self.run_id,
backtrack_attempts: self.backtrack_attempts.clone(),
backtrack_levels: self.backtrack_levels.clone(),
stats: self.stats.clone(),
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/src/intrinsics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ fn process_request_to_process_result(
.map_err(|e| e.enrich("Error lifting Process"))
.await?;

let result = context.get(process_request).await?.0;
let result = context.get(process_request).await?.result;

let store = context.core.store();
let (stdout_bytes, stderr_bytes) = try_join!(
Expand Down
47 changes: 30 additions & 17 deletions src/rust/engine/src/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,18 +384,22 @@ impl ExecuteProcess {
self,
context: Context,
workunit: &mut RunningWorkunit,
attempt: usize,
backtrack_level: usize,
) -> NodeResult<ProcessResult> {
let request = self.process;

let command_runner = context.core.command_runners.get(attempt).ok_or_else(|| {
// NB: We only backtrack for a Process if it produces a Digest which cannot be consumed
// from disk: if we've fallen all the way back to local execution, and even that
// produces an unreadable Digest, then there is a fundamental implementation issue.
throw(format!(
"Process {request:?} produced an invalid result on all configured command runners."
))
})?;
let command_runner = context
.core
.command_runners
.get(backtrack_level)
.ok_or_else(|| {
// NB: We only backtrack for a Process if it produces a Digest which cannot be consumed
// from disk: if we've fallen all the way back to local execution, and even that
// produces an unreadable Digest, then there is a fundamental implementation issue.
throw(format!(
"Process {request:?} produced an invalid result on all configured command runners."
))
})?;

let execution_context = process_execution::Context::new(
context.session.workunit_store(),
Expand Down Expand Up @@ -456,7 +460,10 @@ impl ExecuteProcess {
}
}

Ok(ProcessResult(res))
Ok(ProcessResult {
result: res,
backtrack_level,
})
}
}

Expand All @@ -471,7 +478,13 @@ impl WrappedNode for ExecuteProcess {
}

#[derive(Clone, Debug, DeepSizeOf, Eq, PartialEq)]
pub struct ProcessResult(pub process_execution::FallibleProcessResultWithPlatform);
pub struct ProcessResult {
pub result: process_execution::FallibleProcessResultWithPlatform,
/// The backtrack_level which produced this result. If a Digest from a particular result is
/// missing, the next attempt needs to use a higher level of backtracking (i.e.: remove more
/// caches).
pub backtrack_level: usize,
}

///
/// A Node that represents reading the destination of a symlink (non-recursively).
Expand Down Expand Up @@ -1375,8 +1388,8 @@ impl Node for NodeKey {
NodeKey::DigestFile(n) => n.run_node(context).await.map(NodeOutput::FileDigest),
NodeKey::DownloadedFile(n) => n.run_node(context).await.map(NodeOutput::Snapshot),
NodeKey::ExecuteProcess(n) => {
let attempt = context.maybe_start_backtracking(&n);
n.run_node(context, workunit, attempt)
let backtrack_level = context.maybe_start_backtracking(&n);
n.run_node(context, workunit, backtrack_level)
.await
.map(|r| NodeOutput::ProcessResult(Box::new(r)))
}
Expand Down Expand Up @@ -1461,7 +1474,7 @@ impl Node for NodeKey {
match ep.process.cache_scope {
ProcessCacheScope::Always | ProcessCacheScope::PerRestartAlways => true,
ProcessCacheScope::Successful | ProcessCacheScope::PerRestartSuccessful => {
process_result.0.exit_code == 0
process_result.result.exit_code == 0
}
ProcessCacheScope::PerSession => false,
}
Expand Down Expand Up @@ -1572,9 +1585,9 @@ impl NodeOutput {
dd.digests()
}
NodeOutput::ProcessResult(p) => {
let mut digests = p.0.output_directory.digests();
digests.push(p.0.stdout_digest);
digests.push(p.0.stderr_digest);
let mut digests = p.result.output_directory.digests();
digests.push(p.result.stdout_digest);
digests.push(p.result.stderr_digest);
digests
}
NodeOutput::DirectoryListing(_)
Expand Down

0 comments on commit 55ef228

Please sign in to comment.