diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index eb808fb6cbc4b..4b995c6675061 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -384,6 +384,17 @@ def ti_update_state( ) if previous_state != TaskInstanceState.RUNNING: + # In HA, it's possible to receive a "late" finish/state update after another + # component already moved the TI to a terminal state. Treat this as an idempotent no-op to avoid + # crashing the process. + if previous_state in set(TerminalTIState): + requested_state = getattr(ti_patch_payload.state, "value", ti_patch_payload.state) + log.info( + "Ignoring state update for already terminal task instance", + previous_state=previous_state, + requested_state=requested_state, + ) + return log.warning( "Cannot update Task Instance in invalid state", previous_state=previous_state, diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index eb8e7b22770af..12d7a5f5059b8 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2584,8 +2584,16 @@ def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int: reset_tis_message = [] for ti in to_reset: reset_tis_message.append(repr(ti)) + # If we reset a TI, it will be eligible to be scheduled again. + # This can cause the scheduler to increase the try_number on the TI. + # Record the current try to TaskInstanceHistory first so users have an audit trail for + # the attempt that was abandoned. + ti.prepare_db_for_next_try(session=session) + ti.state = None ti.queued_by_job_id = None + ti.external_executor_id = None + ti.clear_next_method_args() for ti in set(tis_to_adopt_or_reset) - set(to_reset): ti.queued_by_job_id = self.job.id diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index d9f4e1f42424f..3e70c789b572b 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -1353,8 +1353,8 @@ def test_ti_update_state_to_failed_table_check(self, client, session, create_tas assert ti.next_kwargs is None assert ti.duration == 3600.00 - def test_ti_update_state_not_running(self, client, session, create_task_instance): - """Test that a 409 error is returned when attempting to update a TI that is not in RUNNING state.""" + def test_ti_update_state_not_running_terminal_is_idempotent(self, client, session, create_task_instance): + """Test that updating a terminal TI is treated as an idempotent no-op.""" ti = create_task_instance( task_id="test_ti_update_state_not_running", state=State.SUCCESS, @@ -1368,17 +1368,40 @@ def test_ti_update_state_not_running(self, client, session, create_task_instance "end_date": DEFAULT_END_DATE.isoformat(), } + response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) + assert response.status_code == 204 + assert response.text == "" + + # Verify the task instance state hasn't changed + session.refresh(ti) + assert ti.state == State.SUCCESS + + def test_ti_update_state_not_running_non_terminal(self, client, session, create_task_instance): + """Test that a 409 error is returned when attempting to update a TI that is not in RUNNING state.""" + ti = create_task_instance( + task_id="test_ti_update_state_not_running_non_terminal", + state=State.QUEUED, + session=session, + start_date=DEFAULT_START_DATE, + ) + session.commit() + + payload = { + "state": "failed", + "end_date": DEFAULT_END_DATE.isoformat(), + } + response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) assert response.status_code == 409 assert response.json()["detail"] == { "reason": "invalid_state", "message": "TI was not in the running state so it cannot be updated", - "previous_state": State.SUCCESS, + "previous_state": State.QUEUED, } # Verify the task instance state hasn't changed session.refresh(ti) - assert ti.state == State.SUCCESS + assert ti.state == State.QUEUED def test_ti_update_state_to_failed_without_fail_fast(self, client, session, dag_maker): """Test that SerializedDAG is NOT loaded when fail_fast=False (default).""" diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index 31b3a3f2c8593..303ad5d1ed8a2 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -4060,6 +4060,8 @@ def test_adopt_or_reset_orphaned_tasks_nothing(self): list(sorted(State.adoptable_states)), ) def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, session): + from airflow.models.taskinstancehistory import TaskInstanceHistory + dag_id = "test_adopt_or_reset_adoptable_tasks_" + adoptable_state.name with dag_maker(dag_id=dag_id, schedule="@daily"): task_id = dag_id + "_task" @@ -4074,6 +4076,8 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi ti = dr1.get_task_instances(session=session)[0] ti.state = adoptable_state ti.queued_by_job_id = old_job.id + old_ti_id = ti.id + old_try_number = ti.try_number session.merge(ti) session.merge(dr1) session.commit() @@ -4081,6 +4085,22 @@ def test_adopt_or_reset_resettable_tasks(self, dag_maker, adoptable_state, sessi num_reset_tis = self.job_runner.adopt_or_reset_orphaned_tasks(session=session) assert num_reset_tis == 1 + ti.refresh_from_db(session=session) + assert ti.id != old_ti_id + assert ( + session.scalar( + select(TaskInstanceHistory).where( + TaskInstanceHistory.dag_id == ti.dag_id, + TaskInstanceHistory.task_id == ti.task_id, + TaskInstanceHistory.run_id == ti.run_id, + TaskInstanceHistory.map_index == ti.map_index, + TaskInstanceHistory.try_number == old_try_number, + TaskInstanceHistory.task_instance_id == old_ti_id, + ) + ) + is not None + ) + def test_adopt_or_reset_orphaned_tasks_external_triggered_dag(self, dag_maker, session): dag_id = "test_reset_orphaned_tasks_external_triggered_dag" with dag_maker(dag_id=dag_id, schedule="@daily"):