From 2d3bca9759d4ca3651efee96ae469664385c6e91 Mon Sep 17 00:00:00 2001
From: AutomationDev85
Date: Mon, 12 May 2025 09:50:04 +0200
Subject: [PATCH] KubernetesPodOperator uses separate timeouts for pod
 scheduling and pod startup

---
 .../cncf/kubernetes/operators/pod.py          |  9 +++-
 .../cncf/kubernetes/utils/pod_manager.py      | 52 ++++++++++++++-----
 .../cncf/kubernetes/utils/test_pod_manager.py | 46 +++++++++++++---
 3 files changed, 85 insertions(+), 22 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index 428c63d41d5bb..32cbee5670e51 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -157,8 +157,9 @@ class KubernetesPodOperator(BaseOperator):
     :param reattach_on_restart: if the worker dies while the pod is running, reattach and monitor
         during the next try. If False, always create a new pod for each try.
     :param labels: labels to apply to the Pod. (templated)
-    :param startup_timeout_seconds: timeout in seconds to startup the pod.
+    :param startup_timeout_seconds: timeout in seconds to start up the pod after it has been scheduled.
     :param startup_check_interval_seconds: interval in seconds to check if the pod has already started
+    :param schedule_timeout_seconds: timeout in seconds for the pod to be scheduled in the cluster.
     :param get_logs: get the stdout of the base container as logs of the tasks.
     :param init_container_logs: list of init containers whose logs will be published to stdout
         Takes a sequence of containers, a single container name or True. If True,
@@ -289,6 +290,7 @@ def __init__(
         reattach_on_restart: bool = True,
         startup_timeout_seconds: int = 120,
         startup_check_interval_seconds: int = 5,
+        schedule_timeout_seconds: int | None = None,
         get_logs: bool = True,
         base_container_name: str | None = None,
         base_container_status_polling_interval: float = 1,
@@ -347,6 +349,8 @@
         self.labels = labels or {}
         self.startup_timeout_seconds = startup_timeout_seconds
         self.startup_check_interval_seconds = startup_check_interval_seconds
+        # schedule_timeout_seconds is new; fall back to startup_timeout_seconds to avoid a breaking change
+        self.schedule_timeout_seconds = schedule_timeout_seconds or startup_timeout_seconds
         env_vars = convert_env_vars(env_vars) if env_vars else []
         self.env_vars = env_vars
         pod_runtime_info_envs = (
@@ -574,8 +578,9 @@ def await_pod_start(self, pod: k8s.V1Pod) -> None:
         try:
             self.pod_manager.await_pod_start(
                 pod=pod,
+                schedule_timeout=self.schedule_timeout_seconds,
                 startup_timeout=self.startup_timeout_seconds,
-                startup_check_interval=self.startup_check_interval_seconds,
+                check_interval=self.startup_check_interval_seconds,
             )
         except PodLaunchFailedException:
             if self.log_events_on_failure:
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 390d01e3c60bb..ec593f4d573c5 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -48,6 +48,7 @@
     from kubernetes.client.models.core_v1_event_list import CoreV1EventList
     from kubernetes.client.models.v1_container_status import V1ContainerStatus
     from kubernetes.client.models.v1_pod import V1Pod
+    from kubernetes.client.models.v1_pod_condition import V1PodCondition
     from urllib3.response import HTTPResponse
@@ -375,30 +376,55 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
         return self.run_pod_async(pod)
 
     def await_pod_start(
-        self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: int = 1
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1
     ) -> None:
         """
         Wait for the pod to reach phase other than ``Pending``.
 
         :param pod:
+        :param schedule_timeout: Timeout (in seconds) for the pod to be scheduled
+            (if the pod stays unscheduled for too long, the task fails)
         :param startup_timeout: Timeout (in seconds) for startup of the pod
-            (if pod is pending for too long, fails task)
-        :param startup_check_interval: Interval (in seconds) between checks
+            (if the pod is pending for too long after being scheduled, the task fails)
+        :param check_interval: Interval (in seconds) between checks
         :return:
         """
-        curr_time = time.time()
+        self.log.info("::group::Waiting up to %ss to get the POD scheduled...", schedule_timeout)
+        pod_was_scheduled = False
+        start_check_time = time.time()
         while True:
             remote_pod = self.read_pod(pod)
-            if remote_pod.status.phase != PodPhase.PENDING:
+            pod_status = remote_pod.status
+            if pod_status.phase != PodPhase.PENDING:
+                self.keep_watching_for_events = False
+                self.log.info("::endgroup::")
                 break
-            self.log.warning("Pod not yet started: %s", pod.metadata.name)
-            if time.time() - curr_time >= startup_timeout:
-                msg = (
-                    f"Pod took longer than {startup_timeout} seconds to start. "
-                    "Check the pod events in kubernetes to determine why."
-                )
-                raise PodLaunchFailedException(msg)
-            time.sleep(startup_check_interval)
+
+            # Decide which timeout applies: startup once the pod is scheduled, scheduling otherwise
+            pod_conditions: list[V1PodCondition] = pod_status.conditions
+            if pod_conditions and any(
+                (condition.type == "PodScheduled" and condition.status == "True")
+                for condition in pod_conditions
+            ):
+                if not pod_was_scheduled:
+                    # POD has just been scheduled; restart the timer and switch to the startup timeout
+                    pod_was_scheduled = True
+                    start_check_time = time.time()
+                    self.log.info("Waiting up to %ss to get the POD running...", startup_timeout)
+
+                if time.time() - start_check_time >= startup_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
+                    )
+            else:
+                if time.time() - start_check_time >= schedule_timeout:
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
+                    )
+
+            time.sleep(check_interval)
 
     def fetch_container_logs(
         self,
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 320470cf48a8f..e6f99e62ed40a 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -392,39 +392,71 @@ def test_start_pod_retries_three_times(self, mock_run_pod_async):
         assert mock_run_pod_async.call_count == 3
 
-    def test_start_pod_raises_informative_error_on_timeout(self):
+    def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
         pod_response = mock.MagicMock()
         pod_response.status.phase = "Pending"
         self.mock_kube_client.read_namespaced_pod.return_value = pod_response
-        expected_msg = "Check the pod events in kubernetes"
+        expected_msg = "Pod took too long to be scheduled on the cluster, giving up. More than 0s. Check the pod events in kubernetes."
         mock_pod = MagicMock()
         with pytest.raises(AirflowException, match=expected_msg):
             self.pod_manager.await_pod_start(
                 pod=mock_pod,
+                schedule_timeout=0,
+                startup_timeout=0,
+            )
+
+    def test_start_pod_raises_informative_error_on_startup_timeout(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        condition = mock.MagicMock()
+        condition.type = "PodScheduled"
+        condition.status = "True"
+        pod_response.status.conditions = [condition]
+
+        self.mock_kube_client.read_namespaced_pod.return_value = pod_response
+        expected_msg = "Pod took too long to start. More than 0s. Check the pod events in kubernetes."
+        mock_pod = MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            self.pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=0,
                 startup_timeout=0,
             )
 
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep")
-    def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
+    def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):
+        condition_scheduled = mock.MagicMock()
+        condition_scheduled.type = "PodScheduled"
+        condition_scheduled.status = "True"
+
         pod_info_pending = mock.MagicMock(**{"status.phase": PodPhase.PENDING})
+        pod_info_pending_scheduled = mock.MagicMock(
+            **{"status.phase": PodPhase.PENDING, "status.conditions": [condition_scheduled]}
+        )
         pod_info_succeeded = mock.MagicMock(**{"status.phase": PodPhase.SUCCEEDED})
 
         def pod_state_gen():
             yield pod_info_pending
-            yield pod_info_pending
+            yield pod_info_pending_scheduled
+            yield pod_info_pending_scheduled
             while True:
                 yield pod_info_succeeded
 
         self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
         startup_check_interval = 10  # Any value is fine, as time.sleep is mocked to do nothing
+        schedule_timeout = 30
+        startup_timeout = 60
         mock_pod = MagicMock()
         self.pod_manager.await_pod_start(
             pod=mock_pod,
-            startup_timeout=60,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
-            startup_check_interval=startup_check_interval,
+            schedule_timeout=schedule_timeout,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
+            startup_timeout=startup_timeout,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
+            check_interval=startup_check_interval,
         )
         mock_time_sleep.assert_called_with(startup_check_interval)
-        assert mock_time_sleep.call_count == 2
+        assert mock_time_sleep.call_count == 3
+        assert f"::group::Waiting up to {schedule_timeout}s to get the POD scheduled..." in caplog.text
+        assert f"Waiting up to {startup_timeout}s to get the POD running..." in caplog.text
 
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
     def test_container_is_running(self, container_is_running_mock):