Skip to content

Commit

Permalink
Add timeout when watching pod events in k8s executor (#39551)
Browse files Browse the repository at this point in the history
If we don't set a timeout, the watch may hang indefinitely if there's a network issue.

---------

Co-authored-by: Ryan Hatter <25823361+RNHTTR@users.noreply.github.com>
  • Loading branch information
dstandish and RNHTTR authored May 15, 2024
1 parent a7960a1 commit 610747d
Showing 1 changed file with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,7 @@ def run(self) -> None:
kube_client, self.resource_version, self.scheduler_job_id, self.kube_config
)
except ReadTimeoutError:
self.log.warning(
"There was a timeout error accessing the Kube API. Retrying request.", exc_info=True
)
self.log.info("Kubernetes watch timed out waiting for events. Restarting watch.")
time.sleep(1)
except Exception:
self.log.exception("Unknown error in KubernetesJobWatcher. Failing")
Expand Down Expand Up @@ -141,7 +139,7 @@ def _run(
) -> str | None:
self.log.info("Event: and now my watch begins starting at resource_version: %s", resource_version)

kwargs = {"label_selector": f"airflow-worker={scheduler_job_id}"}
kwargs: dict[str, Any] = {"label_selector": f"airflow-worker={scheduler_job_id}"}
if resource_version:
kwargs["resource_version"] = resource_version
if kube_config.kube_client_request_args:
Expand All @@ -150,6 +148,14 @@ def _run(

last_resource_version: str | None = None

# For info about k8s timeout settings see
# https://github.com/kubernetes-client/python/blob/v29.0.0/examples/watch/timeout-settings.md
# and https://github.com/kubernetes-client/python/blob/v29.0.0/kubernetes/client/api_client.py#L336-L339
client_timeout = 30
server_conn_timeout = 3600
kwargs["_request_timeout"] = client_timeout
kwargs["timeout_seconds"] = server_conn_timeout

for event in self._pod_events(kube_client=kube_client, query_kwargs=kwargs):
task = event["object"]
self.log.debug("Event: %s had an event of type %s", task.metadata.name, event["type"])
Expand Down

0 comments on commit 610747d

Please sign in to comment.