Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airflow-core/docs/core-concepts/auth-manager/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ The following methods aren't required to override to have a functional Airflow a
* ``batch_is_authorized_pool``: Batch version of ``is_authorized_pool``. If not overridden, it will call ``is_authorized_pool`` for every single item.
* ``batch_is_authorized_variable``: Batch version of ``is_authorized_variable``. If not overridden, it will call ``is_authorized_variable`` for every single item.
* ``get_authorized_dag_ids``: Return the list of Dag IDs the user has access to. If not overridden, it will call ``is_authorized_dag`` for every single Dag available in the environment.
* ``is_authorized_hitl_task``: Return whether the user is authorized to approve or reject a Human-in-the-loop (HITL) task. Override this method to implement custom authorization logic for HITL tasks. If not overridden, it checks if the user's ID is in the assigned users list.

* Note: To filter the results of ``get_authorized_dag_ids``, it is recommended that you define the filtering logic in your ``filter_authorized_dag_ids`` method. For example, this may be useful if you rely on per-Dag access controls derived from one or more fields on a given Dag (e.g. Dag tags).
* This method requires an active session with the Airflow metadata database. As such, overriding the ``get_authorized_dag_ids`` method is an advanced use case, which should be considered carefully -- it is recommended you refer to the :doc:`../../database-erd-ref`.
Expand Down
1 change: 1 addition & 0 deletions airflow-core/newsfragments/59399.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add ``is_authorized_hitl_task()`` method to check whether a user a is authorized to approve a HITL task
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,18 @@ def filter_authorized_menu_items(self, menu_items: list[MenuItem], *, user: T) -
:param user: the user
"""

def is_authorized_hitl_task(self, *, assigned_users: set[str], user: T) -> bool:
"""
Check if a user is allowed to approve/reject a HITL task.

By default, checks if the user's ID is in the assigned_users set.
Auth managers can override this method to implement custom logic.

:param assigned_users: set of user IDs assigned to the task
:param user: the user to check authorization for
"""
return user.get_id() in assigned_users

def batch_is_authorized_connection(
self,
requests: Sequence[IsAuthorizedConnectionRequest],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,26 @@ def filter_authorized_menu_items(
) -> list[MenuItem]:
return menu_items

def is_authorized_hitl_task(self, *, assigned_users: set[str], user: SimpleAuthManagerUser) -> bool:
"""
Check if a user is allowed to approve/reject a HITL task.

When simple_auth_manager_all_admins=True, all authenticated users are allowed
to approve/reject any task. Otherwise, the user must be in the assigned_users set.
"""
is_simple_auth_manager_all_admins = conf.getboolean("core", "simple_auth_manager_all_admins")

if is_simple_auth_manager_all_admins:
# In all-admin mode, everyone is allowed
return True

# If no assigned_users specified, allow access
if not assigned_users:
return True

# Delegate to parent class for the actual authorization check
return super().is_authorized_hitl_task(assigned_users=assigned_users, user=user)

def get_fastapi_app(self) -> FastAPI | None:
"""
Specify a sub FastAPI application specific to the auth manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,12 @@
UpdateHITLDetailPayload,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import GetUserDep, ReadableTIFilterDep, requires_access_dag
from airflow.api_fastapi.core_api.security import (
GetUserDep,
ReadableTIFilterDep,
get_auth_manager,
requires_access_dag,
)
from airflow.api_fastapi.logging.decorators import action_logging
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -155,7 +160,9 @@ def update_hitl_detail(
user_id = str(user_id)
hitl_user = HITLUser(id=user_id, name=user_name)
if hitl_detail_model.assigned_users:
if hitl_user not in hitl_detail_model.assigned_users:
# Convert assigned_users list to set of user IDs for authorization check
assigned_user_ids = {assigned_user["id"] for assigned_user in hitl_detail_model.assigned_users}
if not get_auth_manager().is_authorized_hitl_task(assigned_users=assigned_user_ids, user=user):
log.error("User=%s (id=%s) is not a respondent for the task", user_name, user_id)
raise HTTPException(
status.HTTP_403_FORBIDDEN,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,25 @@ def test_filter_authorized_menu_items(self, auth_manager):
items, user=SimpleAuthManagerUser(username="test", role=None)
)
assert results == items

@pytest.mark.parametrize(
("all_admins", "user_id", "assigned_users", "expected"),
[
# When simple_auth_manager_all_admins=True, any user should be allowed
(True, "user1", {"user2"}, True),
(True, "user2", {"user2"}, True),
(True, "admin", {"test_user"}, True),
# When simple_auth_manager_all_admins=False, user must be in assigned_users
(False, "user1", {"user1"}, True),
(False, "user2", {"user1"}, False),
(False, "admin", {"test_user"}, False),
# When no assigned_users, allow access
(False, "user1", set(), True),
],
)
def test_is_authorized_hitl_task(self, auth_manager, all_admins, user_id, assigned_users, expected):
"""Test is_authorized_hitl_task method with different configurations."""
with conf_vars({("core", "simple_auth_manager_all_admins"): str(all_admins)}):
user = SimpleAuthManagerUser(username=user_id, role="user")
result = auth_manager.is_authorized_hitl_task(assigned_users=assigned_users, user=user)
assert result == expected