From 55ef228845afe532549aaa877a6ade5655421ebf Mon Sep 17 00:00:00 2001
From: Tom Dyas
Date: Wed, 22 Jun 2022 13:31:44 -0400
Subject: [PATCH] Increment the missing-digest backtracking level once per
 attempt (Cherry pick of #15889) (#15897)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Increment the missing-digest backtracking level once per attempt (#15889)

As described in #15867 (and reported in #15885 and #15888), backtracking for
missing digests currently suffers from a race condition: if multiple consuming
nodes observe a `MissingDigest`, the producing node may be restarted multiple
times, causing some backtracking levels to be skipped.

To address this, the backtracking level (née "attempt") is recorded in the
result value, and is incremented exactly once per observation of a
`MissingDigest` for a particular node and level, rather than once per run of
the producing node (which can be restarted for multiple reasons, including
cancellation).

[ci skip-build-wheels]

Co-authored-by: Stu Hood
---
 src/rust/engine/src/context.rs    | 86 ++++++++++++++++++-------------
 src/rust/engine/src/intrinsics.rs |  2 +-
 src/rust/engine/src/nodes.rs      | 47 +++++++++++------
 3 files changed, 80 insertions(+), 55 deletions(-)

diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs
index 798965c8a50..4a4b41b871f 100644
--- a/src/rust/engine/src/context.rs
+++ b/src/rust/engine/src/context.rs
@@ -372,7 +372,7 @@ impl Core {
           .unwrap_or_else(|| leaf_runner.clone()),
         full_store,
         local_cache,
-        remoting_opts.cache_eager_fetch,
+        eager_fetch,
         process_execution_metadata,
       ))
     } else {
@@ -653,7 +653,7 @@ pub struct Context {
   /// Presence in this map at process runtime indicates that the process is being retried, and that
   /// there was something invalid or unusable about previous attempts. Successive attempts should
   /// run in a different mode (skipping caches, etc) to attempt to produce a valid result.
-  backtrack_attempts: Arc<Mutex<HashMap<ExecuteProcess, usize>>>,
+  backtrack_levels: Arc<Mutex<HashMap<ExecuteProcess, usize>>>,
   stats: Arc<Mutex<graph::Stats>>,
 }
 
@@ -665,7 +665,7 @@ impl Context {
       core,
       session,
       run_id,
-      backtrack_attempts: Arc::default(),
+      backtrack_levels: Arc::default(),
       stats: Arc::default(),
     }
   }
@@ -704,37 +704,59 @@ impl Context {
       return result;
     };
 
-    // Locate the source(s) of this Digest.
+    // Locate the source(s) of this Digest and their backtracking levels.
     // TODO: Currently needs a combination of `visit_live` and `invalidate_from_roots` because
-    // `invalidate_from_roots` cannot view `Node` results. This could lead to a race condition
-    // where a `Node` is invalidated multiple times, which might cause it to increment its attempt
-    // count multiple times. See https://github.com/pantsbuild/pants/issues/15867
-    let mut roots = HashSet::new();
+    // `invalidate_from_roots` cannot view `Node` results. Would be more efficient as a merged
+    // method.
+    let mut candidate_roots = Vec::new();
     self.core.graph.visit_live(self, |k, v| match k {
       NodeKey::ExecuteProcess(p) if v.digests().contains(&digest) => {
-        roots.insert(p.clone());
+        if let NodeOutput::ProcessResult(pr) = v {
+          candidate_roots.push((p.clone(), pr.backtrack_level));
+        }
       }
       _ => (),
     });
 
-    if roots.is_empty() {
+    if candidate_roots.is_empty() {
       // We did not identify any roots to invalidate: allow the Node to fail.
       return result;
     }
 
-    // Trigger backtrack attempts for the matched Nodes.
-    {
-      let mut backtrack_attempts = self.backtrack_attempts.lock();
-      for root in &roots {
-        let attempt = backtrack_attempts.entry((**root).clone()).or_insert(1);
-        let description = &root.process.description;
-        workunit.increment_counter(Metric::BacktrackAttempts, 1);
-        log::warn!(
-          "Making attempt {attempt} to backtrack and retry `{description}`, due to \
-          missing digest {digest:?}."
-        );
-      }
-    }
+    // Attempt to trigger backtrack attempts for the matched Nodes. It's possible that we are not
+    // the first consumer to notice that this Node needs to backtrack, so we only actually report
+    // that we're backtracking if the new level is an increase from the old level.
+    let roots = candidate_roots
+      .into_iter()
+      .filter_map(|(root, invalid_level)| {
+        let next_level = invalid_level + 1;
+        let maybe_new_level = {
+          let mut backtrack_levels = self.backtrack_levels.lock();
+          if let Some(old_backtrack_level) = backtrack_levels.get_mut(&root) {
+            if next_level > *old_backtrack_level {
+              *old_backtrack_level = next_level;
+              Some(next_level)
+            } else {
+              None
+            }
+          } else {
+            backtrack_levels.insert((*root).clone(), next_level);
+            Some(next_level)
+          }
+        };
+        if let Some(new_level) = maybe_new_level {
+          workunit.increment_counter(Metric::BacktrackAttempts, 1);
+          let description = &root.process.description;
+          log::warn!(
+            "Making attempt {new_level} to backtrack and retry `{description}`, due to \
+            missing digest {digest:?}."
+          );
+          Some(root)
+        } else {
+          None
+        }
+      })
+      .collect::<HashSet<_>>();
 
     // Invalidate the matched roots.
     self
@@ -753,22 +775,12 @@ impl Context {
   }
 
   ///
-  /// Called before executing a process to determine whether it is backtracking, and if so, to
-  /// increment the attempt count.
+  /// Called before executing a process to determine whether it is backtracking.
   ///
-  /// A process which has not been marked backtracking will always return 0, regardless of the
-  /// number of calls to this method.
+  /// A process which has not been marked backtracking will always return 0.
   ///
   pub fn maybe_start_backtracking(&self, node: &ExecuteProcess) -> usize {
-    let mut backtrack_attempts = self.backtrack_attempts.lock();
-    let entry: Option<&mut usize> = backtrack_attempts.get_mut(node);
-    if let Some(entry) = entry {
-      let attempt = *entry;
-      *entry += 1;
-      attempt
-    } else {
-      0
-    }
+    self.backtrack_levels.lock().get(node).cloned().unwrap_or(0)
   }
 }
 
@@ -790,7 +802,7 @@ impl NodeContext for Context {
       core: self.core.clone(),
       session: self.session.clone(),
       run_id: self.run_id,
-      backtrack_attempts: self.backtrack_attempts.clone(),
+      backtrack_levels: self.backtrack_levels.clone(),
       stats: self.stats.clone(),
     }
   }
diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs
index 3f7ee6514a5..ef2a186a88e 100644
--- a/src/rust/engine/src/intrinsics.rs
+++ b/src/rust/engine/src/intrinsics.rs
@@ -179,7 +179,7 @@ fn process_request_to_process_result(
     .map_err(|e| e.enrich("Error lifting Process"))
     .await?;
 
-  let result = context.get(process_request).await?.0;
+  let result = context.get(process_request).await?.result;
 
   let store = context.core.store();
   let (stdout_bytes, stderr_bytes) = try_join!(
diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs
index b142cced741..4b420acc684 100644
--- a/src/rust/engine/src/nodes.rs
+++ b/src/rust/engine/src/nodes.rs
@@ -384,18 +384,22 @@ impl ExecuteProcess {
     self,
     context: Context,
     workunit: &mut RunningWorkunit,
-    attempt: usize,
+    backtrack_level: usize,
   ) -> NodeResult<ProcessResult> {
     let request = self.process;
 
-    let command_runner = context.core.command_runners.get(attempt).ok_or_else(|| {
-      // NB: We only backtrack for a Process if it produces a Digest which cannot be consumed
-      // from disk: if we've fallen all the way back to local execution, and even that
-      // produces an unreadable Digest, then there is a fundamental implementation issue.
-      throw(format!(
-        "Process {request:?} produced an invalid result on all configured command runners."
-      ))
-    })?;
+    let command_runner = context
+      .core
+      .command_runners
+      .get(backtrack_level)
+      .ok_or_else(|| {
+        // NB: We only backtrack for a Process if it produces a Digest which cannot be consumed
+        // from disk: if we've fallen all the way back to local execution, and even that
+        // produces an unreadable Digest, then there is a fundamental implementation issue.
+        throw(format!(
+          "Process {request:?} produced an invalid result on all configured command runners."
+        ))
+      })?;
 
     let execution_context = process_execution::Context::new(
       context.session.workunit_store(),
@@ -456,7 +460,10 @@ impl ExecuteProcess {
       }
     }
 
-    Ok(ProcessResult(res))
+    Ok(ProcessResult {
+      result: res,
+      backtrack_level,
+    })
   }
 }
 
@@ -471,7 +478,13 @@ impl WrappedNode for ExecuteProcess {
 }
 
 #[derive(Clone, Debug, DeepSizeOf, Eq, PartialEq)]
-pub struct ProcessResult(pub process_execution::FallibleProcessResultWithPlatform);
+pub struct ProcessResult {
+  pub result: process_execution::FallibleProcessResultWithPlatform,
+  /// The backtrack_level which produced this result. If a Digest from a particular result is
+  /// missing, the next attempt needs to use a higher level of backtracking (i.e.: remove more
+  /// caches).
+  pub backtrack_level: usize,
+}
 
 ///
 /// A Node that represents reading the destination of a symlink (non-recursively).
@@ -1375,8 +1388,8 @@ impl Node for NodeKey {
       NodeKey::DigestFile(n) => n.run_node(context).await.map(NodeOutput::FileDigest),
       NodeKey::DownloadedFile(n) => n.run_node(context).await.map(NodeOutput::Snapshot),
       NodeKey::ExecuteProcess(n) => {
-        let attempt = context.maybe_start_backtracking(&n);
-        n.run_node(context, workunit, attempt)
+        let backtrack_level = context.maybe_start_backtracking(&n);
+        n.run_node(context, workunit, backtrack_level)
           .await
           .map(|r| NodeOutput::ProcessResult(Box::new(r)))
       }
@@ -1461,7 +1474,7 @@ impl Node for NodeKey {
         match ep.process.cache_scope {
           ProcessCacheScope::Always | ProcessCacheScope::PerRestartAlways => true,
           ProcessCacheScope::Successful | ProcessCacheScope::PerRestartSuccessful => {
-            process_result.0.exit_code == 0
+            process_result.result.exit_code == 0
          }
          ProcessCacheScope::PerSession => false,
        }
@@ -1572,9 +1585,9 @@ impl NodeOutput {
        dd.digests()
      }
      NodeOutput::ProcessResult(p) => {
-        let mut digests = p.0.output_directory.digests();
-        digests.push(p.0.stdout_digest);
-        digests.push(p.0.stderr_digest);
+        let mut digests = p.result.output_directory.digests();
+        digests.push(p.result.stdout_digest);
+        digests.push(p.result.stderr_digest);
        digests
      }
      NodeOutput::DirectoryListing(_)
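
The core of the change is the rule that a producer's backtracking level advances at most once per (node, level) pair, no matter how many consumers observe the same missing digest. The following is a minimal, self-contained Rust sketch of that rule, written for illustration only: `NodeId`, `BacktrackTracker`, and `maybe_bump_level` are hypothetical names rather than the engine's API, and the real logic is the `candidate_roots` filtering in the context.rs hunk above.

// Standalone sketch (illustration only; not part of the engine) of the
// "increment once per (node, level) observation" rule from the commit message.
use std::collections::HashMap;
use std::sync::Mutex;

type NodeId = u64;

struct BacktrackTracker {
  // node -> highest backtracking level already requested for that node.
  levels: Mutex<HashMap<NodeId, usize>>,
}

impl BacktrackTracker {
  fn new() -> Self {
    Self {
      levels: Mutex::new(HashMap::new()),
    }
  }

  // A consumer observed a missing digest in a result that `node` produced at
  // `invalid_level`. Returns Some(new_level) only for the first observer to
  // advance past the stored level; later observers of the same invalid level
  // get None, so the producer is restarted at most once per level.
  fn maybe_bump_level(&self, node: NodeId, invalid_level: usize) -> Option<usize> {
    let next_level = invalid_level + 1;
    let mut levels = self.levels.lock().unwrap();
    let entry = levels.entry(node).or_insert(0);
    if next_level > *entry {
      *entry = next_level;
      Some(next_level)
    } else {
      None
    }
  }
}

fn main() {
  let tracker = BacktrackTracker::new();
  // Two consumers race on the same invalid result (produced at level 0):
  // only one of them triggers a retry.
  assert_eq!(tracker.maybe_bump_level(42, 0), Some(1));
  assert_eq!(tracker.maybe_bump_level(42, 0), None);
  // If the level-1 retry also yields a missing digest, the level advances to 2.
  assert_eq!(tracker.maybe_bump_level(42, 1), Some(2));
  println!("levels advance once per (node, level) observation");
}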