From c875a94c9a4817adc96644d666b6d9d435d22392 Mon Sep 17 00:00:00 2001 From: Matthias Dellweg Date: Mon, 22 Jan 2024 09:29:08 +0100 Subject: [PATCH] Retrieve missing password from dbus secret Use the secretservice library to interface with dbus compatible password managers. Fixes #821 --- CHANGES/821.feature | 1 + CHANGES/pulp-glue/821.feature | 1 + lower_bounds_constraints.lock | 1 + pulp-glue/pulp_glue/common/context.py | 12 ++- pulp-glue/pulp_glue/common/openapi.py | 125 +++++++++++++------------- pulpcore/cli/common/generic.py | 82 ++++++++++++++++- pyproject.toml | 1 + test_requirements.txt | 1 + 8 files changed, 153 insertions(+), 71 deletions(-) create mode 100644 CHANGES/821.feature create mode 100644 CHANGES/pulp-glue/821.feature diff --git a/CHANGES/821.feature b/CHANGES/821.feature new file mode 100644 index 000000000..ec7af8b69 --- /dev/null +++ b/CHANGES/821.feature @@ -0,0 +1 @@ +Added support for the dbus secret service to make use of password managers. diff --git a/CHANGES/pulp-glue/821.feature b/CHANGES/pulp-glue/821.feature new file mode 100644 index 000000000..fc689dee6 --- /dev/null +++ b/CHANGES/pulp-glue/821.feature @@ -0,0 +1 @@ +Added `auth` to `apikwargs` so you can plug in any `requests.auth.AuthBase`. diff --git a/lower_bounds_constraints.lock b/lower_bounds_constraints.lock index 4d80e7093..d9986de71 100644 --- a/lower_bounds_constraints.lock +++ b/lower_bounds_constraints.lock @@ -4,3 +4,4 @@ PyYAML==5.3 requests==2.24.0 schema==0.7.5 toml==0.10.2 +SecretStorage==3.3.3 diff --git a/pulp-glue/pulp_glue/common/context.py b/pulp-glue/pulp_glue/common/context.py index 240677419..50dbbf3fc 100644 --- a/pulp-glue/pulp_glue/common/context.py +++ b/pulp-glue/pulp_glue/common/context.py @@ -248,7 +248,7 @@ def echo(self, message: str, nl: bool = True, err: bool = False) -> None: """ raise NotImplementedError("PulpContext is an abstract class.") - def prompt(self, text: str, hide_input: bool = False) -> Any: + def prompt(self, text: str, hide_input: bool = False) -> str: """ Abstract function that will be called to ask for a password interactively. @@ -307,8 +307,14 @@ def api(self) -> OpenAPI: All calls to the API should be performed via `call`. """ if self._api is None: - if self._api_kwargs.get("username") and not self._api_kwargs.get("password"): - self._api_kwargs["password"] = self.prompt("password", hide_input=True) + if self._api_kwargs.get("username"): + # TODO Deprecate this interface for 'auth'. + if not self._api_kwargs.get("password"): + self._api_kwargs["password"] = self.prompt("password", hide_input=True) + self._api_kwargs["auth"] = ( + self._api_kwargs.pop("username"), + self._api_kwargs.pop("password", None), + ) try: self._api = OpenAPI( doc_path=f"{self._api_root}api/v3/docs/api.json", **self._api_kwargs diff --git a/pulp-glue/pulp_glue/common/openapi.py b/pulp-glue/pulp_glue/common/openapi.py index bddf4f434..9ed6870fa 100644 --- a/pulp-glue/pulp_glue/common/openapi.py +++ b/pulp-glue/pulp_glue/common/openapi.py @@ -5,9 +5,9 @@ import datetime import json import os +import typing as t from contextlib import suppress from io import BufferedReader -from typing import IO, Any, Callable, Dict, List, Optional, Tuple, Union from urllib.parse import urljoin import requests @@ -19,7 +19,7 @@ translation = get_translation(__package__) _ = translation.gettext -UploadType = Union[bytes, IO[bytes]] +UploadType = t.Union[bytes, t.IO[bytes]] SAFE_METHODS = ["GET", "HEAD", "OPTIONS"] ISO_DATE_FORMAT = "%Y-%m-%d" @@ -52,8 +52,7 @@ class OpenAPI: base_url: The base URL inlcuding the HTTP scheme, hostname and optional subpaths of the served api. doc_path: Path of the json api doc schema relative to the `base_url`. - username: Username used for basic auth. - password: Password used for basic auth. + auth: A requests compatible auth object. cert: Client certificate used for auth. key: Matching key for `cert` if not already included. validate_certs: Whether to check server TLS certificates agains a CA. @@ -68,40 +67,36 @@ def __init__( self, base_url: str, doc_path: str, - username: Optional[str] = None, - password: Optional[str] = None, - cert: Optional[str] = None, - key: Optional[str] = None, + auth: t.Optional[t.Union[t.Tuple[str, str], requests.auth.AuthBase]] = None, + cert: t.Optional[str] = None, + key: t.Optional[str] = None, validate_certs: bool = True, refresh_cache: bool = False, safe_calls_only: bool = False, - debug_callback: Optional[Callable[[int, str], Any]] = None, - user_agent: Optional[str] = None, - cid: Optional[str] = None, + debug_callback: t.Optional[t.Callable[[int, str], t.Any]] = None, + user_agent: t.Optional[str] = None, + cid: t.Optional[str] = None, ): if not validate_certs: urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - self.debug_callback: Callable[[int, str], Any] = debug_callback or (lambda i, x: None) + self.debug_callback: t.Callable[[int, str], t.Any] = debug_callback or (lambda i, x: None) self.base_url: str = base_url self.doc_path: str = doc_path self.safe_calls_only: bool = safe_calls_only self._session: requests.Session = requests.session() - if username and password: + if auth: if cert or key: - raise OpenAPIError(_("Cannot use both username/password and cert auth.")) - self._session.auth = (username, password) - elif username: - raise OpenAPIError(_("Password is required if username is set.")) - elif password: - raise OpenAPIError(_("Username is required if password is set.")) - elif cert and key: - self._session.cert = (cert, key) - elif cert: - self._session.cert = cert - elif key: - raise OpenAPIError(_("Cert is required if key is set.")) + raise OpenAPIError(_("Cannot use both 'auth' and 'cert'.")) + self._session.auth = auth + else: + if cert and key: + self._session.cert = (cert, key) + elif cert: + self._session.cert = cert + elif key: + raise OpenAPIError(_("Cert is required if key is set.")) self._session.headers.update( { "User-Agent": user_agent or f"Pulp-glue openapi parser ({__version__})", @@ -112,7 +107,7 @@ def __init__( self._session.headers["Correlation-Id"] = cid self._session.max_redirects = 0 - verify: Optional[Union[bool, str]] = ( + verify: t.Optional[t.Union[bool, str]] = ( os.environ.get("PULP_CA_BUNDLE") if validate_certs is not False else False ) session_settings = self._session.merge_environment_settings( @@ -147,12 +142,12 @@ def load_api(self, refresh_cache: bool = False) -> None: f.write(data) def _parse_api(self, data: bytes) -> None: - self.api_spec: Dict[str, Any] = json.loads(data) + self.api_spec: t.Dict[str, t.Any] = json.loads(data) if self.api_spec.get("openapi", "").startswith("3."): self.openapi_version: int = 3 else: raise OpenAPIError(_("Unknown schema version")) - self.operations: Dict[str, Any] = { + self.operations: t.Dict[str, t.Any] = { method_entry["operationId"]: (method, path) for path, path_entry in self.api_spec["paths"].items() for method, method_entry in path_entry.items() @@ -182,7 +177,7 @@ def _set_correlation_id(self, correlation_id: str) -> None: def param_spec( self, operation_id: str, param_type: str, required: bool = False - ) -> Dict[str, Any]: + ) -> t.Dict[str, t.Any]: method, path = self.operations[operation_id] path_spec = self.api_spec["paths"][path] method_spec = path_spec[method] @@ -206,10 +201,10 @@ def param_spec( def extract_params( self, param_in: str, - path_spec: Dict[str, Any], - method_spec: Dict[str, Any], - params: Dict[str, Any], - ) -> Dict[str, Any]: + path_spec: t.Dict[str, t.Any], + method_spec: t.Dict[str, t.Any], + params: t.Dict[str, t.Any], + ) -> t.Dict[str, t.Any]: param_specs = { entry["name"]: entry for entry in path_spec.get("parameters", []) @@ -222,7 +217,7 @@ def extract_params( if entry["in"] == param_in } ) - result: Dict[str, Any] = {} + result: t.Dict[str, t.Any] = {} for name in list(params.keys()): if name in param_specs: param = params.pop(name) @@ -231,7 +226,7 @@ def extract_params( if param_schema: param = self.validate_schema(param_schema, name, param) - if isinstance(param, List): + if isinstance(param, t.List): if not param: # Don't propagate an empty list here continue @@ -257,7 +252,7 @@ def extract_params( ) return result - def validate_schema(self, schema: Any, name: str, value: Any) -> Any: + def validate_schema(self, schema: t.Any, name: str, value: t.Any) -> t.Any: # Look if the schema is provided by reference schema_ref = schema.get("$ref") if schema_ref: @@ -336,8 +331,8 @@ def validate_schema(self, schema: Any, name: str, value: Any) -> Any: ) return value - def validate_object(self, schema: Any, name: str, value: Any) -> Dict[str, Any]: - if not isinstance(value, Dict): + def validate_object(self, schema: t.Any, name: str, value: t.Any) -> t.Dict[str, t.Any]: + if not isinstance(value, t.Dict): raise OpenAPIValidationError( _("'{name}' is expected to be an object.").format(name=name) ) @@ -366,13 +361,13 @@ def validate_object(self, schema: Any, name: str, value: Any) -> Dict[str, Any]: ) return value - def validate_array(self, schema: Any, name: str, value: Any) -> List[Any]: - if not isinstance(value, List): + def validate_array(self, schema: t.Any, name: str, value: t.Any) -> t.List[t.Any]: + if not isinstance(value, t.List): raise OpenAPIValidationError(_("'{name}' is expected to be a list.").format(name=name)) item_schema = schema["items"] return [self.validate_schema(item_schema, name, item) for item in value] - def validate_string(self, schema: Any, name: str, value: Any) -> Union[str, UploadType]: + def validate_string(self, schema: t.Any, name: str, value: t.Any) -> t.Union[str, UploadType]: enum = schema.get("enum") if enum: if value not in enum: @@ -411,7 +406,7 @@ def validate_string(self, schema: Any, name: str, value: Any) -> Union[str, Uplo ) return value - def validate_integer(self, schema: Any, name: str, value: Any) -> int: + def validate_integer(self, schema: t.Any, name: str, value: t.Any) -> int: if not isinstance(value, int): raise OpenAPIValidationError( _("'{name}' is expected to be an integer.").format(name=name) @@ -428,7 +423,7 @@ def validate_integer(self, schema: Any, name: str, value: Any) -> int: ) return value - def validate_number(self, schema: Any, name: str, value: Any) -> float: + def validate_number(self, schema: t.Any, name: str, value: t.Any) -> float: # https://swagger.io/specification/#data-types describes float and double. # Python does not distinguish them. if not isinstance(value, float): @@ -439,16 +434,16 @@ def validate_number(self, schema: Any, name: str, value: Any) -> float: def render_request_body( self, - method_spec: Dict[str, Any], - body: Optional[Dict[str, Any]] = None, + method_spec: t.Dict[str, t.Any], + body: t.Optional[t.Dict[str, t.Any]] = None, validate_body: bool = True, - ) -> Tuple[ - Optional[str], - Optional[Dict[str, Any]], - Optional[Dict[str, Any]], - Optional[List[Tuple[str, Tuple[str, UploadType, str]]]], + ) -> t.Tuple[ + t.Optional[str], + t.Optional[t.Dict[str, t.Any]], + t.Optional[t.Dict[str, t.Any]], + t.Optional[t.List[t.Tuple[str, t.Tuple[str, UploadType, str]]]], ]: - content_types: List[str] = [] + content_types: t.List[str] = [] try: request_body_spec = method_spec["requestBody"] except KeyError: @@ -463,10 +458,10 @@ def render_request_body( content_types = list(request_body_spec["content"].keys()) assert body is not None - content_type: Optional[str] = None - data: Optional[Dict[str, Any]] = None - json: Optional[Dict[str, Any]] = None - files: Optional[List[Tuple[str, Tuple[str, UploadType, str]]]] = None + content_type: t.Optional[str] = None + data: t.Optional[t.Dict[str, t.Any]] = None + json: t.Optional[t.Dict[str, t.Any]] = None + files: t.Optional[t.List[t.Tuple[str, t.Tuple[str, UploadType, str]]]] = None candidate_content_types = [ "multipart/form-data", @@ -476,7 +471,7 @@ def render_request_body( "application/json", "application/x-www-form-urlencoded", ] + candidate_content_types - errors: List[str] = [] + errors: t.List[str] = [] for candidate in candidate_content_types: content_type = next( ( @@ -504,7 +499,7 @@ def render_request_body( elif content_type.startswith("application/x-www-form-urlencoded"): data = body elif content_type.startswith("multipart/form-data"): - uploads: Dict[str, Tuple[str, UploadType, str]] = {} + uploads: t.Dict[str, t.Tuple[str, UploadType, str]] = {} data = {} # Extract and prepare the files to upload if body: @@ -536,12 +531,12 @@ def render_request_body( def render_request( self, - path_spec: Dict[str, Any], + path_spec: t.Dict[str, t.Any], method: str, url: str, - params: Dict[str, Any], - headers: Dict[str, str], - body: Optional[Dict[str, Any]] = None, + params: t.Dict[str, t.Any], + headers: t.Dict[str, str], + body: t.Optional[t.Dict[str, t.Any]] = None, validate_body: bool = True, ) -> requests.PreparedRequest: method_spec = path_spec[method] @@ -557,7 +552,7 @@ def render_request( ), f"{request.headers['content-type']} != {content_type}" return request - def parse_response(self, method_spec: Dict[str, Any], response: requests.Response) -> Any: + def parse_response(self, method_spec: t.Dict[str, t.Any], response: requests.Response) -> t.Any: if response.status_code == 204: return "{}" @@ -581,10 +576,10 @@ def parse_response(self, method_spec: Dict[str, Any], response: requests.Respons def call( self, operation_id: str, - parameters: Optional[Dict[str, Any]] = None, - body: Optional[Dict[str, Any]] = None, + parameters: t.Optional[t.Dict[str, t.Any]] = None, + body: t.Optional[t.Dict[str, t.Any]] = None, validate_body: bool = True, - ) -> Any: + ) -> t.Any: """ Make a call to the server. diff --git a/pulpcore/cli/common/generic.py b/pulpcore/cli/common/generic.py index 248e9504c..e7c71e598 100644 --- a/pulpcore/cli/common/generic.py +++ b/pulpcore/cli/common/generic.py @@ -2,9 +2,11 @@ import json import re import typing as t +from contextlib import closing from functools import lru_cache, wraps import click +import requests import schema as s import yaml from pulp_glue.common.context import ( @@ -37,6 +39,13 @@ PYGMENTS = True PYGMENTS_STYLE = "solarized-dark" +try: + import secretstorage +except ImportError: + SECRET_STORAGE = False +else: + SECRET_STORAGE = True + translation = get_translation(__package__) _ = translation.gettext @@ -70,6 +79,68 @@ def default(self, obj: t.Any) -> t.Any: return super().default(obj) +class PulpCLIBasicAuth(requests.auth.HTTPBasicAuth): + def __init__( + self, base_url: str, api_root: str, username: str, password: t.Optional[str] + ) -> None: + super().__init__(username, password or "") + self.base_url = base_url + self.api_root = api_root + if SECRET_STORAGE: + assert isinstance(self.username, str) + + self.attr: t.Dict[str, str] = { + "service": "pulp-cli", + "base_url": self.base_url, + "api_root": self.api_root, + "username": self.username, + } + + def __eq__(self, other: object) -> bool: + return ( + super().__eq__(other) + and self.base_url == getattr(other, "base_url", None) + and self.api_root == getattr(other, "api_root", None) + ) + + def response_hook(self, response: requests.Response, **kwargs: t.Any) -> requests.Response: + # https://docs.python-requests.org/en/latest/_modules/requests/auth/#HTTPDigestAuth + if 200 <= response.status_code < 300 and not self.password_in_manager: + assert isinstance(self.password, str) + + with closing(secretstorage.dbus_init()) as connection: + collection = secretstorage.get_default_collection(connection) + collection.create_item("Pulp CLI", self.attr, self.password.encode(), replace=True) + if response.status_code == 401 and self.password_in_manager: + with closing(secretstorage.dbus_init()) as connection: + collection = secretstorage.get_default_collection(connection) + item = next(collection.search_items(self.attr), None) + if item: + if click.confirm(_("Remove failed password from password manager?")): + item.delete() + return response + + def __call__(self, request: requests.PreparedRequest) -> requests.PreparedRequest: + if self.password == "": + if SECRET_STORAGE: + assert isinstance(self.username, str) + assert isinstance(self.password, str) + + with closing(secretstorage.dbus_init()) as connection: + collection = secretstorage.get_default_collection(connection) + item = next(collection.search_items(self.attr), None) + if item: + self.password = item.get_secret().decode() + self.password_in_manager = True + else: + self.password = str(click.prompt("Password", hide_input=True)) + self.password_in_manager = False + request.register_hook("response", self.response_hook) # type: ignore + else: + self.password = click.prompt("Password", hide_input=True) + return super().__call__(request) # type: ignore + + class PulpCLIContext(PulpContext): """ Subclass of the Context that overwrites the CLI specifics. @@ -93,6 +164,14 @@ def __init__( format: str, domain: str = "default", ) -> None: + username = api_kwargs.pop("username", None) + if username is not None: + api_kwargs["auth"] = PulpCLIBasicAuth( + api_kwargs["base_url"], + api_root, + username, + api_kwargs.pop("password", None), + ) super().__init__( api_root=api_root, api_kwargs=api_kwargs, @@ -105,9 +184,6 @@ def __init__( def echo(self, message: str, nl: bool = True, err: bool = False) -> None: click.echo(message, nl=nl, err=err) - def prompt(self, text: str, hide_input: bool = False) -> t.Any: - return click.prompt(text, hide_input=hide_input) - def output_result(self, result: t.Any) -> None: """ Dump the provided result to the console using the selected renderer. diff --git a/pyproject.toml b/pyproject.toml index 8bd93fcc5..57bd2ff45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,7 @@ dependencies = [ [project.optional-dependencies] pygments = ["pygments"] shell = ["click-shell~=2.1"] +password-manager = ["SecretStorage>=3.3.3,<3.3.4"] [project.urls] documentation = "https://docs.pulpproject.org/pulp_cli/" diff --git a/test_requirements.txt b/test_requirements.txt index 48a015f63..9ea81aade 100644 --- a/test_requirements.txt +++ b/test_requirements.txt @@ -3,3 +3,4 @@ pygments pytest pytest-subtests python-gnupg +SecretStorage