diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index b7e03459d6089..122cfd9f8fab6 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -530,7 +530,17 @@ def terminate(self) -> None: self.log.debug("Terminating kube_watchers...") for kube_watcher in self.kube_watchers.values(): kube_watcher.terminate() - kube_watcher.join() + self.log.debug("kube_watcher=%s", kube_watcher) + + # for now 20 seconds is max wait time for kube watchers to terminate. + max_wait_time = 20 + start_time = time.time() + for kube_watcher in self.kube_watchers.values(): + kube_watcher.join(timeout=max(int(max_wait_time - (time.time() - start_time)), 0)) + if kube_watcher.is_alive(): + self.log.warning("kube_watcher didn't terminate in time=%s", kube_watcher) + kube_watcher.kill() + kube_watcher.join() self.log.debug("kube_watcher=%s", kube_watcher) self.log.debug("Flushing watcher_queue...") self._flush_watcher_queue()