diff --git a/pyproject.toml b/pyproject.toml index 2423a38..102eca0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,5 +1,5 @@ [build-system] -requires = ['setuptools>=36.2.2', 'wheel>=0.28.0'] +requires = ['setuptools>=62', 'wheel'] [tool.black] line-length = 79 diff --git a/src/resolvelib/__init__.py b/src/resolvelib/__init__.py index ae15dc5..fd9f249 100644 --- a/src/resolvelib/__init__.py +++ b/src/resolvelib/__init__.py @@ -14,9 +14,10 @@ __version__ = "1.0.2.dev0" -from .providers import AbstractProvider, AbstractResolver +from .providers import AbstractProvider from .reporters import BaseReporter from .resolvers import ( + AbstractResolver, InconsistentCandidate, RequirementsConflicted, ResolutionError, diff --git a/src/resolvelib/__init__.pyi b/src/resolvelib/__init__.pyi deleted file mode 100644 index d64c52c..0000000 --- a/src/resolvelib/__init__.pyi +++ /dev/null @@ -1,11 +0,0 @@ -__version__: str - -from .providers import AbstractProvider as AbstractProvider -from .providers import AbstractResolver as AbstractResolver -from .reporters import BaseReporter as BaseReporter -from .resolvers import InconsistentCandidate as InconsistentCandidate -from .resolvers import RequirementsConflicted as RequirementsConflicted -from .resolvers import ResolutionError as ResolutionError -from .resolvers import ResolutionImpossible as ResolutionImpossible -from .resolvers import ResolutionTooDeep as ResolutionTooDeep -from .resolvers import Resolver as Resolver diff --git a/src/resolvelib/providers.py b/src/resolvelib/providers.py index 66a19f9..f7e240f 100644 --- a/src/resolvelib/providers.py +++ b/src/resolvelib/providers.py @@ -1,7 +1,28 @@ -class AbstractProvider(object): +from __future__ import annotations + +from typing import ( + TYPE_CHECKING, + Generic, + Iterable, + Iterator, + Mapping, + Sequence, +) + +from .structs import CT, KT, RT, Matches, RequirementInformation + +if TYPE_CHECKING: + from typing import Any, Protocol + + class Preference(Protocol): + def __lt__(self, __other: Any) -> bool: + ... + + +class AbstractProvider(Generic[RT, CT, KT]): """Delegate class to provide the required interface for the resolver.""" - def identify(self, requirement_or_candidate): + def identify(self, requirement_or_candidate: RT | CT) -> KT: """Given a requirement or candidate, return an identifier for it. This is used to identify, e.g. whether two requirements @@ -12,13 +33,13 @@ def identify(self, requirement_or_candidate): def get_preference( self, - identifier, - resolutions, - candidates, - information, - backtrack_causes, - ): - """Produce a sort key for the given requirement based on preference. + identifier: KT, + resolutions: Mapping[KT, CT], + candidates: Mapping[KT, Iterator[CT]], + information: Mapping[KT, Iterator[RequirementInformation[RT, CT]]], + backtrack_causes: Sequence[RequirementInformation[RT, CT]], + ) -> Preference: + """Produce a sort key for given requirement based on preference. The preference is defined as "I think this requirement should be resolved first". The lower the return value is, the more preferred @@ -62,7 +83,12 @@ def get_preference( """ raise NotImplementedError - def find_matches(self, identifier, requirements, incompatibilities): + def find_matches( + self, + identifier: KT, + requirements: Mapping[KT, Iterator[RT]], + incompatibilities: Mapping[KT, Iterator[CT]], + ) -> Matches: """Find all possible candidates that satisfy the given constraints. :param identifier: An identifier as returned by ``identify()``. All @@ -92,7 +118,7 @@ def find_matches(self, identifier, requirements, incompatibilities): """ raise NotImplementedError - def is_satisfied_by(self, requirement, candidate): + def is_satisfied_by(self, requirement: RT, candidate: CT) -> bool: """Whether the given requirement can be satisfied by a candidate. The candidate is guaranteed to have been generated from the @@ -103,34 +129,10 @@ def is_satisfied_by(self, requirement, candidate): """ raise NotImplementedError - def get_dependencies(self, candidate): + def get_dependencies(self, candidate: CT) -> Iterable[RT]: """Get dependencies of a candidate. This should return a collection of requirements that `candidate` specifies as its dependencies. """ raise NotImplementedError - - -class AbstractResolver(object): - """The thing that performs the actual resolution work.""" - - base_exception = Exception - - def __init__(self, provider, reporter): - self.provider = provider - self.reporter = reporter - - def resolve(self, requirements, **kwargs): - """Take a collection of constraints, spit out the resolution result. - - This returns a representation of the final resolution state, with one - guaranteed attribute ``mapping`` that contains resolved candidates as - values. The keys are their respective identifiers. - - :param requirements: A collection of constraints. - :param kwargs: Additional keyword arguments that subclasses may accept. - - :raises: ``self.base_exception`` or its subclass. - """ - raise NotImplementedError diff --git a/src/resolvelib/providers.pyi b/src/resolvelib/providers.pyi deleted file mode 100644 index ec05419..0000000 --- a/src/resolvelib/providers.pyi +++ /dev/null @@ -1,44 +0,0 @@ -from typing import ( - Any, - Generic, - Iterable, - Iterator, - Mapping, - Protocol, - Sequence, - Union, -) - -from .reporters import BaseReporter -from .resolvers import RequirementInformation -from .structs import CT, KT, RT, Matches - -class Preference(Protocol): - def __lt__(self, __other: Any) -> bool: ... - -class AbstractProvider(Generic[RT, CT, KT]): - def identify(self, requirement_or_candidate: Union[RT, CT]) -> KT: ... - def get_preference( - self, - identifier: KT, - resolutions: Mapping[KT, CT], - candidates: Mapping[KT, Iterator[CT]], - information: Mapping[KT, Iterator[RequirementInformation[RT, CT]]], - backtrack_causes: Sequence[RequirementInformation[RT, CT]], - ) -> Preference: ... - def find_matches( - self, - identifier: KT, - requirements: Mapping[KT, Iterator[RT]], - incompatibilities: Mapping[KT, Iterator[CT]], - ) -> Matches: ... - def is_satisfied_by(self, requirement: RT, candidate: CT) -> bool: ... - def get_dependencies(self, candidate: CT) -> Iterable[RT]: ... - -class AbstractResolver(Generic[RT, CT, KT]): - base_exception = Exception - provider: AbstractProvider[RT, CT, KT] - reporter: BaseReporter - def __init__( - self, provider: AbstractProvider[RT, CT, KT], reporter: BaseReporter - ): ... diff --git a/src/resolvelib/reporters.py b/src/resolvelib/reporters.py index 688b5e1..05c7226 100644 --- a/src/resolvelib/reporters.py +++ b/src/resolvelib/reporters.py @@ -1,26 +1,41 @@ -class BaseReporter(object): +from __future__ import annotations + +from typing import Generic + +from .structs import ( + CT, + KT, + RT, + Collection, + Criterion, + RequirementInformation, + State, +) + + +class BaseReporter(Generic[RT, CT, KT]): """Delegate class to provider progress reporting for the resolver.""" - def starting(self): + def starting(self) -> None: """Called before the resolution actually starts.""" - def starting_round(self, index): + def starting_round(self, index: int) -> None: """Called before each round of resolution starts. The index is zero-based. """ - def ending_round(self, index, state): + def ending_round(self, index: int, state: State[RT, CT, KT]) -> None: """Called before each round of resolution ends. This is NOT called if the resolution ends at this round. Use `ending` if you want to report finalization. The index is zero-based. """ - def ending(self, state): + def ending(self, state: State[RT, CT, KT]) -> None: """Called before the resolution ends successfully.""" - def adding_requirement(self, requirement, parent): + def adding_requirement(self, requirement: RT, parent: CT | None) -> None: """Called when adding a new requirement into the resolve criteria. :param requirement: The additional requirement to be applied to filter @@ -30,14 +45,18 @@ def adding_requirement(self, requirement, parent): requirements passed in from ``Resolver.resolve()``. """ - def resolving_conflicts(self, causes): + def resolving_conflicts( + self, causes: Collection[RequirementInformation[RT, CT]] + ) -> None: """Called when starting to attempt requirement conflict resolution. :param causes: The information on the collision that caused the backtracking. """ - def rejecting_candidate(self, criterion, candidate): + def rejecting_candidate( + self, criterion: Criterion[RT, CT], candidate: CT + ) -> None: """Called when rejecting a candidate during backtracking.""" - def pinning(self, candidate): + def pinning(self, candidate: CT) -> None: """Called when adding a candidate to the potential solution.""" diff --git a/src/resolvelib/reporters.pyi b/src/resolvelib/reporters.pyi deleted file mode 100644 index b2ad286..0000000 --- a/src/resolvelib/reporters.pyi +++ /dev/null @@ -1,11 +0,0 @@ -from typing import Any - -class BaseReporter: - def starting(self) -> Any: ... - def starting_round(self, index: int) -> Any: ... - def ending_round(self, index: int, state: Any) -> Any: ... - def ending(self, state: Any) -> Any: ... - def adding_requirement(self, requirement: Any, parent: Any) -> Any: ... - def rejecting_candidate(self, criterion: Any, candidate: Any) -> Any: ... - def resolving_conflicts(self, causes: Any) -> Any: ... - def pinning(self, candidate: Any) -> Any: ... diff --git a/src/resolvelib/resolvers.py b/src/resolvelib/resolvers.py index cf2a97a..58a48dd 100644 --- a/src/resolvelib/resolvers.py +++ b/src/resolvelib/resolvers.py @@ -1,14 +1,44 @@ +from __future__ import annotations + import collections import itertools import operator +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Generic, + Iterable, + Mapping, + NamedTuple, +) -from .providers import AbstractResolver -from .structs import DirectedGraph, IteratorMapping, build_iter_view - -RequirementInformation = collections.namedtuple( - "RequirementInformation", ["requirement", "parent"] +from .providers import AbstractProvider +from .reporters import BaseReporter +from .structs import ( + CT, + KT, + RT, + Criterion, + DirectedGraph, + IterableView, + IteratorMapping, + RequirementInformation, + State, + build_iter_view, ) +if TYPE_CHECKING: + from .providers import Preference + + class Result(NamedTuple, Generic[RT, CT, KT]): + mapping: Mapping[KT, CT] + graph: DirectedGraph[KT | None] + criteria: Mapping[KT, Criterion[RT, CT]] + +else: + Result = collections.namedtuple("Result", ["mapping", "graph", "criteria"]) + class ResolverException(Exception): """A base class for all exceptions raised by this module. @@ -18,110 +48,71 @@ class ResolverException(Exception): """ -class RequirementsConflicted(ResolverException): - def __init__(self, criterion): - super(RequirementsConflicted, self).__init__(criterion) +class RequirementsConflicted(ResolverException, Generic[RT, CT]): + def __init__(self, criterion: Criterion[RT, CT]) -> None: + super().__init__(criterion) self.criterion = criterion - def __str__(self): + def __str__(self) -> str: return "Requirements conflict: {}".format( ", ".join(repr(r) for r in self.criterion.iter_requirement()), ) -class InconsistentCandidate(ResolverException): - def __init__(self, candidate, criterion): - super(InconsistentCandidate, self).__init__(candidate, criterion) +class InconsistentCandidate(ResolverException, Generic[RT, CT]): + def __init__(self, candidate: CT, criterion: Criterion[RT, CT]): + super().__init__(candidate, criterion) self.candidate = candidate self.criterion = criterion - def __str__(self): + def __str__(self) -> str: return "Provided candidate {!r} does not satisfy {}".format( self.candidate, ", ".join(repr(r) for r in self.criterion.iter_requirement()), ) -class Criterion(object): - """Representation of possible resolution results of a package. - - This holds three attributes: - - * `information` is a collection of `RequirementInformation` pairs. - Each pair is a requirement contributing to this criterion, and the - candidate that provides the requirement. - * `incompatibilities` is a collection of all known not-to-work candidates - to exclude from consideration. - * `candidates` is a collection containing all possible candidates deducted - from the union of contributing requirements and known incompatibilities. - It should never be empty, except when the criterion is an attribute of a - raised `RequirementsConflicted` (in which case it is always empty). - - .. note:: - This class is intended to be externally immutable. **Do not** mutate - any of its attribute containers. - """ - - def __init__(self, candidates, information, incompatibilities): - self.candidates = candidates - self.information = information - self.incompatibilities = incompatibilities - - def __repr__(self): - requirements = ", ".join( - "({!r}, via={!r})".format(req, parent) - for req, parent in self.information - ) - return "Criterion({})".format(requirements) - - def iter_requirement(self): - return (i.requirement for i in self.information) - - def iter_parent(self): - return (i.parent for i in self.information) - - class ResolutionError(ResolverException): pass -class ResolutionImpossible(ResolutionError): - def __init__(self, causes): - super(ResolutionImpossible, self).__init__(causes) +class ResolutionImpossible(ResolutionError, Generic[RT, CT]): + def __init__(self, causes: Collection[RequirementInformation[RT, CT]]): + super().__init__(causes) # causes is a list of RequirementInformation objects self.causes = causes class ResolutionTooDeep(ResolutionError): - def __init__(self, round_count): - super(ResolutionTooDeep, self).__init__(round_count) + def __init__(self, round_count: int) -> None: + super().__init__(round_count) self.round_count = round_count -# Resolution state in a round. -State = collections.namedtuple("State", "mapping criteria backtrack_causes") - - -class Resolution(object): +class Resolution(Generic[RT, CT, KT]): """Stateful resolution object. This is designed as a one-off object that holds information to kick start the resolution process, and holds the results afterwards. """ - def __init__(self, provider, reporter): + def __init__( + self, + provider: AbstractProvider[RT, CT, KT], + reporter: BaseReporter[RT, CT, KT], + ) -> None: self._p = provider self._r = reporter - self._states = [] + self._states: list[State[RT, CT, KT]] = [] @property - def state(self): + def state(self) -> State[RT, CT, KT]: try: return self._states[-1] except IndexError: raise AttributeError("state") - def _push_new_state(self): + def _push_new_state(self) -> None: """Push a new state into history. This new state will be used to hold resolution results of the next @@ -135,7 +126,12 @@ def _push_new_state(self): ) self._states.append(state) - def _add_to_criteria(self, criteria, requirement, parent): + def _add_to_criteria( + self, + criteria: dict[KT, Criterion[RT, CT]], + requirement: RT, + parent: CT | None, + ) -> None: self._r.adding_requirement(requirement=requirement, parent=parent) identifier = self._p.identify(requirement_or_candidate=requirement) @@ -174,7 +170,9 @@ def _add_to_criteria(self, criteria, requirement, parent): raise RequirementsConflicted(criterion) criteria[identifier] = criterion - def _remove_information_from_criteria(self, criteria, parents): + def _remove_information_from_criteria( + self, criteria: dict[KT, Criterion[RT, CT]], parents: Collection[KT] + ) -> None: """Remove information from parents of criteria. Concretely, removes all values from each criterion's ``information`` @@ -199,7 +197,7 @@ def _remove_information_from_criteria(self, criteria, parents): criterion.incompatibilities, ) - def _get_preference(self, name): + def _get_preference(self, name: KT) -> Preference: return self._p.get_preference( identifier=name, resolutions=self.state.mapping, @@ -214,7 +212,9 @@ def _get_preference(self, name): backtrack_causes=self.state.backtrack_causes, ) - def _is_current_pin_satisfying(self, name, criterion): + def _is_current_pin_satisfying( + self, name: KT, criterion: Criterion[RT, CT] + ) -> bool: try: current_pin = self.state.mapping[name] except KeyError: @@ -224,16 +224,18 @@ def _is_current_pin_satisfying(self, name, criterion): for r in criterion.iter_requirement() ) - def _get_updated_criteria(self, candidate): + def _get_updated_criteria( + self, candidate: CT + ) -> dict[KT, Criterion[RT, CT]]: criteria = self.state.criteria.copy() for requirement in self._p.get_dependencies(candidate=candidate): self._add_to_criteria(criteria, requirement, parent=candidate) return criteria - def _attempt_to_pin_criterion(self, name): + def _attempt_to_pin_criterion(self, name: KT) -> list[Criterion[RT, CT]]: criterion = self.state.criteria[name] - causes = [] + causes: list[Criterion[RT, CT]] = [] for candidate in criterion.candidates: try: criteria = self._get_updated_criteria(candidate) @@ -258,7 +260,7 @@ def _attempt_to_pin_criterion(self, name): # Put newly-pinned candidate at the end. This is essential because # backtracking looks at this mapping to get the last pin. - self.state.mapping.pop(name, None) + self.state.mapping.pop(name, None) # type: ignore[arg-type] self.state.mapping[name] = candidate return [] @@ -267,7 +269,7 @@ def _attempt_to_pin_criterion(self, name): # end, signal for backtracking. return causes - def _backjump(self, causes): + def _backjump(self, causes: list[RequirementInformation[RT, CT]]) -> bool: """Perform backjumping. When we enter here, the stack is like this:: @@ -298,7 +300,7 @@ def _backjump(self, causes): the new Z and go back to step 2. 5b. If the incompatibilities apply cleanly, end backtracking. """ - incompatible_reqs = itertools.chain( + incompatible_reqs: Iterable[CT | RT] = itertools.chain( (c.parent for c in causes if c.parent is not None), (c.requirement for c in causes), ) @@ -309,6 +311,7 @@ def _backjump(self, causes): # Ensure to backtrack to a state that caused the incompatibility incompatible_state = False + broken_state = self.state while not incompatible_state: # Retrieve the last candidate pin and known incompatibilities. try: @@ -334,7 +337,7 @@ def _backjump(self, causes): # Create a new state from the last known-to-work one, and apply # the previously gathered incompatibility information. - def _patch_criteria(): + def _patch_criteria() -> bool: for k, incompatibilities in incompatibilities_from_broken: if not incompatibilities: continue @@ -354,7 +357,7 @@ def _patch_criteria(): {k: incompatibilities}, ), ) - candidates = build_iter_view(matches) + candidates: IterableView[CT] = build_iter_view(matches) if not candidates: return False incompatibilities.extend(criterion.incompatibilities) @@ -378,7 +381,9 @@ def _patch_criteria(): # No way to backtrack anymore. return False - def resolve(self, requirements, max_rounds): + def resolve( + self, requirements: Iterable[RT], max_rounds: int + ) -> State[RT, CT, KT]: if self._states: raise RuntimeError("already resolved") @@ -457,11 +462,17 @@ def resolve(self, requirements, max_rounds): raise ResolutionTooDeep(max_rounds) -def _has_route_to_root(criteria, key, all_keys, connected): +def _has_route_to_root( + criteria: Mapping[KT, Criterion[RT, CT]], + key: KT | None, + all_keys: dict[int, KT | None], + connected: set[KT | None], +) -> bool: if key in connected: return True if key not in criteria: return False + assert key is not None for p in criteria[key].iter_parent(): try: pkey = all_keys[id(p)] @@ -476,18 +487,15 @@ def _has_route_to_root(criteria, key, all_keys, connected): return False -Result = collections.namedtuple("Result", "mapping graph criteria") - - -def _build_result(state): +def _build_result(state: State[RT, CT, KT]) -> Result[RT, CT, KT]: mapping = state.mapping - all_keys = {id(v): k for k, v in mapping.items()} + all_keys: dict[int, KT | None] = {id(v): k for k, v in mapping.items()} all_keys[id(None)] = None - graph = DirectedGraph() + graph: DirectedGraph[KT | None] = DirectedGraph() graph.add(None) # Sentinel as root dependencies' parent. - connected = {None} + connected: set[KT | None] = {None} for key, criterion in state.criteria.items(): if not _has_route_to_root(state.criteria, key, all_keys, connected): continue @@ -509,12 +517,46 @@ def _build_result(state): ) -class Resolver(AbstractResolver): +class AbstractResolver(Generic[RT, CT, KT]): + """The thing that performs the actual resolution work.""" + + base_exception = Exception + + def __init__( + self, + provider: AbstractProvider[RT, CT, KT], + reporter: BaseReporter[RT, CT, KT], + ) -> None: + self.provider = provider + self.reporter = reporter + + def resolve( + self, requirements: Iterable[RT], **kwargs: Any + ) -> Result[RT, CT, KT]: + """Take a collection of constraints, spit out the resolution result. + + This returns a representation of the final resolution state, with one + guarenteed attribute ``mapping`` that contains resolved candidates as + values. The keys are their respective identifiers. + + :param requirements: A collection of constraints. + :param kwargs: Additional keyword arguments that subclasses may accept. + + :raises: ``self.base_exception`` or its subclass. + """ + raise NotImplementedError + + +class Resolver(AbstractResolver[RT, CT, KT]): """The thing that performs the actual resolution work.""" base_exception = ResolverException - def resolve(self, requirements, max_rounds=100): + def resolve( # type: ignore[override] + self, + requirements: Iterable[RT], + max_rounds: int = 100, + ) -> Result[RT, CT, KT]: """Take a collection of constraints, spit out the resolution result. The return value is a representation to the final resolution result. It diff --git a/src/resolvelib/resolvers.pyi b/src/resolvelib/resolvers.pyi deleted file mode 100644 index 528a1a2..0000000 --- a/src/resolvelib/resolvers.pyi +++ /dev/null @@ -1,79 +0,0 @@ -from typing import ( - Collection, - Generic, - Iterable, - Iterator, - List, - Mapping, - Optional, -) - -from .providers import AbstractProvider, AbstractResolver -from .structs import CT, KT, RT, DirectedGraph, IterableView - -# This should be a NamedTuple, but Python 3.6 has a bug that prevents it. -# https://stackoverflow.com/a/50531189/1376863 -class RequirementInformation(tuple, Generic[RT, CT]): - requirement: RT - parent: Optional[CT] - -class Criterion(Generic[RT, CT, KT]): - candidates: IterableView[CT] - information: Collection[RequirementInformation[RT, CT]] - incompatibilities: List[CT] - @classmethod - def from_requirement( - cls, - provider: AbstractProvider[RT, CT, KT], - requirement: RT, - parent: Optional[CT], - ) -> Criterion[RT, CT, KT]: ... - def iter_requirement(self) -> Iterator[RT]: ... - def iter_parent(self) -> Iterator[Optional[CT]]: ... - def merged_with( - self, - provider: AbstractProvider[RT, CT, KT], - requirement: RT, - parent: Optional[CT], - ) -> Criterion[RT, CT, KT]: ... - def excluded_of(self, candidates: List[CT]) -> Criterion[RT, CT, KT]: ... - -class ResolverException(Exception): ... - -class RequirementsConflicted(ResolverException, Generic[RT, CT, KT]): - criterion: Criterion[RT, CT, KT] - -class ResolutionError(ResolverException): ... - -class InconsistentCandidate(ResolverException, Generic[RT, CT, KT]): - candidate: CT - criterion: Criterion[RT, CT, KT] - -class ResolutionImpossible(ResolutionError, Generic[RT, CT]): - causes: List[RequirementInformation[RT, CT]] - -class ResolutionTooDeep(ResolutionError): - round_count: int - -# This should be a NamedTuple, but Python 3.6 has a bug that prevents it. -# https://stackoverflow.com/a/50531189/1376863 -class State(tuple, Generic[RT, CT, KT]): - mapping: Mapping[KT, CT] - criteria: Mapping[KT, Criterion[RT, CT, KT]] - backtrack_causes: Collection[RequirementInformation[RT, CT]] - -class Resolution(Generic[RT, CT, KT]): - def resolve( - self, requirements: Iterable[RT], max_rounds: int - ) -> State[RT, CT, KT]: ... - -class Result(Generic[RT, CT, KT]): - mapping: Mapping[KT, CT] - graph: DirectedGraph[Optional[KT]] - criteria: Mapping[KT, Criterion[RT, CT, KT]] - -class Resolver(AbstractResolver, Generic[RT, CT, KT]): - base_exception = ResolverException - def resolve( - self, requirements: Iterable[RT], max_rounds: int = 100 - ) -> Result[RT, CT, KT]: ... diff --git a/src/resolvelib/structs.py b/src/resolvelib/structs.py index 359a34f..93ed3e1 100644 --- a/src/resolvelib/structs.py +++ b/src/resolvelib/structs.py @@ -1,34 +1,75 @@ -import itertools - -from .compat import collections_abc - +from __future__ import annotations -class DirectedGraph(object): +import itertools +from abc import ABCMeta +from collections import namedtuple +from typing import ( + TYPE_CHECKING, + Callable, + Collection, + Container, + Generic, + Iterable, + Iterator, + Mapping, + NamedTuple, + Sequence, + TypeVar, + Union, +) + +KT = TypeVar("KT") # Identifier. +RT = TypeVar("RT") # Requirement. +CT = TypeVar("CT") # Candidate. + +Matches = Union[Iterable[CT], Callable[[], Iterable[CT]]] + +if TYPE_CHECKING: + + class RequirementInformation(NamedTuple, Generic[RT, CT]): + requirement: RT + parent: CT | None + + class State(NamedTuple, Generic[RT, CT, KT]): + """Resolution state in a round.""" + + mapping: dict[KT, CT] + criteria: dict[KT, Criterion[RT, CT]] + backtrack_causes: list[RequirementInformation[RT, CT]] + +else: + RequirementInformation = namedtuple( + "RequirementInformation", ["requirement", "parent"] + ) + State = namedtuple("State", ["mapping", "criteria", "backtrack_causes"]) + + +class DirectedGraph(Generic[KT]): """A graph structure with directed edges.""" - def __init__(self): - self._vertices = set() - self._forwards = {} # -> Set[] - self._backwards = {} # -> Set[] + def __init__(self) -> None: + self._vertices: set[KT] = set() + self._forwards: dict[KT, set[KT]] = {} # -> Set[] + self._backwards: dict[KT, set[KT]] = {} # -> Set[] - def __iter__(self): + def __iter__(self) -> Iterator[KT]: return iter(self._vertices) - def __len__(self): + def __len__(self) -> int: return len(self._vertices) - def __contains__(self, key): + def __contains__(self, key: KT) -> bool: return key in self._vertices - def copy(self): + def copy(self) -> DirectedGraph[KT]: """Return a shallow copy of this graph.""" - other = DirectedGraph() + other = type(self)() other._vertices = set(self._vertices) other._forwards = {k: set(v) for k, v in self._forwards.items()} other._backwards = {k: set(v) for k, v in self._backwards.items()} return other - def add(self, key): + def add(self, key: KT) -> None: """Add a new vertex to the graph.""" if key in self._vertices: raise ValueError("vertex exists") @@ -36,7 +77,7 @@ def add(self, key): self._forwards[key] = set() self._backwards[key] = set() - def remove(self, key): + def remove(self, key: KT) -> None: """Remove a vertex from the graph, disconnecting all edges from/to it.""" self._vertices.remove(key) for f in self._forwards.pop(key): @@ -44,10 +85,10 @@ def remove(self, key): for t in self._backwards.pop(key): self._forwards[t].remove(key) - def connected(self, f, t): + def connected(self, f: KT, t: KT) -> bool: return f in self._backwards[t] and t in self._forwards[f] - def connect(self, f, t): + def connect(self, f: KT, t: KT) -> None: """Connect two existing vertices. Nothing happens if the vertices are already connected. @@ -57,56 +98,59 @@ def connect(self, f, t): self._forwards[f].add(t) self._backwards[t].add(f) - def iter_edges(self): + def iter_edges(self) -> Iterator[tuple[KT, KT]]: for f, children in self._forwards.items(): for t in children: yield f, t - def iter_children(self, key): + def iter_children(self, key: KT) -> Iterator[KT]: return iter(self._forwards[key]) - def iter_parents(self, key): + def iter_parents(self, key: KT) -> Iterator[KT]: return iter(self._backwards[key]) -class IteratorMapping(collections_abc.Mapping): - def __init__(self, mapping, accessor, appends=None): +class IteratorMapping(Mapping[KT, Iterator[CT]], Generic[RT, CT, KT]): + def __init__( + self, + mapping: Mapping[KT, RT], + accessor: Callable[[RT], Iterable[CT]], + appends: Mapping[KT, Iterable[CT]] | None = None, + ) -> None: self._mapping = mapping self._accessor = accessor - self._appends = appends or {} + self._appends: Mapping[KT, Iterable[CT]] = appends or {} - def __repr__(self): + def __repr__(self) -> str: return "IteratorMapping({!r}, {!r}, {!r})".format( self._mapping, self._accessor, self._appends, ) - def __bool__(self): + def __bool__(self) -> bool: return bool(self._mapping or self._appends) - __nonzero__ = __bool__ # XXX: Python 2. - - def __contains__(self, key): + def __contains__(self, key: object) -> bool: return key in self._mapping or key in self._appends - def __getitem__(self, k): + def __getitem__(self, k: KT) -> Iterator[CT]: try: v = self._mapping[k] except KeyError: return iter(self._appends[k]) return itertools.chain(self._accessor(v), self._appends.get(k, ())) - def __iter__(self): + def __iter__(self) -> Iterator[KT]: more = (k for k in self._appends if k not in self._mapping) return itertools.chain(self._mapping, more) - def __len__(self): + def __len__(self) -> int: more = sum(1 for k in self._appends if k not in self._mapping) return len(self._mapping) + more -class _FactoryIterableView(object): +class _FactoryIterableView(Generic[RT]): """Wrap an iterator factory returned by `find_matches()`. Calling `iter()` on this class would invoke the underlying iterator @@ -115,23 +159,21 @@ class _FactoryIterableView(object): built-in Python sequence types. """ - def __init__(self, factory): + def __init__(self, factory: Callable[[], Iterable[RT]]) -> None: self._factory = factory - self._iterable = None + self._iterable: Iterable[RT] | None = None - def __repr__(self): - return "{}({})".format(type(self).__name__, list(self)) + def __repr__(self) -> str: + return f"{type(self).__name__}({list(self)})" - def __bool__(self): + def __bool__(self) -> bool: try: next(iter(self)) except StopIteration: return False return True - __nonzero__ = __bool__ # XXX: Python 2. - - def __iter__(self): + def __iter__(self) -> Iterator[RT]: iterable = ( self._factory() if self._iterable is None else self._iterable ) @@ -139,32 +181,79 @@ def __iter__(self): return current -class _SequenceIterableView(object): +class _SequenceIterableView(Generic[RT]): """Wrap an iterable returned by find_matches(). This is essentially just a proxy to the underlying sequence that provides the same interface as `_FactoryIterableView`. """ - def __init__(self, sequence): + def __init__(self, sequence: Sequence[RT]): self._sequence = sequence - def __repr__(self): - return "{}({})".format(type(self).__name__, self._sequence) + def __repr__(self) -> str: + return f"{type(self).__name__}({self._sequence})" - def __bool__(self): + def __bool__(self) -> bool: return bool(self._sequence) - __nonzero__ = __bool__ # XXX: Python 2. - - def __iter__(self): + def __iter__(self) -> Iterator[RT]: return iter(self._sequence) -def build_iter_view(matches): +class IterableView(Container[CT], Iterator[CT], metaclass=ABCMeta): + pass + + +def build_iter_view( + matches: Iterable[CT] | Callable[[], Iterable[CT]] +) -> IterableView[CT]: """Build an iterable view from the value returned by `find_matches()`.""" if callable(matches): - return _FactoryIterableView(matches) - if not isinstance(matches, collections_abc.Sequence): + return _FactoryIterableView(matches) # type: ignore[return-value] + if not isinstance(matches, Sequence): matches = list(matches) - return _SequenceIterableView(matches) + return _SequenceIterableView(matches) # type: ignore[return-value] + + +class Criterion(Generic[RT, CT]): + """Representation of possible resolution results of a package. + + This holds three attributes: + + * `information` is a collection of `RequirementInformation` pairs. + Each pair is a requirement contributing to this criterion, and the + candidate that provides the requirement. + * `incompatibilities` is a collection of all known not-to-work candidates + to exclude from consideration. + * `candidates` is a collection containing all possible candidates deducted + from the union of contributing requirements and known incompatibilities. + It should never be empty, except when the criterion is an attribute of a + raised `RequirementsConflicted` (in which case it is always empty). + + .. note:: + This class is intended to be externally immutable. **Do not** mutate + any of its attribute containers. + """ + + def __init__( + self, + candidates: IterableView[CT], + information: Collection[RequirementInformation[RT, CT]], + incompatibilities: Collection[CT], + ) -> None: + self.candidates = candidates + self.information = information + self.incompatibilities = incompatibilities + + def __repr__(self) -> str: + requirements = ", ".join( + f"({req!r}, via={parent!r})" for req, parent in self.information + ) + return f"Criterion({requirements})" + + def iter_requirement(self) -> Iterator[RT]: + return (i.requirement for i in self.information) + + def iter_parent(self) -> Iterator[CT | None]: + return (i.parent for i in self.information) diff --git a/src/resolvelib/structs.pyi b/src/resolvelib/structs.pyi deleted file mode 100644 index 0ac59f0..0000000 --- a/src/resolvelib/structs.pyi +++ /dev/null @@ -1,40 +0,0 @@ -from abc import ABCMeta -from typing import ( - Callable, - Container, - Generic, - Iterable, - Iterator, - Mapping, - Tuple, - TypeVar, - Union, -) - -KT = TypeVar("KT") # Identifier. -RT = TypeVar("RT") # Requirement. -CT = TypeVar("CT") # Candidate. -_T = TypeVar("_T") - -Matches = Union[Iterable[CT], Callable[[], Iterable[CT]]] - -class IteratorMapping(Mapping[KT, _T], metaclass=ABCMeta): - pass - -class IterableView(Container[CT], Iterable[CT], metaclass=ABCMeta): - pass - -class DirectedGraph(Generic[KT]): - def __iter__(self) -> Iterator[KT]: ... - def __len__(self) -> int: ... - def __contains__(self, key: KT) -> bool: ... - def copy(self) -> "DirectedGraph[KT]": ... - def add(self, key: KT) -> None: ... - def remove(self, key: KT) -> None: ... - def connected(self, f: KT, t: KT) -> bool: ... - def connect(self, f: KT, t: KT) -> None: ... - def iter_edges(self) -> Iterable[Tuple[KT, KT]]: ... - def iter_children(self, key: KT) -> Iterable[KT]: ... - def iter_parents(self, key: KT) -> Iterable[KT]: ... - -def build_iter_view(matches: Matches) -> IterableView[CT]: ... diff --git a/tests/test_resolvers.py b/tests/test_resolvers.py index fe95e4b..46d0f5d 100644 --- a/tests/test_resolvers.py +++ b/tests/test_resolvers.py @@ -1,17 +1,9 @@ -from typing import TYPE_CHECKING +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, Iterator, Sequence, Tuple if TYPE_CHECKING: - from typing import ( - Any, - Iterable, - Iterator, - List, - Mapping, - Sequence, - Set, - Tuple, - Union, - ) + from typing import Iterable, Mapping import pytest from packaging.requirements import Requirement @@ -196,11 +188,9 @@ def test_pin_conflict_with_self(monkeypatch, reporter): Verify correct behavior of attempting to pin a candidate version that conflicts with a previously pinned (now invalidated) version for that same candidate (#91). """ - if TYPE_CHECKING: - Candidate = Tuple[ # noqa: F841 - str, Version, Sequence[str] - ] # name, version, requirements - all_candidates = { + Candidate = Tuple[str, Version, Sequence[str]] + + all_candidates: Mapping[str, Sequence[Candidate]] = { "parent": [("parent", Version("1"), ["child<2"])], "child": [ ("child", Version("2"), ["grandchild>=2"]), @@ -211,11 +201,10 @@ def test_pin_conflict_with_self(monkeypatch, reporter): ("grandchild", Version("2"), []), ("grandchild", Version("1"), []), ], - } # type: Mapping[str, Sequence[Candidate]] + } - class Provider(AbstractProvider): # AbstractProvider[str, Candidate, str] - def identify(self, requirement_or_candidate): - # type: (Union[str, Candidate]) -> str + class Provider(AbstractProvider[str, Candidate, str]): + def identify(self, requirement_or_candidate: str | Candidate) -> str: result = ( Requirement(requirement_or_candidate).name if isinstance(requirement_or_candidate, str) @@ -224,22 +213,21 @@ def identify(self, requirement_or_candidate): assert result in all_candidates, "unknown requirement_or_candidate" return result - def get_preference(self, identifier, *args, **kwargs): - # type: (str, *object, **object) -> str + def get_preference( + self, identifier: str, *args: Any, **kwargs: Any + ) -> str: # prefer child over parent (alphabetically) return identifier - def get_dependencies(self, candidate): - # type: (Candidate) -> Sequence[str] + def get_dependencies(self, candidate: Candidate) -> Sequence[str]: return candidate[2] def find_matches( self, - identifier, # type: str - requirements, # type: Mapping[str, Iterator[str]] - incompatibilities, # type: Mapping[str, Iterator[Candidate]] - ): - # type: (...) -> Iterator[Candidate] + identifier: str, + requirements: Mapping[str, Iterator[str]], + incompatibilities: Mapping[str, Iterator[Candidate]], + ) -> Iterator[Candidate]: return ( candidate for candidate in all_candidates[identifier] @@ -250,12 +238,13 @@ def find_matches( if candidate not in incompatibilities[identifier] ) - def is_satisfied_by(self, requirement, candidate): - # type: (str, Candidate) -> bool + def is_satisfied_by( + self, requirement: str, candidate: Candidate + ) -> bool: return candidate[1] in Requirement(requirement).specifier # patch Resolution._get_updated_criteria to collect rejected states - rejected_criteria = [] # type: List[Criterion] + rejected_criteria: list[Criterion] = [] get_updated_criteria_orig = ( Resolution._get_updated_criteria # type: ignore[attr-defined] ) @@ -271,13 +260,12 @@ def get_updated_criteria_patch(self, candidate): Resolution, "_get_updated_criteria", get_updated_criteria_patch ) - resolver = Resolver( - Provider(), reporter - ) # type: Resolver[str, Candidate, str] + resolver: Resolver[str, Candidate, str] = Resolver(Provider(), reporter) result = resolver.resolve(["child", "parent"]) - def get_child_versions(information): - # type: (Iterable[RequirementInformation[str, Candidate]]) -> Set[str] + def get_child_versions( + information: Iterable[RequirementInformation[str, Candidate]] + ) -> set[str]: return { str(inf.parent[1]) for inf in information