Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,36 @@ async def await_pod_start(
f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
)

# Check for general problems to terminate early - ErrImagePull
if pod_status.container_statuses:
for container_status in pod_status.container_statuses:
container_state: V1ContainerState = container_status.state
container_waiting: V1ContainerStateWaiting | None = container_state.waiting
if container_waiting:
if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
pod_manager.stop_watching_events = True
pod_manager.log.info("::endgroup::")
raise PodLaunchFailedException(
f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
f"\n{container_waiting.message}"
)
# Check for general problems to terminate early
error_message = detect_pod_terminate_early_issues(remote_pod)
if error_message:
pod_manager.log.info("::endgroup::")
raise PodLaunchFailedException(error_message)

await asyncio.sleep(check_interval)


def detect_pod_terminate_early_issues(pod: V1Pod) -> str | None:
    """
    Identify issues that justify terminating the pod early.

    Only the ``waiting`` state of each reported container is inspected. A container
    waiting with an image-related reason (``ErrImagePull``, ``ImagePullBackOff`` or
    ``InvalidImageName``) is reported as an issue; the first match wins.

    :param pod: The pod object to check.
    :return: An error message if an issue is detected; otherwise, None.
    """
    pod_status = pod.status
    # Status and container statuses are optional on the pod object; without them
    # there is nothing to inspect, so report "no issue" instead of failing.
    if not pod_status or not pod_status.container_statuses:
        return None

    for container_status in pod_status.container_statuses:
        container_state: V1ContainerState | None = container_status.state
        # A container status may carry no state at all; treat it like "not waiting".
        container_waiting: V1ContainerStateWaiting | None = (
            container_state.waiting if container_state else None
        )
        if container_waiting and container_waiting.reason in (
            "ErrImagePull",
            "ImagePullBackOff",
            "InvalidImageName",
        ):
            return (
                f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
                f"\n{container_waiting.message}"
            )
    return None


class PodLaunchTimeoutException(AirflowException):
"""When pod does not leave the ``Pending`` phase within specified timeout."""

Expand Down Expand Up @@ -691,6 +704,9 @@ def await_pod_completion(
break
if istio_enabled and container_is_completed(remote_pod, container_name):
break
# abort waiting if defined issues are detected
if detect_pod_terminate_early_issues(remote_pod):
break
self.log.info("Pod %s has phase %s", pod.metadata.name, remote_pod.status.phase)
time.sleep(2)
return remote_pod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,33 @@
from pendulum import DateTime


@pytest.fixture
def pod_factory():
    """
    Provide a factory that builds a mocked pod with exactly one container.

    The returned callable accepts keyword-only arguments:

    * ``pod_phase`` - value assigned to ``pod.status.phase``.
    * ``container_name`` - name of the single container, set on both the
      container status and the container spec.
    * ``terminated`` - when true, the container state carries a ``terminated``
      mock with ``finished_at`` set; otherwise ``terminated`` is ``None``.
    * ``waiting_reason`` / ``waiting_message`` - when a reason is given, the
      container state carries a ``waiting`` mock with that reason and message
      (empty string if no message); otherwise ``waiting`` is ``None``.
    """

    def _make(
        *,
        pod_phase: str = PodPhase.RUNNING,
        container_name: str = "base",
        terminated: bool = False,
        waiting_reason: str | None = None,
        waiting_message: str | None = None,
    ) -> mock.MagicMock:
        pod = mock.MagicMock()
        pod.status.phase = pod_phase

        cs = mock.MagicMock()
        # ``name`` must be assigned after construction: the ``name=`` constructor
        # argument of a mock only configures its repr, not a ``.name`` attribute.
        cs.name = container_name
        cs.state = mock.MagicMock()
        cs.state.terminated = mock.MagicMock(finished_at=pendulum.now()) if terminated else None
        cs.state.waiting = (
            mock.MagicMock(reason=waiting_reason, message=waiting_message or "") if waiting_reason else None
        )
        pod.status.container_statuses = [cs]

        # Same rule as above, so that ``pod.spec.containers[0].name`` is the real string.
        c_spec = mock.MagicMock()
        c_spec.name = container_name
        pod.spec.containers = [c_spec]
        return pod

    return _make


def test_parse_log_line():
log_message = "This should return no timestamp"
timestamp, line = parse_log_line(log_message)
Expand Down Expand Up @@ -654,12 +681,13 @@ async def test_start_pod_raises_informative_error_on_startup_timeout(self):
)

@pytest.mark.asyncio
async def test_start_pod_raises_fast_error_on_image_error(self):
@pytest.mark.parametrize("fail_reason", ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"])
async def test_start_pod_raises_fast_error_on_image_error(self, fail_reason):
pod_response = mock.MagicMock()
pod_response.status.phase = "Pending"
container_statuse = mock.MagicMock()
waiting_state = mock.MagicMock()
waiting_state.reason = "ErrImagePull"
waiting_state.reason = fail_reason
waiting_state.message = "Test error"
container_statuse.state.waiting = waiting_state
pod_response.status.container_statuses = [container_statuse]
Expand Down Expand Up @@ -904,6 +932,49 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running):
self.pod_manager.await_xcom_sidecar_container_start(pod=mock_pod)
mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar")

@mock.patch("time.sleep")
def test_await_pod_completion_breaks_on_terminal_phase(self, mock_sleep, pod_factory):
    """Waiting ends on the first pod read that reports the succeeded phase."""
    observed_phases = (PodPhase.PENDING, PodPhase.RUNNING, PodPhase.SUCCEEDED)
    pods = [pod_factory(pod_phase=phase) for phase in observed_phases]
    self.pod_manager.read_pod = mock.MagicMock(side_effect=pods)

    returned = self.pod_manager.await_pod_completion(pod=mock.MagicMock(), istio_enabled=False)

    # The last pod read is handed back, after one sleep per earlier read.
    assert returned is pods[-1]
    assert mock_sleep.call_count == 2

@mock.patch("time.sleep")
def test_await_pod_completion_breaks_on_istio_container_completed(self, mock_sleep, pod_factory):
    """With istio enabled, waiting ends once the named container is terminated."""
    shared = {"pod_phase": PodPhase.RUNNING, "container_name": "base"}
    still_running = pod_factory(terminated=False, **shared)
    completed = pod_factory(terminated=True, **shared)
    self.pod_manager.read_pod = mock.MagicMock(side_effect=[still_running, completed])

    returned = self.pod_manager.await_pod_completion(
        pod=mock.MagicMock(),
        istio_enabled=True,
        container_name="base",
    )

    # The pod phase stays RUNNING throughout; only the container state changes.
    assert returned is completed
    assert mock_sleep.call_count == 1

@mock.patch("time.sleep")
def test_await_pod_completion_breaks_on_early_termination_issue(self, mock_sleep, pod_factory):
    """Waiting is aborted when a pending pod's container waits with ImagePullBackOff."""
    healthy_pending = pod_factory(pod_phase=PodPhase.PENDING, container_name="base")
    backoff_pending = pod_factory(
        pod_phase=PodPhase.PENDING,
        container_name="base",
        waiting_reason="ImagePullBackOff",
        waiting_message="Back-off pulling image",
    )
    reads = [healthy_pending, backoff_pending]
    self.pod_manager.read_pod = mock.MagicMock(side_effect=reads)

    returned = self.pod_manager.await_pod_completion(pod=mock.MagicMock(), istio_enabled=False)

    # The pod showing the issue is returned as-is after a single sleep.
    assert returned is backoff_pending
    assert mock_sleep.call_count == 1


class TestAsyncPodManager:
@pytest.fixture
Expand Down Expand Up @@ -1125,12 +1196,13 @@ async def test_start_pod_raises_informative_error_on_startup_timeout(self):
self.mock_async_hook.get_pod.assert_called()

@pytest.mark.asyncio
async def test_start_pod_raises_fast_error_on_image_error(self):
@pytest.mark.parametrize("fail_reason", ["ErrImagePull", "ImagePullBackOff", "InvalidImageName"])
async def test_start_pod_raises_fast_error_on_image_error(self, fail_reason):
pod_response = mock.MagicMock()
pod_response.status.phase = "Pending"
container_status = mock.MagicMock()
waiting_state = mock.MagicMock()
waiting_state.reason = "ErrImagePull"
waiting_state.reason = fail_reason
waiting_state.message = "Test error"
container_status.state.waiting = waiting_state
pod_response.status.container_statuses = [container_status]
Expand Down