Skip to content
Merged
1 change: 1 addition & 0 deletions airflow-core/docs/core-concepts/auth-manager/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ The following methods aren't required to override to have a functional Airflow a
* ``filter_authorized_dag_ids``: Given a list of Dag IDs, return the list of Dag IDs the user has access to. If not overridden, it calls ``is_authorized_dag`` for every single Dag passes as parameter.
* ``filter_authorized_pools``: Given a list of pool names, return the list of pool names the user has access to. If not overridden, it calls ``is_authorized_pool`` for every single pool passed as parameter.
* ``filter_authorized_variables``: Given a list of variable keys, return the list of variable keys the user has access to. If not overridden, it calls ``is_authorized_variable`` for every single variable passed as parameter.
* ``is_authorized_hitl_task``: Return whether the user is authorized to approve or reject a Human-in-the-loop (HITL) task. Override this method to implement custom authorization logic for HITL tasks. If not overridden, it checks if the user's ID is in the assigned users list.

CLI
^^^
Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/59399.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``is_authorized_hitl_task()`` method to check whether a user a is authorized to approve a HITL task
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,18 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], *, user: T) -
:param user: the user
"""

def is_authorized_hitl_task(self, *, assigned_users: set[str], user: T) -> bool:
"""
Check if a user is allowed to approve/reject a HITL task.

By default, checks if the user's ID is in the assigned_users set.
Auth managers can override this method to implement custom logic.

:param assigned_users: set of user IDs assigned to the task
:param user: the user to check authorization for
"""
return user.get_id() in assigned_users

def batch_is_authorized_connection(
self,
requests: Sequence[IsAuthorizedConnectionRequest],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,26 @@ def filter_authorized_menu_items(
) -> list[MenuItem]:
return menu_items

def is_authorized_hitl_task(self, *, assigned_users: set[str], user: SimpleAuthManagerUser) -> bool:
"""
Check if a user is allowed to approve/reject a HITL task.

When simple_auth_manager_all_admins=True, all authenticated users are allowed
to approve/reject any task. Otherwise, the user must be in the assigned_users set.
"""
is_simple_auth_manager_all_admins = conf.getboolean("core", "simple_auth_manager_all_admins")

if is_simple_auth_manager_all_admins:
# In all-admin mode, everyone is allowed
return True

# If no assigned_users specified, allow access
if not assigned_users:
return True

# Delegate to parent class for the actual authorization check
return super().is_authorized_hitl_task(assigned_users=assigned_users, user=user)

def get_fastapi_app(self) -> FastAPI | None:
"""
Specify a sub FastAPI application specific to the auth manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@
UpdateHITLDetailPayload,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
from airflow.api_fastapi.core_api.security import (
GetUserDep,
ReadableTIFilterDep,
get_auth_manager,
requires_access_dag,
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -155,7 +160,9 @@ def update_hitl_detail(
user_id = str(user_id)
hitl_user = HITLUser(id=user_id, name=user_name)
if hitl_detail_model.assigned_users:
if hitl_user not in hitl_detail_model.assigned_users:
# Convert assigned_users list to set of user IDs for authorization check
assigned_user_ids = {assigned_user["id"] for assigned_user in hitl_detail_model.assigned_users}
if not get_auth_manager().is_authorized_hitl_task(assigned_users=assigned_user_ids, user=user):
log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id)
raise HTTPException(
status.HTTP_403_FORBIDDEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,3 +271,25 @@ def test_filter_authorized_menu_items(self, auth_manager):
items, user=SimpleAuthManagerUser(username="test", role=None)
)
assert results == items

@pytest.mark.parametrize(
("all_admins", "user_id", "assigned_users", "expected"),
[
# When simple_auth_manager_all_admins=True, any user should be allowed
(True, "user1", {"user2"}, True),
(True, "user2", {"user2"}, True),
(True, "admin", {"test_user"}, True),
# When simple_auth_manager_all_admins=False, user must be in assigned_users
(False, "user1", {"user1"}, True),
(False, "user2", {"user1"}, False),
(False, "admin", {"test_user"}, False),
# When no assigned_users, allow access
(False, "user1", set(), True),
],
)
def test_is_authorized_hitl_task(self, auth_manager, all_admins, user_id, assigned_users, expected):
"""Test is_authorized_hitl_task method with different configurations."""
with conf_vars({("core", "simple_auth_manager_all_admins"): str(all_admins)}):
user = SimpleAuthManagerUser(username=user_id, role="user")
result = auth_manager.is_authorized_hitl_task(assigned_users=assigned_users, user=user)
assert result == expected
Original file line number Diff line number Diff line change
Expand Up @@ -594,3 +594,23 @@ def side_effect_func(
session.execute.return_value.all.return_value = rows
result = auth_manager.get_authorized_pools(user=user, session=session)
assert result == expected

@pytest.mark.parametrize(
("user_id", "assigned_users", "expected"),
[
# User in assigned_users
("user1", {"user1", "user2"}, True),
("user2", {"user1", "user2"}, True),
# User not in assigned_users
("user3", {"user1", "user2"}, False),
# Empty assigned_users
("user1", set(), False),
],
)
def test_is_authorized_hitl_task(
self, auth_manager, user_id: str, assigned_users: set[str], expected: bool
):
"""Test is_authorized_hitl_task method with the new signature."""
user = BaseAuthManagerUserTest(name=user_id)
result = auth_manager.is_authorized_hitl_task(assigned_users=assigned_users, user=user)
assert result == expected
Loading