Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record ongoing backtrack attempts (Cherry-pick of #16075) #16080

Merged
merged 2 commits into from
Jul 7, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 25 additions & 5 deletions src/rust/engine/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Duration;

use crate::intrinsics::Intrinsics;
use crate::nodes::{ExecuteProcess, NodeKey, NodeOutput, NodeResult, WrappedNode};
use crate::python::Failure;
use crate::python::{throw, Failure};
use crate::session::{Session, Sessions};
use crate::tasks::{Rule, Tasks};
use crate::types::Types;
Expand All @@ -22,6 +22,7 @@ use async_oncecell::OnceCell;
use cache::PersistentCache;
use fs::{safe_create_dir_all_ioerror, GitignoreStyleExcludes, PosixFS};
use graph::{self, EntryId, Graph, InvalidationResult, NodeContext};
use hashing::Digest;
use log::info;
use parking_lot::Mutex;
use process_execution::{
Expand Down Expand Up @@ -650,10 +651,12 @@ pub struct Context {
run_id: RunId,
/// The number of attempts which have been made to backtrack to a particular ExecuteProcess node.
///
/// Presence in this map at process runtime indicates that the pricess is being retried, and that
/// Presence in this map at process runtime indicates that the process is being retried, and that
/// there was something invalid or unusable about previous attempts. Successive attempts should
/// run in a different mode (skipping caches, etc) to attempt to produce a valid result.
backtrack_levels: Arc<Mutex<HashMap<ExecuteProcess, usize>>>,
/// The Digests that we have successfully invalidated a Node for.
backtrack_digests: Arc<Mutex<HashSet<Digest>>>,
stats: Arc<Mutex<graph::Stats>>,
}

Expand All @@ -666,6 +669,7 @@ impl Context {
session,
run_id,
backtrack_levels: Arc::default(),
backtrack_digests: Arc::default(),
stats: Arc::default(),
}
}
Expand Down Expand Up @@ -704,7 +708,7 @@ impl Context {
return result;
};

// Locate the source(s) of this Digest and their backtracking levels.
// Locate live source(s) of this Digest and their backtracking levels.
// TODO: Currently needs a combination of `visit_live` and `invalidate_from_roots` because
// `invalidate_from_roots` cannot view `Node` results. Would be more efficient as a merged
// method.
Expand All @@ -719,8 +723,23 @@ impl Context {
});

if candidate_roots.is_empty() {
// We did not identify any roots to invalidate: allow the Node to fail.
return result;
// If there are no live sources of the Digest, see whether any have already been invalidated
// by other consumers.
if self.backtrack_digests.lock().get(&digest).is_some() {
// Some other consumer has already identified and invalidated the source of this Digest: we
// can wait for the attempt to complete.
return Err(Failure::Invalidated);
} else {
// There are no live or invalidated sources of this Digest. Directly fail.
return result.map_err(|e| {
throw(format!(
"Could not identify a process to backtrack to for: {e}"
))
});
}
} else {
// We have identified a Node to backtrack on. Record it.
self.backtrack_digests.lock().insert(digest);
}

// Attempt to trigger backtrack attempts for the matched Nodes. It's possible that we are not
Expand Down Expand Up @@ -803,6 +822,7 @@ impl NodeContext for Context {
session: self.session.clone(),
run_id: self.run_id,
backtrack_levels: self.backtrack_levels.clone(),
backtrack_digests: self.backtrack_digests.clone(),
stats: self.stats.clone(),
}
}
Expand Down