Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion airflow-core/src/airflow/models/taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2316,9 +2316,17 @@ def _visit_relevant_relatives_for_normal(task_ids: Iterable[str]) -> None:
visited.update(partial_dag.task_dict)

def _visit_relevant_relatives_for_mapped(mapped_tasks: Iterable[tuple[str, int]]) -> None:
from airflow.exceptions import NotMapped

for task_id, map_index in mapped_tasks:
task = dag.get_task(task_id)
ti_count = get_mapped_ti_count(task, run_id, session=session)
try:
ti_count = get_mapped_ti_count(task, run_id, session=session)
except NotMapped:
# Task is not actually mapped (not a MappedOperator and not inside a mapped task group).
# Treat it as a normal task instead.
_visit_relevant_relatives_for_normal([task_id])
continue
# TODO (GH-52141): This should return scheduler operator types, but
# currently get_flat_relatives is inherited from SDK DAGNode.
relatives = cast("Iterable[Operator]", task.get_flat_relatives(upstream=direction == "upstream"))
Expand Down
25 changes: 25 additions & 0 deletions airflow-core/tests/unit/models/test_taskinstance.py
Original file line number Diff line number Diff line change
Expand Up @@ -3239,6 +3239,31 @@ def g(v):
assert result == expected


def test_find_relevant_relatives_with_non_mapped_task_as_tuple(dag_maker, session):
"""Test that specifying a non-mapped task as a tuple doesn't raise NotMapped exception."""
# t1 -> t2 (non-mapped) -> t3
with dag_maker(session=session) as dag:
t1 = EmptyOperator(task_id="t1")
t2 = EmptyOperator(task_id="t2")
t3 = EmptyOperator(task_id="t3")
t1 >> t2 >> t3

dr = dag_maker.create_dagrun(state="success")

# Specifying t2 as a tuple (t2, 0) even though it's not mapped should not raise NotMapped
# It should treat t2 as a normal task and return its upstream t1
result = find_relevant_relatives(
normal_tasks=[],
mapped_tasks=[("t2", 0)],
direction="upstream",
dag=dag,
run_id=dr.run_id,
session=session,
)
# Should return t1 as the upstream of t2
assert result == {"t1"}


def test_when_dag_run_has_partition_then_asset_does(dag_maker, session):
asset = Asset(name="hello")
with dag_maker(dag_id="asset_event_tester", schedule=None) as dag:
Expand Down