Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support volatile processes. #10768

Merged
merged 3 commits into from
Sep 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions src/python/pants/backend/python/goals/pytest_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
from dataclasses import dataclass
from pathlib import PurePath
from typing import Optional, Tuple
from uuid import UUID

from pants.backend.python.goals.coverage_py import (
CoverageConfig,
Expand Down Expand Up @@ -43,7 +42,6 @@
from pants.core.util_rules.source_files import SourceFiles, SourceFilesRequest
from pants.engine.addresses import Address, Addresses, AddressInput
from pants.engine.fs import AddPrefix, Digest, DigestSubset, MergeDigests, PathGlobs, Snapshot
from pants.engine.internals.uuid import UUIDRequest
from pants.engine.process import FallibleProcessResult, InteractiveProcess, Process
from pants.engine.rules import Get, MultiGet, collect_rules, rule
from pants.engine.target import (
Expand Down Expand Up @@ -253,17 +251,6 @@ async def setup_pytest_for_target(

extra_env.update(test_extra_env.env)

if test_subsystem.force and not request.is_debug:
# This is a slightly hacky way to force the process to run: since the env var
# value is unique, this input combination will never have been seen before,
# and therefore never cached. The two downsides are:
# 1. This leaks into the test's environment, albeit with a funky var name that is
# unlikely to cause problems in practice.
# 2. This run will be cached even though it can never be re-used.
# TODO: A more principled way of forcing rules to run?
uuid = await Get(UUID, UUIDRequest())
extra_env["__PANTS_FORCE_TEST_RUN__"] = str(uuid)

process = await Get(
Process,
PexProcess(
Expand All @@ -276,6 +263,7 @@ async def setup_pytest_for_target(
execution_slot_variable=pytest.options.execution_slot_var,
description=f"Run Pytest for {request.field_set.address}",
level=LogLevel.DEBUG,
uncacheable=test_subsystem.force and not request.is_debug,
),
)
return TestSetup(process, results_file_name=results_file_name)
Expand Down
8 changes: 6 additions & 2 deletions src/python/pants/backend/python/util_rules/pex.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
PathGlobs,
)
from pants.engine.platform import Platform, PlatformConstraint
from pants.engine.process import MultiPlatformProcess, Process, ProcessResult
from pants.engine.process import MultiPlatformProcess, Process, ProcessResult, UncacheableProcess
from pants.engine.rules import Get, collect_rules, rule
from pants.python.python_repos import PythonRepos
from pants.python.python_setup import PythonSetup
Expand Down Expand Up @@ -564,6 +564,7 @@ class PexProcess:
output_directories: Optional[Tuple[str, ...]]
timeout_seconds: Optional[int]
execution_slot_variable: Optional[str]
uncacheable: bool

def __init__(
self,
Expand All @@ -578,6 +579,7 @@ def __init__(
output_directories: Optional[Iterable[str]] = None,
timeout_seconds: Optional[int] = None,
execution_slot_variable: Optional[str] = None,
uncacheable: bool = False,
) -> None:
self.pex = pex
self.argv = tuple(argv)
Expand All @@ -589,6 +591,7 @@ def __init__(
self.output_directories = tuple(output_directories) if output_directories else None
self.timeout_seconds = timeout_seconds
self.execution_slot_variable = execution_slot_variable
self.uncacheable = uncacheable


@rule
Expand All @@ -601,7 +604,7 @@ async def setup_pex_process(request: PexProcess, pex_environment: PexEnvironment
always_use_shebang=request.pex.internal_only,
)
env = {**pex_environment.environment_dict, **(request.extra_env or {})}
return Process(
process = Process(
argv,
description=request.description,
level=request.level,
Expand All @@ -612,6 +615,7 @@ async def setup_pex_process(request: PexProcess, pex_environment: PexEnvironment
timeout_seconds=request.timeout_seconds,
execution_slot_variable=request.execution_slot_variable,
)
return await Get(Process, UncacheableProcess(process)) if request.uncacheable else process


def rules():
Expand Down
50 changes: 41 additions & 9 deletions src/python/pants/engine/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
from dataclasses import dataclass
from textwrap import dedent
from typing import TYPE_CHECKING, Dict, Iterable, Mapping, Optional, Tuple, Union
from uuid import UUID

from pants.base.exception_sink import ExceptionSink
from pants.engine.engine_aware import EngineAwareReturnType
from pants.engine.fs import EMPTY_DIGEST, CreateDigest, Digest, FileContent
from pants.engine.internals.uuid import UUIDRequest
from pants.engine.platform import Platform, PlatformConstraint
from pants.engine.rules import Get, collect_rules, rule, side_effecting
from pants.util.frozendict import FrozenDict
Expand Down Expand Up @@ -344,13 +346,37 @@ def first_path(self) -> Optional[str]:
return next(iter(self.paths), None)


@dataclass(frozen=True)
class UncacheableProcess:
    """Marks a Process that must be executed every time, with no result re-use.

    Wrap a `Process` in this type when a previously computed result for the same
    inputs must not be served in place of actually running it.
    """

    # The process that has to be re-run rather than re-used.
    process: Process


@rule
async def make_process_uncacheable(uncacheable_process: UncacheableProcess) -> Process:
    """Return a copy of the wrapped Process tagged with a unique env var.

    The returned process is identical to `uncacheable_process.process` except that
    its environment additionally carries `__PANTS_FORCE_PROCESS_RUN__`, set to a
    freshly requested UUID.
    """
    fresh_uuid = await Get(UUID, UUIDRequest())
    wrapped = uncacheable_process.process

    # HACK: a unique env var value makes this exact combination of inputs one that
    # has never been seen before, so no cached result can match it and the process
    # is forced to run. Known downsides:
    #   1. The variable leaks into the process' environment; the unusual name makes
    #      a clash unlikely in practice.
    #   2. The result of this run is still cached, even though it can never be
    #      re-used.
    # TODO: A more principled way of forcing rules to run?
    forced_env = FrozenDict({**wrapped.env, "__PANTS_FORCE_PROCESS_RUN__": str(fresh_uuid)})

    return dataclasses.replace(wrapped, env=forced_env)


@rule(desc="Find binary path", level=LogLevel.DEBUG)
async def find_binary(request: BinaryPathRequest) -> BinaryPaths:
# TODO(John Sirois): Replace this script with a statically linked native binary so we don't
# depend on /bin/bash being available on the Process host.
# TODO(#10507): Running the script directly from a shebang sometimes results in a "Text file
# busy" error.
#

# Note: the backslash after the """ marker ensures that the shebang is at the start of the
# script file. Many OSs will not see the shebang if there is intervening whitespace.
script_path = "./script.sh"
Expand All @@ -376,12 +402,18 @@ async def find_binary(request: BinaryPathRequest) -> BinaryPaths:
search_path = create_path_env_var(request.search_path)
result = await Get(
FallibleProcessResult,
Process(
description=f"Searching for `{request.binary_name}` on PATH={search_path}",
level=LogLevel.DEBUG,
input_digest=script_digest,
argv=[script_path, request.binary_name],
env={"PATH": search_path},
# We wrap the process in UncacheableProcess to force a re-run, since any binary found on the
# host system today could be gone tomorrow. Ideally we'd only do this for local processes, since
# all known remoting configurations include a static container image as part of their cache key,
# which automatically avoids this problem.
UncacheableProcess(
Process(
description=f"Searching for `{request.binary_name}` on PATH={search_path}",
level=LogLevel.DEBUG,
input_digest=script_digest,
argv=[script_path, request.binary_name],
env={"PATH": search_path},
)
),
)
if result.exit_code == 0:
Expand Down