diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5e873096712d5..f8d18c06ba37b 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1538,6 +1538,35 @@ def fetch_handle_failure_context( if not test_mode: ti.refresh_from_db(session) + # Check for orphaned task before setting end_date + if ( + ti.is_eligible_to_retry() + and ti.state is None + and ti.start_date is not None + and ti.end_date is None + ): + # If the task instance state is None but has a start_date without end_date, + # it likely means the task was running but became orphaned and its state was reset. + # This can happen during scheduler restarts when executors fail to adopt running tasks + # (e.g., due to Kubernetes API 429 errors). We should still record the task instance + # history to maintain complete log history for troubleshooting. + from airflow.models.taskinstancehistory import TaskInstanceHistory + + log.info( + "Recording task instance history for orphaned task %s that was previously running " + "(start_date: %s, state reset to None)", + ti.key, + ti.start_date, + ) + # Temporarily set state to RUNNING to trigger proper history recording + original_state = ti.state + ti.state = TaskInstanceState.RUNNING + try: + TaskInstanceHistory.record_ti(ti, session=session) + finally: + # Restore the original state + ti.state = original_state + ti.end_date = timezone.utcnow() ti.set_duration() diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 08cc0712b22ce..ba308574b3f57 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -2505,6 +2505,90 @@ def test_handle_failure_task_undefined(self, create_task_instance): del ti.task ti.handle_failure("test ti.task undefined") + 
@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
def test_fetch_handle_failure_context_orphaned_task_records_history(
    self, mock_record_ti, dag_maker, session
):
    """
    Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded.

    This scenario occurs when tasks are running but become orphaned due to executor failures.
    ``TaskInstanceHistory.record_ti`` is patched, so the test verifies that it is called exactly
    once with the orphaned task instance and that the instance is reported as RUNNING for the
    duration of that call.
    """
    # The RUNNING state is only set around the record_ti call and reset afterwards, so it
    # has to be observed from inside the mock; inspecting the instance later is too late.
    states_seen_by_record_ti = []

    def _capture_state(recorded, *_args, **_kwargs):
        states_seen_by_record_ti.append(recorded.state)

    mock_record_ti.side_effect = _capture_state

    with dag_maker(dag_id="test_orphaned_task"):
        task = EmptyOperator(task_id="orphaned_task", retries=2)

    dr = dag_maker.create_dagrun()
    ti = dr.get_task_instance(task.task_id, session=session)
    ti.task = task

    # Simulate an orphaned task: state=None but has start_date (was running) and no end_date
    start_time = timezone.utcnow() - datetime.timedelta(minutes=5)
    ti.state = None
    ti.start_date = start_time
    ti.end_date = None
    ti.try_number = 1
    ti.max_tries = 3

    session.merge(ti)
    session.commit()

    failure_context = TaskInstance.fetch_handle_failure_context(
        ti=ti,
        error="Test orphaned task error",
        test_mode=False,
        session=session,
        fail_fast=False,
    )

    # Verify that TaskInstanceHistory.record_ti was called for the orphaned task
    mock_record_ti.assert_called_once()
    call_args = mock_record_ti.call_args
    recorded_ti = call_args[0][0]

    assert recorded_ti.task_id == ti.task_id
    assert recorded_ti.dag_id == ti.dag_id
    assert recorded_ti.run_id == ti.run_id
    assert recorded_ti.start_date == start_time
    # History must be written while the instance looks like a running try ...
    assert states_seen_by_record_ti == [State.RUNNING]
    # ... and the regular retry handling must still apply afterwards.
    assert ti.state == State.UP_FOR_RETRY
    assert failure_context == ti

@patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history(
    self, mock_record_ti, dag_maker, session
):
    """
    Test that tasks with state=None but no start_date do NOT trigger orphaned task history recording.

    This ensures we only record history for tasks that were actually running: with
    ``record_ti`` patched, the mock must never be called for such a task instance.
    """
    with dag_maker(dag_id="test_not_orphaned_task"):
        task = EmptyOperator(task_id="not_orphaned_task", retries=2)

    dr = dag_maker.create_dagrun()
    ti = dr.get_task_instance(task.task_id, session=session)
    ti.task = task

    # Simulate a task that was never started: state=None and no start_date
    ti.state = None
    ti.start_date = None
    ti.end_date = None
    ti.try_number = 1
    ti.max_tries = 3

    session.merge(ti)
    session.commit()

    # Call fetch_handle_failure_context
    failure_context = TaskInstance.fetch_handle_failure_context(
        ti=ti,
        error="Test non-orphaned task error",
        test_mode=False,
        session=session,
        fail_fast=False,
    )

    # No history entry for a task that never ran, but the retry transition still happens.
    mock_record_ti.assert_not_called()
    assert ti.state == State.UP_FOR_RETRY
    assert failure_context == ti

def test_handle_failure_fail_fast(self, dag_maker, session):
    start_date = timezone.datetime(2016, 6, 1)