Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove compat code for 2.7.0 - its now the min Airflow version #39591

Merged
merged 1 commit into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 13 additions & 31 deletions airflow/providers/celery/executors/celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,37 +37,19 @@
from packaging.version import Version

from airflow import __version__ as airflow_version

try:
from airflow.cli.cli_config import (
ARG_DAEMON,
ARG_LOG_FILE,
ARG_PID,
ARG_SKIP_SERVE_LOGS,
ARG_STDERR,
ARG_STDOUT,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
)
except ImportError:
import packaging.version

from airflow.exceptions import AirflowOptionalProviderFeatureException

base_version = packaging.version.parse(airflow_version).base_version

if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
raise AirflowOptionalProviderFeatureException(
"Celery Executor from Celery Provider should only be used with Airflow 2.7.0+.\n"
f"This is Airflow {airflow_version} and Celery and CeleryKubernetesExecutor are "
f"available in the 'airflow.executors' package. You should not use "
f"the provider's executors in this version of Airflow."
)
raise

from airflow.cli.cli_config import (
ARG_DAEMON,
ARG_LOG_FILE,
ARG_PID,
ARG_SKIP_SERVE_LOGS,
ARG_STDERR,
ARG_STDOUT,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
)
from airflow.configuration import conf
from airflow.exceptions import AirflowTaskTimeout
from airflow.executors.base_executor import BaseExecutor
Expand Down
46 changes: 14 additions & 32 deletions airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,38 +38,18 @@
from kubernetes.dynamic import DynamicClient
from sqlalchemy import select, update

from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
from airflow.stats import Stats

try:
from airflow.cli.cli_config import (
ARG_DAG_ID,
ARG_EXECUTION_DATE,
ARG_OUTPUT_PATH,
ARG_SUBDIR,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
positive_int,
)
except ImportError:
import packaging.version

from airflow import __version__ as airflow_version
from airflow.exceptions import AirflowOptionalProviderFeatureException

base_version = packaging.version.parse(airflow_version).base_version

if packaging.version.parse(base_version) < packaging.version.parse("2.7.0"):
raise AirflowOptionalProviderFeatureException(
"Kubernetes Executor from CNCF Provider should only be used with Airflow 2.7.0+.\n"
f"This is Airflow {airflow_version} and Kubernetes and CeleryKubernetesExecutor are "
f"available in the 'airflow.executors' package. You should not use "
f"the provider's executors in this version of Airflow."
)
raise
from airflow.cli.cli_config import (
ARG_DAG_ID,
ARG_EXECUTION_DATE,
ARG_OUTPUT_PATH,
ARG_SUBDIR,
ARG_VERBOSE,
ActionCommand,
Arg,
GroupCommand,
lazy_load_command,
positive_int,
)
from airflow.configuration import conf
from airflow.executors.base_executor import BaseExecutor
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor_types import (
Expand All @@ -78,6 +58,8 @@
)
from airflow.providers.cncf.kubernetes.kube_config import KubeConfig
from airflow.providers.cncf.kubernetes.kubernetes_helper_functions import annotations_to_key
from airflow.providers.cncf.kubernetes.pod_generator import PodMutationHookException, PodReconciliationError
from airflow.stats import Stats
from airflow.utils.event_scheduler import EventScheduler
from airflow.utils.log.logging_mixin import remove_escape_codes
from airflow.utils.session import NEW_SESSION, provide_session
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import multiprocessing
import time
from queue import Empty, Queue
from typing import TYPE_CHECKING, Any, Generic, TypeVar
from typing import TYPE_CHECKING, Any

from kubernetes import client, watch
from kubernetes.client.rest import ApiException
Expand All @@ -36,6 +36,7 @@
)
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.singleton import Singleton
from airflow.utils.state import TaskInstanceState

try:
Expand All @@ -60,22 +61,6 @@
KubernetesWatchType,
)

# Singleton here is duplicated version of airflow.utils.singleton.Singleton until
# min-airflow version is 2.7.0 for the provider. then it can be imported from airflow.utils.singleton.

T = TypeVar("T")


class Singleton(type, Generic[T]):
"""Metaclass that allows to implement singleton pattern."""

_instances: dict[Singleton[T], T] = {}

def __call__(cls: Singleton[T], *args, **kwargs) -> T:
if cls not in cls._instances:
cls._instances[cls] = super().__call__(*args, **kwargs)
return cls._instances[cls]


class ResourceVersion(metaclass=Singleton):
"""Singleton for tracking resourceVersion from Kubernetes."""
Expand Down