diff --git a/airflow-core/docs/core-concepts/auth-manager/index.rst b/airflow-core/docs/core-concepts/auth-manager/index.rst index 71db38330303a..2e787c9e7fb6b 100644 --- a/airflow-core/docs/core-concepts/auth-manager/index.rst +++ b/airflow-core/docs/core-concepts/auth-manager/index.rst @@ -136,7 +136,6 @@ These authorization methods are: Also, ``is_authorized_dag`` is called for any entity related to Dags (e.g. task instances, Dag runs, ...). This information is passed in ``access_entity``. Example: ``auth_manager.is_authorized_dag(method="GET", access_entity=DagAccessEntity.Run, details=DagDetails(id="dag-1"))`` asks whether the user has permission to read the Dag runs of the Dag "dag-1". -* ``is_authorized_backfill``: Return whether the user is authorized to access Airflow backfills. Some details about the backfill can be provided (e.g. the backfill ID). * ``is_authorized_asset``: Return whether the user is authorized to access Airflow assets. Some details about the asset can be provided (e.g. the asset ID). * ``is_authorized_asset_alias``: Return whether the user is authorized to access Airflow asset aliases. Some details about the asset alias can be provided (e.g. the asset alias ID). * ``is_authorized_pool``: Return whether the user is authorized to access Airflow pools. Some details about the pool can be provided (e.g. the pool name). diff --git a/airflow-core/newsfragments/61400.significant.rst b/airflow-core/newsfragments/61400.significant.rst new file mode 100644 index 0000000000000..7bdee4390a7ea --- /dev/null +++ b/airflow-core/newsfragments/61400.significant.rst @@ -0,0 +1,20 @@ +AuthManager Backfill permissions are now handled by the ``requires_access_dag`` on the ``DagAccessEntity.Run`` + +``is_authorized_backfill`` of the ``BaseAuthManager`` interface has been removed. Core will no longer call this method and their +provider counterpart implementation will be marked as deprecated. +Permissions for backfill operations are now checked against the ``DagAccessEntity.Run`` permission using the existing +``requires_access_dag`` decorator. In other words, if a user has permission to run a DAG, they can perform backfill operations on it. + +Please update your security policies to ensure that users who need to perform backfill operations have the appropriate ``DagAccessEntity.Run`` permissions. (Users +having the Backfill permissions without having the DagRun ones will no longer be able to perform backfill operations without any update) + +* Types of change + + * [ ] Dag changes + * [ ] Config changes + * [x] API changes + * [ ] CLI changes + * [x] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency changes + * [ ] Code interface changes diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py index 5e98dc86fb087..12b975cd28afd 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/base_auth_manager.py @@ -29,7 +29,6 @@ from airflow.api_fastapi.auth.managers.models.base_user import BaseUser from airflow.api_fastapi.auth.managers.models.resource_details import ( - BackfillDetails, ConnectionDetails, DagDetails, PoolDetails, @@ -230,22 +229,6 @@ def is_authorized_dag( :param details: optional details about the DAG """ - @abstractmethod - def is_authorized_backfill( - self, - *, - method: ResourceMethod, - user: T, - details: BackfillDetails | None = None, - ) -> bool: - """ - Return whether the user is authorized to perform a given action on a backfill. - - :param method: the method to perform - :param user: the user to performing the action - :param details: optional details about the backfill - """ - @abstractmethod def is_authorized_asset( self, diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py index 6a0041570a159..4ff380d2aedc5 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/models/resource_details.py @@ -48,7 +48,12 @@ class DagDetails: @dataclass class BackfillDetails: - """Represents the details of a backfill.""" + """ + Represents the details of a backfill. + + .. deprecated:: 3.1.8 + Use DagAccessEntity.Run instead for a dag level access control. + """ id: NonNegativeInt | None = None diff --git a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py index f9f9c395e3fe2..1bdbcc0a3a233 100644 --- a/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py +++ b/airflow-core/src/airflow/api_fastapi/auth/managers/simple/simple_auth_manager.py @@ -36,7 +36,7 @@ from airflow.api_fastapi.app import AUTH_MANAGER_FASTAPI_APP_PREFIX from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager -from airflow.api_fastapi.auth.managers.models.resource_details import BackfillDetails, TeamDetails +from airflow.api_fastapi.auth.managers.models.resource_details import TeamDetails from airflow.api_fastapi.auth.managers.simple.user import SimpleAuthManagerUser from airflow.api_fastapi.common.types import MenuItem from airflow.configuration import AIRFLOW_HOME, conf @@ -194,20 +194,6 @@ def is_authorized_dag( user=user, ) - def is_authorized_backfill( - self, - *, - method: ResourceMethod, - user: SimpleAuthManagerUser, - details: BackfillDetails | None = None, - ) -> bool: - return self._is_authorized( - method=method, - allow_get_role=SimpleAuthManagerRole.VIEWER, - allow_role=SimpleAuthManagerRole.OP, - user=user, - ) - def is_authorized_asset( self, *, diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py index adbac360247c0..b106248f136ed 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/public/backfills.py @@ -25,7 +25,6 @@ from sqlalchemy.orm import joinedload from airflow._shared.timezones import timezone -from airflow.api_fastapi.auth.managers.models.resource_details import DagAccessEntity from airflow.api_fastapi.common.db.common import ( SessionDep, paginated_select, @@ -42,7 +41,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, ) -from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill, requires_access_dag +from airflow.api_fastapi.core_api.security import GetUserDep, requires_access_backfill from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import DagNotFound from airflow.models import DagRun @@ -121,7 +120,6 @@ def get_backfill( dependencies=[ Depends(action_logging()), Depends(requires_access_backfill(method="PUT")), - Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), ], ) def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> BackfillResponse: @@ -149,7 +147,6 @@ def pause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfill dependencies=[ Depends(action_logging()), Depends(requires_access_backfill(method="PUT")), - Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), ], ) def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> BackfillResponse: @@ -175,7 +172,6 @@ def unpause_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfi ), dependencies=[ Depends(action_logging()), - Depends(requires_access_dag(method="PUT", access_entity=DagAccessEntity.RUN)), Depends(requires_access_backfill(method="PUT")), ], ) @@ -222,7 +218,6 @@ def cancel_backfill(backfill_id: NonNegativeInt, session: SessionDep) -> Backfil responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]), dependencies=[ Depends(action_logging()), - Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)), Depends(requires_access_backfill(method="POST")), ], ) @@ -270,7 +265,6 @@ def create_backfill( path="/dry_run", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]), dependencies=[ - Depends(requires_access_dag(method="POST", access_entity=DagAccessEntity.RUN)), Depends(requires_access_backfill(method="POST")), ], ) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py index c4c7ffe79a7fc..32b2891b9543a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/backfills.py @@ -36,7 +36,7 @@ from airflow.api_fastapi.core_api.openapi.exceptions import ( create_openapi_http_exception_doc, ) -from airflow.api_fastapi.core_api.security import requires_access_backfill, requires_access_dag +from airflow.api_fastapi.core_api.security import requires_access_backfill from airflow.models.backfill import Backfill backfills_router = AirflowRouter(tags=["Backfill"], prefix="/backfills") @@ -47,7 +47,6 @@ responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), dependencies=[ Depends(requires_access_backfill(method="GET")), - Depends(requires_access_dag(method="GET")), ], ) def list_backfills_ui( diff --git a/airflow-core/src/airflow/api_fastapi/core_api/security.py b/airflow-core/src/airflow/api_fastapi/core_api/security.py index 875513c8b2e0d..25b0a15a3a514 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/security.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/security.py @@ -16,16 +16,17 @@ # under the License. from __future__ import annotations -from collections.abc import Callable +from collections.abc import Callable, Coroutine +from json import JSONDecodeError from pathlib import Path -from typing import TYPE_CHECKING, Annotated, cast +from typing import TYPE_CHECKING, Annotated, Any, cast from urllib.parse import ParseResult, unquote, urljoin, urlparse from fastapi import Depends, HTTPException, Request, status from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer, OAuth2PasswordBearer from jwt import ExpiredSignatureError, InvalidTokenError -from pydantic import NonNegativeInt -from sqlalchemy import or_ +from sqlalchemy import or_, select +from sqlalchemy.orm import Session from airflow.api_fastapi.app import get_auth_manager from airflow.api_fastapi.auth.managers.base_auth_manager import ( @@ -42,7 +43,6 @@ AccessView, AssetAliasDetails, AssetDetails, - BackfillDetails, ConfigurationDetails, ConnectionDetails, DagAccessEntity, @@ -50,6 +50,7 @@ PoolDetails, VariableDetails, ) +from airflow.api_fastapi.common.db.common import SessionDep from airflow.api_fastapi.core_api.base import OrmClause from airflow.api_fastapi.core_api.datamodels.common import ( BulkAction, @@ -64,6 +65,7 @@ from airflow.api_fastapi.core_api.datamodels.variables import VariableBody from airflow.configuration import conf from airflow.models import Connection, Pool, Variable +from airflow.models.backfill import Backfill from airflow.models.dag import DagModel, DagRun, DagTag from airflow.models.dagwarning import DagWarning from airflow.models.log import Log @@ -146,14 +148,21 @@ async def get_user( def requires_access_dag( - method: ResourceMethod, access_entity: DagAccessEntity | None = None + method: ResourceMethod, + access_entity: DagAccessEntity | None = None, + param_dag_id: str | None = None, ) -> Callable[[Request, BaseUser], None]: def inner( request: Request, user: GetUserDep, ) -> None: - dag_id = request.path_params.get("dag_id") or request.query_params.get("dag_id") - dag_id = dag_id if dag_id != "~" else None + # Required for the closure to capture the dag_id but still be able to mutate it. + # Prevent from using a nonlocal statement causing test failures. + dag_id = param_dag_id + if dag_id is None: + dag_id = request.path_params.get("dag_id") or request.query_params.get("dag_id") + dag_id = dag_id if dag_id != "~" else None + team_name = DagModel.get_team_name(dag_id) if dag_id else None _requires_access( @@ -263,17 +272,35 @@ def depends_permitted_dags_filter( ] -def requires_access_backfill(method: ResourceMethod) -> Callable[[Request, BaseUser], None]: - def inner( +def requires_access_backfill( + method: ResourceMethod, +) -> Callable[[Request, BaseUser, Session], Coroutine[Any, Any, None]]: + """Wrap ``requires_access_dag`` and extract the dag_id from the backfill_id.""" + + async def inner( request: Request, user: GetUserDep, + session: SessionDep, ) -> None: - backfill_id: NonNegativeInt | None = request.path_params.get("backfill_id") - - _requires_access( - is_authorized_callback=lambda: get_auth_manager().is_authorized_backfill( - method=method, details=BackfillDetails(id=backfill_id), user=user - ), + dag_id = None + + # Try to retrieve the dag_id from the backfill_id path param + backfill_id = request.path_params.get("backfill_id") + if backfill_id is not None and isinstance(backfill_id, int): + backfill = session.scalars(select(Backfill).where(Backfill.id == backfill_id)).one_or_none() + dag_id = backfill.dag_id if backfill else None + + # Try to retrieve the dag_id from the request body (POST backfill) + if dag_id is None: + try: + dag_id = (await request.json()).get("dag_id") + except JSONDecodeError: + # Not a json body, ignore + pass + + requires_access_dag(method, DagAccessEntity.RUN, dag_id)( + request, + user, ) return inner diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py index 2c67f79a5b879..1356a9fe1e8b4 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/simple/test_simple_auth_manager.py @@ -135,7 +135,6 @@ def test_serialize_user(self, auth_manager): "is_authorized_dag", "is_authorized_asset", "is_authorized_asset_alias", - "is_authorized_backfill", "is_authorized_pool", "is_authorized_variable", ], @@ -191,7 +190,6 @@ def test_is_authorized_view_methods(self, auth_manager, api, kwargs, role, resul "is_authorized_connection", "is_authorized_asset", "is_authorized_asset_alias", - "is_authorized_backfill", "is_authorized_pool", "is_authorized_variable", ], @@ -237,7 +235,6 @@ def test_is_authorized_methods_user_role_required(self, auth_manager, api, role, "is_authorized_dag", "is_authorized_asset", "is_authorized_asset_alias", - "is_authorized_backfill", "is_authorized_pool", ], ) diff --git a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py index e719ff4836b58..71769ef49a600 100644 --- a/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py +++ b/airflow-core/tests/unit/api_fastapi/auth/managers/test_base_auth_manager.py @@ -25,7 +25,6 @@ from airflow.api_fastapi.auth.managers.base_auth_manager import BaseAuthManager, T from airflow.api_fastapi.auth.managers.models.base_user import BaseUser from airflow.api_fastapi.auth.managers.models.resource_details import ( - BackfillDetails, ConnectionDetails, DagDetails, PoolDetails, @@ -92,15 +91,6 @@ def is_authorized_dag( ) -> bool: raise NotImplementedError() - def is_authorized_backfill( - self, - *, - method: ResourceMethod, - details: BackfillDetails | None = None, - user: BaseAuthManagerUserTest | None = None, - ) -> bool: - raise NotImplementedError() - def is_authorized_asset( self, *, diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py index ade10293ef6d9..c55e284de62fd 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_backfills.py @@ -126,7 +126,7 @@ def test_list_backfill(self, test_client, session): session.add(b) session.commit() - with assert_queries_count(2): + with assert_queries_count(3): response = test_client.get(f"/backfills?dag_id={dag.dag_id}") assert response.status_code == 200 diff --git a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py index 5aaf509b66bde..8773d882d93bf 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/test_security.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/test_security.py @@ -27,6 +27,7 @@ from airflow.api_fastapi.auth.managers.models.resource_details import ( ConnectionDetails, DagAccessEntity, + DagDetails, PoolDetails, VariableDetails, ) @@ -38,6 +39,7 @@ from airflow.api_fastapi.core_api.security import ( get_user, is_safe_url, + requires_access_backfill, requires_access_connection, requires_access_connection_bulk, requires_access_dag, @@ -48,6 +50,7 @@ resolve_user_from_token, ) from airflow.models import Connection, Pool, Variable +from airflow.models.dag import DagModel from tests_common.test_utils.config import conf_vars @@ -147,37 +150,233 @@ async def test_get_user_with_token( mock_resolve_user_from_token.assert_called_once_with(expected) @pytest.mark.db_test + @pytest.mark.parametrize( + ("param_dag_id", "path_params", "query_params", "expected_dag_id"), + [ + # param_dag_id takes precedence when provided + ("dag_id_from_param", {}, {}, "dag_id_from_param"), + ( + "dag_id_from_param", + {"dag_id": "dag_id_from_path_params"}, + {"dag_id": "dag_id_from_query_params"}, + "dag_id_from_param", + ), + # path_params used when param_dag_id is None + (None, {"dag_id": "dag_id_from_path_params"}, {}, "dag_id_from_path_params"), + ( + None, + {"dag_id": "dag_id_from_path_params"}, + {"dag_id": "dag_id_from_query_params"}, + "dag_id_from_path_params", + ), + # query_params used when param_dag_id and path_params have no dag_id + (None, {}, {"dag_id": "dag_id_from_query_params"}, "dag_id_from_query_params"), + # "~" is treated as None + (None, {"dag_id": "~"}, {}, None), + (None, {}, {"dag_id": "~"}, None), + # No source → dag_id None + (None, {}, {}, None), + ], + ) + @patch.object(DagModel, "get_team_name") @patch("airflow.api_fastapi.core_api.security.get_auth_manager") - def test_requires_access_dag_authorized(self, mock_get_auth_manager): + def test_requires_access_dag_authorized( + self, + mock_get_auth_manager, + mock_get_team_name, + param_dag_id, + path_params, + query_params, + expected_dag_id, + ): auth_manager = Mock() auth_manager.is_authorized_dag.return_value = True mock_get_auth_manager.return_value = auth_manager + mock_get_team_name.return_value = "team1" if expected_dag_id else None + fastapi_request = Mock() - fastapi_request.path_params = {} - fastapi_request.query_params = {} + fastapi_request.path_params = path_params + fastapi_request.query_params = query_params + user = Mock() - requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request, Mock()) + requires_access_dag("GET", DagAccessEntity.CODE, param_dag_id=param_dag_id)(fastapi_request, user) - auth_manager.is_authorized_dag.assert_called_once() + auth_manager.is_authorized_dag.assert_called_once_with( + method="GET", + access_entity=DagAccessEntity.CODE, + details=DagDetails(id=expected_dag_id, team_name=mock_get_team_name.return_value), + user=user, + ) @pytest.mark.db_test + @pytest.mark.parametrize( + ("param_dag_id", "path_params", "query_params", "expected_dag_id"), + [ + ("dag_id_from_param", {}, {}, "dag_id_from_param"), + (None, {"dag_id": "dag_id_from_path_params"}, {}, "dag_id_from_path_params"), + (None, {}, {"dag_id": "dag_id_from_query_params"}, "dag_id_from_query_params"), + (None, {}, {}, None), + ], + ) + @patch.object(DagModel, "get_team_name") @patch("airflow.api_fastapi.core_api.security.get_auth_manager") - def test_requires_access_dag_unauthorized(self, mock_get_auth_manager): + def test_requires_access_dag_unauthorized( + self, + mock_get_auth_manager, + mock_get_team_name, + param_dag_id, + path_params, + query_params, + expected_dag_id, + ): auth_manager = Mock() auth_manager.is_authorized_dag.return_value = False mock_get_auth_manager.return_value = auth_manager + mock_get_team_name.return_value = "team1" if expected_dag_id else None + fastapi_request = Mock() - fastapi_request.path_params = {} - fastapi_request.query_params = {} + fastapi_request.path_params = path_params + fastapi_request.query_params = query_params + user = Mock() - mock_request = Mock() - mock_request.path_params.return_value = {} - mock_request.query_params.return_value = {} + with pytest.raises(HTTPException, match="Forbidden"): + requires_access_dag("GET", DagAccessEntity.CODE, param_dag_id=param_dag_id)(fastapi_request, user) + + auth_manager.is_authorized_dag.assert_called_once_with( + method="GET", + access_entity=DagAccessEntity.CODE, + details=DagDetails(id=expected_dag_id, team_name=mock_get_team_name.return_value), + user=user, + ) + + @pytest.mark.db_test + @pytest.mark.asyncio + @patch.object(DagModel, "get_team_name") + @patch("airflow.api_fastapi.core_api.security.get_auth_manager") + async def test_requires_access_backfill_authorized_from_path( + self, mock_get_auth_manager, mock_get_team_name + ): + """When backfill_id is in path and Backfill exists, dag_id from backfill is used.""" + auth_manager = Mock() + auth_manager.is_authorized_dag.return_value = True + mock_get_auth_manager.return_value = auth_manager + mock_get_team_name.return_value = "team1" + + backfill = Mock() + backfill.dag_id = "backfill_dag_id" + session = Mock() + session.scalars.return_value.one_or_none.return_value = backfill + + request = Mock() + request.path_params = {"backfill_id": 42} + request.json = AsyncMock(return_value={}) + + user = Mock() + + inner = requires_access_backfill("PUT") + await inner(request, user, session) + + auth_manager.is_authorized_dag.assert_called_once_with( + method="PUT", + access_entity=DagAccessEntity.RUN, + details=DagDetails(id="backfill_dag_id", team_name="team1"), + user=user, + ) + + @pytest.mark.db_test + @pytest.mark.asyncio + @patch.object(DagModel, "get_team_name") + @patch("airflow.api_fastapi.core_api.security.get_auth_manager") + async def test_requires_access_backfill_authorized_from_body( + self, mock_get_auth_manager, mock_get_team_name + ): + """When backfill_id is missing or not int, dag_id can come from request body (POST backfill).""" + auth_manager = Mock() + auth_manager.is_authorized_dag.return_value = True + mock_get_auth_manager.return_value = auth_manager + mock_get_team_name.return_value = "team1" + + session = Mock() + session.scalars.return_value.one_or_none.return_value = None + + request = Mock() + request.path_params = {} + request.json = AsyncMock(return_value={"dag_id": "body_dag_id"}) + + user = Mock() + + inner = requires_access_backfill("POST") + await inner(request, user, session) + + auth_manager.is_authorized_dag.assert_called_once_with( + method="POST", + access_entity=DagAccessEntity.RUN, + details=DagDetails(id="body_dag_id", team_name="team1"), + user=user, + ) + + @pytest.mark.db_test + @pytest.mark.asyncio + @patch.object(DagModel, "get_team_name") + @patch("airflow.api_fastapi.core_api.security.get_auth_manager") + async def test_requires_access_backfill_unauthorized(self, mock_get_auth_manager, mock_get_team_name): + """When is_authorized_dag returns False, Forbidden is raised.""" + auth_manager = Mock() + auth_manager.is_authorized_dag.return_value = False + mock_get_auth_manager.return_value = auth_manager + mock_get_team_name.return_value = None + + backfill = Mock() + backfill.dag_id = "unauthorized_dag" + session = Mock() + session.scalars.return_value.one_or_none.return_value = backfill + + request = Mock() + request.path_params = {"backfill_id": 1} + user = Mock() + inner = requires_access_backfill("GET") with pytest.raises(HTTPException, match="Forbidden"): - requires_access_dag("GET", DagAccessEntity.CODE)(fastapi_request, Mock()) + await inner(request, user, session) + + auth_manager.is_authorized_dag.assert_called_once_with( + method="GET", + access_entity=DagAccessEntity.RUN, + details=DagDetails(id="unauthorized_dag", team_name=None), + user=user, + ) - auth_manager.is_authorized_dag.assert_called_once() + @pytest.mark.db_test + @pytest.mark.asyncio + @patch.object(DagModel, "get_team_name") + @patch("airflow.api_fastapi.core_api.security.get_auth_manager") + async def test_requires_access_backfill_backfill_not_found_falls_back_to_body( + self, mock_get_auth_manager, mock_get_team_name + ): + """When backfill_id is int but Backfill not found, dag_id from body is used.""" + auth_manager = Mock() + auth_manager.is_authorized_dag.return_value = True + mock_get_auth_manager.return_value = auth_manager + mock_get_team_name.return_value = "team1" + + session = Mock() + session.scalars.return_value.one_or_none.return_value = None + + request = Mock() + request.path_params = {"backfill_id": 999} + request.json = AsyncMock(return_value={"dag_id": "fallback_dag_id"}) + + user = Mock() + + inner = requires_access_backfill("POST") + await inner(request, user, session) + + auth_manager.is_authorized_dag.assert_called_once_with( + method="POST", + access_entity=DagAccessEntity.RUN, + details=DagDetails(id="fallback_dag_id", team_name="team1"), + user=user, + ) @pytest.mark.parametrize( ("url", "expected_is_safe"),