diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py index 79e9b83581243..3d397eb5490e4 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/job.py @@ -42,7 +42,12 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, merge_objects from airflow.providers.cncf.kubernetes.triggers.job import KubernetesJobTrigger from airflow.providers.cncf.kubernetes.utils.pod_manager import EMPTY_XCOM_RESULT, PodNotFoundException -from airflow.providers.cncf.kubernetes.version_compat import BaseOperator +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS + +if AIRFLOW_V_3_1_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator from airflow.utils import yaml from airflow.utils.context import Context diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py index 718f93428cfe0..9dc88403c67d4 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/kueue.py @@ -27,7 +27,12 @@ from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator -from airflow.providers.cncf.kubernetes.version_compat import BaseOperator +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS + +if AIRFLOW_V_3_1_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator class KubernetesInstallKueueOperator(BaseOperator): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py index e550a4b73a916..d0f438b7d27f4 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/pod.py @@ -80,7 +80,12 @@ container_is_succeeded, get_container_termination_message, ) -from airflow.providers.cncf.kubernetes.version_compat import XCOM_RETURN_KEY, BaseOperator +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS, XCOM_RETURN_KEY + +if AIRFLOW_V_3_1_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator from airflow.settings import pod_mutation_hook from airflow.utils import yaml from airflow.utils.helpers import prune_dict, validate_key diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py index 65fd4d89d2d20..75fc79360c411 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/operators/resource.py @@ -32,7 +32,12 @@ from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import should_retry_creation from airflow.providers.cncf.kubernetes.utils.delete_from import delete_from_yaml from airflow.providers.cncf.kubernetes.utils.k8s_resource_iterator import k8s_resource_iterator -from airflow.providers.cncf.kubernetes.version_compat import BaseOperator +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_1_PLUS + +if AIRFLOW_V_3_1_PLUS: + from airflow.sdk import BaseOperator +else: + from airflow.models import BaseOperator if TYPE_CHECKING: from kubernetes.client import ApiClient, CustomObjectsApi diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py index 48cb809a47c84..50e713e134a53 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/sensors/spark_kubernetes.py @@ -25,7 +25,12 @@ from airflow.exceptions import AirflowException from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook -from airflow.providers.cncf.kubernetes.version_compat import BaseSensorOperator +from airflow.providers.cncf.kubernetes.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import BaseSensorOperator +else: + from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] if TYPE_CHECKING: from airflow.utils.context import Context diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py index da0c09bab59e4..812f1241781eb 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/version_compat.py @@ -37,16 +37,14 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: if AIRFLOW_V_3_1_PLUS: from airflow.models.xcom import XCOM_RETURN_KEY - from airflow.sdk import BaseHook, BaseOperator + from airflow.sdk import BaseHook from airflow.sdk.definitions.context import context_merge else: from airflow.hooks.base import BaseHook # type: ignore[attr-defined,no-redef] - from airflow.models import BaseOperator from airflow.utils.context import context_merge # type: ignore[attr-defined, no-redef] from airflow.utils.xcom import XCOM_RETURN_KEY # type: ignore[no-redef] if AIRFLOW_V_3_0_PLUS: - from airflow.sdk import BaseSensorOperator from airflow.sdk.bases.decorator import DecoratedOperator, TaskDecorator, task_decorator_factory else: from airflow.decorators.base import ( # type: ignore[no-redef] @@ -54,14 +52,14 @@ def get_base_airflow_version_tuple() -> tuple[int, int, int]: TaskDecorator, task_decorator_factory, ) - from airflow.sensors.base import BaseSensorOperator # type: ignore[no-redef] + +# BaseOperator and BaseSensorOperator removed from version_compat to avoid circular imports +# Import them directly in files that need them instead __all__ = [ "AIRFLOW_V_3_0_PLUS", "AIRFLOW_V_3_1_PLUS", "BaseHook", - "BaseOperator", - "BaseSensorOperator", "DecoratedOperator", "TaskDecorator", "task_decorator_factory",