
[jvm] Split nailgun digest from input file digests #13813

Merged: 4 commits, Dec 6, 2021
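In short: this PR replaces the two loose `Digest` fields on the Rust `Process` type (`input_files`, plus `use_nailgun`, which previously had to be a subset of the input files) with a single `input_digests: InputDigests` value that keeps the tool/nailgun digest and the input-file digest disjoint while precomputing their union. A rough before/after sketch of a call-site, mirroring the test updates at the bottom of the diff (digest values are placeholders):

// Before this PR, callers set two independent fields, and the nailgun/tool
// digest had to already be merged into the input files:
//   process.input_files = files_digest;  // had to contain tool_digest
//   process.use_nailgun = tool_digest;
// After this PR, the digests stay separate and travel together:
let mut process = Process::new(argv);
process.input_digests = InputDigests::with_input_files(files_digest);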
1 change: 0 additions & 1 deletion src/python/pants/backend/java/compile/javac.py
@@ -158,7 +158,6 @@ async def compile_java_source(
            (
                prefixed_direct_dependency_classpath_digest,
                dest_dir_digest,
-                jdk_setup.digest,
                *(
                    sources.snapshot.digest
                    for _, sources in component_members_and_java_source_files
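The `jdk_setup.digest` entry simply drops out of javac's input merge: with the split introduced in `lib.rs` below, the JDK/tool digest travels via `use_nailgun` (as in the other rules in this diff) and no longer needs to be duplicated into the input files.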
@@ -111,15 +111,6 @@ async def analyze_java_source_dependencies(
            )
        ),
    )
-    merged_digest = await Get(
-        Digest,
-        MergeDigests(
-            (
-                tool_digest,
-                prefixed_source_files_digest,
-            )
-        ),
-    )

    analysis_output_path = "__source_analysis.json"

@@ -132,7 +123,7 @@ async def analyze_java_source_dependencies(
                analysis_output_path,
                source_path,
            ],
-            input_digest=merged_digest,
+            input_digest=prefixed_source_files_digest,
            output_files=(analysis_output_path,),
            use_nailgun=tool_digest,
            append_only_caches=jdk_setup.append_only_caches,
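The dependency-analysis rule shows the new contract directly: the explicit merge of `tool_digest` into the input files disappears, and the process now receives the tool via `use_nailgun` and only the prefixed sources via `input_digest`. The Scala parser further down gets the identical treatment.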
29 changes: 15 additions & 14 deletions src/python/pants/backend/scala/compile/scalac.py
@@ -145,18 +145,19 @@ async def compile_scala_source(
        Digest, AddPrefix(merged_transitive_dependency_classpath_entries_digest, usercp)
    )

-    merged_digest = await Get(
-        Digest,
-        MergeDigests(
-            (
-                prefixed_transitive_dependency_classpath_digest,
-                tool_classpath.digest,
-                jdk_setup.digest,
-                *(
-                    sources.snapshot.digest
-                    for _, sources in component_members_and_scala_source_files
-                ),
-            )
+    merged_tool_digest, merged_input_digest = await MultiGet(
+        Get(Digest, MergeDigests((tool_classpath.digest, jdk_setup.digest))),
+        Get(
+            Digest,
+            MergeDigests(
+                (
+                    prefixed_transitive_dependency_classpath_digest,
+                    *(
+                        sources.snapshot.digest
+                        for _, sources in component_members_and_scala_source_files
+                    ),
+                )
+            ),
        ),
    )

@@ -183,8 +184,8 @@ async def compile_scala_source(
                )
            ),
        ],
-        input_digest=merged_digest,
-        use_nailgun=jdk_setup.digest,
+        input_digest=merged_input_digest,
+        use_nailgun=merged_tool_digest,
        output_files=(output_file,),
        description=f"Compile {request.component} with scalac",
        level=LogLevel.DEBUG,
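scalac is the case that genuinely needs two merges, so the single `MergeDigests` becomes a `MultiGet` of two: the tool classpath plus JDK digest feed `use_nailgun`, while the user classpath plus sources feed `input_digest`.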
@@ -256,15 +256,6 @@ async def analyze_scala_source_dependencies(
            )
        ),
    )
-    merged_digest = await Get(
-        Digest,
-        MergeDigests(
-            (
-                tool_digest,
-                prefixed_source_files_digest,
-            )
-        ),
-    )

    analysis_output_path = "__source_analysis.json"

@@ -277,7 +268,7 @@ async def analyze_scala_source_dependencies(
                analysis_output_path,
                source_path,
            ],
-            input_digest=merged_digest,
+            input_digest=prefixed_source_files_digest,
            output_files=(analysis_output_path,),
            use_nailgun=tool_digest,
            append_only_caches=jdk_setup.append_only_caches,
74 changes: 63 additions & 11 deletions src/rust/engine/process_execution/src/lib.rs
@@ -41,6 +41,7 @@ use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
+use store::{SnapshotOps, SnapshotOpsError, Store};
use workunit_store::{in_workunit, RunId, RunningWorkunit, WorkunitMetadata, WorkunitStore};

pub mod cache;
@@ -177,6 +178,63 @@ fn serialize_level<S: serde::Serializer>(level: &log::Level, s: S) -> Result<S::
  s.serialize_str(&level.to_string())
}

+///
+/// Input Digests for a process execution. The `complete` Digest is the computed union of all
+/// inputs: the rest of the Digests should be disjoint (or have identical contents where they
+/// overlap).
+///
+#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize)]
+pub struct InputDigests {
+  ///
+  /// All of the input Digests, merged.
+  ///
+  pub complete: Digest,
+
+  ///
+  /// The input files for the process execution, which will be materialized as mutable inputs in a
+  /// sandbox for the process.
+  ///
+  pub input_files: Digest,
+
+  ///
+  /// If non-empty, the Digest of a nailgun server to use to attempt to spawn the Process.
+  ///
+  pub use_nailgun: Digest,
Review thread on `use_nailgun`:

Contributor: Should this just be called tool_files or tool_digest? Or would such a renaming be better done in its own PR?

Contributor: (This also relates to the "reusable input digests" PR; maybe we should just call these kinds of digests "tool", since they are associated with the "tool" being run.)

Member (author): Maybe? But right now this being non-EMPTY_DIGEST is also treated as the trigger to attempt to spawn via nailgun, and I have been assuming that we would have a separate field for the immutable_input_*s, which don't need to differentiate between being for the tool or not. #12716 will be coming soon regardless, and can change the naming if that ends up looking right. I think that Bazel calling them "tool_*" has to do with the fact that the worker protocol is not JVM-specific (...neither is nailgun, really, though in practice it is).
+}
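To make the author's point above concrete, here is a minimal illustration (hypothetical helper, not part of this PR) of how emptiness doubles as the nailgun flag:

// Hypothetical helper, not in this PR: a non-empty `use_nailgun` digest is
// what signals that the process should be spawned via a nailgun server.
fn should_spawn_via_nailgun(digests: &InputDigests) -> bool {
  digests.use_nailgun != hashing::EMPTY_DIGEST
}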

+impl InputDigests {
+  pub async fn new(
+    store: &Store,
+    input_files: Digest,
+    use_nailgun: Digest,
+  ) -> Result<Self, SnapshotOpsError> {
+    let complete = store.merge(vec![input_files, use_nailgun]).await?;
+    Ok(Self {
+      complete,
+      input_files,
+      use_nailgun,
+    })
+  }
+
+  pub fn with_input_files(input_files: Digest) -> Self {
+    Self {
+      complete: input_files,
+      input_files,
+      use_nailgun: hashing::EMPTY_DIGEST,
+    }
+  }
+}
+
+impl Default for InputDigests {
+  fn default() -> Self {
+    Self {
+      complete: hashing::EMPTY_DIGEST,
+      input_files: hashing::EMPTY_DIGEST,
+      use_nailgun: hashing::EMPTY_DIGEST,
+    }
+  }
+}
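A short sketch of how the constructors above relate, using only the API added in this hunk (`store` is a `store::Store` handle; `files` and `tool` are arbitrary digests supplied by the caller):

// Sketch: `new` eagerly merges the parts into `complete`, while
// `with_input_files` covers the common no-nailgun case without touching the Store.
async fn demo(
  store: &Store,
  files: Digest,
  tool: Digest,
) -> Result<(), SnapshotOpsError> {
  let split = InputDigests::new(store, files, tool).await?;
  assert_eq!(split.complete, store.merge(vec![files, tool]).await?);

  let plain = InputDigests::with_input_files(files);
  assert_eq!(plain.complete, files);
  assert_eq!(plain.use_nailgun, hashing::EMPTY_DIGEST);
  Ok(())
}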

///
/// A process to be executed.
///
@@ -206,7 +264,10 @@ pub struct Process {
  ///
  pub working_directory: Option<RelativePath>,

-  pub input_files: hashing::Digest,
+  ///
+  /// All of the input digests for the process.
+  ///
+  pub input_digests: InputDigests,

  pub output_files: BTreeSet<RelativePath>,

@@ -251,14 +312,6 @@ pub struct Process {

  pub platform_constraint: Option<Platform>,

-  ///
-  /// If non-empty, the Digest of a nailgun server to use to attempt to spawn the Process.
-  ///
-  /// TODO: Currently this Digest must be a subset of the `input_digest`, but we should consider
-  /// making it disjoint, and then automatically merging it.
-  ///
-  pub use_nailgun: Digest,

  pub cache_scope: ProcessCacheScope,
}
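The TODO removed here is exactly what the rest of this PR implements: `use_nailgun` is now disjoint from the input files, and the merge happens automatically in `InputDigests::new`.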

@@ -278,7 +331,7 @@ impl Process {
      argv,
      env: BTreeMap::new(),
      working_directory: None,
-      input_files: hashing::EMPTY_DIGEST,
+      input_digests: InputDigests::default(),
      output_files: BTreeSet::new(),
      output_directories: BTreeSet::new(),
      timeout: None,
@@ -287,7 +340,6 @@ impl Process {
      append_only_caches: BTreeMap::new(),
      jdk_home: None,
      platform_constraint: None,
-      use_nailgun: hashing::EMPTY_DIGEST,
      execution_slot_variable: None,
      cache_scope: ProcessCacheScope::Successful,
    }
29 changes: 16 additions & 13 deletions src/rust/engine/process_execution/src/local.rs
@@ -282,6 +282,18 @@ impl super::CommandRunner for CommandRunner {
      }
    };

+    // Prepare the workdir.
+    let exclusive_spawn = prepare_workdir(
+      workdir_path.clone(),
+      &req,
+      req.input_digests.complete,
+      context.clone(),
+      self.store.clone(),
+      self.executor.clone(),
+      self.named_caches(),
+    )
+    .await?;

    workunit.increment_counter(Metric::LocalExecutionRequests, 1);
    let res = self
      .run_and_capture_workdir(
@@ -291,6 +303,7 @@ impl super::CommandRunner for CommandRunner {
        self.executor.clone(),
        workdir_path.clone(),
        (),
+        exclusive_spawn,
        self.platform(),
      )
      .map_err(|msg| {
@@ -453,21 +466,11 @@ pub trait CapturedWorkdir {
    executor: task_executor::Executor,
    workdir_path: PathBuf,
    workdir_token: Self::WorkdirToken,
+    exclusive_spawn: bool,
    platform: Platform,
  ) -> Result<FallibleProcessResultWithPlatform, String> {
    let start_time = Instant::now();

-    // Prepare the workdir.
-    let exclusive_spawn = prepare_workdir(
-      workdir_path.clone(),
-      &req,
-      context.clone(),
-      store.clone(),
-      executor.clone(),
-      self.named_caches(),
-    )
-    .await?;
-
    // Spawn the process.
    // NB: We fully buffer up the `Stream` above into final `ChildResults` below and so could
    // instead be using `CommandExt::output_async` above to avoid the `ChildResults::collect_from`
@@ -602,6 +605,7 @@ pub trait CapturedWorkdir {
pub async fn prepare_workdir(
  workdir_path: PathBuf,
  req: &Process,
+  input_digest: hashing::Digest,
  context: Context,
  store: Store,
  executor: task_executor::Executor,
@@ -627,7 +631,6 @@ pub async fn prepare_workdir(
  // non-determinism when paths overlap.
  let store2 = store.clone();
  let workdir_path_2 = workdir_path.clone();
-  let input_files = req.input_files;
  in_workunit!(
    context.workunit_store.clone(),
    "setup_sandbox".to_owned(),
@@ -637,7 +640,7 @@ pub async fn prepare_workdir(
    },
    |_workunit| async move {
      store2
-        .materialize_directory(workdir_path_2, input_files)
+        .materialize_directory(workdir_path_2, input_digest)
        .await
    },
  )
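Note that the local runner still materializes `input_digests.complete`, so behavior is unchanged here; the split pays off for runners that want to treat the two halves differently. A hypothetical sketch (not part of this PR; the parameter list and `Result<bool, String>` return mirror the `prepare_workdir` call-site above, where the bool is the `exclusive_spawn` flag) of a nailgun-style runner materializing only the tool digest into a long-lived server sandbox:

// Hypothetical, not in this PR: build a reusable server sandbox from the
// nailgun/tool digest only; per-request runs would pass
// `req.input_digests.input_files` instead of `complete`.
async fn prepare_server_workdir(
  server_workdir: PathBuf,
  req: &Process,
  context: Context,
  store: Store,
  executor: task_executor::Executor,
  named_caches: NamedCaches,
) -> Result<bool, String> {
  prepare_workdir(
    server_workdir,
    req,
    req.input_digests.use_nailgun,
    context,
    store,
    executor,
    named_caches,
  )
  .await
}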
6 changes: 3 additions & 3 deletions src/rust/engine/process_execution/src/local_tests.rs
@@ -3,7 +3,7 @@ use testutil;

use crate::{
  CacheDest, CacheName, CommandRunner as CommandRunnerTrait, Context,
-  FallibleProcessResultWithPlatform, NamedCaches, Platform, Process, RelativePath,
+  FallibleProcessResultWithPlatform, InputDigests, NamedCaches, Platform, Process, RelativePath,
};
use hashing::EMPTY_DIGEST;
use shell_quote::bash;
@@ -377,7 +377,7 @@ async fn test_directory_preservation() {

  let mut process =
    Process::new(argv.clone()).output_files(relative_paths(&["roland.ext"]).collect());
-  process.input_files = TestDirectory::nested().digest();
+  process.input_digests = InputDigests::with_input_files(TestDirectory::nested().digest());
  process.working_directory = Some(RelativePath::new("cats").unwrap());

  let result = run_command_locally_in_dir(
@@ -553,7 +553,7 @@ async fn working_directory() {
  let mut process = Process::new(vec![find_bash(), "-c".to_owned(), "/bin/ls".to_string()]);
  process.working_directory = Some(RelativePath::new("cats").unwrap());
  process.output_directories = relative_paths(&["roland.ext"]).collect::<BTreeSet<_>>();
-  process.input_files = TestDirectory::nested().digest();
+  process.input_digests = InputDigests::with_input_files(TestDirectory::nested().digest());
  process.timeout = one_second();
  process.description = "confused-cat".to_string();
