Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from airflow.configuration import conf
from airflow.exceptions import AirflowOptionalProviderFeatureException, AirflowProviderDeprecationWarning
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_executor import AIRFLOW_V_3_0_PLUS, CeleryExecutor

try:
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
Expand Down Expand Up @@ -130,6 +130,13 @@ def job_id(self, value: int | str | None) -> None:

def start(self) -> None:
"""Start celery and kubernetes executor."""
if AIRFLOW_V_3_0_PLUS:
raise RuntimeError(
f"{self.__class__.__name__} does not support Airflow 3.0+. See "
"https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently"
" how to use multiple executors concurrently."
)

self.celery_executor.start()
self.kubernetes_executor.start()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
KUBERNETES_QUEUE = "kubernetes"


@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 3 does not support this executor anymore")
class TestCeleryKubernetesExecutor:
def test_supports_pickling(self):
assert CeleryKubernetesExecutor.supports_pickling
Expand Down Expand Up @@ -88,7 +89,6 @@ def test_start(self):
celery_executor_mock.start.assert_called()
k8s_executor_mock.start.assert_called()

@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 3 doesn't have queue_command anymore")
@pytest.mark.parametrize("test_queue", ["any-other-queue", KUBERNETES_QUEUE])
@mock.patch.object(CeleryExecutor, "queue_command")
@mock.patch.object(KubernetesExecutor, "queue_command")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS

if TYPE_CHECKING:
from airflow.callbacks.base_callback_sink import BaseCallbackSink
Expand Down Expand Up @@ -119,6 +120,13 @@ def job_id(self, value: int | str | None) -> None:

def start(self) -> None:
"""Start local and kubernetes executor."""
if AIRFLOW_V_3_0_PLUS:
raise RuntimeError(
f"{self.__class__.__name__} does not support Airflow 3.0+. See "
"https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/executor/index.html#using-multiple-executors-concurrently"
" how to use multiple executors concurrently."
)

self.log.info("Starting local and Kubernetes Executor")
self.local_executor.start()
self.kubernetes_executor.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

from unittest import mock

from airflow.callbacks.callback_requests import CallbackRequest, DagCallbackRequest
import pytest

from airflow.callbacks.callback_requests import CallbackRequest
from airflow.configuration import conf
from airflow.executors.local_executor import LocalExecutor
from airflow.providers.cncf.kubernetes.executors.local_kubernetes_executor import (
Expand All @@ -29,6 +31,7 @@
from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS


@pytest.mark.skipif(AIRFLOW_V_3_0_PLUS, reason="Airflow 3 does not support this executor anymore")
class TestLocalKubernetesExecutor:
def test_supports_pickling(self):
assert not LocalKubernetesExecutor.supports_pickling
Expand Down Expand Up @@ -115,12 +118,7 @@ def test_send_callback(self):
local_k8s_exec = LocalKubernetesExecutor(local_executor_mock, k8s_executor_mock)
local_k8s_exec.callback_sink = mock.MagicMock()

if AIRFLOW_V_3_0_PLUS:
callback = DagCallbackRequest(
filepath="fake", dag_id="fake", run_id="fake", bundle_name="fake", bundle_version=None
)
else:
callback = CallbackRequest(full_filepath="fake")
callback = CallbackRequest(full_filepath="fake")
local_k8s_exec.send_callback(callback)

local_k8s_exec.callback_sink.send.assert_called_once_with(callback)