From 6ce720e4bae1b432ef2baa346cb3cb8f64ac1a94 Mon Sep 17 00:00:00 2001 From: Jarek Potiuk Date: Thu, 15 May 2025 09:16:23 -0400 Subject: [PATCH] Fix broken cncf.kubernetes for Airflow 2 The #50650 broke cncf.kubernetes for Airflow 2 - it removed a method that was not used in Airflow 3 but it turns out that those methods were also used in Airflow 2. We revert the change but also we add skipif for Airflow 3 to skip the test on Airflow 3 and to mark it essentially for removal when we drop compatibility with Airflow 2. Revert "Remove unused db method in k8s provider (#49186)" This reverts commit 0ca0f17996c86efb292cf5b10181944c67e3b862. --- .../cncf/kubernetes/template_rendering.py | 17 +++++++++ .../kubernetes/test_template_rendering.py | 38 ++++++++++++++++++- 2 files changed, 53 insertions(+), 2 deletions(-) diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py index 4f9c781ed100d..7f2cd83f33f4c 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/template_rendering.py @@ -19,11 +19,14 @@ from typing import TYPE_CHECKING +from jinja2 import TemplateAssertionError, UndefinedError from kubernetes.client.api_client import ApiClient +from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.kube_config import KubeConfig from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import create_unique_id from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator +from airflow.utils.session import NEW_SESSION, provide_session if TYPE_CHECKING: from airflow.models.taskinstance import TaskInstance @@ -58,3 +61,17 @@ def render_k8s_pod_yaml(task_instance: TaskInstance) -> dict | None: ) sanitized_pod = ApiClient().sanitize_for_serialization(pod) return sanitized_pod + + +@provide_session +def get_rendered_k8s_spec(task_instance: TaskInstance, session=NEW_SESSION) -> dict | None: + """Fetch rendered template fields from DB.""" + from airflow.models.renderedtifields import RenderedTaskInstanceFields + + rendered_k8s_spec = RenderedTaskInstanceFields.get_k8s_pod_yaml(task_instance, session=session) + if not rendered_k8s_spec: + try: + rendered_k8s_spec = render_k8s_pod_yaml(task_instance) + except (TemplateAssertionError, UndefinedError) as e: + raise AirflowException(f"Unable to render a k8s spec for this taskinstance: {e}") from e + return rendered_k8s_spec diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py index 2495d766e3a78..d7e34d4125910 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/test_template_rendering.py @@ -24,8 +24,9 @@ from kubernetes.client import models as k8s from sqlalchemy.orm import make_transient -from airflow.models.renderedtifields import RenderedTaskInstanceFields as RTIF -from airflow.providers.cncf.kubernetes.template_rendering import render_k8s_pod_yaml +from airflow.models.renderedtifields import RenderedTaskInstanceFields, RenderedTaskInstanceFields as RTIF +from airflow.providers.cncf.kubernetes.template_rendering import get_rendered_k8s_spec, render_k8s_pod_yaml +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS from airflow.utils import timezone from airflow.utils.session import create_session from airflow.version import version @@ -149,6 +150,39 @@ def test_render_k8s_pod_yaml_with_custom_pod_template_and_pod_override( assert ti_pod_yaml["metadata"]["annotations"]["test"] == "annotation" +@pytest.mark.skipif( + AIRFLOW_V_3_0_PLUS, + reason="This test is only needed for Airflow 2 - we can remove it after " + "only Airflow 3 is supported in providers", +) +@mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"}) +@mock.patch.object(RenderedTaskInstanceFields, "get_k8s_pod_yaml") +@mock.patch("airflow.providers.cncf.kubernetes.template_rendering.render_k8s_pod_yaml") +def test_get_rendered_k8s_spec(render_k8s_pod_yaml, rtif_get_k8s_pod_yaml, create_task_instance): + # Create new TI for the same Task + ti = create_task_instance() + + mock.patch.object(ti, "render_k8s_pod_yaml", autospec=True) + + fake_spec = {"ermagawds": "pods"} + + session = mock.Mock() + + rtif_get_k8s_pod_yaml.return_value = fake_spec + assert get_rendered_k8s_spec(ti, session=session) == fake_spec + + rtif_get_k8s_pod_yaml.assert_called_once_with(ti, session=session) + render_k8s_pod_yaml.assert_not_called() + + # Now test that when we _dont_ find it in the DB, it calls render_k8s_pod_yaml + rtif_get_k8s_pod_yaml.return_value = None + render_k8s_pod_yaml.return_value = fake_spec + + assert get_rendered_k8s_spec(session) == fake_spec + + render_k8s_pod_yaml.assert_called_once() + + @mock.patch.dict(os.environ, {"AIRFLOW_IS_K8S_EXECUTOR_POD": "True"}) @mock.patch("airflow.providers.cncf.kubernetes.template_rendering.render_k8s_pod_yaml") def test_get_k8s_pod_yaml(render_k8s_pod_yaml, dag_maker, session):