diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 130d37b546c3e..4e169ab5d890a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -380,11 +380,12 @@ def sync(self) -> None: body = {"message": e.body} retries = self.task_publish_retries[key] - # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries + # In case of exceeded quota, conflict errors, or rate limiting, requeue the task as per the task_publish_max_retries message = body.get("message", "") if ( (str(e.status) == "403" and "exceeded quota" in message) or (str(e.status) == "409" and "object has been modified" in message) + or str(e.status) == "429" # Add support for rate limiting errors ) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries): self.log.warning( "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s", @@ -682,6 +683,17 @@ def adopt_launched_task( ) except ApiException as e: self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e) + + # Log detailed information for rate limiting errors (429) which can cause task history loss + if str(e.status) == "429": + self.log.warning( + "Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. " + "This may cause task history loss if the task was previously running. " + "Consider implementing rate limiting backoff or increasing API quota.", + pod.metadata.name, + ti_key, + ) + return del tis_to_flush_by_key[ti_key] diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 7fb6f790d8792..cf0f7e5130c3d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -396,9 +396,9 @@ def setup_method(self) -> None: pytest.param( HTTPResponse(body="Too many requests, please try again later.", status=429), 1, - False, - State.FAILED, - id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)", + True, + State.SUCCESS, + id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1) (retry succeeded)", ), pytest.param( HTTPResponse(body="", status=429), @@ -407,6 +407,13 @@ def setup_method(self) -> None: State.FAILED, id="429 Too Many Requests (empty body)", ), + pytest.param( + HTTPResponse(body="", status=429), + 1, + True, + State.SUCCESS, + id="429 Too Many Requests (empty body) (task_publish_max_retries=1) (retry succeeded)", + ), ], ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")