diff --git a/airflow/providers/celery/executors/celery_executor.py b/airflow/providers/celery/executors/celery_executor.py index d81e57a2229e..9f1948db52e8 100644 --- a/airflow/providers/celery/executors/celery_executor.py +++ b/airflow/providers/celery/executors/celery_executor.py @@ -37,37 +37,19 @@ from packaging.version import Version from airflow import __version__ as airflow_version - -try: - from airflow.cli.cli_config import ( - ARG_DAEMON, - ARG_LOG_FILE, - ARG_PID, - ARG_SKIP_SERVE_LOGS, - ARG_STDERR, - ARG_STDOUT, - ARG_VERBOSE, - ActionCommand, - Arg, - GroupCommand, - lazy_load_command, - ) -except ImportError: - import packaging.version - - from airflow.exceptions import AirflowOptionalProviderFeatureException - - base_version = packaging.version.parse(airflow_version).base_version - - if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"): - raise AirflowOptionalProviderFeatureException( - "Celery Executor from Celery Provider should only be used with Airflow 2.7.0+.\n" - f"This is Airflow {airflow_version} and Celery and CeleryKubernetesExecutor are " - f"available in the 'airflow.executors' package. You should not use " - f"the provider's executors in this version of Airflow." - ) - raise - +from airflow.cli.cli_config import ( + ARG_DAEMON, + ARG_LOG_FILE, + ARG_PID, + ARG_SKIP_SERVE_LOGS, + ARG_STDERR, + ARG_STDOUT, + ARG_VERBOSE, + ActionCommand, + Arg, + GroupCommand, + lazy_load_command, +) from airflow.configuration import conf from airflow.exceptions import AirflowTaskTimeout from airflow.executors.base_executor import BaseExecutor diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index f62f021fd264..43cefeb9c421 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -38,38 +38,18 @@ from kubernetes.dynamic import DynamicClient from sqlalchemy import select, update -from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError -from airflow.stats import Stats - -try: - from airflow.cli.cli_config import ( - ARG_DAG_ID, - ARG_EXECUTION_DATE, - ARG_OUTPUT_PATH, - ARG_SUBDIR, - ARG_VERBOSE, - ActionCommand, - Arg, - GroupCommand, - lazy_load_command, - positive_int, - ) -except ImportError: - import packaging.version - - from airflow import __version__ as airflow_version - from airflow.exceptions import AirflowOptionalProviderFeatureException - - base_version = packaging.version.parse(airflow_version).base_version - - if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"): - raise AirflowOptionalProviderFeatureException( - "Kubernetes Executor from CNCF Provider should only be used with Airflow 2.7.0+.\n" - f"This is Airflow {airflow_version} and Kubernetes and CeleryKubernetesExecutor are " - f"available in the 'airflow.executors' package. You should not use " - f"the provider's executors in this version of Airflow." - ) - raise +from airflow.cli.cli_config import ( + ARG_DAG_ID, + ARG_EXECUTION_DATE, + ARG_OUTPUT_PATH, + ARG_SUBDIR, + ARG_VERBOSE, + ActionCommand, + Arg, + GroupCommand, + lazy_load_command, + positive_int, +) from airflow.configuration import conf from airflow.executors.base_executor import BaseExecutor from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import ( @@ -78,6 +58,8 @@ ) from airflow.providers.cncf.kubernetes.kube_config import KubeConfig from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key +from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError +from airflow.stats import Stats from airflow.utils.event_scheduler import EventScheduler from airflow.utils.log.logging_mixin import remove_escape_codes from airflow.utils.session import NEW_SESSION, provide_session diff --git a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 3f544fe2feff..d26df876eff5 100644 --- a/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -21,7 +21,7 @@ import multiprocessing import time from queue import Empty, Queue -from typing import TYPE_CHECKING, Any, Generic, TypeVar +from typing import TYPE_CHECKING, Any from kubernetes import client, watch from kubernetes.client.rest import ApiException @@ -36,6 +36,7 @@ ) from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator from airflow.utils.log.logging_mixin import LoggingMixin +from airflow.utils.singleton import Singleton from airflow.utils.state import TaskInstanceState try: @@ -60,22 +61,6 @@ KubernetesWatchType, ) -# Singleton here is duplicated version of airflow.utils.singleton.Singleton until -# min-airflow version is 2.7.0 for the provider. then it can be imported from airflow.utils.singleton. - -T = TypeVar("T") - - -class Singleton(type, Generic[T]): - """Metaclass that allows to implement singleton pattern.""" - - _instances: dict[Singleton[T], T] = {} - - def __call__(cls: Singleton[T], *args, **kwargs) -> T: - if cls not in cls._instances: - cls._instances[cls] = super().__call__(*args, **kwargs) - return cls._instances[cls] - class ResourceVersion(metaclass=Singleton): """Singleton for tracking resourceVersion from Kubernetes."""