From 3abcb4c3c91837a4cf4c441541279c45b446837a Mon Sep 17 00:00:00 2001 From: Stu Hood Date: Fri, 10 Dec 2021 15:32:33 -0800 Subject: [PATCH] Add symlinked immutable Process inputs (#13848) As designed by @tdyas in #12716, this change adds `immutable_inputs` to `Process`, and adapts the JVM `@rules` to use them: for now, only for the tools themselves. A followup change will move all JVM classpath inputs to `immutable_inputs`. Nailgun support is now enabled by specifying the subset of the `immutable_inputs` which should be used to start the server: all others will be used to start the client. [ci skip-build-wheels] --- .../pants/backend/java/compile/javac.py | 3 +- .../java/dependency_inference/java_parser.py | 33 ++--- .../java_parser_launcher.py | 2 +- .../java/lint/google_java_format/rules.py | 17 +-- .../backend/java/package/deploy_jar_test.py | 5 +- src/python/pants/backend/java/test/junit.py | 37 +++-- .../pants/backend/scala/compile/scalac.py | 51 +++---- .../backend/scala/compile/scalac_plugins.py | 4 +- .../dependency_inference/scala_parser.py | 33 ++--- src/python/pants/backend/scala/goals/repl.py | 9 +- .../backend/scala/lint/scalafmt/rules.py | 42 +++--- src/python/pants/engine/process.py | 9 +- src/python/pants/engine/rules_test.py | 2 +- src/python/pants/jvm/jdk_rules.py | 20 ++- src/python/pants/jvm/jdk_rules_test.py | 2 +- .../pants/jvm/resolve/coursier_fetch.py | 10 +- .../pants/jvm/resolve/coursier_setup.py | 24 ++-- src/rust/engine/fs/store/src/snapshot_ops.rs | 5 +- .../process_execution/src/cache_tests.rs | 5 +- .../process_execution/src/immutable_inputs.rs | 83 +++++++++++ src/rust/engine/process_execution/src/lib.rs | 134 +++++++++++++++--- .../engine/process_execution/src/local.rs | 77 ++++++---- .../process_execution/src/local_tests.rs | 91 ++++++++++-- .../process_execution/src/nailgun/mod.rs | 58 ++++---- .../src/nailgun/nailgun_pool.rs | 21 ++- .../process_execution/src/nailgun/tests.rs | 20 +-- .../process_execution/src/named_caches.rs | 
36 ++--- .../process_execution/src/remote_tests.rs | 102 ++++++++++++- src/rust/engine/process_executor/src/main.rs | 31 +--- src/rust/engine/src/context.rs | 14 +- src/rust/engine/src/externs/mod.rs | 7 +- src/rust/engine/src/intrinsics.rs | 33 +++-- src/rust/engine/src/nodes.rs | 35 +++-- 33 files changed, 698 insertions(+), 357 deletions(-) create mode 100644 src/rust/engine/process_execution/src/immutable_inputs.rs diff --git a/src/python/pants/backend/java/compile/javac.py b/src/python/pants/backend/java/compile/javac.py index d3676ae47b0..261c79b5d5d 100644 --- a/src/python/pants/backend/java/compile/javac.py +++ b/src/python/pants/backend/java/compile/javac.py @@ -184,7 +184,8 @@ async def compile_java_source( ), ], input_digest=merged_digest, - use_nailgun=jdk_setup.digest, + immutable_input_digests=jdk_setup.immutable_input_digests, + use_nailgun=jdk_setup.immutable_input_digests.keys(), append_only_caches=jdk_setup.append_only_caches, env=jdk_setup.env, output_directories=(dest_dir,), diff --git a/src/python/pants/backend/java/dependency_inference/java_parser.py b/src/python/pants/backend/java/dependency_inference/java_parser.py index dc4b1149302..5a57b442deb 100644 --- a/src/python/pants/backend/java/dependency_inference/java_parser.py +++ b/src/python/pants/backend/java/dependency_inference/java_parser.py @@ -15,7 +15,7 @@ ) from pants.backend.java.dependency_inference.types import JavaSourceDependencyAnalysis from pants.core.util_rules.source_files import SourceFiles -from pants.engine.fs import AddPrefix, Digest, DigestContents, MergeDigests +from pants.engine.fs import AddPrefix, Digest, DigestContents from pants.engine.process import BashBinary, FallibleProcessResult, Process, ProcessExecutionFailure from pants.engine.rules import Get, MultiGet, collect_rules, rule from pants.jvm.jdk_rules import JdkSetup @@ -84,33 +84,23 @@ async def analyze_java_source_dependencies( source_prefix = "__source_to_analyze" source_path = os.path.join(source_prefix, 
source_files.files[0]) processorcp_relpath = "__processorcp" + toolcp_relpath = "__toolcp" - ( - tool_classpath, - prefixed_processor_classfiles_digest, - prefixed_source_files_digest, - ) = await MultiGet( + (tool_classpath, prefixed_source_files_digest,) = await MultiGet( Get( MaterializedClasspath, MaterializedClasspathRequest( - prefix="__toolcp", artifact_requirements=(java_parser_artifact_requirements(),), ), ), - Get(Digest, AddPrefix(processor_classfiles.digest, processorcp_relpath)), Get(Digest, AddPrefix(source_files.snapshot.digest, source_prefix)), ) - tool_digest = await Get( - Digest, - MergeDigests( - ( - prefixed_processor_classfiles_digest, - tool_classpath.digest, - jdk_setup.digest, - ) - ), - ) + immutable_input_digests = { + **jdk_setup.immutable_input_digests, + toolcp_relpath: tool_classpath.digest, + processorcp_relpath: processor_classfiles.digest, + } analysis_output_path = "__source_analysis.json" @@ -118,14 +108,17 @@ async def analyze_java_source_dependencies( FallibleProcessResult, Process( argv=[ - *jdk_setup.args(bash, [*tool_classpath.classpath_entries(), processorcp_relpath]), + *jdk_setup.args( + bash, [*tool_classpath.classpath_entries(toolcp_relpath), processorcp_relpath] + ), "org.pantsbuild.javaparser.PantsJavaParserLauncher", analysis_output_path, source_path, ], input_digest=prefixed_source_files_digest, + immutable_input_digests=immutable_input_digests, output_files=(analysis_output_path,), - use_nailgun=tool_digest, + use_nailgun=immutable_input_digests.keys(), append_only_caches=jdk_setup.append_only_caches, env=jdk_setup.env, description=f"Analyzing {source_files.files[0]}", diff --git a/src/python/pants/backend/java/dependency_inference/java_parser_launcher.py b/src/python/pants/backend/java/dependency_inference/java_parser_launcher.py index 6d1fb089d2c..758a84d70e7 100644 --- a/src/python/pants/backend/java/dependency_inference/java_parser_launcher.py +++ 
b/src/python/pants/backend/java/dependency_inference/java_parser_launcher.py @@ -87,7 +87,6 @@ async def build_processors(bash: BashBinary, jdk_setup: JdkSetup) -> JavaParserC MergeDigests( ( materialized_classpath.digest, - jdk_setup.digest, source_digest, ) ), @@ -108,6 +107,7 @@ async def build_processors(bash: BashBinary, jdk_setup: JdkSetup) -> JavaParserC ], input_digest=merged_digest, append_only_caches=jdk_setup.append_only_caches, + immutable_input_digests=jdk_setup.immutable_input_digests, env=jdk_setup.env, output_directories=(dest_dir,), description=f"Compile {_LAUNCHER_BASENAME} import processors with javac", diff --git a/src/python/pants/backend/java/lint/google_java_format/rules.py b/src/python/pants/backend/java/lint/google_java_format/rules.py index ba44887d873..26a95d0b2f2 100644 --- a/src/python/pants/backend/java/lint/google_java_format/rules.py +++ b/src/python/pants/backend/java/lint/google_java_format/rules.py @@ -10,7 +10,7 @@ from pants.core.goals.fmt import FmtResult from pants.core.goals.lint import LintRequest, LintResult, LintResults from pants.core.util_rules.source_files import SourceFiles, SourceFilesRequest -from pants.engine.fs import Digest, MergeDigests +from pants.engine.fs import Digest from pants.engine.internals.selectors import Get, MultiGet from pants.engine.process import BashBinary, FallibleProcessResult, Process, ProcessResult from pants.engine.rules import collect_rules, rule @@ -69,7 +69,6 @@ async def setup_google_java_format( Get( MaterializedClasspath, MaterializedClasspathRequest( - prefix="__toolcp", lockfiles=(tool.resolved_lockfile(),), ), ), @@ -81,10 +80,11 @@ async def setup_google_java_format( else setup_request.request.prior_formatter_result ) - input_digest = await Get( - Digest, - MergeDigests([source_files_snapshot.digest, tool_classpath.digest, jdk_setup.digest]), - ) + toolcp_relpath = "__toolcp" + immutable_input_digests = { + **jdk_setup.immutable_input_digests, + toolcp_relpath: 
tool_classpath.digest, + } maybe_java16_or_higher_options = [] if jdk_setup.jre_major_version >= 16: @@ -97,7 +97,7 @@ async def setup_google_java_format( ] args = [ - *jdk_setup.args(bash, tool_classpath.classpath_entries()), + *jdk_setup.args(bash, tool_classpath.classpath_entries(toolcp_relpath)), *maybe_java16_or_higher_options, "com.google.googlejavaformat.java.Main", *(["--aosp"] if tool.aosp else []), @@ -107,7 +107,8 @@ async def setup_google_java_format( process = Process( argv=args, - input_digest=input_digest, + input_digest=source_files_snapshot.digest, + immutable_input_digests=immutable_input_digests, output_files=source_files_snapshot.files, append_only_caches=jdk_setup.append_only_caches, env=jdk_setup.env, diff --git a/src/python/pants/backend/java/package/deploy_jar_test.py b/src/python/pants/backend/java/package/deploy_jar_test.py index 54af8db2134..d9ee79e6d41 100644 --- a/src/python/pants/backend/java/package/deploy_jar_test.py +++ b/src/python/pants/backend/java/package/deploy_jar_test.py @@ -15,7 +15,6 @@ from pants.backend.java.target_types import rules as target_types_rules from pants.build_graph.address import Address from pants.core.goals.package import BuiltPackage -from pants.engine.fs import Digest, MergeDigests from pants.engine.process import BashBinary, Process, ProcessResult from pants.jvm import jdk_rules from pants.jvm.classpath import rules as classpath_rules @@ -323,15 +322,15 @@ def _deploy_jar_test(rule_runner: RuleRunner, target_name: str) -> None: jdk_setup = rule_runner.request(JdkSetup, []) bash = rule_runner.request(BashBinary, []) - input_digests = rule_runner.request(Digest, [MergeDigests([jdk_setup.digest, fat_jar.digest])]) process_result = rule_runner.request( ProcessResult, [ Process( argv=jdk_setup.args(bash, []) + ("-jar", "dave.jar"), description="Run that test jar", - input_digest=input_digests, + input_digest=fat_jar.digest, append_only_caches=jdk_setup.append_only_caches, + 
immutable_input_digests=jdk_setup.immutable_input_digests, env=jdk_setup.env, ) ], diff --git a/src/python/pants/backend/java/test/junit.py b/src/python/pants/backend/java/test/junit.py index 77b685a71a5..1a8071353c5 100644 --- a/src/python/pants/backend/java/test/junit.py +++ b/src/python/pants/backend/java/test/junit.py @@ -8,9 +8,9 @@ from pants.backend.java.target_types import JavaTestSourceField from pants.core.goals.test import TestDebugRequest, TestFieldSet, TestResult, TestSubsystem from pants.engine.addresses import Addresses -from pants.engine.fs import Digest, DigestSubset, MergeDigests, PathGlobs, RemovePrefix, Snapshot +from pants.engine.fs import Digest, DigestSubset, PathGlobs, RemovePrefix, Snapshot from pants.engine.process import BashBinary, FallibleProcessResult, Process -from pants.engine.rules import Get, collect_rules, rule +from pants.engine.rules import Get, MultiGet, collect_rules, rule from pants.engine.unions import UnionRule from pants.jvm.classpath import Classpath from pants.jvm.jdk_rules import JdkSetup @@ -40,18 +40,22 @@ async def run_junit_test( test_subsystem: TestSubsystem, field_set: JavaTestFieldSet, ) -> TestResult: - classpath = await Get(Classpath, Addresses([field_set.address])) - junit_classpath = await Get( - MaterializedClasspath, - MaterializedClasspathRequest( - prefix="__thirdpartycp", - lockfiles=(junit.resolved_lockfile(),), + classpath, junit_classpath = await MultiGet( + Get(Classpath, Addresses([field_set.address])), + Get( + MaterializedClasspath, + MaterializedClasspathRequest( + prefix="__thirdpartycp", + lockfiles=(junit.resolved_lockfile(),), + ), ), ) - merged_digest = await Get( - Digest, - MergeDigests((classpath.content.digest, jdk_setup.digest, junit_classpath.digest)), - ) + + toolcp_relpath = "__toolcp" + immutable_input_digests = { + **jdk_setup.immutable_input_digests, + toolcp_relpath: junit_classpath.digest, + } reports_dir_prefix = "__reports_dir" reports_dir = 
f"{reports_dir_prefix}/{field_set.address.path_safe_spec}" @@ -63,7 +67,11 @@ async def run_junit_test( Process( argv=[ *jdk_setup.args( - bash, [*classpath.classpath_entries(), *junit_classpath.classpath_entries()] + bash, + [ + *classpath.classpath_entries(), + *junit_classpath.classpath_entries(toolcp_relpath), + ], ), "org.junit.platform.console.ConsoleLauncher", *(("--classpath", user_classpath_arg) if user_classpath_arg else ()), @@ -72,7 +80,8 @@ async def run_junit_test( reports_dir, *junit.options.args, ], - input_digest=merged_digest, + input_digest=classpath.content.digest, + immutable_input_digests=immutable_input_digests, output_directories=(reports_dir,), append_only_caches=jdk_setup.append_only_caches, env=jdk_setup.env, diff --git a/src/python/pants/backend/scala/compile/scalac.py b/src/python/pants/backend/scala/compile/scalac.py index ea776f13efb..546167e3857 100644 --- a/src/python/pants/backend/scala/compile/scalac.py +++ b/src/python/pants/backend/scala/compile/scalac.py @@ -110,11 +110,14 @@ async def compile_scala_source( exit_code=0, ) - (tool_classpath, merged_transitive_dependency_classpath_entries_digest,) = await MultiGet( + toolcp_relpath = "__toolcp" + scalac_plugins_relpath = "__plugincp" + usercp = "__cp" + + (tool_classpath, merged_transitive_dependency_classpath_entries_digest) = await MultiGet( Get( MaterializedClasspath, MaterializedClasspathRequest( - prefix="__toolcp", artifact_requirements=( ArtifactRequirements.from_coordinates( [ @@ -143,32 +146,29 @@ async def compile_scala_source( ), ) - usercp = "__cp" prefixed_transitive_dependency_classpath_digest = await Get( Digest, AddPrefix(merged_transitive_dependency_classpath_entries_digest, usercp) ) - merged_tool_digest, merged_input_digest = await MultiGet( - Get( - Digest, - MergeDigests( - (tool_classpath.digest, scalac_plugins.classpath.digest, jdk_setup.digest) - ), - ), - Get( - Digest, - MergeDigests( - ( - prefixed_transitive_dependency_classpath_digest, - *( - 
sources.snapshot.digest - for _, sources in component_members_and_scala_source_files - ), - ) - ), + merged_input_digest = await Get( + Digest, + MergeDigests( + ( + prefixed_transitive_dependency_classpath_digest, + *( + sources.snapshot.digest + for _, sources in component_members_and_scala_source_files + ), + ) ), ) + immutable_input_digests = { + **jdk_setup.immutable_input_digests, + toolcp_relpath: tool_classpath.digest, + scalac_plugins_relpath: scalac_plugins.classpath.digest, + } + classpath_arg = ClasspathEntry.arg( ClasspathEntry.closure(direct_dependency_classpath_entries), prefix=usercp ) @@ -178,11 +178,11 @@ async def compile_scala_source( FallibleProcessResult, Process( argv=[ - *jdk_setup.args(bash, tool_classpath.classpath_entries()), + *jdk_setup.args(bash, tool_classpath.classpath_entries(toolcp_relpath)), "scala.tools.nsc.Main", "-bootclasspath", - ":".join(tool_classpath.classpath_entries()), - *scalac_plugins.args(), + ":".join(tool_classpath.classpath_entries(toolcp_relpath)), + *scalac_plugins.args(scalac_plugins_relpath), *(("-classpath", classpath_arg) if classpath_arg else ()), "-d", output_file, @@ -194,7 +194,8 @@ async def compile_scala_source( ), ], input_digest=merged_input_digest, - use_nailgun=merged_tool_digest, + immutable_input_digests=immutable_input_digests, + use_nailgun=immutable_input_digests.keys(), output_files=(output_file,), description=f"Compile {request.component} with scalac", level=LogLevel.DEBUG, diff --git a/src/python/pants/backend/scala/compile/scalac_plugins.py b/src/python/pants/backend/scala/compile/scalac_plugins.py index 2600b92133e..67a181b1d2a 100644 --- a/src/python/pants/backend/scala/compile/scalac_plugins.py +++ b/src/python/pants/backend/scala/compile/scalac_plugins.py @@ -122,8 +122,8 @@ class GlobalScalacPlugins: names: tuple[str, ...] 
classpath: MaterializedClasspath - def args(self) -> Iterator[str]: - scalac_plugins_arg = ":".join(self.classpath.classpath_entries()) + def args(self, prefix: str | None = None) -> Iterator[str]: + scalac_plugins_arg = ":".join(self.classpath.classpath_entries(prefix)) if scalac_plugins_arg: yield f"-Xplugin:{scalac_plugins_arg}" for name in self.names: diff --git a/src/python/pants/backend/scala/dependency_inference/scala_parser.py b/src/python/pants/backend/scala/dependency_inference/scala_parser.py index 23299b76c8c..2c7f239d5bc 100644 --- a/src/python/pants/backend/scala/dependency_inference/scala_parser.py +++ b/src/python/pants/backend/scala/dependency_inference/scala_parser.py @@ -229,33 +229,23 @@ async def analyze_scala_source_dependencies( source_prefix = "__source_to_analyze" source_path = os.path.join(source_prefix, source_files.files[0]) processorcp_relpath = "__processorcp" + toolcp_relpath = "__toolcp" - ( - tool_classpath, - prefixed_processor_classfiles_digest, - prefixed_source_files_digest, - ) = await MultiGet( + (tool_classpath, prefixed_source_files_digest,) = await MultiGet( Get( MaterializedClasspath, MaterializedClasspathRequest( - prefix="__toolcp", artifact_requirements=(SCALA_PARSER_ARTIFACT_REQUIREMENTS,), ), ), - Get(Digest, AddPrefix(processor_classfiles.digest, processorcp_relpath)), Get(Digest, AddPrefix(source_files.snapshot.digest, source_prefix)), ) - tool_digest = await Get( - Digest, - MergeDigests( - ( - prefixed_processor_classfiles_digest, - tool_classpath.digest, - jdk_setup.digest, - ) - ), - ) + immutable_input_digests = { + **jdk_setup.immutable_input_digests, + toolcp_relpath: tool_classpath.digest, + processorcp_relpath: processor_classfiles.digest, + } analysis_output_path = "__source_analysis.json" @@ -263,14 +253,17 @@ async def analyze_scala_source_dependencies( FallibleProcessResult, Process( argv=[ - *jdk_setup.args(bash, [*tool_classpath.classpath_entries(), processorcp_relpath]), + *jdk_setup.args( + bash, 
[*tool_classpath.classpath_entries(toolcp_relpath), processorcp_relpath] + ), "org.pantsbuild.backend.scala.dependency_inference.ScalaParser", analysis_output_path, source_path, ], input_digest=prefixed_source_files_digest, + immutable_input_digests=immutable_input_digests, output_files=(analysis_output_path,), - use_nailgun=tool_digest, + use_nailgun=immutable_input_digests.keys(), append_only_caches=jdk_setup.append_only_caches, env=jdk_setup.env, description=f"Analyzing {source_files.files[0]}", @@ -368,7 +361,6 @@ async def setup_scala_parser_classfiles( ( tool_classpath.digest, parser_classpath.digest, - jdk_setup.digest, source_digest, ) ), @@ -391,6 +383,7 @@ async def setup_scala_parser_classfiles( ], input_digest=merged_digest, append_only_caches=jdk_setup.append_only_caches, + immutable_input_digests=jdk_setup.immutable_input_digests, env=jdk_setup.env, output_directories=(dest_dir,), description="Compile Scala parser for dependency inference with scalac", diff --git a/src/python/pants/backend/scala/goals/repl.py b/src/python/pants/backend/scala/goals/repl.py index bb56fb5586d..7d4852cd103 100644 --- a/src/python/pants/backend/scala/goals/repl.py +++ b/src/python/pants/backend/scala/goals/repl.py @@ -73,9 +73,16 @@ async def create_scala_repl_request( Digest, AddPrefix(user_classpath.content.digest, user_classpath_prefix) ) + # TODO: Manually merging the `immutable_input_digests` since InteractiveProcess doesn't + # support them yet. See https://github.com/pantsbuild/pants/issues/13852. 
+ jdk_setup_digests = await MultiGet( + Get(Digest, AddPrefix(digest, relpath)) + for relpath, digest in jdk_setup.immutable_input_digests.items() + ) + repl_digest = await Get( Digest, - MergeDigests([prefixed_user_classpath, tool_classpath.content.digest, jdk_setup.digest]), + MergeDigests([prefixed_user_classpath, tool_classpath.content.digest, *jdk_setup_digests]), ) return ReplRequest( diff --git a/src/python/pants/backend/scala/lint/scalafmt/rules.py b/src/python/pants/backend/scala/lint/scalafmt/rules.py index 4ba8585d38f..2712036248e 100644 --- a/src/python/pants/backend/scala/lint/scalafmt/rules.py +++ b/src/python/pants/backend/scala/lint/scalafmt/rules.py @@ -91,7 +91,7 @@ class ScalafmtConfigFiles: class SetupScalafmtPartition: merged_sources_digest: Digest jdk_invoke_args: tuple[str, ...] - tool_digest: Digest + immutable_input_digests: FrozenDict[str, Digest] config_file: str files: tuple[str, ...] check_only: bool @@ -162,8 +162,6 @@ async def setup_scalafmt_partition( ), ) - input_digest = await Get(Digest, MergeDigests([sources_digest, request.tool_digest])) - args = [ *request.jdk_invoke_args, "org.scalafmt.cli.Cli", @@ -176,9 +174,10 @@ async def setup_scalafmt_partition( process = Process( argv=args, - input_digest=input_digest, + input_digest=sources_digest, output_files=request.files, append_only_caches=jdk_setup.append_only_caches, + immutable_input_digests=request.immutable_input_digests, env=jdk_setup.env, description=f"Run `scalafmt` on {pluralize(len(request.files), 'file')}.", level=LogLevel.DEBUG, @@ -194,6 +193,7 @@ async def setup_scalafmt( jdk_setup: JdkSetup, bash: BashBinary, ) -> Setup: + toolcp_relpath = "__toolcp" source_files, tool_classpath = await MultiGet( Get( SourceFiles, @@ -202,7 +202,6 @@ async def setup_scalafmt( Get( MaterializedClasspath, MaterializedClasspathRequest( - prefix="__toolcp", lockfiles=(tool.resolved_lockfile(),), ), ), @@ -218,26 +217,19 @@ async def setup_scalafmt( ScalafmtConfigFiles, 
GatherScalafmtConfigFilesRequest(source_files_snapshot) ) - merged_sources_digest, tool_digest = await MultiGet( - Get( - Digest, - MergeDigests( - [ - source_files_snapshot.digest, - config_files.snapshot.digest, - ] - ), - ), - Get( - Digest, - MergeDigests( - [ - tool_classpath.digest, - jdk_setup.digest, - ] - ), + merged_sources_digest = await Get( + Digest, + MergeDigests( + [ + source_files_snapshot.digest, + config_files.snapshot.digest, + ] ), ) + immutable_input_digests = { + **jdk_setup.immutable_input_digests, + toolcp_relpath: tool_classpath.digest, + } # Partition the work by which source files share the same config file (regardless of directory). source_files_by_config_file: dict[str, set[str]] = defaultdict(set) @@ -247,13 +239,13 @@ async def setup_scalafmt( os.path.join(source_dir, name) for name in files_in_source_dir ) - jdk_invoke_args = jdk_setup.args(bash, tool_classpath.classpath_entries()) + jdk_invoke_args = jdk_setup.args(bash, tool_classpath.classpath_entries(toolcp_relpath)) partitions = await MultiGet( Get( Partition, SetupScalafmtPartition( merged_sources_digest=merged_sources_digest, - tool_digest=tool_digest, + immutable_input_digests=FrozenDict(immutable_input_digests), jdk_invoke_args=jdk_invoke_args, config_file=config_file, files=tuple(sorted(files)), diff --git a/src/python/pants/engine/process.py b/src/python/pants/engine/process.py index daf0c74f46a..f4498fe48e4 100644 --- a/src/python/pants/engine/process.py +++ b/src/python/pants/engine/process.py @@ -56,6 +56,8 @@ class Process: description: str = dataclasses.field(compare=False) level: LogLevel input_digest: Digest + immutable_input_digests: FrozenDict[str, Digest] + use_nailgun: tuple[str, ...] working_directory: str | None env: FrozenDict[str, str] append_only_caches: FrozenDict[str, str] @@ -63,7 +65,6 @@ class Process: output_directories: tuple[str, ...] 
timeout_seconds: int | float jdk_home: str | None - use_nailgun: Digest execution_slot_variable: str | None cache_scope: ProcessCacheScope platform: str | None @@ -75,6 +76,8 @@ def __init__( description: str, level: LogLevel = LogLevel.INFO, input_digest: Digest = EMPTY_DIGEST, + immutable_input_digests: Mapping[str, Digest] | None = None, + use_nailgun: Iterable[str] = (), working_directory: str | None = None, env: Mapping[str, str] | None = None, append_only_caches: Mapping[str, str] | None = None, @@ -82,7 +85,6 @@ def __init__( output_directories: Iterable[str] | None = None, timeout_seconds: int | float | None = None, jdk_home: str | None = None, - use_nailgun: Digest = EMPTY_DIGEST, execution_slot_variable: str | None = None, cache_scope: ProcessCacheScope = ProcessCacheScope.SUCCESSFUL, platform: Platform | None = None, @@ -119,6 +121,8 @@ def __init__( self.description = description self.level = level self.input_digest = input_digest + self.immutable_input_digests = FrozenDict(immutable_input_digests or {}) + self.use_nailgun = tuple(use_nailgun) self.working_directory = working_directory self.env = FrozenDict(env or {}) self.append_only_caches = FrozenDict(append_only_caches or {}) @@ -127,7 +131,6 @@ def __init__( # NB: A negative or None time value is normalized to -1 to ease the transfer to Rust. 
self.timeout_seconds = timeout_seconds if timeout_seconds and timeout_seconds > 0 else -1 self.jdk_home = jdk_home - self.use_nailgun = use_nailgun self.execution_slot_variable = execution_slot_variable self.cache_scope = cache_scope self.platform = platform.value if platform is not None else None diff --git a/src/python/pants/engine/rules_test.py b/src/python/pants/engine/rules_test.py index ae8ff341c32..f8535c533f7 100644 --- a/src/python/pants/engine/rules_test.py +++ b/src/python/pants/engine/rules_test.py @@ -43,7 +43,7 @@ def create_scheduler(rules, validate=True): ignore_patterns=[], use_gitignore=False, build_root=str(Path.cwd()), - local_execution_root_dir="./.pants.d", + local_execution_root_dir=".", named_caches_dir="./.pants.d/named_caches", ca_certs_path=None, rules=rules, diff --git a/src/python/pants/jvm/jdk_rules.py b/src/python/pants/jvm/jdk_rules.py index ea2bf375ad0..a8d20d8fbc0 100644 --- a/src/python/pants/jvm/jdk_rules.py +++ b/src/python/pants/jvm/jdk_rules.py @@ -3,6 +3,7 @@ from __future__ import annotations +import os import re import shlex import textwrap @@ -22,11 +23,13 @@ @dataclass(frozen=True) class JdkSetup: - digest: Digest + _digest: Digest nailgun_jar: str coursier: Coursier jre_major_version: int - jdk_preparation_script: ClassVar[str] = "__jdk.sh" + + bin_dir: ClassVar[str] = "__jdk" + jdk_preparation_script: ClassVar[str] = f"{bin_dir}/jdk.sh" java_home: ClassVar[str] = "__java_home" def args(self, bash: BashBinary, classpath_entries: Iterable[str]) -> tuple[str, ...]: @@ -46,6 +49,10 @@ def env(self) -> dict[str, str]: def append_only_caches(self) -> dict[str, str]: return self.coursier.append_only_caches + @property + def immutable_input_digests(self) -> dict[str, Digest]: + return {**self.coursier.immutable_input_digests, self.bin_dir: self._digest} + VERSION_REGEX = re.compile(r"version \"(.+?)\"") @@ -91,8 +98,8 @@ async def setup_jdk(coursier: Coursier, javac: JavacSubsystem, bash: BashBinary) "-c", 
f"$({java_home_command})/bin/java -version", ), - input_digest=coursier.digest, append_only_caches=coursier.append_only_caches, + immutable_input_digests=coursier.immutable_input_digests, env=coursier.env, description=f"Ensure download of JDK {coursier_jdk_option}.", cache_scope=ProcessCacheScope.PER_RESTART_SUCCESSFUL, @@ -132,7 +139,7 @@ async def setup_jdk(coursier: Coursier, javac: JavacSubsystem, bash: BashBinary) CreateDigest( [ FileContent( - JdkSetup.jdk_preparation_script, + os.path.basename(JdkSetup.jdk_preparation_script), jdk_preparation_script.encode("utf-8"), is_executable=True, ), @@ -140,17 +147,16 @@ async def setup_jdk(coursier: Coursier, javac: JavacSubsystem, bash: BashBinary) ), ) return JdkSetup( - digest=await Get( + _digest=await Get( Digest, MergeDigests( [ - coursier.digest, jdk_preparation_script_digest, nailgun.digest, ] ), ), - nailgun_jar=nailgun.filenames[0], + nailgun_jar=os.path.join(JdkSetup.bin_dir, nailgun.filenames[0]), coursier=coursier, jre_major_version=jre_major_version, ) diff --git a/src/python/pants/jvm/jdk_rules_test.py b/src/python/pants/jvm/jdk_rules_test.py index 3c6e7a327b7..583e8a61f41 100644 --- a/src/python/pants/jvm/jdk_rules_test.py +++ b/src/python/pants/jvm/jdk_rules_test.py @@ -53,8 +53,8 @@ def run_javac_version(rule_runner: RuleRunner) -> str: *jdk_setup.args(bash, []), "-version", ], - input_digest=jdk_setup.digest, append_only_caches=jdk_setup.append_only_caches, + immutable_input_digests=jdk_setup.immutable_input_digests, env=jdk_setup.env, description="", ) diff --git a/src/python/pants/jvm/resolve/coursier_fetch.py b/src/python/pants/jvm/resolve/coursier_fetch.py index 94c1b59080f..d759ce2c63e 100644 --- a/src/python/pants/jvm/resolve/coursier_fetch.py +++ b/src/python/pants/jvm/resolve/coursier_fetch.py @@ -433,8 +433,6 @@ async def coursier_resolve_lockfile( CoursierResolveInfo, ArtifactRequirements, artifact_requirements ) - input_digest = await Get(Digest, 
MergeDigests([coursier_resolve_info.digest, coursier.digest])) - coursier_report_file_name = "coursier_report.json" process_result = await Get( ProcessResult, @@ -454,7 +452,8 @@ async def coursier_resolve_lockfile( ], wrapper=[bash.path, coursier.wrapper_script], ), - input_digest=input_digest, + input_digest=coursier_resolve_info.digest, + immutable_input_digests=coursier.immutable_input_digests, output_directories=("classpath",), output_files=(coursier_report_file_name,), append_only_caches=coursier.append_only_caches, @@ -595,8 +594,6 @@ async def coursier_fetch_one_coord( ArtifactRequirements([req]), ) - input_digest = await Get(Digest, MergeDigests([coursier.digest, coursier_resolve_info.digest])) - coursier_report_file_name = "coursier_report.json" process_result = await Get( ProcessResult, @@ -605,7 +602,8 @@ async def coursier_fetch_one_coord( [coursier_report_file_name, "--intransitive", *coursier_resolve_info.coord_strings], wrapper=[bash.path, coursier.wrapper_script], ), - input_digest=input_digest, + input_digest=coursier_resolve_info.digest, + immutable_input_digests=coursier.immutable_input_digests, output_directories=("classpath",), output_files=(coursier_report_file_name,), append_only_caches=coursier.append_only_caches, diff --git a/src/python/pants/jvm/resolve/coursier_setup.py b/src/python/pants/jvm/resolve/coursier_setup.py index 3b9e5a539cc..77594f38089 100644 --- a/src/python/pants/jvm/resolve/coursier_setup.py +++ b/src/python/pants/jvm/resolve/coursier_setup.py @@ -99,18 +99,20 @@ def generate_exe(self, plat: Platform) -> str: @dataclass(frozen=True) class Coursier: - """The Coursier tool and various utilities, materialized to a `Digest` and ready to use.""" + """The Coursier tool and various utilities, prepared for use via `immutable_input_digests`.""" coursier: DownloadedExternalTool - digest: Digest - wrapper_script: ClassVar[str] = "coursier_wrapper_script.sh" - post_processing_script: ClassVar[str] = 
"coursier_post_processing_script.py" + _digest: Digest + + bin_dir: ClassVar[str] = "__coursier" + wrapper_script: ClassVar[str] = f"{bin_dir}/coursier_wrapper_script.sh" + post_processing_script: ClassVar[str] = f"{bin_dir}/coursier_post_processing_script.py" cache_name: ClassVar[str] = "coursier" cache_dir: ClassVar[str] = ".cache" working_directory_placeholder: ClassVar[str] = "___COURSIER_WORKING_DIRECTORY___" def args(self, args: Iterable[str], *, wrapper: Iterable[str] = ()) -> tuple[str, ...]: - return tuple((*wrapper, self.coursier.exe, *args)) + return tuple((*wrapper, os.path.join(self.bin_dir, self.coursier.exe), *args)) @property def env(self) -> dict[str, str]: @@ -127,6 +129,10 @@ def env(self) -> dict[str, str]: def append_only_caches(self) -> dict[str, str]: return {self.cache_name: self.cache_dir} + @property + def immutable_input_digests(self) -> dict[str, Digest]: + return {self.bin_dir: self._digest} + @rule async def setup_coursier( @@ -148,7 +154,7 @@ async def setup_coursier( --json-output-file="$json_output_file" \ "${{@//{Coursier.working_directory_placeholder}/$working_dir}}" /bin/mkdir -p classpath - {python.path} coursier_post_processing_script.py "$json_output_file" + {python.path} {Coursier.bin_dir}/coursier_post_processing_script.py "$json_output_file" """ ) @@ -162,12 +168,12 @@ async def setup_coursier( CreateDigest( [ FileContent( - Coursier.wrapper_script, + os.path.basename(Coursier.wrapper_script), coursier_wrapper_script.encode("utf-8"), is_executable=True, ), FileContent( - Coursier.post_processing_script, + os.path.basename(Coursier.post_processing_script), COURSIER_POST_PROCESSING_SCRIPT.encode("utf-8"), is_executable=True, ), @@ -181,7 +187,7 @@ async def setup_coursier( return Coursier( coursier=downloaded_coursier, - digest=await Get( + _digest=await Get( Digest, MergeDigests( [ diff --git a/src/rust/engine/fs/store/src/snapshot_ops.rs b/src/rust/engine/fs/store/src/snapshot_ops.rs index 5d4855f77a9..b11fdcc6fd6 100644 
--- a/src/rust/engine/fs/store/src/snapshot_ops.rs +++ b/src/rust/engine/fs/store/src/snapshot_ops.rs @@ -674,9 +674,8 @@ pub trait SnapshotOps: StoreWrapper + 'static { async fn add_prefix( &self, mut digest: Digest, - prefix: RelativePath, + prefix: &RelativePath, ) -> Result { - let prefix: PathBuf = prefix.into(); let mut prefix_iter = prefix.iter(); while let Some(parent) = prefix_iter.next_back() { let dir_node = remexec::DirectoryNode { @@ -780,7 +779,7 @@ pub trait SnapshotOps: StoreWrapper + 'static { snapshot_glob_match(self.clone(), digest, globs).await } - async fn create_empty_dir(&self, path: RelativePath) -> Result { + async fn create_empty_dir(&self, path: &RelativePath) -> Result { self.add_prefix(EMPTY_DIGEST, path).await } } diff --git a/src/rust/engine/process_execution/src/cache_tests.rs b/src/rust/engine/process_execution/src/cache_tests.rs index 04ee582b26b..422cb0a4c48 100644 --- a/src/rust/engine/process_execution/src/cache_tests.rs +++ b/src/rust/engine/process_execution/src/cache_tests.rs @@ -11,8 +11,8 @@ use testutil::relative_paths; use workunit_store::{RunningWorkunit, WorkunitStore}; use crate::{ - CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, NamedCaches, - Process, ProcessMetadata, + CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, ImmutableInputs, + NamedCaches, Process, ProcessMetadata, }; struct RoundtripResults { @@ -31,6 +31,7 @@ fn create_local_runner() -> (Box, Store, TempDir) { runtime.clone(), base_dir.path().to_owned(), NamedCaches::new(named_cache_dir), + ImmutableInputs::new(store.clone(), base_dir.path()).unwrap(), true, )); (runner, store, base_dir) diff --git a/src/rust/engine/process_execution/src/immutable_inputs.rs b/src/rust/engine/process_execution/src/immutable_inputs.rs new file mode 100644 index 00000000000..15c4b486f6d --- /dev/null +++ b/src/rust/engine/process_execution/src/immutable_inputs.rs @@ -0,0 +1,83 @@ +use 
std::collections::{BTreeMap, HashMap}; +use std::path::{Path, PathBuf}; +use std::sync::Arc; + +use double_checked_cell_async::DoubleCheckedCell; +use fs::{Permissions, RelativePath}; +use hashing::Digest; +use parking_lot::Mutex; +use store::Store; +use tempfile::TempDir; + +use crate::WorkdirSymlink; + +/// Holds Digests materialized into a temporary directory, for symlinking into local sandboxes. +pub struct ImmutableInputs { + store: Store, + // The TempDir that digests are materialized in. + workdir: TempDir, + // A map from Digest to the location it has been materialized at. The DoubleCheckedCell allows + // for cooperation between threads attempting to create Digests. + contents: Mutex>>>, +} + +impl ImmutableInputs { + pub fn new(store: Store, base: &Path) -> Result { + let workdir = TempDir::new_in(base).map_err(|e| { + format!( + "Failed to create temporary directory for immutable inputs: {}", + e + ) + })?; + Ok(Self { + store, + workdir, + contents: Mutex::default(), + }) + } + + /// Returns an absolute Path to immutably consume the given Digest from. + async fn path(&self, digest: Digest) -> Result { + let cell = self.contents.lock().entry(digest).or_default().clone(); + let value: Result<_, String> = cell + .get_or_try_init(async { + let digest_str = digest.hash.to_hex(); + + let path = self.workdir.path().join(digest_str); + self + .store + .materialize_directory(path.clone(), digest, Permissions::ReadOnly) + .await?; + Ok(path) + }) + .await; + Ok(value?.clone()) + } + + /// + /// Returns symlinks to create for the given set of immutable cache paths. 
+ /// + pub(crate) async fn local_paths( + &self, + immutable_inputs: &BTreeMap, + ) -> Result, String> { + let dsts = futures::future::try_join_all( + immutable_inputs + .values() + .map(|d| self.path(*d)) + .collect::>(), + ) + .await?; + + Ok( + immutable_inputs + .keys() + .zip(dsts.into_iter()) + .map(|(src, dst)| WorkdirSymlink { + src: src.clone(), + dst, + }) + .collect(), + ) + } +} diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index a444dfca21c..cdf4b674d35 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -36,6 +36,8 @@ use async_semaphore::AsyncSemaphore; use async_trait::async_trait; use concrete_time::{Duration, TimeSpan}; use fs::RelativePath; +use futures::future::try_join_all; +use futures::try_join; use hashing::Digest; use log::Level; use protos::gen::build::bazel::remote::execution::v2 as remexec; @@ -50,10 +52,16 @@ mod cache_tests; pub mod children; +pub mod immutable_inputs; + pub mod local; #[cfg(test)] mod local_tests; +pub mod nailgun; + +pub mod named_caches; + pub mod remote; #[cfg(test)] pub mod remote_tests; @@ -62,14 +70,11 @@ pub mod remote_cache; #[cfg(test)] mod remote_cache_tests; -pub mod nailgun; - -pub mod named_caches; - extern crate uname; pub use crate::children::ManagedChild; -pub use crate::named_caches::{CacheDest, CacheName, NamedCaches}; +pub use crate::immutable_inputs::ImmutableInputs; +pub use crate::named_caches::{CacheName, NamedCaches}; pub use crate::remote_cache::RemoteCacheWarningsBehavior; #[derive(PartialOrd, Ord, Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] @@ -178,40 +183,90 @@ fn serialize_level(level: &log::Level, s: S) -> Result, + + /// If non-empty, use nailgun in supported runners, using the specified `immutable_inputs` keys + /// as server inputs. All other keys (and the input_files) will be client inputs. 
+ pub use_nailgun: Vec, } impl InputDigests { pub async fn new( store: &Store, input_files: Digest, - use_nailgun: Digest, + immutable_inputs: BTreeMap, + use_nailgun: Vec, ) -> Result { - let complete = store.merge(vec![input_files, use_nailgun]).await?; + // Collect all digests into `complete`. + let mut complete_digests = try_join_all( + immutable_inputs + .iter() + .map(|(path, digest)| store.add_prefix(*digest, path)) + .collect::>(), + ) + .await?; + // And collect only the subset of the Digests which impact nailgun into `nailgun`. + let nailgun_digests = immutable_inputs + .keys() + .zip(complete_digests.iter()) + .filter_map(|(path, digest)| { + if use_nailgun.contains(path) { + Some(*digest) + } else { + None + } + }) + .collect::>(); + complete_digests.push(input_files); + + let (complete, nailgun) = + try_join!(store.merge(complete_digests), store.merge(nailgun_digests),)?; Ok(Self { complete, + nailgun, input_files, + immutable_inputs, use_nailgun, }) } @@ -219,18 +274,55 @@ impl InputDigests { pub fn with_input_files(input_files: Digest) -> Self { Self { complete: input_files, + nailgun: hashing::EMPTY_DIGEST, input_files, - use_nailgun: hashing::EMPTY_DIGEST, + immutable_inputs: BTreeMap::new(), + use_nailgun: Vec::new(), } } + + /// Split the InputDigests into client and server subsets. + /// + /// TODO: The server subset will have an accurate `complete` Digest, but the client will not. + /// This is currently safe because the nailgun client code does not consume that field, but it + /// would be good to find a better factoring. + pub fn nailgun_client_and_server(&self) -> (InputDigests, InputDigests) { + let (server, client) = self + .immutable_inputs + .clone() + .into_iter() + .partition(|(path, _digest)| self.use_nailgun.contains(path)); + + ( + // Client. + InputDigests { + // TODO: See method doc. 
+ complete: hashing::EMPTY_DIGEST, + nailgun: hashing::EMPTY_DIGEST, + input_files: self.input_files, + immutable_inputs: client, + use_nailgun: vec![], + }, + // Server. + InputDigests { + complete: self.nailgun, + nailgun: hashing::EMPTY_DIGEST, + input_files: hashing::EMPTY_DIGEST, + immutable_inputs: server, + use_nailgun: vec![], + }, + ) + } } impl Default for InputDigests { fn default() -> Self { Self { complete: hashing::EMPTY_DIGEST, + nailgun: hashing::EMPTY_DIGEST, input_files: hashing::EMPTY_DIGEST, - use_nailgun: hashing::EMPTY_DIGEST, + immutable_inputs: BTreeMap::new(), + use_nailgun: Vec::new(), } } } @@ -298,7 +390,7 @@ pub struct Process { /// These caches are globally shared and so must be concurrency safe: a consumer of the cache /// must never assume that it has exclusive access to the provided directory. /// - pub append_only_caches: BTreeMap, + pub append_only_caches: BTreeMap, /// /// If present, a symlink will be created at .jdk which points to this directory for local @@ -374,7 +466,7 @@ impl Process { /// pub fn append_only_caches( mut self, - append_only_caches: BTreeMap, + append_only_caches: BTreeMap, ) -> Process { self.append_only_caches = append_only_caches; self diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index bf3ffe04c4e..470b7e3e0b4 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -33,7 +33,7 @@ use tryfuture::try_future; use workunit_store::{in_workunit, Level, Metric, RunningWorkunit, WorkunitMetadata}; use crate::{ - Context, FallibleProcessResultWithPlatform, NamedCaches, Platform, Process, + Context, FallibleProcessResultWithPlatform, ImmutableInputs, NamedCaches, Platform, Process, ProcessResultMetadata, ProcessResultSource, }; @@ -44,6 +44,7 @@ pub struct CommandRunner { executor: task_executor::Executor, work_dir_base: PathBuf, named_caches: NamedCaches, + immutable_inputs: ImmutableInputs, 
cleanup_local_dirs: bool, platform: Platform, spawn_lock: RwLock<()>, @@ -55,6 +56,7 @@ impl CommandRunner { executor: task_executor::Executor, work_dir_base: PathBuf, named_caches: NamedCaches, + immutable_inputs: ImmutableInputs, cleanup_local_dirs: bool, ) -> CommandRunner { CommandRunner { @@ -62,6 +64,7 @@ impl CommandRunner { executor, work_dir_base, named_caches, + immutable_inputs, cleanup_local_dirs, platform: Platform::current().unwrap(), spawn_lock: RwLock::new(()), @@ -125,6 +128,14 @@ impl CommandRunner { }) .boxed() } + + pub fn named_caches(&self) -> &NamedCaches { + &self.named_caches + } + + pub fn immutable_inputs(&self) -> &ImmutableInputs { + &self.immutable_inputs + } } pub struct HermeticCommand { @@ -286,11 +297,12 @@ impl super::CommandRunner for CommandRunner { let exclusive_spawn = prepare_workdir( workdir_path.clone(), &req, - req.input_digests.complete, + req.input_digests.input_files, context.clone(), self.store.clone(), self.executor.clone(), - self.named_caches(), + &self.named_caches, + &self.immutable_inputs, ) .await?; @@ -341,10 +353,6 @@ impl super::CommandRunner for CommandRunner { impl CapturedWorkdir for CommandRunner { type WorkdirToken = (); - fn named_caches(&self) -> &NamedCaches { - &self.named_caches - } - async fn run_in_workdir<'a, 'b, 'c>( &'a self, workdir_path: &'b Path, @@ -567,8 +575,6 @@ pub trait CapturedWorkdir { } } - fn named_caches(&self) -> &NamedCaches; - /// /// Spawn the given process in a working directory prepared with its expected input digest. /// @@ -596,24 +602,34 @@ pub trait CapturedWorkdir { ) -> Result>, String>; } -/// /// Prepares the given workdir for use by the given Process. /// /// Returns true if the executable for the Process was created in the workdir, indicating that /// `exclusive_spawn` is required. 
/// +/// TODO: Both the symlinks for named_caches/immutable_inputs and the empty output directories +/// required by the spec should be created via a synthetic Digest containing SymlinkNodes and +/// the empty output directories. That would: +/// 1. improve validation that nothing we create collides. +/// 2. allow for materialization to safely occur fully in parallel, rather than partially +/// synchronously in the background. +/// pub async fn prepare_workdir( workdir_path: PathBuf, req: &Process, - input_digest: hashing::Digest, + materialized_input_digest: hashing::Digest, context: Context, store: Store, executor: task_executor::Executor, named_caches: &NamedCaches, + immutable_inputs: &ImmutableInputs, ) -> Result { - // If named caches are configured, collect the symlinks to create. - let named_cache_symlinks = named_caches - .local_paths(&req.append_only_caches) + // Collect the symlinks to create for immutable inputs or named caches. + let workdir_symlinks = immutable_inputs + .local_paths(&req.input_digests.immutable_inputs) + .await? + .into_iter() + .chain(named_caches.local_paths(&req.append_only_caches)) .collect::>(); // Capture argv0 as the executable path so that we can test whether we have created it in the @@ -628,7 +644,7 @@ pub async fn prepare_workdir( // Start with async materialization of input snapshots, followed by synchronous materialization // of other configured inputs. Note that we don't do this in parallel, as that might cause - // non-determinism when paths overlap. + // non-determinism when paths overlap: see the method doc. 
let store2 = store.clone(); let workdir_path_2 = workdir_path.clone(); in_workunit!( @@ -640,7 +656,11 @@ pub async fn prepare_workdir( }, |_workunit| async move { store2 - .materialize_directory(workdir_path_2, input_digest, Permissions::Writable) + .materialize_directory( + workdir_path_2, + materialized_input_digest, + Permissions::Writable, + ) .await }, ) @@ -658,18 +678,12 @@ pub async fn prepare_workdir( } // The bazel remote execution API specifies that the parent directories for output files and - // output directories should be created before execution completes: see - // https://github.com/pantsbuild/pants/issues/7084. - // TODO: we use a HashSet to deduplicate directory paths to create, but it would probably be - // even more efficient to only retain the directories at greatest nesting depth, as - // create_dir_all() will ensure all parents are created. At that point, we might consider - // explicitly enumerating all the directories to be created and just using create_dir(), - // unless there is some optimization in create_dir_all() that makes that less efficient. + // output directories should be created before execution completes: see the method doc. let parent_paths_to_create: HashSet<_> = output_file_paths .iter() .chain(output_dir_paths.iter()) .map(|relative_path| relative_path.as_ref()) - .chain(named_cache_symlinks.iter().map(|s| s.dst.as_path())) + .chain(workdir_symlinks.iter().map(|s| s.src.as_path())) .filter_map(|rel_path| rel_path.parent()) .map(|parent_relpath| workdir_path2.join(parent_relpath)) .collect(); @@ -682,20 +696,21 @@ pub async fn prepare_workdir( })?; } - for named_cache_symlink in named_cache_symlinks { - safe_create_dir_all_ioerror(&named_cache_symlink.src).map_err(|err| { + for workdir_symlink in workdir_symlinks { + // TODO: Move initialization of the dst directory into NamedCaches. 
+ safe_create_dir_all_ioerror(&workdir_symlink.dst).map_err(|err| { format!( "Error making {} for local execution: {:?}", - named_cache_symlink.src.display(), + workdir_symlink.dst.display(), err ) })?; - let dst = workdir_path2.join(&named_cache_symlink.dst); - symlink(&named_cache_symlink.src, &dst).map_err(|err| { + let src = workdir_path2.join(&workdir_symlink.src); + symlink(&workdir_symlink.dst, &src).map_err(|err| { format!( "Error linking {} -> {} for local execution: {:?}", - named_cache_symlink.src.display(), - dst.display(), + src.display(), + workdir_symlink.dst.display(), err ) })?; diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index 8edeffc74ec..f1c21785530 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -1,25 +1,27 @@ -use tempfile; -use testutil; - -use crate::{ - CacheDest, CacheName, CommandRunner as CommandRunnerTrait, Context, - FallibleProcessResultWithPlatform, InputDigests, NamedCaches, Platform, Process, RelativePath, -}; -use hashing::EMPTY_DIGEST; -use shell_quote::bash; -use spectral::{assert_that, string::StrAssertions}; use std; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::path::PathBuf; use std::str; use std::time::Duration; + +use hashing::EMPTY_DIGEST; +use maplit::hashset; +use shell_quote::bash; +use spectral::{assert_that, string::StrAssertions}; use store::Store; +use tempfile; use tempfile::TempDir; +use testutil; use testutil::data::{TestData, TestDirectory}; use testutil::path::{find_bash, which}; use testutil::{owned_string_vec, relative_paths}; use workunit_store::{RunningWorkunit, WorkunitStore}; +use crate::{ + CacheName, CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, + ImmutableInputs, InputDigests, NamedCaches, Platform, Process, RelativePath, +}; + #[derive(PartialEq, 
Debug)] struct LocalTestResult { original: FallibleProcessResultWithPlatform, @@ -306,7 +308,7 @@ async fn append_only_cache_created() { let name = "geo"; let dest_base = ".cache"; let cache_name = CacheName::new(name.to_owned()).unwrap(); - let cache_dest = CacheDest::new(format!("{}/{}", dest_base, name)).unwrap(); + let cache_dest = RelativePath::new(format!("{}/{}", dest_base, name)).unwrap(); let result = run_command_locally( Process::new(owned_string_vec(&["/bin/ls", dest_base])) .append_only_caches(vec![(cache_name, cache_dest)].into_iter().collect()), @@ -578,6 +580,68 @@ async fn working_directory() { assert_eq!(result.original.platform, Platform::current().unwrap()); } +#[tokio::test] +async fn immutable_inputs() { + let (_, mut workunit) = WorkunitStore::setup_for_tests(); + + let store_dir = TempDir::new().unwrap(); + let executor = task_executor::Executor::new(); + let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); + + store + .store_file_bytes(TestData::roland().bytes(), false) + .await + .expect("Error saving file bytes"); + store + .record_directory(&TestDirectory::containing_roland().directory(), true) + .await + .expect("Error saving directory"); + store + .record_directory(&TestDirectory::containing_falcons_dir().directory(), true) + .await + .expect("Error saving directory"); + + let work_dir = TempDir::new().unwrap(); + + let mut process = Process::new(vec![find_bash(), "-c".to_owned(), "/bin/ls".to_string()]); + process.input_digests = InputDigests::new( + &store, + TestDirectory::containing_falcons_dir().digest(), + { + let mut map = BTreeMap::new(); + map.insert( + RelativePath::new("cats").unwrap(), + TestDirectory::containing_roland().digest(), + ); + map + }, + vec![], + ) + .await + .unwrap(); + process.timeout = one_second(); + process.description = "confused-cat".to_string(); + + let result = run_command_locally_in_dir( + process, + work_dir.path().to_owned(), + true, + &mut workunit, + Some(store), + 
Some(executor), + ) + .await + .unwrap(); + + let stdout_lines = str::from_utf8(&result.stdout_bytes) + .unwrap() + .lines() + .collect::>(); + assert_eq!(stdout_lines, hashset! {"falcons", "cats"}); + assert_eq!(result.stderr_bytes, "".as_bytes()); + assert_eq!(result.original.exit_code, 0); +} + async fn run_command_locally(req: Process) -> Result { let (_, mut workunit) = WorkunitStore::setup_for_tests(); let work_dir = TempDir::new().unwrap(); @@ -601,8 +665,9 @@ async fn run_command_locally_in_dir( let runner = crate::local::CommandRunner::new( store.clone(), executor.clone(), - dir, + dir.clone(), NamedCaches::new(named_cache_dir.path().to_owned()), + ImmutableInputs::new(store.clone(), &dir)?, cleanup, ); let original = runner.run(Context::default(), workunit, req.into()).await?; diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index b62a7b3b44f..37f0eb5435e 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -13,9 +13,7 @@ use tokio::net::TcpStream; use workunit_store::{in_workunit, Metric, RunningWorkunit, WorkunitMetadata}; use crate::local::{prepare_workdir, CapturedWorkdir, ChildOutput}; -use crate::{ - Context, FallibleProcessResultWithPlatform, InputDigests, NamedCaches, Platform, Process, -}; +use crate::{Context, FallibleProcessResultWithPlatform, InputDigests, Platform, Process}; #[cfg(test)] pub mod tests; @@ -33,26 +31,16 @@ use parsed_jvm_command_lines::ParsedJVMCommandLines; static NAILGUN_MAIN_CLASS: &str = "com.martiansoftware.nailgun.NGServer"; static ARGS_TO_START_NAILGUN: [&str; 1] = [":0"]; -/// -/// Constructs the Process that would be used -/// to start the nailgun servers if we needed to. 
-/// fn construct_nailgun_server_request( + client_request: Process, + input_digests: InputDigests, nailgun_name: &str, args_for_the_jvm: Vec, - client_request: Process, ) -> Process { let mut full_args = args_for_the_jvm; full_args.push(NAILGUN_MAIN_CLASS.to_string()); full_args.extend(ARGS_TO_START_NAILGUN.iter().map(|&a| a.to_string())); - // Strip the input_files, preserving only the use_nailgun digest. - let input_digests = InputDigests { - complete: client_request.input_digests.use_nailgun, - use_nailgun: client_request.input_digests.use_nailgun, - input_files: hashing::EMPTY_DIGEST, - }; - Process { argv: full_args, input_digests, @@ -70,6 +58,7 @@ fn construct_nailgun_server_request( fn construct_nailgun_client_request( original_req: Process, + input_digests: InputDigests, client_main_class: String, mut client_args: Vec, ) -> Process { @@ -77,6 +66,7 @@ fn construct_nailgun_client_request( Process { argv: client_args, jdk_home: None, + input_digests, // The append_only_caches are created and preserved by the server. append_only_caches: BTreeMap::new(), ..original_req @@ -105,16 +95,9 @@ impl CommandRunner { executor: Executor, nailgun_pool_size: usize, ) -> Self { - let named_caches = runner.named_caches().clone(); CommandRunner { inner: runner, - nailgun_pool: NailgunPool::new( - workdir_base, - nailgun_pool_size, - store, - executor.clone(), - named_caches, - ), + nailgun_pool: NailgunPool::new(workdir_base, nailgun_pool_size, store, executor.clone()), executor, } } @@ -132,7 +115,7 @@ impl super::CommandRunner for CommandRunner { workunit: &mut RunningWorkunit, req: Process, ) -> Result { - if req.input_digests.use_nailgun == hashing::EMPTY_DIGEST { + if req.input_digests.use_nailgun.is_empty() { trace!("The request is not nailgunnable! Short-circuiting to regular process execution"); return self.inner.run(context, workunit, req).await; } @@ -160,18 +143,28 @@ impl super::CommandRunner for CommandRunner { client_main_class, .. 
} = ParsedJVMCommandLines::parse_command_lines(&req.argv)?; - let nailgun_req = construct_nailgun_server_request( - &CommandRunner::calculate_nailgun_name(&client_main_class), - nailgun_args, + let nailgun_name = CommandRunner::calculate_nailgun_name(&client_main_class); + let (client_input_digests, server_input_digests) = + req.input_digests.nailgun_client_and_server(); + let client_req = construct_nailgun_client_request( req.clone(), + client_input_digests, + client_main_class, + client_args, ); - let client_req = construct_nailgun_client_request(req, client_main_class, client_args); + let server_req = + construct_nailgun_server_request(req, server_input_digests, &nailgun_name, nailgun_args); trace!("Running request under nailgun:\n {:#?}", &client_req); // Get an instance of a nailgun server for this fingerprint, and then run in its directory. let mut nailgun_process = self .nailgun_pool - .acquire(nailgun_req, context.clone()) + .acquire( + server_req, + context.clone(), + self.inner.named_caches(), + self.inner.immutable_inputs(), + ) .await .map_err(|e| format!("Failed to connect to nailgun! 
{}", e))?; @@ -183,7 +176,8 @@ impl super::CommandRunner for CommandRunner { context.clone(), self.inner.store.clone(), self.executor.clone(), - self.named_caches(), + self.inner.named_caches(), + self.inner.immutable_inputs(), ) .await?; @@ -215,10 +209,6 @@ impl super::CommandRunner for CommandRunner { impl CapturedWorkdir for CommandRunner { type WorkdirToken = (String, SocketAddr); - fn named_caches(&self) -> &NamedCaches { - self.inner.named_caches() - } - async fn run_in_workdir<'a, 'b, 'c>( &'a self, workdir_path: &'b Path, diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs index 0595eb685a1..db76f61c823 100644 --- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs +++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs @@ -22,7 +22,7 @@ use task_executor::Executor; use tempfile::TempDir; use crate::local::prepare_workdir; -use crate::{Context, NamedCaches, Process, ProcessMetadata}; +use crate::{Context, ImmutableInputs, NamedCaches, Process, ProcessMetadata}; lazy_static! 
{ static ref NAILGUN_PORT_REGEX: Regex = Regex::new(r".*\s+port\s+(\d+)\.$").unwrap(); @@ -62,24 +62,16 @@ pub struct NailgunPool { size: usize, store: Store, executor: Executor, - named_caches: NamedCaches, processes: Arc>>, } impl NailgunPool { - pub fn new( - workdir_base: PathBuf, - size: usize, - store: Store, - executor: Executor, - named_caches: NamedCaches, - ) -> Self { + pub fn new(workdir_base: PathBuf, size: usize, store: Store, executor: Executor) -> Self { NailgunPool { workdir_base, size, store, executor, - named_caches, processes: Arc::default(), } } @@ -95,6 +87,8 @@ impl NailgunPool { &self, server_process: Process, context: Context, + named_caches: &NamedCaches, + immutable_inputs: &ImmutableInputs, ) -> Result { let name = server_process.description.clone(); let requested_fingerprint = NailgunProcessFingerprint::new(name.clone(), &server_process)?; @@ -138,7 +132,8 @@ impl NailgunPool { context, &self.store, self.executor.clone(), - &self.named_caches, + named_caches, + immutable_inputs, requested_fingerprint, ) .await?, @@ -346,6 +341,7 @@ impl NailgunProcess { store: &Store, executor: Executor, named_caches: &NamedCaches, + immutable_inputs: &ImmutableInputs, nailgun_server_fingerprint: NailgunProcessFingerprint, ) -> Result { let workdir = tempfile::Builder::new() @@ -360,11 +356,12 @@ impl NailgunProcess { prepare_workdir( workdir.path().to_owned(), &startup_options, - startup_options.input_digests.use_nailgun, + startup_options.input_digests.input_files, context.clone(), store.clone(), executor.clone(), named_caches, + immutable_inputs, ) .await?; let workdir_include_names = list_workdir(workdir.path()).await?; diff --git a/src/rust/engine/process_execution/src/nailgun/tests.rs b/src/rust/engine/process_execution/src/nailgun/tests.rs index a9e9f4bd51b..68d28dfa419 100644 --- a/src/rust/engine/process_execution/src/nailgun/tests.rs +++ b/src/rust/engine/process_execution/src/nailgun/tests.rs @@ -7,25 +7,27 @@ use testutil::owned_string_vec; 
use workunit_store::WorkunitStore; use crate::nailgun::NailgunPool; -use crate::{Context, NamedCaches, Process}; +use crate::{Context, ImmutableInputs, NamedCaches, Process}; -fn pool(size: usize) -> NailgunPool { +fn pool(size: usize) -> (NailgunPool, NamedCaches, ImmutableInputs) { let _ = WorkunitStore::setup_for_tests(); let named_caches_dir = TempDir::new().unwrap(); let store_dir = TempDir::new().unwrap(); let executor = Executor::new(); let store = Store::local_only(executor.clone(), store_dir.path()).unwrap(); - NailgunPool::new( - std::env::temp_dir(), - size, - store, - executor, + let base_dir = std::env::temp_dir(); + + let pool = NailgunPool::new(base_dir.clone(), size, store.clone(), executor); + ( + pool, NamedCaches::new(named_caches_dir.path().to_owned()), + ImmutableInputs::new(store, &base_dir).unwrap(), ) } -async fn run(pool: &NailgunPool, port: u16) -> PathBuf { +async fn run(pool: &(NailgunPool, NamedCaches, ImmutableInputs), port: u16) -> PathBuf { let mut p = pool + .0 .acquire( Process::new(owned_string_vec(&[ "/bin/bash", @@ -33,6 +35,8 @@ async fn run(pool: &NailgunPool, port: u16) -> PathBuf { &format!("echo Mock port {}.; sleep 10", port), ])), Context::default(), + &pool.1, + &pool.2, ) .await .unwrap(); diff --git a/src/rust/engine/process_execution/src/named_caches.rs b/src/rust/engine/process_execution/src/named_caches.rs index fec4f5fa090..b2f3d222d4a 100644 --- a/src/rust/engine/process_execution/src/named_caches.rs +++ b/src/rust/engine/process_execution/src/named_caches.rs @@ -3,8 +3,8 @@ use std::path::PathBuf; use serde::Serialize; -use crate::RelativePath; -use fs::default_cache_path; +use crate::WorkdirSymlink; +use fs::{default_cache_path, RelativePath}; #[derive(Clone, Debug, Eq, PartialEq, Hash, PartialOrd, Ord, Serialize)] pub struct CacheName(String); @@ -25,24 +25,6 @@ impl CacheName { } } -#[derive(Clone, Debug, Eq, PartialEq, Hash, Serialize)] -pub struct CacheDest(String); - -impl CacheDest { - pub fn new(dest: 
String) -> Result { - // We validate as RelativePath, but store as a String to avoid needing to assert later that - // path is valid unicode. - let _ = RelativePath::new(&dest)?; - Ok(CacheDest(dest)) - } -} - -#[derive(Debug)] -pub struct NamedCacheSymlink { - pub src: PathBuf, - pub dst: PathBuf, -} - #[derive(Clone)] pub struct NamedCaches { /// @@ -67,13 +49,13 @@ impl NamedCaches { /// pub fn local_paths<'a>( &'a self, - caches: &'a BTreeMap, - ) -> impl Iterator + 'a { + caches: &'a BTreeMap, + ) -> impl Iterator + 'a { caches .iter() - .map(move |(cache_name, cache_dest)| NamedCacheSymlink { - src: self.local_base.join(&cache_name.0), - dst: PathBuf::from(&cache_dest.0), + .map(move |(cache_name, workdir_rel_path)| WorkdirSymlink { + src: workdir_rel_path.clone(), + dst: self.local_base.join(&cache_name.0), }) } @@ -84,7 +66,7 @@ impl NamedCaches { /// See https://docs.google.com/document/d/1n_MVVGjrkTKTPKHqRPlyfFzQyx2QioclMG_Q3DMUgYk/edit#. /// pub fn platform_properties<'a>( - caches: &'a BTreeMap, + caches: &'a BTreeMap, namespace: &'a Option, ) -> impl Iterator + 'a { namespace @@ -93,7 +75,7 @@ impl NamedCaches { .chain(caches.iter().map(move |(cache_name, cache_dest)| { ( format!("x_append_only_cache:{}", cache_name.0), - cache_dest.0.clone(), + cache_dest.display().to_string(), ) })) } diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs index bef105431d0..896e5f457b7 100644 --- a/src/rust/engine/process_execution/src/remote_tests.rs +++ b/src/rust/engine/process_execution/src/remote_tests.rs @@ -15,7 +15,7 @@ use protos::gen::google::longrunning::Operation; use remexec::ExecutedActionMetadata; use spectral::prelude::*; use spectral::{assert_that, string::StrAssertions}; -use store::Store; +use store::{SnapshotOps, Store}; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory, TestTree}; use testutil::{owned_string_vec, relative_paths}; @@ -26,6 +26,7 @@ use crate::{ 
CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, InputDigests, Platform, Process, ProcessCacheScope, ProcessMetadata, }; +use fs::RelativePath; use std::any::type_name; use std::io::Cursor; use tonic::{Code, Status}; @@ -545,6 +546,105 @@ async fn make_execute_request_with_timeout() { ); } +#[tokio::test] +async fn make_execute_request_using_immutable_inputs() { + let executor = task_executor::Executor::new(); + let store_dir = TempDir::new().unwrap(); + let store = Store::local_only(executor, store_dir).unwrap(); + + let prefix = RelativePath::new("cats").unwrap(); + let input_directory = TestDirectory::containing_roland(); + let input_digests = InputDigests::new( + &store, + EMPTY_DIGEST, + { + let mut map = BTreeMap::new(); + map.insert(prefix.clone(), input_directory.digest()); + map + }, + vec![], + ) + .await + .unwrap(); + + // The computed input root digest will be prefixed with the mount point. + let expected_digest = store + .add_prefix(input_directory.digest(), &prefix) + .await + .unwrap(); + + let req = Process { + argv: owned_string_vec(&["/bin/echo", "yo"]), + env: vec![("SOME".to_owned(), "value".to_owned())] + .into_iter() + .collect(), + working_directory: None, + input_digests, + output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(), + output_directories: relative_paths(&["directory/name"]).collect(), + timeout: None, + description: "some description".to_owned(), + level: log::Level::Info, + append_only_caches: BTreeMap::new(), + jdk_home: None, + platform_constraint: None, + execution_slot_variable: None, + cache_scope: ProcessCacheScope::Always, + }; + + let want_command = remexec::Command { + arguments: vec!["/bin/echo".to_owned(), "yo".to_owned()], + environment_variables: vec![ + remexec::command::EnvironmentVariable { + name: crate::remote::CACHE_KEY_TARGET_PLATFORM_ENV_VAR_NAME.to_owned(), + value: "none".to_owned(), + }, + remexec::command::EnvironmentVariable {
+ name: "SOME".to_owned(), + value: "value".to_owned(), + }, + ], + output_files: vec!["other/file.ext".to_owned(), "path/to/file.ext".to_owned()], + output_directories: vec!["directory/name".to_owned()], + platform: Some(remexec::Platform::default()), + ..Default::default() + }; + + let want_action = remexec::Action { + command_digest: Some( + (&Digest::new( + Fingerprint::from_hex_string( + "c426b29478ec1ddbd872fbfad63ae9151eb9196edcd1a10aa0aab3aa1b48eef8", + ) + .unwrap(), + 123, + )) + .into(), + ), + input_root_digest: Some((&expected_digest).into()), + ..Default::default() + }; + + let want_execute_request = remexec::ExecuteRequest { + action_digest: Some( + (&Digest::new( + Fingerprint::from_hex_string( + "2ec7e0e5e552ddf715ffec03d735ae4c3d6ccd4ad9647fb7aeaa43efec3450c4", + ) + .unwrap(), + 140, + )) + .into(), + ), + ..Default::default() + }; + + assert_eq!( + crate::remote::make_execute_request(&req, ProcessMetadata::default()), + Ok((want_action, want_command, want_execute_request)) + ); +} + #[tokio::test] async fn successful_with_only_call_to_execute() { WorkunitStore::setup_for_tests(); diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index bef4cc25573..2934311075b 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -33,9 +33,9 @@ use std::process::exit; use std::time::Duration; use fs::{Permissions, RelativePath}; -use hashing::{Digest, Fingerprint, EMPTY_DIGEST}; +use hashing::{Digest, Fingerprint}; use process_execution::{ - Context, InputDigests, NamedCaches, Platform, ProcessCacheScope, ProcessMetadata, + Context, ImmutableInputs, InputDigests, NamedCaches, Platform, ProcessCacheScope, ProcessMetadata, }; use prost::Message; use protos::gen::build::bazel::remote::execution::v2::{Action, Command}; @@ -188,15 +188,6 @@ struct Opt { #[structopt(long, default_value = "128")] cache_rpc_concurrency: usize, - /// If set, run the process 
through a Nailgun server with the given digest. - /// This will start a new Nailgun server as a side effect, but will tear it down on exit. - #[structopt(long)] - use_nailgun_digest: Option, - - /// Length of the `use_nailgun_digest` digest. - #[structopt(long)] - use_nailgun_digest_length: Option, - /// Overall timeout in seconds for each request from time of submission. #[structopt(long, default_value = "600")] overall_deadline_secs: u64, @@ -278,6 +269,7 @@ async fn main() { .chain(request.argv.into_iter()) .collect(); } + let workdir = args.work_dir.unwrap_or_else(std::env::temp_dir); let runner: Box = match args.server { Some(address) => { @@ -319,12 +311,13 @@ async fn main() { None => Box::new(process_execution::local::CommandRunner::new( store.clone(), executor, - args.work_dir.unwrap_or_else(std::env::temp_dir), + workdir.clone(), NamedCaches::new( args .named_cache_path .unwrap_or_else(NamedCaches::default_path), ), + ImmutableInputs::new(store.clone(), &workdir).unwrap(), true, )) as Box, }; @@ -420,18 +413,8 @@ async fn make_request_from_flat_args( }) .transpose()?; - let use_nailgun = match (args.use_nailgun_digest, args.use_nailgun_digest_length) { - (Some(d), Some(l)) => Digest::new(d, l), - (Some(_), None) | (None, Some(_)) => { - return Err( - "If either `use_nailgun_digest` or `use_nailgun_digest_length` are set, both must be." - .to_owned(), - ) - } - _ => EMPTY_DIGEST, - }; - - let input_digests = InputDigests::new(store, input_files, use_nailgun) + // TODO: Add support for immutable inputs. 
+ let input_digests = InputDigests::new(store, input_files, BTreeMap::default(), vec![]) .await .map_err(|e| format!("Could not create input digest for process: {:?}", e))?; diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 6c26927132f..5454286362b 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -24,8 +24,8 @@ use graph::{self, EntryId, Graph, InvalidationResult, NodeContext}; use log::info; use parking_lot::Mutex; use process_execution::{ - self, BoundedCommandRunner, CommandRunner, NamedCaches, Platform, ProcessMetadata, - RemoteCacheWarningsBehavior, + self, BoundedCommandRunner, CommandRunner, ImmutableInputs, NamedCaches, Platform, + ProcessMetadata, RemoteCacheWarningsBehavior, }; use protos::gen::build::bazel::remote::execution::v2::ServerCapabilities; use regex::Regex; @@ -167,12 +167,14 @@ impl Core { local_execution_root_dir: &Path, named_caches_dir: &Path, exec_strategy_opts: &ExecutionStrategyOptions, - ) -> Box { + ) -> Result, String> { + let immutable_inputs = ImmutableInputs::new(store.clone(), local_execution_root_dir)?; let local_command_runner = process_execution::local::CommandRunner::new( store.clone(), executor.clone(), local_execution_root_dir.to_path_buf(), NamedCaches::new(named_caches_dir.to_path_buf()), + immutable_inputs, exec_strategy_opts.local_cleanup, ); @@ -194,10 +196,10 @@ impl Core { Box::new(local_command_runner) }; - Box::new(BoundedCommandRunner::new( + Ok(Box::new(BoundedCommandRunner::new( maybe_nailgunnable_local_command_runner, exec_strategy_opts.local_parallelism, - )) + ))) } fn make_command_runner( @@ -231,7 +233,7 @@ impl Core { local_execution_root_dir, named_caches_dir, exec_strategy_opts, - ); + )?; // Possibly either add the remote execution runner or the remote cache runner. // `global_options.py` already validates that both are not set at the same time. 
diff --git a/src/rust/engine/src/externs/mod.rs b/src/rust/engine/src/externs/mod.rs index 5d930f756fe..7097cc5b997 100644 --- a/src/rust/engine/src/externs/mod.rs +++ b/src/rust/engine/src/externs/mod.rs @@ -132,8 +132,11 @@ pub fn collect_iterable(value: &PyAny) -> Result, String> { } } -/// Read a `FrozenDict[str, str]`. -pub fn getattr_from_str_frozendict(value: &PyAny, field: &str) -> BTreeMap { +/// Read a `FrozenDict[str, T]`. +pub fn getattr_from_str_frozendict<'p, T: FromPyObject<'p>>( + value: &'p PyAny, + field: &str, +) -> BTreeMap { let frozendict = getattr(value, field).unwrap(); let pydict: &PyDict = getattr(frozendict, "_data").unwrap(); pydict diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 3dda535a8f2..d31082bf8c7 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -23,7 +23,7 @@ use fs::{safe_create_dir_all_ioerror, Permissions, PreparedPathGlobs, RelativePa use futures::future::{self, BoxFuture, FutureExt, TryFutureExt}; use hashing::{Digest, EMPTY_DIGEST}; use indexmap::IndexMap; -use process_execution::{CacheDest, CacheName, ManagedChild, NamedCaches}; +use process_execution::{CacheName, ManagedChild, NamedCaches}; use pyo3::{PyRef, Python}; use stdio::TryCloneAsFile; use store::{SnapshotOps, SubsetParams}; @@ -172,7 +172,7 @@ fn process_request_to_process_result( ) -> BoxFuture<'static, NodeResult> { async move { let process_request = ExecuteProcess::lift(&context.core.store(), args.pop().unwrap()) - .map_err(|e| throw(format!("Error lifting MultiPlatformExecuteProcess: {}", e))) + .map_err(|e| throw(format!("Error lifting Process: {}", e))) .await?; let result = context.get(process_request).await?.0; @@ -337,7 +337,7 @@ fn add_prefix_request_to_digest( let digest = context .core .store() - .add_prefix(digest, prefix) + .add_prefix(digest, &prefix) .await .map_err(|e| throw(format!("{:?}", e)))?; let gil = Python::acquire_gil(); @@ -490,7 +490,7 @@ fn 
create_digest_to_digest( res } CreateDigestItem::Dir(path) => store - .create_empty_dir(path) + .create_empty_dir(&path) .await .map_err(|e| format!("{:?}", e)), } @@ -564,11 +564,11 @@ fn interactive_process( let restartable: bool = externs::getattr(py_interactive_process, "restartable").unwrap(); let py_input_digest = externs::getattr(py_interactive_process, "input_digest").unwrap(); let input_digest: Digest = lift_directory_digest(py_input_digest)?; - let env = externs::getattr_from_str_frozendict(py_interactive_process, "env"); + let env: BTreeMap = externs::getattr_from_str_frozendict(py_interactive_process, "env"); - let append_only_caches = externs::getattr_from_str_frozendict(py_interactive_process, "append_only_caches") + let append_only_caches = externs::getattr_from_str_frozendict::<&str>(py_interactive_process, "append_only_caches") .into_iter() - .map(|(name, dest)| Ok((CacheName::new(name).unwrap(), CacheDest::new(dest).unwrap()))) + .map(|(name, dest)| Ok((CacheName::new(name)?, RelativePath::new(dest)?))) .collect::, String>>()?; if !append_only_caches.is_empty() && run_in_workspace { return Err("Local interactive process cannot use append-only caches when run in workspace.".to_owned()); @@ -608,39 +608,42 @@ fn interactive_process( .await?; } + // TODO: `immutable_input_digests` are not supported for InteractiveProcess, but they would be + // materialized here. 
+ // see https://github.com/pantsbuild/pants/issues/13852 if !append_only_caches.is_empty() { let named_caches = NamedCaches::new(context.core.named_caches_dir.clone()); let named_cache_symlinks = named_caches .local_paths(&append_only_caches) .collect::>(); - let destination = match maybe_tempdir { + let workdir = match maybe_tempdir { Some(ref dir) => dir.path().to_path_buf(), None => unreachable!(), }; for named_cache_symlink in named_cache_symlinks { - safe_create_dir_all_ioerror(&named_cache_symlink.src).map_err(|err| { + safe_create_dir_all_ioerror(&named_cache_symlink.dst).map_err(|err| { format!( "Error making {} for local execution: {:?}", - named_cache_symlink.src.display(), + named_cache_symlink.dst.display(), err ) })?; - let dst = destination.join(&named_cache_symlink.dst); - if let Some(dir) = dst.parent() { + let src = workdir.join(&named_cache_symlink.src); + if let Some(dir) = src.parent() { safe_create_dir_all_ioerror(dir).map_err(|err| { format!( "Error making {} for local execution: {:?}", dir.display(), err ) })?; } - symlink(&named_cache_symlink.src, &dst).map_err(|err| { + symlink(&named_cache_symlink.dst, &src).map_err(|err| { format!( "Error linking {} -> {} for local execution: {:?}", - named_cache_symlink.src.display(), - dst.display(), + src.display(), + named_cache_symlink.dst.display(), err ) })?; diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 7e3da389c4a..d278d7cf624 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -1,7 +1,7 @@ // Copyright 2018 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). 
-use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use std::convert::{TryFrom, TryInto}; use std::fmt; use std::fmt::Display; @@ -31,8 +31,7 @@ use fs::{ Vfs, }; use process_execution::{ - self, CacheDest, CacheName, InputDigests, Platform, Process, ProcessCacheScope, - ProcessResultSource, + self, CacheName, InputDigests, Platform, Process, ProcessCacheScope, ProcessResultSource, }; use crate::externs::engine_aware::{EngineAwareParameter, EngineAwareReturnType}; @@ -291,10 +290,23 @@ impl ExecuteProcess { let value = (**value).as_ref(py); let input_files = lift_directory_digest(externs::getattr(value, "input_digest").unwrap()) .map_err(|err| format!("Error parsing input_digest {}", err))?; - let use_nailgun = lift_directory_digest(externs::getattr(value, "use_nailgun").unwrap()) - .map_err(|err| format!("Error parsing use_nailgun {}", err))?; - - Ok(InputDigests::new(store, input_files, use_nailgun)) + let immutable_inputs = + externs::getattr_from_str_frozendict::<&PyAny>(value, "immutable_input_digests") + .into_iter() + .map(|(path, digest)| Ok((RelativePath::new(path)?, lift_directory_digest(digest)?))) + .collect::, String>>()?; + let use_nailgun = externs::getattr::>(value, "use_nailgun") + .unwrap() + .into_iter() + .map(RelativePath::new) + .collect::, _>>()?; + + Ok(InputDigests::new( + store, + input_files, + immutable_inputs, + use_nailgun, + )) }); input_digests_fut? 
@@ -333,10 +345,11 @@ impl ExecuteProcess { let py_level = externs::getattr(value, "level").unwrap(); let level = externs::val_to_log_level(py_level)?; - let append_only_caches = externs::getattr_from_str_frozendict(value, "append_only_caches") - .into_iter() - .map(|(name, dest)| Ok((CacheName::new(name)?, CacheDest::new(dest)?))) - .collect::>()?; + let append_only_caches = + externs::getattr_from_str_frozendict::<&str>(value, "append_only_caches") + .into_iter() + .map(|(name, dest)| Ok((CacheName::new(name)?, RelativePath::new(dest)?))) + .collect::>()?; let jdk_home = externs::getattr_as_optional_string(value, "jdk_home").map(PathBuf::from);