From 88d14b30592a640ef2fef4b0725f2a887af06dff Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 16 Oct 2025 07:41:11 +0200 Subject: [PATCH 01/14] Move container-related functions from PodManager to a separate file --- .../cncf/kubernetes/hooks/kubernetes.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 3e746a2118104..ed4c070d40cf9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -67,6 +67,34 @@ def _load_body_to_dict(body: str) -> dict: raise AirflowException(f"Exception when loading resource definition: {e}\n") return body_dict +class PodOperatorHookProtocol(Protocol): + """ + Protocol to define methods relied upon by KubernetesPodOperator. + + Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use + hooks that don't extend KubernetesHook. We use this protocol to document the + methods used by KPO and ensure that these methods exist on such other hooks. + """ + + @property + def core_v1_client(self) -> client.CoreV1Api: + """Get authenticated client object.""" + + @property + def is_in_cluster(self) -> bool: + """Expose whether the hook is configured with ``load_incluster_config`` or not.""" + + def get_pod(self, name: str, namespace: str) -> V1Pod: + """Read pod object from kubernetes API.""" + + def get_namespace(self) -> str | None: + """Return the namespace that defined in the connection.""" + + def get_xcom_sidecar_container_image(self) -> str | None: + """Return the xcom sidecar image that defined in the connection.""" + + def get_xcom_sidecar_container_resources(self) -> str | None: + """Return the xcom sidecar resources that defined in the connection.""" class PodOperatorHookProtocol(Protocol): """ From 6f51259caac1cd7fc60e2f86c6373274ab25d12b Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 16 Oct 2025 11:16:27 +0200 Subject: [PATCH 02/14] Moved unit tests --- .../src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py | 1 + 1 file changed, 1 insertion(+) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index ed4c070d40cf9..3dab24fbdbd65 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -67,6 +67,7 @@ def _load_body_to_dict(body: str) -> dict: raise AirflowException(f"Exception when loading resource definition: {e}\n") return body_dict + class PodOperatorHookProtocol(Protocol): """ Protocol to define methods relied upon by KubernetesPodOperator. 
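The protocol above is matched structurally rather than by inheritance: any hook object that exposes these properties and methods satisfies PodOperatorHookProtocol, whether or not it extends KubernetesHook. A minimal sketch of a conforming hook follows; the class name, return values, and wiring are illustrative assumptions, not provider code.

    from __future__ import annotations

    from kubernetes import client
    from kubernetes.client.models import V1Pod


    class MinimalPodHook:
        """Illustrative hook that satisfies PodOperatorHookProtocol without extending KubernetesHook."""

        @property
        def core_v1_client(self) -> client.CoreV1Api:
            # A real hook would build an authenticated client from its own credentials.
            return client.CoreV1Api()

        @property
        def is_in_cluster(self) -> bool:
            return False

        def get_pod(self, name: str, namespace: str) -> V1Pod:
            return self.core_v1_client.read_namespaced_pod(name=name, namespace=namespace)

        def get_namespace(self) -> str | None:
            return "default"

        def get_xcom_sidecar_container_image(self) -> str | None:
            return None

        def get_xcom_sidecar_container_resources(self) -> str | None:
            return None


    # Static type checkers accept this assignment because Protocol matching is
    # structural; no inheritance from KubernetesHook is required.
    hook: PodOperatorHookProtocol = MinimalPodHook()

This is why a subclass such as GKEStartPodOperator can swap in a GKE-specific hook without changing KubernetesPodOperator itself.
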
From 7aa35a3a2e779ace00f1b9b93b6edd3d41a8aea6 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Mon, 20 Oct 2025 11:21:39 +0200 Subject: [PATCH 03/14] Sync and async workflow use the same code to track Pod startup --- .../cncf/kubernetes/hooks/kubernetes.py | 30 ++- .../providers/cncf/kubernetes/triggers/pod.py | 34 +-- .../cncf/kubernetes/utils/pod_manager.py | 220 +++++++++++++----- .../unit/cncf/kubernetes/triggers/test_pod.py | 71 +++++- .../cncf/kubernetes/utils/test_pod_manager.py | 137 +++++++++++ 5 files changed, 402 insertions(+), 90 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 3dab24fbdbd65..aeb5b3034281f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -48,7 +48,7 @@ if TYPE_CHECKING: from kubernetes.client import V1JobList - from kubernetes.client.models import V1Job, V1Pod + from kubernetes.client.models import CoreV1EventList, V1Job, V1Pod LOADING_KUBE_CONFIG_FILE_RESOURCE = "Loading Kubernetes configuration file kube_config from {}..." @@ -907,12 +907,15 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod: :param namespace: Name of the pod's namespace. """ async with self.get_conn() as connection: - v1_api = async_client.CoreV1Api(connection) - pod: V1Pod = await v1_api.read_namespaced_pod( - name=name, - namespace=namespace, - ) - return pod + try: + v1_api = async_client.CoreV1Api(connection) + pod: V1Pod = await v1_api.read_namespaced_pod( + name=name, + namespace=namespace, + ) + return pod + except HTTPError as e: + raise AirflowException(f"There was an error reading the kubernetes API: {e}") async def delete_pod(self, name: str, namespace: str): """ @@ -961,6 +964,19 @@ async def read_logs(self, name: str, namespace: str): self.log.exception("There was an error reading the kubernetes API.") raise + async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList: + """Get pod's events.""" + async with self.get_conn() as connection: + try: + v1_api = async_client.CoreV1Api(connection) + events: CoreV1EventList = await v1_api.list_namespaced_event( + field_selector=f"involvedObject.name={name}", + namespace=namespace, + ) + return events + except HTTPError as e: + raise AirflowException(f"There was an error reading the kubernetes API: {e}") + async def get_job_status(self, name: str, namespace: str) -> V1Job: """ Get job's status object. diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index 2646854c5d04d..5207b233a451f 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -28,6 +28,7 @@ from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + AsyncPodManager, OnFinishAction, PodLaunchTimeoutException, PodPhase, @@ -69,6 +70,7 @@ class KubernetesPodTrigger(BaseTrigger): :param get_logs: get the stdout of the container as logs of the tasks. :param startup_timeout: timeout in seconds to start up the pod. :param startup_check_interval: interval in seconds to check if the pod has already started. 
+ :param schedule_timeout: timeout in seconds to schedule pod in cluster. :param on_finish_action: What to do when the pod reaches its final state, or the execution is interrupted. If "delete_pod", the pod will be deleted regardless its state; if "delete_succeeded_pod", only succeeded pod will be deleted. You can set to "keep_pod" to keep the pod. @@ -91,7 +93,8 @@ def __init__( in_cluster: bool | None = None, get_logs: bool = True, startup_timeout: int = 120, - startup_check_interval: int = 5, + startup_check_interval: float = 5, + schedule_timeout: int | None = None, on_finish_action: str = "delete_pod", last_log_time: DateTime | None = None, logging_interval: int | None = None, @@ -110,11 +113,12 @@ def __init__( self.get_logs = get_logs self.startup_timeout = startup_timeout self.startup_check_interval = startup_check_interval + # New parameter startup_timeout_seconds adds breaking change, to handle this as smooth as possible just reuse startup time + self.schedule_timeout = schedule_timeout or startup_timeout self.last_log_time = last_log_time self.logging_interval = logging_interval self.on_finish_action = OnFinishAction(on_finish_action) self.trigger_kwargs = trigger_kwargs or {} - self._since_time = None def serialize(self) -> tuple[str, dict[str, Any]]: @@ -133,6 +137,7 @@ def serialize(self) -> tuple[str, dict[str, Any]]: "get_logs": self.get_logs, "startup_timeout": self.startup_timeout, "startup_check_interval": self.startup_check_interval, + "schedule_timeout": self.schedule_timeout, "trigger_start_time": self.trigger_start_time, "on_finish_action": self.on_finish_action.value, "last_log_time": self.last_log_time, @@ -209,17 +214,16 @@ def _format_exception_description(self, exc: Exception) -> Any: async def _wait_for_pod_start(self) -> ContainerState: """Loops until pod phase leaves ``PENDING`` If timeout is reached, throws error.""" - while True: - pod = await self._get_pod() - if not pod.status.phase == "Pending": - return self.define_container_state(pod) - - delta = datetime.datetime.now(tz=datetime.timezone.utc) - self.trigger_start_time - if self.startup_timeout < delta.total_seconds(): - raise PodLaunchTimeoutException("Pod did not leave 'Pending' phase within specified timeout") - - self.log.info("Still waiting for pod to start. 
The pod state is %s", pod.status.phase) - await asyncio.sleep(self.startup_check_interval) + pod = await self._get_pod() + events_task = self.pod_manager.watch_pod_events(pod, self.startup_check_interval) + pod_start_task = self.pod_manager.await_pod_start( + pod=pod, + schedule_timeout=self.schedule_timeout, + startup_timeout=self.startup_timeout, + check_interval=self.startup_check_interval, + ) + await asyncio.gather(pod_start_task, events_task) + return self.define_container_state(await self._get_pod()) async def _wait_for_container_completion(self) -> TriggerEvent: """ @@ -287,6 +291,10 @@ def hook(self) -> AsyncKubernetesHook: cluster_context=self.cluster_context, ) + @cached_property + def pod_manager(self) -> AsyncPodManager: + return AsyncPodManager(async_hook=self.hook) # , callbacks=self.callbacks) + def define_container_state(self, pod: V1Pod) -> ContainerState: pod_containers = pod.status.container_statuses diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index c9dc45e57f414..172a9fcada8ac 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -60,6 +60,8 @@ from kubernetes.client.models.v1_pod_condition import V1PodCondition from urllib3.response import HTTPResponse + from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook + EMPTY_XCOM_RESULT = "__airflow_xcom_result_empty__" """ @@ -99,6 +101,92 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException): return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401" +async def generic_watch_pod_events( + self, + pod: V1Pod, + check_interval: float = 1, + is_async: bool = True, +) -> None: + """Read pod events and writes into log.""" + num_events = 0 + while not self.stop_watching_events: + events = await self.read_pod_events(pod) if is_async else self.read_pod_events(pod) + for new_event in events.items[num_events:]: + involved_object: V1ObjectReference = new_event.involved_object + self.log.info("The Pod has an Event: %s from %s", new_event.message, involved_object.field_path) + num_events = len(events.items) + await asyncio.sleep(check_interval) + + +async def generic_await_pod_start( + self, + pod, + schedule_timeout: int = 120, + startup_timeout: int = 120, + check_interval: float = 1, + is_async: bool = True, +): + """ + Monitor the startup phase of a Kubernetes pod, waiting for it to leave the ``Pending`` state. + + This function is shared by both PodManager and AsyncPodManager to provide consistent pod startup tracking. + + :param pod: The pod object to monitor. + :param schedule_timeout: Maximum time (in seconds) to wait for the pod to be scheduled. + :param startup_timeout: Maximum time (in seconds) to wait for the pod to start running after being scheduled. + :param check_interval: Interval (in seconds) between status checks. + :param is_async: Set to True if called in an async context; otherwise, False. 
+ """ + self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) + pod_was_scheduled = False + start_check_time = time.time() + while True: + remote_pod = await self.read_pod(pod) if is_async else self.read_pod(pod) + pod_status = remote_pod.status + if pod_status.phase != PodPhase.PENDING: + self.stop_watching_events = True + self.log.info("::endgroup::") + break + + # Check for timeout + pod_conditions: list[V1PodCondition] = pod_status.conditions + if pod_conditions and any( + (condition.type == "PodScheduled" and condition.status == "True") for condition in pod_conditions + ): + if not pod_was_scheduled: + # POD was initially scheduled update timeout for getting POD launched + pod_was_scheduled = True + start_check_time = time.time() + self.log.info("Waiting %ss to get the POD running...", startup_timeout) + + if time.time() - start_check_time >= startup_timeout: + self.log.info("::endgroup::") + raise PodLaunchTimeoutException( + f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes." + ) + else: + if time.time() - start_check_time >= schedule_timeout: + self.log.info("::endgroup::") + raise PodLaunchTimeoutException( + f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes." + ) + + # Check for general problems to terminate early - ErrImagePull + if pod_status.container_statuses: + for container_status in pod_status.container_statuses: + container_state: V1ContainerState = container_status.state + container_waiting: V1ContainerStateWaiting | None = container_state.waiting + if container_waiting: + if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]: + self.log.info("::endgroup::") + raise PodLaunchFailedException( + f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}" + f"\n{container_waiting.message}" + ) + + await asyncio.sleep(check_interval) + + class PodLaunchTimeoutException(AirflowException): """When pod does not leave the ``Pending`` phase within specified timeout.""" @@ -262,16 +350,7 @@ def create_pod(self, pod: V1Pod) -> V1Pod: async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None: """Read pod events and writes into log.""" - num_events = 0 - while not self.stop_watching_events: - events = self.read_pod_events(pod) - for new_event in events.items[num_events:]: - involved_object: V1ObjectReference = new_event.involved_object - self.log.info( - "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path - ) - num_events = len(events.items) - await asyncio.sleep(check_interval) + await generic_watch_pod_events(self, pod, check_interval, is_async=False) async def await_pod_start( self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1 @@ -287,55 +366,14 @@ async def await_pod_start( :param check_interval: Interval (in seconds) between checks :return: """ - self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) - pod_was_scheduled = False - start_check_time = time.time() - while True: - remote_pod = self.read_pod(pod) - pod_status = remote_pod.status - if pod_status.phase != PodPhase.PENDING: - self.stop_watching_events = True - self.log.info("::endgroup::") - break - - # Check for timeout - pod_conditions: list[V1PodCondition] = pod_status.conditions - if pod_conditions and any( - (condition.type == "PodScheduled" and condition.status == "True") - for condition in 
pod_conditions
-            ):
-                if not pod_was_scheduled:
-                    # POD was initially scheduled update timeout for getting POD launched
-                    pod_was_scheduled = True
-                    start_check_time = time.time()
-                    self.log.info("Waiting %ss to get the POD running...", startup_timeout)
-
-                if time.time() - start_check_time >= startup_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes."
-                    )
-            else:
-                if time.time() - start_check_time >= schedule_timeout:
-                    self.log.info("::endgroup::")
-                    raise PodLaunchFailedException(
-                        f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes."
-                    )
-
-            # Check for general problems to terminate early - ErrImagePull
-            if pod_status.container_statuses:
-                for container_status in pod_status.container_statuses:
-                    container_state: V1ContainerState = container_status.state
-                    container_waiting: V1ContainerStateWaiting | None = container_state.waiting
-                    if container_waiting:
-                        if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]:
-                            self.log.info("::endgroup::")
-                            raise PodLaunchFailedException(
-                                f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}"
-                                f"\n{container_waiting.message}"
-                            )
-
-            await asyncio.sleep(check_interval)
+        await generic_await_pod_start(
+            self=self,
+            pod=pod,
+            schedule_timeout=schedule_timeout,
+            startup_timeout=startup_timeout,
+            check_interval=check_interval,
+            is_async=False,
+        )
 
     def _log_message(
         self,
@@ -915,3 +953,67 @@ class OnFinishAction(str, enum.Enum):
 def is_log_group_marker(line: str) -> bool:
     """Check if the line is a log group marker like `::group::` or `::endgroup::`."""
     return line.startswith("::group::") or line.startswith("::endgroup::")
+
+
+class AsyncPodManager(LoggingMixin):
+    """Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodTrigger."""
+
+    def __init__(
+        self,
+        async_hook: AsyncKubernetesHook,
+        callbacks: list[type[KubernetesPodOperatorCallback]] | None = None,
+    ):
+        """
+        Create the launcher.
+
+        :param async_hook: async kubernetes hook used to reach the Kubernetes API
+        :param callbacks: optional list of callback classes to invoke during the pod lifecycle
+        """
+        super().__init__()
+        self._hook = async_hook
+        self._watch = watch.Watch()
+        self._callbacks = callbacks or []
+        self.stop_watching_events = False
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod(self, pod: V1Pod) -> V1Pod:
+        """Read POD information."""
+        return await self._hook.get_pod(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True)
+    async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList:
+        """Get pod's events."""
+        return await self._hook.get_pod_events(
+            pod.metadata.name,
+            pod.metadata.namespace,
+        )
+
+    async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None:
+        """Read pod events and writes into log."""
+        await generic_watch_pod_events(self, pod, check_interval, is_async=True)
+
+    async def await_pod_start(
+        self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1
+    ) -> None:
+        """
+        Wait for the pod to reach phase other than ``Pending``.
+ + :param pod: + :param schedule_timeout: Timeout (in seconds) for pod stay in schedule state + (if pod is taking to long in schedule state, fails task) + :param startup_timeout: Timeout (in seconds) for startup of the pod + (if pod is pending for too long after being scheduled, fails task) + :param check_interval: Interval (in seconds) between checks + :return: + """ + await generic_await_pod_start( + self=self, + pod=pod, + schedule_timeout=schedule_timeout, + startup_timeout=startup_timeout, + check_interval=check_interval, + is_async=True, + ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py index 66fae2524d618..662f652b51545 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py @@ -44,6 +44,7 @@ IN_CLUSTER = False GET_LOGS = True STARTUP_TIMEOUT_SECS = 120 +STARTUP_CHECK_INTERVAL_SECS = 0.1 TRIGGER_START_TIME = datetime.datetime.now(tz=datetime.timezone.utc) FAILED_RESULT_MSG = "Test message that appears when trigger have failed event." BASE_CONTAINER_NAME = "base" @@ -63,11 +64,25 @@ def trigger(): in_cluster=IN_CLUSTER, get_logs=GET_LOGS, startup_timeout=STARTUP_TIMEOUT_SECS, + startup_check_interval=STARTUP_CHECK_INTERVAL_SECS, + schedule_timeout=STARTUP_TIMEOUT_SECS, trigger_start_time=TRIGGER_START_TIME, on_finish_action=ON_FINISH_ACTION, ) +@pytest.fixture +def mock_time_fixture(): + """Fixture to simulate time passage beyond startup timeout.""" + with mock.patch("time.time") as mock_time: + start_time = 1000 + mock_time.side_effect = [ + start_time, + start_time + STARTUP_TIMEOUT_SECS, + ] + yield mock_time + + def get_read_pod_mock_containers(statuses_to_emit=None): """ Emit pods with given phases sequentially. @@ -106,7 +121,8 @@ def test_serialize(self, trigger): "in_cluster": IN_CLUSTER, "get_logs": GET_LOGS, "startup_timeout": STARTUP_TIMEOUT_SECS, - "startup_check_interval": 5, + "startup_check_interval": STARTUP_CHECK_INTERVAL_SECS, + "schedule_timeout": STARTUP_TIMEOUT_SECS, "trigger_start_time": TRIGGER_START_TIME, "on_finish_action": ON_FINISH_ACTION, "last_log_time": None, @@ -138,7 +154,7 @@ async def test_run_loop_return_success_event(self, mock_wait_pod, trigger): async def test_run_loop_return_waiting_event( self, mock_hook, mock_method, mock_wait_pod, trigger, caplog ): - mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock()) mock_method.return_value = ContainerState.WAITING caplog.set_level(logging.INFO) @@ -157,7 +173,7 @@ async def test_run_loop_return_waiting_event( async def test_run_loop_return_running_event( self, mock_hook, mock_method, mock_wait_pod, trigger, caplog ): - mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock()) mock_method.return_value = ContainerState.RUNNING caplog.set_level(logging.INFO) @@ -229,7 +245,7 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully( Test that KubernetesPodTrigger fires the correct event in case of fail. 
""" - mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock()) mock_method.return_value = ContainerState.FAILED caplog.set_level(logging.INFO) @@ -319,14 +335,48 @@ def test_define_container_state_should_execute_successfully( assert expected_state == trigger.define_container_state(pod) + @pytest.mark.asyncio + @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_PATH}.hook") + async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger, caplog): + event1 = mock.AsyncMock() + event1.message = "event 1" + event1.involved_object.field_path = "object 1" + event2 = mock.AsyncMock() + event2.message = "event 2" + event2.involved_object.field_path = "object 2" + events_list = mock.AsyncMock() + events_list.items = [event1, event2] + + mock_hook.get_pod_events = mock.AsyncMock(return_value=events_list) + + pod_pending = mock.MagicMock() + pod_pending.status.phase = PodPhase.PENDING + pod_succeeded = mock.MagicMock() + pod_succeeded.status.phase = PodPhase.SUCCEEDED + + mock_hook.get_pod = mock.AsyncMock( + side_effect=[pod_pending, pod_pending, pod_succeeded, pod_succeeded] + ) + + mock_method.return_value = ContainerState.TERMINATED + + caplog.set_level(logging.INFO) + + generator = trigger.run() + await generator.asend(None) + + # Check that both events are present in the log output + assert "The Pod has an Event: event 1 from object 1" in caplog.text + assert "The Pod has an Event: event 2 from object 2" in caplog.text + @pytest.mark.asyncio @pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED]) @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_timeout_event( - self, mock_hook, mock_method, trigger, caplog, container_state + self, mock_hook, mock_method, trigger, caplog, container_state, mock_time_fixture ): - trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2) mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( status=mock.MagicMock( @@ -346,7 +396,7 @@ async def test_run_loop_return_timeout_event( "name": POD_NAME, "namespace": NAMESPACE, "status": "timeout", - "message": "Pod did not leave 'Pending' phase within specified timeout", + "message": "Pod took too long to be scheduled on the cluster, giving up. More than 120s. Check the pod events in kubernetes.", } ) == actual @@ -356,14 +406,13 @@ async def test_run_loop_return_timeout_event( @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_success_for_completed_pod_after_timeout( - self, mock_hook, mock_method, trigger, caplog + self, mock_hook, mock_method, trigger, caplog, mock_time_fixture ): """ Test that the trigger correctly recognizes the pod is not pending even after the timeout has been reached. This may happen when a new triggerer process takes over the trigger, the pod already left pending state and the timeout has been reached. """ - trigger.trigger_start_time = TRIGGER_START_TIME - datetime.timedelta(minutes=2) mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( status=mock.MagicMock( @@ -396,7 +445,7 @@ async def test__get_pod(self, mock_hook, trigger): Test that KubernetesPodTrigger _get_pod is called with the correct arguments. 
""" - mock_hook.get_pod.return_value = self._mock_pod_result(mock.MagicMock()) + mock_hook.get_pod.return_value = self._mock_pod_result(mock.AsyncMock()) await trigger._get_pod() mock_hook.get_pod.assert_called_with(name=POD_NAME, namespace=NAMESPACE) @@ -423,7 +472,7 @@ async def test__get_pod_retries( the hook.get_pod call. """ - side_effects = [Exception("Test exception") for _ in range(exc_count)] + [MagicMock()] + side_effects = [Exception("Test exception") for _ in range(exc_count)] + [mock.AsyncMock()] mock_hook.get_pod.side_effect = mock.AsyncMock(side_effect=side_effects) # We expect the exception to be raised only if the number of retries is exceeded diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 2634119fc29f6..9dc589d123e61 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -31,6 +31,7 @@ from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.utils.pod_manager import ( + AsyncPodManager, PodLogsConsumer, PodManager, PodPhase, @@ -700,6 +701,142 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") +class TestAsyncPodManager: + def setup_method(self): + self.mock_async_hook = mock.AsyncMock() + self.async_pod_manager = AsyncPodManager( + async_hook=self.mock_async_hook, + callbacks=[], + ) + + @pytest.mark.asyncio + async def test_start_pod_raises_informative_error_on_scheduled_timeout(self): + pod_response = mock.MagicMock() + pod_response.status.phase = "Pending" + self.mock_async_hook.get_pod.return_value = pod_response + expected_msg = "Pod took too long to be scheduled on the cluster, giving up. More than 0s. Check the pod events in kubernetes." + mock_pod = mock.MagicMock() + with pytest.raises(AirflowException, match=expected_msg): + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=0, + startup_timeout=0, + ) + self.mock_async_hook.get_pod.assert_called() + + @pytest.mark.asyncio + async def test_start_pod_raises_informative_error_on_startup_timeout(self): + pod_response = mock.MagicMock() + pod_response.status.phase = "Pending" + condition = mock.MagicMock() + condition.type = "PodScheduled" + condition.status = "True" + pod_response.status.conditions = [condition] + self.mock_async_hook.get_pod.return_value = pod_response + expected_msg = "Pod took too long to start. More than 0s. Check the pod events in kubernetes." 
+ mock_pod = mock.MagicMock() + with pytest.raises(AirflowException, match=expected_msg): + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=0, + startup_timeout=0, + ) + self.mock_async_hook.get_pod.assert_called() + + @pytest.mark.asyncio + async def test_start_pod_raises_fast_error_on_image_error(self): + pod_response = mock.MagicMock() + pod_response.status.phase = "Pending" + container_status = mock.MagicMock() + waiting_state = mock.MagicMock() + waiting_state.reason = "ErrImagePull" + waiting_state.message = "Test error" + container_status.state.waiting = waiting_state + pod_response.status.container_statuses = [container_status] + self.mock_async_hook.get_pod.return_value = pod_response + expected_msg = f"Pod docker image cannot be pulled, unable to start: {waiting_state.reason}\n{waiting_state.message}" + mock_pod = mock.MagicMock() + with pytest.raises(AirflowException, match=expected_msg): + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=60, + startup_timeout=60, + ) + self.mock_async_hook.get_pod.assert_called() + + @pytest.mark.asyncio + @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) + async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog): + condition_scheduled = mock.MagicMock() + condition_scheduled.type = "PodScheduled" + condition_scheduled.status = "True" + + pod_info_pending = mock.MagicMock() + pod_info_pending.status.phase = PodPhase.PENDING + pod_info_pending.status.conditions = [] + + pod_info_pending_scheduled = mock.MagicMock() + pod_info_pending_scheduled.status.phase = PodPhase.PENDING + pod_info_pending_scheduled.status.conditions = [condition_scheduled] + + pod_info_succeeded = mock.MagicMock() + pod_info_succeeded.status.phase = PodPhase.SUCCEEDED + + # Simulate sequence of pod states + self.mock_async_hook.get_pod.side_effect = [ + pod_info_pending, + pod_info_pending_scheduled, + pod_info_pending_scheduled, + pod_info_succeeded, + ] + startup_check_interval = 10 + schedule_timeout = 30 + startup_timeout = 60 + mock_pod = mock.MagicMock() + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=schedule_timeout, + startup_timeout=startup_timeout, + check_interval=startup_check_interval, + ) + assert mock_time_sleep.call_count == 3 + assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text + assert f"Waiting {startup_timeout}s to get the POD running..." 
in caplog.text + assert self.async_pod_manager.stop_watching_events is True + + @pytest.mark.asyncio + @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) + async def test_watch_pod_events(self, mock_time_sleep): + mock_pod = mock.MagicMock() + mock_pod.metadata.name = "test-pod" + mock_pod.metadata.namespace = "default" + + events = mock.MagicMock() + events.items = [] + for id in ["event 1", "event 2"]: + event = mock.MagicMock() + event.message = f"test {id}" + event.involved_object.field_path = f"object {id}" + events.items.append(event) + startup_check_interval = 10 + + def get_pod_events_side_effect(name, namespace): + self.async_pod_manager.stop_watching_events = True + return events + + self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect + + with mock.patch.object(type(self.async_pod_manager), "log", create=True) as log_mock: + await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval) + log_mock.info.assert_any_call( + "The Pod has an Event: %s from %s", "test event 1", "object event 1" + ) + log_mock.info.assert_any_call( + "The Pod has an Event: %s from %s", "test event 2", "object event 2" + ) + mock_time_sleep.assert_called_once_with(startup_check_interval) + + class TestPodLogsConsumer: @pytest.mark.parametrize( "chunks, expected_logs", From 18faa475630f883a27753730a36eb5322b8e323b Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 21 Oct 2025 14:35:12 +0200 Subject: [PATCH 04/14] Reworked unit tests and pod startup logic --- .../providers/cncf/kubernetes/exceptions.py | 4 ++ .../cncf/kubernetes/hooks/kubernetes.py | 34 +--------- .../cncf/kubernetes/operators/pod.py | 1 + .../providers/cncf/kubernetes/triggers/pod.py | 7 +-- .../cncf/kubernetes/utils/pod_manager.py | 63 +++++++++++-------- .../unit/cncf/kubernetes/triggers/test_pod.py | 22 +++---- .../cncf/kubernetes/utils/test_pod_manager.py | 61 ++++++++++-------- 7 files changed, 90 insertions(+), 102 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py index c0b6ad83a3fdc..82015cc983ce8 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py @@ -27,3 +27,7 @@ class PodMutationHookException(AirflowException): class PodReconciliationError(AirflowException): """Raised when an error is encountered while trying to merge pod configs.""" + + +class KubernetesApiError(AirflowException): + """Raised when an error is encountered while trying access Kubernetes API.""" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index aeb5b3034281f..506a59439c4a9 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -37,6 +37,7 @@ from airflow.exceptions import AirflowException, AirflowNotFoundException from airflow.models import Connection +from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation from 
airflow.providers.cncf.kubernetes.utils.container import ( @@ -68,35 +69,6 @@ def _load_body_to_dict(body: str) -> dict: return body_dict -class PodOperatorHookProtocol(Protocol): - """ - Protocol to define methods relied upon by KubernetesPodOperator. - - Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use - hooks that don't extend KubernetesHook. We use this protocol to document the - methods used by KPO and ensure that these methods exist on such other hooks. - """ - - @property - def core_v1_client(self) -> client.CoreV1Api: - """Get authenticated client object.""" - - @property - def is_in_cluster(self) -> bool: - """Expose whether the hook is configured with ``load_incluster_config`` or not.""" - - def get_pod(self, name: str, namespace: str) -> V1Pod: - """Read pod object from kubernetes API.""" - - def get_namespace(self) -> str | None: - """Return the namespace that defined in the connection.""" - - def get_xcom_sidecar_container_image(self) -> str | None: - """Return the xcom sidecar image that defined in the connection.""" - - def get_xcom_sidecar_container_resources(self) -> str | None: - """Return the xcom sidecar resources that defined in the connection.""" - class PodOperatorHookProtocol(Protocol): """ Protocol to define methods relied upon by KubernetesPodOperator. @@ -915,7 +887,7 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod: ) return pod except HTTPError as e: - raise AirflowException(f"There was an error reading the kubernetes API: {e}") + raise KubernetesApiError from e async def delete_pod(self, name: str, namespace: str): """ @@ -975,7 +947,7 @@ async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList: ) return events except HTTPError as e: - raise AirflowException(f"There was an error reading the kubernetes API: {e}") + raise KubernetesApiError from e async def get_job_status(self, name: str, namespace: str) -> V1Job: """ diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index bee7eb60049d1..d5cc9e27caf9a 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -867,6 +867,7 @@ def invoke_defer_method(self, last_log_time: DateTime | None = None) -> None: get_logs=self.get_logs, startup_timeout=self.startup_timeout_seconds, startup_check_interval=self.startup_check_interval_seconds, + schedule_timeout=self.schedule_timeout_seconds, base_container_name=self.base_container_name, on_finish_action=self.on_finish_action.value, last_log_time=last_log_time, diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index 5207b233a451f..cf8530e7ab438 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -94,7 +94,7 @@ def __init__( get_logs: bool = True, startup_timeout: int = 120, startup_check_interval: float = 5, - schedule_timeout: int | None = None, + schedule_timeout: int = 120, on_finish_action: str = "delete_pod", last_log_time: DateTime | None = None, logging_interval: int | None = None, @@ -113,8 +113,7 @@ def __init__( self.get_logs = get_logs self.startup_timeout = startup_timeout self.startup_check_interval = 
startup_check_interval
-        # New parameter startup_timeout_seconds adds breaking change, to handle this as smooth as possible just reuse startup time
-        self.schedule_timeout = schedule_timeout or startup_timeout
+        self.schedule_timeout = schedule_timeout
         self.last_log_time = last_log_time
         self.logging_interval = logging_interval
         self.on_finish_action = OnFinishAction(on_finish_action)
@@ -293,7 +292,7 @@ def hook(self) -> AsyncKubernetesHook:
 
     @cached_property
     def pod_manager(self) -> AsyncPodManager:
-        return AsyncPodManager(async_hook=self.hook)  # , callbacks=self.callbacks)
+        return AsyncPodManager(async_hook=self.hook)
 
     def define_container_state(self, pod: V1Pod) -> ContainerState:
         pod_containers = pod.status.container_statuses
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
index 172a9fcada8ac..ca39ffee238a7 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py
@@ -101,51 +101,61 @@ def check_exception_is_kubernetes_api_unauthorized(exc: BaseException):
     return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401"
 
 
-async def generic_watch_pod_events(
-    self,
+async def watch_pod_events(
+    pod_manager: PodManager | AsyncPodManager,
     pod: V1Pod,
     check_interval: float = 1,
-    is_async: bool = True,
 ) -> None:
-    """Read pod events and writes into log."""
+    """
+    Read pod events and write them to the log.
+
+    This function supports both asynchronous and synchronous pod managers.
+
+    :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
+    :param pod: The pod object to monitor.
+    :param check_interval: Interval (in seconds) between checks.
+    """
     num_events = 0
-    while not self.stop_watching_events:
-        events = await self.read_pod_events(pod) if is_async else self.read_pod_events(pod)
+    is_async = isinstance(pod_manager, AsyncPodManager)
+    while not pod_manager.stop_watching_events:
+        events = await pod_manager.read_pod_events(pod) if is_async else pod_manager.read_pod_events(pod)
         for new_event in events.items[num_events:]:
             involved_object: V1ObjectReference = new_event.involved_object
-            self.log.info("The Pod has an Event: %s from %s", new_event.message, involved_object.field_path)
+            pod_manager.log.info(
+                "The Pod has an Event: %s from %s", new_event.message, involved_object.field_path
+            )
         num_events = len(events.items)
         await asyncio.sleep(check_interval)
 
 
-async def generic_await_pod_start(
-    self,
-    pod,
+async def await_pod_start(
+    pod_manager: PodManager | AsyncPodManager,
+    pod: V1Pod,
     schedule_timeout: int = 120,
     startup_timeout: int = 120,
     check_interval: float = 1,
-    is_async: bool = True,
 ):
     """
     Monitor the startup phase of a Kubernetes pod, waiting for it to leave the ``Pending`` state.
 
     This function is shared by both PodManager and AsyncPodManager to provide consistent pod startup tracking.
 
+    :param pod_manager: The pod manager instance (PodManager or AsyncPodManager).
     :param pod: The pod object to monitor.
     :param schedule_timeout: Maximum time (in seconds) to wait for the pod to be scheduled.
     :param startup_timeout: Maximum time (in seconds) to wait for the pod to start running after being scheduled.
     :param check_interval: Interval (in seconds) between status checks.
-    :param is_async: Set to True if called in an async context; otherwise, False.
""" - self.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) + pod_manager.log.info("::group::Waiting until %ss to get the POD scheduled...", schedule_timeout) pod_was_scheduled = False start_check_time = time.time() + is_async = isinstance(pod_manager, AsyncPodManager) while True: - remote_pod = await self.read_pod(pod) if is_async else self.read_pod(pod) + remote_pod = await pod_manager.read_pod(pod) if is_async else pod_manager.read_pod(pod) pod_status = remote_pod.status if pod_status.phase != PodPhase.PENDING: - self.stop_watching_events = True - self.log.info("::endgroup::") + pod_manager.stop_watching_events = True + pod_manager.log.info("::endgroup::") break # Check for timeout @@ -157,16 +168,16 @@ async def generic_await_pod_start( # POD was initially scheduled update timeout for getting POD launched pod_was_scheduled = True start_check_time = time.time() - self.log.info("Waiting %ss to get the POD running...", startup_timeout) + pod_manager.log.info("Waiting %ss to get the POD running...", startup_timeout) if time.time() - start_check_time >= startup_timeout: - self.log.info("::endgroup::") + pod_manager.log.info("::endgroup::") raise PodLaunchTimeoutException( f"Pod took too long to start. More than {startup_timeout}s. Check the pod events in kubernetes." ) else: if time.time() - start_check_time >= schedule_timeout: - self.log.info("::endgroup::") + pod_manager.log.info("::endgroup::") raise PodLaunchTimeoutException( f"Pod took too long to be scheduled on the cluster, giving up. More than {schedule_timeout}s. Check the pod events in kubernetes." ) @@ -178,7 +189,7 @@ async def generic_await_pod_start( container_waiting: V1ContainerStateWaiting | None = container_state.waiting if container_waiting: if container_waiting.reason in ["ErrImagePull", "InvalidImageName"]: - self.log.info("::endgroup::") + pod_manager.log.info("::endgroup::") raise PodLaunchFailedException( f"Pod docker image cannot be pulled, unable to start: {container_waiting.reason}" f"\n{container_waiting.message}" @@ -350,7 +361,7 @@ def create_pod(self, pod: V1Pod) -> V1Pod: async def watch_pod_events(self, pod: V1Pod, check_interval: int = 1) -> None: """Read pod events and writes into log.""" - await generic_watch_pod_events(self, pod, check_interval, is_async=False) + await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval) async def await_pod_start( self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: int = 1 @@ -366,13 +377,12 @@ async def await_pod_start( :param check_interval: Interval (in seconds) between checks :return: """ - await generic_await_pod_start( - self=self, + await await_pod_start( + pod_manager=self, pod=pod, schedule_timeout=schedule_timeout, startup_timeout=startup_timeout, check_interval=check_interval, - is_async=False, ) def _log_message( @@ -993,7 +1003,7 @@ async def read_pod_events(self, pod: V1Pod) -> CoreV1EventList: async def watch_pod_events(self, pod: V1Pod, check_interval: float = 1) -> None: """Read pod events and writes into log.""" - await generic_watch_pod_events(self, pod, check_interval, is_async=True) + await watch_pod_events(pod_manager=self, pod=pod, check_interval=check_interval) async def await_pod_start( self, pod: V1Pod, schedule_timeout: int = 120, startup_timeout: int = 120, check_interval: float = 1 @@ -1009,11 +1019,10 @@ async def await_pod_start( :param check_interval: Interval (in seconds) between checks :return: """ - await generic_await_pod_start( - 
self=self, + await await_pod_start( + pod_manager=self, pod=pod, schedule_timeout=schedule_timeout, startup_timeout=startup_timeout, check_interval=check_interval, - is_async=True, ) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py index 662f652b51545..060d53314d4ff 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py @@ -338,7 +338,7 @@ def test_define_container_state_should_execute_successfully( @pytest.mark.asyncio @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") - async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger, caplog): + async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, trigger): event1 = mock.AsyncMock() event1.message = "event 1" event1.involved_object.field_path = "object 1" @@ -361,21 +361,19 @@ async def test_run_loop_read_events_during_start(self, mock_hook, mock_method, t mock_method.return_value = ContainerState.TERMINATED - caplog.set_level(logging.INFO) - - generator = trigger.run() - await generator.asend(None) + with mock.patch.object(trigger.pod_manager.log, "info") as mock_log_info: + generator = trigger.run() + await generator.asend(None) - # Check that both events are present in the log output - assert "The Pod has an Event: event 1 from object 1" in caplog.text - assert "The Pod has an Event: event 2 from object 2" in caplog.text + mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 1", "object 1") + mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "event 2", "object 2") @pytest.mark.asyncio @pytest.mark.parametrize("container_state", [ContainerState.WAITING, ContainerState.UNDEFINED]) @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_timeout_event( - self, mock_hook, mock_method, trigger, caplog, container_state, mock_time_fixture + self, mock_hook, mock_method, trigger, container_state, mock_time_fixture ): mock_hook.get_pod.return_value = self._mock_pod_result( mock.MagicMock( @@ -386,8 +384,6 @@ async def test_run_loop_return_timeout_event( ) mock_method.return_value = container_state - caplog.set_level(logging.INFO) - generator = trigger.run() actual = await generator.asend(None) assert ( @@ -406,7 +402,7 @@ async def test_run_loop_return_timeout_event( @mock.patch(f"{TRIGGER_PATH}.define_container_state") @mock.patch(f"{TRIGGER_PATH}.hook") async def test_run_loop_return_success_for_completed_pod_after_timeout( - self, mock_hook, mock_method, trigger, caplog, mock_time_fixture + self, mock_hook, mock_method, trigger, mock_time_fixture ): """ Test that the trigger correctly recognizes the pod is not pending even after the timeout has been @@ -422,8 +418,6 @@ async def test_run_loop_return_success_for_completed_pod_after_timeout( ) mock_method.return_value = ContainerState.TERMINATED - caplog.set_level(logging.INFO) - generator = trigger.run() actual = await generator.asend(None) assert ( diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 9dc589d123e61..e0da0bc104cbd 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ 
b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -478,7 +478,7 @@ async def test_start_pod_raises_fast_error_on_image_error(self): @pytest.mark.asyncio @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) - async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog): + async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): condition_scheduled = mock.MagicMock() condition_scheduled.type = "PodScheduled" condition_scheduled.status = "True" @@ -501,17 +501,21 @@ def pod_state_gen(): schedule_timeout = 30 startup_timeout = 60 mock_pod = MagicMock() - await self.pod_manager.await_pod_start( - pod=mock_pod, - schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing - startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing - check_interval=startup_check_interval, - ) - mock_time_sleep.assert_called_with(startup_check_interval) - assert mock_time_sleep.call_count == 3 - assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text - assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text - assert self.pod_manager.stop_watching_events is True + + with mock.patch.object(self.pod_manager.log, "info") as mock_log_info: + await self.pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=schedule_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing + startup_timeout=startup_timeout, # Never hit, any value is fine, as time.sleep is mocked to do nothing + check_interval=startup_check_interval, + ) + mock_time_sleep.assert_called_with(startup_check_interval) + assert self.pod_manager.stop_watching_events is True + assert mock_time_sleep.call_count == 3 + mock_log_info.assert_any_call( + "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout + ) + mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout) @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running") def test_container_is_running(self, container_is_running_mock): @@ -766,7 +770,7 @@ async def test_start_pod_raises_fast_error_on_image_error(self): @pytest.mark.asyncio @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) - async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog): + async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): condition_scheduled = mock.MagicMock() condition_scheduled.type = "PodScheduled" condition_scheduled.status = "True" @@ -793,16 +797,21 @@ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, caplog) schedule_timeout = 30 startup_timeout = 60 mock_pod = mock.MagicMock() - await self.async_pod_manager.await_pod_start( - pod=mock_pod, - schedule_timeout=schedule_timeout, - startup_timeout=startup_timeout, - check_interval=startup_check_interval, - ) - assert mock_time_sleep.call_count == 3 - assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text - assert f"Waiting {startup_timeout}s to get the POD running..." 
in caplog.text
-        assert self.async_pod_manager.stop_watching_events is True
+        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
+            await self.async_pod_manager.await_pod_start(
+                pod=mock_pod,
+                schedule_timeout=schedule_timeout,
+                startup_timeout=startup_timeout,
+                check_interval=startup_check_interval,
+            )
+            assert mock_time_sleep.call_count == 3
+            mock_log_info.assert_any_call(
+                "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout
+            )
+            mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout)
+            # assert f"::group::Waiting until {schedule_timeout}s to get the POD scheduled..." in caplog.text
+            # assert f"Waiting {startup_timeout}s to get the POD running..." in caplog.text
+            assert self.async_pod_manager.stop_watching_events is True
 
     @pytest.mark.asyncio
     @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock)
@@ -826,12 +835,12 @@ def get_pod_events_side_effect(name, namespace):
 
         self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect
 
-        with mock.patch.object(type(self.async_pod_manager), "log", create=True) as log_mock:
+        with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info:
             await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval)
-            log_mock.info.assert_any_call(
+            mock_log_info.assert_any_call(
                 "The Pod has an Event: %s from %s", "test event 1", "object event 1"
             )
-            log_mock.info.assert_any_call(
+            mock_log_info.assert_any_call(
                 "The Pod has an Event: %s from %s", "test event 2", "object event 2"
             )
             mock_time_sleep.assert_called_once_with(startup_check_interval)

From 0f5beccf7766a040959c681ecf48c3eb37132d46 Mon Sep 17 00:00:00 2001
From: AutomationDev85
Date: Wed, 22 Oct 2025 11:11:27 +0200
Subject: [PATCH 05/14] Add api permission error detection for triggerer

---
 .../providers/cncf/kubernetes/exceptions.py   |  4 ++++
 .../cncf/kubernetes/hooks/kubernetes.py       |  6 +++++-
 .../providers/cncf/kubernetes/triggers/pod.py | 17 +++++++++++++++++
 .../cncf/kubernetes/utils/pod_manager.py      | 10 ++++++++--
 4 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
index 82015cc983ce8..5503c743797ad 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/exceptions.py
@@ -31,3 +31,7 @@ class PodReconciliationError(AirflowException):
     """Raised when an error is encountered while trying access Kubernetes API."""
+
+
+class KubernetesApiPermissionError(AirflowException):
+    """Raised when access to the Kubernetes API is denied due to insufficient permissions."""
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index 506a59439c4a9..e08d2bb8aaa16 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -37,7 +37,7 @@
 from airflow.exceptions import AirflowException, AirflowNotFoundException
 from airflow.models import Connection
-from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError
+from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError,
KubernetesApiPermissionError from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation from airflow.providers.cncf.kubernetes.utils.container import ( @@ -887,6 +887,8 @@ async def get_pod(self, name: str, namespace: str) -> V1Pod: ) return pod except HTTPError as e: + if hasattr(e, "status") and e.status == 403: + raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e raise KubernetesApiError from e async def delete_pod(self, name: str, namespace: str): @@ -947,6 +949,8 @@ async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList: ) return events except HTTPError as e: + if hasattr(e, "status") and e.status == 403: + raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e raise KubernetesApiError from e async def get_job_status(self, name: str, namespace: str) -> V1Job: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index cf8530e7ab438..a05a4ff05d5fb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -26,6 +26,7 @@ import tenacity +from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiPermissionError from airflow.providers.cncf.kubernetes.hooks.kubernetes import AsyncKubernetesHook from airflow.providers.cncf.kubernetes.utils.pod_manager import ( AsyncPodManager, @@ -186,6 +187,22 @@ async def run(self) -> AsyncIterator[TriggerEvent]: } ) return + except KubernetesApiPermissionError as e: + message = ( + "Kubernetes API permission error: The triggerer may not have sufficient permissions to monitor or delete pods. " + "Please ensure the triggerer's service account is included in the 'pod-launcher-role' as defined in the latest Airflow Helm chart. 
" + f"Original error: {e}" + ) + yield TriggerEvent( + { + "name": self.pod_name, + "namespace": self.pod_namespace, + "status": "error", + "message": message, + **self.trigger_kwargs, + } + ) + return except Exception as e: yield TriggerEvent( { diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index ca39ffee238a7..789ad4db79e6c 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -118,7 +118,10 @@ async def watch_pod_events( num_events = 0 is_async = isinstance(pod_manager, AsyncPodManager) while not pod_manager.stop_watching_events: - events = await pod_manager.read_pod_events(pod) if is_async else pod_manager.read_pod_events(pod) + if is_async: + events = await pod_manager.read_pod_events(pod) + else: + events = pod_manager.read_pod_events(pod) for new_event in events.items[num_events:]: involved_object: V1ObjectReference = new_event.involved_object pod_manager.log.info( @@ -152,7 +155,10 @@ async def await_pod_start( start_check_time = time.time() is_async = isinstance(pod_manager, AsyncPodManager) while True: - remote_pod = await pod_manager.read_pod(pod) if is_async else pod_manager.read_pod(pod) + if is_async: + remote_pod = await pod_manager.read_pod(pod) + else: + remote_pod = pod_manager.read_pod(pod) pod_status = remote_pod.status if pod_status.phase != PodPhase.PENDING: pod_manager.stop_watching_events = True From 544cb4c186e17da9af0c47b2fcae07251809f11e Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 28 Oct 2025 08:33:52 +0100 Subject: [PATCH 06/14] Fix pytest fixture --- .../tests/unit/cncf/kubernetes/triggers/test_pod.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py index 060d53314d4ff..fc477fe0cf763 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py @@ -77,8 +77,7 @@ def mock_time_fixture(): with mock.patch("time.time") as mock_time: start_time = 1000 mock_time.side_effect = [ - start_time, - start_time + STARTUP_TIMEOUT_SECS, + *(start_time + STARTUP_TIMEOUT_SECS * n for n in range(5)), ] yield mock_time @@ -383,7 +382,6 @@ async def test_run_loop_return_timeout_event( ) ) mock_method.return_value = container_state - generator = trigger.run() actual = await generator.asend(None) assert ( From 767e03dea039ef1329bbbcad2814ff6e7097b687 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 28 Oct 2025 10:12:33 +0100 Subject: [PATCH 07/14] Removed not requried code --- .../tests/unit/cncf/kubernetes/utils/test_pod_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index e0da0bc104cbd..485bdbd769e4d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -809,8 +809,6 @@ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout ) 
From bba98698740adc6d10aa347bc2c5f432b7cf1195 Mon Sep 17 00:00:00 2001
From: AutomationDev85
Date: Tue, 28 Oct 2025 14:58:49 +0100
Subject: [PATCH 08/14] Move log file reading to triggerer

---
 .../cncf/kubernetes/hooks/kubernetes.py       |  15 +-
 .../cncf/kubernetes/operators/pod.py          |  25 +--
 .../providers/cncf/kubernetes/triggers/pod.py |  21 +-
 .../cncf/kubernetes/utils/pod_manager.py      |  86 ++++++--
 .../cncf/kubernetes/hooks/test_kubernetes.py  |  13 +-
 .../cncf/kubernetes/utils/test_pod_manager.py | 187 ++++++++++++++----
 6 files changed, 258 insertions(+), 89 deletions(-)

diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
index d59c6527d6b8a..ad161d1471b6e 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py
@@ -910,7 +910,9 @@ async def delete_pod(self, name: str, namespace: str):
             if str(e.status) != "404":
                 raise

-    async def read_logs(self, name: str, namespace: str):
+    async def read_logs(
+        self, name: str, namespace: str, container_name: str | None = None, since_seconds: int | None = None
+    ):
         """
         Read logs inside the pod while starting containers inside.

@@ -921,6 +923,8 @@

         :param name: Name of the pod.
         :param namespace: Name of the pod's namespace.
+        :param container_name: Name of the container inside the pod.
+        :param since_seconds: Only return logs newer than a relative duration in seconds.
""" async with self.get_conn() as connection: try: @@ -928,16 +932,15 @@ async def read_logs(self, name: str, namespace: str): logs = await v1_api.read_namespaced_pod_log( name=name, namespace=namespace, + container_name=container_name, follow=False, timestamps=True, + since_seconds=since_seconds, ) logs = logs.splitlines() - for line in logs: - self.log.info("Container logs from %s", line) return logs - except HTTPError: - self.log.exception("There was an error reading the kubernetes API.") - raise + except HTTPError as e: + raise KubernetesApiError from e async def get_pod_events(self, name: str, namespace: str) -> CoreV1EventList: """Get pod's events.""" diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index dd9582e964003..2666132b3185d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -943,32 +943,21 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: message = event.get("stack_trace", event["message"]) raise AirflowException(message) - return xcom_sidecar_output - - if event["status"] == "running": + if event["status"] == "success": + # fetch some logs when pod is executed successfully if self.get_logs: - self.log.info("Resuming logs read from time %r", last_log_time) - - pod_log_status = self.pod_manager.fetch_container_logs( - pod=self.pod, - container_name=self.base_container_name, - follow=follow, - since_time=last_log_time, - container_name_log_prefix_enabled=self.container_name_log_prefix_enabled, - log_formatter=self.log_formatter, - ) + self._write_logs(self.pod, follow=follow, since_time=last_log_time) - self.invoke_defer_method(pod_log_status.last_log_time) - else: - self.invoke_defer_method() + if self.do_xcom_push: + xcom_sidecar_output = self.extract_xcom(pod=self.pod) + return xcom_sidecar_output + return except TaskDeferred: raise finally: self._clean(event=event, context=context, result=xcom_sidecar_output) def _clean(self, event: dict[str, Any], result: dict | None, context: Context) -> None: - if event["status"] == "running": - return istio_enabled = self.is_istio_enabled(self.pod) # Skip await_pod_completion when the event is 'timeout' due to the pod can hang # on the ErrImagePull or ContainerCreating step and it will never complete diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index a05a4ff05d5fb..3f00c7afbe429 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -251,7 +251,9 @@ async def _wait_for_container_completion(self) -> TriggerEvent: time_begin = datetime.datetime.now(tz=datetime.timezone.utc) time_get_more_logs = None if self.logging_interval is not None: - time_get_more_logs = time_begin + datetime.timedelta(seconds=self.logging_interval) + # if required to read logs then read logs first time after 10 seconds and then switch to logging interval + # to inform the user that something is happening in the pod + time_get_more_logs = time_begin + datetime.timedelta(seconds=10) while True: pod = await self._get_pod() container_state = self.define_container_state(pod) @@ -278,15 +280,14 @@ async def 
_wait_for_container_completion(self) -> TriggerEvent: ) self.log.debug("Container is not completed and still working.") if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs: - return TriggerEvent( - { - "status": "running", - "last_log_time": self.last_log_time, - "namespace": self.pod_namespace, - "name": self.pod_name, - **self.trigger_kwargs, - } - ) + if self.get_logs and self.logging_interval: + self.last_log_time = await self.pod_manager.fetch_container_logs_before_current_sec( + pod, container_name=self.base_container_name, since_time=self.last_log_time + ) + time_get_more_logs = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta( + seconds=self.logging_interval + ) + self.log.debug("Sleeping for %s seconds.", self.poll_interval) await asyncio.sleep(self.poll_interval) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 789ad4db79e6c..a8cdcf9c63414 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -480,7 +480,7 @@ def consume_logs(*, since_time: DateTime | None = None) -> tuple[DateTime | None try: for raw_line in logs: line = raw_line.decode("utf-8", errors="backslashreplace") - line_timestamp, message = self.parse_log_line(line) + line_timestamp, message = parse_log_line(line) if line_timestamp: # detect new log line if message_to_log is None: # first line in the log message_to_log = message @@ -708,22 +708,6 @@ def await_pod_completion( time.sleep(2) return remote_pod - def parse_log_line(self, line: str) -> tuple[DateTime | None, str]: - """ - Parse K8s log line and returns the final state. - - :param line: k8s log line - :return: timestamp and log message - """ - timestamp, sep, message = line.strip().partition(" ") - if not sep: - return None, line - try: - last_log_time = cast("DateTime", pendulum.parse(timestamp)) - except ParserError: - return None, line - return last_log_time, message - def container_is_running(self, pod: V1Pod, container_name: str) -> bool: """Read pod and checks if container is running.""" remote_pod = self.read_pod(pod) @@ -971,6 +955,23 @@ def is_log_group_marker(line: str) -> bool: return line.startswith("::group::") or line.startswith("::endgroup::") +def parse_log_line(line: str) -> tuple[DateTime | None, str]: + """ + Parse K8s log line and returns the final state. + + :param line: k8s log line + :return: timestamp and log message + """ + timestamp, sep, message = line.strip().partition(" ") + if not sep: + return None, line + try: + last_log_time = cast("DateTime", pendulum.parse(timestamp)) + except ParserError: + return None, line + return last_log_time, message + + class AsyncPodManager(LoggingMixin): """Create, monitor, and otherwise interact with Kubernetes pods for use with the KubernetesPodTriggerer.""" @@ -1032,3 +1033,54 @@ async def await_pod_start( startup_timeout=startup_timeout, check_interval=check_interval, ) + + @tenacity.retry(stop=tenacity.stop_after_attempt(5), wait=tenacity.wait_exponential(), reraise=True) + async def fetch_container_logs_before_current_sec( + self, pod: V1Pod, container_name: str, since_time: DateTime | None = None + ) -> DateTime | None: + """ + Asynchronously read the log file of the specified pod. 
+
+        This method reads logs from the requested container, skipping log lines from the current second to prevent duplicate entries on subsequent reads. Transient errors while processing the log stream are suppressed so that reading can resume on the next call.
+
+        :param pod: The pod specification to monitor.
+        :param container_name: The name of the container within the pod.
+        :param since_time: The timestamp from which to start reading logs.
+        :return: The timestamp to use as ``since_time`` for the next log read (the moment this read started).
+        """
+        now = pendulum.now()
+        logs = await self._hook.read_logs(
+            name=pod.metadata.name,
+            namespace=pod.metadata.namespace,
+            container_name=container_name,
+            since_seconds=(math.ceil((now - since_time).total_seconds()) if since_time else None),
+        )
+        message_to_log = None
+        try:
+            now_seconds = now.replace(microsecond=0)
+            for line in logs:
+                line_timestamp, message = parse_log_line(line)
+                # Skip log lines from the current second to prevent duplicate entries on the next read.
+                # The API only allows specifying 'since_seconds', not an exact timestamp.
+                if line_timestamp and line_timestamp.replace(microsecond=0) == now_seconds:
+                    break
+                if line_timestamp:  # detect new log line
+                    if message_to_log is None:  # first line in the log
+                        message_to_log = message
+                    else:  # previous log line is complete
+                        if message_to_log is not None:
+                            if is_log_group_marker(message_to_log):
+                                print(message_to_log)
+                            else:
+                                self.log.info("[%s] %s", container_name, message_to_log)
+                        message_to_log = message
+                elif message_to_log:  # continuation of the previous log line
+                    message_to_log = f"{message_to_log}\n{message}"
+        finally:
+            # log the last line and update the last_captured_timestamp
+            if message_to_log is not None:
+                if is_log_group_marker(message_to_log):
+                    print(message_to_log)
+                else:
+                    self.log.info("[%s] %s", container_name, message_to_log)
+            return now  # Return the current time as the last log time to ensure logs from the current second are read in the next fetch.
diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
index 223c153cfe0e3..c25caaabd64b2 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py
@@ -1063,22 +1063,25 @@ async def test_read_logs(self, lib_method, kube_config_loader):
         with mock.patch(
             "airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.log",
             new_callable=PropertyMock,
-        ) as log:
-            await hook.read_logs(
+        ):
+            logs = await hook.read_logs(
                 name=POD_NAME,
                 namespace=NAMESPACE,
+                container_name=CONTAINER_NAME,
+                since_seconds=10,
             )

         lib_method.assert_called_once()
         lib_method.assert_called_with(
             name=POD_NAME,
             namespace=NAMESPACE,
+            container_name=CONTAINER_NAME,
             follow=False,
             timestamps=True,
+            since_seconds=10,
         )
-        log.return_value.info.assert_called_with(
-            "Container logs from %s", "2023-01-11 Some string logs..."
-        )
+        assert len(logs) == 1
+        assert "2023-01-11 Some string logs..."
in logs @pytest.mark.asyncio @mock.patch(KUBE_BATCH_API.format("read_namespaced_job_status")) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 485bdbd769e4d..2c1150fd9fe04 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -27,14 +27,17 @@ import pytest import time_machine from kubernetes.client.rest import ApiException +from tenacity import wait_none from urllib3.exceptions import HTTPError as BaseHTTPError from airflow.exceptions import AirflowException +from airflow.providers.cncf.kubernetes.exceptions import KubernetesApiError from airflow.providers.cncf.kubernetes.utils.pod_manager import ( AsyncPodManager, PodLogsConsumer, PodManager, PodPhase, + parse_log_line, ) from airflow.utils.timezone import utc @@ -44,6 +47,18 @@ from pendulum import DateTime +def test_parse_log_line(): + log_message = "This should return no timestamp" + timestamp, line = parse_log_line(log_message) + assert timestamp is None + assert line == log_message + + real_timestamp = "2020-10-08T14:16:17.793417674Z" + timestamp, line = parse_log_line(f"{real_timestamp} {log_message}") + assert timestamp == pendulum.parse(real_timestamp) + assert line == log_message + + class TestPodManager: def setup_method(self): self.mock_kube_client = mock.Mock() @@ -51,6 +66,23 @@ def setup_method(self): kube_client=self.mock_kube_client, callbacks=[MockKubernetesPodOperatorCallback], ) + # List of PodManager methods that may use tenacity retry + tenacity_methods = [ + "await_pod_start", + "read_pod_logs", + "create_pod", + "get_init_container_names", + "get_container_names", + "read_pod_events", + "read_pod", + "extract_xcom_json", + "extract_xcom_kill", + ] + # Patch tenacity retry wait for all relevant methods to disable waiting in tests + for method_name in tenacity_methods: + method = getattr(self.pod_manager, method_name, None) + if method and hasattr(method, "retry"): + method.retry.wait = wait_none() def test_read_pod_logs_successfully_returns_logs(self): mock.sentinel.metadata = mock.MagicMock() @@ -298,17 +330,6 @@ def test_read_pod_retries_fails(self): with pytest.raises(AirflowException): self.pod_manager.read_pod(mock.sentinel) - def test_parse_log_line(self): - log_message = "This should return no timestamp" - timestamp, line = self.pod_manager.parse_log_line(log_message) - assert timestamp is None - assert line == log_message - - real_timestamp = "2020-10-08T14:16:17.793417674Z" - timestamp, line = self.pod_manager.parse_log_line(f"{real_timestamp} {log_message}") - assert timestamp == pendulum.parse(real_timestamp) - assert line == log_message - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs") def test_fetch_container_logs_returning_last_timestamp( @@ -706,12 +727,29 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): class TestAsyncPodManager: + @pytest.fixture + def mock_log_info(self): + with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info: + yield mock_log_info + def setup_method(self): self.mock_async_hook = mock.AsyncMock() self.async_pod_manager = AsyncPodManager( async_hook=self.mock_async_hook, callbacks=[], ) + # List of PodManager methods that may use 
tenacity retry + tenacity_methods = [ + "await_pod_start", + "fetch_container_logs_before_current_sec", + "read_pod_events", + "read_pod", + ] + # Patch tenacity retry wait for all relevant methods to disable waiting in tests + for method_name in tenacity_methods: + method = getattr(self.async_pod_manager, method_name, None) + if method and hasattr(method, "retry"): + method.retry.wait = wait_none() @pytest.mark.asyncio async def test_start_pod_raises_informative_error_on_scheduled_timeout(self): @@ -770,7 +808,7 @@ async def test_start_pod_raises_fast_error_on_image_error(self): @pytest.mark.asyncio @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) - async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): + async def test_start_pod_startup_interval_seconds(self, mock_time_sleep, mock_log_info): condition_scheduled = mock.MagicMock() condition_scheduled.type = "PodScheduled" condition_scheduled.status = "True" @@ -797,23 +835,23 @@ async def test_start_pod_startup_interval_seconds(self, mock_time_sleep): schedule_timeout = 30 startup_timeout = 60 mock_pod = mock.MagicMock() - with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info: - await self.async_pod_manager.await_pod_start( - pod=mock_pod, - schedule_timeout=schedule_timeout, - startup_timeout=startup_timeout, - check_interval=startup_check_interval, - ) - assert mock_time_sleep.call_count == 3 - mock_log_info.assert_any_call( - "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout - ) - mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout) - assert self.async_pod_manager.stop_watching_events is True + + await self.async_pod_manager.await_pod_start( + pod=mock_pod, + schedule_timeout=schedule_timeout, + startup_timeout=startup_timeout, + check_interval=startup_check_interval, + ) + assert mock_time_sleep.call_count == 3 + mock_log_info.assert_any_call( + "::group::Waiting until %ss to get the POD scheduled...", schedule_timeout + ) + mock_log_info.assert_any_call("Waiting %ss to get the POD running...", startup_timeout) + assert self.async_pod_manager.stop_watching_events is True @pytest.mark.asyncio @mock.patch("asyncio.sleep", new_callable=mock.AsyncMock) - async def test_watch_pod_events(self, mock_time_sleep): + async def test_watch_pod_events(self, mock_time_sleep, mock_log_info): mock_pod = mock.MagicMock() mock_pod.metadata.name = "test-pod" mock_pod.metadata.namespace = "default" @@ -833,15 +871,98 @@ def get_pod_events_side_effect(name, namespace): self.mock_async_hook.get_pod_events.side_effect = get_pod_events_side_effect - with mock.patch.object(self.async_pod_manager.log, "info") as mock_log_info: - await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval) - mock_log_info.assert_any_call( - "The Pod has an Event: %s from %s", "test event 1", "object event 1" + await self.async_pod_manager.watch_pod_events(pod=mock_pod, check_interval=startup_check_interval) + mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "test event 1", "object event 1") + mock_log_info.assert_any_call("The Pod has an Event: %s from %s", "test event 2", "object event 2") + mock_time_sleep.assert_called_once_with(startup_check_interval) + + @pytest.mark.asyncio + @pytest.mark.parametrize( + "log_lines, now, expected_log_messages, not_expected_log_messages", + [ + # Case 1: No logs + ([], pendulum.now(), [], []), + # Case 2: One log line with timestamp before now + ( + 
[f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} message"], + pendulum.now(), + ["message"], + [], + ), + # Case 3: Log line with timestamp equal to now (should be skipped, so last_time is None) + ([f"{pendulum.now().to_iso8601_string()} message"], pendulum.now(), [], ["message"]), + # Case 4: Multiple log lines, last before now + ( + [ + f"{pendulum.now().subtract(seconds=3).to_iso8601_string()} msg1", + f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} msg2", + ], + pendulum.now(), + ["msg1", "msg2"], + [], + ), + # Case 5: Log lines with continuation (no timestamp) + ( + [ + f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} msg1", + "continued line", + ], + pendulum.now(), + ["msg1\ncontinued line"], + [], + ), + # Case 6: Log lines with continuation (no timestamp) + ( + [ + f"{pendulum.now().subtract(seconds=2).to_iso8601_string()} msg1", + f"{pendulum.now().to_iso8601_string()} msg2", + ], + pendulum.now(), + ["msg1"], + ["msg2"], + ), + ], + ) + async def test_fetch_container_logs_before_current_sec_various_logs( + self, log_lines, now, expected_log_messages, not_expected_log_messages + ): + pod = mock.MagicMock() + container_name = "base" + since_time = now.subtract(minutes=1) + mock_async_hook = mock.AsyncMock() + mock_async_hook.read_logs.return_value = log_lines + + with mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.pendulum.now", return_value=now): + async_pod_manager = AsyncPodManager( + async_hook=mock_async_hook, + callbacks=[], ) - mock_log_info.assert_any_call( - "The Pod has an Event: %s from %s", "test event 2", "object event 2" + with mock.patch.object(async_pod_manager.log, "info") as mock_log_info: + result = await async_pod_manager.fetch_container_logs_before_current_sec( + pod=pod, container_name=container_name, since_time=since_time + ) + assert result == now + + for expected in expected_log_messages: + mock_log_info.assert_any_call("[%s] %s", container_name, expected) + for not_expected in not_expected_log_messages: + unexpected_call = mock.call("[%s] %s", container_name, not_expected) + assert unexpected_call not in mock_log_info.mock_calls + + @pytest.mark.asyncio + async def test_fetch_container_logs_before_current_sec_error_handling(self): + pod = mock.MagicMock() + container_name = "base" + since_time = pendulum.now().subtract(minutes=1) + + async def fake_read_logs(**kwargs): + raise KubernetesApiError("error") + + self.async_pod_manager._hook.read_logs = fake_read_logs + with pytest.raises(KubernetesApiError): + await self.async_pod_manager.fetch_container_logs_before_current_sec( + pod=pod, container_name=container_name, since_time=since_time ) - mock_time_sleep.assert_called_once_with(startup_check_interval) class TestPodLogsConsumer: From f08db27666289b61d923338ba50b85e0b13529b5 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 30 Oct 2025 12:10:04 +0100 Subject: [PATCH 09/14] Removed not required code --- .../providers/cncf/kubernetes/operators/pod.py | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index 904b964659ea1..1ee7dd856fd17 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -899,17 +899,6 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: if not self.pod: 
            raise PodNotFoundException("Could not find pod after resuming from deferral")

-        if event["status"] != "running":
-            for callback in self.callbacks:
-                callback.on_operator_resuming(
-                    pod=self.pod,
-                    event=event,
-                    client=self.client,
-                    mode=ExecutionMode.SYNC,
-                    context=context,
-                    operator=self,
-                )
-
         follow = self.logging_interval is None
         last_log_time = event.get("last_log_time")

From 46360e3ca6ebf960cbc335749bb4d36f52e92ec5 Mon Sep 17 00:00:00 2001
From: AutomationDev85
Date: Thu, 30 Oct 2025 12:19:39 +0100
Subject: [PATCH 10/14] Removed not required unit test

---
 .../unit/cncf/kubernetes/triggers/test_pod.py | 45 -------------------
 1 file changed, 45 deletions(-)

diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
index fc477fe0cf763..7ffb3cbc3f344 100644
--- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
+++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py
@@ -27,7 +27,6 @@

 import pytest
 from kubernetes.client import models as k8s
-from pendulum import DateTime

 from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger
 from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase
@@ -252,50 +251,6 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully(
         await generator.asend(None)
         assert "Container logs:"

-    @pytest.mark.asyncio
-    @pytest.mark.parametrize(
-        "logging_interval, exp_event",
-        [
-            pytest.param(
-                0,
-                {
-                    "status": "running",
-                    "last_log_time": DateTime(2022, 1, 1),
-                    "name": POD_NAME,
-                    "namespace": NAMESPACE,
-                },
-                id="short_interval",
-            ),
-        ],
-    )
-    @mock.patch(f"{TRIGGER_PATH}.define_container_state")
-    @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start")
-    @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.AsyncKubernetesHook.get_pod")
-    async def test_running_log_interval(
-        self, mock_get_pod, mock_wait_pod, define_container_state, logging_interval, exp_event
-    ):
-        """
-        If log interval given, should emit event with running status and last log time.
-        Otherwise, should make it to second loop and emit "done" event.
-        For this test we emit container status "running, running not".
-        The first "running" status gets us out of wait_for_pod_start.
-        The second "running" will fire a "running" event when logging interval is non-None. When logging
-        interval is None, the second "running" status will just result in continuation of the loop. And
-        when in the next loop we get a non-running status, the trigger fires a "done" event.
-        """
-        define_container_state.return_value = "running"
-        trigger = KubernetesPodTrigger(
-            pod_name=POD_NAME,
-            pod_namespace=NAMESPACE,
-            trigger_start_time=datetime.datetime.now(tz=datetime.timezone.utc),
-            base_container_name=BASE_CONTAINER_NAME,
-            startup_timeout=5,
-            poll_interval=1,
-            logging_interval=1,
-            last_log_time=DateTime(2022, 1, 1),
-        )
-        assert await trigger.run().__anext__() == TriggerEvent(exp_event)
-
     @pytest.mark.parametrize(
         "container_state, expected_state",
         [
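
With the intermediate "running" event gone from the trigger (its test is removed above, and the
operator-side handling is removed in the next patch), the resume path only ever sees terminal
events. The sketch below illustrates that reduced dispatch, assuming an event dict carrying the
status/message/stack_trace keys used throughout this series; handle_trigger_event is an
illustrative name, not the operator's actual API:

    from typing import Any


    def handle_trigger_event(event: dict[str, Any]) -> None:
        """Dispatch a terminal trigger event on the worker side (sketch only)."""
        status = event["status"]
        if status in ("error", "failed", "timeout"):
            # Prefer the trigger-side stack trace when one was shipped with the event.
            raise RuntimeError(event.get("stack_trace", event["message"]))
        if status != "success":
            raise ValueError(f"Unexpected trigger event status: {status}")
        # On success the triggerer has already streamed the logs; the operator
        # only needs to collect XCom results and clean up the pod.
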
- """ - define_container_state.return_value = "running" - trigger = KubernetesPodTrigger( - pod_name=POD_NAME, - pod_namespace=NAMESPACE, - trigger_start_time=datetime.datetime.now(tz=datetime.timezone.utc), - base_container_name=BASE_CONTAINER_NAME, - startup_timeout=5, - poll_interval=1, - logging_interval=1, - last_log_time=DateTime(2022, 1, 1), - ) - assert await trigger.run().__anext__() == TriggerEvent(exp_event) - @pytest.mark.parametrize( "container_state, expected_state", [ From ce8a001a9f6eef532829a9e95c3d1be33c17f02a Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 30 Oct 2025 13:30:38 +0100 Subject: [PATCH 11/14] Fixed unit tests --- .../cncf/kubernetes/operators/pod.py | 10 ---- .../cncf/kubernetes/operators/test_pod.py | 49 ------------------- 2 files changed, 59 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index 1ee7dd856fd17..4249f56337d4c 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -931,16 +931,6 @@ def trigger_reentry(self, context: Context, event: dict[str, Any]) -> Any: ) message = event.get("stack_trace", event["message"]) raise AirflowException(message) - - if event["status"] == "success": - # fetch some logs when pod is executed successfully - if self.get_logs: - self._write_logs(self.pod, follow=follow, since_time=last_log_time) - - if self.do_xcom_push: - xcom_sidecar_output = self.extract_xcom(pod=self.pod) - return xcom_sidecar_output - return except TaskDeferred: raise finally: diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py index abb503c8aa477..640d8373e6e41 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/operators/test_pod.py @@ -2166,18 +2166,6 @@ def test_process_duplicate_label_pods__pod_removed_if_delete_pod( process_pod_deletion_mock.assert_called_once_with(pod_1) assert result.metadata.name == pod_2.metadata.name - @patch(POD_MANAGER_CLASS.format("fetch_container_logs")) - @patch(KUB_OP_PATH.format("invoke_defer_method")) - def test_defere_call_one_more_time_after_error(self, invoke_defer_method, fetch_container_logs): - fetch_container_logs.return_value = PodLoggingStatus(False, None) - op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) - - op.trigger_reentry( - create_context(op), event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"} - ) - - invoke_defer_method.assert_called_with(None) - class TestSuppress: def test__suppress(self, caplog): @@ -2606,32 +2594,6 @@ def test_cleanup_log_pod_spec_on_failure(self, log_pod_spec_on_failure, expect_m with pytest.raises(AirflowException, match=expect_match): k.cleanup(pod, pod) - @patch( - "airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.convert_config_file_to_dict" - ) - @patch(f"{HOOK_CLASS}.get_pod") - @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.await_pod_completion") - @patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.fetch_container_logs") - def test_get_logs_running( - self, - fetch_container_logs, - await_pod_completion, - get_pod, - mock_convert_config_file_to_dict, - ): - """When 
logs fetch exits with status running, raise task deferred""" - pod = MagicMock() - get_pod.return_value = pod - op = KubernetesPodOperator(task_id="test_task", name="test-pod", get_logs=True) - await_pod_completion.return_value = None - fetch_container_logs.return_value = PodLoggingStatus(True, None) - with pytest.raises(TaskDeferred): - op.trigger_reentry( - create_context(op), - event={"name": TEST_NAME, "namespace": TEST_NAMESPACE, "status": "running"}, - ) - fetch_container_logs.is_called_with(pod, "base") - @patch(KUB_OP_PATH.format("_write_logs")) @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.cleanup") @patch("airflow.providers.cncf.kubernetes.operators.pod.KubernetesPodOperator.find_pod") @@ -2702,17 +2664,6 @@ def test_execute_async_callbacks(self, mocked_hook): } k.trigger_reentry(context=context, event=callback_event) - # check on_operator_resuming callback - mock_callbacks.on_operator_resuming.assert_called_once() - assert mock_callbacks.on_operator_resuming.call_args.kwargs == { - "client": k.client, - "mode": ExecutionMode.SYNC, - "pod": remote_pod_mock, - "operator": k, - "context": context, - "event": callback_event, - } - # check on_pod_cleanup callback mock_callbacks.on_pod_cleanup.assert_called_once() assert mock_callbacks.on_pod_cleanup.call_args.kwargs == { From 1570f5430ce8a2777fea24c3abf824618ef91612 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 30 Oct 2025 14:12:59 +0100 Subject: [PATCH 12/14] Clean up unit tests --- .../providers/cncf/kubernetes/triggers/pod.py | 11 ++-- .../cncf/kubernetes/hooks/test_kubernetes.py | 39 ++++++------ .../unit/cncf/kubernetes/triggers/test_pod.py | 61 +++++++++++++++++++ 3 files changed, 83 insertions(+), 28 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py index bc7d610bbaf0f..29c1a382a0bf0 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/triggers/pod.py @@ -261,9 +261,7 @@ async def _wait_for_container_completion(self) -> TriggerEvent: time_begin = datetime.datetime.now(tz=datetime.timezone.utc) time_get_more_logs = None if self.logging_interval is not None: - # if required to read logs then read logs first time after 10 seconds and then switch to logging interval - # to inform the user that something is happening in the pod - time_get_more_logs = time_begin + datetime.timedelta(seconds=10) + time_get_more_logs = time_begin + datetime.timedelta(seconds=self.logging_interval) while True: pod = await self._get_pod() container_state = self.define_container_state(pod) @@ -289,14 +287,13 @@ async def _wait_for_container_completion(self) -> TriggerEvent: } ) self.log.debug("Container is not completed and still working.") - if time_get_more_logs and datetime.datetime.now(tz=datetime.timezone.utc) > time_get_more_logs: + now = datetime.datetime.now(tz=datetime.timezone.utc) + if time_get_more_logs and now >= time_get_more_logs: if self.get_logs and self.logging_interval: self.last_log_time = await self.pod_manager.fetch_container_logs_before_current_sec( pod, container_name=self.base_container_name, since_time=self.last_log_time ) - time_get_more_logs = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta( - seconds=self.logging_interval - ) + time_get_more_logs = now + datetime.timedelta(seconds=self.logging_interval) 
self.log.debug("Sleeping for %s seconds.", self.poll_interval) await asyncio.sleep(self.poll_interval) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py index c25caaabd64b2..f6e51149ed36a 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/hooks/test_kubernetes.py @@ -1060,28 +1060,25 @@ async def test_read_logs(self, lib_method, kube_config_loader): config_file=None, cluster_context=None, ) - with mock.patch( - "airflow.providers.cncf.kubernetes.hooks.kubernetes.AsyncKubernetesHook.log", - new_callable=PropertyMock, - ): - logs = await hook.read_logs( - name=POD_NAME, - namespace=NAMESPACE, - container_name=CONTAINER_NAME, - since_seconds=10, - ) - lib_method.assert_called_once() - lib_method.assert_called_with( - name=POD_NAME, - namespace=NAMESPACE, - container_name=CONTAINER_NAME, - follow=False, - timestamps=True, - since_seconds=10, - ) - assert len(logs) == 1 - assert "2023-01-11 Some string logs..." in logs + logs = await hook.read_logs( + name=POD_NAME, + namespace=NAMESPACE, + container_name=CONTAINER_NAME, + since_seconds=10, + ) + + lib_method.assert_called_once() + lib_method.assert_called_with( + name=POD_NAME, + namespace=NAMESPACE, + container_name=CONTAINER_NAME, + follow=False, + timestamps=True, + since_seconds=10, + ) + assert len(logs) == 1 + assert "2023-01-11 Some string logs..." in logs @pytest.mark.asyncio @mock.patch(KUBE_BATCH_API.format("read_namespaced_job_status")) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py index 7ffb3cbc3f344..637e6dc70d9cc 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/triggers/test_pod.py @@ -27,6 +27,7 @@ import pytest from kubernetes.client import models as k8s +from pendulum import DateTime from airflow.providers.cncf.kubernetes.triggers.pod import ContainerState, KubernetesPodTrigger from airflow.providers.cncf.kubernetes.utils.pod_manager import PodPhase @@ -251,6 +252,66 @@ async def test_logging_in_trigger_when_fail_should_execute_successfully( await generator.asend(None) assert "Container logs:" + @pytest.mark.asyncio + @pytest.mark.parametrize( + "logging_interval, exp_event", + [ + pytest.param( + 0, + { + "status": "success", + "last_log_time": DateTime(2022, 1, 1), + "name": POD_NAME, + "namespace": NAMESPACE, + }, + id="short_interval", + ), + ], + ) + @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.datetime") + @mock.patch(f"{TRIGGER_PATH}.define_container_state") + @mock.patch(f"{TRIGGER_PATH}._wait_for_pod_start") + @mock.patch( + "airflow.providers.cncf.kubernetes.triggers.pod.AsyncPodManager.fetch_container_logs_before_current_sec" + ) + @mock.patch("airflow.providers.cncf.kubernetes.triggers.pod.AsyncKubernetesHook.get_pod") + async def test_running_log_interval( + self, + mock_get_pod, + mock_fetch_container_logs_before_current_sec, + mock_wait_pod, + define_container_state, + mock_datetime, + logging_interval, + exp_event, + ): + """ + If log interval given, check that the trigger fetches logs at the right times. 
+ """ + fixed_now = datetime.datetime(2022, 1, 1, tzinfo=datetime.timezone.utc) + mock_datetime.datetime.now.side_effect = [ + fixed_now, + fixed_now + datetime.timedelta(seconds=1), + fixed_now + datetime.timedelta(seconds=2), + ] + + mock_datetime.timedelta = datetime.timedelta + mock_datetime.timezone = datetime.timezone + mock_fetch_container_logs_before_current_sec.return_value = DateTime(2022, 1, 1) + define_container_state.side_effect = ["running", "running", "terminated"] + trigger = KubernetesPodTrigger( + pod_name=POD_NAME, + pod_namespace=NAMESPACE, + trigger_start_time=fixed_now, + base_container_name=BASE_CONTAINER_NAME, + startup_timeout=5, + poll_interval=1, + logging_interval=1, + last_log_time=DateTime(2022, 1, 1), + ) + assert await trigger.run().__anext__() == TriggerEvent(exp_event) + assert mock_fetch_container_logs_before_current_sec.call_count == 2 + @pytest.mark.parametrize( "container_state, expected_state", [ From 85f59c9046f1bcbbbe578788361c7082199e54d2 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Mon, 3 Nov 2025 10:44:02 +0100 Subject: [PATCH 13/14] Adapt unit test --- .../unit/google/cloud/hooks/test_kubernetes_engine.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py index 2238595507a6a..a32c92dd4d4d6 100644 --- a/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py +++ b/providers/google/tests/unit/google/cloud/hooks/test_kubernetes_engine.py @@ -531,7 +531,7 @@ async def test_read_logs(self, read_namespaced_pod_log, get_conn_mock, async_hoo caplog.set_level(logging.INFO) self.make_mock_awaitable(read_namespaced_pod_log, result="Test string #1\nTest string #2\n") - await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE) + logs = await async_hook.read_logs(name=POD_NAME, namespace=POD_NAMESPACE) get_conn_mock.assert_called_once_with() read_namespaced_pod_log.assert_called_with( @@ -539,9 +539,11 @@ async def test_read_logs(self, read_namespaced_pod_log, get_conn_mock, async_hoo namespace=POD_NAMESPACE, follow=False, timestamps=True, + container_name=None, + since_seconds=None, ) - assert "Test string #1" in caplog.text - assert "Test string #2" in caplog.text + assert "Test string #1" in logs + assert "Test string #2" in logs @pytest_asyncio.fixture From 6c22657c9913b56b6bed9d9dcc0696a76b82a1fb Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Mon, 3 Nov 2025 10:45:08 +0100 Subject: [PATCH 14/14] Adapt return type --- .../src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index ad161d1471b6e..49168cc6dfc52 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -912,7 +912,7 @@ async def delete_pod(self, name: str, namespace: str): async def read_logs( self, name: str, namespace: str, container_name: str | None = None, since_seconds: int | None = None - ): + ) -> list[str]: """ Read logs inside the pod while starting containers inside.