diff --git a/src/poetry/config/config.py b/src/poetry/config/config.py index 38291a48752..28a0a6818e3 100644 --- a/src/poetry/config/config.py +++ b/src/poetry/config/config.py @@ -112,6 +112,20 @@ def _all(config: dict, parent_key: str = "") -> dict: def raw(self) -> dict[str, Any]: return self._config + @staticmethod + def _get_environment_repositories() -> dict[str, dict[str, str]]: + repositories = {} + pattern = re.compile(r"POETRY_REPOSITORIES_(?P[A-Z_]+)_URL") + + for env_key in os.environ.keys(): + match = pattern.match(env_key) + if match: + repositories[match.group("name").lower().replace("_", "-")] = { + "url": os.environ[env_key] + } + + return repositories + def get(self, setting_name: str, default: Any = None) -> Any: """ Retrieve a setting value. @@ -121,6 +135,12 @@ def get(self, setting_name: str, default: Any = None) -> Any: # Looking in the environment if the setting # is set via a POETRY_* environment variable if self._use_environment: + if setting_name == "repositories": + # repositories setting is special for now + repositories = self._get_environment_repositories() + if repositories: + return repositories + env = "POETRY_" + "_".join(k.upper().replace("-", "_") for k in keys) env_value = os.getenv(env) if env_value is not None: diff --git a/src/poetry/factory.py b/src/poetry/factory.py index ef0ae390efb..df32ac76d77 100644 --- a/src/poetry/factory.py +++ b/src/poetry/factory.py @@ -184,8 +184,6 @@ def create_legacy_repository( cls, source: dict[str, str], auth_config: Config, disable_cache: bool = False ) -> LegacyRepository: from poetry.repositories.legacy_repository import LegacyRepository - from poetry.utils.helpers import get_cert - from poetry.utils.helpers import get_client_cert if "url" not in source: raise RuntimeError("Unsupported source specified") @@ -200,8 +198,6 @@ def create_legacy_repository( name, url, config=auth_config, - cert=get_cert(auth_config, name), - client_cert=get_client_cert(auth_config, name), disable_cache=disable_cache, ) diff --git a/src/poetry/installation/pip_installer.py b/src/poetry/installation/pip_installer.py index 5e6d01d9f70..da01d507a16 100644 --- a/src/poetry/installation/pip_installer.py +++ b/src/poetry/installation/pip_installer.py @@ -12,6 +12,7 @@ from poetry.core.pyproject.toml import PyProjectTOML from poetry.installation.base_installer import BaseInstaller +from poetry.repositories.http import HTTPRepository from poetry.utils._compat import encode from poetry.utils.helpers import remove_directory from poetry.utils.pip import pip_install @@ -57,23 +58,28 @@ def install(self, package: Package, update: bool = False) -> None: ) args += ["--trusted-host", parsed.hostname] - if repository.cert: - args += ["--cert", str(repository.cert)] + if isinstance(repository, HTTPRepository): + if repository.cert: + args += ["--cert", str(repository.cert)] - if repository.client_cert: - args += ["--client-cert", str(repository.client_cert)] + if repository.client_cert: + args += ["--client-cert", str(repository.client_cert)] - index_url = repository.authenticated_url + index_url = repository.authenticated_url + + args += ["--index-url", index_url] - args += ["--index-url", index_url] if ( self._pool.has_default() and repository.name != self._pool.repositories[0].name ): - args += [ - "--extra-index-url", - self._pool.repositories[0].authenticated_url, - ] + first_repository = self._pool.repositories[0] + + if isinstance(first_repository, HTTPRepository): + args += [ + "--extra-index-url", + first_repository.authenticated_url, + ] if update: args.append("-U") diff --git a/src/poetry/publishing/publisher.py b/src/poetry/publishing/publisher.py index 4fd6ad00584..95f79afdab1 100644 --- a/src/poetry/publishing/publisher.py +++ b/src/poetry/publishing/publisher.py @@ -69,14 +69,14 @@ def publish( logger.debug( f"Found authentication information for {repository_name}." ) - username = auth["username"] - password = auth["password"] + username = auth.username + password = auth.password resolved_client_cert = client_cert or get_client_cert( self._poetry.config, repository_name ) # Requesting missing credentials but only if there is not a client cert defined. - if not resolved_client_cert: + if not resolved_client_cert and hasattr(self._io, "ask"): if username is None: username = self._io.ask("Username:") diff --git a/src/poetry/repositories/cached.py b/src/poetry/repositories/cached.py index 12d57fbacf4..bee34caf4fc 100644 --- a/src/poetry/repositories/cached.py +++ b/src/poetry/repositories/cached.py @@ -4,7 +4,6 @@ from abc import abstractmethod from typing import TYPE_CHECKING -from cachecontrol.caches import FileCache from cachy import CacheManager from poetry.core.semver.helpers import parse_constraint @@ -21,7 +20,7 @@ class CachedRepository(Repository, ABC): CACHE_VERSION = parse_constraint("1.0.0") - def __init__(self, name: str, cache_group: str, disable_cache: bool = False): + def __init__(self, name: str, disable_cache: bool = False): super().__init__(name) self._disable_cache = disable_cache self._cache_dir = REPOSITORY_CACHE_DIR / name @@ -36,7 +35,6 @@ def __init__(self, name: str, cache_group: str, disable_cache: bool = False): }, } ) - self._cache_control_cache = FileCache(str(self._cache_dir / cache_group)) @abstractmethod def _get_release_info(self, name: str, version: str) -> dict: diff --git a/src/poetry/repositories/http.py b/src/poetry/repositories/http.py index 9eee6c667e9..02d8210c46c 100644 --- a/src/poetry/repositories/http.py +++ b/src/poetry/repositories/http.py @@ -1,21 +1,18 @@ from __future__ import annotations -import contextlib import hashlib import os import urllib +import urllib.parse from abc import ABC from collections import defaultdict from pathlib import Path from typing import TYPE_CHECKING from typing import Any -from urllib.parse import quote import requests -import requests.auth -from cachecontrol import CacheControl from poetry.core.packages.dependency import Dependency from poetry.core.packages.utils.link import Link from poetry.core.version.markers import parse_marker @@ -42,41 +39,18 @@ def __init__( url: str, config: Config | None = None, disable_cache: bool = False, - cert: Path | None = None, - client_cert: Path | None = None, ) -> None: - super().__init__(name, "_http", disable_cache) + super().__init__(name, disable_cache) self._url = url - self._client_cert = client_cert - self._cert = cert - self._authenticator = Authenticator( - config=config or Config(use_environment=True) - ) - - self._session = CacheControl( - self._authenticator.session, cache=self._cache_control_cache + config=config or Config(use_environment=True), + cache_id=name, + disable_cache=disable_cache, ) - username, password = self._authenticator.get_credentials_for_url(self._url) - if username is not None and password is not None: - self._authenticator.session.auth = requests.auth.HTTPBasicAuth( - username, password - ) - - if self._cert: - self._authenticator.session.verify = str(self._cert) - - if self._client_cert: - self._authenticator.session.cert = str(self._client_cert) - @property - def session(self) -> CacheControl: - return self._session - - def __del__(self) -> None: - with contextlib.suppress(AttributeError): - self._session.close() + def session(self) -> Authenticator: + return self._authenticator @property def url(self) -> str: @@ -84,22 +58,21 @@ def url(self) -> str: @property def cert(self) -> Path | None: - return self._cert + cert = self._authenticator.get_certs_for_url(self.url).get("verify") + if cert: + return Path(cert) + return None @property def client_cert(self) -> Path | None: - return self._client_cert + cert = self._authenticator.get_certs_for_url(self.url).get("cert") + if cert: + return Path(cert) + return None @property def authenticated_url(self) -> str: - if not self._session.auth: - return self.url - - parsed = urllib.parse.urlparse(self.url) - username = quote(self._session.auth.username, safe="") - password = quote(self._session.auth.password, safe="") - - return f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}" + return self._authenticator.authenticated_url(url=self.url) def _download(self, url: str, dest: str) -> None: return download_file(url, dest, session=self.session) @@ -286,7 +259,7 @@ def _links_to_data(self, links: list[Link], data: PackageInfo) -> dict: def _get_response(self, endpoint: str) -> requests.Response | None: url = self._url + endpoint try: - response = self.session.get(url) + response = self.session.get(url, raise_for_status=False) if response.status_code in (401, 403): self._log( f"Authorization error accessing {url}", diff --git a/src/poetry/repositories/legacy_repository.py b/src/poetry/repositories/legacy_repository.py index a065ab5bafc..e9c596bdcc8 100644 --- a/src/poetry/repositories/legacy_repository.py +++ b/src/poetry/repositories/legacy_repository.py @@ -13,8 +13,6 @@ if TYPE_CHECKING: - from pathlib import Path - from poetry.core.packages.dependency import Dependency from poetry.core.packages.utils.link import Link @@ -28,15 +26,11 @@ def __init__( url: str, config: Config | None = None, disable_cache: bool = False, - cert: Path | None = None, - client_cert: Path | None = None, ) -> None: if name == "pypi": raise ValueError("The name [pypi] is reserved for repositories") - super().__init__( - name, url.rstrip("/"), config, disable_cache, cert, client_cert - ) + super().__init__(name, url.rstrip("/"), config, disable_cache) def find_packages(self, dependency: Dependency) -> list[Package]: packages = [] diff --git a/src/poetry/repositories/pypi_repository.py b/src/poetry/repositories/pypi_repository.py index cb35e69c69b..db0ab62713f 100644 --- a/src/poetry/repositories/pypi_repository.py +++ b/src/poetry/repositories/pypi_repository.py @@ -232,7 +232,7 @@ def _get(self, endpoint: str) -> dict | None: except requests.exceptions.TooManyRedirects: # Cache control redirect loop. # We try to remove the cache and try again - self._cache_control_cache.delete(self._base_url + endpoint) + self.session.delete_cache(self._base_url + endpoint) json_response = self.session.get(self._base_url + endpoint) if json_response.status_code == 404: diff --git a/src/poetry/utils/authenticator.py b/src/poetry/utils/authenticator.py index eb25805e855..01af47887eb 100644 --- a/src/poetry/utils/authenticator.py +++ b/src/poetry/utils/authenticator.py @@ -1,20 +1,28 @@ from __future__ import annotations +import contextlib +import dataclasses +import functools import logging import time import urllib.parse +from os.path import commonprefix from typing import TYPE_CHECKING from typing import Any -from typing import Iterator import requests import requests.auth import requests.exceptions +from cachecontrol import CacheControl +from cachecontrol.caches import FileCache + from poetry.exceptions import PoetryException +from poetry.locations import REPOSITORY_CACHE_DIR from poetry.utils.helpers import get_cert from poetry.utils.helpers import get_client_cert +from poetry.utils.password_manager import HTTPAuthCredential from poetry.utils.password_manager import PasswordManager @@ -26,43 +34,146 @@ from poetry.config.config import Config -logger = logging.getLogger() +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class AuthenticatorRepositoryConfig: + name: str + url: str + netloc: str = dataclasses.field(init=False) + path: str = dataclasses.field(init=False) + + def __post_init__(self) -> None: + parsed_url = urllib.parse.urlsplit(self.url) + self.netloc = parsed_url.netloc + self.path = parsed_url.path + + def certs(self, config: Config) -> dict[str, Path | None]: + return { + "cert": get_client_cert(config, self.name), + "verify": get_cert(config, self.name), + } + + @property + def http_credential_keys(self) -> list[str]: + return [self.url, self.netloc, self.name] + + def get_http_credentials( + self, password_manager: PasswordManager, username: str | None = None + ) -> HTTPAuthCredential: + # try with the repository name via the password manager + credential = HTTPAuthCredential( + **(password_manager.get_http_auth(self.name) or {}) + ) + + if credential.password is None: + # fallback to url and netloc based keyring entries + credential = password_manager.keyring.get_credential( + self.url, self.netloc, username=credential.username + ) + + if credential.password is not None: + return HTTPAuthCredential( + username=credential.username, password=credential.password + ) + + return credential class Authenticator: - def __init__(self, config: Config, io: IO | None = None) -> None: + def __init__( + self, + config: Config, + io: IO | None = None, + cache_id: str | None = None, + disable_cache: bool = False, + ) -> None: self._config = config self._io = io - self._session: requests.Session | None = None - self._credentials: dict[str, tuple[str, str]] = {} + self._sessions_for_netloc: dict[str, requests.Session] = {} + self._credentials: dict[str, HTTPAuthCredential] = {} self._certs: dict[str, dict[str, Path | None]] = {} + self._configured_repositories: dict[ + str, AuthenticatorRepositoryConfig + ] | None = None self._password_manager = PasswordManager(self._config) + self._cache_control = ( + FileCache( + str(REPOSITORY_CACHE_DIR / (cache_id or "_default_cache") / "_http") + ) + if not disable_cache + else None + ) - def _log(self, message: str, level: str = "debug") -> None: - if self._io is not None: - self._io.write_line(f"<{level}>{message}") - else: - getattr(logger, level, logger.debug)(message) + @property + def cache(self) -> FileCache | None: + return self._cache_control @property - def session(self) -> requests.Session: - if self._session is None: - self._session = requests.Session() + def is_cached(self) -> bool: + return self._cache_control is not None - return self._session + def create_session(self) -> requests.Session: + session = requests.Session() + + if not self.is_cached: + return session + + return CacheControl(sess=session, cache=self._cache_control) + + def get_session(self, url: str | None = None) -> requests.Session: + if not url: + return self.create_session() + + parsed_url = urllib.parse.urlsplit(url) + netloc = parsed_url.netloc + + if netloc not in self._sessions_for_netloc: + logger.debug("Creating new session for %s", netloc) + self._sessions_for_netloc[netloc] = self.create_session() + + return self._sessions_for_netloc[netloc] + + def close(self) -> None: + for session in self._sessions_for_netloc.values(): + if session is not None: + with contextlib.suppress(AttributeError): + session.close() def __del__(self) -> None: - if self._session is not None: - self._session.close() + self.close() + + def delete_cache(self, url: str) -> None: + if self.is_cached: + self._cache_control.delete(key=url) + + def authenticated_url(self, url: str) -> str: + parsed = urllib.parse.urlparse(url) + credential = self.get_credentials_for_url(url) + + if credential.username is not None and credential.password is not None: + username = urllib.parse.quote(credential.username, safe="") + password = urllib.parse.quote(credential.password, safe="") - def request(self, method: str, url: str, **kwargs: Any) -> requests.Response: + return ( + f"{parsed.scheme}://{username}:{password}@{parsed.netloc}{parsed.path}" + ) + + return url + + def request( + self, method: str, url: str, raise_for_status: bool = True, **kwargs: Any + ) -> requests.Response: request = requests.Request(method, url) - username, password = self.get_credentials_for_url(url) + credential = self.get_credentials_for_url(url) - if username is not None and password is not None: - request = requests.auth.HTTPBasicAuth(username, password)(request) + if credential.username is not None or credential.password is not None: + request = requests.auth.HTTPBasicAuth( + credential.username or "", credential.password or "" + )(request) - session = self.session + session = self.get_session(url=url) prepared_request = session.prepare_request(request) proxies = kwargs.get("proxies", {}) @@ -100,31 +211,71 @@ def request(self, method: str, url: str, **kwargs: Any) -> requests.Response: raise e else: if resp.status_code not in [502, 503, 504] or is_last_attempt: - resp.raise_for_status() + if resp.status_code is not None and raise_for_status: + resp.raise_for_status() return resp if not is_last_attempt: attempt += 1 delay = 0.5 * attempt - self._log(f"Retrying HTTP request in {delay} seconds.", level="debug") + logger.debug(f"Retrying HTTP request in {delay} seconds.") time.sleep(delay) continue # this should never really be hit under any sane circumstance raise PoetryException("Failed HTTP {} request", method.upper()) - def get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]: - parsed_url = urllib.parse.urlsplit(url) + def get(self, url: str, **kwargs: Any) -> requests.Response: + return self.request("get", url, **kwargs) - netloc = parsed_url.netloc + def post(self, url: str, **kwargs: Any) -> requests.Response: + return self.request("post", url, **kwargs) - credentials: tuple[str | None, str | None] = self._credentials.get( - netloc, (None, None) + def _get_credentials_for_repository( + self, repository: AuthenticatorRepositoryConfig, username: str | None = None + ) -> HTTPAuthCredential: + # cache repository credentials by repository url to avoid multiple keyring + # backend queries when packages are being downloaded from the same source + key = f"{repository.url}#username={username or ''}" + + if key not in self._credentials: + self._credentials[key] = repository.get_http_credentials( + password_manager=self._password_manager, username=username + ) + + return self._credentials[key] + + def _get_credentials_for_url(self, url: str) -> HTTPAuthCredential: + repository = self.get_repository_config_for_url(url) + + credential = ( + self._get_credentials_for_repository(repository=repository) + if repository is not None + else HTTPAuthCredential() ) - if credentials == (None, None): + if credential.password is None: + parsed_url = urllib.parse.urlsplit(url) + netloc = parsed_url.netloc + credential = self._password_manager.keyring.get_credential( + url, netloc, username=credential.username + ) + + return HTTPAuthCredential( + username=credential.username, password=credential.password + ) + + return credential + + def get_credentials_for_url(self, url: str) -> HTTPAuthCredential: + parsed_url = urllib.parse.urlsplit(url) + netloc = parsed_url.netloc + + if url not in self._credentials: if "@" not in netloc: - credentials = self._get_credentials_for_netloc(netloc) + # no credentials were provided in the url, try finding the + # best repository configuration + self._credentials[url] = self._get_credentials_for_url(url) else: # Split from the right because that's how urllib.parse.urlsplit() # behaves if more than one @ is present (which can be checked using @@ -134,110 +285,89 @@ def get_credentials_for_url(self, url: str) -> tuple[str | None, str | None]: # behaves if more than one : is present (which again can be checked # using the password attribute of the return value) user, password = auth.split(":", 1) if ":" in auth else (auth, "") - credentials = ( + self._credentials[url] = HTTPAuthCredential( urllib.parse.unquote(user), urllib.parse.unquote(password), ) - if any(credential is not None for credential in credentials): - credentials = (credentials[0] or "", credentials[1] or "") - self._credentials[netloc] = credentials - - return credentials + return self._credentials[url] def get_pypi_token(self, name: str) -> str | None: return self._password_manager.get_pypi_token(name) - def get_http_auth(self, name: str) -> dict[str, str | None] | None: - return self._get_http_auth(name, None) - - def _get_http_auth( - self, name: str, netloc: str | None - ) -> dict[str, str | None] | None: + def get_http_auth( + self, name: str, username: str | None = None + ) -> HTTPAuthCredential | None: if name == "pypi": - url = "https://upload.pypi.org/legacy/" + repository = AuthenticatorRepositoryConfig( + name, "https://upload.pypi.org/legacy/" + ) else: - url = self._config.get(f"repositories.{name}.url") - if not url: + if name not in self.configured_repositories: return None + repository = self.configured_repositories[name] - parsed_url = urllib.parse.urlsplit(url) - - if netloc is None or netloc == parsed_url.netloc: - auth = self._password_manager.get_http_auth(name) - auth = auth or {} - - if auth.get("password") is None: - username = auth.get("username") - auth = self._get_credentials_for_netloc_from_keyring( - url, parsed_url.netloc, username - ) - - return auth - - return None - - def _get_credentials_for_netloc(self, netloc: str) -> tuple[str | None, str | None]: - for repository_name, _ in self._get_repository_netlocs(): - auth = self._get_http_auth(repository_name, netloc) - - if auth is None: - continue + return self._get_credentials_for_repository( + repository=repository, username=username + ) - return auth.get("username"), auth.get("password") + @property + def configured_repositories(self) -> dict[str, AuthenticatorRepositoryConfig]: + if self._configured_repositories is None: + self._configured_repositories = {} + for repository_name in self._config.get("repositories", []): + url = self._config.get(f"repositories.{repository_name}.url") + self._configured_repositories[ + repository_name + ] = AuthenticatorRepositoryConfig(repository_name, url) - return None, None + return self._configured_repositories def get_certs_for_url(self, url: str) -> dict[str, Path | None]: + if url not in self._certs: + self._certs[url] = self._get_certs_for_url(url) + return self._certs[url] + + @functools.lru_cache(maxsize=None) + def get_repository_config_for_url( + self, url: str + ) -> AuthenticatorRepositoryConfig | None: parsed_url = urllib.parse.urlsplit(url) - - netloc = parsed_url.netloc - - return self._certs.setdefault( - netloc, - self._get_certs_for_netloc_from_config(netloc), - ) - - def _get_repository_netlocs(self) -> Iterator[tuple[str, str]]: - for repository_name in self._config.get("repositories", []): - url = self._config.get(f"repositories.{repository_name}.url") - parsed_url = urllib.parse.urlsplit(url) - yield repository_name, parsed_url.netloc - - def _get_credentials_for_netloc_from_keyring( - self, url: str, netloc: str, username: str | None - ) -> dict[str, str | None] | None: - import keyring - - cred = keyring.get_credential(url, username) - if cred is not None: - return { - "username": cred.username, - "password": cred.password, - } - - cred = keyring.get_credential(netloc, username) - if cred is not None: - return { - "username": cred.username, - "password": cred.password, - } - - if username: - return { - "username": username, - "password": None, - } - - return None - - def _get_certs_for_netloc_from_config(self, netloc: str) -> dict[str, Path | None]: - certs: dict[str, Path | None] = {"cert": None, "verify": None} - - for repository_name, repository_netloc in self._get_repository_netlocs(): - if netloc == repository_netloc: - certs["cert"] = get_client_cert(self._config, repository_name) - certs["verify"] = get_cert(self._config, repository_name) - break - - return certs + candidates_netloc_only = [] + candidates_path_match = [] + + for repository in self.configured_repositories.values(): + + if repository.netloc == parsed_url.netloc: + if parsed_url.path.startswith(repository.path) or commonprefix( + (parsed_url.path, repository.path) + ): + candidates_path_match.append(repository) + continue + candidates_netloc_only.append(repository) + + if candidates_path_match: + candidates = candidates_path_match + elif candidates_netloc_only: + candidates = candidates_netloc_only + else: + return None + + if len(candidates) > 1: + logger.debug( + "Multiple source configurations found for %s - %s", + parsed_url.netloc, + ", ".join(map(lambda c: c.name, candidates)), + ) + # prefer the more specific path + candidates.sort( + key=lambda c: len(commonprefix([parsed_url.path, c.path])), reverse=True + ) + + return candidates[0] + + def _get_certs_for_url(self, url: str) -> dict[str, Path | None]: + selected = self.get_repository_config_for_url(url) + if selected: + return selected.certs(config=self._config) + return {"cert": None, "verify": None} diff --git a/src/poetry/utils/helpers.py b/src/poetry/utils/helpers.py index 326b5d44516..cd55b3faf17 100644 --- a/src/poetry/utils/helpers.py +++ b/src/poetry/utils/helpers.py @@ -20,6 +20,7 @@ from requests import Session from poetry.config.config import Config + from poetry.utils.authenticator import Authenticator _canonicalize_regex = re.compile("[-_]+") @@ -94,7 +95,7 @@ def merge_dicts(d1: dict, d2: dict) -> None: def download_file( url: str, dest: str, - session: Session | None = None, + session: Authenticator | Session | None = None, chunk_size: int = 1024, ) -> None: import requests diff --git a/src/poetry/utils/password_manager.py b/src/poetry/utils/password_manager.py index b525bdbae45..15b8d764b73 100644 --- a/src/poetry/utils/password_manager.py +++ b/src/poetry/utils/password_manager.py @@ -1,5 +1,6 @@ from __future__ import annotations +import dataclasses import logging from contextlib import suppress @@ -22,6 +23,12 @@ class KeyRingError(Exception): pass +@dataclasses.dataclass +class HTTPAuthCredential: + username: str | None = dataclasses.field(default=None) + password: str | None = dataclasses.field(default=None) + + class KeyRing: def __init__(self, namespace: str) -> None: self._namespace = namespace @@ -32,6 +39,25 @@ def __init__(self, namespace: str) -> None: def is_available(self) -> bool: return self._is_available + def get_credential( + self, *names: str, username: str | None = None + ) -> HTTPAuthCredential: + default = HTTPAuthCredential(username=username, password=None) + + if not self.is_available(): + return default + + import keyring + + for name in names: + credential = keyring.get_credential(name, username) + if credential: + return HTTPAuthCredential( + username=credential.username, password=credential.password + ) + + return default + def get_password(self, name: str, username: str) -> str | None: if not self.is_available(): return None diff --git a/tests/installation/test_pip_installer.py b/tests/installation/test_pip_installer.py index 719df21cd5b..797e7874eba 100644 --- a/tests/installation/test_pip_installer.py +++ b/tests/installation/test_pip_installer.py @@ -117,44 +117,22 @@ def test_install_with_non_pypi_default_repository(pool: Pool, installer: PipInst installer.install(bar) -def test_install_with_cert(): - ca_path = "path/to/cert.pem" - pool = Pool() - - default = LegacyRepository("default", "https://foo.bar", cert=Path(ca_path)) - - pool.add_repository(default, default=True) - - null_env = NullEnv() - - installer = PipInstaller(null_env, NullIO(), pool) - - foo = Package( - "foo", - "0.0.0", - source_type="legacy", - source_reference=default.name, - source_url=default.url, - ) - - installer.install(foo) - - assert len(null_env.executed) == 1 - cmd = null_env.executed[0] - assert "--cert" in cmd - cert_index = cmd.index("--cert") - # Need to do the str(Path()) bit because Windows paths get modified by Path - assert cmd[cert_index + 1] == str(Path(ca_path)) - - -def test_install_with_client_cert(): +@pytest.mark.parametrize( + ("key", "option"), + [ + ("cert", "client-cert"), + ("verify", "cert"), + ], +) +def test_install_with_certs(mocker: MockerFixture, key: str, option: str): client_path = "path/to/client.pem" - pool = Pool() - - default = LegacyRepository( - "default", "https://foo.bar", client_cert=Path(client_path) + mocker.patch( + "poetry.utils.authenticator.Authenticator.get_certs_for_url", + return_value={key: client_path}, ) + default = LegacyRepository("default", "https://foo.bar") + pool = Pool() pool.add_repository(default, default=True) null_env = NullEnv() @@ -173,8 +151,8 @@ def test_install_with_client_cert(): assert len(null_env.executed) == 1 cmd = null_env.executed[0] - assert "--client-cert" in cmd - cert_index = cmd.index("--client-cert") + assert f"--{option}" in cmd + cert_index = cmd.index(f"--{option}") # Need to do the str(Path()) bit because Windows paths get modified by Path assert cmd[cert_index + 1] == str(Path(client_path)) diff --git a/tests/repositories/test_legacy_repository.py b/tests/repositories/test_legacy_repository.py index 85b89646c6e..0c5f0111432 100644 --- a/tests/repositories/test_legacy_repository.py +++ b/tests/repositories/test_legacy_repository.py @@ -28,6 +28,11 @@ from _pytest.monkeypatch import MonkeyPatch +@pytest.fixture(autouse=True) +def _use_simple_keyring(with_simple_keyring: None) -> None: + pass + + class MockRepository(LegacyRepository): FIXTURES = Path(__file__).parent / "fixtures" / "legacy" @@ -405,7 +410,7 @@ def test_get_redirected_response_url( repo = MockHttpRepository({"/foo": 200}, http) redirect_url = "http://legacy.redirect.bar" - def get_mock(url: str) -> requests.Response: + def get_mock(url: str, raise_for_status: bool = True) -> requests.Response: response = requests.Response() response.status_code = 200 response.url = redirect_url + "/foo" diff --git a/tests/repositories/test_pypi_repository.py b/tests/repositories/test_pypi_repository.py index 6b30dc46ad3..7a188d7cb07 100644 --- a/tests/repositories/test_pypi_repository.py +++ b/tests/repositories/test_pypi_repository.py @@ -22,6 +22,11 @@ from pytest_mock import MockerFixture +@pytest.fixture(autouse=True) +def _use_simple_keyring(with_simple_keyring: None) -> None: + pass + + class MockRepository(PyPiRepository): JSON_FIXTURES = Path(__file__).parent / "fixtures" / "pypi.org" / "json" @@ -218,10 +223,11 @@ def test_get_should_invalid_cache_on_too_many_redirects_error(mocker: MockerFixt delete_cache = mocker.patch("cachecontrol.caches.file_cache.FileCache.delete") response = Response() + response.status_code = 200 response.encoding = "utf-8" response.raw = BytesIO(encode('{"foo": "bar"}')) mocker.patch( - "cachecontrol.adapter.CacheControlAdapter.send", + "poetry.utils.authenticator.Authenticator.get", side_effect=[TooManyRedirects(), response], ) repository = PyPiRepository() diff --git a/tests/utils/test_authenticator.py b/tests/utils/test_authenticator.py index 4c8f77faedf..2d9d282e1d1 100644 --- a/tests/utils/test_authenticator.py +++ b/tests/utils/test_authenticator.py @@ -1,5 +1,6 @@ from __future__ import annotations +import base64 import re import uuid @@ -286,19 +287,16 @@ def callback( assert sleep.call_count == attempts -@pytest.fixture -def environment_repository_credentials(monkeypatch: MonkeyPatch) -> None: - monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_USERNAME", "bar") - monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_PASSWORD", "baz") - - def test_authenticator_uses_env_provided_credentials( config: Config, environ: None, mock_remote: type[httpretty.httpretty], http: type[httpretty.httpretty], - environment_repository_credentials: None, + monkeypatch: MonkeyPatch, ): + monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_USERNAME", "bar") + monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_PASSWORD", "baz") + config.merge({"repositories": {"foo": {"url": "https://foo.bar/simple/"}}}) authenticator = Authenticator(config, NullIO()) @@ -339,10 +337,12 @@ def test_authenticator_uses_certs_from_config_if_not_provided( ) authenticator = Authenticator(config, NullIO()) - session_send = mocker.patch.object(authenticator.session, "send") + url = "https://foo.bar/files/foo-0.1.0.tar.gz" + session = authenticator.get_session(url) + session_send = mocker.patch.object(session, "send") authenticator.request( "get", - "https://foo.bar/files/foo-0.1.0.tar.gz", + url, verify=cert, cert=client_cert, ) @@ -350,3 +350,177 @@ def test_authenticator_uses_certs_from_config_if_not_provided( assert Path(kwargs["verify"]) == Path(cert or configured_cert) assert Path(kwargs["cert"]) == Path(client_cert or configured_client_cert) + + +def test_authenticator_uses_credentials_from_config_matched_by_url_path( + config: Config, mock_remote: None, http: type[httpretty.httpretty] +): + config.merge( + { + "repositories": { + "foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"}, + "foo-beta": {"url": "https://foo.bar/beta/files/simple/"}, + }, + "http-basic": { + "foo-alpha": {"username": "bar", "password": "alpha"}, + "foo-beta": {"username": "baz", "password": "beta"}, + }, + } + ) + + authenticator = Authenticator(config, NullIO()) + authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz") + + request = http.last_request() + + basic_auth = base64.b64encode(b"bar:alpha").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + # Make request on second repository with the same netloc but different credentials + authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz") + + request = http.last_request() + + basic_auth = base64.b64encode(b"baz:beta").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + +def test_authenticator_uses_credentials_from_config_with_at_sign_in_path( + config: Config, mock_remote: None, http: type[httpretty.httpretty] +): + config.merge( + { + "repositories": { + "foo": {"url": "https://foo.bar/beta/files/simple/"}, + }, + "http-basic": { + "foo": {"username": "bar", "password": "baz"}, + }, + } + ) + authenticator = Authenticator(config, NullIO()) + authenticator.request("get", "https://foo.bar/beta/files/simple/f@@-0.1.0.tar.gz") + + request = http.last_request() + + basic_auth = base64.b64encode(b"bar:baz").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + +def test_authenticator_falls_back_to_keyring_url_matched_by_path( + config: Config, + mock_remote: None, + http: type[httpretty.httpretty], + with_simple_keyring: None, + dummy_keyring: DummyBackend, +): + config.merge( + { + "repositories": { + "foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"}, + "foo-beta": {"url": "https://foo.bar/beta/files/simple/"}, + } + } + ) + + dummy_keyring.set_password( + "https://foo.bar/alpha/files/simple/", None, SimpleCredential(None, "bar") + ) + dummy_keyring.set_password( + "https://foo.bar/beta/files/simple/", None, SimpleCredential(None, "baz") + ) + + authenticator = Authenticator(config, NullIO()) + + authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz") + request = http.last_request() + + basic_auth = base64.b64encode(b":bar").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz") + request = http.last_request() + + basic_auth = base64.b64encode(b":baz").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + +def test_authenticator_uses_env_provided_credentials_matched_by_url_path( + config: Config, + environ: None, + mock_remote: type[httpretty.httpretty], + http: type[httpretty.httpretty], + monkeypatch: MonkeyPatch, +): + monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_ALPHA_USERNAME", "bar") + monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_ALPHA_PASSWORD", "alpha") + monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_BETA_USERNAME", "baz") + monkeypatch.setenv("POETRY_HTTP_BASIC_FOO_BETA_PASSWORD", "beta") + + config.merge( + { + "repositories": { + "foo-alpha": {"url": "https://foo.bar/alpha/files/simple/"}, + "foo-beta": {"url": "https://foo.bar/beta/files/simple/"}, + } + } + ) + + authenticator = Authenticator(config, NullIO()) + + authenticator.request("get", "https://foo.bar/alpha/files/simple/foo-0.1.0.tar.gz") + request = http.last_request() + + basic_auth = base64.b64encode(b"bar:alpha").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + authenticator.request("get", "https://foo.bar/beta/files/simple/foo-0.1.0.tar.gz") + request = http.last_request() + + basic_auth = base64.b64encode(b"baz:beta").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + +def test_authenticator_azure_feed_guid_credentials( + config: Config, + mock_remote: None, + http: type[httpretty.httpretty], + with_simple_keyring: None, + dummy_keyring: DummyBackend, +): + config.merge( + { + "repositories": { + "alpha": { + "url": "https://foo.bar/org-alpha/_packaging/feed/pypi/simple/" + }, + "beta": { + "url": "https://foo.bar/org-beta/_packaging/feed/pypi/simple/" + }, + }, + "http-basic": { + "alpha": {"username": "foo", "password": "bar"}, + "beta": {"username": "baz", "password": "qux"}, + }, + } + ) + + authenticator = Authenticator(config, NullIO()) + + authenticator.request( + "get", + "https://foo.bar/org-alpha/_packaging/GUID/pypi/simple/a/1.0.0/a-1.0.0.whl", + ) + request = http.last_request() + + basic_auth = base64.b64encode(b"foo:bar").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}" + + authenticator.request( + "get", + "https://foo.bar/org-beta/_packaging/GUID/pypi/simple/b/1.0.0/a-1.0.0.whl", + ) + request = http.last_request() + + basic_auth = base64.b64encode(b"baz:qux").decode() + assert request.headers["Authorization"] == f"Basic {basic_auth}"