diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index d26df876eff54..b8235bb5ac49b 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -98,9 +98,7 @@ def run(self) -> None: kube_client, self.resource_version, self.scheduler_job_id, self.kube_config ) except ReadTimeoutError: - self.log.warning( - "There was a timeout error accessing the Kube API. Retrying request.", exc_info=True - ) + self.log.info("Kubernetes watch timed out waiting for events. Restarting watch.") time.sleep(1) except Exception: self.log.exception("Unknown error in KubernetesJobWatcher. Failing") @@ -141,7 +139,7 @@ def _run( ) -> str | None: self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version) - kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"} + kwargs: dict[str, Any] = {"label_selector": f"airflow-worker={scheduler_job_id}"} if resource_version: kwargs["resource_version"] = resource_version if kube_config.kube_client_request_args: @@ -150,6 +148,14 @@ def _run( last_resource_version: str | None = None + # For info about k8s timeout settings see + # https://github.com/kubernetes-client/python/blob/v29.0.0/examples/watch/timeout-settings.md + # and https://github.com/kubernetes-client/python/blob/v29.0.0/kubernetes/client/api_client.py#L336-L339 + client_timeout = 30 + server_conn_timeout = 3600 + kwargs["_request_timeout"] = client_timeout + kwargs["timeout_seconds"] = server_conn_timeout + for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs): task = event["object"] self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"])