Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@

if TYPE_CHECKING:
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
from kubernetes.client.models.v1_container_state import V1ContainerState
from kubernetes.client.models.v1_container_state_waiting import V1ContainerStateWaiting
from kubernetes.client.models.v1_container_status import V1ContainerStatus
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.models.v1_pod_condition import V1PodCondition
Expand Down Expand Up @@ -424,6 +426,19 @@ def await_pod_start(
f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
)

# Check for general problems to terminate early - ErrImagePull, InvalidImageName
if pod_status.container_statuses:
for container_status in pod_status.container_statuses:
container_state: V1ContainerState = container_status.state
container_waiting: V1ContainerStateWaiting | None = container_state.waiting
if container_waiting:
if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
self.log.info("::endgroup::")
raise PodLaunchFailedException(
f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
f"\n{container_waiting.message}"
)

time.sleep(check_interval)

def fetch_container_logs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,26 @@ def test_start_pod_raises_informative_error_on_startup_timeout(self):
startup_timeout=0,
)

def test_start_pod_raises_fast_error_on_image_error(self):
    """Check that ``await_pod_start`` raises when a container is waiting on ``ErrImagePull``.

    The mocked pod is reported as ``Pending`` with a single container whose
    waiting state carries the reason ``ErrImagePull`` and a message. The call
    is expected to raise an ``AirflowException`` whose text contains both the
    reason and the message.

    Both timeouts are set to 60 seconds; presumably this keeps the timeout
    paths from firing first, so the exception has to come from the image
    error check (confirm against ``await_pod_start``).
    """
    # Local import keeps this test self-contained; ``re`` is only needed to
    # escape the expected message below.
    import re

    waiting_state = mock.MagicMock()
    waiting_state.reason = "ErrImagePull"
    waiting_state.message = "Test error"

    container_status = mock.MagicMock()
    container_status.state.waiting = waiting_state

    pod_response = mock.MagicMock()
    pod_response.status.phase = "Pending"
    pod_response.status.container_statuses = [container_status]
    self.mock_kube_client.read_namespaced_pod.return_value = pod_response

    expected_msg = (
        "Pod docker image cannot be pulled, unable to start: "
        f"{waiting_state.reason}\n{waiting_state.message}"
    )
    mock_pod = mock.MagicMock()

    # ``match`` is applied as a regular expression, so escape the literal
    # message to keep the assertion exact even if the reason or message ever
    # contains regex metacharacters.
    with pytest.raises(AirflowException, match=re.escape(expected_msg)):
        self.pod_manager.await_pod_start(
            pod=mock_pod,
            schedule_timeout=60,
            startup_timeout=60,
        )

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep")
def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):
condition_scheduled = mock.MagicMock()
Expand Down