diff --git a/airflow-core/src/airflow/models/dagrun.py b/airflow-core/src/airflow/models/dagrun.py index 562fdc5e9fc82..39fad84be6a70 100644 --- a/airflow-core/src/airflow/models/dagrun.py +++ b/airflow-core/src/airflow/models/dagrun.py @@ -1228,7 +1228,7 @@ def recalculate(self) -> _UnfinishedStates: isinstance(d.reference, DeadlineReference.TYPES.DAGRUN) for d in cast("list", dag.deadline) ): - Deadline.prune_deadlines(session=session, conditions={DagRun.run_id: self.run_id}) + Deadline.prune_deadlines(session=session, conditions={DagRun.id: self.id}) # if *all tasks* are deadlocked, the run failed elif unfinished.should_schedule and not are_runnable_tasks: diff --git a/airflow-core/tests/unit/models/test_dagrun.py b/airflow-core/tests/unit/models/test_dagrun.py index 74cf038dcec4d..a3192bc409893 100644 --- a/airflow-core/tests/unit/models/test_dagrun.py +++ b/airflow-core/tests/unit/models/test_dagrun.py @@ -27,7 +27,7 @@ import pendulum import pytest -from sqlalchemy import select +from sqlalchemy import exists, select from sqlalchemy.orm import joinedload from airflow import settings @@ -36,6 +36,7 @@ from airflow.models.dag import DagModel, infer_automated_data_interval from airflow.models.dag_version import DagVersion from airflow.models.dagrun import DagRun, DagRunNote +from airflow.models.deadline import Deadline from airflow.models.serialized_dag import SerializedDagModel from airflow.models.taskinstance import TaskInstance, TaskInstanceNote, clear_task_instances from airflow.models.taskmap import TaskMap @@ -1292,6 +1293,60 @@ def on_success_callable(context): # Callbacks are not added until handle_callback = False is passed to dag_run.update_state() assert callback is None + def test_dagrun_success_deadline_prune(self, dag_maker, session): + """Ensure only the deadline associated with dagrun marked as success is deleted.""" + now = timezone.utcnow() + future_date = datetime.datetime.now() + datetime.timedelta(days=365) + initial_task_states = { + "test_state_succeeded1": TaskInstanceState.SUCCESS, + } + + with dag_maker( + dag_id="dag_1", + schedule=datetime.timedelta(days=1), + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(future_date), + interval=datetime.timedelta(hours=1), + callback=AsyncCallback(empty_callback_for_deadline), + ), + session=session, + ) as dag1: + EmptyOperator(task_id="test_state_succeeded1") + + dag_run1 = self.create_dag_run( + dag=dag1, session=session, logical_date=now, task_states=initial_task_states + ) + + with dag_maker( + dag_id="dag_2", + schedule=datetime.timedelta(days=1), + deadline=DeadlineAlert( + reference=DeadlineReference.FIXED_DATETIME(future_date), + interval=datetime.timedelta(hours=1), + callback=AsyncCallback(empty_callback_for_deadline), + ), + session=session, + ) as dag2: + EmptyOperator(task_id="test_state_succeeded1") + + dag_run2 = self.create_dag_run( + dag=dag2, session=session, logical_date=now, task_states=initial_task_states + ) + + dag_run1_deadline = exists().where(Deadline.dagrun_id == dag_run1.id) + dag_run2_deadline = exists().where(Deadline.dagrun_id == dag_run2.id) + + assert session.query(dag_run1_deadline).scalar() + assert session.query(dag_run2_deadline).scalar() + + session.add(dag_run1) + dag_run1.update_state() + + assert not session.query(dag_run1_deadline).scalar() + assert session.query(dag_run2_deadline).scalar() + assert dag_run1.state == DagRunState.SUCCESS + assert dag_run2.state == DagRunState.RUNNING + @pytest.mark.parametrize( ("run_type", "expected_tis"),