From ab9c7679e2e7478019594c1c5c9ff395a11cea0a Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Mon, 15 Sep 2025 22:24:04 +0300 Subject: [PATCH 1/3] fixed kubernetes executor open_slots issue --- .../kubernetes/executors/kubernetes_executor.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 130d37b546c3e..33a023c359fdd 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -165,6 +165,7 @@ def __init__(self): self.task_publish_max_retries = conf.getint( "kubernetes_executor", "task_publish_max_retries", fallback=0 ) + self.completed: set[KubernetesResults] = set() super().__init__(parallelism=self.kube_config.parallelism) def _list_pods(self, query_kwargs): @@ -343,6 +344,9 @@ def sync(self) -> None: finally: self.result_queue.task_done() + for result in self.completed: + self._change_state(result) + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion resource_instance = ResourceVersion() @@ -720,7 +724,16 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: continue ti_id = annotations_to_key(pod.metadata.annotations) - self.running.add(ti_id) + self.completed.add( + KubernetesResults( + key=ti_id, + state="completed", + pod_name=pod.metadata.name, + namespace=pod.metadata.namespace, + resource_version=pod.metadata.resource_version, + failure_details=None, + ) + ) def _flush_task_queue(self) -> None: if TYPE_CHECKING: From f714178eeab1188ea860f88e15a8741ef800a451 Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Wed, 17 Sep 2025 21:35:19 +0300 Subject: [PATCH 2/3] fixed tests --- .../executors/test_kubernetes_executor.py | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 7fb6f790d8792..53f149cedc050 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -92,9 +92,9 @@ def _cases(self): ("影師嗎", "中華民國;$"), ] - cases.extend( - [(self._gen_random_string(seed, 200), self._gen_random_string(seed, 200)) for seed in range(100)] - ) + cases.extend([ + (self._gen_random_string(seed, 200), self._gen_random_string(seed, 200)) for seed in range(100) + ]) return cases @@ -933,14 +933,12 @@ def test_try_adopt_task_instances( ): executor = self.kubernetes_executor executor.scheduler_job_id = "10" - ti_key = annotations_to_key( - { - "dag_id": "dag", - "run_id": "run_id", - "task_id": "task", - "try_number": "1", - } - ) + ti_key = annotations_to_key({ + "dag_id": "dag", + "run_id": "run_id", + "task_id": "task", + "try_number": "1", + }) mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", key=ti_key) pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo")) mock_kube_client = mock.MagicMock() @@ -1188,7 +1186,7 @@ def get_annotations(pod_name): ], any_order=True, ) - assert executor.running == expected_running_ti_keys + assert {k8s_res.key for k8s_res in executor.completed} == expected_running_ti_keys @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client") From 9540f903721be7209f16dd18affa49592d4ad643 Mon Sep 17 00:00:00 2001 From: Natanel Rudyuklakir Date: Mon, 27 Oct 2025 20:48:44 +0200 Subject: [PATCH 3/3] formatted file --- .../executors/test_kubernetes_executor.py | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 53f149cedc050..d076fef3baeff 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -92,9 +92,9 @@ def _cases(self): ("影師嗎", "中華民國;$"), ] - cases.extend([ - (self._gen_random_string(seed, 200), self._gen_random_string(seed, 200)) for seed in range(100) - ]) + cases.extend( + [(self._gen_random_string(seed, 200), self._gen_random_string(seed, 200)) for seed in range(100)] + ) return cases @@ -933,12 +933,14 @@ def test_try_adopt_task_instances( ): executor = self.kubernetes_executor executor.scheduler_job_id = "10" - ti_key = annotations_to_key({ - "dag_id": "dag", - "run_id": "run_id", - "task_id": "task", - "try_number": "1", - }) + ti_key = annotations_to_key( + { + "dag_id": "dag", + "run_id": "run_id", + "task_id": "task", + "try_number": "1", + } + ) mock_ti = mock.MagicMock(queued_by_job_id="1", external_executor_id="1", key=ti_key) pod = k8s.V1Pod(metadata=k8s.V1ObjectMeta(name="foo")) mock_kube_client = mock.MagicMock()