2 changes: 1 addition & 1 deletion airflow-core/src/airflow/models/dagrun.py
@@ -1228,7 +1228,7 @@ def recalculate(self) -> _UnfinishedStates:
isinstance(d.reference, DeadlineReference.TYPES.DAGRUN)
for d in cast("list", dag.deadline)
):
-Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id})
+Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id})

# if *all tasks* are deadlocked, the run failed
elif unfinished.should_schedule and not are_runnable_tasks:
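The one-line change above switches the prune condition from DagRun.run_id to DagRun.id. A run_id is only unique within a single DAG, so two DAGs scheduled for the same logical date can share the same run_id string; pruning on run_id could therefore also remove deadlines belonging to other DAGs' runs, while the primary key id identifies exactly one run. Below is a minimal sketch of how such a column-to-value conditions mapping could be applied, assuming a join from Deadline to DagRun via dagrun_id; it is an illustrative simplification, not the actual Deadline.prune_deadlines implementation.

# Illustrative sketch only: a simplified stand-in for a conditions-based prune.
# The real logic lives in airflow.models.deadline.Deadline.prune_deadlines.
from sqlalchemy import delete, select
from sqlalchemy.orm import Session

from airflow.models.dagrun import DagRun
from airflow.models.deadline import Deadline


def prune_deadlines_sketch(session: Session, conditions: dict) -> None:
    # Each key is an ORM column and each value the required value for it,
    # e.g. {DagRun.id: dag_run.id}. First resolve the matching run ids ...
    matching_runs = select(DagRun.id).where(
        *(column == value for column, value in conditions.items())
    )
    # ... then delete only the deadlines attached to those runs. Filtering on
    # DagRun.id (unique) instead of DagRun.run_id (reused across DAGs) keeps
    # other DAGs' deadlines untouched.
    session.execute(delete(Deadline).where(Deadline.dagrun_id.in_(matching_runs)))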
57 changes: 56 additions & 1 deletion airflow-core/tests/unit/models/test_dagrun.py
@@ -27,7 +27,7 @@

import pendulum
import pytest
-from sqlalchemy import select
+from sqlalchemy import exists, select
from sqlalchemy.orm import joinedload

from airflow import settings
@@ -36,6 +36,7 @@
from airflow.models.dag import DagModel, infer_automated_data_interval
from airflow.models.dag_version import DagVersion
from airflow.models.dagrun import DagRun, DagRunNote
from airflow.models.deadline import Deadline
from airflow.models.serialized_dag import SerializedDagModel
from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_task_instances
from airflow.models.taskmap import TaskMap
@@ -1292,6 +1293,60 @@ def on_success_callable(context):
# Callbacks are not added until handle_callback = False is passed to dag_run.update_state()
assert callback is None

def test_dagrun_success_deadline_prune(self, dag_maker, session):
"""Ensure only the deadline associated with dagrun marked as success is deleted."""
now = timezone.utcnow()
future_date = datetime.datetime.now() + datetime.timedelta(days=365)
initial_task_states = {
"test_state_succeeded1": TaskInstanceState.SUCCESS,
}

with dag_maker(
dag_id="dag_1",
schedule=datetime.timedelta(days=1),
deadline=DeadlineAlert(
reference=DeadlineReference.FIXED_DATETIME(future_date),
interval=datetime.timedelta(hours=1),
callback=AsyncCallback(empty_callback_for_deadline),
),
session=session,
) as dag1:
EmptyOperator(task_id="test_state_succeeded1")

dag_run1 = self.create_dag_run(
dag=dag1, session=session, logical_date=now, task_states=initial_task_states
)

with dag_maker(
dag_id="dag_2",
schedule=datetime.timedelta(days=1),
deadline=DeadlineAlert(
reference=DeadlineReference.FIXED_DATETIME(future_date),
interval=datetime.timedelta(hours=1),
callback=AsyncCallback(empty_callback_for_deadline),
),
session=session,
) as dag2:
EmptyOperator(task_id="test_state_succeeded1")

dag_run2 = self.create_dag_run(
dag=dag2, session=session, logical_date=now, task_states=initial_task_states
)

dag_run1_deadline = exists().where(Deadline.dagrun_id == dag_run1.id)
dag_run2_deadline = exists().where(Deadline.dagrun_id == dag_run2.id)

assert session.query(dag_run1_deadline).scalar()
assert session.query(dag_run2_deadline).scalar()

session.add(dag_run1)
dag_run1.update_state()

assert not session.query(dag_run1_deadline).scalar()
assert session.query(dag_run2_deadline).scalar()
assert dag_run1.state == DagRunState.SUCCESS
assert dag_run2.state == DagRunState.RUNNING


@pytest.mark.parametrize(
("run_type", "expected_tis"),
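A note on the assertion style in the new test above: each check is a reusable EXISTS predicate (exists().where(Deadline.dagrun_id == ...)) that is re-evaluated with session.query(...).scalar() before and after dag_run1.update_state(), so the test verifies row presence without loading Deadline objects, and dag_run2 serves as the control run whose deadline must survive. Reduced to its essentials (the names below refer to the fixtures and objects from that test, so this is a usage outline rather than a standalone script):

from sqlalchemy import exists

from airflow.models.deadline import Deadline

# Build the predicate once; evaluating it returns True while a matching row exists.
dag_run1_deadline = exists().where(Deadline.dagrun_id == dag_run1.id)

assert session.query(dag_run1_deadline).scalar()      # deadline row present beforehand
dag_run1.update_state()                               # run finishes as SUCCESS; its deadline is pruned
assert not session.query(dag_run1_deadline).scalar()  # predicate now evaluates to False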