Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,9 @@ class KubernetesPodOperator(BaseOperator):
:param reattach_on_restart: if the worker dies while the pod is running, reattach and monitor
during the next try. If False, always create a new pod for each try.
:param labels: labels to apply to the Pod. (templated)
:param startup_timeout_seconds: timeout in seconds to startup the pod.
:param startup_timeout_seconds: timeout in seconds to startup the pod after pod was scheduled.
:param startup_check_interval_seconds: interval in seconds to check if the pod has already started
:param schedule_timeout_seconds: timeout in seconds to schedule pod in cluster.
:param get_logs: get the stdout of the base container as logs of the tasks.
:param init_container_logs: list of init containers whose logs will be published to stdout
Takes a sequence of containers, a single container name or True. If True,
Expand Down Expand Up @@ -289,6 +290,7 @@ def __init__(
reattach_on_restart: bool = True,
startup_timeout_seconds: int = 120,
startup_check_interval_seconds: int = 5,
schedule_timeout_seconds: int | None = None,
get_logs: bool = True,
base_container_name: str | None = None,
base_container_status_polling_interval: float = 1,
Expand Down Expand Up @@ -347,6 +349,8 @@ def __init__(
self.labels = labels or {}
self.startup_timeout_seconds = startup_timeout_seconds
self.startup_check_interval_seconds = startup_check_interval_seconds
# New parameter startup_timeout_seconds adds breaking change, to handle this as smooth as possible just reuse startup time
self.schedule_timeout_seconds = schedule_timeout_seconds or startup_timeout_seconds
env_vars = convert_env_vars(env_vars) if env_vars else []
self.env_vars = env_vars
pod_runtime_info_envs = (
Expand Down Expand Up @@ -574,8 +578,9 @@ def await_pod_start(self, pod: k8s.V1Pod) -> None:
try:
self.pod_manager.await_pod_start(
pod=pod,
schedule_timeout=self.schedule_timeout_seconds,
startup_timeout=self.startup_timeout_seconds,
startup_check_interval=self.startup_check_interval_seconds,
check_interval=self.startup_check_interval_seconds,
)
except PodLaunchFailedException:
if self.log_events_on_failure:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
from kubernetes.client.models.core_v1_event_list import CoreV1EventList
from kubernetes.client.models.v1_container_status import V1ContainerStatus
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.models.v1_pod_condition import V1PodCondition
from urllib3.response import HTTPResponse


Expand Down Expand Up @@ -375,30 +376,55 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
return self.run_pod_async(pod)

def await_pod_start(
self, pod: V1Pod, startup_timeout: int = 120, startup_check_interval: int = 1
self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1
) -> None:
"""
Wait for the pod to reach phase other than ``Pending``.

:param pod:
:param schedule_timeout: Timeout (in seconds) for pod stay in schedule state
(if pod is taking to long in schedule state, fails task)
:param startup_timeout: Timeout (in seconds) for startup of the pod
(if pod is pending for too long, fails task)
:param startup_check_interval: Interval (in seconds) between checks
(if pod is pending for too long after being scheduled, fails task)
:param check_interval: Interval (in seconds) between checks
:return:
"""
curr_time = time.time()
self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout)
pod_was_scheduled = False
start_check_time = time.time()
while True:
remote_pod = self.read_pod(pod)
if remote_pod.status.phase != PodPhase.PENDING:
pod_status = remote_pod.status
if pod_status.phase != PodPhase.PENDING:
self.keep_watching_for_events = False
self.log.info("::endgroup::")
break
self.log.warning("Pod not yet started: %s", pod.metadata.name)
if time.time() - curr_time >= startup_timeout:
msg = (
f"Pod took longer than {startup_timeout} seconds to start. "
"Check the pod events in kubernetes to determine why."
)
raise PodLaunchFailedException(msg)
time.sleep(startup_check_interval)

# Check for timeout
pod_conditions: list[V1PodCondition] = pod_status.conditions
if pod_conditions and any(
(condition.type == "PodScheduled" and condition.status == "True")
for condition in pod_conditions
):
if not pod_was_scheduled:
# POD was initially scheduled update timeout for getting POD launched
pod_was_scheduled = True
start_check_time = time.time()
self.log.info("Waiting %ss to get the POD running...", startup_timeout)

if time.time() - start_check_time >= startup_timeout:
self.log.info("::endgroup::")
raise PodLaunchFailedException(
f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
)
else:
if time.time() - start_check_time >= schedule_timeout:
self.log.info("::endgroup::")
raise PodLaunchFailedException(
f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
)

time.sleep(check_interval)

def fetch_container_logs(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,39 +392,71 @@ def test_start_pod_retries_three_times(self, mock_run_pod_async):

assert mock_run_pod_async.call_count == 3

def test_start_pod_raises_informative_error_on_timeout(self):
def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
pod_response = mock.MagicMock()
pod_response.status.phase = "Pending"
self.mock_kube_client.read_namespaced_pod.return_value = pod_response
expected_msg = "Check the pod events in kubernetes"
expected_msg = "Pod took too long to be scheduled on the cluster, giving up. More than 0s. Check the pod events in kubernetes."
mock_pod = MagicMock()
with pytest.raises(AirflowException, match=expected_msg):
self.pod_manager.await_pod_start(
pod=mock_pod,
schedule_timeout=0,
startup_timeout=0,
)

def test_start_pod_raises_informative_error_on_startup_timeout(self):
pod_response = mock.MagicMock()
pod_response.status.phase = "Pending"
condition = mock.MagicMock()
condition.type = "PodScheduled"
condition.status = "True"
pod_response.status.conditions = [condition]

self.mock_kube_client.read_namespaced_pod.return_value = pod_response
expected_msg = "Pod took too long to start. More than 0s. Check the pod events in kubernetes."
mock_pod = MagicMock()
with pytest.raises(AirflowException, match=expected_msg):
self.pod_manager.await_pod_start(
pod=mock_pod,
schedule_timeout=0,
startup_timeout=0,
)

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep")
def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):
condition_scheduled = mock.MagicMock()
condition_scheduled.type = "PodScheduled"
condition_scheduled.status = "True"

pod_info_pending = mock.MagicMock(**{"status.phase": PodPhase.PENDING})
pod_info_pending_scheduled = mock.MagicMock(
**{"status.phase": PodPhase.PENDING, "status.conditions": [condition_scheduled]}
)
pod_info_succeeded = mock.MagicMock(**{"status.phase": PodPhase.SUCCEEDED})

def pod_state_gen():
yield pod_info_pending
yield pod_info_pending
yield pod_info_pending_scheduled
yield pod_info_pending_scheduled
while True:
yield pod_info_succeeded

self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
startup_check_interval = 10 # Any value is fine, as time.sleep is mocked to do nothing
schedule_timeout = 30
startup_timeout = 60
mock_pod = MagicMock()
self.pod_manager.await_pod_start(
pod=mock_pod,
startup_timeout=60, # Never hit, any value is fine, as time.sleep is mocked to do nothing
startup_check_interval=startup_check_interval,
schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing
check_interval=startup_check_interval,
)
mock_time_sleep.assert_called_with(startup_check_interval)
assert mock_time_sleep.call_count == 2
assert mock_time_sleep.call_count == 3
assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text
assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
def test_container_is_running(self, container_is_running_mock):
Expand Down