From dadea2fc7313a8ddb76694192c542006f094c0cd Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Wed, 20 Aug 2025 17:00:36 -0700 Subject: [PATCH 1/2] Fix Graph View for DAGs with mapped operators --- airflow-core/src/airflow/utils/dag_edges.py | 4 ++-- task-sdk/src/airflow/sdk/definitions/taskgroup.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/utils/dag_edges.py b/airflow-core/src/airflow/utils/dag_edges.py index 878d52aeb8ca8..f8ed846f25532 100644 --- a/airflow-core/src/airflow/utils/dag_edges.py +++ b/airflow-core/src/airflow/utils/dag_edges.py @@ -18,13 +18,13 @@ from typing import TYPE_CHECKING, TypeAlias, cast +from airflow.models.mappedoperator import MappedOperator from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator from airflow.serialization.serialized_objects import SerializedBaseOperator if TYPE_CHECKING: from collections.abc import Iterable - from airflow.models.mappedoperator import MappedOperator from airflow.sdk import DAG Operator: TypeAlias = MappedOperator | SerializedBaseOperator @@ -65,7 +65,7 @@ def dag_edges(dag: DAG): def collect_edges(task_group): """Update edges_to_add and edges_to_skip according to TaskGroups.""" - if isinstance(task_group, (AbstractOperator, SerializedBaseOperator)): + if isinstance(task_group, (AbstractOperator, SerializedBaseOperator, MappedOperator)): return for target_id in task_group.downstream_group_ids: diff --git a/task-sdk/src/airflow/sdk/definitions/taskgroup.py b/task-sdk/src/airflow/sdk/definitions/taskgroup.py index 7d5e9006cbdc1..ac4816b4dcf14 100644 --- a/task-sdk/src/airflow/sdk/definitions/taskgroup.py +++ b/task-sdk/src/airflow/sdk/definitions/taskgroup.py @@ -686,11 +686,11 @@ def get_task_group_children_getter() -> Callable: def task_group_to_dict(task_item_or_group, parent_group_is_mapped=False): """Create a nested dict representation of this TaskGroup and its children used to construct the Graph.""" + from airflow.models.mappedoperator import MappedOperator from airflow.sdk.definitions._internal.abstractoperator import AbstractOperator - from airflow.sdk.definitions.mappedoperator import MappedOperator from airflow.serialization.serialized_objects import SerializedBaseOperator - if isinstance(task := task_item_or_group, (AbstractOperator, SerializedBaseOperator)): + if isinstance(task := task_item_or_group, (AbstractOperator, SerializedBaseOperator, MappedOperator)): node_operator = { "id": task.task_id, "label": task.label, From db3a4781bbe0f3b1d672959c6c26ca92f0ca1851 Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Thu, 21 Aug 2025 13:41:25 -0700 Subject: [PATCH 2/2] Add unit tests to prevent regression --- .../core_api/routes/ui/test_structure.py | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py index ba9a7bd047643..ea58e610813bf 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_structure.py @@ -654,3 +654,71 @@ def test_should_return_404_when_dag_version_not_found(self, test_client): response.json()["detail"] == "Dag with id dag_with_multiple_versions and version number 999 was not found" ) + + def test_mapped_operator_graph_view(self, dag_maker, test_client, session): + """ + Ensures structure_data endpoint handles MappedOperator without AttributeError. + """ + from airflow.providers.standard.operators.bash import BashOperator + + with dag_maker( + dag_id="test_mapped_operator_dag", + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + task1 = EmptyOperator(task_id="task1") + mapped_task = BashOperator.partial( + task_id="mapped_bash_task", + do_xcom_push=False, + ).expand(bash_command=["echo 1", "echo 2", "echo 3"]) + task2 = EmptyOperator(task_id="task2") + + task1 >> mapped_task >> task2 + + dag_maker.sync_dagbag_to_db() + response = test_client.get("/structure/structure_data", params={"dag_id": "test_mapped_operator_dag"}) + assert response.status_code == 200 + data = response.json() + + mapped_node = next(node for node in data["nodes"] if node["id"] == "mapped_bash_task") + assert mapped_node["is_mapped"] is True + assert mapped_node["operator"] == "BashOperator" + assert len(data["edges"]) == 2 + + def test_mapped_operator_in_task_group(self, dag_maker, test_client, session): + """ + Test that mapped operators within task groups are handled correctly. + Specifically tests task_group_to_dict function with MappedOperator instances. + """ + from airflow.providers.standard.operators.python import PythonOperator + from airflow.sdk.definitions.taskgroup import TaskGroup + + with dag_maker( + dag_id="test_mapped_in_group_dag", + serialized=True, + session=session, + start_date=pendulum.DateTime(2023, 2, 1, 0, 0, 0, tzinfo=pendulum.UTC), + ): + with TaskGroup(group_id="processing_group"): + prep = EmptyOperator(task_id="prep") + mapped = PythonOperator.partial( + task_id="process", + python_callable=lambda x: print(f"Processing {x}"), + ).expand(op_args=[[1], [2], [3], [4]]) + + prep >> mapped + + dag_maker.sync_dagbag_to_db() + response = test_client.get("/structure/structure_data", params={"dag_id": "test_mapped_in_group_dag"}) + + assert response.status_code == 200 + data = response.json() + group_node = next(node for node in data["nodes"] if node["id"] == "processing_group") + assert group_node["children"] is not None + + mapped_in_group = next( + child for child in group_node["children"] if child["id"] == "processing_group.process" + ) + assert mapped_in_group["is_mapped"] is True + assert mapped_in_group["operator"] == "PythonOperator"