From ff728a4419aadfb4303faef211b45edf5ee7b23e Mon Sep 17 00:00:00 2001 From: rom-impala Date: Thu, 21 Aug 2025 14:52:36 +0300 Subject: [PATCH 1/4] Retry on 409 conflict --- .../cncf/kubernetes/executors/kubernetes_executor.py | 4 ++-- .../cncf/kubernetes/executors/test_kubernetes_executor.py | 7 +++++++ 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index d047886061d76..77d83c9ff1d52 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -375,8 +375,8 @@ def sync(self) -> None: retries = self.task_publish_retries[key] # In case of exceeded quota errors, requeue the task as per the task_publish_max_retries if ( - str(e.status) == "403" - and "exceeded quota" in body["message"] + (str(e.status) == "403" and "exceeded quota" in body["message"]) + or (str(e.status) == "409" and "object has been modified" in body["message"]) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries) ): self.log.warning( diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 62616ea4bc348..e09b1063f04c1 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -373,6 +373,13 @@ def setup_method(self) -> None: State.FAILED, id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry failed)", ), + pytest.param( + HTTPResponse(body='{"message": "the object has been modified; please apply your changes to 
the latest version and try again"}', status=409), + 1, + True, + State.SUCCESS, + id="409 conflict", + ), ], ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") From fabe791d29b06466572def6d969b7a9f88be3e9b Mon Sep 17 00:00:00 2001 From: rom-impala Date: Thu, 21 Aug 2025 15:37:55 +0300 Subject: [PATCH 2/4] Change comment --- .../providers/cncf/kubernetes/executors/kubernetes_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 77d83c9ff1d52..1cefb24d30708 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -373,7 +373,7 @@ def sync(self) -> None: except ApiException as e: body = json.loads(e.body) retries = self.task_publish_retries[key] - # In case of exceeded quota errors, requeue the task as per the task_publish_max_retries + # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries if ( (str(e.status) == "403" and "exceeded quota" in body["message"]) or (str(e.status) == "409" and "object has been modified" in body["message"]) From cae79182bb1565c0595a8286a2b42f76dcd14a6c Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Sat, 23 Aug 2025 18:27:37 +0300 Subject: [PATCH 3/4] linting --- .../cncf/kubernetes/executors/test_kubernetes_executor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index e09b1063f04c1..28251abb64c77 100644 --- 
a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -374,7 +374,10 @@ def setup_method(self) -> None: id="12345 fake-unhandled-reason (task_publish_max_retries=1) (retry failed)", ), pytest.param( - HTTPResponse(body='{"message": "the object has been modified; please apply your changes to the latest version and try again"}', status=409), + HTTPResponse( + body='{"message": "the object has been modified; please apply your changes to the latest version and try again"}', + status=409, + ), 1, True, State.SUCCESS, From 3b378603cf0bbfc2eaa3b25178e171d62ec5d55e Mon Sep 17 00:00:00 2001 From: romsharon98 Date: Sat, 23 Aug 2025 18:32:29 +0300 Subject: [PATCH 4/4] change or and hierarchy --- .../providers/cncf/kubernetes/executors/kubernetes_executor.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 1cefb24d30708..906df0bff2792 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -377,8 +377,7 @@ def sync(self) -> None: if ( (str(e.status) == "403" and "exceeded quota" in body["message"]) or (str(e.status) == "409" and "object has been modified" in body["message"]) - and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries) - ): + ) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries): self.log.warning( "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s", self.task_publish_retries[key] + 1,