From 35cc11f1cbf088d18151482606935586ba285dfa Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 14 Jun 2025 16:36:32 +0200 Subject: [PATCH 1/3] Prevent legacy hybrid executors to be running in Airflow 3 --- .../providers/celery/executors/celery_kubernetes_executor.py | 5 ++++- .../cncf/kubernetes/executors/local_kubernetes_executor.py | 4 ++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index 8e895f6ed9044..d96ca3b7ec9e9 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -26,7 +26,7 @@ from airflow.configuration import conf from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor -from airflow.providers.celery.executors.celery_executor import CeleryExecutor +from airflow.providers.celery.executors.celery_executor import AIRFLOW_V_3_0_PLUS, CeleryExecutor try: from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor @@ -130,6 +130,9 @@ def job_id(self, value: int | str | None) -> None: def start(self) -> None: """Start celery and kubernetes executor.""" + if AIRFLOW_V_3_0_PLUS: + raise RuntimeError(f"{self.__class__.__name__} does not support Airflow 3.0+.") + self.celery_executor.start() self.kubernetes_executor.start() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index 0eb6e5a1f2157..eba7eb95d92e0 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -26,6 +26,7 @@ from airflow.exceptions import AirflowProviderDeprecationWarning from airflow.executors.base_executor import BaseExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS if TYPE_CHECKING: from airflow.callbacks.base_callback_sink import BaseCallbackSink @@ -119,6 +120,9 @@ def job_id(self, value: int | str | None) -> None: def start(self) -> None: """Start local and kubernetes executor.""" + if AIRFLOW_V_3_0_PLUS: + raise RuntimeError(f"{self.__class__.__name__} does not support Airflow 3.0+.") + self.log.info("Starting local and Kubernetes Executor") self.local_executor.start() self.kubernetes_executor.start() From e7c8af555daa7fe0c030bc612b6a691e85b61740 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 14 Jun 2025 16:50:00 +0200 Subject: [PATCH 2/3] Add docs link to execption text --- .../celery/executors/celery_kubernetes_executor.py | 6 +++++- .../cncf/kubernetes/executors/local_kubernetes_executor.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py index d96ca3b7ec9e9..c160bb91d5975 100644 --- a/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py +++ b/providers/celery/src/airflow/providers/celery/executors/celery_kubernetes_executor.py @@ -131,7 +131,11 @@ def job_id(self, value: int | str | None) -> None: def start(self) -> None: """Start celery and kubernetes executor.""" if AIRFLOW_V_3_0_PLUS: - raise RuntimeError(f"{self.__class__.__name__} does not support Airflow 3.0+.") + raise RuntimeError( + f"{self.__class__.__name__} does not support Airflow 3.0+. See " + "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently" + " how to use multiple executors concurrently." + ) self.celery_executor.start() self.kubernetes_executor.start() diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py index eba7eb95d92e0..6feb2299bb0a2 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/local_kubernetes_executor.py @@ -121,7 +121,11 @@ def job_id(self, value: int | str | None) -> None: def start(self) -> None: """Start local and kubernetes executor.""" if AIRFLOW_V_3_0_PLUS: - raise RuntimeError(f"{self.__class__.__name__} does not support Airflow 3.0+.") + raise RuntimeError( + f"{self.__class__.__name__} does not support Airflow 3.0+. See " + "https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently" + " how to use multiple executors concurrently." + ) self.log.info("Starting local and Kubernetes Executor") self.local_executor.start() From 729e79edaff61f3cdf4c4cb0dcbf52dac8c96d33 Mon Sep 17 00:00:00 2001 From: Jens Scheffler Date: Sat, 14 Jun 2025 17:14:24 +0200 Subject: [PATCH 3/3] Fix pytests --- .../executors/test_celery_kubernetes_executor.py | 2 +- .../executors/test_local_kubernetes_executor.py | 12 +++++------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py b/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py index cadf718ec1a7e..8073cae4d8bb7 100644 --- a/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py +++ b/providers/celery/tests/unit/celery/executors/test_celery_kubernetes_executor.py @@ -32,6 +32,7 @@ KUBERNETES_QUEUE = "kubernetes" +@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 3 does not support this executor anymore") class TestCeleryKubernetesExecutor: def test_supports_pickling(self): assert CeleryKubernetesExecutor.supports_pickling @@ -88,7 +89,6 @@ def test_start(self): celery_executor_mock.start.assert_called() k8s_executor_mock.start.assert_called() - @pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 3 doesn't have queue_command anymore") @pytest.mark.parametrize("test_queue", ["any-other-queue", KUBERNETES_QUEUE]) @mock.patch.object(CeleryExecutor, "queue_command") @mock.patch.object(KubernetesExecutor, "queue_command") diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py index c712b2d4b5e59..44832e9083c76 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_local_kubernetes_executor.py @@ -19,7 +19,9 @@ from unittest import mock -from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest +import pytest + +from airflow.callbacks.callback_requests import CallbackRequest from airflow.configuration import conf from airflow.executors.local_executor import LocalExecutor from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import ( @@ -29,6 +31,7 @@ from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS +@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 3 does not support this executor anymore") class TestLocalKubernetesExecutor: def test_supports_pickling(self): assert not LocalKubernetesExecutor.supports_pickling @@ -115,12 +118,7 @@ def test_send_callback(self): local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock) local_k8s_exec.callback_sink = mock.MagicMock() - if AIRFLOW_V_3_0_PLUS: - callback = DagCallbackRequest( - filepath="fake", dag_id="fake", run_id="fake", bundle_name="fake", bundle_version=None - ) - else: - callback = CallbackRequest(full_filepath="fake") + callback = CallbackRequest(full_filepath="fake") local_k8s_exec.send_callback(callback) local_k8s_exec.callback_sink.send.assert_called_once_with(callback)