diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 06597a24fc93d..9caeb8579ed77 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -294,6 +294,7 @@ def clear_task_instances( dr.last_scheduling_decision = None dr.start_date = None dr.clear_number += 1 + dr.queued_at = timezone.utcnow() session.flush() diff --git a/airflow-core/tests/unit/models/test_cleartasks.py b/airflow-core/tests/unit/models/test_cleartasks.py index 9a1f37c89ca21..421e962515f8d 100644 --- a/airflow-core/tests/unit/models/test_cleartasks.py +++ b/airflow-core/tests/unit/models/test_cleartasks.py @@ -276,6 +276,7 @@ def test_clear_task_instances_on_finished_dr(self, state, last_scheduling, dag_m ti1.state = TaskInstanceState.SUCCESS session = dag_maker.session session.flush() + original_queued_at = dr.queued_at # we use order_by(task_id) here because for the test DAG structure of ours # this is equivalent to topological sort. It would not work in general case @@ -291,6 +292,10 @@ def test_clear_task_instances_on_finished_dr(self, state, last_scheduling, dag_m assert dr.start_date is None assert dr.last_scheduling_decision is None + # The initial finished run has queued_at=None, clearing should populate it. + assert original_queued_at is None + assert dr.queued_at is not None + @pytest.mark.parametrize("delete_tasks", [True, False]) def test_clear_task_instances_maybe_task_removed(self, delete_tasks, dag_maker, session): """This verifies the behavior of clear_task_instances re task removal.