diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 8522697c21c1f..2c794a709895f 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -2316,9 +2316,17 @@ def _visit_relevant_relatives_for_normal(task_ids: Iterable[str]) -> None: visited.update(partial_dag.task_dict) def _visit_relevant_relatives_for_mapped(mapped_tasks: Iterable[tuple[str, int]]) -> None: + from airflow.exceptions import NotMapped + for task_id, map_index in mapped_tasks: task = dag.get_task(task_id) - ti_count = get_mapped_ti_count(task, run_id, session=session) + try: + ti_count = get_mapped_ti_count(task, run_id, session=session) + except NotMapped: + # Task is not actually mapped (not a MappedOperator and not inside a mapped task group). + # Treat it as a normal task instead. + _visit_relevant_relatives_for_normal([task_id]) + continue # TODO (GH-52141): This should return scheduler operator types, but # currently get_flat_relatives is inherited from SDK DAGNode. relatives = cast("Iterable[Operator]", task.get_flat_relatives(upstream=direction == "upstream")) diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 3a5ff369e9bd0..ef8122de39e29 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -3239,6 +3239,31 @@ def g(v): assert result == expected +def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, session): + """Test that specifying a non-mapped task as a tuple doesn't raise NotMapped exception.""" + # t1 -> t2 (non-mapped) -> t3 + with dag_maker(session=session) as dag: + t1 = EmptyOperator(task_id="t1") + t2 = EmptyOperator(task_id="t2") + t3 = EmptyOperator(task_id="t3") + t1 >> t2 >> t3 + + dr = dag_maker.create_dagrun(state="success") + + # Specifying t2 as a tuple (t2, 0) even though it's not mapped should not raise NotMapped + # It should treat t2 as a normal task and return its upstream t1 + result = find_relevant_relatives( + normal_tasks=[], + mapped_tasks=[("t2", 0)], + direction="upstream", + dag=dag, + run_id=dr.run_id, + session=session, + ) + # Should return t1 as the upstream of t2 + assert result == {"t1"} + + def test_when_dag_run_has_partition_then_asset_does(dag_maker, session): asset = Asset(name="hello") with dag_maker(dag_id="asset_event_tester", schedule=None) as dag: