Skip to content

Commit

Permalink
resolvelib: Option to skip packages with empty PyPI links page.
Browse files Browse the repository at this point in the history
A notable case of this is pkg_resources==0.0.0 which is installed by
Debian's pip distribution.

Currently resolvelib does not support skipping a dependency when presented
with an empty candidates list for it. So, the only viable way to achieve
the desired effect is to inject a fake candidate to make resolvelib happy,
and add appropriate handling code for it on the pip-audit side.

Exposed to cli via the --skip-empty flag.
  • Loading branch information
m000 committed Jan 8, 2022
1 parent d3429dc commit 0ffe38e
Show file tree
Hide file tree
Showing 4 changed files with 195 additions and 21 deletions.
9 changes: 8 additions & 1 deletion pip_audit/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,11 @@ def audit() -> None:
action="store_true",
help="show only results for dependencies in the local environment",
)
parser.add_argument(
"--skip-empty",
action="store_true",
help="skip packages with an empty PyPI links page",
)
dep_source_args.add_argument(
"-r",
"--requirement",
Expand Down Expand Up @@ -255,7 +260,9 @@ def audit() -> None:
if args.requirements is not None:
req_files: List[Path] = [Path(req.name) for req in args.requirements]
source = RequirementSource(
req_files, ResolveLibResolver(args.timeout, args.cache_dir, state), state
req_files,
ResolveLibResolver(args.timeout, args.cache_dir, state, args.skip_empty),
state,
)
else:
source = PipSource(local=args.local, paths=args.paths)
Expand Down
90 changes: 74 additions & 16 deletions pip_audit/_dependency_source/resolvelib/pypi_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import itertools
from datetime import date
from email.message import EmailMessage
from email.parser import BytesParser
from io import BytesIO
Expand All @@ -24,6 +25,7 @@
from packaging.utils import canonicalize_name, parse_sdist_filename, parse_wheel_filename
from packaging.version import Version
from resolvelib.providers import AbstractProvider
from resolvelib.resolvers import ResolutionImpossible

from pip_audit._cache import caching_session
from pip_audit._state import AuditState
Expand All @@ -50,6 +52,7 @@ def __init__(
url: str,
extras: Set[str],
is_wheel: bool,
is_fake: bool,
session: CacheControl,
timeout: Optional[int] = None,
state: AuditState = AuditState(),
Expand All @@ -64,6 +67,7 @@ def __init__(
self.url = url
self.extras = extras
self.is_wheel = is_wheel
self.is_fake = is_fake
self._session = session
self._timeout = timeout
self._state = state
Expand All @@ -75,9 +79,8 @@ def __repr__(self): # pragma: no cover
"""
A string representation for `Candidate`.
"""
if not self.extras:
return f"<{self.name}=={self.version} wheel={self.is_wheel}>"
return f"<{self.name}[{','.join(self.extras)}]=={self.version} wheel={self.is_wheel}>"
extras = f"[{','.join(self.extras)}]" if self.extras else ""
return f"<{self.name}{extras}=={self.version} wheel={self.is_wheel} fake={self.is_fake}>"

@property
def metadata(self) -> EmailMessage:
Expand All @@ -88,7 +91,9 @@ def metadata(self) -> EmailMessage:
if self._metadata is None:
self._state.update_state(f"Fetching metadata for {self.name} ({self.version})")

if self.is_wheel:
if self.is_fake:
self._metadata = self._get_metadata_for_fake()
elif self.is_wheel:
self._metadata = self._get_metadata_for_wheel()
else:
self._metadata = self._get_metadata_for_sdist()
Expand Down Expand Up @@ -119,6 +124,15 @@ def dependencies(self) -> List[Requirement]:
self._dependencies = list(self._get_dependencies())
return self._dependencies

def _get_metadata_for_fake(self) -> EmailMessage:
    """
    Create fake/empty metadata if the candidate is marked as fake.

    This is used in cases where we know that the candidate metadata cannot
    be retrieved, but we want to avoid a cascading resolution failure in
    resolvelib.

    Returns a newly constructed `EmailMessage` with no headers set.
    """
    return EmailMessage()

def _get_metadata_for_wheel(self):
"""
Extracts the metadata for this candidate, if it's a wheel.
Expand Down Expand Up @@ -173,7 +187,7 @@ def _get_metadata_for_sdist(self):


def get_project_from_pypi(
session, project, extras, timeout: Optional[int], state: AuditState
session, project, extras, timeout: Optional[int], state: AuditState, skip_empty: bool
) -> Iterator[Candidate]:
"""Return candidates created from the project name and extras."""
url = "https://pypi.org/simple/{}/".format(project)
Expand All @@ -183,7 +197,9 @@ def get_project_from_pypi(
response.raise_for_status()
data = response.content
doc = html5lib.parse(data, namespaceHTMLElements=False)
no_project_links = True
for i in doc.findall(".//a"):
no_project_links = False
url = i.attrib["href"]
py_req = i.attrib.get("data-requires-python")
# Skip items that need a different Python version
Expand Down Expand Up @@ -215,13 +231,39 @@ def get_project_from_pypi(
url=url,
extras=extras,
is_wheel=is_wheel,
is_fake=False,
timeout=timeout,
state=state,
session=session,
)
except Exception:
continue

if no_project_links and skip_empty:
# Links page found but contains no links. Yield a fake candidate
# to prevent a cascading resolution failure.
# We have no generic way to create a version number that matches the
# requirement specifier. Instead, we create a PEP440 compliant version,
# and add special handling of fakes where needed.
version = Version(f"{date.today():%Y.%m}+pip.audit.fake")
filename = f"{project}-{version}.tar.gz"
yield Candidate(
project,
Path(filename),
version,
url="http://www.pypi.org",
extras=extras,
is_wheel=False,
is_fake=True,
timeout=timeout,
state=state,
session=session,
)
elif no_project_links:
raise PyPINoProjectLinksError(
f"Resolution of {project} failed because no links were found in page {url}."
)


class PyPIProvider(AbstractProvider):
"""
Expand All @@ -234,6 +276,7 @@ def __init__(
timeout: Optional[int] = None,
cache_dir: Optional[Path] = None,
state: AuditState = AuditState(),
skip_empty: bool = False,
):
"""
Create a new `PyPIProvider`.
Expand All @@ -244,10 +287,13 @@ def __init__(
`cache_dir` is an optional argument to override the default HTTP caching directory.
`state` is an `AuditState` to use for state callbacks.
`skip_empty` skips packages with an empty PyPI links page.
"""
self.timeout = timeout
self.session = caching_session(cache_dir, use_pip=True)
self._state = state
self.skip_empty = skip_empty

def identify(self, requirement_or_candidate):
"""
Expand Down Expand Up @@ -276,18 +322,19 @@ def find_matches(self, identifier, requirements, incompatibilities):
for r in requirements:
extras |= r.extras

# Need to pass the extras to the search, so they
# are added to the candidate at creation - we
# treat candidates as immutable once created.
# Need to pass the extras to the search, so they are added to the
# candidate at creation. Candidates are treated as immutable once created.
candidates_filter = lambda c: ( # noqa: E731 # allow use of lambda
c.version not in bad_versions
and (c.is_fake or all(c.version in r.specifier for r in requirements))
)
candidates = sorted(
[
candidate
for candidate in get_project_from_pypi(
self.session, identifier, extras, self.timeout, self._state
)
if candidate.version not in bad_versions
and all(candidate.version in r.specifier for r in requirements)
],
filter(
candidates_filter,
get_project_from_pypi(
self.session, identifier, extras, self.timeout, self._state, self.skip_empty
),
),
key=attrgetter("version", "is_wheel"),
reverse=True,
)
Expand All @@ -307,6 +354,8 @@ def is_satisfied_by(self, requirement, candidate):
"""
See `resolvelib.providers.AbstractProvider.is_satisfied_by`.
"""
if candidate.is_fake:
return True
if canonicalize_name(requirement.name) != candidate.name:
return False
return candidate.version in requirement.specifier
Expand All @@ -324,3 +373,12 @@ class PyPINotFoundError(Exception):
"""

pass


class PyPINoProjectLinksError(ResolutionImpossible):
    """
    Signals that a dependency could not be resolved because the PyPI links
    page of the project contained no links.
    """
13 changes: 11 additions & 2 deletions pip_audit/_dependency_source/resolvelib/resolvelib.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(
timeout: Optional[int] = None,
cache_dir: Optional[Path] = None,
state: AuditState = AuditState(),
skip_empty: bool = False,
) -> None:
"""
Create a new `ResolveLibResolver`.
Expand All @@ -39,10 +40,13 @@ def __init__(
and caching, respectively.
`state` is an `AuditState` to use for state callbacks.
`skip_empty` skips packages with an empty PyPI links page.
"""
self.provider = PyPIProvider(timeout, cache_dir, state)
self.provider = PyPIProvider(timeout, cache_dir, state, skip_empty)
self.reporter = BaseReporter()
self.resolver: Resolver = Resolver(self.provider, self.reporter)
self.skip_empty = skip_empty

def resolve(self, req: Requirement) -> List[Dependency]:
"""
Expand All @@ -58,7 +62,12 @@ def resolve(self, req: Requirement) -> List[Dependency]:
except HTTPError as e:
raise ResolveLibResolverError("failed to resolve dependencies") from e
for name, candidate in result.mapping.items():
deps.append(ResolvedDependency(name, candidate.version))
if candidate.is_fake:
deps.append(
SkippedDependency(name, skip_reason=f"Fake version: {candidate.version}.")
)
else:
deps.append(ResolvedDependency(name, candidate.version))
return deps


Expand Down
104 changes: 102 additions & 2 deletions test/dependency_source/test_resolvelib.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from email.message import EmailMessage
from textwrap import dedent
from typing import List

import pytest
Expand All @@ -10,6 +11,7 @@

from pip_audit._dependency_source import resolvelib
from pip_audit._dependency_source.resolvelib import pypi_provider
from pip_audit._dependency_source.resolvelib.pypi_provider import PyPINoProjectLinksError
from pip_audit._service.interface import ResolvedDependency, SkippedDependency


Expand All @@ -25,8 +27,16 @@ def raise_for_status(self):
return Doc(data)


def get_metadata_mock():
return EmailMessage()
def get_metadata_mock(requires=None):
    """
    Build mock package metadata.

    `requires` is an optional iterable of requirement strings. Each entry is
    added to the returned `EmailMessage` as its own `Requires-Dist` header.
    A falsy `requires` (including `None`) yields a message without any
    `Requires-Dist` header.
    """
    message = EmailMessage()
    for requirement in requires or []:
        message["Requires-Dist"] = requirement
    return message


def _dedent(s):
return dedent(s.lstrip("\n")).rstrip("\n")


def check_deps(resolved_deps: List[ResolvedDependency], expected_deps: List[ResolvedDependency]):
Expand Down Expand Up @@ -299,3 +309,93 @@ def __init__(self):
]
assert req in resolved_deps
assert resolved_deps[req] == expected_deps


@pytest.mark.parametrize("skip_empty", [True, False], ids=lambda v: "skip={}".format(v))
def test_resolvelib_emptylinks(monkeypatch, skip_empty):
    """
    Resolve a requirement whose PyPI links page contains no links.

    With `skip_empty` set, the requirement is expected to come back as a
    single `SkippedDependency`. Without it, resolution is expected to raise
    `PyPINoProjectLinksError`.
    """
    # Links page that has a heading but not a single <a> element.
    empty_page = _dedent(
        """
<!DOCTYPE html>
<html>
<head>
<meta name="pypi:repository-version" content="1.0">
<title>Links for pkg-resources</title>
</head>
<body>
<h1>Links for pkg-resources</h1>
</body>
</html>
"""
    )
    resolver = resolvelib.ResolveLibResolver(skip_empty=skip_empty)
    # Serve the empty page for every request made through the provider session.
    monkeypatch.setattr(
        resolver.provider.session, "get", lambda _url, **kwargs: get_package_mock(empty_page)
    )
    req = Requirement("pkg-resources==0.0.0")

    if skip_empty:
        resolved = list(resolver.resolve_all(iter([req])))
        assert len(resolved) == 1
        resolved_req, deps = resolved.pop()
        assert resolved_req == req
        assert len(deps) == 1
        assert deps[0].name == resolved_req.name
        assert isinstance(deps[0], SkippedDependency)
        return

    with pytest.raises(PyPINoProjectLinksError) as excinfo:
        _ = dict(resolver.resolve_all(iter([req])))
    # Checked once the context manager has captured the exception.
    assert excinfo.match(r"no links were found.*pkg-resources")


@pytest.mark.parametrize("skip_empty", [True, False], ids=lambda v: "skip={}".format(v))
def test_resolvelib_emptylinks_indirect(monkeypatch, skip_empty):
    """
    Resolve a requirement that depends on a package with an empty links page.

    `flask==2.0.1` is mocked to require `pkg-resources==0.0.0`, whose links
    page contains no links. With `skip_empty` set, flask is expected to be
    resolved and pkg-resources reported as skipped. Without it, resolution is
    expected to raise `PyPINoProjectLinksError`.
    """
    # Mock PyPI pages keyed by the exact URL the provider requests.
    pages = {
        "https://pypi.org/simple/pkg-resources/": _dedent(
            """
<!DOCTYPE html>
<html>
<head>
<meta name="pypi:repository-version" content="1.0">
<title>Links for pkg-resources</title>
</head>
<body>
<h1>Links for pkg-resources</h1>
</body>
</html>
"""
        ),
        "https://pypi.org/simple/flask/": _dedent(
            """
<a href="https://example.com/Flask-2.0.1.tar.gz">Flask-2.0.1.tar.gz</a><br/>
"""
        ),
    }
    # Requirement strings reported by the mocked sdist metadata, per candidate name.
    requires_by_name = {
        "pkg-resources": None,
        "flask": ["pkg-resources==0.0.0"],
    }
    resolver = resolvelib.ResolveLibResolver(skip_empty=skip_empty)
    monkeypatch.setattr(
        resolver.provider.session, "get", lambda url, **kwargs: get_package_mock(pages[url])
    )
    monkeypatch.setattr(
        pypi_provider.Candidate,
        "_get_metadata_for_sdist",
        lambda candidate: get_metadata_mock(requires_by_name[candidate.name]),
    )
    req = Requirement("flask==2.0.1")

    if skip_empty:
        resolved = list(resolver.resolve_all(iter([req])))
        assert len(resolved) == 1
        resolved_req, deps = resolved.pop()
        assert resolved_req == req
        assert len(deps) == 2
        assert deps[0].name == resolved_req.name
        assert isinstance(deps[0], ResolvedDependency)
        assert deps[1].name == "pkg-resources"
        assert isinstance(deps[1], SkippedDependency)
        return

    with pytest.raises(PyPINoProjectLinksError) as excinfo:
        _ = dict(resolver.resolve_all(iter([req])))
    # Checked once the context manager has captured the exception.
    assert excinfo.match(r"no links were found.*pkg-resources")

0 comments on commit 0ffe38e

Please sign in to comment.