diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst b/airflow-core/docs/core-concepts/auth-manager/index.rst index 258b9aae3622a..be626edde7fe4 100644 --- a/airflow-core/docs/core-concepts/auth-manager/index.rst +++ b/airflow-core/docs/core-concepts/auth-manager/index.rst @@ -201,6 +201,7 @@ The following methods aren't required to override to have a functional Airflow a * ``batch_is_authorized_pool``: Batch version of ``is_authorized_pool``. If not overridden, it will call ``is_authorized_pool`` for every single item. * ``batch_is_authorized_variable``: Batch version of ``is_authorized_variable``. If not overridden, it will call ``is_authorized_variable`` for every single item. * ``get_authorized_dag_ids``: Return the list of Dag IDs the user has access to. If not overridden, it will call ``is_authorized_dag`` for every single Dag available in the environment. +* ``is_authorized_hitl_task``: Return whether the user is authorized to approve or reject a Human-in-the-loop (HITL) task. Override this method to implement custom authorization logic for HITL tasks. If not overridden, it checks if the user's ID is in the assigned users list. * Note: To filter the results of ``get_authorized_dag_ids``, it is recommended that you define the filtering logic in your ``filter_authorized_dag_ids`` method. For example, this may be useful if you rely on per-Dag access controls derived from one or more fields on a given Dag (e.g. Dag tags). * This method requires an active session with the Airflow metadata database. As such, overriding the ``get_authorized_dag_ids`` method is an advanced use case, which should be considered carefully -- it is recommended you refer to the :doc:`../../database-erd-ref`. diff --git a/airflow-core/newsfragments/59399.feature.rst b/airflow-core/newsfragments/59399.feature.rst new file mode 100644 index 0000000000000..4017b3ec0e946 --- /dev/null +++ b/airflow-core/newsfragments/59399.feature.rst @@ -0,0 +1 @@ +Add ``is_authorized_hitl_task()`` method to check whether a user a is authorized to approve a HITL task diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index b8656cd068f10..d7d2acabe593e 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -318,6 +318,18 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], *, user: T) - :param user: the user """ + def is_authorized_hitl_task(self, *, assigned_users: set[str], user: T) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + By default, checks if the user's ID is in the assigned_users set. + Auth managers can override this method to implement custom logic. + + :param assigned_users: set of user IDs assigned to the task + :param user: the user to check authorization for + """ + return user.get_id() in assigned_users + def batch_is_authorized_connection( self, requests: Sequence[IsAuthorizedConnectionRequest], diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index 28e3c172f8995..09250168e14ed 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -272,6 +272,26 @@ def filter_authorized_menu_items( ) -> list[MenuItem]: return menu_items + def is_authorized_hitl_task(self, *, assigned_users: set[str], user: SimpleAuthManagerUser) -> bool: + """ + Check if a user is allowed to approve/reject a HITL task. + + When simple_auth_manager_all_admins=True, all authenticated users are allowed + to approve/reject any task. Otherwise, the user must be in the assigned_users set. + """ + is_simple_auth_manager_all_admins = conf.getboolean("core", "simple_auth_manager_all_admins") + + if is_simple_auth_manager_all_admins: + # In all-admin mode, everyone is allowed + return True + + # If no assigned_users specified, allow access + if not assigned_users: + return True + + # Delegate to parent class for the actual authorization check + return super().is_authorized_hitl_task(assigned_users=assigned_users, user=user) + def get_fastapi_app(self) -> FastAPI | None: """ Specify a sub FastAPI application specific to the auth manager. diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py index 361d5faf933f2..ba7e29a123b81 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/hitl.py @@ -51,7 +51,12 @@ UpdateHITLDetailPayload, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc -from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag +from airflow.api_fastapi.core_api.security import ( + GetUserDep, + ReadableTIFilterDep, + get_auth_manager, + requires_access_dag, +) from airflow.api_fastapi.logging.decorators import action_logging from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun @@ -155,7 +160,9 @@ def update_hitl_detail( user_id = str(user_id) hitl_user = HITLUser(id=user_id, name=user_name) if hitl_detail_model.assigned_users: - if hitl_user not in hitl_detail_model.assigned_users: + # Convert assigned_users list to set of user IDs for authorization check + assigned_user_ids = {assigned_user["id"] for assigned_user in hitl_detail_model.assigned_users} + if not get_auth_manager().is_authorized_hitl_task(assigned_users=assigned_user_ids, user=user): log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id) raise HTTPException( status.HTTP_403_FORBIDDEN, diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py index 15bbb19a1a212..de58efca7e1a8 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py @@ -268,3 +268,25 @@ def test_filter_authorized_menu_items(self, auth_manager): items, user=SimpleAuthManagerUser(username="test", role=None) ) assert results == items + + @pytest.mark.parametrize( + ("all_admins", "user_id", "assigned_users", "expected"), + [ + # When simple_auth_manager_all_admins=True, any user should be allowed + (True, "user1", {"user2"}, True), + (True, "user2", {"user2"}, True), + (True, "admin", {"test_user"}, True), + # When simple_auth_manager_all_admins=False, user must be in assigned_users + (False, "user1", {"user1"}, True), + (False, "user2", {"user1"}, False), + (False, "admin", {"test_user"}, False), + # When no assigned_users, allow access + (False, "user1", set(), True), + ], + ) + def test_is_authorized_hitl_task(self, auth_manager, all_admins, user_id, assigned_users, expected): + """Test is_authorized_hitl_task method with different configurations.""" + with conf_vars({("core", "simple_auth_manager_all_admins"): str(all_admins)}): + user = SimpleAuthManagerUser(username=user_id, role="user") + result = auth_manager.is_authorized_hitl_task(assigned_users=assigned_users, user=user) + assert result == expected