From 3163494a897cf31f2891923d4512ac1c541a4dce Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Sat, 13 Dec 2025 22:36:48 -0500 Subject: [PATCH 1/9] Fix ApprovalOperator with SimpleAuthManager when all_admins=True Fixes #59348. Added is_allowed() method to BaseAuthManager and all implementations to properly delegate HITL permission checks. - SimpleAuthManager: Returns True when simple_auth_manager_all_admins=True - Other managers: Check if user is in assigned_users list - Updated hitl.py to use auth manager's is_allowed() method Remove duplicate import --- .../auth/managers/base_auth_manager.py | 10 ++++++++ .../managers/simple/simple_auth_manager.py | 24 +++++++++++++++++++ .../core_api/routes/public/hitl.py | 9 +++++-- .../simple/test_simple_auth_manager.py | 21 ++++++++++++++++ .../aws/auth_manager/aws_auth_manager.py | 9 +++++++ .../aws/auth_manager/test_aws_auth_manager.py | 21 ++++++++++++++++ .../fab/auth_manager/fab_auth_manager.py | 11 +++++++++ .../fab/auth_manager/test_fab_auth_manager.py | 21 ++++++++++++++++ .../auth_manager/keycloak_auth_manager.py | 11 +++++++++ .../test_keycloak_auth_manager.py | 21 ++++++++++++++++ 10 files changed, 156 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index a06f3799a4654..9eed8453e079a 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -69,6 +69,7 @@ DagAccessEntity, ) from airflow.cli.cli_config import CLICommand + from airflow.models.hitl import HITLUser # This cannot be in the TYPE_CHECKING block since some providers import it globally. # TODO: Move this inside once all providers drop Airflow 2.x support. @@ -347,6 +348,15 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], *, user: T) - :param user: the user """ + @abstractmethod + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + :param user_id: the user id to check + :param assigned_users: list of users assigned to the task + """ + def batch_is_authorized_connection( self, requests: Sequence[IsAuthorizedConnectionRequest], diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index 15834a2cfb16e..87d6bc7e1cb34 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -43,6 +43,8 @@ from airflow.configuration import AIRFLOW_HOME, conf if TYPE_CHECKING: + from collections.abc import Sequence + from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -55,6 +57,7 @@ PoolDetails, VariableDetails, ) + from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) @@ -283,6 +286,27 @@ def filter_authorized_menu_items( ) -> list[MenuItem]: return menu_items + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + When simple_auth_manager_all_admins=True, all authenticated users are allowed + to approve/reject any task. Otherwise, the user must be in the assigned_users list. + """ + is_simple_auth_manager_all_admins = conf.getboolean("core", "simple_auth_manager_all_admins") + + if is_simple_auth_manager_all_admins: + # In all-admin mode, everyone is allowed + return True + + # If no assigned_users specified, allow access + if not assigned_users: + return True + + # Check if user is in the assigned_users list + allowed = any(user["id"] == user_id for user in assigned_users) + return allowed + def get_fastapi_app(self) -> FastAPI | None: """ Specify a sub FastAPI application specific to the auth manager. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 831a1cd094121..42a45de34deb8 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -51,7 +51,12 @@ UpdateHITLDetailPayload, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag +from airflow.api_fastapi.core_api.security import ( + GetUserDep, + ReadableTIFilterDep, + get_auth_manager, + requires_access_dag, +) from airflow.api_fastapi.logging.decorators import action_logging from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun @@ -155,7 +160,7 @@ def update_hitl_detail( user_id = str(user_id) hitl_user = HITLUser(id=user_id, name=user_name) if hitl_detail_model.assigned_users: - if hitl_user not in hitl_detail_model.assigned_users: + if not get_auth_manager().is_allowed(user_id, hitl_detail_model.assigned_users): log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id) raise HTTPException( status.HTTP_403_FORBIDDEN, diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py index f5566be6595ad..a59d6ec38b522 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py @@ -271,3 +271,24 @@ def test_filter_authorized_menu_items(self, auth_manager): items, user=SimpleAuthManagerUser(username="test", role=None) ) assert results == items + + @pytest.mark.parametrize( + ("all_admins", "user_id", "assigned_users", "expected"), + [ + # When simple_auth_manager_all_admins=True, any user should be allowed + (True, "user1", [{"id": "user2", "name": "User 2"}], True), + (True, "user2", [{"id": "user2", "name": "User 2"}], True), + (True, "admin", [{"id": "test_user", "name": "Test User"}], True), + # When simple_auth_manager_all_admins=False, user must be in assigned_users + (False, "user1", [{"id": "user1", "name": "User 1"}], True), + (False, "user2", [{"id": "user1", "name": "User 1"}], False), + (False, "admin", [{"id": "test_user", "name": "Test User"}], False), + # When no assigned_users, allow access + (False, "user1", [], True), + ], + ) + def test_is_allowed(self, auth_manager, all_admins, user_id, assigned_users, expected): + """Test is_allowed method with different configurations.""" + with conf_vars({("core", "simple_auth_manager_all_admins"): str(all_admins)}): + result = auth_manager.is_allowed(user_id, assigned_users) + assert result is expected diff --git a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index 1e499a46c1494..64a769180bce6 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -62,6 +62,7 @@ VariableDetails, ) from airflow.api_fastapi.common.types import MenuItem + from airflow.models.hitl import HITLUser class AwsAuthManager(BaseAuthManager[AwsAuthManagerUser]): @@ -247,6 +248,14 @@ def _has_access_to_menu_item(request: IsAuthorizedRequest): return [menu_item for menu_item in menu_items if _has_access_to_menu_item(requests[menu_item.value])] + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + User must be in assigned_users list. + """ + return any(user["id"] == user_id for user in assigned_users) + def batch_is_authorized_connection( self, requests: Sequence[IsAuthorizedConnectionRequest], diff --git a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py index 8ee6fa8470b89..466ab31e49f29 100644 --- a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py +++ b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py @@ -730,3 +730,24 @@ def test_get_url_login(self, auth_manager): def test_get_cli_commands_return_cli_commands(self, auth_manager): assert len(auth_manager.get_cli_commands()) > 0 + + @pytest.mark.parametrize( + ("user_id", "assigned_users", "expected"), + [ + # User in assigned_users list + ("user1", [{"id": "user1", "name": "User 1"}], True), + # User not in assigned_users list + ("user2", [{"id": "user1", "name": "User 1"}], False), + # Multiple users - user1 in list + ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user2 in list + ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user3 not in list + ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), + # Empty assigned_users list + ("user1", [], False), + ], + ) + def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): + result = auth_manager.is_allowed(user_id, assigned_users) + assert result is expected diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index 397f174fb6dbb..63d61746d40cf 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -112,10 +112,13 @@ from airflow.utils.yaml import safe_load if TYPE_CHECKING: + from collections.abc import Sequence + from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.cli.cli_config import ( CLICommand, ) + from airflow.models.hitl import HITLUser from airflow.providers.common.compat.assets import AssetAliasDetails, AssetDetails from airflow.providers.fab.auth_manager.security_manager.override import ( FabAirflowSecurityManagerOverride, @@ -467,6 +470,14 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], user: User) - ) ] + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + User must be in assigned_users list. + """ + return any(user["id"] == user_id for user in assigned_users) + @provide_session def get_authorized_connections( self, diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py index fee62bb56704a..204eaec18660d 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py @@ -877,6 +877,27 @@ def test_get_db_manager(self, auth_manager): result = auth_manager.get_db_manager() assert result == "airflow.providers.fab.auth_manager.models.db.FABDBManager" + @pytest.mark.parametrize( + ("user_id", "assigned_users", "expected"), + [ + # User in assigned_users list + ("user1", [{"id": "user1", "name": "User 1"}], True), + # User not in assigned_users list + ("user2", [{"id": "user1", "name": "User 1"}], False), + # Multiple users - user1 in list + ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user2 in list + ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user3 not in list + ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), + # Empty assigned_users list + ("user1", [], False), + ], + ) + def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): + result = auth_manager.is_allowed(user_id, assigned_users) + assert result is expected + @pytest.mark.db_test @pytest.mark.parametrize("skip_init", [False, True]) diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 2ec4f20bf72d9..294cdd9581f19 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -57,6 +57,8 @@ from airflow.utils.helpers import prune_dict if TYPE_CHECKING: + from collections.abc import Sequence + from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -70,6 +72,7 @@ PoolDetails, VariableDetails, ) + from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) @@ -288,6 +291,14 @@ def filter_authorized_menu_items( ) return [MenuItem(menu[1]) for menu in authorized_menus] + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + User must be in assigned_users list. + """ + return any(user["id"] == user_id for user in assigned_users) + def get_fastapi_app(self) -> FastAPI | None: from airflow.providers.keycloak.auth_manager.routes.login import login_router from airflow.providers.keycloak.auth_manager.routes.token import token_router diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py index 2902e7d0687d8..c0efcbfb71929 100644 --- a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py +++ b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py @@ -497,3 +497,24 @@ def test_token_expired(self, auth_manager, expiration, expected): token = auth_manager._get_token_signer(expiration_time_in_seconds=expiration).generate({}) assert KeycloakAuthManager._token_expired(token) is expected + + @pytest.mark.parametrize( + ("user_id", "assigned_users", "expected"), + [ + # User in assigned_users list + ("user1", [{"id": "user1", "name": "User 1"}], True), + # User not in assigned_users list + ("user2", [{"id": "user1", "name": "User 1"}], False), + # Multiple users - user1 in list + ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user2 in list + ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user3 not in list + ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), + # Empty assigned_users list + ("user1", [], False), + ], + ) + def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): + result = auth_manager.is_allowed(user_id, assigned_users) + assert result is expected From 4c923aa812bef68733f8e32c57a5749efd4cd695 Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Sun, 14 Dec 2025 08:28:26 -0500 Subject: [PATCH 2/9] Refactor HITL authorization methods across auth managers to improve consistency and remove redundant code --- .../auth/managers/base_auth_manager.py | 7 +++++-- .../aws/auth_manager/aws_auth_manager.py | 9 -------- .../aws/auth_manager/test_aws_auth_manager.py | 21 ------------------- .../fab/auth_manager/fab_auth_manager.py | 11 ---------- .../fab/auth_manager/test_fab_auth_manager.py | 21 ------------------- .../auth_manager/keycloak_auth_manager.py | 11 ---------- .../test_keycloak_auth_manager.py | 21 ------------------- 7 files changed, 5 insertions(+), 96 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index 9eed8453e079a..19b10d35869c0 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -348,14 +348,17 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], *, user: T) - :param user: the user """ - @abstractmethod - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + def is_authorized_hitl_task(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: """ Check if a user is allowed to approve/reject a HITL task. + By default, checks if the user_id matches any user in the assigned_users list. + Auth managers can override this method to implement custom logic. + :param user_id: the user id to check :param assigned_users: list of users assigned to the task """ + return any(user["id"] == user_id for user in assigned_users) def batch_is_authorized_connection( self, diff --git a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index 64a769180bce6..1e499a46c1494 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -62,7 +62,6 @@ VariableDetails, ) from airflow.api_fastapi.common.types import MenuItem - from airflow.models.hitl import HITLUser class AwsAuthManager(BaseAuthManager[AwsAuthManagerUser]): @@ -248,14 +247,6 @@ def _has_access_to_menu_item(request: IsAuthorizedRequest): return [menu_item for menu_item in menu_items if _has_access_to_menu_item(requests[menu_item.value])] - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: - """ - Check if a user is allowed to approve/reject a HITL task. - - User must be in assigned_users list. - """ - return any(user["id"] == user_id for user in assigned_users) - def batch_is_authorized_connection( self, requests: Sequence[IsAuthorizedConnectionRequest], diff --git a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py index 466ab31e49f29..8ee6fa8470b89 100644 --- a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py +++ b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py @@ -730,24 +730,3 @@ def test_get_url_login(self, auth_manager): def test_get_cli_commands_return_cli_commands(self, auth_manager): assert len(auth_manager.get_cli_commands()) > 0 - - @pytest.mark.parametrize( - ("user_id", "assigned_users", "expected"), - [ - # User in assigned_users list - ("user1", [{"id": "user1", "name": "User 1"}], True), - # User not in assigned_users list - ("user2", [{"id": "user1", "name": "User 1"}], False), - # Multiple users - user1 in list - ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user2 in list - ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user3 not in list - ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), - # Empty assigned_users list - ("user1", [], False), - ], - ) - def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): - result = auth_manager.is_allowed(user_id, assigned_users) - assert result is expected diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index 63d61746d40cf..397f174fb6dbb 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -112,13 +112,10 @@ from airflow.utils.yaml import safe_load if TYPE_CHECKING: - from collections.abc import Sequence - from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.cli.cli_config import ( CLICommand, ) - from airflow.models.hitl import HITLUser from airflow.providers.common.compat.assets import AssetAliasDetails, AssetDetails from airflow.providers.fab.auth_manager.security_manager.override import ( FabAirflowSecurityManagerOverride, @@ -470,14 +467,6 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], user: User) - ) ] - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: - """ - Check if a user is allowed to approve/reject a HITL task. - - User must be in assigned_users list. - """ - return any(user["id"] == user_id for user in assigned_users) - @provide_session def get_authorized_connections( self, diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py index 204eaec18660d..fee62bb56704a 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py @@ -877,27 +877,6 @@ def test_get_db_manager(self, auth_manager): result = auth_manager.get_db_manager() assert result == "airflow.providers.fab.auth_manager.models.db.FABDBManager" - @pytest.mark.parametrize( - ("user_id", "assigned_users", "expected"), - [ - # User in assigned_users list - ("user1", [{"id": "user1", "name": "User 1"}], True), - # User not in assigned_users list - ("user2", [{"id": "user1", "name": "User 1"}], False), - # Multiple users - user1 in list - ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user2 in list - ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user3 not in list - ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), - # Empty assigned_users list - ("user1", [], False), - ], - ) - def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): - result = auth_manager.is_allowed(user_id, assigned_users) - assert result is expected - @pytest.mark.db_test @pytest.mark.parametrize("skip_init", [False, True]) diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 294cdd9581f19..2ec4f20bf72d9 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -57,8 +57,6 @@ from airflow.utils.helpers import prune_dict if TYPE_CHECKING: - from collections.abc import Sequence - from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -72,7 +70,6 @@ PoolDetails, VariableDetails, ) - from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) @@ -291,14 +288,6 @@ def filter_authorized_menu_items( ) return [MenuItem(menu[1]) for menu in authorized_menus] - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: - """ - Check if a user is allowed to approve/reject a HITL task. - - User must be in assigned_users list. - """ - return any(user["id"] == user_id for user in assigned_users) - def get_fastapi_app(self) -> FastAPI | None: from airflow.providers.keycloak.auth_manager.routes.login import login_router from airflow.providers.keycloak.auth_manager.routes.token import token_router diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py index c0efcbfb71929..2902e7d0687d8 100644 --- a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py +++ b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py @@ -497,24 +497,3 @@ def test_token_expired(self, auth_manager, expiration, expected): token = auth_manager._get_token_signer(expiration_time_in_seconds=expiration).generate({}) assert KeycloakAuthManager._token_expired(token) is expected - - @pytest.mark.parametrize( - ("user_id", "assigned_users", "expected"), - [ - # User in assigned_users list - ("user1", [{"id": "user1", "name": "User 1"}], True), - # User not in assigned_users list - ("user2", [{"id": "user1", "name": "User 1"}], False), - # Multiple users - user1 in list - ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user2 in list - ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user3 not in list - ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), - # Empty assigned_users list - ("user1", [], False), - ], - ) - def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): - result = auth_manager.is_allowed(user_id, assigned_users) - assert result is expected From 535aa034b3ae2103eac2f4cd0b03c028aa217cc1 Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Mon, 15 Dec 2025 13:57:54 -0500 Subject: [PATCH 3/9] Refactor is_authorized_hitl_task signature - Update method to use keyword-only parameters - Take full user object instead of just user_id - Add unit tests for BaseAuthManager and SimpleAuthManager - Update hitl.py call site to match new signature --- .../docs/core-concepts/auth-manager/index.rst | 1 + airflow-core/newsfragments/59399.feature.rst | 1 + .../auth/managers/base_auth_manager.py | 11 ++++----- .../managers/simple/simple_auth_manager.py | 12 ++++------ .../core_api/routes/public/hitl.py | 4 +++- .../simple/test_simple_auth_manager.py | 23 ++++++++++--------- .../auth/managers/test_base_auth_manager.py | 20 ++++++++++++++++ 7 files changed, 46 insertions(+), 26 deletions(-) create mode 100644 airflow-core/newsfragments/59399.feature.rst diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst b/airflow-core/docs/core-concepts/auth-manager/index.rst index 597507d06cf24..b3e56a4312fd7 100644 --- a/airflow-core/docs/core-concepts/auth-manager/index.rst +++ b/airflow-core/docs/core-concepts/auth-manager/index.rst @@ -141,6 +141,7 @@ These authorization methods are: * ``is_authorized_asset_alias``: Return whether the user is authorized to access Airflow asset aliases. Some details about the asset alias can be provided (e.g. the asset alias ID). * ``is_authorized_pool``: Return whether the user is authorized to access Airflow pools. Some details about the pool can be provided (e.g. the pool name). * ``is_authorized_variable``: Return whether the user is authorized to access Airflow variables. Some details about the variable can be provided (e.g. the variable key). +* ``is_authorized_hitl_task``: Return whether the user is authorized to approve or reject a Human-in-the-loop (HITL) task. * ``is_authorized_view``: Return whether the user is authorized to access a specific view in Airflow. The view is specified through ``access_view`` (e.g. ``AccessView.CLUSTER_ACTIVITY``). * ``is_authorized_custom_view``: Return whether the user is authorized to access a specific view not defined in Airflow. This view can be provided by the auth manager itself or a plugin defined by the user. * ``filter_authorized_menu_items``: Given the list of menu items in the UI, return the list of menu items the user has access to. diff --git a/airflow-core/newsfragments/59399.feature.rst b/airflow-core/newsfragments/59399.feature.rst new file mode 100644 index 0000000000000..6c0a4cc7ef543 --- /dev/null +++ b/airflow-core/newsfragments/59399.feature.rst @@ -0,0 +1 @@ +Add ``is_authorized_hitl_task()`` method to fix ApprovalOperator authorization with SimpleAuthManager diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index 19b10d35869c0..ea5b1200337b6 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -69,7 +69,6 @@ DagAccessEntity, ) from airflow.cli.cli_config import CLICommand - from airflow.models.hitl import HITLUser # This cannot be in the TYPE_CHECKING block since some providers import it globally. # TODO: Move this inside once all providers drop Airflow 2.x support. @@ -348,17 +347,17 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], *, user: T) - :param user: the user """ - def is_authorized_hitl_task(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + def is_authorized_hitl_task(self, *, assigned_users: set[str], user: T) -> bool: """ Check if a user is allowed to approve/reject a HITL task. - By default, checks if the user_id matches any user in the assigned_users list. + By default, checks if the user's ID is in the assigned_users set. Auth managers can override this method to implement custom logic. - :param user_id: the user id to check - :param assigned_users: list of users assigned to the task + :param assigned_users: set of user IDs assigned to the task + :param user: the user to check authorization for """ - return any(user["id"] == user_id for user in assigned_users) + return user.get_id() in assigned_users def batch_is_authorized_connection( self, diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index 87d6bc7e1cb34..6218b48296b71 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -43,8 +43,6 @@ from airflow.configuration import AIRFLOW_HOME, conf if TYPE_CHECKING: - from collections.abc import Sequence - from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -57,7 +55,6 @@ PoolDetails, VariableDetails, ) - from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) @@ -286,12 +283,12 @@ def filter_authorized_menu_items( ) -> list[MenuItem]: return menu_items - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + def is_authorized_hitl_task(self, *, assigned_users: set[str], user: SimpleAuthManagerUser) -> bool: """ Check if a user is allowed to approve/reject a HITL task. When simple_auth_manager_all_admins=True, all authenticated users are allowed - to approve/reject any task. Otherwise, the user must be in the assigned_users list. + to approve/reject any task. Otherwise, the user must be in the assigned_users set. """ is_simple_auth_manager_all_admins = conf.getboolean("core", "simple_auth_manager_all_admins") @@ -303,9 +300,8 @@ def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: if not assigned_users: return True - # Check if user is in the assigned_users list - allowed = any(user["id"] == user_id for user in assigned_users) - return allowed + # Delegate to parent class for the actual authorization check + return super().is_authorized_hitl_task(assigned_users=assigned_users, user=user) def get_fastapi_app(self) -> FastAPI | None: """ diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 42a45de34deb8..5e3439c0938b7 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -160,7 +160,9 @@ def update_hitl_detail( user_id = str(user_id) hitl_user = HITLUser(id=user_id, name=user_name) if hitl_detail_model.assigned_users: - if not get_auth_manager().is_allowed(user_id, hitl_detail_model.assigned_users): + # Convert assigned_users list to set of user IDs for authorization check + assigned_user_ids = {assigned_user["id"] for assigned_user in hitl_detail_model.assigned_users} + if not get_auth_manager().is_authorized_hitl_task(assigned_users=assigned_user_ids, user=user): log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id) raise HTTPException( status.HTTP_403_FORBIDDEN, diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py index a59d6ec38b522..2c67f79a5b879 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py @@ -276,19 +276,20 @@ def test_filter_authorized_menu_items(self, auth_manager): ("all_admins", "user_id", "assigned_users", "expected"), [ # When simple_auth_manager_all_admins=True, any user should be allowed - (True, "user1", [{"id": "user2", "name": "User 2"}], True), - (True, "user2", [{"id": "user2", "name": "User 2"}], True), - (True, "admin", [{"id": "test_user", "name": "Test User"}], True), + (True, "user1", {"user2"}, True), + (True, "user2", {"user2"}, True), + (True, "admin", {"test_user"}, True), # When simple_auth_manager_all_admins=False, user must be in assigned_users - (False, "user1", [{"id": "user1", "name": "User 1"}], True), - (False, "user2", [{"id": "user1", "name": "User 1"}], False), - (False, "admin", [{"id": "test_user", "name": "Test User"}], False), + (False, "user1", {"user1"}, True), + (False, "user2", {"user1"}, False), + (False, "admin", {"test_user"}, False), # When no assigned_users, allow access - (False, "user1", [], True), + (False, "user1", set(), True), ], ) - def test_is_allowed(self, auth_manager, all_admins, user_id, assigned_users, expected): - """Test is_allowed method with different configurations.""" + def test_is_authorized_hitl_task(self, auth_manager, all_admins, user_id, assigned_users, expected): + """Test is_authorized_hitl_task method with different configurations.""" with conf_vars({("core", "simple_auth_manager_all_admins"): str(all_admins)}): - result = auth_manager.is_allowed(user_id, assigned_users) - assert result is expected + user = SimpleAuthManagerUser(username=user_id, role="user") + result = auth_manager.is_authorized_hitl_task(assigned_users=assigned_users, user=user) + assert result == expected diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py index cfaaeb3c3c364..e719ff4836b58 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py @@ -594,3 +594,23 @@ def side_effect_func( session.execute.return_value.all.return_value = rows result = auth_manager.get_authorized_pools(user=user, session=session) assert result == expected + + @pytest.mark.parametrize( + ("user_id", "assigned_users", "expected"), + [ + # User in assigned_users + ("user1", {"user1", "user2"}, True), + ("user2", {"user1", "user2"}, True), + # User not in assigned_users + ("user3", {"user1", "user2"}, False), + # Empty assigned_users + ("user1", set(), False), + ], + ) + def test_is_authorized_hitl_task( + self, auth_manager, user_id: str, assigned_users: set[str], expected: bool + ): + """Test is_authorized_hitl_task method with the new signature.""" + user = BaseAuthManagerUserTest(name=user_id) + result = auth_manager.is_authorized_hitl_task(assigned_users=assigned_users, user=user) + assert result == expected From e1a1bd1f2e8241b21f5e3788c0ff6d3504dfc057 Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Sat, 13 Dec 2025 22:36:48 -0500 Subject: [PATCH 4/9] Fix ApprovalOperator with SimpleAuthManager when all_admins=True Fixes #59348. Added is_allowed() method to BaseAuthManager and all implementations to properly delegate HITL permission checks. - SimpleAuthManager: Returns True when simple_auth_manager_all_admins=True - Other managers: Check if user is in assigned_users list - Updated hitl.py to use auth manager's is_allowed() method Remove duplicate import --- .../auth/managers/base_auth_manager.py | 1 + .../managers/simple/simple_auth_manager.py | 3 +++ .../aws/auth_manager/aws_auth_manager.py | 9 ++++++++ .../aws/auth_manager/test_aws_auth_manager.py | 21 +++++++++++++++++++ .../fab/auth_manager/fab_auth_manager.py | 11 ++++++++++ .../fab/auth_manager/test_fab_auth_manager.py | 21 +++++++++++++++++++ .../auth_manager/keycloak_auth_manager.py | 11 ++++++++++ .../test_keycloak_auth_manager.py | 21 +++++++++++++++++++ 8 files changed, 98 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index ea5b1200337b6..b80ef04021272 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -69,6 +69,7 @@ DagAccessEntity, ) from airflow.cli.cli_config import CLICommand + from airflow.models.hitl import HITLUser # This cannot be in the TYPE_CHECKING block since some providers import it globally. # TODO: Move this inside once all providers drop Airflow 2.x support. diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index 6218b48296b71..8a6bcee643e76 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -43,6 +43,8 @@ from airflow.configuration import AIRFLOW_HOME, conf if TYPE_CHECKING: + from collections.abc import Sequence + from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -55,6 +57,7 @@ PoolDetails, VariableDetails, ) + from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index 1e499a46c1494..64a769180bce6 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -62,6 +62,7 @@ VariableDetails, ) from airflow.api_fastapi.common.types import MenuItem + from airflow.models.hitl import HITLUser class AwsAuthManager(BaseAuthManager[AwsAuthManagerUser]): @@ -247,6 +248,14 @@ def _has_access_to_menu_item(request: IsAuthorizedRequest): return [menu_item for menu_item in menu_items if _has_access_to_menu_item(requests[menu_item.value])] + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + User must be in assigned_users list. + """ + return any(user["id"] == user_id for user in assigned_users) + def batch_is_authorized_connection( self, requests: Sequence[IsAuthorizedConnectionRequest], diff --git a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py index 8ee6fa8470b89..466ab31e49f29 100644 --- a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py +++ b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py @@ -730,3 +730,24 @@ def test_get_url_login(self, auth_manager): def test_get_cli_commands_return_cli_commands(self, auth_manager): assert len(auth_manager.get_cli_commands()) > 0 + + @pytest.mark.parametrize( + ("user_id", "assigned_users", "expected"), + [ + # User in assigned_users list + ("user1", [{"id": "user1", "name": "User 1"}], True), + # User not in assigned_users list + ("user2", [{"id": "user1", "name": "User 1"}], False), + # Multiple users - user1 in list + ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user2 in list + ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user3 not in list + ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), + # Empty assigned_users list + ("user1", [], False), + ], + ) + def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): + result = auth_manager.is_allowed(user_id, assigned_users) + assert result is expected diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index 397f174fb6dbb..63d61746d40cf 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -112,10 +112,13 @@ from airflow.utils.yaml import safe_load if TYPE_CHECKING: + from collections.abc import Sequence + from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.cli.cli_config import ( CLICommand, ) + from airflow.models.hitl import HITLUser from airflow.providers.common.compat.assets import AssetAliasDetails, AssetDetails from airflow.providers.fab.auth_manager.security_manager.override import ( FabAirflowSecurityManagerOverride, @@ -467,6 +470,14 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], user: User) - ) ] + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + User must be in assigned_users list. + """ + return any(user["id"] == user_id for user in assigned_users) + @provide_session def get_authorized_connections( self, diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py index fee62bb56704a..204eaec18660d 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py @@ -877,6 +877,27 @@ def test_get_db_manager(self, auth_manager): result = auth_manager.get_db_manager() assert result == "airflow.providers.fab.auth_manager.models.db.FABDBManager" + @pytest.mark.parametrize( + ("user_id", "assigned_users", "expected"), + [ + # User in assigned_users list + ("user1", [{"id": "user1", "name": "User 1"}], True), + # User not in assigned_users list + ("user2", [{"id": "user1", "name": "User 1"}], False), + # Multiple users - user1 in list + ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user2 in list + ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user3 not in list + ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), + # Empty assigned_users list + ("user1", [], False), + ], + ) + def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): + result = auth_manager.is_allowed(user_id, assigned_users) + assert result is expected + @pytest.mark.db_test @pytest.mark.parametrize("skip_init", [False, True]) diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 2ec4f20bf72d9..294cdd9581f19 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -57,6 +57,8 @@ from airflow.utils.helpers import prune_dict if TYPE_CHECKING: + from collections.abc import Sequence + from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -70,6 +72,7 @@ PoolDetails, VariableDetails, ) + from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) @@ -288,6 +291,14 @@ def filter_authorized_menu_items( ) return [MenuItem(menu[1]) for menu in authorized_menus] + def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + User must be in assigned_users list. + """ + return any(user["id"] == user_id for user in assigned_users) + def get_fastapi_app(self) -> FastAPI | None: from airflow.providers.keycloak.auth_manager.routes.login import login_router from airflow.providers.keycloak.auth_manager.routes.token import token_router diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py index 2902e7d0687d8..c0efcbfb71929 100644 --- a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py +++ b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py @@ -497,3 +497,24 @@ def test_token_expired(self, auth_manager, expiration, expected): token = auth_manager._get_token_signer(expiration_time_in_seconds=expiration).generate({}) assert KeycloakAuthManager._token_expired(token) is expected + + @pytest.mark.parametrize( + ("user_id", "assigned_users", "expected"), + [ + # User in assigned_users list + ("user1", [{"id": "user1", "name": "User 1"}], True), + # User not in assigned_users list + ("user2", [{"id": "user1", "name": "User 1"}], False), + # Multiple users - user1 in list + ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user2 in list + ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), + # Multiple users - user3 not in list + ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), + # Empty assigned_users list + ("user1", [], False), + ], + ) + def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): + result = auth_manager.is_allowed(user_id, assigned_users) + assert result is expected From e058625a66fb629c7c992ad3043fbc47442dcf70 Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Sun, 14 Dec 2025 08:28:26 -0500 Subject: [PATCH 5/9] Refactor HITL authorization methods across auth managers to improve consistency and remove redundant code --- .../aws/auth_manager/aws_auth_manager.py | 9 -------- .../aws/auth_manager/test_aws_auth_manager.py | 21 ------------------- .../fab/auth_manager/fab_auth_manager.py | 11 ---------- .../fab/auth_manager/test_fab_auth_manager.py | 21 ------------------- .../auth_manager/keycloak_auth_manager.py | 11 ---------- .../test_keycloak_auth_manager.py | 21 ------------------- 6 files changed, 94 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index 64a769180bce6..1e499a46c1494 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -62,7 +62,6 @@ VariableDetails, ) from airflow.api_fastapi.common.types import MenuItem - from airflow.models.hitl import HITLUser class AwsAuthManager(BaseAuthManager[AwsAuthManagerUser]): @@ -248,14 +247,6 @@ def _has_access_to_menu_item(request: IsAuthorizedRequest): return [menu_item for menu_item in menu_items if _has_access_to_menu_item(requests[menu_item.value])] - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: - """ - Check if a user is allowed to approve/reject a HITL task. - - User must be in assigned_users list. - """ - return any(user["id"] == user_id for user in assigned_users) - def batch_is_authorized_connection( self, requests: Sequence[IsAuthorizedConnectionRequest], diff --git a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py index 466ab31e49f29..8ee6fa8470b89 100644 --- a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py +++ b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py @@ -730,24 +730,3 @@ def test_get_url_login(self, auth_manager): def test_get_cli_commands_return_cli_commands(self, auth_manager): assert len(auth_manager.get_cli_commands()) > 0 - - @pytest.mark.parametrize( - ("user_id", "assigned_users", "expected"), - [ - # User in assigned_users list - ("user1", [{"id": "user1", "name": "User 1"}], True), - # User not in assigned_users list - ("user2", [{"id": "user1", "name": "User 1"}], False), - # Multiple users - user1 in list - ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user2 in list - ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user3 not in list - ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), - # Empty assigned_users list - ("user1", [], False), - ], - ) - def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): - result = auth_manager.is_allowed(user_id, assigned_users) - assert result is expected diff --git a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py index 63d61746d40cf..397f174fb6dbb 100644 --- a/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py +++ b/providers/fab/src/airflow/providers/fab/auth_manager/fab_auth_manager.py @@ -112,13 +112,10 @@ from airflow.utils.yaml import safe_load if TYPE_CHECKING: - from collections.abc import Sequence - from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.cli.cli_config import ( CLICommand, ) - from airflow.models.hitl import HITLUser from airflow.providers.common.compat.assets import AssetAliasDetails, AssetDetails from airflow.providers.fab.auth_manager.security_manager.override import ( FabAirflowSecurityManagerOverride, @@ -470,14 +467,6 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], user: User) - ) ] - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: - """ - Check if a user is allowed to approve/reject a HITL task. - - User must be in assigned_users list. - """ - return any(user["id"] == user_id for user in assigned_users) - @provide_session def get_authorized_connections( self, diff --git a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py index 204eaec18660d..fee62bb56704a 100644 --- a/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py +++ b/providers/fab/tests/unit/fab/auth_manager/test_fab_auth_manager.py @@ -877,27 +877,6 @@ def test_get_db_manager(self, auth_manager): result = auth_manager.get_db_manager() assert result == "airflow.providers.fab.auth_manager.models.db.FABDBManager" - @pytest.mark.parametrize( - ("user_id", "assigned_users", "expected"), - [ - # User in assigned_users list - ("user1", [{"id": "user1", "name": "User 1"}], True), - # User not in assigned_users list - ("user2", [{"id": "user1", "name": "User 1"}], False), - # Multiple users - user1 in list - ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user2 in list - ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user3 not in list - ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), - # Empty assigned_users list - ("user1", [], False), - ], - ) - def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): - result = auth_manager.is_allowed(user_id, assigned_users) - assert result is expected - @pytest.mark.db_test @pytest.mark.parametrize("skip_init", [False, True]) diff --git a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py index 294cdd9581f19..2ec4f20bf72d9 100644 --- a/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py +++ b/providers/keycloak/src/airflow/providers/keycloak/auth_manager/keycloak_auth_manager.py @@ -57,8 +57,6 @@ from airflow.utils.helpers import prune_dict if TYPE_CHECKING: - from collections.abc import Sequence - from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -72,7 +70,6 @@ PoolDetails, VariableDetails, ) - from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) @@ -291,14 +288,6 @@ def filter_authorized_menu_items( ) return [MenuItem(menu[1]) for menu in authorized_menus] - def is_allowed(self, user_id: str, assigned_users: Sequence[HITLUser]) -> bool: - """ - Check if a user is allowed to approve/reject a HITL task. - - User must be in assigned_users list. - """ - return any(user["id"] == user_id for user in assigned_users) - def get_fastapi_app(self) -> FastAPI | None: from airflow.providers.keycloak.auth_manager.routes.login import login_router from airflow.providers.keycloak.auth_manager.routes.token import token_router diff --git a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py index c0efcbfb71929..2902e7d0687d8 100644 --- a/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py +++ b/providers/keycloak/tests/unit/keycloak/auth_manager/test_keycloak_auth_manager.py @@ -497,24 +497,3 @@ def test_token_expired(self, auth_manager, expiration, expected): token = auth_manager._get_token_signer(expiration_time_in_seconds=expiration).generate({}) assert KeycloakAuthManager._token_expired(token) is expected - - @pytest.mark.parametrize( - ("user_id", "assigned_users", "expected"), - [ - # User in assigned_users list - ("user1", [{"id": "user1", "name": "User 1"}], True), - # User not in assigned_users list - ("user2", [{"id": "user1", "name": "User 1"}], False), - # Multiple users - user1 in list - ("user1", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user2 in list - ("user2", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], True), - # Multiple users - user3 not in list - ("user3", [{"id": "user1", "name": "User 1"}, {"id": "user2", "name": "User 2"}], False), - # Empty assigned_users list - ("user1", [], False), - ], - ) - def test_is_allowed(self, user_id, assigned_users, expected, auth_manager): - result = auth_manager.is_allowed(user_id, assigned_users) - assert result is expected From 6df3d61ee169b131117b3e87880fba088d162b7f Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Mon, 15 Dec 2025 13:57:54 -0500 Subject: [PATCH 6/9] Refactor is_authorized_hitl_task signature - Update method to use keyword-only parameters - Take full user object instead of just user_id - Add unit tests for BaseAuthManager and SimpleAuthManager - Update hitl.py call site to match new signature --- .../src/airflow/api_fastapi/auth/managers/base_auth_manager.py | 1 - .../api_fastapi/auth/managers/simple/simple_auth_manager.py | 3 --- 2 files changed, 4 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index b80ef04021272..ea5b1200337b6 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -69,7 +69,6 @@ DagAccessEntity, ) from airflow.cli.cli_config import CLICommand - from airflow.models.hitl import HITLUser # This cannot be in the TYPE_CHECKING block since some providers import it globally. # TODO: Move this inside once all providers drop Airflow 2.x support. diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index 8a6bcee643e76..6218b48296b71 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -43,8 +43,6 @@ from airflow.configuration import AIRFLOW_HOME, conf if TYPE_CHECKING: - from collections.abc import Sequence - from airflow.api_fastapi.auth.managers.base_auth_manager import ResourceMethod from airflow.api_fastapi.auth.managers.models.resource_details import ( AccessView, @@ -57,7 +55,6 @@ PoolDetails, VariableDetails, ) - from airflow.models.hitl import HITLUser log = logging.getLogger(__name__) From 659abfdd7a275c4d034fa23dc9e95c8974c1f639 Mon Sep 17 00:00:00 2001 From: Victor Kwong <109138344+TempestShaw@users.noreply.github.com> Date: Thu, 18 Dec 2025 09:55:52 +0800 Subject: [PATCH 7/9] Update airflow-core/newsfragments/59399.feature.rst Co-authored-by: Vincent <97131062+vincbeck@users.noreply.github.com> --- airflow-core/newsfragments/59399.feature.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/newsfragments/59399.feature.rst b/airflow-core/newsfragments/59399.feature.rst index 6c0a4cc7ef543..4017b3ec0e946 100644 --- a/airflow-core/newsfragments/59399.feature.rst +++ b/airflow-core/newsfragments/59399.feature.rst @@ -1 +1 @@ -Add ``is_authorized_hitl_task()`` method to fix ApprovalOperator authorization with SimpleAuthManager +Add ``is_authorized_hitl_task()`` method to check whether a user a is authorized to approve a HITL task From 216e684f12f9436947b9af0b8014e7cce763be6d Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Mon, 22 Dec 2025 10:55:06 +0800 Subject: [PATCH 8/9] Clarify is_authorized_hitl_task method documentation to specify custom authorization logic for HITL tasks --- airflow-core/docs/core-concepts/auth-manager/index.rst | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst b/airflow-core/docs/core-concepts/auth-manager/index.rst index b3e56a4312fd7..9b3d337c4257a 100644 --- a/airflow-core/docs/core-concepts/auth-manager/index.rst +++ b/airflow-core/docs/core-concepts/auth-manager/index.rst @@ -141,11 +141,9 @@ These authorization methods are: * ``is_authorized_asset_alias``: Return whether the user is authorized to access Airflow asset aliases. Some details about the asset alias can be provided (e.g. the asset alias ID). * ``is_authorized_pool``: Return whether the user is authorized to access Airflow pools. Some details about the pool can be provided (e.g. the pool name). * ``is_authorized_variable``: Return whether the user is authorized to access Airflow variables. Some details about the variable can be provided (e.g. the variable key). -* ``is_authorized_hitl_task``: Return whether the user is authorized to approve or reject a Human-in-the-loop (HITL) task. * ``is_authorized_view``: Return whether the user is authorized to access a specific view in Airflow. The view is specified through ``access_view`` (e.g. ``AccessView.CLUSTER_ACTIVITY``). * ``is_authorized_custom_view``: Return whether the user is authorized to access a specific view not defined in Airflow. This view can be provided by the auth manager itself or a plugin defined by the user. * ``filter_authorized_menu_items``: Given the list of menu items in the UI, return the list of menu items the user has access to. - It should be noted that the ``method`` parameter listed above may only have relevance for a specific subset of the auth manager's authorization methods. For example, the ``configuration`` resource is by definition read-only, so only the ``GET`` parameter is relevant in the context of ``is_authorized_configuration``. @@ -205,6 +203,7 @@ The following methods aren't required to override to have a functional Airflow a * ``filter_authorized_dag_ids``: Given a list of Dag IDs, return the list of Dag IDs the user has access to. If not overridden, it calls ``is_authorized_dag`` for every single Dag passes as parameter. * ``filter_authorized_pools``: Given a list of pool names, return the list of pool names the user has access to. If not overridden, it calls ``is_authorized_pool`` for every single pool passed as parameter. * ``filter_authorized_variables``: Given a list of variable keys, return the list of variable keys the user has access to. If not overridden, it calls ``is_authorized_variable`` for every single variable passed as parameter. +* ``is_authorized_hitl_task``: Return whether the user is authorized to approve or reject a Human-in-the-loop (HITL) task. Override this method to implement custom authorization logic for HITL tasks. If not overridden, it checks if the user's ID is in the assigned users list. CLI ^^^ From dbc6f745d9489b45e0f15c432da00cf952fbee9c Mon Sep 17 00:00:00 2001 From: TempestShaw <109138344+TempestShaw@users.noreply.github.com> Date: Mon, 22 Dec 2025 12:05:17 +0800 Subject: [PATCH 9/9] modify documentation for is_authorized_hitl_task method --- airflow-core/docs/core-concepts/auth-manager/index.rst | 1 + 1 file changed, 1 insertion(+) diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst b/airflow-core/docs/core-concepts/auth-manager/index.rst index 9b3d337c4257a..511aa308beb1f 100644 --- a/airflow-core/docs/core-concepts/auth-manager/index.rst +++ b/airflow-core/docs/core-concepts/auth-manager/index.rst @@ -144,6 +144,7 @@ These authorization methods are: * ``is_authorized_view``: Return whether the user is authorized to access a specific view in Airflow. The view is specified through ``access_view`` (e.g. ``AccessView.CLUSTER_ACTIVITY``). * ``is_authorized_custom_view``: Return whether the user is authorized to access a specific view not defined in Airflow. This view can be provided by the auth manager itself or a plugin defined by the user. * ``filter_authorized_menu_items``: Given the list of menu items in the UI, return the list of menu items the user has access to. + It should be noted that the ``method`` parameter listed above may only have relevance for a specific subset of the auth manager's authorization methods. For example, the ``configuration`` resource is by definition read-only, so only the ``GET`` parameter is relevant in the context of ``is_authorized_configuration``.