diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 130d37b546c3e..33a023c359fdd 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -165,6 +165,7 @@ def __init__(self): self.task_publish_max_retries = conf.getint( "kubernetes_executor", "task_publish_max_retries", fallback=0 ) + self.completed: set[KubernetesResults] = set() super().__init__(parallelism=self.kube_config.parallelism) def _list_pods(self, query_kwargs): @@ -343,6 +344,9 @@ def sync(self) -> None: finally: self.result_queue.task_done() + for result in self.completed: + self._change_state(result) + from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils import ResourceVersion resource_instance = ResourceVersion() @@ -720,7 +724,16 @@ def _adopt_completed_pods(self, kube_client: client.CoreV1Api) -> None: continue ti_id = annotations_to_key(pod.metadata.annotations) - self.running.add(ti_id) + self.completed.add( + KubernetesResults( + key=ti_id, + state="completed", + pod_name=pod.metadata.name, + namespace=pod.metadata.namespace, + resource_version=pod.metadata.resource_version, + failure_details=None, + ) + ) def _flush_task_queue(self) -> None: if TYPE_CHECKING: diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 7fb6f790d8792..d076fef3baeff 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -1188,7 +1188,7 @@ def get_annotations(pod_name): ], any_order=True, ) - assert executor.running == expected_running_ti_keys + assert {k8s_res.key for k8s_res in executor.completed} == expected_running_ti_keys @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor.DynamicClient") @mock.patch("airflow.providers.cncf.kubernetes.kube_client.get_kube_client")