diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 359621a745391..3ae3391c1a4b0 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -528,6 +528,8 @@ paths: summary: Get Assets description: Get assets. operationId: get_assets + security: + - OAuth2PasswordBearer: [] parameters: - name: limit in: query @@ -728,6 +730,8 @@ paths: summary: Get Asset Events description: Get asset events. operationId: get_asset_events + security: + - OAuth2PasswordBearer: [] parameters: - name: limit in: query @@ -847,6 +851,8 @@ paths: summary: Create Asset Event description: Create asset events. operationId: create_asset_event + security: + - OAuth2PasswordBearer: [] requestBody: required: true content: @@ -891,6 +897,8 @@ paths: summary: Materialize Asset description: Materialize an asset by triggering a DAG run that produces it. operationId: materialize_asset + security: + - OAuth2PasswordBearer: [] parameters: - name: asset_id in: path @@ -942,6 +950,8 @@ paths: summary: Get Asset Queued Events description: Get queued asset events for an asset. operationId: get_asset_queued_events + security: + - OAuth2PasswordBearer: [] parameters: - name: asset_id in: path @@ -994,6 +1004,8 @@ paths: summary: Delete Asset Queued Events description: Delete queued asset events for an asset. operationId: delete_asset_queued_events + security: + - OAuth2PasswordBearer: [] parameters: - name: asset_id in: path @@ -1009,6 +1021,14 @@ paths: - type: string - type: 'null' title: Before + - name: dag_id + in: query + required: false + schema: + anyOf: + - type: string + - type: 'null' + title: Dag Id responses: '204': description: Successful Response @@ -1043,6 +1063,8 @@ paths: summary: Get Asset description: Get an asset. operationId: get_asset + security: + - OAuth2PasswordBearer: [] parameters: - name: asset_id in: path @@ -1088,12 +1110,16 @@ paths: summary: Get Dag Asset Queued Events description: Get queued asset events for a DAG. operationId: get_dag_asset_queued_events + security: + - OAuth2PasswordBearer: [] parameters: - name: dag_id in: path required: true schema: - type: string + anyOf: + - type: string + - type: 'null' title: Dag Id - name: before in: query @@ -1139,12 +1165,16 @@ paths: - Asset summary: Delete Dag Asset Queued Events operationId: delete_dag_asset_queued_events + security: + - OAuth2PasswordBearer: [] parameters: - name: dag_id in: path required: true schema: - type: string + anyOf: + - type: string + - type: 'null' title: Dag Id - name: before in: query @@ -1194,12 +1224,16 @@ paths: summary: Get Dag Asset Queued Event description: Get a queued asset event for a DAG. operationId: get_dag_asset_queued_event + security: + - OAuth2PasswordBearer: [] parameters: - name: dag_id in: path required: true schema: - type: string + anyOf: + - type: string + - type: 'null' title: Dag Id - name: asset_id in: path @@ -1252,12 +1286,16 @@ paths: summary: Delete Dag Asset Queued Event description: Delete a queued asset event for a DAG. operationId: delete_dag_asset_queued_event + security: + - OAuth2PasswordBearer: [] parameters: - name: dag_id in: path required: true schema: - type: string + anyOf: + - type: string + - type: 'null' title: Dag Id - name: asset_id in: path @@ -2223,6 +2261,8 @@ paths: description: If dag run is asset-triggered, return the asset events that triggered it. operationId: get_upstream_asset_events + security: + - OAuth2PasswordBearer: [] parameters: - name: dag_id in: path diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 46cb9bf2c7b00..36f4042c753bc 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -53,6 +53,7 @@ ) from airflow.api_fastapi.core_api.datamodels.dag_run import DAGRunResponse from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.security import requires_access_asset, requires_access_dag from airflow.api_fastapi.logging.decorators import action_logging from airflow.assets.manager import asset_manager from airflow.models.asset import ( @@ -91,6 +92,7 @@ def _generate_queued_event_where_clause( @assets_router.get( "/assets", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET"))], ) def get_assets( limit: QueryLimit, @@ -173,6 +175,7 @@ def get_asset_alias(asset_alias_id: int, session: SessionDep): @assets_router.get( "/assets/events", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET"))], ) def get_asset_events( limit: QueryLimit, @@ -232,7 +235,7 @@ def get_asset_events( @assets_router.post( "/assets/events", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), - dependencies=[Depends(action_logging())], + dependencies=[Depends(requires_access_asset(method="POST")), Depends(action_logging())], ) def create_asset_event( body: CreateAssetEventsBody, @@ -259,7 +262,7 @@ def create_asset_event( @assets_router.post( "/assets/{asset_id}/materialize", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND, status.HTTP_409_CONFLICT]), - dependencies=[Depends(action_logging())], + dependencies=[Depends(requires_access_asset(method="POST")), Depends(action_logging())], ) def materialize_asset( asset_id: int, @@ -306,6 +309,7 @@ def materialize_asset( @assets_router.get( "/assets/{asset_id}/queuedEvents", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET"))], ) def get_asset_queued_events( asset_id: int, @@ -339,6 +343,7 @@ def get_asset_queued_events( @assets_router.get( "/assets/{asset_id}", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET"))], ) def get_asset( asset_id: int, @@ -360,6 +365,7 @@ def get_asset( @assets_router.get( "/dags/{dag_id}/assets/queuedEvents", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET")), Depends(requires_access_dag(method="GET"))], ) def get_dag_asset_queued_events( dag_id: str, @@ -389,6 +395,7 @@ def get_dag_asset_queued_events( @assets_router.get( "/dags/{dag_id}/assets/{asset_id}/queuedEvents", responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), + dependencies=[Depends(requires_access_asset(method="GET")), Depends(requires_access_dag(method="GET"))], ) def get_dag_asset_queued_event( dag_id: str, @@ -413,7 +420,11 @@ def get_dag_asset_queued_event( "/assets/{asset_id}/queuedEvents", status_code=status.HTTP_204_NO_CONTENT, responses=create_openapi_http_exception_doc([status.HTTP_404_NOT_FOUND]), - dependencies=[Depends(action_logging())], + dependencies=[ + Depends(requires_access_asset(method="DELETE")), + Depends(requires_access_dag(method="GET")), + Depends(action_logging()), + ], ) def delete_asset_queued_events( asset_id: int, @@ -440,7 +451,11 @@ def delete_asset_queued_events( status.HTTP_404_NOT_FOUND, ] ), - dependencies=[Depends(action_logging())], + dependencies=[ + Depends(requires_access_asset(method="DELETE")), + Depends(requires_access_dag(method="GET")), + Depends(action_logging()), + ], ) def delete_dag_asset_queued_events( dag_id: str, @@ -465,7 +480,11 @@ def delete_dag_asset_queued_events( status.HTTP_404_NOT_FOUND, ] ), - dependencies=[Depends(action_logging())], + dependencies=[ + Depends(requires_access_asset(method="DELETE")), + Depends(requires_access_dag(method="GET")), + Depends(action_logging()), + ], ) def delete_dag_asset_queued_event( dag_id: str, diff --git a/airflow/api_fastapi/core_api/routes/public/dag_run.py b/airflow/api_fastapi/core_api/routes/public/dag_run.py index fe278b5972666..8703fe42e423a 100644 --- a/airflow/api_fastapi/core_api/routes/public/dag_run.py +++ b/airflow/api_fastapi/core_api/routes/public/dag_run.py @@ -59,6 +59,7 @@ TaskInstanceResponse, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.api_fastapi.core_api.security import requires_access_asset from airflow.api_fastapi.logging.decorators import action_logging from airflow.exceptions import ParamValidationError from airflow.listeners.listener import get_listener_manager @@ -189,6 +190,7 @@ def patch_dag_run( status.HTTP_404_NOT_FOUND, ] ), + dependencies=[Depends(requires_access_asset(method="GET"))], ) def get_upstream_asset_events( dag_id: str, dag_run_id: str, session: SessionDep diff --git a/airflow/api_fastapi/core_api/security.py b/airflow/api_fastapi/core_api/security.py index 1b000afa72cf1..0c9be4be64667 100644 --- a/airflow/api_fastapi/core_api/security.py +++ b/airflow/api_fastapi/core_api/security.py @@ -26,6 +26,7 @@ from airflow.api_fastapi.app import get_auth_manager from airflow.auth.managers.models.base_user import BaseUser from airflow.auth.managers.models.resource_details import ( + AssetDetails, ConnectionDetails, DagAccessEntity, DagDetails, @@ -138,6 +139,22 @@ def inner( return inner +def requires_access_asset(method: ResourceMethod) -> Callable: + def inner( + request: Request, + user: Annotated[BaseUser | None, Depends(get_user)] = None, + ) -> None: + asset_id = request.path_params.get("asset_id") + + _requires_access( + is_authorized_callback=lambda: get_auth_manager().is_authorized_asset( + method=method, details=AssetDetails(id=asset_id), user=user + ), + ) + + return inner + + def _requires_access( *, is_authorized_callback: Callable[[], bool], diff --git a/airflow/auth/managers/models/resource_details.py b/airflow/auth/managers/models/resource_details.py index 6dec2236bf233..39d4a8a1450c2 100644 --- a/airflow/auth/managers/models/resource_details.py +++ b/airflow/auth/managers/models/resource_details.py @@ -46,7 +46,7 @@ class DagDetails: class AssetDetails: """Represents the details of an asset.""" - uri: str | None = None + id: str | None = None @dataclass diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 35212e2885d2b..955bb41075246 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -4351,6 +4351,7 @@ export const useVariableServiceBulkVariables = < * @param data The data for the request. * @param data.assetId * @param data.before + * @param data.dagId * @returns void Successful Response * @throws ApiError */ @@ -4366,6 +4367,7 @@ export const useAssetServiceDeleteAssetQueuedEvents = < { assetId: number; before?: string; + dagId?: string; }, TContext >, @@ -4378,11 +4380,12 @@ export const useAssetServiceDeleteAssetQueuedEvents = < { assetId: number; before?: string; + dagId?: string; }, TContext >({ - mutationFn: ({ assetId, before }) => - AssetService.deleteAssetQueuedEvents({ assetId, before }) as unknown as Promise, + mutationFn: ({ assetId, before, dagId }) => + AssetService.deleteAssetQueuedEvents({ assetId, before, dagId }) as unknown as Promise, ...options, }); /** diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index eb24ccbb202cd..56b080ef68172 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -448,6 +448,7 @@ export class AssetService { * @param data The data for the request. * @param data.assetId * @param data.before + * @param data.dagId * @returns void Successful Response * @throws ApiError */ @@ -462,6 +463,7 @@ export class AssetService { }, query: { before: data.before, + dag_id: data.dagId, }, errors: { 401: "Unauthorized", diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index c4e321e4391f9..f3578ab98a9c8 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1678,6 +1678,7 @@ export type GetAssetQueuedEventsResponse = QueuedEventCollectionResponse; export type DeleteAssetQueuedEventsData = { assetId: number; before?: string | null; + dagId?: string | null; }; export type DeleteAssetQueuedEventsResponse = void; @@ -1690,14 +1691,14 @@ export type GetAssetResponse = AssetResponse; export type GetDagAssetQueuedEventsData = { before?: string | null; - dagId: string; + dagId: string | null; }; export type GetDagAssetQueuedEventsResponse = QueuedEventCollectionResponse; export type DeleteDagAssetQueuedEventsData = { before?: string | null; - dagId: string; + dagId: string | null; }; export type DeleteDagAssetQueuedEventsResponse = void; @@ -1705,7 +1706,7 @@ export type DeleteDagAssetQueuedEventsResponse = void; export type GetDagAssetQueuedEventData = { assetId: number; before?: string | null; - dagId: string; + dagId: string | null; }; export type GetDagAssetQueuedEventResponse = QueuedEventResponse; @@ -1713,7 +1714,7 @@ export type GetDagAssetQueuedEventResponse = QueuedEventResponse; export type DeleteDagAssetQueuedEventData = { assetId: number; before?: string | null; - dagId: string; + dagId: string | null; }; export type DeleteDagAssetQueuedEventResponse = void; diff --git a/newsfragments/47136.significant.rst b/newsfragments/47136.significant.rst new file mode 100644 index 0000000000000..d881cd789fa99 --- /dev/null +++ b/newsfragments/47136.significant.rst @@ -0,0 +1,13 @@ +``uri`` is replaced with ``id`` in ``AssetDetails`` . Hence, ``is_authorized_asset`` method needs to be updated in Auth Managers to use ``id`` instead of ``uri``. + + +* Types of change + + * [ ] Dag changes + * [ ] Config changes + * [ ] API changes + * [ ] CLI changes + * [ ] Behaviour changes + * [ ] Plugin changes + * [ ] Dependency changes + * [X] Code interface changes diff --git a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py index dfcb0e1c46878..4bc661d31669f 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/auth_manager/aws_auth_manager.py @@ -153,9 +153,9 @@ def is_authorized_dag( def is_authorized_asset( self, *, method: ResourceMethod, user: AwsAuthManagerUser, details: AssetDetails | None = None ) -> bool: - asset_uri = details.uri if details else None + asset_id = details.id if details else None return self.avp_facade.is_authorized( - method=method, entity_type=AvpEntities.ASSET, user=user, entity_id=asset_uri + method=method, entity_type=AvpEntities.ASSET, user=user, entity_id=asset_id ) def is_authorized_pool( diff --git a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py index a1f213c2b7355..be3485d40c179 100644 --- a/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py +++ b/providers/amazon/tests/unit/amazon/aws/auth_manager/test_aws_auth_manager.py @@ -205,7 +205,7 @@ def test_is_authorized_dag( "details, user, expected_user, expected_entity_id", [ (None, mock, ANY, None), - (AssetDetails(uri="uri"), mock, mock, "uri"), + (AssetDetails(id="1"), mock, mock, "1"), ], ) @patch.object(AwsAuthManager, "avp_facade") diff --git a/tests/api_fastapi/conftest.py b/tests/api_fastapi/conftest.py index 6149168f0bd60..10de8e46f12a9 100644 --- a/tests/api_fastapi/conftest.py +++ b/tests/api_fastapi/conftest.py @@ -134,7 +134,6 @@ def make_dag_with_multiple_versions(dag_maker, configure_git_connection_for_dag_ """ dag_id = "dag_with_multiple_versions" - for version_number in range(1, 4): with dag_maker(dag_id) as dag: for task_number in range(version_number): diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index c8f148af12406..8ac1c08cbc32c 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -263,6 +263,14 @@ def test_should_respond_200(self, test_client, session): "total_entries": 2, } + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get("/public/assets") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get("/public/assets") + assert response.status_code == 403 + def test_order_by_raises_400_for_invalid_attr(self, test_client, session): response = test_client.get("/public/assets?order_by=fake") @@ -594,6 +602,14 @@ def test_should_respond_200(self, test_client, session): "total_entries": 2, } + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get("/public/assets/events") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get("/public/assets/events") + assert response.status_code == 403 + @pytest.mark.parametrize( "params, total_entries", [ @@ -787,6 +803,14 @@ def test_should_respond_200(self, test_client, session): "aliases": [], } + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get("/public/assets/1") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get("/public/assets/1") + assert response.status_code == 403 + def test_should_respond_404(self, test_client): response = test_client.get("/public/assets/1") assert response.status_code == 404 @@ -862,6 +886,14 @@ def test_should_respond_200(self, test_client, session, create_dummy_dag): "total_entries": 1, } + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get("/public/dags/random/assets/queuedEvents") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get("/public/dags/random/assets/queuedEvents") + assert response.status_code == 403 + def test_should_respond_404(self, test_client): dag_id = "not_exists" @@ -893,6 +925,14 @@ def test_should_respond_204(self, test_client, session, create_dummy_dag): assert len(adrqs) == 0 check_last_log(session, dag_id=dag_id, event="delete_dag_asset_queued_events", logical_date=None) + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.delete("/public/dags/random/assets/queuedEvents") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get("/public/dags/random/assets/queuedEvents") + assert response.status_code == 403 + def test_should_respond_404_invalid_dag(self, test_client): dag_id = "not_exists" @@ -941,6 +981,18 @@ def test_should_respond_200(self, test_client, session): } check_last_log(session, dag_id=None, event="create_asset_event", logical_date=None) + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.post( + "/public/assets/events", json={"asset_uri": "s3://bucket/key/1"} + ) + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.post( + "/public/assets/events", json={"asset_uri": "s3://bucket/key/1"} + ) + assert response.status_code == 403 + def test_invalid_attr_not_allowed(self, test_client, session): self.create_assets(session) event_invalid_payload = {"asset_uri": "s3://bucket/key/1", "extra": {"foo": "bar"}, "fake": {}} @@ -1015,6 +1067,14 @@ def test_should_respond_200(self, test_client): "note": None, } + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.post("/public/assets/2/materialize") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.post("/public/assets/2/materialize") + assert response.status_code == 403 + def test_should_respond_409_on_multiple_dags(self, test_client): response = test_client.post("/public/assets/2/materialize") assert response.status_code == 409 @@ -1048,6 +1108,14 @@ def test_should_respond_200(self, test_client, session, create_dummy_dag): "total_entries": 1, } + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get("/public/assets/1/queuedEvents") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get("/public/assets/1/queuedEvents") + assert response.status_code == 403 + def test_should_respond_404(self, test_client): response = test_client.get("/public/assets/1/queuedEvents") assert response.status_code == 404 @@ -1069,6 +1137,14 @@ def test_should_respond_204(self, test_client, session, create_dummy_dag): assert session.get(AssetDagRunQueue, (asset_id, dag_id)) is None check_last_log(session, dag_id=None, event="delete_asset_queued_events", logical_date=None) + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.delete("/public/assets/1/queuedEvents") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.delete("/public/assets/1/queuedEvents") + assert response.status_code == 403 + def test_should_respond_404(self, test_client): response = test_client.delete("/public/assets/1/queuedEvents") assert response.status_code == 404 @@ -1095,6 +1171,14 @@ def test_delete_should_respond_204(self, test_client, session, create_dummy_dag) assert len(adrq) == 0 check_last_log(session, dag_id=dag_id, event="delete_dag_asset_queued_event", logical_date=None) + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.delete("/public/dags/random/assets/random/queuedEvents") + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.delete("/public/dags/random/assets/random/queuedEvents") + assert response.status_code == 403 + def test_should_respond_404(self, test_client): dag_id = "not_exists" asset_id = 1 diff --git a/tests/api_fastapi/core_api/routes/public/test_dag_run.py b/tests/api_fastapi/core_api/routes/public/test_dag_run.py index f12cab28468ff..f62bff0665ec6 100644 --- a/tests/api_fastapi/core_api/routes/public/test_dag_run.py +++ b/tests/api_fastapi/core_api/routes/public/test_dag_run.py @@ -1073,6 +1073,18 @@ def test_should_respond_200(self, test_client, dag_maker, session): } assert response.json() == expected_response + def test_should_respond_401(self, unauthenticated_test_client): + response = unauthenticated_test_client.get( + "/public/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents", + ) + assert response.status_code == 401 + + def test_should_respond_403(self, unauthorized_test_client): + response = unauthorized_test_client.get( + "/public/dags/TEST_DAG_ID/dagRuns/TEST_DAG_RUN_ID/upstreamAssetEvents" + ) + assert response.status_code == 403 + def test_should_respond_404(self, test_client): response = test_client.get( "public/dags/invalid-id/dagRuns/invalid-run-id/upstreamAssetEvents", @@ -1191,7 +1203,6 @@ def test_should_respond_200( if data_interval_end is not None: request_json["data_interval_end"] = data_interval_end request_json["logical_date"] = fixed_now - response = test_client.post( f"/public/dags/{DAG1_ID}/dagRuns", json=request_json,