Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more context for Job errors. #2479

Merged
merged 3 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pex/build_system/pep_517.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,13 @@ def _invoke_build_hook(
stderr=subprocess.PIPE,
)
return SpawnedJob.file(
Job(command=args, process=process),
Job(
command=args,
process=process,
context="PEP-517:{hook_method} at {project_directory}".format(
hook_method=hook_method, project_directory=project_directory
),
),
output_file=fp.name,
result_func=lambda file_content: json.loads(file_content.decode("utf-8")),
)
Expand Down
34 changes: 28 additions & 6 deletions pex/dist_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,22 +651,40 @@ def requires_dists(location):
)


# Frozen exception types don't work under 3.11+ where the `__traceback__` attribute can be set
# after construction in some cases.
@attr.s
class RequirementParseError(Exception):
    """Indicates an invalid requirement string.

    See PEP-508: https://www.python.org/dev/peps/pep-0508
    """

    # The underlying parse failure (e.g. the message of an `InvalidRequirement`).
    error = attr.ib()  # type: Any
    # Optional description of where the unparseable requirement came from; when set it is
    # included in the rendered error message to give the user context.
    source = attr.ib(default=None)  # type: Optional[str]

    def __str__(self):
        # type: () -> str
        if self.source:
            return "Failed to parse a requirement of {source}: {err}".format(
                err=self.error, source=self.source
            )
        return str(self.error)


@attr.s(frozen=True)
class Constraint(object):
@classmethod
def parse(
    cls,
    constraint,  # type: Text
    source=None,  # type: Optional[str]
):
    # type: (...) -> Constraint
    """Parse a PEP-508 constraint string into a `Constraint`.

    :param constraint: The raw constraint string to parse.
    :param source: An optional description of where the constraint came from; used only to
                   contextualize parse errors.
    :raises RequirementParseError: If the constraint string is invalid per PEP-508.
    """
    try:
        return cls.from_packaging_requirement(PackagingRequirement(constraint))
    except InvalidRequirement as e:
        # Surface the source context alongside packaging's error message.
        raise RequirementParseError(str(e), source=source)

@classmethod
def from_packaging_requirement(cls, requirement):
Expand Down Expand Up @@ -777,12 +795,16 @@ def as_requirement(self):
@attr.s(frozen=True)
class Requirement(Constraint):
@classmethod
def parse(
    cls,
    requirement,  # type: Text
    source=None,  # type: Optional[str]
):
    # type: (...) -> Requirement
    """Parse a PEP-508 requirement string into a `Requirement`.

    :param requirement: The raw requirement string to parse.
    :param source: An optional description of where the requirement came from; used only to
                   contextualize parse errors.
    :raises RequirementParseError: If the requirement string is invalid per PEP-508.
    """
    try:
        return cls.from_packaging_requirement(PackagingRequirement(requirement))
    except InvalidRequirement as e:
        # Surface the source context alongside packaging's error message.
        raise RequirementParseError(str(e), source=source)

@classmethod
def from_packaging_requirement(cls, requirement):
Expand Down
40 changes: 0 additions & 40 deletions pex/interpreter.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
Mapping,
MutableMapping,
Optional,
Sequence,
Text,
Tuple,
Union,
Expand Down Expand Up @@ -1566,45 +1565,6 @@ def __repr__(self):
)


def spawn_python_job(
    args,  # type: Iterable[str]
    env=None,  # type: Optional[Mapping[str, str]]
    interpreter=None,  # type: Optional[PythonInterpreter]
    expose=None,  # type: Optional[Sequence[str]]
    pythonpath=None,  # type: Optional[Iterable[str]]
    **subprocess_kwargs  # type: Any
):
    # type: (...) -> Job
    """Spawns a python job.

    :param args: The arguments to pass to the python interpreter.
    :param env: The environment to spawn the python interpreter process in. Defaults to the ambient
                environment.
    :param interpreter: The interpreter to use to spawn the python job. Defaults to the current
                        interpreter.
    :param expose: The names of any vendored distributions to expose to the spawned python process.
                   These will be appended to `pythonpath` if passed.
    :param pythonpath: The PYTHONPATH to expose to the spawned python process. These will be
                       pre-pended to the `expose` path if passed.
    :param subprocess_kwargs: Any additional :class:`subprocess.Popen` kwargs to pass through.
    :returns: A job handle to the spawned python process.
    """
    pythonpath = list(pythonpath or ())
    subprocess_env = dict(env or os.environ)
    if expose:
        # In order to expose vendored distributions with their un-vendored import paths in-tact, we
        # need to set `__PEX_UNVENDORED__`. See: vendor.__main__.ImportRewriter._modify_import.
        subprocess_env["__PEX_UNVENDORED__"] = ",".join(expose)

        # NOTE(review): indentation was lost in transit; this extend is placed inside the
        # `if expose:` guard since exposing paths is only meaningful when `expose` is set — confirm.
        pythonpath.extend(third_party.expose(expose, interpreter=interpreter))

    interpreter = interpreter or PythonInterpreter.get()
    cmd, process = interpreter.open_process(
        args=args, pythonpath=pythonpath, env=subprocess_env, **subprocess_kwargs
    )
    return Job(command=cmd, process=process)


# See the "Test results from various systems" table here:
# https://www.in-ulm.de/~mascheck/various/shebang/#length
MAX_SHEBANG_LENGTH = 512 if sys.platform == "darwin" else 128
Expand Down
22 changes: 20 additions & 2 deletions pex/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,19 +63,33 @@ def __init__(
exitcode, # type: int
stderr, # type: Optional[Text]
message, # type: str
context=None, # type: Optional[str]
):
# type: (...) -> None
super(Job.Error, self).__init__(message)
super(Job.Error, self).__init__(
"{ctx}: {msg}".format(ctx=context, msg=message) if context else message
)
self.pid = pid
self.command = command
self.exitcode = exitcode
self.stderr = stderr
self._context = context

def contextualized_stderr(self):
    # type: () -> Iterator[Text]
    """Yield each line of the captured stderr, prefixed with the job context when one is set."""
    if not self.stderr:
        return
    prefix = self._context
    for err_line in self.stderr.splitlines():
        if prefix:
            yield "{ctx}: {line}".format(ctx=prefix, line=err_line)
        else:
            yield err_line

def __init__(
    self,
    command,  # type: Iterable[str]
    process,  # type: subprocess.Popen
    finalizer=None,  # type: Optional[Callable[[int], None]]
    context=None,  # type: Optional[str]
):
    # type: (...) -> None
    """
    :param command: The command line used to launch the job's process.
    :param process: The spawned process handle for the job.
    :param finalizer: An optional cleanup function to call exactly once with the process return
                      code when the underlying process terminates in the course of calling this
                      job's public methods.
    :param context: An optional context labeling the job that will be used to decorate error
                    information.
    """
    # The command is snapshotted as a tuple so later mutation of the passed iterable (or a
    # one-shot iterator argument) cannot corrupt error reporting.
    self._command = tuple(command)
    self._process = process
    self._finalizer = finalizer
    self._context = context

def wait(self):
# type: () -> None
Expand Down Expand Up @@ -153,6 +170,7 @@ def create_error(
exitcode=self._process.returncode,
stderr=err,
message=msg,
context=self._context,
)

def _finalize_job(self):
Expand Down Expand Up @@ -407,7 +425,7 @@ def job_error_message(
pid=job_error.pid,
command=" ".join(job_error.command),
exitcode=job_error.exitcode,
stderr=job_error.stderr,
stderr="\n".join(job_error.contextualized_stderr()),
)

@abstractmethod
Expand Down
2 changes: 1 addition & 1 deletion pex/pip/dependencies/requires.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def requires(self, *args, **kwargs):
modified_requires = []
orig = orig_requires(self, *args, **kwargs)
for req in orig:
requirement = PexRequirement.parse(str(req))
requirement = PexRequirement.parse(str(req), source=repr(self))
excluded_by = dependency_configuration.excluded_by(requirement)
if excluded_by:
logger.debug(
Expand Down
28 changes: 22 additions & 6 deletions pex/pip/log_analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import subprocess
from abc import abstractmethod

from pex.common import safe_delete
from pex.jobs import Job
from pex.typing import TYPE_CHECKING, Generic

Expand Down Expand Up @@ -81,7 +82,7 @@ def __init__(
self._log = log
self._log_analyzers = list(log_analyzers)
self._preserve_log = preserve_log
super(LogScrapeJob, self).__init__(command, process, finalizer=finalizer)
super(LogScrapeJob, self).__init__(command, process, finalizer=finalizer, context="pip")

def _check_returncode(self, stderr=None):
# type: (Optional[bytes]) -> None
Expand All @@ -90,6 +91,7 @@ def _check_returncode(self, stderr=None):
for analyzer in self._log_analyzers
if analyzer.should_collect(self._process.returncode)
]
analyzed_stderr = b"" # type: bytes
if activated_analyzers:
collected = []
# A process may fail so early that there is no log file to analyze.
Expand All @@ -100,7 +102,7 @@ def _check_returncode(self, stderr=None):
for line in fp:
if not activated_analyzers:
break
for index, analyzer in enumerate(activated_analyzers):
for index, analyzer in tuple(enumerate(activated_analyzers)):
result = analyzer.analyze(line)
if isinstance(result.data, ErrorMessage):
collected.append(result.data)
Expand All @@ -110,7 +112,21 @@ def _check_returncode(self, stderr=None):
break
for analyzer in activated_analyzers:
analyzer.analysis_completed()
if not self._preserve_log:
os.unlink(self._log)
stderr = (stderr or b"") + "".join(collected).encode("utf-8")
super(LogScrapeJob, self)._check_returncode(stderr=stderr)
if collected:
analyzed_stderr = "".join(collected).encode("utf-8")

# Fall back to displaying the Pip logs in full if we have no stderr output or analysis. It's
# likely overwhelming, but better than silence and useful for debugging.
if (
not stderr
and not analyzed_stderr
and self._process.returncode != 0
and os.path.isfile(self._log)
):
with open(self._log, "rb") as fp:
analyzed_stderr = fp.read()

if not self._preserve_log:
safe_delete(self._log)

super(LogScrapeJob, self)._check_returncode(stderr=(stderr or b"") + analyzed_stderr)
Loading