From 5016d68ffa91082ec063bc36334bcbe26ade3f4f Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 16 Oct 2025 07:41:11 +0200 Subject: [PATCH 1/3] Move container-related functions from PodManager to a separate file --- .../cncf/kubernetes/hooks/kubernetes.py | 33 ++++- .../cncf/kubernetes/operators/pod.py | 2 +- .../cncf/kubernetes/utils/container.py | 121 ++++++++++++++++ .../cncf/kubernetes/utils/pod_manager.py | 137 ++---------------- 4 files changed, 161 insertions(+), 132 deletions(-) create mode 100644 providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index e30458ef99f0b..7445f55cad2c3 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -23,7 +23,7 @@ from collections.abc import Generator from functools import cached_property from time import sleep -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Protocol import aiofiles import requests @@ -39,8 +39,7 @@ from airflow.models import Connection from airflow.providers.cncf.kubernetes.kube_client import _disable_verify_ssl, _enable_tcp_keepalive from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation -from airflow.providers.cncf.kubernetes.utils.pod_manager import ( - PodOperatorHookProtocol, +from airflow.providers.cncf.kubernetes.utils.container import ( container_is_completed, container_is_running, ) @@ -68,6 +67,34 @@ def _load_body_to_dict(body: str) -> dict: raise AirflowException(f"Exception when loading resource definition: {e}\n") return body_dict +class PodOperatorHookProtocol(Protocol): + """ + Protocol to define methods relied upon by KubernetesPodOperator. + + Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use + hooks that don't extend KubernetesHook. We use this protocol to document the + methods used by KPO and ensure that these methods exist on such other hooks. + """ + + @property + def core_v1_client(self) -> client.CoreV1Api: + """Get authenticated client object.""" + + @property + def is_in_cluster(self) -> bool: + """Expose whether the hook is configured with ``load_incluster_config`` or not.""" + + def get_pod(self, name: str, namespace: str) -> V1Pod: + """Read pod object from kubernetes API.""" + + def get_namespace(self) -> str | None: + """Return the namespace that defined in the connection.""" + + def get_xcom_sidecar_container_image(self) -> str | None: + """Return the xcom sidecar image that defined in the connection.""" + + def get_xcom_sidecar_container_resources(self) -> str | None: + """Return the xcom sidecar resources that defined in the connection.""" class KubernetesHook(BaseHook, PodOperatorHookProtocol): """ diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index fdc1b36cf7e2f..e4b473ab47132 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -75,7 +75,6 @@ PodLaunchFailedException, PodManager, PodNotFoundException, - PodOperatorHookProtocol, PodPhase, container_is_succeeded, get_container_termination_message, @@ -95,6 +94,7 @@ import jinja2 from pendulum import DateTime + from airflow.providers.cncf.kubernetes.hooks.kubernetes import PodOperatorHookProtocol from airflow.providers.cncf.kubernetes.secret import Secret try: diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py new file mode 100644 index 0000000000000..d384cb0a1ac50 --- /dev/null +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Helper functions for inspecting and interacting with containers in a Kubernetes Pod.""" + +from __future__ import annotations + +from contextlib import suppress +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from kubernetes.client.models.v1_container_status import V1ContainerStatus + from kubernetes.client.models.v1_pod import V1Pod + + +def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None: + """Retrieve container status.""" + if pod and pod.status: + container_statuses = [] + if pod.status.container_statuses: + container_statuses.extend(pod.status.container_statuses) + if pod.status.init_container_statuses: + container_statuses.extend(pod.status.init_container_statuses) + + else: + container_statuses = None + + if container_statuses: + # In general the variable container_statuses can store multiple items matching different containers. + # The following generator expression yields all items that have name equal to the container_name. + # The function next() here calls the generator to get only the first value. If there's nothing found + # then None is returned. + return next((x for x in container_statuses if x.name == container_name), None) + return None + + +def container_is_running(pod: V1Pod, container_name: str) -> bool: + """ + Examine V1Pod ``pod`` to determine whether ``container_name`` is running. + + If that container is present and running, returns True. Returns False otherwise. + """ + container_status = get_container_status(pod, container_name) + if not container_status: + return False + return container_status.state.running is not None + + +def container_is_completed(pod: V1Pod, container_name: str) -> bool: + """ + Examine V1Pod ``pod`` to determine whether ``container_name`` is completed. + + If that container is present and completed, returns True. Returns False otherwise. + """ + container_status = get_container_status(pod, container_name) + if not container_status: + return False + return container_status.state.terminated is not None + + +def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: + """ + Examine V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. + + If that container is present and completed and succeeded, returns True. Returns False otherwise. + """ + if not container_is_completed(pod, container_name): + return False + + container_status = get_container_status(pod, container_name) + if not container_status: + return False + return container_status.state.terminated.exit_code == 0 + + +def container_is_wait(pod: V1Pod, container_name: str) -> bool: + """ + Examine V1Pod ``pod`` to determine whether ``container_name`` is waiting. + + If that container is present and waiting, returns True. Returns False otherwise. + """ + container_status = get_container_status(pod, container_name) + if not container_status: + return False + + return container_status.state.waiting is not None + + +def container_is_terminated(pod: V1Pod, container_name: str) -> bool: + """ + Examine V1Pod ``pod`` to determine whether ``container_name`` is terminated. + + If that container is present and terminated, returns True. Returns False otherwise. + """ + container_statuses = pod.status.container_statuses if pod and pod.status else None + if not container_statuses: + return False + container_status = next((x for x in container_statuses if x.name == container_name), None) + if not container_status: + return False + return container_status.state.terminated is not None + + +def get_container_termination_message(pod: V1Pod, container_name: str): + with suppress(AttributeError, TypeError): + container_statuses = pod.status.container_statuses + container_status = next((x for x in container_statuses if x.name == container_name), None) + return container_status.state.terminated.message if container_status else None diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py index 64e606ed4ff16..c9dc45e57f414 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/pod_manager.py @@ -24,10 +24,10 @@ import math import time from collections.abc import Callable, Generator, Iterable -from contextlib import closing, suppress +from contextlib import closing from dataclasses import dataclass from datetime import timedelta -from typing import TYPE_CHECKING, Literal, Protocol, cast +from typing import TYPE_CHECKING, Literal, cast import pendulum import tenacity @@ -40,6 +40,13 @@ from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.callbacks import ExecutionMode, KubernetesPodOperatorCallback +from airflow.providers.cncf.kubernetes.utils.container import ( + container_is_completed, + container_is_running, + container_is_terminated, + container_is_wait, + get_container_status, +) from airflow.providers.cncf.kubernetes.utils.xcom_sidecar import PodDefaults from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.timezone import utcnow @@ -48,7 +55,6 @@ from kubernetes.client.models.core_v1_event_list import CoreV1EventList from kubernetes.client.models.v1_container_state import V1ContainerState from kubernetes.client.models.v1_container_state_waiting import V1ContainerStateWaiting - from kubernetes.client.models.v1_container_status import V1ContainerStatus from kubernetes.client.models.v1_object_reference import V1ObjectReference from kubernetes.client.models.v1_pod import V1Pod from kubernetes.client.models.v1_pod_condition import V1PodCondition @@ -89,131 +95,6 @@ class PodPhase: terminal_states = {FAILED, SUCCEEDED} -class PodOperatorHookProtocol(Protocol): - """ - Protocol to define methods relied upon by KubernetesPodOperator. - - Subclasses of KubernetesPodOperator, such as GKEStartPodOperator, may use - hooks that don't extend KubernetesHook. We use this protocol to document the - methods used by KPO and ensure that these methods exist on such other hooks. - """ - - @property - def core_v1_client(self) -> client.CoreV1Api: - """Get authenticated client object.""" - - @property - def is_in_cluster(self) -> bool: - """Expose whether the hook is configured with ``load_incluster_config`` or not.""" - - def get_pod(self, name: str, namespace: str) -> V1Pod: - """Read pod object from kubernetes API.""" - - def get_namespace(self) -> str | None: - """Return the namespace that defined in the connection.""" - - def get_xcom_sidecar_container_image(self) -> str | None: - """Return the xcom sidecar image that defined in the connection.""" - - def get_xcom_sidecar_container_resources(self) -> str | None: - """Return the xcom sidecar resources that defined in the connection.""" - - -def get_container_status(pod: V1Pod, container_name: str) -> V1ContainerStatus | None: - """Retrieve container status.""" - if pod and pod.status: - container_statuses = [] - if pod.status.container_statuses: - container_statuses.extend(pod.status.container_statuses) - if pod.status.init_container_statuses: - container_statuses.extend(pod.status.init_container_statuses) - - else: - container_statuses = None - - if container_statuses: - # In general the variable container_statuses can store multiple items matching different containers. - # The following generator expression yields all items that have name equal to the container_name. - # The function next() here calls the generator to get only the first value. If there's nothing found - # then None is returned. - return next((x for x in container_statuses if x.name == container_name), None) - return None - - -def container_is_running(pod: V1Pod, container_name: str) -> bool: - """ - Examine V1Pod ``pod`` to determine whether ``container_name`` is running. - - If that container is present and running, returns True. Returns False otherwise. - """ - container_status = get_container_status(pod, container_name) - if not container_status: - return False - return container_status.state.running is not None - - -def container_is_completed(pod: V1Pod, container_name: str) -> bool: - """ - Examine V1Pod ``pod`` to determine whether ``container_name`` is completed. - - If that container is present and completed, returns True. Returns False otherwise. - """ - container_status = get_container_status(pod, container_name) - if not container_status: - return False - return container_status.state.terminated is not None - - -def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: - """ - Examine V1Pod ``pod`` to determine whether ``container_name`` is completed and succeeded. - - If that container is present and completed and succeeded, returns True. Returns False otherwise. - """ - if not container_is_completed(pod, container_name): - return False - - container_status = get_container_status(pod, container_name) - if not container_status: - return False - return container_status.state.terminated.exit_code == 0 - - -def container_is_wait(pod: V1Pod, container_name: str) -> bool: - """ - Examine V1Pod ``pod`` to determine whether ``container_name`` is waiting. - - If that container is present and waiting, returns True. Returns False otherwise. - """ - container_status = get_container_status(pod, container_name) - if not container_status: - return False - - return container_status.state.waiting is not None - - -def container_is_terminated(pod: V1Pod, container_name: str) -> bool: - """ - Examine V1Pod ``pod`` to determine whether ``container_name`` is terminated. - - If that container is present and terminated, returns True. Returns False otherwise. - """ - container_statuses = pod.status.container_statuses if pod and pod.status else None - if not container_statuses: - return False - container_status = next((x for x in container_statuses if x.name == container_name), None) - if not container_status: - return False - return container_status.state.terminated is not None - - -def get_container_termination_message(pod: V1Pod, container_name: str): - with suppress(AttributeError, TypeError): - container_statuses = pod.status.container_statuses - container_status = next((x for x in container_statuses if x.name == container_name), None) - return container_status.state.terminated.message if container_status else None - - def check_exception_is_kubernetes_api_unauthorized(exc: BaseException): return isinstance(exc, ApiException) and exc.status and str(exc.status) == "401" From 37295393121740775bb5634cbe802763865d29d2 Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Thu, 16 Oct 2025 11:16:27 +0200 Subject: [PATCH 2/3] Moved unit tests --- .../cncf/kubernetes/hooks/kubernetes.py | 2 + .../cncf/kubernetes/operators/pod.py | 6 +- .../cncf/kubernetes/utils/test_container.py | 172 ++++++++++++++++++ .../cncf/kubernetes/utils/test_pod_manager.py | 147 --------------- 4 files changed, 178 insertions(+), 149 deletions(-) create mode 100644 providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_container.py diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py index 7445f55cad2c3..3e746a2118104 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/hooks/kubernetes.py @@ -67,6 +67,7 @@ def _load_body_to_dict(body: str) -> dict: raise AirflowException(f"Exception when loading resource definition: {e}\n") return body_dict + class PodOperatorHookProtocol(Protocol): """ Protocol to define methods relied upon by KubernetesPodOperator. @@ -96,6 +97,7 @@ def get_xcom_sidecar_container_image(self) -> str | None: def get_xcom_sidecar_container_resources(self) -> str | None: """Return the xcom sidecar resources that defined in the connection.""" + class KubernetesHook(BaseHook, PodOperatorHookProtocol): """ Creates Kubernetes API connection. diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index e4b473ab47132..4522748ee5d3d 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -69,6 +69,10 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.providers.cncf.kubernetes.triggers.pod import KubernetesPodTrigger from airflow.providers.cncf.kubernetes.utils import xcom_sidecar +from airflow.providers.cncf.kubernetes.utils.container import ( + container_is_succeeded, + get_container_termination_message, +) from airflow.providers.cncf.kubernetes.utils.pod_manager import ( EMPTY_XCOM_RESULT, OnFinishAction, @@ -76,8 +80,6 @@ PodManager, PodNotFoundException, PodPhase, - container_is_succeeded, - get_container_termination_message, ) from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS, XCOM_RETURN_KEY diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_container.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_container.py new file mode 100644 index 0000000000000..51cbc00047ece --- /dev/null +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_container.py @@ -0,0 +1,172 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from airflow.providers.cncf.kubernetes.utils.container import ( + container_is_running, + container_is_succeeded, + container_is_terminated, +) + + +@pytest.mark.parametrize( + "container_state, expected_is_terminated", + [("waiting", False), ("running", False), ("terminated", True)], +) +def test_container_is_terminated_with_waiting_state(container_state, expected_is_terminated): + container_status = MagicMock() + container_status.configure_mock( + **{ + "name": "base", + "state.waiting": True if container_state == "waiting" else None, + "state.running": True if container_state == "running" else None, + "state.terminated": True if container_state == "terminated" else None, + } + ) + pod_info = MagicMock() + pod_info.status.container_statuses = [container_status] + assert container_is_terminated(pod_info, "base") == expected_is_terminated + + +def params_for_test_container_is_running(): + """The `container_is_running` method is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_running` to verify this behavior. + + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(running=None, not_running=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + e.status.init_container_statuses = [] + for r in not_running or []: + e.status.container_statuses.append(container(r, False)) + for r in running or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, running): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.running = {"a": "b"} if running else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id="None remote_pod")) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status")) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + p.status.init_container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses")) + pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty")) + pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 running")) + pod_mock_list.append(pytest.param(remote_pod(["hello"], ["base"]), False, id="filter 1 not running")) + pod_mock_list.append(pytest.param(remote_pod(["base"], ["hello"]), True, id="filter 1 running")) + return pod_mock_list + + +@pytest.mark.parametrize("remote_pod, result", params_for_test_container_is_running()) +def test_container_is_running(remote_pod, result): + """The `container_is_running` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This test + verifies the expected behavior.""" + assert container_is_running(remote_pod, "base") is result + + +def params_for_test_container_is_succeeded(): + """The `container_is_succeeded` method is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This function + emits params used in `test_container_is_succeeded` to verify this behavior. + We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, + tests like `e.hello is not None` are always True. + """ + + class RemotePodMock: + pass + + class ContainerStatusMock: + def __init__(self, name): + self.name = name + + def remote_pod(succeeded=None, not_succeeded=None): + e = RemotePodMock() + e.status = RemotePodMock() + e.status.container_statuses = [] + e.status.init_container_statuses = [] + for r in not_succeeded or []: + e.status.container_statuses.append(container(r, False)) + for r in succeeded or []: + e.status.container_statuses.append(container(r, True)) + return e + + def container(name, succeeded): + c = ContainerStatusMock(name) + c.state = RemotePodMock() + c.state.terminated = SimpleNamespace(**{"exit_code": 0}) if succeeded else None + return c + + pod_mock_list = [] + pod_mock_list.append(pytest.param(None, False, id="None remote_pod")) + p = RemotePodMock() + p.status = None + pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status")) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = None + p.status.init_container_statuses = [] + + pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status.container_statuses")) + p = RemotePodMock() + p.status = RemotePodMock() + p.status.container_statuses = [] + p.status.init_container_statuses = [] + pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses")) + pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty")) + pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 succeeded")) + pod_mock_list.append(pytest.param(remote_pod(["hello"], ["base"]), False, id="filter 1 not succeeded")) + pod_mock_list.append(pytest.param(remote_pod(["base"], ["hello"]), True, id="filter 1 succeeded")) + return pod_mock_list + + +@pytest.mark.parametrize("remote_pod, result", params_for_test_container_is_succeeded()) +def test_container_is_succeeded(remote_pod, result): + """The `container_is_succeeded` function is designed to handle an assortment of bad objects + returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, + an object `e` such that `e.status.container_statuses` is None, and so on. This test + verifies the expected behavior.""" + assert container_is_succeeded(remote_pod, "base") is result diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py index 194d1e3e96ae2..2634119fc29f6 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/utils/test_pod_manager.py @@ -19,7 +19,6 @@ import logging from datetime import datetime from json.decoder import JSONDecodeError -from types import SimpleNamespace from typing import TYPE_CHECKING, cast from unittest import mock from unittest.mock import MagicMock, PropertyMock @@ -35,9 +34,6 @@ PodLogsConsumer, PodManager, PodPhase, - container_is_running, - container_is_succeeded, - container_is_terminated, ) from airflow.utils.timezone import utc @@ -611,24 +607,6 @@ def test_fetch_container_running_follow( assert ret.last_log_time == pendulum.datetime(2021, 1, 1, tz="UTC") assert ret.running is exp_running - @pytest.mark.parametrize( - "container_state, expected_is_terminated", - [("waiting", False), ("running", False), ("terminated", True)], - ) - def test_container_is_terminated_with_waiting_state(self, container_state, expected_is_terminated): - container_status = MagicMock() - container_status.configure_mock( - **{ - "name": "base", - "state.waiting": True if container_state == "waiting" else None, - "state.running": True if container_state == "running" else None, - "state.terminated": True if container_state == "terminated" else None, - } - ) - pod_info = MagicMock() - pod_info.status.container_statuses = [container_status] - assert container_is_terminated(pod_info, "base") == expected_is_terminated - @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.kubernetes_stream") @mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.extract_xcom_kill") def test_extract_xcom_success(self, mock_exec_xcom_kill, mock_kubernetes_stream): @@ -722,66 +700,6 @@ def test_await_xcom_sidecar_container_starts(self, mock_container_is_running): mock_container_is_running.assert_any_call(mock_pod, "airflow-xcom-sidecar") -def params_for_test_container_is_running(): - """The `container_is_running` method is designed to handle an assortment of bad objects - returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, - an object `e` such that `e.status.container_statuses` is None, and so on. This function - emits params used in `test_container_is_running` to verify this behavior. - - We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, - tests like `e.hello is not None` are always True. - """ - - class RemotePodMock: - pass - - class ContainerStatusMock: - def __init__(self, name): - self.name = name - - def remote_pod(running=None, not_running=None): - e = RemotePodMock() - e.status = RemotePodMock() - e.status.container_statuses = [] - e.status.init_container_statuses = [] - for r in not_running or []: - e.status.container_statuses.append(container(r, False)) - for r in running or []: - e.status.container_statuses.append(container(r, True)) - return e - - def container(name, running): - c = ContainerStatusMock(name) - c.state = RemotePodMock() - c.state.running = {"a": "b"} if running else None - return c - - pod_mock_list = [] - pod_mock_list.append(pytest.param(None, False, id="None remote_pod")) - p = RemotePodMock() - p.status = None - pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status")) - p = RemotePodMock() - p.status = RemotePodMock() - p.status.container_statuses = [] - p.status.init_container_statuses = [] - pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses")) - pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty")) - pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 running")) - pod_mock_list.append(pytest.param(remote_pod(["hello"], ["base"]), False, id="filter 1 not running")) - pod_mock_list.append(pytest.param(remote_pod(["base"], ["hello"]), True, id="filter 1 running")) - return pod_mock_list - - -@pytest.mark.parametrize("remote_pod, result", params_for_test_container_is_running()) -def test_container_is_running(remote_pod, result): - """The `container_is_running` function is designed to handle an assortment of bad objects - returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, - an object `e` such that `e.status.container_statuses` is None, and so on. This test - verifies the expected behavior.""" - assert container_is_running(remote_pod, "base") is result - - class TestPodLogsConsumer: @pytest.mark.parametrize( "chunks, expected_logs", @@ -958,68 +876,3 @@ def test_read_pod( # second read with time_machine.travel(mock_read_pod_at_1): assert consumer.read_pod() == expected_read_pods[1] - - -def params_for_test_container_is_succeeded(): - """The `container_is_succeeded` method is designed to handle an assortment of bad objects - returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, - an object `e` such that `e.status.container_statuses` is None, and so on. This function - emits params used in `test_container_is_succeeded` to verify this behavior. - We create mock classes not derived from MagicMock because with an instance `e` of MagicMock, - tests like `e.hello is not None` are always True. - """ - - class RemotePodMock: - pass - - class ContainerStatusMock: - def __init__(self, name): - self.name = name - - def remote_pod(succeeded=None, not_succeeded=None): - e = RemotePodMock() - e.status = RemotePodMock() - e.status.container_statuses = [] - e.status.init_container_statuses = [] - for r in not_succeeded or []: - e.status.container_statuses.append(container(r, False)) - for r in succeeded or []: - e.status.container_statuses.append(container(r, True)) - return e - - def container(name, succeeded): - c = ContainerStatusMock(name) - c.state = RemotePodMock() - c.state.terminated = SimpleNamespace(**{"exit_code": 0}) if succeeded else None - return c - - pod_mock_list = [] - pod_mock_list.append(pytest.param(None, False, id="None remote_pod")) - p = RemotePodMock() - p.status = None - pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status")) - p = RemotePodMock() - p.status = RemotePodMock() - p.status.container_statuses = None - p.status.init_container_statuses = [] - - pod_mock_list.append(pytest.param(p, False, id="None remote_pod.status.container_statuses")) - p = RemotePodMock() - p.status = RemotePodMock() - p.status.container_statuses = [] - p.status.init_container_statuses = [] - pod_mock_list.append(pytest.param(p, False, id="empty remote_pod.status.container_statuses")) - pod_mock_list.append(pytest.param(remote_pod(), False, id="filter empty")) - pod_mock_list.append(pytest.param(remote_pod(None, ["base"]), False, id="filter 0 succeeded")) - pod_mock_list.append(pytest.param(remote_pod(["hello"], ["base"]), False, id="filter 1 not succeeded")) - pod_mock_list.append(pytest.param(remote_pod(["base"], ["hello"]), True, id="filter 1 succeeded")) - return pod_mock_list - - -@pytest.mark.parametrize("remote_pod, result", params_for_test_container_is_succeeded()) -def test_container_is_succeeded(remote_pod, result): - """The `container_is_succeeded` function is designed to handle an assortment of bad objects - returned from `read_pod`. E.g. a None object, an object `e` such that `e.status` is None, - an object `e` such that `e.status.container_statuses` is None, and so on. This test - verifies the expected behavior.""" - assert container_is_succeeded(remote_pod, "base") is result From 297f6d802fcf46c4d334fada4be014680c8d6aca Mon Sep 17 00:00:00 2001 From: AutomationDev85 Date: Tue, 21 Oct 2025 07:10:37 +0200 Subject: [PATCH 3/3] Optimize redundant container status checks --- .../src/airflow/providers/cncf/kubernetes/utils/container.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py index d384cb0a1ac50..9113195feaf00 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/utils/container.py @@ -77,11 +77,8 @@ def container_is_succeeded(pod: V1Pod, container_name: str) -> bool: If that container is present and completed and succeeded, returns True. Returns False otherwise. """ - if not container_is_completed(pod, container_name): - return False - container_status = get_container_status(pod, container_name) - if not container_status: + if not container_status or container_status.state.terminated is None: return False return container_status.state.terminated.exit_code == 0