From 37a7b6799727f8480dea808182b19ca5e7216b01 Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Tue, 2 Sep 2025 06:44:40 +0800 Subject: [PATCH 1/4] fix(k8s): Preserve task history during API rate limiting - Handle 429 errors in KubernetesExecutor task publishing retry logic - Detect orphaned tasks and record TaskInstanceHistory in failure handler - Add detailed logging for rate limiting scenarios --- .../src/airflow/models/taskinstance.py | 22 +++++++++++++++++++ .../executors/kubernetes_executor.py | 13 ++++++++++- 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index 5e873096712d5..fd1259341dede 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1573,6 +1573,28 @@ def fetch_handle_failure_context( # about to retry so we record the task instance history. For other states, the task # instance was cleared and already recorded in the task instance history. ti.prepare_db_for_next_try(session) + elif ti.state is None and ti.start_date is not None and ti.end_date is None: + # If the task instance state is None but has a start_date without end_date, + # it likely means the task was running but became orphaned and its state was reset. + # This can happen during scheduler restarts when executors fail to adopt running tasks + # (e.g., due to Kubernetes API 429 errors). We should still record the task instance + # history to maintain complete log history for troubleshooting. + from airflow.models.taskinstancehistory import TaskInstanceHistory + + log.info( + "Recording task instance history for orphaned task %s that was previously running " + "(start_date: %s, state reset to None)", + ti.key, + ti.start_date, + ) + # Temporarily set state to RUNNING to trigger proper history recording + original_state = ti.state + ti.state = TaskInstanceState.RUNNING + try: + TaskInstanceHistory.record_ti(ti, session=session) + finally: + # Restore the original state + ti.state = original_state ti.state = State.UP_FOR_RETRY diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index efb018b5cbf80..50e2c9e1e6c6e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -384,7 +384,7 @@ def sync(self) -> None: body = {"message": e.body} retries = self.task_publish_retries[key] - # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries + # In case of exceeded quota, conflict errors, or rate limiting, requeue the task as per the task_publish_max_retries message = body.get("message", "") if ( (str(e.status) == "403" and "exceeded quota" in message) @@ -692,6 +692,17 @@ def adopt_launched_task( ) except ApiException as e: self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e) + + # Log detailed information for rate limiting errors (429) which can cause task history loss + if str(e.status) == "429": + self.log.warning( + "Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. " + "This may cause task history loss if the task was previously running. 
" + "Consider implementing rate limiting backoff or increasing API quota.", + pod.metadata.name, + ti_key, + ) + return del tis_to_flush_by_key[ti_key] From cbfbbf47916ec34f5ffcdba72f77c58a800aa3c9 Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Tue, 2 Sep 2025 21:06:02 +0800 Subject: [PATCH 2/4] fix(k8s): Update tests to reflect new 429 error retry behavior --- .../cncf/kubernetes/executors/test_kubernetes_executor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index a80a031f66d6a..8662cd7d16072 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -396,9 +396,9 @@ def setup_method(self) -> None: pytest.param( HTTPResponse(body="Too many requests, please try again later.", status=429), 1, - False, - State.FAILED, - id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)", + True, + State.SUCCESS, + id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1) (retry succeeded)", ), pytest.param( HTTPResponse(body="", status=429), From 6389c9876b08ba9d207603889227f1c8aaafc7f8 Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Tue, 2 Sep 2025 22:41:15 +0800 Subject: [PATCH 3/4] fix: Record history for orphaned tasks during K8s executor failures Move orphaned task detection before end_date assignment to ensure TaskInstanceHistory is recorded for tasks that become detached during scheduler restarts due to Kubernetes API 429 errors. --- .../src/airflow/models/taskinstance.py | 51 +++++----- .../tests/unit/models/test_taskinstance.py | 92 +++++++++++++++++++ 2 files changed, 121 insertions(+), 22 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index fd1259341dede..f8d18c06ba37b 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1538,6 +1538,35 @@ def fetch_handle_failure_context( if not test_mode: ti.refresh_from_db(session) + # Check for orphaned task before setting end_date + if ( + ti.is_eligible_to_retry() + and ti.state is None + and ti.start_date is not None + and ti.end_date is None + ): + # If the task instance state is None but has a start_date without end_date, + # it likely means the task was running but became orphaned and its state was reset. + # This can happen during scheduler restarts when executors fail to adopt running tasks + # (e.g., due to Kubernetes API 429 errors). We should still record the task instance + # history to maintain complete log history for troubleshooting. + from airflow.models.taskinstancehistory import TaskInstanceHistory + + log.info( + "Recording task instance history for orphaned task %s that was previously running " + "(start_date: %s, state reset to None)", + ti.key, + ti.start_date, + ) + # Temporarily set state to RUNNING to trigger proper history recording + original_state = ti.state + ti.state = TaskInstanceState.RUNNING + try: + TaskInstanceHistory.record_ti(ti, session=session) + finally: + # Restore the original state + ti.state = original_state + ti.end_date = timezone.utcnow() ti.set_duration() @@ -1573,28 +1602,6 @@ def fetch_handle_failure_context( # about to retry so we record the task instance history. 
For other states, the task # instance was cleared and already recorded in the task instance history. ti.prepare_db_for_next_try(session) - elif ti.state is None and ti.start_date is not None and ti.end_date is None: - # If the task instance state is None but has a start_date without end_date, - # it likely means the task was running but became orphaned and its state was reset. - # This can happen during scheduler restarts when executors fail to adopt running tasks - # (e.g., due to Kubernetes API 429 errors). We should still record the task instance - # history to maintain complete log history for troubleshooting. - from airflow.models.taskinstancehistory import TaskInstanceHistory - - log.info( - "Recording task instance history for orphaned task %s that was previously running " - "(start_date: %s, state reset to None)", - ti.key, - ti.start_date, - ) - # Temporarily set state to RUNNING to trigger proper history recording - original_state = ti.state - ti.state = TaskInstanceState.RUNNING - try: - TaskInstanceHistory.record_ti(ti, session=session) - finally: - # Restore the original state - ti.state = original_state ti.state = State.UP_FOR_RETRY diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 08cc0712b22ce..072dff88f70e8 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -2505,6 +2505,98 @@ def test_handle_failure_task_undefined(self, create_task_instance): del ti.task ti.handle_failure("test ti.task undefined") + @patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti") + def test_fetch_handle_failure_context_orphaned_task_records_history( + self, mock_record_ti, dag_maker, session + ): + """ + Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded. + This scenario occurs when tasks are running but become orphaned due to executor failures + (e.g., Kubernetes API 429 errors causing scheduler restarts and pod adoption failures). 
+ """ + with dag_maker(dag_id="test_orphaned_task"): + task = EmptyOperator(task_id="orphaned_task", retries=2) # Allow 2 retries + + dr = dag_maker.create_dagrun() + ti = dr.get_task_instance(task.task_id, session=session) + ti.task = task + + # Simulate an orphaned task: state=None but has start_date (was running) and no end_date + start_time = timezone.utcnow() - datetime.timedelta(minutes=5) + ti.state = None # State was reset during scheduler restart + ti.start_date = start_time # Task had started previously + ti.end_date = None # Task was still running when it became orphaned + ti.try_number = 1 # First attempt + ti.max_tries = 3 # 1 original + 2 retries = 3 total attempts + + session.merge(ti) + session.commit() + + # Call fetch_handle_failure_context which should detect and handle orphaned tasks + failure_context = TaskInstance.fetch_handle_failure_context( + ti=ti, + error="Test orphaned task error", + test_mode=False, + session=session, + fail_fast=False, + ) + + # Verify that TaskInstanceHistory.record_ti was called for the orphaned task + mock_record_ti.assert_called_once() + call_args = mock_record_ti.call_args + recorded_ti = call_args[0][0] # First positional argument (ti) + + # Verify the correct TaskInstance was recorded + assert recorded_ti.task_id == ti.task_id + assert recorded_ti.dag_id == ti.dag_id + assert recorded_ti.run_id == ti.run_id + assert recorded_ti.start_date == start_time + + # Verify the task instance state is set to UP_FOR_RETRY after failure handling + assert ti.state == State.UP_FOR_RETRY + assert failure_context["ti"] == ti + + @patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti") + def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history( + self, mock_record_ti, dag_maker, session + ): + """ + Test that tasks with state=None but no start_date do NOT trigger orphaned task history recording. + This ensures we only record history for tasks that were actually running. 
+ """ + with dag_maker(dag_id="test_not_orphaned_task"): + task = EmptyOperator(task_id="not_orphaned_task", retries=2) + + dr = dag_maker.create_dagrun() + ti = dr.get_task_instance(task.task_id, session=session) + ti.task = task + + # Simulate a task that was never started: state=None and no start_date + ti.state = None + ti.start_date = None # Task never started + ti.end_date = None + ti.try_number = 1 + ti.max_tries = 3 # Allow retries + + session.merge(ti) + session.commit() + + # Call fetch_handle_failure_context + failure_context = TaskInstance.fetch_handle_failure_context( + ti=ti, + error="Test non-orphaned task error", + test_mode=False, + session=session, + fail_fast=False, + ) + + # Verify that TaskInstanceHistory.record_ti was NOT called + mock_record_ti.assert_not_called() + + # Verify the task instance state is set to UP_FOR_RETRY after failure handling + assert ti.state == State.UP_FOR_RETRY + assert failure_context["ti"] == ti + def test_handle_failure_fail_fast(self, dag_maker, session): start_date = timezone.datetime(2016, 6, 1) From d9b125d7c43447464c9798ff6f62476c21d1a7ea Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Thu, 23 Oct 2025 19:36:50 +0800 Subject: [PATCH 4/4] Remove CNCF provider part and fix CI Error --- .../tests/unit/models/test_taskinstance.py | 32 +++++++------------ .../executors/kubernetes_executor.py | 13 +------- .../executors/test_kubernetes_executor.py | 6 ++-- 3 files changed, 16 insertions(+), 35 deletions(-) diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py index 072dff88f70e8..ba308574b3f57 100644 --- a/airflow-core/tests/unit/models/test_taskinstance.py +++ b/airflow-core/tests/unit/models/test_taskinstance.py @@ -2511,11 +2511,10 @@ def test_fetch_handle_failure_context_orphaned_task_records_history( ): """ Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded. - This scenario occurs when tasks are running but become orphaned due to executor failures - (e.g., Kubernetes API 429 errors causing scheduler restarts and pod adoption failures). + This scenario occurs when tasks are running but become orphaned due to executor failures. 
""" with dag_maker(dag_id="test_orphaned_task"): - task = EmptyOperator(task_id="orphaned_task", retries=2) # Allow 2 retries + task = EmptyOperator(task_id="orphaned_task", retries=2) dr = dag_maker.create_dagrun() ti = dr.get_task_instance(task.task_id, session=session) @@ -2523,16 +2522,15 @@ def test_fetch_handle_failure_context_orphaned_task_records_history( # Simulate an orphaned task: state=None but has start_date (was running) and no end_date start_time = timezone.utcnow() - datetime.timedelta(minutes=5) - ti.state = None # State was reset during scheduler restart - ti.start_date = start_time # Task had started previously - ti.end_date = None # Task was still running when it became orphaned - ti.try_number = 1 # First attempt - ti.max_tries = 3 # 1 original + 2 retries = 3 total attempts + ti.state = None + ti.start_date = start_time + ti.end_date = None + ti.try_number = 1 + ti.max_tries = 3 session.merge(ti) session.commit() - # Call fetch_handle_failure_context which should detect and handle orphaned tasks failure_context = TaskInstance.fetch_handle_failure_context( ti=ti, error="Test orphaned task error", @@ -2544,17 +2542,14 @@ def test_fetch_handle_failure_context_orphaned_task_records_history( # Verify that TaskInstanceHistory.record_ti was called for the orphaned task mock_record_ti.assert_called_once() call_args = mock_record_ti.call_args - recorded_ti = call_args[0][0] # First positional argument (ti) + recorded_ti = call_args[0][0] - # Verify the correct TaskInstance was recorded assert recorded_ti.task_id == ti.task_id assert recorded_ti.dag_id == ti.dag_id assert recorded_ti.run_id == ti.run_id assert recorded_ti.start_date == start_time - - # Verify the task instance state is set to UP_FOR_RETRY after failure handling assert ti.state == State.UP_FOR_RETRY - assert failure_context["ti"] == ti + assert failure_context == ti @patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti") def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history( @@ -2573,10 +2568,10 @@ def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_histor # Simulate a task that was never started: state=None and no start_date ti.state = None - ti.start_date = None # Task never started + ti.start_date = None ti.end_date = None ti.try_number = 1 - ti.max_tries = 3 # Allow retries + ti.max_tries = 3 session.merge(ti) session.commit() @@ -2590,12 +2585,9 @@ def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_histor fail_fast=False, ) - # Verify that TaskInstanceHistory.record_ti was NOT called mock_record_ti.assert_not_called() - - # Verify the task instance state is set to UP_FOR_RETRY after failure handling assert ti.state == State.UP_FOR_RETRY - assert failure_context["ti"] == ti + assert failure_context == ti def test_handle_failure_fail_fast(self, dag_maker, session): start_date = timezone.datetime(2016, 6, 1) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 50e2c9e1e6c6e..efb018b5cbf80 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -384,7 +384,7 @@ def sync(self) -> None: body = {"message": e.body} retries = self.task_publish_retries[key] - # In case of exceeded quota, conflict errors, or 
rate limiting, requeue the task as per the task_publish_max_retries + # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries message = body.get("message", "") if ( (str(e.status) == "403" and "exceeded quota" in message) @@ -692,17 +692,6 @@ def adopt_launched_task( ) except ApiException as e: self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e) - - # Log detailed information for rate limiting errors (429) which can cause task history loss - if str(e.status) == "429": - self.log.warning( - "Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. " - "This may cause task history loss if the task was previously running. " - "Consider implementing rate limiting backoff or increasing API quota.", - pod.metadata.name, - ti_key, - ) - return del tis_to_flush_by_key[ti_key] diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 8662cd7d16072..a80a031f66d6a 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -396,9 +396,9 @@ def setup_method(self) -> None: pytest.param( HTTPResponse(body="Too many requests, please try again later.", status=429), 1, - True, - State.SUCCESS, - id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1) (retry succeeded)", + False, + State.FAILED, + id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)", ), pytest.param( HTTPResponse(body="", status=429),
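
Note for reviewers: after PATCH 4/4 drops the provider-side (CNCF Kubernetes) changes again, the behavioural change that remains is the orphaned-task branch added to fetch_handle_failure_context in airflow-core. The snippet below is only a distilled, standalone sketch of that branch for discussion, not the exact hunk above; the helper name record_history_for_orphaned_ti is made up here, while TaskInstanceHistory.record_ti, TaskInstanceState.RUNNING, is_eligible_to_retry() and the state/start_date/end_date condition are taken directly from the patch.

# Sketch only: mirrors the orphaned-task handling from PATCH 3/4, extracted into a helper
# for readability. The real code lives inline in TaskInstance.fetch_handle_failure_context.
from airflow.models.taskinstancehistory import TaskInstanceHistory
from airflow.utils.state import TaskInstanceState


def record_history_for_orphaned_ti(ti, session) -> bool:
    """Record TaskInstanceHistory for a TI whose state was reset to None while it was still
    running (start_date set, end_date unset), e.g. after a scheduler restart where the
    executor failed to adopt the pod (such as on Kubernetes API 429 errors)."""
    if not (
        ti.is_eligible_to_retry()  # the patch only records history when a retry will follow
        and ti.state is None
        and ti.start_date is not None
        and ti.end_date is None
    ):
        return False  # not orphaned: never started, already finished, or not retrying
    original_state = ti.state  # None at this point
    ti.state = TaskInstanceState.RUNNING  # temporarily set RUNNING so record_ti stores a meaningful row
    try:
        TaskInstanceHistory.record_ti(ti, session=session)
    finally:
        ti.state = original_state  # always restore, even if record_ti raises
    return True

Per the PATCH 3/4 message, this check runs before ti.end_date is assigned, so the recorded history row still reflects the TI as it was when it went missing; the try/finally restore means the normal UP_FOR_RETRY handling that follows is unchanged.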