diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 10c511e8bb4bf..cfc35d5ea7630 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2357,8 +2357,16 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int: reset_tis_message = [] for ti in to_reset: reset_tis_message.append(repr(ti)) + # If we reset a TI, it will be eligible to be scheduled again. + # This can cause the scheduler to increase the try_number on the TI. + # Record the current try to TaskInstanceHistory first so users have an audit trail for + # the attempt that was abandoned. + ti.prepare_db_for_next_try(session=session) + ti.state = None ti.queued_by_job_id = None + ti.external_executor_id = None + ti.clear_next_method_args() for ti in set(tis_to_adopt_or_reset) - set(to_reset): ti.queued_by_job_id = self.job.id diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 7f815ccea7353..39e0438343214 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -3955,6 +3955,8 @@ def test_adopt_or_reset_orphaned_tasks_nothing(self): list(sorted(State.adoptable_states)), ) def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, session): + from airflow.models.taskinstancehistory import TaskInstanceHistory + dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name with dag_maker(dag_id=dag_id, schedule="@daily"): task_id = dag_id + "_task" @@ -3969,6 +3971,8 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi ti = dr1.get_task_instances(session=session)[0] ti.state = adoptable_state ti.queued_by_job_id = old_job.id + old_ti_id = ti.id + old_try_number = ti.try_number session.merge(ti) session.merge(dr1) session.commit() @@ -3976,6 +3980,22 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi num_reset_tis = self.job_runner.adopt_or_reset_orphaned_tasks(session=session) assert num_reset_tis == 1 + ti.refresh_from_db(session=session) + assert ti.id != old_ti_id + assert ( + session.scalar( + select(TaskInstanceHistory).where( + TaskInstanceHistory.dag_id == ti.dag_id, + TaskInstanceHistory.task_id == ti.task_id, + TaskInstanceHistory.run_id == ti.run_id, + TaskInstanceHistory.map_index == ti.map_index, + TaskInstanceHistory.try_number == old_try_number, + TaskInstanceHistory.task_instance_id == old_ti_id, + ) + ) + is not None + ) + def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, session): dag_id = "test_reset_orphaned_tasks_external_triggered_dag" with dag_maker(dag_id=dag_id, schedule="@daily"):