diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
index c0b6ad83a3fdc..5503c743797ad 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
@@ -27,3 +27,11 @@ class PodMutationHookException(AirflowException):
 
 class PodReconciliationError(AirflowException):
     """Raised when an error is encountered while trying to merge pod configs."""
+
+
+class KubernetesApiError(AirflowException):
+    """Raised when an error is encountered while trying to access the Kubernetes API."""
+
+
+class KubernetesApiPermissionError(AirflowException):
+    """Raised when a permission error (HTTP 403) is encountered while trying to access the Kubernetes API."""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 3e746a2118104..e08d2bb8aaa16 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -37,6 +37,7 @@
 
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models import Connection
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError, KubernetesApiPermissionError
 from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
 from airflow.providers.cncf.kubernetes.utils.container import (
@@ -48,7 +49,7 @@
 
 if TYPE_CHECKING:
     from kubernetes.client import V1JobList
-    from kubernetes.client.models import V1Job, V1Pod
+    from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod
 
 LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..."
@@ -878,12 +879,17 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod:
         :param namespace: Name of the pod's namespace.
         """
         async with self.get_conn() as connection:
-            v1_api = async_client.CoreV1Api(connection)
-            pod: V1Pod = await v1_api.read_namespaced_pod(
-                name=name,
-                namespace=namespace,
-            )
-            return pod
+            try:
+                v1_api = async_client.CoreV1Api(connection)
+                pod: V1Pod = await v1_api.read_namespaced_pod(
+                    name=name,
+                    namespace=namespace,
+                )
+                return pod
+            except HTTPError as e:
+                if hasattr(e, "status") and e.status == 403:
+                    raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
+                raise KubernetesApiError from e
 
     async def delete_pod(self, name: str, namespace: str):
         """
@@ -932,6 +938,21 @@ async def read_logs(self, name: str, namespace: str):
             self.log.exception("There was an error reading the kubernetes API.")
             raise
 
+    async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList:
+        """Get pod's events."""
+        async with self.get_conn() as connection:
+            try:
+                v1_api = async_client.CoreV1Api(connection)
+                events: CoreV1EventList = await v1_api.list_namespaced_event(
+                    field_selector=f"involvedObject.name={name}",
+                    namespace=namespace,
+                )
+                return events
+            except HTTPError as e:
+                if hasattr(e, "status") and e.status == 403:
+                    raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
+                raise KubernetesApiError from e
+
     async def get_job_status(self, name: str, namespace: str) -> V1Job:
         """
         Get job's status object.
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
index bee7eb60049d1..d5cc9e27caf9a 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py
@@ -867,6 +867,7 @@ def invoke_defer_method(self, last_log_time: DateTime | None = None) -> None:
             get_logs=self.get_logs,
             startup_timeout=self.startup_timeout_seconds,
             startup_check_interval=self.startup_check_interval_seconds,
+            schedule_timeout=self.schedule_timeout_seconds,
             base_container_name=self.base_container_name,
             on_finish_action=self.on_finish_action.value,
             last_log_time=last_log_time,
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
index 2646854c5d04d..a05a4ff05d5fb 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py
@@ -26,8 +26,10 @@
 
 import tenacity
 
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError
 from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+    AsyncPodManager,
     OnFinishAction,
     PodLaunchTimeoutException,
     PodPhase,
@@ -69,6 +71,7 @@ class KubernetesPodTrigger(BaseTrigger):
     :param get_logs: get the stdout of the container as logs of the tasks.
     :param startup_timeout: timeout in seconds to start up the pod.
     :param startup_check_interval: interval in seconds to check if the pod has already started.
+    :param schedule_timeout: timeout in seconds for the pod to be scheduled in the cluster.
     :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
         If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod",
         only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
@@ -91,7 +94,8 @@ def __init__(
         in_cluster: bool | None = None,
         get_logs: bool = True,
         startup_timeout: int = 120,
-        startup_check_interval: int = 5,
+        startup_check_interval: float = 5,
+        schedule_timeout: int = 120,
         on_finish_action: str = "delete_pod",
         last_log_time: DateTime | None = None,
         logging_interval: int | None = None,
@@ -110,11 +114,11 @@ def __init__(
         self.get_logs = get_logs
         self.startup_timeout = startup_timeout
         self.startup_check_interval = startup_check_interval
+        self.schedule_timeout = schedule_timeout
         self.last_log_time = last_log_time
         self.logging_interval = logging_interval
         self.on_finish_action = OnFinishAction(on_finish_action)
         self.trigger_kwargs = trigger_kwargs or {}
-
         self._since_time = None
 
     def serialize(self) -> tuple[str, dict[str, Any]]:
@@ -133,6 +137,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
             "get_logs": self.get_logs,
             "startup_timeout": self.startup_timeout,
             "startup_check_interval": self.startup_check_interval,
+            "schedule_timeout": self.schedule_timeout,
             "trigger_start_time": self.trigger_start_time,
             "on_finish_action": self.on_finish_action.value,
             "last_log_time": self.last_log_time,
@@ -182,6 +187,22 @@ async def run(self) -> AsyncIterator[TriggerEvent]:
                 }
             )
             return
+        except KubernetesApiPermissionError as e:
+            message = (
+                "Kubernetes API permission error: The triggerer may not have sufficient permissions to monitor or delete pods. "
+                "Please ensure the triggerer's service account is included in the 'pod-launcher-role' as defined in the latest Airflow Helm chart. "
+                f"Original error: {e}"
+            )
+            yield TriggerEvent(
+                {
+                    "name": self.pod_name,
+                    "namespace": self.pod_namespace,
+                    "status": "error",
+                    "message": message,
+                    **self.trigger_kwargs,
+                }
+            )
+            return
         except Exception as e:
             yield TriggerEvent(
                 {
@@ -209,17 +230,16 @@ def _format_exception_description(self, exc: Exception) -> Any:
 
     async def _wait_for_pod_start(self) -> ContainerState:
         """Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error."""
-        while True:
-            pod = await self._get_pod()
-            if not pod.status.phase == "Pending":
-                return self.define_container_state(pod)
-
-            delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
-            if self.startup_timeout < delta.total_seconds():
-                raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
-
-            self.log.info("Still waiting for pod to start. The pod state is %s", pod.status.phase)
-            await asyncio.sleep(self.startup_check_interval)
+        pod = await self._get_pod()
+        events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval)
+        pod_start_task = self.pod_manager.await_pod_start(
+            pod=pod,
+            schedule_timeout=self.schedule_timeout,
+            startup_timeout=self.startup_timeout,
+            check_interval=self.startup_check_interval,
+        )
+        await asyncio.gather(pod_start_task, events_task)
+        return self.define_container_state(await self._get_pod())
 
     async def _wait_for_container_completion(self) -> TriggerEvent:
         """
@@ -287,6 +307,10 @@ def hook(self) -> AsyncKubernetesHook:
             cluster_context=self.cluster_context,
         )
 
+    @cached_property
+    def pod_manager(self) -> AsyncPodManager:
+        return AsyncPodManager(async_hook=self.hook)
+
     def define_container_state(self, pod: V1Pod) -> ContainerState:
         pod_containers = pod.status.container_statuses
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index c9dc45e57f414..789ad4db79e6c 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -60,6 +60,8 @@
     from kubernetes.client.models.v1_pod_condition import V1PodCondition
     from urllib3.response import HTTPResponse
 
+    from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook
+
 EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__"
 """
@@ -99,6 +101,109 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
 
 
+async def watch_pod_events(
+    pod_manager: PodManager | AsyncPodManager,
+    pod: V1Pod,
+    check_interval: float = 1,
+) -> None:
+    """
+    Read pod events and write them to the log.
+
+    This function supports both asynchronous and synchronous pod managers.
+
+    :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
+    :param pod: The pod object to monitor.
+    :param check_interval: Interval (in seconds) between checks.
+    """
+    num_events = 0
+    is_async = isinstance(pod_manager, AsyncPodManager)
+    while not pod_manager.stop_watching_events:
+        if is_async:
+            events = await pod_manager.read_pod_events(pod)
+        else:
+            events = pod_manager.read_pod_events(pod)
+        for new_event in events.items[num_events:]:
+            involved_object: V1ObjectReference = new_event.involved_object
+            pod_manager.log.info(
+                "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
+            )
+        num_events = len(events.items)
+        await asyncio.sleep(check_interval)
+
+
+async def await_pod_start(
+    pod_manager: PodManager | AsyncPodManager,
+    pod: V1Pod,
+    schedule_timeout: int = 120,
+    startup_timeout: int = 120,
+    check_interval: float = 1,
+):
+    """
+    Monitor the startup phase of a Kubernetes pod, waiting for it to leave the ``Pending`` state.
+
+    This function is shared by both PodManager and AsyncPodManager to provide consistent pod startup tracking.
+
+    :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
+    :param pod: The pod object to monitor.
+    :param schedule_timeout: Maximum time (in seconds) to wait for the pod to be scheduled.
+    :param startup_timeout: Maximum time (in seconds) to wait for the pod to start running after being scheduled.
+    :param check_interval: Interval (in seconds) between status checks.
+    """
+    pod_manager.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout)
+    pod_was_scheduled = False
+    start_check_time = time.time()
+    is_async = isinstance(pod_manager, AsyncPodManager)
+    while True:
+        if is_async:
+            remote_pod = await pod_manager.read_pod(pod)
+        else:
+            remote_pod = pod_manager.read_pod(pod)
+        pod_status = remote_pod.status
+        if pod_status.phase != PodPhase.PENDING:
+            pod_manager.stop_watching_events = True
+            pod_manager.log.info("::endgroup::")
+            break
+
+        # Check for timeout
+        pod_conditions: list[V1PodCondition] = pod_status.conditions
+        if pod_conditions and any(
+            (condition.type == "PodScheduled" and condition.status == "True") for condition in pod_conditions
+        ):
+            if not pod_was_scheduled:
+                # POD was initially scheduled; update the timeout for getting the POD launched
+                pod_was_scheduled = True
+                start_check_time = time.time()
+                pod_manager.log.info("Waiting %ss to get the POD running...", startup_timeout)
+
+            if time.time() - start_check_time >= startup_timeout:
+                pod_manager.log.info("::endgroup::")
+                raise PodLaunchTimeoutException(
+                    f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
+                )
+        else:
+            if time.time() - start_check_time >= schedule_timeout:
+                pod_manager.log.info("::endgroup::")
+                raise PodLaunchTimeoutException(
+                    f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
+                )
+
+        # Check for general problems to terminate early - ErrImagePull
+        if pod_status.container_statuses:
+            for container_status in pod_status.container_statuses:
+                container_state: V1ContainerState = container_status.state
+                container_waiting: V1ContainerStateWaiting | None = container_state.waiting
+                if container_waiting:
+                    if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
+                        pod_manager.log.info("::endgroup::")
+                        raise PodLaunchFailedException(
+                            f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
+                            f"\n{container_waiting.message}"
+                        )
+
+        await asyncio.sleep(check_interval)
+
+
 class PodLaunchTimeoutException(AirflowException):
     """When pod does not leave the ``Pending`` phase within specified timeout."""
 
@@ -262,16 +367,7 @@ def create_pod(self, pod: V1Pod) -> V1Pod:
 
     async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None:
         """Read pod events and writes into log."""
-        num_events = 0
-        while not self.stop_watching_events:
-            events = self.read_pod_events(pod)
-            for new_event in events.items[num_events:]:
-                involved_object: V1ObjectReference = new_event.involved_object
-                self.log.info(
-                    "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
-                )
-            num_events = len(events.items)
-            await asyncio.sleep(check_interval)
+        await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
 
     async def await_pod_start(
         self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1
@@ -287,55 +383,13 @@ async def await_pod_start(
         :param check_interval: Interval (in seconds) between checks
         :return:
         """
-        self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout)
-        pod_was_scheduled = False
-        start_check_time = time.time()
-        while True:
-            remote_pod = self.read_pod(pod)
-            pod_status = remote_pod.status
-            if pod_status.phase != PodPhase.PENDING:
-                self.stop_watching_events = True
-                self.log.info("::endgroup::")
-                break
-
-            # Check for timeout
-            pod_conditions: list[V1PodCondition] = pod_status.conditions
-            if pod_conditions and any(
-                (condition.type == "PodScheduled" and condition.status == "True")
-                for condition in pod_conditions
-            ):
-                if not pod_was_scheduled:
-                    # POD was initially scheduled update timeout for getting POD launched
-                    pod_was_scheduled = True
-                    start_check_time = time.time()
-                    self.log.info("Waiting %ss to get the POD running...", startup_timeout)
-
-                if time.time() - start_check_time >= startup_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
-                    )
-            else:
-                if time.time() - start_check_time >= schedule_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
-                    )
-
-            # Check for general problems to terminate early - ErrImagePull
-            if pod_status.container_statuses:
-                for container_status in pod_status.container_statuses:
-                    container_state: V1ContainerState = container_status.state
-                    container_waiting: V1ContainerStateWaiting | None = container_state.waiting
-                    if container_waiting:
-                        if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
-                            self.log.info("::endgroup::")
-                            raise PodLaunchFailedException(
-                                f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
-                                f"\n{container_waiting.message}"
-                            )
-
-            await asyncio.sleep(check_interval)
+        await await_pod_start(
+            pod_manager=self,
+            pod=pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=check_interval,
+        )
 
     def _log_message(
         self,
@@ -915,3 +969,66 @@ class OnFinishAction(str, enum.Enum):
 def is_log_group_marker(line: str) -> bool:
     """Check if the line is a log group marker like `::group::` or `::endgroup::`."""
     return line.startswith("::group::") or line.startswith("::endgroup::")
+
+
+class AsyncPodManager(LoggingMixin):
+    """Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodTrigger."""
+
+    def __init__(
+        self,
+        async_hook: AsyncKubernetesHook,
+        callbacks: list[type[KubernetesPodOperatorCallback]] | None = None,
+    ):
+        """
+        Create the launcher.
+
+        :param async_hook: async Kubernetes hook used to communicate with the Kubernetes API
+        :param callbacks: optional list of ``KubernetesPodOperatorCallback`` classes
+        """
+        super().__init__()
+        self._hook = async_hook
+        self._watch = watch.Watch()
+        self._callbacks = callbacks or []
+        self.stop_watching_events = False
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod(self, pod: V1Pod) -> V1Pod:
+        """Read POD information."""
+        return await self._hook.get_pod(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
+        """Get pod's events."""
+        return await self._hook.get_pod_events(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None:
+        """Read pod events and write them to the log."""
+        await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval)
+
+    async def await_pod_start(
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1
+    ) -> None:
+        """
+        Wait for the pod to reach a phase other than ``Pending``.
+
+        :param pod: pod to monitor
+        :param schedule_timeout: Timeout (in seconds) for the pod to be scheduled
+            (if the pod stays unscheduled for too long, the task fails)
+        :param startup_timeout: Timeout (in seconds) for startup of the pod
+            (if the pod is pending for too long after being scheduled, the task fails)
+        :param check_interval: Interval (in seconds) between checks
+        :return:
+        """
+        await await_pod_start(
+            pod_manager=self,
+            pod=pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=check_interval,
+        )
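Note (editorial illustration, not part of the patch): in the trigger's `_wait_for_pod_start` above, `await_pod_start` and `watch_pod_events` run concurrently, and `await_pod_start` sets `stop_watching_events` once the pod leaves `Pending`, which ends the watcher loop. A minimal standalone sketch of that pattern, assuming a configured `AsyncKubernetesHook` instance and a `V1Pod` are already available:

```python
# Minimal sketch of the gather pattern used by KubernetesPodTrigger._wait_for_pod_start.
# `hook` (an AsyncKubernetesHook) and `pod` (a V1Pod) are assumed to exist;
# error handling and event-loop setup are elided.
import asyncio

from airflow.providers.cncf.kubernetes.utils.pod_manager import AsyncPodManager


async def wait_for_pod_start(hook, pod, check_interval: float = 5.0) -> None:
    manager = AsyncPodManager(async_hook=hook)
    # Poll pod status and stream pod events side by side; the events task exits
    # once await_pod_start flips manager.stop_watching_events.
    await asyncio.gather(
        manager.await_pod_start(
            pod=pod,
            schedule_timeout=120,
            startup_timeout=120,
            check_interval=check_interval,
        ),
        manager.watch_pod_events(pod=pod, check_interval=check_interval),
    )
```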
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 66fae2524d618..fc477fe0cf763 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -44,6 +44,7 @@
 IN_CLUSTER = False
 GET_LOGS = True
 STARTUP_TIMEOUT_SECS = 120
+STARTUP_CHECK_INTERVAL_SECS = 0.1
 TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
 FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
 BASE_CONTAINER_NAME = "base"
@@ -63,11 +64,24 @@ def trigger():
         in_cluster=IN_CLUSTER,
         get_logs=GET_LOGS,
         startup_timeout=STARTUP_TIMEOUT_SECS,
+        startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+        schedule_timeout=STARTUP_TIMEOUT_SECS,
         trigger_start_time=TRIGGER_START_TIME,
         on_finish_action=ON_FINISH_ACTION,
     )
 
 
+@pytest.fixture
+def mock_time_fixture():
+    """Fixture to simulate time passage beyond startup timeout."""
+    with mock.patch("time.time") as mock_time:
+        start_time = 1000
+        mock_time.side_effect = [
+            *(start_time + STARTUP_TIMEOUT_SECS * n for n in range(5)),
+        ]
+        yield mock_time
+
+
 def get_read_pod_mock_containers(statuses_to_emit=None):
     """
     Emit pods with given phases sequentially.
@@ -106,7 +120,8 @@ def test_serialize(self, trigger):
             "in_cluster": IN_CLUSTER,
             "get_logs": GET_LOGS,
             "startup_timeout": STARTUP_TIMEOUT_SECS,
-            "startup_check_interval": 5,
+            "startup_check_interval": STARTUP_CHECK_INTERVAL_SECS,
+            "schedule_timeout": STARTUP_TIMEOUT_SECS,
             "trigger_start_time": TRIGGER_START_TIME,
             "on_finish_action": ON_FINISH_ACTION,
             "last_log_time": None,
@@ -138,7 +153,7 @@ async def test_run_loop_return_success_event(self, mock_wait_pod, trigger):
     async def test_run_loop_return_waiting_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
-        mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.WAITING
         caplog.set_level(logging.INFO)
@@ -157,7 +172,7 @@ async def test_run_loop_return_running_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
-        mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.RUNNING
         caplog.set_level(logging.INFO)
@@ -229,7 +244,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
         Test that KubernetesPodTrigger fires the correct event in case of fail.
         """
-        mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.FAILED
         caplog.set_level(logging.INFO)
@@ -319,14 +334,46 @@ def test_define_container_state_should_execute_successfully(
 
         assert expected_state == trigger.define_container_state(pod)
 
+    @pytest.mark.asyncio
+    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
+    @mock.patch(f"{TRIGGER_PATH}.hook")
+    async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger):
+        event1 = mock.AsyncMock()
+        event1.message = "event 1"
+        event1.involved_object.field_path = "object 1"
+        event2 = mock.AsyncMock()
+        event2.message = "event 2"
+        event2.involved_object.field_path = "object 2"
+        events_list = mock.AsyncMock()
+        events_list.items = [event1, event2]
+
+        mock_hook.get_pod_events = mock.AsyncMock(return_value=events_list)
+
+        pod_pending = mock.MagicMock()
+        pod_pending.status.phase = PodPhase.PENDING
+        pod_succeeded = mock.MagicMock()
+        pod_succeeded.status.phase = PodPhase.SUCCEEDED
+
+        mock_hook.get_pod = mock.AsyncMock(
+            side_effect=[pod_pending, pod_pending, pod_succeeded, pod_succeeded]
+        )
+
+        mock_method.return_value = ContainerState.TERMINATED
+
+        with mock.patch.object(trigger.pod_manager.log, "info") as mock_log_info:
+            generator = trigger.run()
+            await generator.asend(None)
+
+        mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 1", "object 1")
+        mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 2", "object 2")
+
     @pytest.mark.asyncio
     @pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED])
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_timeout_event(
-        self, mock_hook, mock_method, trigger, caplog, container_state
+        self, mock_hook, mock_method, trigger, container_state, mock_time_fixture
     ):
-        trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2)
         mock_hook.get_pod.return_value = self._mock_pod_result(
             mock.MagicMock(
                 status=mock.MagicMock(
@@ -335,9 +382,6 @@ async def test_run_loop_return_timeout_event(
             )
         )
         mock_method.return_value = container_state
-
-        caplog.set_level(logging.INFO)
-
         generator = trigger.run()
         actual = await generator.asend(None)
         assert (
             TriggerEvent(
                 {
                     "name": POD_NAME,
                     "namespace": NAMESPACE,
                     "status": "timeout",
-                    "message": "Pod did not leave 'Pending' phase within specified timeout",
+                    "message": "Pod took too long to be scheduled on the cluster, giving up. More than 120s. Check the pod events in kubernetes.",
                 }
             )
             == actual
@@ -356,14 +400,13 @@
     @mock.patch(f"{TRIGGER_PATH}.define_container_state")
     @mock.patch(f"{TRIGGER_PATH}.hook")
     async def test_run_loop_return_success_for_completed_pod_after_timeout(
-        self, mock_hook, mock_method, trigger, caplog
+        self, mock_hook, mock_method, trigger, mock_time_fixture
     ):
         """
         Test that the trigger correctly recognizes the pod is not pending even after the timeout has been
         reached. This may happen when a new triggerer process takes over the trigger, the pod already left
         pending state and the timeout has been reached.
         """
-        trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2)
         mock_hook.get_pod.return_value = self._mock_pod_result(
             mock.MagicMock(
                 status=mock.MagicMock(
@@ -373,8 +416,6 @@
             )
         )
         mock_method.return_value = ContainerState.TERMINATED
 
-        caplog.set_level(logging.INFO)
-
         generator = trigger.run()
         actual = await generator.asend(None)
         assert (
@@ -396,7 +437,7 @@ async def test__get_pod(self, mock_hook, trigger):
         """
         Test that KubernetesPodTrigger _get_pod is called with the correct arguments.
         """
-        mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         await trigger._get_pod()
         mock_hook.get_pod.assert_called_with(name=POD_NAME, namespace=NAMESPACE)
@@ -423,7 +464,7 @@ async def test__get_pod_retries(
         the hook.get_pod call.
         """
-        side_effects = [Exception("Test exception") for _ in range(exc_count)] + [MagicMock()]
+        side_effects = [Exception("Test exception") for _ in range(exc_count)] + [mock.AsyncMock()]
         mock_hook.get_pod.side_effect = mock.AsyncMock(side_effect=side_effects)
 
         # We expect the exception to be raised only if the number of retries is exceeded
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
index 2634119fc29f6..485bdbd769e4d 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py
@@ -31,6 +31,7 @@
 
 from airflow.exceptions import AirflowException
 from airflow.providers.cncf.kubernetes.utils.pod_manager import (
+    AsyncPodManager,
     PodLogsConsumer,
     PodManager,
     PodPhase,
@@ -477,7 +478,7 @@ async def test_start_pod_raises_fast_error_on_image_error(self):
 
     @pytest.mark.asyncio
     @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
-    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog):
+    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
         condition_scheduled = mock.MagicMock()
         condition_scheduled.type = "PodScheduled"
         condition_scheduled.status = "True"
@@ -500,17 +501,21 @@ def pod_state_gen():
         schedule_timeout = 30
         startup_timeout = 60
         mock_pod = MagicMock()
-        await self.pod_manager.await_pod_start(
-            pod=mock_pod,
-            schedule_timeout=schedule_timeout,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
-            startup_timeout=startup_timeout,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
-            check_interval=startup_check_interval,
-        )
-        mock_time_sleep.assert_called_with(startup_check_interval)
-        assert mock_time_sleep.call_count == 3
-        assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text
-        assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text
-        assert self.pod_manager.stop_watching_events is True
+
+        with mock.patch.object(self.pod_manager.log, "info") as mock_log_info:
+            await self.pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=schedule_timeout,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
+                startup_timeout=startup_timeout,  # Never hit, any value is fine, as time.sleep is mocked to do nothing
+                check_interval=startup_check_interval,
+            )
+            mock_time_sleep.assert_called_with(startup_check_interval)
+            assert self.pod_manager.stop_watching_events is True
+            assert mock_time_sleep.call_count == 3
+            mock_log_info.assert_any_call(
+                "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout
+            )
+            mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
 
     @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
     def test_container_is_running(self, container_is_running_mock):
@@ -700,6 +705,145 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running):
         mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar")
 
 
+class TestAsyncPodManager:
+    def setup_method(self):
+        self.mock_async_hook = mock.AsyncMock()
+        self.async_pod_manager = AsyncPodManager(
+            async_hook=self.mock_async_hook,
+            callbacks=[],
+        )
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_informative_error_on_scheduled_timeout(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = "Pod took too long to be scheduled on the cluster, giving up. More than 0s. Check the pod events in kubernetes."
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=0,
+                startup_timeout=0,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_informative_error_on_startup_timeout(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        condition = mock.MagicMock()
+        condition.type = "PodScheduled"
+        condition.status = "True"
+        pod_response.status.conditions = [condition]
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = "Pod took too long to start. More than 0s. Check the pod events in kubernetes."
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=0,
+                startup_timeout=0,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    async def test_start_pod_raises_fast_error_on_image_error(self):
+        pod_response = mock.MagicMock()
+        pod_response.status.phase = "Pending"
+        container_status = mock.MagicMock()
+        waiting_state = mock.MagicMock()
+        waiting_state.reason = "ErrImagePull"
+        waiting_state.message = "Test error"
+        container_status.state.waiting = waiting_state
+        pod_response.status.container_statuses = [container_status]
+        self.mock_async_hook.get_pod.return_value = pod_response
+        expected_msg = f"Pod docker image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}"
+        mock_pod = mock.MagicMock()
+        with pytest.raises(AirflowException, match=expected_msg):
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=60,
+                startup_timeout=60,
+            )
+        self.mock_async_hook.get_pod.assert_called()
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+    async def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
+        condition_scheduled = mock.MagicMock()
+        condition_scheduled.type = "PodScheduled"
+        condition_scheduled.status = "True"
+
+        pod_info_pending = mock.MagicMock()
+        pod_info_pending.status.phase = PodPhase.PENDING
+        pod_info_pending.status.conditions = []
+
+        pod_info_pending_scheduled = mock.MagicMock()
+        pod_info_pending_scheduled.status.phase = PodPhase.PENDING
+        pod_info_pending_scheduled.status.conditions = [condition_scheduled]
+
+        pod_info_succeeded = mock.MagicMock()
+        pod_info_succeeded.status.phase = PodPhase.SUCCEEDED
+
+        # Simulate sequence of pod states
+        self.mock_async_hook.get_pod.side_effect = [
+            pod_info_pending,
+            pod_info_pending_scheduled,
+            pod_info_pending_scheduled,
+            pod_info_succeeded,
+        ]
+        startup_check_interval = 10
+        schedule_timeout = 30
+        startup_timeout = 60
+        mock_pod = mock.MagicMock()
+        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=schedule_timeout,
+                startup_timeout=startup_timeout,
+                check_interval=startup_check_interval,
+            )
+            assert mock_time_sleep.call_count == 3
+            mock_log_info.assert_any_call(
+                "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout
+            )
+            mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
+            assert self.async_pod_manager.stop_watching_events is True
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+    async def test_watch_pod_events(self, mock_time_sleep):
+        mock_pod = mock.MagicMock()
+        mock_pod.metadata.name = "test-pod"
+        mock_pod.metadata.namespace = "default"
+
+        events = mock.MagicMock()
+        events.items = []
+        for id in ["event 1", "event 2"]:
+            event = mock.MagicMock()
+            event.message = f"test {id}"
+            event.involved_object.field_path = f"object {id}"
+            events.items.append(event)
+        startup_check_interval = 10
+
+        def get_pod_events_side_effect(name, namespace):
+            self.async_pod_manager.stop_watching_events = True
+            return events
+
+        self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect
+
+        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+            await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
+            mock_log_info.assert_any_call(
+                "The Pod has an Event: %s from %s", "test event 1", "object event 1"
+            )
+            mock_log_info.assert_any_call(
+                "The Pod has an Event: %s from %s", "test event 2", "object event 2"
+            )
+            mock_time_sleep.assert_called_once_with(startup_check_interval)
+
+
 class TestPodLogsConsumer:
     @pytest.mark.parametrize(
         "chunks, expected_logs",