diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 7fddb37bf3470..086a2e9f2c3f1 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -171,23 +171,36 @@ async def await_pod_start( f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes." ) - # Check for general problems to terminate early - ErrImagePull - if pod_status.container_statuses: - for container_status in pod_status.container_statuses: - container_state: V1ContainerState = container_status.state - container_waiting: V1ContainerStateWaiting | None = container_state.waiting - if container_waiting: - if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]: - pod_manager.stop_watching_events = True - pod_manager.log.info("::endgroup::") - raise PodLaunchFailedException( - f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}" - f"\n{container_waiting.message}" - ) + # Check for general problems to terminate early + error_message = detect_pod_terminate_early_issues(remote_pod) + if error_message: + pod_manager.log.info("::endgroup::") + raise PodLaunchFailedException(error_message) await asyncio.sleep(check_interval) +def detect_pod_terminate_early_issues(pod: V1Pod) -> str | None: + """ + Identify issues that justify terminating the pod early. + + :param pod: The pod object to check. + :return: An error message if an issue is detected; otherwise, None. + """ + pod_status = pod.status + if pod_status.container_statuses: + for container_status in pod_status.container_statuses: + container_state: V1ContainerState = container_status.state + container_waiting: V1ContainerStateWaiting | None = container_state.waiting + if container_waiting: + if container_waiting.reason in ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"]: + return ( + f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}" + f"\n{container_waiting.message}" + ) + return None + + class PodLaunchTimeoutException(AirflowException): """When pod does not leave the ``Pending`` phase within specified timeout.""" @@ -691,6 +704,9 @@ def await_pod_completion( break if istio_enabled and container_is_completed(remote_pod, container_name): break + # abort waiting if defined issues are detected + if detect_pod_terminate_early_issues(remote_pod): + break self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase) time.sleep(2) return remote_pod diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 6fb8b1e10e7dc..062266540004b 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -47,6 +47,33 @@ from pendulum import DateTime +@pytest.fixture +def pod_factory(): + def _make( + *, + pod_phase: str = PodPhase.RUNNING, + container_name: str = "base", + terminated: bool = False, + waiting_reason: str | None = None, + waiting_message: str | None = None, + ) -> mock.MagicMock: + pod = mock.MagicMock() + pod.status.phase = pod_phase + cs = mock.MagicMock() + cs.name = container_name + cs.state = mock.MagicMock() + cs.state.terminated = mock.MagicMock(finished_at=pendulum.now()) if terminated else None + cs.state.waiting = ( + mock.MagicMock(reason=waiting_reason, message=waiting_message or "") if waiting_reason else None + ) + pod.status.container_statuses = [cs] + c_spec = mock.MagicMock(name=container_name) + pod.spec.containers = [c_spec] + return pod + + return _make + + def test_parse_log_line(): log_message = "This should return no timestamp" timestamp, line = parse_log_line(log_message) @@ -654,12 +681,13 @@ async def test_start_pod_raises_informative_error_on_startup_timeout(self): ) @pytest.mark.asyncio - async def test_start_pod_raises_fast_error_on_image_error(self): + @pytest.mark.parametrize("fail_reason", ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"]) + async def test_start_pod_raises_fast_error_on_image_error(self, fail_reason): pod_response = mock.MagicMock() pod_response.status.phase = "Pending" container_statuse = mock.MagicMock() waiting_state = mock.MagicMock() - waiting_state.reason = "ErrImagePull" + waiting_state.reason = fail_reason waiting_state.message = "Test error" container_statuse.state.waiting = waiting_state pod_response.status.container_statuses = [container_statuse] @@ -904,6 +932,49 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod) mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") + @mock.patch("time.sleep") + def test_await_pod_completion_breaks_on_terminal_phase(self, mock_sleep, pod_factory): + pending = pod_factory(pod_phase=PodPhase.PENDING) + running = pod_factory(pod_phase=PodPhase.RUNNING) + succeeded = pod_factory(pod_phase=PodPhase.SUCCEEDED) + self.pod_manager.read_pod = mock.MagicMock(side_effect=[pending, running, succeeded]) + + result = self.pod_manager.await_pod_completion(pod=mock.MagicMock(), istio_enabled=False) + + assert result is succeeded + assert mock_sleep.call_count == 2 + + @mock.patch("time.sleep") + def test_await_pod_completion_breaks_on_istio_container_completed(self, mock_sleep, pod_factory): + running1 = pod_factory(pod_phase=PodPhase.RUNNING, container_name="base", terminated=False) + running2 = pod_factory(pod_phase=PodPhase.RUNNING, container_name="base", terminated=True) + + self.pod_manager.read_pod = mock.MagicMock(side_effect=[running1, running2]) + + result = self.pod_manager.await_pod_completion( + pod=mock.MagicMock(), istio_enabled=True, container_name="base" + ) + + assert result is running2 + assert mock_sleep.call_count == 1 + + @mock.patch("time.sleep") + def test_await_pod_completion_breaks_on_early_termination_issue(self, mock_sleep, pod_factory): + running1 = pod_factory(pod_phase=PodPhase.PENDING, container_name="base") + running2 = pod_factory( + pod_phase=PodPhase.PENDING, + container_name="base", + waiting_reason="ImagePullBackOff", + waiting_message="Back-off pulling image", + ) + + self.pod_manager.read_pod = mock.MagicMock(side_effect=[running1, running2]) + + result = self.pod_manager.await_pod_completion(pod=mock.MagicMock(), istio_enabled=False) + + assert result is running2 + assert mock_sleep.call_count == 1 + class TestAsyncPodManager: @pytest.fixture @@ -1125,12 +1196,13 @@ async def test_start_pod_raises_informative_error_on_startup_timeout(self): self.mock_async_hook.get_pod.assert_called() @pytest.mark.asyncio - async def test_start_pod_raises_fast_error_on_image_error(self): + @pytest.mark.parametrize("fail_reason", ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"]) + async def test_start_pod_raises_fast_error_on_image_error(self, fail_reason): pod_response = mock.MagicMock() pod_response.status.phase = "Pending" container_status = mock.MagicMock() waiting_state = mock.MagicMock() - waiting_state.reason = "ErrImagePull" + waiting_state.reason = fail_reason waiting_state.message = "Test error" container_status.state.waiting = waiting_state pod_response.status.container_statuses = [container_status]