Skip to content

Commit

Permalink
Avoid grouping task instance stats by try_number for dynamic mapped tasks (#44300)
Browse files Browse the repository at this point in the history
  • Loading branch information
shahar1 authored Nov 24, 2024
1 parent 49c6067 commit 5e52bd2
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 2 deletions.
15 changes: 13 additions & 2 deletions airflow/www/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,10 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) ->
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.state,
TaskInstance.try_number,
case(
(TaskInstance.map_index == -1, TaskInstance.try_number),
else_=None,
).label("try_number"),
func.min(TaskInstanceNote.content).label("note"),
func.count(func.coalesce(TaskInstance.state, sqla.literal("no_status"))).label("state_count"),
func.min(TaskInstance.queued_dttm).label("queued_dttm"),
Expand All @@ -329,7 +332,15 @@ def dag_to_grid(dag: DagModel, dag_runs: Sequence[DagRun], session: Session) ->
TaskInstance.dag_id == dag.dag_id,
TaskInstance.run_id.in_([dag_run.run_id for dag_run in dag_runs]),
)
.group_by(TaskInstance.task_id, TaskInstance.run_id, TaskInstance.state, TaskInstance.try_number)
.group_by(
TaskInstance.task_id,
TaskInstance.run_id,
TaskInstance.state,
case(
(TaskInstance.map_index == -1, TaskInstance.try_number),
else_=None,
),
)
.order_by(TaskInstance.task_id, TaskInstance.run_id)
)

Expand Down
1 change: 1 addition & 0 deletions newsfragments/44300.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix stats of dynamic mapped tasks after automatic retries of failed tasks
44 changes: 44 additions & 0 deletions tests/www/views/test_views_grid.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,3 +520,47 @@ def test_next_run_assets_404(admin_client):
resp = admin_client.get("/object/next_run_assets/missingdag", follow_redirects=True)
assert resp.status_code == 404, resp.json
assert resp.json == {"error": "can't find dag missingdag"}


@pytest.mark.usefixtures("_freeze_time_for_dagruns")
def test_dynamic_mapped_task_with_retries(admin_client, dag_with_runs: list[DagRun], session):
    """
    Check the mapped-state summary when mapped instances differ in try number.

    Every task instance of the first run is marked successful. In the second
    run, the ``group.mapped`` instances with map indexes 0-3 get a mix of
    states and try numbers. The grid data endpoint is then expected to report
    a single ``mapped_states`` summary of two failed and two successful
    instances.
    """
    first_run, second_run = dag_with_runs

    # map_index -> (try_number, state) for the mapped instances that get an
    # explicit try number; map_index 0 is handled separately below because it
    # keeps its existing try_number and is the only one with an end date.
    explicit_tries = {
        1: (1, TaskInstanceState.SUCCESS),
        2: (2, TaskInstanceState.FAILED),
        3: (3, TaskInstanceState.SUCCESS),
    }

    for ti in first_run.task_instances:
        ti.state = TaskInstanceState.SUCCESS

    ordered_tis = sorted(second_run.task_instances, key=lambda item: (item.task_id, item.map_index))
    for ti in ordered_tis:
        if ti.task_id == "task1":
            ti.state = TaskInstanceState.SUCCESS
            continue
        if ti.task_id != "group.mapped":
            # Any other task instance is left untouched.
            continue

        if ti.map_index == 0:
            ti.state = TaskInstanceState.FAILED
            ti.start_date = pendulum.DateTime(2021, 7, 1, 1, 0, 0, tzinfo=pendulum.UTC)
            ti.end_date = pendulum.DateTime(2021, 7, 1, 1, 2, 3, tzinfo=pendulum.UTC)
        elif ti.map_index in explicit_tries:
            # Tuple targets are assigned left to right: try_number, then state.
            ti.try_number, ti.state = explicit_tries[ti.map_index]
            ti.start_date = pendulum.DateTime(2021, 7, 1, 2, 3, 4, tzinfo=pendulum.UTC)
            ti.end_date = None
    session.flush()

    response = admin_client.get(f"/object/grid_data?dag_id={DAG_ID}", follow_redirects=True)

    assert response.status_code == 200, response.json

    # The last instance of the last child of the last group is inspected.
    last_instance = response.json["groups"]["children"][-1]["children"][-1]["instances"][-1]
    assert last_instance["mapped_states"] == {
        "failed": 2,
        "success": 2,
    }

0 comments on commit 5e52bd2

Please sign in to comment.