From 88d14b30592a640ef2fef4b0725f2a887af06dff Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 16 Oct 2025 07:41:11 +0200 Subject: [PATCH 1/7] Move container-related functions from PodManager to a separate file --- .../cncf/kubernetes/hooks/kubernetes.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 3e746a2118104..ed4c070d40cf9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -67,6 +67,34 @@ def _load_body_to_dict(body: str) -> dict: raise AirflowException(f"Exception when loading resource definition: {e}\n") return body_dict +class PodOperatorHookProtocol(Protocol): + """ + Protocol to define methods relied upon by KubernetesPodOperator. + + Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use + hooks that don't extend KubernetesHook. We use this protocol to document the + methods used by KPO and ensure that these methods exist on such other hooks. + """ + + @property + def core_v1_client(self) -> client.CoreV1Api: + """Get authenticated client object.""" + + @property + def is_in_cluster(self) -> bool: + """Expose whether the hook is configured with ``load_incluster_config`` or not.""" + + def get_pod(self, name: str, namespace: str) -> V1Pod: + """Read pod object from kubernetes API.""" + + def get_namespace(self) -> str | None: + """Return the namespace that defined in the connection.""" + + def get_xcom_sidecar_container_image(self) -> str | None: + """Return the xcom sidecar image that defined in the connection.""" + + def get_xcom_sidecar_container_resources(self) -> str | None: + """Return the xcom sidecar resources that defined in the connection.""" class PodOperatorHookProtocol(Protocol): """ From 6f51259caac1cd7fc60e2f86c6373274ab25d12b Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 16 Oct 2025 11:16:27 +0200 Subject: [PATCH 2/7] Moved unit tests --- .../src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index ed4c070d40cf9..3dab24fbdbd65 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -67,6 +67,7 @@ def _load_body_to_dict(body: str) -> dict: raise AirflowException(f"Exception when loading resource definition: {e}\n") return body_dict + class PodOperatorHookProtocol(Protocol): """ Protocol to define methods relied upon by KubernetesPodOperator. 
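A brief aside on the mechanism patches 1 and 2 rely on: `typing.Protocol` gives structural ("duck") typing, which is why the protocol's method bodies above are bare docstrings — they exist only to document and type-check the interface that KubernetesPodOperator expects, so a hook such as GKEStartPodOperator's can satisfy `PodOperatorHookProtocol` without inheriting from `KubernetesHook`. A minimal, runnable sketch of the idea; `PodHookLike` and `FakeHook` are illustrative stand-ins, not code from these patches:

    from __future__ import annotations

    from typing import Protocol, runtime_checkable


    @runtime_checkable
    class PodHookLike(Protocol):
        """Trimmed-down stand-in for PodOperatorHookProtocol."""

        @property
        def is_in_cluster(self) -> bool: ...

        def get_namespace(self) -> str | None: ...


    class FakeHook:
        """Satisfies the protocol structurally; note there is no inheritance."""

        @property
        def is_in_cluster(self) -> bool:
            return False

        def get_namespace(self) -> str | None:
            return "default"


    hook = FakeHook()
    # @runtime_checkable makes isinstance() check member presence (not signatures).
    assert isinstance(hook, PodHookLike)
    print(hook.get_namespace())  # -> default

Note that `isinstance()` against a runtime-checkable protocol only verifies that the members exist, not their signatures — the static type checker is what enforces the full contract.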
From 7aa35a3a2e779ace00f1b9b93b6edd3d41a8aea6 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Mon, 20 Oct 2025 11:21:39 +0200 Subject: [PATCH 3/7] Sync and async workflow use the same code to track Pod startup --- .../cncf/kubernetes/hooks/kubernetes.py | 30 ++- .../providers/cncf/kubernetes/triggers/pod.py | 34 +-- .../cncf/kubernetes/utils/pod_manager.py | 220 +++++++++++++----- .../unit/cncf/kubernetes/triggers/test_pod.py | 71 +++++- .../cncf/kubernetes/utils/test_pod_manager.py | 137 +++++++++++ 5 files changed, 402 insertions(+), 90 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 3dab24fbdbd65..aeb5b3034281f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -48,7 +48,7 @@ if TYPE_CHECKING: from kubernetes.client import V1JobList - from kubernetes.client.models import V1Job, V1Pod + from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..." @@ -907,12 +907,15 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod: :param namespace: Name of the pod's namespace. """ async with self.get_conn() as connection: - v1_api = async_client.CoreV1Api(connection) - pod: V1Pod = await v1_api.read_namespaced_pod( - name=name, - namespace=namespace, - ) - return pod + try: + v1_api = async_client.CoreV1Api(connection) + pod: V1Pod = await v1_api.read_namespaced_pod( + name=name, + namespace=namespace, + ) + return pod + except HTTPError as e: + raise AirflowException(f"There was an error reading the kubernetes API: {e}") async def delete_pod(self, name: str, namespace: str): """ @@ -961,6 +964,19 @@ async def read_logs(self, name: str, namespace: str): self.log.exception("There was an error reading the kubernetes API.") raise + async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList: + """Get pod's events.""" + async with self.get_conn() as connection: + try: + v1_api = async_client.CoreV1Api(connection) + events: CoreV1EventList = await v1_api.list_namespaced_event( + field_selector=f"involvedObject.name={name}", + namespace=namespace, + ) + return events + except HTTPError as e: + raise AirflowException(f"There was an error reading the kubernetes API: {e}") + async def get_job_status(self, name: str, namespace: str) -> V1Job: """ Get job's status object. diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index 2646854c5d04d..5207b233a451f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -28,6 +28,7 @@ from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + AsyncPodManager, OnFinishAction, PodLaunchTimeoutException, PodPhase, @@ -69,6 +70,7 @@ class KubernetesPodTrigger(BaseTrigger): :param get_logs: get the stdout of the container as logs of the tasks. :param startup_timeout: timeout in seconds to start up the pod. :param startup_check_interval: interval in seconds to check if the pod has already started. 
+    :param schedule_timeout: timeout in seconds for the pod to be scheduled on the cluster.
     :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted.
         If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod",
         only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod.
@@ -91,7 +93,8 @@ def __init__(
         in_cluster: bool | None = None,
         get_logs: bool = True,
         startup_timeout: int = 120,
-        startup_check_interval: int = 5,
+        startup_check_interval: float = 5,
+        schedule_timeout: int | None = None,
         on_finish_action: str = "delete_pod",
         last_log_time: DateTime | None = None,
         logging_interval: int | None = None,
@@ -110,11 +113,12 @@ def __init__(
         self.get_logs = get_logs
         self.startup_timeout = startup_timeout
         self.startup_check_interval = startup_check_interval
+        # The new schedule_timeout parameter would be a breaking change; to handle this as smoothly as possible, just reuse the startup timeout as its default
+        self.schedule_timeout = schedule_timeout or startup_timeout
         self.last_log_time = last_log_time
         self.logging_interval = logging_interval
         self.on_finish_action = OnFinishAction(on_finish_action)
         self.trigger_kwargs = trigger_kwargs or {}
-
         self._since_time = None
 
     def serialize(self) -> tuple[str, dict[str, Any]]:
@@ -133,6 +137,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]:
                 "get_logs": self.get_logs,
                 "startup_timeout": self.startup_timeout,
                 "startup_check_interval": self.startup_check_interval,
+                "schedule_timeout": self.schedule_timeout,
                 "trigger_start_time": self.trigger_start_time,
                 "on_finish_action": self.on_finish_action.value,
                 "last_log_time": self.last_log_time,
@@ -209,17 +214,16 @@ def _format_exception_description(self, exc: Exception) -> Any:
 
     async def _wait_for_pod_start(self) -> ContainerState:
         """Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error."""
-        while True:
-            pod = await self._get_pod()
-            if not pod.status.phase == "Pending":
-                return self.define_container_state(pod)
-
-            delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time
-            if self.startup_timeout < delta.total_seconds():
-                raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout")
-
-            self.log.info("Still waiting for pod to start.
The pod state is %s", pod.status.phase) - await asyncio.sleep(self.startup_check_interval) + pod = await self._get_pod() + events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval) + pod_start_task = self.pod_manager.await_pod_start( + pod=pod, + schedule_timeout=self.schedule_timeout, + startup_timeout=self.startup_timeout, + check_interval=self.startup_check_interval, + ) + await asyncio.gather(pod_start_task, events_task) + return self.define_container_state(await self._get_pod()) async def _wait_for_container_completion(self) -> TriggerEvent: """ @@ -287,6 +291,10 @@ def hook(self) -> AsyncKubernetesHook: cluster_context=self.cluster_context, ) + @cached_property + def pod_manager(self) -> AsyncPodManager: + return AsyncPodManager(async_hook=self.hook) # , callbacks=self.callbacks) + def define_container_state(self, pod: V1Pod) -> ContainerState: pod_containers = pod.status.container_statuses diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index c9dc45e57f414..172a9fcada8ac 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -60,6 +60,8 @@ from kubernetes.client.models.v1_pod_condition import V1PodCondition from urllib3.response import HTTPResponse + from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook + EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__" """ @@ -99,6 +101,92 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException): return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401" +async def generic_watch_pod_events( + self, + pod: V1Pod, + check_interval: float = 1, + is_async: bool = True, +) -> None: + """Read pod events and writes into log.""" + num_events = 0 + while not self.stop_watching_events: + events = await self.read_pod_events(pod) if is_async else self.read_pod_events(pod) + for new_event in events.items[num_events:]: + involved_object: V1ObjectReference = new_event.involved_object + self.log.info("The Pod has an Event: %s from %s", new_event.message, involved_object.field_path) + num_events = len(events.items) + await asyncio.sleep(check_interval) + + +async def generic_await_pod_start( + self, + pod, + schedule_timeout: int = 120, + startup_timeout: int = 120, + check_interval: float = 1, + is_async: bool = True, +): + """ + Monitor the startup phase of a Kubernetes pod, waiting for it to leave the ``Pending`` state. + + This function is shared by both PodManager and AsyncPodManager to provide consistent pod startup tracking. + + :param pod: The pod object to monitor. + :param schedule_timeout: Maximum time (in seconds) to wait for the pod to be scheduled. + :param startup_timeout: Maximum time (in seconds) to wait for the pod to start running after being scheduled. + :param check_interval: Interval (in seconds) between status checks. + :param is_async: Set to True if called in an async context; otherwise, False. 
+ """ + self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) + pod_was_scheduled = False + start_check_time = time.time() + while True: + remote_pod = await self.read_pod(pod) if is_async else self.read_pod(pod) + pod_status = remote_pod.status + if pod_status.phase != PodPhase.PENDING: + self.stop_watching_events = True + self.log.info("::endgroup::") + break + + # Check for timeout + pod_conditions: list[V1PodCondition] = pod_status.conditions + if pod_conditions and any( + (condition.type == "PodScheduled" and condition.status == "True") for condition in pod_conditions + ): + if not pod_was_scheduled: + # POD was initially scheduled update timeout for getting POD launched + pod_was_scheduled = True + start_check_time = time.time() + self.log.info("Waiting %ss to get the POD running...", startup_timeout) + + if time.time() - start_check_time >= startup_timeout: + self.log.info("::endgroup::") + raise PodLaunchTimeoutException( + f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes." + ) + else: + if time.time() - start_check_time >= schedule_timeout: + self.log.info("::endgroup::") + raise PodLaunchTimeoutException( + f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes." + ) + + # Check for general problems to terminate early - ErrImagePull + if pod_status.container_statuses: + for container_status in pod_status.container_statuses: + container_state: V1ContainerState = container_status.state + container_waiting: V1ContainerStateWaiting | None = container_state.waiting + if container_waiting: + if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]: + self.log.info("::endgroup::") + raise PodLaunchFailedException( + f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}" + f"\n{container_waiting.message}" + ) + + await asyncio.sleep(check_interval) + + class PodLaunchTimeoutException(AirflowException): """When pod does not leave the ``Pending`` phase within specified timeout.""" @@ -262,16 +350,7 @@ def create_pod(self, pod: V1Pod) -> V1Pod: async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None: """Read pod events and writes into log.""" - num_events = 0 - while not self.stop_watching_events: - events = self.read_pod_events(pod) - for new_event in events.items[num_events:]: - involved_object: V1ObjectReference = new_event.involved_object - self.log.info( - "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path - ) - num_events = len(events.items) - await asyncio.sleep(check_interval) + await generic_watch_pod_events(self, pod, check_interval, is_async=False) async def await_pod_start( self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1 @@ -287,55 +366,14 @@ async def await_pod_start( :param check_interval: Interval (in seconds) between checks :return: """ - self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) - pod_was_scheduled = False - start_check_time = time.time() - while True: - remote_pod = self.read_pod(pod) - pod_status = remote_pod.status - if pod_status.phase != PodPhase.PENDING: - self.stop_watching_events = True - self.log.info("::endgroup::") - break - - # Check for timeout - pod_conditions: list[V1PodCondition] = pod_status.conditions - if pod_conditions and any( - (condition.type == "PodScheduled" and condition.status == "True") - for condition in 
pod_conditions
-            ):
-                if not pod_was_scheduled:
-                    # POD was initially scheduled update timeout for getting POD launched
-                    pod_was_scheduled = True
-                    start_check_time = time.time()
-                    self.log.info("Waiting %ss to get the POD running...", startup_timeout)
-
-                if time.time() - start_check_time >= startup_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
-                    )
-            else:
-                if time.time() - start_check_time >= schedule_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
-                    )
-
-            # Check for general problems to terminate early - ErrImagePull
-            if pod_status.container_statuses:
-                for container_status in pod_status.container_statuses:
-                    container_state: V1ContainerState = container_status.state
-                    container_waiting: V1ContainerStateWaiting | None = container_state.waiting
-                    if container_waiting:
-                        if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
-                            self.log.info("::endgroup::")
-                            raise PodLaunchFailedException(
-                                f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
-                                f"\n{container_waiting.message}"
-                            )
-
-            await asyncio.sleep(check_interval)
+        await generic_await_pod_start(
+            self=self,
+            pod=pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=check_interval,
+            is_async=False,
+        )
 
     def _log_message(
         self,
@@ -915,3 +953,67 @@ class OnFinishAction(str, enum.Enum):
 def is_log_group_marker(line: str) -> bool:
     """Check if the line is a log group marker like `::group::` or `::endgroup::`."""
     return line.startswith("::group::") or line.startswith("::endgroup::")
+
+
+class AsyncPodManager(LoggingMixin):
+    """Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodTrigger."""
+
+    def __init__(
+        self,
+        async_hook: AsyncKubernetesHook,
+        callbacks: list[type[KubernetesPodOperatorCallback]] | None = None,
+    ):
+        """
+        Create the launcher.
+
+        :param async_hook: async Kubernetes hook used to talk to the Kubernetes API
+        :param callbacks: optional list of KubernetesPodOperatorCallback classes
+        """
+        super().__init__()
+        self._hook = async_hook
+        self._watch = watch.Watch()
+        self._callbacks = callbacks or []
+        self.stop_watching_events = False
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod(self, pod: V1Pod) -> V1Pod:
+        """Read POD information."""
+        return await self._hook.get_pod(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
+        """Get pod's events."""
+        return await self._hook.get_pod_events(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None:
+        """Read pod events and writes into log."""
+        await generic_watch_pod_events(self, pod, check_interval, is_async=True)
+
+    async def await_pod_start(
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1
+    ) -> None:
+        """
+        Wait for the pod to reach phase other than ``Pending``.
+
+        :param pod: The pod object to monitor.
+        :param schedule_timeout: Timeout (in seconds) for the pod to stay in the scheduled state
+            (if the pod takes too long to get scheduled, the task fails)
+        :param startup_timeout: Timeout (in seconds) for startup of the pod
+            (if the pod is pending for too long after being scheduled, the task fails)
+        :param check_interval: Interval (in seconds) between checks
+        :return:
+        """
+        await generic_await_pod_start(
+            self=self,
+            pod=pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=check_interval,
+            is_async=True,
+        )
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index 66fae2524d618..662f652b51545 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -44,6 +44,7 @@
 IN_CLUSTER = False
 GET_LOGS = True
 STARTUP_TIMEOUT_SECS = 120
+STARTUP_CHECK_INTERVAL_SECS = 0.1
 TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc)
 FAILED_RESULT_MSG = "Test message that appears when trigger have failed event."
 BASE_CONTAINER_NAME = "base"
@@ -63,11 +64,25 @@ def trigger():
         in_cluster=IN_CLUSTER,
         get_logs=GET_LOGS,
         startup_timeout=STARTUP_TIMEOUT_SECS,
+        startup_check_interval=STARTUP_CHECK_INTERVAL_SECS,
+        schedule_timeout=STARTUP_TIMEOUT_SECS,
         trigger_start_time=TRIGGER_START_TIME,
         on_finish_action=ON_FINISH_ACTION,
     )
 
 
+@pytest.fixture
+def mock_time_fixture():
+    """Fixture to simulate time passage beyond startup timeout."""
+    with mock.patch("time.time") as mock_time:
+        start_time = 1000
+        mock_time.side_effect = [
+            start_time,
+            start_time + STARTUP_TIMEOUT_SECS,
+        ]
+        yield mock_time
+
+
 def get_read_pod_mock_containers(statuses_to_emit=None):
     """
     Emit pods with given phases sequentially.
@@ -106,7 +121,8 @@ def test_serialize(self, trigger):
             "in_cluster": IN_CLUSTER,
             "get_logs": GET_LOGS,
             "startup_timeout": STARTUP_TIMEOUT_SECS,
-            "startup_check_interval": 5,
+            "startup_check_interval": STARTUP_CHECK_INTERVAL_SECS,
+            "schedule_timeout": STARTUP_TIMEOUT_SECS,
             "trigger_start_time": TRIGGER_START_TIME,
             "on_finish_action": ON_FINISH_ACTION,
             "last_log_time": None,
@@ -138,7 +154,7 @@ async def test_run_loop_return_success_event(self, mock_wait_pod, trigger):
     async def test_run_loop_return_waiting_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
-        mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.WAITING
 
         caplog.set_level(logging.INFO)
@@ -157,7 +173,7 @@ async def test_run_loop_return_running_event(
     async def test_run_loop_return_running_event(
         self, mock_hook, mock_method, mock_wait_pod, trigger, caplog
     ):
-        mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock())
+        mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock())
         mock_method.return_value = ContainerState.RUNNING
 
         caplog.set_level(logging.INFO)
@@ -229,7 +245,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
         Test that KubernetesPodTrigger fires the correct event in case of fail.
""" - mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock()) mock_method.return_value = ContainerState.FAILED caplog.set_level(logging.INFO) @@ -319,14 +335,48 @@ def test_define_container_state_should_execute_successfully( assert expected_state == trigger.define_container_state(pod) + @pytest.mark.asyncio + @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_PATH}.hook") + async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger, caplog): + event1 = mock.AsyncMock() + event1.message = "event 1" + event1.involved_object.field_path = "object 1" + event2 = mock.AsyncMock() + event2.message = "event 2" + event2.involved_object.field_path = "object 2" + events_list = mock.AsyncMock() + events_list.items = [event1, event2] + + mock_hook.get_pod_events = mock.AsyncMock(return_value=events_list) + + pod_pending = mock.MagicMock() + pod_pending.status.phase = PodPhase.PENDING + pod_succeeded = mock.MagicMock() + pod_succeeded.status.phase = PodPhase.SUCCEEDED + + mock_hook.get_pod = mock.AsyncMock( + side_effect=[pod_pending, pod_pending, pod_succeeded, pod_succeeded] + ) + + mock_method.return_value = ContainerState.TERMINATED + + caplog.set_level(logging.INFO) + + generator = trigger.run() + await generator.asend(None) + + # Check that both events are present in the log output + assert "The Pod has an Event: event 1 from object 1" in caplog.text + assert "The Pod has an Event: event 2 from object 2" in caplog.text + @pytest.mark.asyncio @pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED]) @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_timeout_event( - self, mock_hook, mock_method, trigger, caplog, container_state + self, mock_hook, mock_method, trigger, caplog, container_state, mock_time_fixture ): - trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2) mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( status=mock.MagicMock( @@ -346,7 +396,7 @@ async def test_run_loop_return_timeout_event( "name": POD_NAME, "namespace": NAMESPACE, "status": "timeout", - "message": "Pod did not leave 'Pending' phase within specified timeout", + "message": "Pod took too long to be scheduled on the cluster, giving up. More than 120s. Check the pod events in kubernetes.", } ) == actual @@ -356,14 +406,13 @@ async def test_run_loop_return_timeout_event( @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_success_for_completed_pod_after_timeout( - self, mock_hook, mock_method, trigger, caplog + self, mock_hook, mock_method, trigger, caplog, mock_time_fixture ): """ Test that the trigger correctly recognizes the pod is not pending even after the timeout has been reached. This may happen when a new triggerer process takes over the trigger, the pod already left pending state and the timeout has been reached. """ - trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2) mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( status=mock.MagicMock( @@ -396,7 +445,7 @@ async def test__get_pod(self, mock_hook, trigger): Test that KubernetesPodTrigger _get_pod is called with the correct arguments. 
""" - mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock()) await trigger._get_pod() mock_hook.get_pod.assert_called_with(name=POD_NAME, namespace=NAMESPACE) @@ -423,7 +472,7 @@ async def test__get_pod_retries( the hook.get_pod call. """ - side_effects = [Exception("Test exception") for _ in range(exc_count)] + [MagicMock()] + side_effects = [Exception("Test exception") for _ in range(exc_count)] + [mock.AsyncMock()] mock_hook.get_pod.side_effect = mock.AsyncMock(side_effect=side_effects) # We expect the exception to be raised only if the number of retries is exceeded diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 2634119fc29f6..9dc589d123e61 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -31,6 +31,7 @@ from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + AsyncPodManager, PodLogsConsumer, PodManager, PodPhase, @@ -700,6 +701,142 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") +class TestAsyncPodManager: + def setup_method(self): + self.mock_async_hook = mock.AsyncMock() + self.async_pod_manager = AsyncPodManager( + async_hook=self.mock_async_hook, + callbacks=[], + ) + + @pytest.mark.asyncio + async def test_start_pod_raises_informative_error_on_scheduled_timeout(self): + pod_response = mock.MagicMock() + pod_response.status.phase = "Pending" + self.mock_async_hook.get_pod.return_value = pod_response + expected_msg = "Pod took too long to be scheduled on the cluster, giving up. More than 0s. Check the pod events in kubernetes." + mock_pod = mock.MagicMock() + with pytest.raises(AirflowException, match=expected_msg): + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=0, + startup_timeout=0, + ) + self.mock_async_hook.get_pod.assert_called() + + @pytest.mark.asyncio + async def test_start_pod_raises_informative_error_on_startup_timeout(self): + pod_response = mock.MagicMock() + pod_response.status.phase = "Pending" + condition = mock.MagicMock() + condition.type = "PodScheduled" + condition.status = "True" + pod_response.status.conditions = [condition] + self.mock_async_hook.get_pod.return_value = pod_response + expected_msg = "Pod took too long to start. More than 0s. Check the pod events in kubernetes." 
+ mock_pod = mock.MagicMock() + with pytest.raises(AirflowException, match=expected_msg): + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=0, + startup_timeout=0, + ) + self.mock_async_hook.get_pod.assert_called() + + @pytest.mark.asyncio + async def test_start_pod_raises_fast_error_on_image_error(self): + pod_response = mock.MagicMock() + pod_response.status.phase = "Pending" + container_status = mock.MagicMock() + waiting_state = mock.MagicMock() + waiting_state.reason = "ErrImagePull" + waiting_state.message = "Test error" + container_status.state.waiting = waiting_state + pod_response.status.container_statuses = [container_status] + self.mock_async_hook.get_pod.return_value = pod_response + expected_msg = f"Pod docker image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}" + mock_pod = mock.MagicMock() + with pytest.raises(AirflowException, match=expected_msg): + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=60, + startup_timeout=60, + ) + self.mock_async_hook.get_pod.assert_called() + + @pytest.mark.asyncio + @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) + async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog): + condition_scheduled = mock.MagicMock() + condition_scheduled.type = "PodScheduled" + condition_scheduled.status = "True" + + pod_info_pending = mock.MagicMock() + pod_info_pending.status.phase = PodPhase.PENDING + pod_info_pending.status.conditions = [] + + pod_info_pending_scheduled = mock.MagicMock() + pod_info_pending_scheduled.status.phase = PodPhase.PENDING + pod_info_pending_scheduled.status.conditions = [condition_scheduled] + + pod_info_succeeded = mock.MagicMock() + pod_info_succeeded.status.phase = PodPhase.SUCCEEDED + + # Simulate sequence of pod states + self.mock_async_hook.get_pod.side_effect = [ + pod_info_pending, + pod_info_pending_scheduled, + pod_info_pending_scheduled, + pod_info_succeeded, + ] + startup_check_interval = 10 + schedule_timeout = 30 + startup_timeout = 60 + mock_pod = mock.MagicMock() + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=schedule_timeout, + startup_timeout=startup_timeout, + check_interval=startup_check_interval, + ) + assert mock_time_sleep.call_count == 3 + assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text + assert f"Waiting {startup_timeout}s to get the POD running..." 
in caplog.text
+        assert self.async_pod_manager.stop_watching_events is True
+
+    @pytest.mark.asyncio
+    @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
+    async def test_watch_pod_events(self, mock_time_sleep):
+        mock_pod = mock.MagicMock()
+        mock_pod.metadata.name = "test-pod"
+        mock_pod.metadata.namespace = "default"
+
+        events = mock.MagicMock()
+        events.items = []
+        for id in ["event 1", "event 2"]:
+            event = mock.MagicMock()
+            event.message = f"test {id}"
+            event.involved_object.field_path = f"object {id}"
+            events.items.append(event)
+        startup_check_interval = 10
+
+        def get_pod_events_side_effect(name, namespace):
+            self.async_pod_manager.stop_watching_events = True
+            return events
+
+        self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect
+
+        with mock.patch.object(type(self.async_pod_manager), "log", create=True) as log_mock:
+            await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
+        log_mock.info.assert_any_call(
+            "The Pod has an Event: %s from %s", "test event 1", "object event 1"
+        )
+        log_mock.info.assert_any_call(
+            "The Pod has an Event: %s from %s", "test event 2", "object event 2"
+        )
+        mock_time_sleep.assert_called_once_with(startup_check_interval)
+
+
 class TestPodLogsConsumer:
     @pytest.mark.parametrize(
         "chunks, expected_logs",

From 18faa475630f883a27753730a36eb5322b8e323b Mon Sep 17 00:00:00 2001
From: AutomationDev85
Date: Tue, 21 Oct 2025 14:35:12 +0200
Subject: [PATCH 4/7] Reworked unit tests and pod startup logic

---
 .../providers/cncf/kubernetes/exceptions.py   |  4 ++
 .../cncf/kubernetes/hooks/kubernetes.py       | 34 +---------
 .../cncf/kubernetes/operators/pod.py          |  1 +
 .../providers/cncf/kubernetes/triggers/pod.py |  7 +--
 .../cncf/kubernetes/utils/pod_manager.py      | 63 +++++++++++--------
 .../unit/cncf/kubernetes/triggers/test_pod.py | 22 +++----
 .../cncf/kubernetes/utils/test_pod_manager.py | 61 ++++++++++--------
 7 files changed, 90 insertions(+), 102 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
index c0b6ad83a3fdc..82015cc983ce8 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
@@ -27,3 +27,7 @@ class PodMutationHookException(AirflowException):
 
 class PodReconciliationError(AirflowException):
     """Raised when an error is encountered while trying to merge pod configs."""
+
+
+class KubernetesApiError(AirflowException):
+    """Raised when an error is encountered while trying to access the Kubernetes API."""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index aeb5b3034281f..506a59439c4a9 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -37,6 +37,7 @@
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models import Connection
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError
 from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive
 from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation
 from
airflow.providers.cncf.kubernetes.utils.container import ( @@ -68,35 +69,6 @@ def _load_body_to_dict(body: str) -> dict: return body_dict -class PodOperatorHookProtocol(Protocol): - """ - Protocol to define methods relied upon by KubernetesPodOperator. - - Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use - hooks that don't extend KubernetesHook. We use this protocol to document the - methods used by KPO and ensure that these methods exist on such other hooks. - """ - - @property - def core_v1_client(self) -> client.CoreV1Api: - """Get authenticated client object.""" - - @property - def is_in_cluster(self) -> bool: - """Expose whether the hook is configured with ``load_incluster_config`` or not.""" - - def get_pod(self, name: str, namespace: str) -> V1Pod: - """Read pod object from kubernetes API.""" - - def get_namespace(self) -> str | None: - """Return the namespace that defined in the connection.""" - - def get_xcom_sidecar_container_image(self) -> str | None: - """Return the xcom sidecar image that defined in the connection.""" - - def get_xcom_sidecar_container_resources(self) -> str | None: - """Return the xcom sidecar resources that defined in the connection.""" - class PodOperatorHookProtocol(Protocol): """ Protocol to define methods relied upon by KubernetesPodOperator. @@ -915,7 +887,7 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod: ) return pod except HTTPError as e: - raise AirflowException(f"There was an error reading the kubernetes API: {e}") + raise KubernetesApiError from e async def delete_pod(self, name: str, namespace: str): """ @@ -975,7 +947,7 @@ async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList: ) return events except HTTPError as e: - raise AirflowException(f"There was an error reading the kubernetes API: {e}") + raise KubernetesApiError from e async def get_job_status(self, name: str, namespace: str) -> V1Job: """ diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index bee7eb60049d1..d5cc9e27caf9a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -867,6 +867,7 @@ def invoke_defer_method(self, last_log_time: DateTime | None = None) -> None: get_logs=self.get_logs, startup_timeout=self.startup_timeout_seconds, startup_check_interval=self.startup_check_interval_seconds, + schedule_timeout=self.schedule_timeout_seconds, base_container_name=self.base_container_name, on_finish_action=self.on_finish_action.value, last_log_time=last_log_time, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index 5207b233a451f..cf8530e7ab438 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -94,7 +94,7 @@ def __init__( get_logs: bool = True, startup_timeout: int = 120, startup_check_interval: float = 5, - schedule_timeout: int | None = None, + schedule_timeout: int = 120, on_finish_action: str = "delete_pod", last_log_time: DateTime | None = None, logging_interval: int | None = None, @@ -113,8 +113,7 @@ def __init__( self.get_logs = get_logs self.startup_timeout = startup_timeout self.startup_check_interval = 
startup_check_interval
-        # The new schedule_timeout parameter would be a breaking change; to handle this as smoothly as possible, just reuse the startup timeout as its default
-        self.schedule_timeout = schedule_timeout or startup_timeout
+        self.schedule_timeout = schedule_timeout
         self.last_log_time = last_log_time
         self.logging_interval = logging_interval
         self.on_finish_action = OnFinishAction(on_finish_action)
@@ -293,7 +292,7 @@ def hook(self) -> AsyncKubernetesHook:
 
     @cached_property
     def pod_manager(self) -> AsyncPodManager:
-        return AsyncPodManager(async_hook=self.hook)  # , callbacks=self.callbacks)
+        return AsyncPodManager(async_hook=self.hook)
 
     def define_container_state(self, pod: V1Pod) -> ContainerState:
         pod_containers = pod.status.container_statuses
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 172a9fcada8ac..ca39ffee238a7 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -101,51 +101,61 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
 
 
-async def generic_watch_pod_events(
-    self,
+async def watch_pod_events(
+    pod_manager: PodManager | AsyncPodManager,
     pod: V1Pod,
     check_interval: float = 1,
-    is_async: bool = True,
 ) -> None:
-    """Read pod events and writes into log."""
+    """
+    Read pod events and write them to the log.
+
+    This function supports both asynchronous and synchronous pod managers.
+
+    :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
+    :param pod: The pod object to monitor.
+    :param check_interval: Interval (in seconds) between checks.
+    """
     num_events = 0
-    while not self.stop_watching_events:
-        events = await self.read_pod_events(pod) if is_async else self.read_pod_events(pod)
+    is_async = isinstance(pod_manager, AsyncPodManager)
+    while not pod_manager.stop_watching_events:
+        events = await pod_manager.read_pod_events(pod) if is_async else pod_manager.read_pod_events(pod)
         for new_event in events.items[num_events:]:
             involved_object: V1ObjectReference = new_event.involved_object
-            self.log.info("The Pod has an Event: %s from %s", new_event.message, involved_object.field_path)
+            pod_manager.log.info(
+                "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
+            )
         num_events = len(events.items)
         await asyncio.sleep(check_interval)
 
 
-async def generic_await_pod_start(
-    self,
-    pod,
+async def await_pod_start(
+    pod_manager: PodManager | AsyncPodManager,
+    pod: V1Pod,
     schedule_timeout: int = 120,
     startup_timeout: int = 120,
     check_interval: float = 1,
-    is_async: bool = True,
 ):
     """
     Monitor the startup phase of a Kubernetes pod, waiting for it to leave the ``Pending`` state.
 
     This function is shared by both PodManager and AsyncPodManager to provide consistent pod startup tracking.
 
+    :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
     :param pod: The pod object to monitor.
     :param schedule_timeout: Maximum time (in seconds) to wait for the pod to be scheduled.
     :param startup_timeout: Maximum time (in seconds) to wait for the pod to start running after being scheduled.
     :param check_interval: Interval (in seconds) between status checks.
-    :param is_async: Set to True if called in an async context; otherwise, False.
""" - self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) + pod_manager.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) pod_was_scheduled = False start_check_time = time.time() + is_async = isinstance(pod_manager, AsyncPodManager) while True: - remote_pod = await self.read_pod(pod) if is_async else self.read_pod(pod) + remote_pod = await pod_manager.read_pod(pod) if is_async else pod_manager.read_pod(pod) pod_status = remote_pod.status if pod_status.phase != PodPhase.PENDING: - self.stop_watching_events = True - self.log.info("::endgroup::") + pod_manager.stop_watching_events = True + pod_manager.log.info("::endgroup::") break # Check for timeout @@ -157,16 +168,16 @@ async def generic_await_pod_start( # POD was initially scheduled update timeout for getting POD launched pod_was_scheduled = True start_check_time = time.time() - self.log.info("Waiting %ss to get the POD running...", startup_timeout) + pod_manager.log.info("Waiting %ss to get the POD running...", startup_timeout) if time.time() - start_check_time >= startup_timeout: - self.log.info("::endgroup::") + pod_manager.log.info("::endgroup::") raise PodLaunchTimeoutException( f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes." ) else: if time.time() - start_check_time >= schedule_timeout: - self.log.info("::endgroup::") + pod_manager.log.info("::endgroup::") raise PodLaunchTimeoutException( f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes." ) @@ -178,7 +189,7 @@ async def generic_await_pod_start( container_waiting: V1ContainerStateWaiting | None = container_state.waiting if container_waiting: if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]: - self.log.info("::endgroup::") + pod_manager.log.info("::endgroup::") raise PodLaunchFailedException( f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}" f"\n{container_waiting.message}" @@ -350,7 +361,7 @@ def create_pod(self, pod: V1Pod) -> V1Pod: async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None: """Read pod events and writes into log.""" - await generic_watch_pod_events(self, pod, check_interval, is_async=False) + await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval) async def await_pod_start( self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1 @@ -366,13 +377,12 @@ async def await_pod_start( :param check_interval: Interval (in seconds) between checks :return: """ - await generic_await_pod_start( - self=self, + await await_pod_start( + pod_manager=self, pod=pod, schedule_timeout=schedule_timeout, startup_timeout=startup_timeout, check_interval=check_interval, - is_async=False, ) def _log_message( @@ -993,7 +1003,7 @@ async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList: async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None: """Read pod events and writes into log.""" - await generic_watch_pod_events(self, pod, check_interval, is_async=True) + await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval) async def await_pod_start( self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1 @@ -1009,11 +1019,10 @@ async def await_pod_start( :param check_interval: Interval (in seconds) between checks :return: """ - await generic_await_pod_start( - 
self=self, + await await_pod_start( + pod_manager=self, pod=pod, schedule_timeout=schedule_timeout, startup_timeout=startup_timeout, check_interval=check_interval, - is_async=True, ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py index 662f652b51545..060d53314d4ff 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py @@ -338,7 +338,7 @@ def test_define_container_state_should_execute_successfully( @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger, caplog): + async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger): event1 = mock.AsyncMock() event1.message = "event 1" event1.involved_object.field_path = "object 1" @@ -361,21 +361,19 @@ async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, t mock_method.return_value = ContainerState.TERMINATED - caplog.set_level(logging.INFO) - - generator = trigger.run() - await generator.asend(None) + with mock.patch.object(trigger.pod_manager.log, "info") as mock_log_info: + generator = trigger.run() + await generator.asend(None) - # Check that both events are present in the log output - assert "The Pod has an Event: event 1 from object 1" in caplog.text - assert "The Pod has an Event: event 2 from object 2" in caplog.text + mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 1", "object 1") + mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 2", "object 2") @pytest.mark.asyncio @pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED]) @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_timeout_event( - self, mock_hook, mock_method, trigger, caplog, container_state, mock_time_fixture + self, mock_hook, mock_method, trigger, container_state, mock_time_fixture ): mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( @@ -386,8 +384,6 @@ async def test_run_loop_return_timeout_event( ) mock_method.return_value = container_state - caplog.set_level(logging.INFO) - generator = trigger.run() actual = await generator.asend(None) assert ( @@ -406,7 +402,7 @@ async def test_run_loop_return_timeout_event( @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_success_for_completed_pod_after_timeout( - self, mock_hook, mock_method, trigger, caplog, mock_time_fixture + self, mock_hook, mock_method, trigger, mock_time_fixture ): """ Test that the trigger correctly recognizes the pod is not pending even after the timeout has been @@ -422,8 +418,6 @@ async def test_run_loop_return_success_for_completed_pod_after_timeout( ) mock_method.return_value = ContainerState.TERMINATED - caplog.set_level(logging.INFO) - generator = trigger.run() actual = await generator.asend(None) assert ( diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 9dc589d123e61..e0da0bc104cbd 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -478,7 +478,7 @@ async def test_start_pod_raises_fast_error_on_image_error(self): @pytest.mark.asyncio @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) - async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog): + async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): condition_scheduled = mock.MagicMock() condition_scheduled.type = "PodScheduled" condition_scheduled.status = "True" @@ -501,17 +501,21 @@ def pod_state_gen(): schedule_timeout = 30 startup_timeout = 60 mock_pod = MagicMock() - await self.pod_manager.await_pod_start( - pod=mock_pod, - schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing - startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing - check_interval=startup_check_interval, - ) - mock_time_sleep.assert_called_with(startup_check_interval) - assert mock_time_sleep.call_count == 3 - assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text - assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text - assert self.pod_manager.stop_watching_events is True + + with mock.patch.object(self.pod_manager.log, "info") as mock_log_info: + await self.pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing + startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing + check_interval=startup_check_interval, + ) + mock_time_sleep.assert_called_with(startup_check_interval) + assert self.pod_manager.stop_watching_events is True + assert mock_time_sleep.call_count == 3 + mock_log_info.assert_any_call( + "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout + ) + mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout) @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running") def test_container_is_running(self, container_is_running_mock): @@ -766,7 +770,7 @@ async def test_start_pod_raises_fast_error_on_image_error(self): @pytest.mark.asyncio @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) - async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog): + async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): condition_scheduled = mock.MagicMock() condition_scheduled.type = "PodScheduled" condition_scheduled.status = "True" @@ -793,16 +797,21 @@ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog) schedule_timeout = 30 startup_timeout = 60 mock_pod = mock.MagicMock() - await self.async_pod_manager.await_pod_start( - pod=mock_pod, - schedule_timeout=schedule_timeout, - startup_timeout=startup_timeout, - check_interval=startup_check_interval, - ) - assert mock_time_sleep.call_count == 3 - assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text - assert f"Waiting {startup_timeout}s to get the POD running..." 
in caplog.text
-        assert self.async_pod_manager.stop_watching_events is True
+        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=schedule_timeout,
+                startup_timeout=startup_timeout,
+                check_interval=startup_check_interval,
+            )
+        assert mock_time_sleep.call_count == 3
+        mock_log_info.assert_any_call(
+            "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout
+        )
+        mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
+        # assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text
+        # assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text
+        assert self.async_pod_manager.stop_watching_events is True
 
     @pytest.mark.asyncio
     @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
@@ -826,12 +835,12 @@ def get_pod_events_side_effect(name, namespace):
 
         self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect
 
-        with mock.patch.object(type(self.async_pod_manager), "log", create=True) as log_mock:
+        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
             await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
-        log_mock.info.assert_any_call(
+        mock_log_info.assert_any_call(
             "The Pod has an Event: %s from %s", "test event 1", "object event 1"
         )
-        log_mock.info.assert_any_call(
+        mock_log_info.assert_any_call(
             "The Pod has an Event: %s from %s", "test event 2", "object event 2"
         )
         mock_time_sleep.assert_called_once_with(startup_check_interval)

From 0f5beccf7766a040959c681ecf48c3eb37132d46 Mon Sep 17 00:00:00 2001
From: AutomationDev85
Date: Wed, 22 Oct 2025 11:11:27 +0200
Subject: [PATCH 5/7] Add API permission error detection for triggerer

---
 .../providers/cncf/kubernetes/exceptions.py   |  4 ++++
 .../cncf/kubernetes/hooks/kubernetes.py       |  6 +++++-
 .../providers/cncf/kubernetes/triggers/pod.py | 17 +++++++++++++++++
 .../cncf/kubernetes/utils/pod_manager.py      | 10 ++++++++--
 4 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
index 82015cc983ce8..5503c743797ad 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
@@ -31,3 +31,7 @@ class PodReconciliationError(AirflowException):
     """Raised when an error is encountered while trying to access the Kubernetes API."""
+
+
+class KubernetesApiPermissionError(AirflowException):
+    """Raised when a permission error is encountered while trying to access the Kubernetes API."""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 506a59439c4a9..e08d2bb8aaa16 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -37,7 +37,7 @@
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models import Connection
-from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError,
KubernetesApiPermissionError from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation from airflow.providers.cncf.kubernetes.utils.container import ( @@ -887,6 +887,8 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod: ) return pod except HTTPError as e: + if hasattr(e, "status") and e.status == 403: + raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e raise KubernetesApiError from e async def delete_pod(self, name: str, namespace: str): @@ -947,6 +949,8 @@ async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList: ) return events except HTTPError as e: + if hasattr(e, "status") and e.status == 403: + raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e raise KubernetesApiError from e async def get_job_status(self, name: str, namespace: str) -> V1Job: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index cf8530e7ab438..a05a4ff05d5fb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -26,6 +26,7 @@ import tenacity +from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook from airflow.providers.cncf.kubernetes.utils.pod_manager import ( AsyncPodManager, @@ -186,6 +187,22 @@ async def run(self) -> AsyncIterator[TriggerEvent]: } ) return + except KubernetesApiPermissionError as e: + message = ( + "Kubernetes API permission error: The triggerer may not have sufficient permissions to monitor or delete pods. " + "Please ensure the triggerer's service account is included in the 'pod-launcher-role' as defined in the latest Airflow Helm chart. 
" + f"Original error: {e}" + ) + yield TriggerEvent( + { + "name": self.pod_name, + "namespace": self.pod_namespace, + "status": "error", + "message": message, + **self.trigger_kwargs, + } + ) + return except Exception as e: yield TriggerEvent( { diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index ca39ffee238a7..789ad4db79e6c 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -118,7 +118,10 @@ async def watch_pod_events( num_events = 0 is_async = isinstance(pod_manager, AsyncPodManager) while not pod_manager.stop_watching_events: - events = await pod_manager.read_pod_events(pod) if is_async else pod_manager.read_pod_events(pod) + if is_async: + events = await pod_manager.read_pod_events(pod) + else: + events = pod_manager.read_pod_events(pod) for new_event in events.items[num_events:]: involved_object: V1ObjectReference = new_event.involved_object pod_manager.log.info( @@ -152,7 +155,10 @@ async def await_pod_start( start_check_time = time.time() is_async = isinstance(pod_manager, AsyncPodManager) while True: - remote_pod = await pod_manager.read_pod(pod) if is_async else pod_manager.read_pod(pod) + if is_async: + remote_pod = await pod_manager.read_pod(pod) + else: + remote_pod = pod_manager.read_pod(pod) pod_status = remote_pod.status if pod_status.phase != PodPhase.PENDING: pod_manager.stop_watching_events = True From 544cb4c186e17da9af0c47b2fcae07251809f11e Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 28 Oct 2025 08:33:52 +0100 Subject: [PATCH 6/7] Fix pytest fixture --- .../tests/unit/cncf/kubernetes/triggers/test_pod.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py index 060d53314d4ff..fc477fe0cf763 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py @@ -77,8 +77,7 @@ def mock_time_fixture(): with mock.patch("time.time") as mock_time: start_time = 1000 mock_time.side_effect = [ - start_time, - start_time + STARTUP_TIMEOUT_SECS, + *(start_time + STARTUP_TIMEOUT_SECS * n for n in range(5)), ] yield mock_time @@ -383,7 +382,6 @@ async def test_run_loop_return_timeout_event( ) ) mock_method.return_value = container_state - generator = trigger.run() actual = await generator.asend(None) assert ( From 767e03dea039ef1329bbbcad2814ff6e7097b687 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 28 Oct 2025 10:12:33 +0100 Subject: [PATCH 7/7] Removed not requried code --- .../tests/unit/cncf/kubernetes/utils/test_pod_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index e0da0bc104cbd..485bdbd769e4d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -809,8 +809,6 @@ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout ) 
mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout) - # assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text - # assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text assert self.async_pod_manager.stop_watching_events is True @pytest.mark.asyncio
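With the series applied, the trigger's startup path runs two coroutines concurrently: `await_pod_start` polls until the pod leaves ``Pending`` (flipping ``stop_watching_events`` when it does, or raising on timeout), while `watch_pod_events` logs new pod events until that flag is set, and `asyncio.gather` drives both. The sketch below reproduces just that control flow under simplified assumptions — `FakeManager` and its hard-coded phase sequence are illustrative stand-ins, not provider code:

    import asyncio


    class FakeManager:
        """Stand-in for AsyncPodManager: one poller, one event watcher, one shared flag."""

        def __init__(self, phases):
            self._phases = iter(phases)
            self.stop_watching_events = False

        async def read_pod_phase(self) -> str:
            return next(self._phases)  # pretend API call

        async def await_pod_start(self, check_interval: float = 0.01) -> str:
            while True:
                phase = await self.read_pod_phase()
                if phase != "Pending":
                    self.stop_watching_events = True  # tell the watcher to exit
                    return phase
                await asyncio.sleep(check_interval)

        async def watch_pod_events(self, check_interval: float = 0.01) -> None:
            num_events = 0
            while not self.stop_watching_events:
                events = [f"event {i}" for i in range(num_events + 1)]  # pretend API call
                for event in events[num_events:]:  # log only events not seen before
                    print("The Pod has an Event:", event)
                num_events = len(events)
                await asyncio.sleep(check_interval)


    async def main() -> None:
        manager = FakeManager(["Pending", "Pending", "Running"])
        phase, _ = await asyncio.gather(manager.await_pod_start(), manager.watch_pod_events())
        print("pod left Pending with phase:", phase)


    asyncio.run(main())

Because both loops yield at `asyncio.sleep`, a single event loop interleaves them, and the boolean flag is the only coordination needed — which is why `await_pod_start` sets `stop_watching_events` before breaking out of its loop.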
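One more pattern from patches 4 and 5 worth spelling out: the async hook converts transport-level failures into provider exceptions with `raise ... from e`, checking the status code first so the trigger can emit an actionable message for the RBAC case. A hedged, self-contained sketch of that translation chain — `FakeHTTPError` and `read_pod` are stand-ins for the real kubernetes client pieces, and the exception classes here merely mirror the provider's names:

    class FakeHTTPError(Exception):
        def __init__(self, status: int):
            super().__init__(f"HTTP {status}")
            self.status = status


    class KubernetesApiError(Exception):
        """Mirrors the provider exception of the same name."""


    class KubernetesApiPermissionError(Exception):
        """Mirrors the provider exception of the same name."""


    def read_pod(status_to_raise: int):
        try:
            raise FakeHTTPError(status_to_raise)  # pretend the API call failed
        except FakeHTTPError as e:
            if getattr(e, "status", None) == 403:
                # `from e` keeps the original traceback chained for debugging.
                raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
            raise KubernetesApiError from e


    for status in (403, 500):
        try:
            read_pod(status)
        except KubernetesApiPermissionError as err:
            print("permission problem -> point the user at RBAC:", err)
        except KubernetesApiError as err:
            print("generic API failure, underlying cause:", err.__cause__)

Chaining with `from e` preserves the original traceback, so the triggerer log still shows the underlying HTTP failure even though callers only ever catch the provider-level types.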