diff --git a/changelog.d/20240401_164617_derek_consent_forest_sc_30764.rst b/changelog.d/20240401_164617_derek_consent_forest_sc_30764.rst new file mode 100644 index 000000000..2d68ec1f2 --- /dev/null +++ b/changelog.d/20240401_164617_derek_consent_forest_sc_30764.rst @@ -0,0 +1,7 @@ + +Added +~~~~~ + +- Added a new AuthClient method ``get_consents`` and supporting local data objects. + These allows a client to poll and interact with the current Globus Auth consent state + of a particular identity rooted at their client. (:pr:`NUMBER`) diff --git a/docs/experimental/consents.rst b/docs/experimental/consents.rst new file mode 100644 index 000000000..504808621 --- /dev/null +++ b/docs/experimental/consents.rst @@ -0,0 +1,30 @@ + +.. py:currentmodule:: globus_sdk.experimental.consents + +Consents +======== + +The Consents model provides a data model for loading consent information polled from +Globus Auth's ``get_consents`` API. + +Consents are modeled as a ``ConsentForest`` full of ``ConsentTrees`` containing related +``Consents``. These consents detail a path of authorization grants that have been +provided by a user to client applications for token grants under certain scoped +contexts. + +Reference +========= + +.. autoclass:: ConsentForest + :members: + +.. autoclass:: ConsentTree + :members: + +.. autoclass:: Consent + :members: + +.. autoexception:: ConsentParseError + +.. autoexception:: ConsentTreeConstructionError + diff --git a/docs/experimental/index.rst b/docs/experimental/index.rst index 4025d385f..846e619d2 100644 --- a/docs/experimental/index.rst +++ b/docs/experimental/index.rst @@ -18,3 +18,4 @@ Globus SDK Experimental Components auth_requirements_errors scope_parser + consents diff --git a/docs/services/auth.rst b/docs/services/auth.rst index 49065ad84..8dc8e5509 100644 --- a/docs/services/auth.rst +++ b/docs/services/auth.rst @@ -65,10 +65,19 @@ Auth Responses :members: :show-inheritance: +.. autoclass:: GetConsentsResponse + :members: + :show-inheritance: + .. autoclass:: GetIdentitiesResponse :members: :show-inheritance: +Errors +------ + +.. autoexception:: AuthAPIError + OAuth2 Flow Managers -------------------- diff --git a/src/globus_sdk/__init__.py b/src/globus_sdk/__init__.py index f91ad60f4..b6c4a2db1 100644 --- a/src/globus_sdk/__init__.py +++ b/src/globus_sdk/__init__.py @@ -58,6 +58,7 @@ def _force_eager_imports() -> None: "ConfidentialAppAuthClient", "AuthAPIError", "IdentityMap", + "GetConsentsResponse", "GetIdentitiesResponse", "OAuthDependentTokenResponse", "OAuthTokenResponse", @@ -172,6 +173,7 @@ def _force_eager_imports() -> None: from .services.auth import ConfidentialAppAuthClient from .services.auth import AuthAPIError from .services.auth import IdentityMap + from .services.auth import GetConsentsResponse from .services.auth import GetIdentitiesResponse from .services.auth import OAuthDependentTokenResponse from .services.auth import OAuthTokenResponse @@ -298,6 +300,7 @@ def __getattr__(name: str) -> t.Any: "GCSAPIError", "GCSClient", "GCSRoleDocument", + "GetConsentsResponse", "GetIdentitiesResponse", "GlobusAPIError", "GlobusConnectPersonalOwnerInfo", diff --git a/src/globus_sdk/_generate_init.py b/src/globus_sdk/_generate_init.py index e33c97cf9..79811b112 100755 --- a/src/globus_sdk/_generate_init.py +++ b/src/globus_sdk/_generate_init.py @@ -120,6 +120,7 @@ def __getattr__(name: str) -> t.Any: # high-level helpers "IdentityMap", # responses + "GetConsentsResponse", "GetIdentitiesResponse", "OAuthDependentTokenResponse", "OAuthTokenResponse", diff --git a/src/globus_sdk/_testing/data/auth/get_consents.py b/src/globus_sdk/_testing/data/auth/get_consents.py new file mode 100644 index 000000000..3270cb528 --- /dev/null +++ b/src/globus_sdk/_testing/data/auth/get_consents.py @@ -0,0 +1,49 @@ +from globus_sdk._testing.models import RegisteredResponse, ResponseSet + +_DATA_ACCESS = ( + "https://auth.globus.org/scopes/542a86fc-1766-450d-841f-065488a2ec01/data_access" +) + +RESPONSES = ResponseSet( + default=RegisteredResponse( + service="auth", + path="/v2/api/identities/8ca28797-3541-4a5d-a264-05b00f91e608/consents", + json={ + "consents": [ + { + "created": "2022-09-21T17:10:14.270581+00:00", + "id": 142632, + "status": "approved", + "updated": "2022-09-21T17:10:14.270581+00:00", + "allows_refresh": True, + "dependency_path": [142632], + "scope_name": "urn:globus:auth:scope:transfer.api.globus.org:all", + "atomically_revocable": False, + "effective_identity": "8ca28797-3541-4a5d-a264-05b00f91e608", + "auto_approved": False, + "last_used": "2024-03-18T17:34:04.719126+00:00", + "scope": "89ecabba-4acf-4e2e-a98d-ce592ccc2818", + "client": "065db752-2f43-4fe1-a633-2ee68c9da889", + }, + { + "created": "2024-03-18T17:32:51.496893+00:00", + "id": 433892, + "status": "approved", + "updated": "2024-03-18T17:32:51.496893+00:00", + "allows_refresh": True, + "dependency_path": [142632, 433892], + "scope_name": _DATA_ACCESS, + "atomically_revocable": True, + "effective_identity": "8ca28797-3541-4a5d-a264-05b00f91e608", + "auto_approved": False, + "last_used": "2024-03-18T17:33:05.178254+00:00", + "scope": "fe334c19-4fe6-4d03-ac73-8992beb231b6", + "client": "2fbdda78-a599-4cb5-ac3d-1fbcfbc6a754", + }, + ] + }, + metadata={ + "identity_id": "8ca28797-3541-4a5d-a264-05b00f91e608", + }, + ), +) diff --git a/src/globus_sdk/experimental/consents/__init__.py b/src/globus_sdk/experimental/consents/__init__.py new file mode 100644 index 000000000..837163a62 --- /dev/null +++ b/src/globus_sdk/experimental/consents/__init__.py @@ -0,0 +1,10 @@ +from ._errors import ConsentParseError, ConsentTreeConstructionError +from ._model import Consent, ConsentForest, ConsentTree + +__all__ = [ + "Consent", + "ConsentTree", + "ConsentForest", + "ConsentParseError", + "ConsentTreeConstructionError", +] diff --git a/src/globus_sdk/experimental/consents/_errors.py b/src/globus_sdk/experimental/consents/_errors.py new file mode 100644 index 000000000..3c1b9ca19 --- /dev/null +++ b/src/globus_sdk/experimental/consents/_errors.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +import typing as t + +if t.TYPE_CHECKING: + from ._model import Consent + + +class ConsentParseError(Exception): + """An error raised if consent parsing/loading fails.""" + + def __init__(self, message: str, raw_consent: dict[str, t.Any]): + super().__init__(message) + self.raw_consent = raw_consent + + +class ConsentTreeConstructionError(Exception): + """An error raised if consent tree construction fails.""" + + def __init__(self, message: str, consents: list[Consent]): + super().__init__(message) + self.consents = consents diff --git a/src/globus_sdk/experimental/consents/_model.py b/src/globus_sdk/experimental/consents/_model.py new file mode 100644 index 000000000..c7c69403b --- /dev/null +++ b/src/globus_sdk/experimental/consents/_model.py @@ -0,0 +1,322 @@ +""" +This module provides convenience data structures to model and interact with + Globus Auth consents. + +The resources defined herein are: + * ``Consent`` + * A data object modeling a user's grant for a client to perform some scoped + operation on their behalf. + * This grant is conditional on the invocation path leading to the client's + attempted operation being initiated through a chain of similarly scoped + operations (consents) defined in the "dependency_path". + * ``ConsentTree`` + * A tree composed of Consent nodes with edges modeling the dependency + relationships between them. + * A `meets_scope_requirements` method is defined to check whether a scope + requirement, including dependent scope requirements, is satisfied by the + tree. + * ``ConsentForest`` + * A collection of all ConsentTrees for a user rooted under a particular client + (the client that initiated the request for consents). + * A `meets_scope_requirements` method is defined to check whether a scope + requirement, including dependent scope requirements, is satisfied by any + tree in the forest. +""" + +from __future__ import annotations + +import typing as t +from dataclasses import dataclass +from datetime import datetime + +from globus_sdk import Scope +from globus_sdk._types import UUIDLike + +from ._errors import ConsentParseError, ConsentTreeConstructionError + + +@dataclass +class Consent: + """ + Consent Data Object + + This object models: + * A grant which a user has provided for a client to perform a particular + scoped operation on their behalf. + * The consent is conditional on the invocation path leading to the client's + attempted operation being initiated through a chain of similarly scoped + operations (consents) defined in the "dependency_path". + """ + + client: UUIDLike + scope: UUIDLike + scope_name: str + id: int + effective_identity: UUIDLike + # A list representing the path of consent dependencies leading from a "root consent" + # to this. The last element of this list will always be this consent's ID. + # Downstream dependency relationships may exist but will not be defined here. + dependency_path: list[int] + created: datetime + updated: datetime + last_used: datetime + status: str + allows_refresh: bool + auto_approved: bool + atomically_revocable: bool + + @classmethod + def load(cls, data: t.Mapping[str, t.Any]) -> Consent: + """ + Load a Consent object from a raw data dictionary. + + :param data: A dictionary containing the raw consent data. + :raises: ConsentParseError if the data is missing a required key. + """ + try: + return cls( + id=data["id"], + client=data["client"], + scope=data["scope"], + effective_identity=data["effective_identity"], + dependency_path=data["dependency_path"], + scope_name=data["scope_name"], + created=datetime.fromisoformat(data["created"]), + updated=datetime.fromisoformat(data["updated"]), + last_used=datetime.fromisoformat(data["last_used"]), + status=data["status"], + allows_refresh=data["allows_refresh"], + auto_approved=data["auto_approved"], + atomically_revocable=data["atomically_revocable"], + ) + except KeyError as e: + raise ConsentParseError( + f"Failed to load Consent object. Missing required key: {e}.", + raw_consent=dict(data), + ) from e + + def __str__(self) -> str: + client_id = str(self.client) + return f"Consent [{self.id}]: Client [{client_id}] -> Scope [{self.scope_name}]" + + +class ConsentForest: + """ + A ConsentForest is a data structure which models relationships between Consents, + objects describing explicit access users have granted to particular clients. + It exists to expose a simple interface for evaluating whether resource server grant + requirements, as defined by a scope object are satisfied. + + Consents should be retrieved from the AuthClient's `get_consents` method. + + Example usage: + + >>> auth_client = AuthClient(...) + >>> identity_id = ... + >>> forest = auth_client.get_consents(identity_id).to_forest() + >>> + >>> # Check whether the forest contains a scope relationship + >>> dependent_scope = GCSCollectionScopeBuilder(collection_id).data_access + >>> scope = f"{TransferScopes.all}[{dependent_scope}]" + >>> forest.contains_scopes(scope) + + + The following diagram demonstrates a Consent Forest in which a user has consented + to a client ("CLI") initiating transfers against two collections, both of which + require a "data_access" dynamic scope. + Contained Scope String: + `transfer:all[:data_access :data_access]` + + .. code-block:: rst + + [Consent A ] [Consent B ] + [Client: CLI ] -> [Client: Transfer ] + [Scope: transfer:all] [Scope: :data_access] + | + | [Consent C ] + |--------------> [Client: Transfer ] + [Scope: :data_access] + """ + + def __init__(self, consents: t.Iterable[t.Mapping[str, t.Any] | Consent]): + """ + :param consents: An iterable of consent data objects. Typically, this will be + a ConsentForestResponse retrieved via `auth_client.get_consents(identity)`. + This iterable may contain either raw consent data as a dict or pre-loaded + Consents. + """ + self.nodes = [ + consent if isinstance(consent, Consent) else Consent.load(consent) + for consent in consents + ] + # Build an index on consent id for constant time lookups + self._node_by_id = {node.id: node for node in self.nodes} + + self.edges = self._compute_edges() + self.trees = self._build_trees() + + def _compute_edges(self) -> dict[int, set[int]]: + """ + Compute the edges of the forest mapping parent -> child. + + A consent's parent node id is defined as the penultimate element of the + consent's dependency path. + A consent with dependency list of length 1 is a root node (has no parent). + """ + edges: dict[int, set[int]] = {node.id: set() for node in self.nodes} + for node in self.nodes: + if len(node.dependency_path) > 1: + parent_id = node.dependency_path[-2] + try: + edges[parent_id].add(node.id) + except KeyError as e: + raise ConsentTreeConstructionError( + f"Failed to compute forest edges. Missing parent node: {e}.", + consents=self.nodes, + ) from e + return edges + + def _build_trees(self) -> list[ConsentTree]: + """ + Build out the list of trees in the forest. + + A distinct tree is built out for each "root nodes" (nodes with no parents). + """ + + # A node with dependency path length 1 has no parents, so it is a root. + roots = [node for node in self.nodes if len(node.dependency_path) == 1] + return [ConsentTree(root.id, self) for root in roots] + + def get_node(self, consent_id: int) -> Consent: + return self._node_by_id[consent_id] + + def meets_scope_requirements(self, scopes: Scope | str | list[Scope | str]) -> bool: + """ + Check whether this consent meets one or more scope requirements. + + A consent forest meets a particular scope requirement if any consent tree + inside the forest meets the scope requirements. + + :param scopes: A single scope, a list of scopes, or a scope string to check + against the forest. + :returns: True if all scope requirements are met, False otherwise. + """ + for scope in _normalize_scope_types(scopes): + if not any(tree.meets_scope_requirements(scope) for tree in self.trees): + return False + return True + + +class ConsentTree: + """ + A tree of Consent nodes with edges modeling the dependency relationships between + them. + + :raises: ConsentParseError if the tree cannot be constructed due to missing + consent dependencies. + """ + + def __init__(self, root_id: int, forest: ConsentForest): + self.root = forest.get_node(root_id) + self.nodes = [self.root] + self._node_by_id = {root_id: self.root} + self.edges: dict[int, set[int]] = {} + + self._populate_connected_nodes_and_edges(forest) + + def _populate_connected_nodes_and_edges(self, forest: ConsentForest) -> None: + """ + Populate the nodes and edges of the tree by traversing the forest. + + Nodes/Edges are included in the tree iff they are reachable from the root. + """ + nodes_to_evaluate = {self.root.id} + while nodes_to_evaluate: + consent_id = nodes_to_evaluate.pop() + consent = forest.get_node(consent_id) + self.edges[consent.id] = forest.edges[consent.id] + + for child_id in self.edges[consent.id]: + if child_id not in self._node_by_id: + self.nodes.append(forest.get_node(child_id)) + self._node_by_id[child_id] = forest.get_node(child_id) + nodes_to_evaluate.add(child_id) + + def get_node(self, consent_id: int) -> Consent: + return self._node_by_id[consent_id] + + def meets_scope_requirements(self, scope: Scope) -> bool: + """ + Check whether this consent tree meets a particular scope requirement. + + :param scope: A Scope requirement to check against the tree. + """ + return self._meets_scope_requirements_recursive(self.root, scope) + + def _meets_scope_requirements_recursive(self, node: Consent, scope: Scope) -> bool: + """ + Check recursively whether a consent node meets the scope requirements defined + by a scope object. + """ + if node.scope_name != scope.scope_string: + return False + + for dependent_scope in scope.dependencies: + for child_id in self.edges[node.id]: + if self._meets_scope_requirements_recursive( + self.get_node(child_id), dependent_scope + ): + # We found a child containing this full dependent scope tree + # Move onto the next dependent scope tree + break + else: + # We didn't find any child containing this full dependent scope tree + return False + # We found at least one child containing each full dependent scope tree + return True + + @property + def max_depth(self) -> int: + return self._max_depth_recursive(self.root, 1) + + def _max_depth_recursive(self, node: Consent, depth: int) -> int: + if len(self.edges[node.id]) == 0: + return depth + return max( + self._max_depth_recursive(self.get_node(child_id), depth + 1) + for child_id in self.edges[node.id] + ) + + def __str__(self) -> str: + """Returns a textual representation of the tree to stdout (one line per node)""" + return self._str_recursive(self.root, 0) + + def _str_recursive(self, node: Consent, tab_depth: int) -> str: + _str = f"{' ' * tab_depth} - {node}" + for child_id in self.edges[node.id]: + _str += "\n" + self._str_recursive(self.get_node(child_id), tab_depth + 2) + return _str + + +def _normalize_scope_types(scopes: Scope | str | list[Scope | str]) -> list[Scope]: + """ + Normalize the input scope types into a list of Scope objects. + + Strings are parsed into 1 or more Scopes using `Scope.parse`. + + :param scopes: Some collection of 0 or more scopes as Scope or scope strings. + :returns: A list of Scope objects. + """ + + if isinstance(scopes, Scope): + return [scopes] + elif isinstance(scopes, str): + return Scope.parse(scopes) + else: + scope_list = [] + for scope in scopes: + if isinstance(scope, str): + scope_list.extend(Scope.parse(scope)) + else: + scope_list.append(scope) + return scope_list diff --git a/src/globus_sdk/scopes/data.py b/src/globus_sdk/scopes/data.py index 28879b20f..58951a0d6 100644 --- a/src/globus_sdk/scopes/data.py +++ b/src/globus_sdk/scopes/data.py @@ -59,6 +59,7 @@ class _AuthScopesBuilder(ScopeBuilder): "view_authentications", "view_clients", "view_clients_and_scopes", + "view_consents", "view_identities", "view_identity_set", ], diff --git a/src/globus_sdk/services/auth/__init__.py b/src/globus_sdk/services/auth/__init__.py index 85dfd22f8..cf15dc3fd 100644 --- a/src/globus_sdk/services/auth/__init__.py +++ b/src/globus_sdk/services/auth/__init__.py @@ -12,6 +12,7 @@ ) from .identity_map import IdentityMap from .response import ( + GetConsentsResponse, GetIdentitiesResponse, OAuthDependentTokenResponse, OAuthTokenResponse, @@ -26,12 +27,13 @@ # errors "AuthAPIError", # high-level helpers - "IdentityMap", "DependentScopeSpec", + "IdentityMap", # flow managers "GlobusNativeAppFlowManager", "GlobusAuthorizationCodeFlowManager", # responses + "GetConsentsResponse", "GetIdentitiesResponse", "OAuthDependentTokenResponse", "OAuthTokenResponse", diff --git a/src/globus_sdk/services/auth/client/service_client.py b/src/globus_sdk/services/auth/client/service_client.py index 657ebc037..6a603a3bc 100644 --- a/src/globus_sdk/services/auth/client/service_client.py +++ b/src/globus_sdk/services/auth/client/service_client.py @@ -19,6 +19,7 @@ from ..response import ( GetClientCredentialsResponse, GetClientsResponse, + GetConsentsResponse, GetIdentitiesResponse, GetIdentityProvidersResponse, GetPoliciesResponse, @@ -1789,7 +1790,7 @@ def delete_scope(self, scope_id: UUIDLike) -> GlobusHTTPResponse: >>> ac = globus_sdk.AuthClient(...) >>> scope_id = ... - >>> r = ac.delete_policy(scope_id) + >>> r = ac.delete_scope(scope_id) .. tab-item:: Example Response Data @@ -1803,3 +1804,44 @@ def delete_scope(self, scope_id: UUIDLike) -> GlobusHTTPResponse: :ref: auth/reference/#delete_scope """ return self.delete(f"/v2/api/scopes/{scope_id}") + + def get_consents( + self, + identity_id: UUIDLike, + *, + # pylint: disable=redefined-builtin + all: bool = False, + ) -> GetConsentsResponse: + """ + Look up consents for a user. + + If requesting "all" consents, the view_consents scope is required. + + :param identity_id: The ID of the identity to look up consents for + :param all: If true, return all consents, including those that have + been issued to other clients. If false, return only consents rooted at this + client id for the requested identity. Most clients should pass False. + + .. tab-set:: + + .. tab-item:: Example Usage + + .. code-block:: pycon + + >>> ac = globus_sdk.AuthClient(...) + >>> identity_id = ... + >>> forest = ac.get_consents(identity_id).to_forest() + + .. tab-item:: Example Response Data + + .. expandtestfixture:: auth.get_consents + + .. tab-item:: API Info + + ``GET /v2/api/identities/{identity_id}/consents`` + """ + return GetConsentsResponse( + self.get( + f"/v2/api/identities/{identity_id}/consents", query_params={"all": all} + ) + ) diff --git a/src/globus_sdk/services/auth/response/__init__.py b/src/globus_sdk/services/auth/response/__init__.py index c22beec92..e56c0e167 100644 --- a/src/globus_sdk/services/auth/response/__init__.py +++ b/src/globus_sdk/services/auth/response/__init__.py @@ -1,4 +1,5 @@ from .clients import GetClientsResponse +from .consents import GetConsentsResponse from .credentials import GetClientCredentialsResponse from .identities import GetIdentitiesResponse, GetIdentityProvidersResponse from .oauth import OAuthDependentTokenResponse, OAuthTokenResponse @@ -11,6 +12,7 @@ "GetClientsResponse", "GetIdentitiesResponse", "GetIdentityProvidersResponse", + "GetConsentsResponse", "GetPoliciesResponse", "GetProjectsResponse", "GetScopesResponse", diff --git a/src/globus_sdk/services/auth/response/consents.py b/src/globus_sdk/services/auth/response/consents.py new file mode 100644 index 000000000..e0b9452c4 --- /dev/null +++ b/src/globus_sdk/services/auth/response/consents.py @@ -0,0 +1,25 @@ +from globus_sdk import IterableResponse +from globus_sdk.experimental.consents import ConsentForest + + +class GetConsentsResponse(IterableResponse): + """ + Response class specific to the Get Consents API + + Provides iteration on the "consents" array in the response. + """ + + default_iter_key = "consents" + + def to_forest(self) -> ConsentForest: + """ + Creates a ConsentForest from the consents in this response. + + ConsentForest is a convenience class to make interacting with the + tree of consents simpler. + + Note: + This interface relies on the experimental Consents data model which is + subject to change. + """ + return ConsentForest(self) diff --git a/tests/functional/services/auth/service_client/test_get_consents.py b/tests/functional/services/auth/service_client/test_get_consents.py new file mode 100644 index 000000000..792b47d0b --- /dev/null +++ b/tests/functional/services/auth/service_client/test_get_consents.py @@ -0,0 +1,16 @@ +from globus_sdk._testing import load_response + + +def test_get_consents(service_client): + meta = load_response(service_client.get_consents).metadata + res = service_client.get_consents(meta["identity_id"]) + + forest = res.to_forest() + assert len(forest.nodes) == 2 + assert len(forest.trees) == 1 + tree = forest.trees[0] + assert forest.trees[0].max_depth == 2 + assert tree.root.scope_name.endswith("transfer.api.globus.org:all") + children = [tree.get_node(child_id) for child_id in tree.edges[tree.root.id]] + assert len(children) == 1 + assert children[0].scope_name.endswith("data_access") diff --git a/tests/unit/services/auth/test_consents.py b/tests/unit/services/auth/test_consents.py new file mode 100644 index 000000000..a7ac1f572 --- /dev/null +++ b/tests/unit/services/auth/test_consents.py @@ -0,0 +1,204 @@ +from __future__ import annotations + +from collections import namedtuple +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from random import randint +from types import SimpleNamespace +from uuid import UUID + +import pytest + +from globus_sdk._types import UUIDLike +from globus_sdk.experimental.consents import ( + Consent, + ConsentForest, + ConsentTreeConstructionError, +) + +_zero_uuid = str(UUID(int=0)) + + +def _uuid_of(char: str) -> str: + if len(char) != 1: + raise ValueError(f"char must be a single character, got {char!r}") + return _zero_uuid.replace("0", char) + + +Clients = SimpleNamespace( + Zero=_uuid_of("0"), + One=_uuid_of("1"), + Two=_uuid_of("2"), + Three=_uuid_of("3"), +) +ScopeRepr = namedtuple("Scope", ["id", "name"]) +Scopes = SimpleNamespace( + A=ScopeRepr(_uuid_of("A"), "A"), + B=ScopeRepr(_uuid_of("B"), "B"), + C=ScopeRepr(_uuid_of("C"), "C"), + D=ScopeRepr(_uuid_of("D"), "D"), +) + + +@dataclass +class ConsentTest(Consent): + """ + A convenience Consent data subclass with default values for most fields to make + test case definition less verbose. + + Required fields: client, scope, scope_name + """ + + client: UUIDLike + scope: UUIDLike + scope_name: str + id: int = field(default_factory=lambda: randint(1, 10000)) + effective_identity: UUIDLike = _uuid_of("9") + dependency_path: list[int] = field(default_factory=list) + created: datetime = field( + default_factory=lambda: datetime.now() - timedelta(days=1) + ) + updated: datetime = field( + default_factory=lambda: datetime.now() - timedelta(days=1) + ) + last_used: datetime = field(default_factory=datetime.now) + status: str = "approved" + allows_refresh: bool = True + auto_approved: bool = False + atomically_revocable: bool = False + + def __post_init__(self): + # Append self to the dependency path if it's not already there + if not self.dependency_path or self.dependency_path[-1] != self.id: + self.dependency_path.append(self.id) + + @classmethod + def of( + cls, + client: str, + scope: ScopeRepr, + *, + parent: ConsentTest | None = None, + **kwargs, + ) -> ConsentTest: + return cls( + client=client, + scope=scope.id, + scope_name=scope.name, + dependency_path=list(parent.dependency_path) if parent else [], + **kwargs, + ) + + +def test_consent_forest_creation(): + root = ConsentTest.of(Clients.Zero, Scopes.A) + node1 = ConsentTest.of(Clients.One, Scopes.B, parent=root) + node2 = ConsentTest.of(Clients.Two, Scopes.C, parent=node1) + + forest = ConsentForest([root, node1, node2]) + assert len(forest.trees) == 1 + tree = forest.trees[0] + assert tree.root == root + assert tree.max_depth == 3 + + assert tree.edges[tree.root.id] == {node1.id} + assert tree.edges[node1.id] == {node2.id} + assert tree.edges[node2.id] == set() + + assert tree.get_node(root.id) == root + assert tree.get_node(node1.id) == node1 + assert tree.get_node(node2.id) == node2 + + +def test_consent_forest_scope_requirement_evaluation(): + root = ConsentTest.of(Clients.Zero, Scopes.A) + node1 = ConsentTest.of(Clients.One, Scopes.B, parent=root) + node2 = ConsentTest.of(Clients.Two, Scopes.C, parent=node1) + + forest = ConsentForest([root, node1, node2]) + + assert forest.meets_scope_requirements("A") + assert forest.meets_scope_requirements("A[B[C]]") + assert not forest.meets_scope_requirements("B") + assert not forest.meets_scope_requirements("A[C]") + + +def test_consent_forest_scope_requirement_with_sibling_dependent_scopes(): + root = ConsentTest.of(Clients.Zero, Scopes.A) + node1 = ConsentTest.of(Clients.One, Scopes.B, parent=root) + node2 = ConsentTest.of(Clients.Two, Scopes.C, parent=root) + + forest = ConsentForest([root, node1, node2]) + + assert forest.meets_scope_requirements("A") + assert forest.meets_scope_requirements("A[B]") + assert forest.meets_scope_requirements("A[C]") + assert forest.meets_scope_requirements("A[B C]") + assert not forest.meets_scope_requirements("A[B[C]]") + assert not forest.meets_scope_requirements("A[C[B]]") + + +@pytest.mark.parametrize("atomically_revocable", (True, False)) +def test_consent_forest_scope_requirement_with_optional_dependent_scopes( + atomically_revocable: bool, +): + """ + Dependent scope optionality is intentionally ignored for this implementation. + + In formal terms, the scope "A[*B]" is only satisfied by a tree matching the shape + A -> B where B is "atomically revocable". + We've decided that this is an auth service concern, not a concern for local clients + to be making decisions about; so we intentionally ignore this distinction in order + to give standard users a simpler verification mechanism to ask "will my request + work with the current set of consents?". + """ + root = ConsentTest.of(Clients.Zero, Scopes.A) + child = ConsentTest.of( + Clients.One, Scopes.B, parent=root, atomically_revocable=atomically_revocable + ) + + forest = ConsentForest([root, child]) + + assert forest.meets_scope_requirements("A[B]") + assert forest.meets_scope_requirements("A[*B]") + + +def test_consent_forest_with_disjoint_consents_with_duplicate_scopes(): + """ + Strange state to reproduce in practice but this test case simulates the forest of + Tree 1: A (Client Zero) -> B (Client Zero) + Tree 2: B (Client Zero) -> C (Client Zero) + + In this situation, A[B] and B[C] are both satisfied, but A[B[C]] is not. + """ + root1 = ConsentTest.of(Clients.Zero, Scopes.A) + child1 = ConsentTest.of(Clients.Zero, Scopes.B, parent=root1) + + root2 = ConsentTest.of(Clients.Zero, Scopes.B) + child2 = ConsentTest.of(Clients.Zero, Scopes.C, parent=root2) + + forest = ConsentForest([root1, child1, root2, child2]) + + assert forest.meets_scope_requirements("A[B]") + assert forest.meets_scope_requirements("B[C]") + assert not forest.meets_scope_requirements("A[B[C]]") + + +def test_consent_forest_with_missing_intermediary_nodes(): + """ + Simulate a situation in which we didn't receive the full list of consents from + Auth. So the tree has holes + + Tree: A -> -> C + """ + root = ConsentTest.of(Clients.Zero, Scopes.A) + node1 = ConsentTest.of(Clients.One, Scopes.B, parent=root) + node2 = ConsentTest.of(Clients.Two, Scopes.C, parent=node1) + + # Only add the first and last node to the forest. + # The last node (C) references the middle node (B) and so forest loading should + # fail. + with pytest.raises( + ConsentTreeConstructionError, match=rf"Missing parent node: {node1.id}" + ): + ConsentForest([root, node2])