diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index ec593f4d573c5..76ee44ae9c7da 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -46,6 +46,8 @@
 
 if TYPE_CHECKING:
     from kubernetes.client.models.core_v1_event_list import CoreV1EventList
+    from kubernetes.client.models.v1_container_state import V1ContainerState
+    from kubernetes.client.models.v1_container_state_waiting import V1ContainerStateWaiting
     from kubernetes.client.models.v1_container_status import V1ContainerStatus
     from kubernetes.client.models.v1_pod import V1Pod
     from kubernetes.client.models.v1_pod_condition import V1PodCondition
@@ -424,6 +426,23 @@ def await_pod_start(
                         f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
                     )
 
+            # Fail fast on image problems instead of waiting out the startup timeout.
+            # NOTE(review): ErrImagePull may also be reported for transient registry
+            # errors that the kubelet would retry - confirm that failing on the first
+            # occurrence is intended.
+            for container_status in pod_status.container_statuses or []:
+                # ``state`` is optional in the client model; guard before reading ``waiting``.
+                container_state: V1ContainerState | None = container_status.state
+                container_waiting: V1ContainerStateWaiting | None = (
+                    container_state.waiting if container_state else None
+                )
+                if container_waiting and container_waiting.reason in ("ErrImagePull", "InvalidImageName"):
+                    self.log.info("::endgroup::")
+                    raise PodLaunchFailedException(
+                        f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
+                        f"\n{container_waiting.message}"
+                    )
+
             time.sleep(check_interval)
 
     def fetch_container_logs(
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index e6f99e62ed40a..c682310fba1cf 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -423,6 +423,28 @@ def test_start_pod_raises_informative_error_on_startup_timeout(self):
                 startup_timeout=0,
             )
 
+    @pytest.mark.parametrize("reason", ["ErrImagePull", "InvalidImageName"])
+    def test_start_pod_raises_fast_error_on_image_error(self, reason):
+        """Fail fast when a pending pod's container is waiting on an image error."""
+        waiting_state = mock.MagicMock()
+        waiting_state.reason = reason
+        waiting_state.message = "Test error"
+        container_status = mock.MagicMock()
+        container_status.state.waiting = waiting_state
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        pod_response.status.container_statuses = [container_status]
+        self.mock_kube_client.read_namespaced_pod.return_value = pod_response
+
+        expected_msg = f"Pod docker image cannot be pulled, unable to start: {reason}\nTest error"
+        # Generous timeouts: the failure must come from the image check, not from a timeout.
+        with pytest.raises(AirflowException, match=expected_msg):
+            self.pod_manager.await_pod_start(
+                pod=mock.MagicMock(),
+                schedule_timeout=60,
+                startup_timeout=60,
+            )
+
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep")
     def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):
         condition_scheduled = mock.MagicMock()