diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index d047886061d76..906df0bff2792 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -373,12 +373,11 @@ def sync(self) -> None: except ApiException as e: body = json.loads(e.body) retries = self.task_publish_retries[key] - # In case of exceeded quota errors, requeue the task as per the task_publish_max_retries + # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries if ( - str(e.status) == "403" - and "exceeded quota" in body["message"] - and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries) - ): + (str(e.status) == "403" and "exceeded quota" in body["message"]) + or (str(e.status) == "409" and "object has been modified" in body["message"]) + ) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries): self.log.warning( "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s", self.task_publish_retries[key] + 1, diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 62616ea4bc348..28251abb64c77 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -373,6 +373,16 @@ def setup_method(self) -> None: State.FAILED, id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry failed)", ), + pytest.param( + HTTPResponse( + body='{"message": "the object has been modified; please apply your changes to the latest version and try again"}', + status=409, + ), + 1, + True, + State.SUCCESS, + id="409 conflict", + ), ], ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher")