diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py
index 05fb79bd0bf29..25c8134203b4d 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/structure.py
@@ -26,7 +26,10 @@
 from airflow.api_fastapi.core_api.datamodels.ui.structure import StructureDataResponse
 from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
 from airflow.api_fastapi.core_api.security import requires_access_dag
-from airflow.api_fastapi.core_api.services.ui.structure import get_upstream_assets
+from airflow.api_fastapi.core_api.services.ui.structure import (
+    bind_output_assets_to_tasks,
+    get_upstream_assets,
+)
 from airflow.models.dag_version import DagVersion
 from airflow.models.serialized_dag import SerializedDagModel
 from airflow.utils.dag_edges import dag_edges
@@ -139,4 +142,6 @@ def structure_data(
 
     data["edges"] += start_edges + edges + end_edges
 
+    bind_output_assets_to_tasks(data["edges"], serialized_dag)
+
     return StructureDataResponse(**data)
diff --git a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py
index 128dc93b7706d..6f5f415d3fdb7 100644
--- a/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py
+++ b/airflow-core/src/airflow/api_fastapi/core_api/services/ui/structure.py
@@ -23,6 +23,8 @@
 
 from __future__ import annotations
 
+from airflow.models.serialized_dag import SerializedDagModel
+
 
 def get_upstream_assets(
     asset_expression: dict, entry_node_ref: str, level: int = 0
@@ -112,3 +114,32 @@ def get_upstream_assets(
         edges = edges + e
 
     return nodes, edges
+
+
+def bind_output_assets_to_tasks(edges: list[dict], serialized_dag: SerializedDagModel) -> None:
+    """
+    Try to bind the downstream assets to the relevant task that produces them.
+
+    This function will mutate the `edges` in place.
+
+    :param edges: Graph edges; an edge whose ``target_id`` is ``asset:<id>`` may get a new ``source_id``.
+    :param serialized_dag: Serialized DAG whose ``dag_model.task_outlet_asset_references`` are searched.
+    """
+    # Index the references once; ``setdefault`` keeps the first reference seen for each asset.
+    task_id_by_asset_id: dict = {}
+    for reference in serialized_dag.dag_model.task_outlet_asset_references:
+        task_id_by_asset_id.setdefault(reference.asset_id, reference.task_id)
+
+    for edge in edges:
+        target_id = edge["target_id"]
+        if not target_id.startswith("asset:"):
+            continue
+        # `removeprefix` drops exactly the prefix, `strip` would treat "asset:" as a set of characters.
+        asset_id = int(target_id.removeprefix("asset:"))
+        if asset_id in task_id_by_asset_id:
+            # Attach the outlet asset to the relevant task
+            edge["source_id"] = task_id_by_asset_id[asset_id]
+        # If no asset reference found, fallback to using the exit node reference
+        # This can happen because asset aliases are not yet handled, they do not populate
+        # the `outlet_asset_references` when resolved. Extra lookup is needed. Same for asset-name-ref and
+        # asset-uri-ref.
diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
index 928c0b2cc8490..035721a1dcbe4 100644
--- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
+++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py
@@ -369,7 +369,7 @@ def test_should_return_200_with_asset(self, test_client, asset1_id, asset2_id, a
                 {
                     "is_setup_teardown": None,
                     "label": None,
-                    "source_id": "task_2",
+                    "source_id": "task_1",
                     "target_id": f"asset:{asset3_id}",
                     "is_source_asset": None,
                 },