diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py index 7555a0277c1f9..0334947b47287 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor_utils.py @@ -47,18 +47,23 @@ from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args from airflow.providers.common.compat.sdk import AirflowException from airflow.utils.log.logging_mixin import LoggingMixin -from airflow.utils.singleton import Singleton from airflow.utils.state import TaskInstanceState if TYPE_CHECKING: from kubernetes.client import Configuration, models as k8s -class ResourceVersion(metaclass=Singleton): +class ResourceVersion: """Singleton for tracking resourceVersion from Kubernetes.""" + _instance: ResourceVersion | None = None resource_version: dict[str, str] = {} + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin): """Watches for Kubernetes jobs.""" diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 33345c1249812..470a3912cc064 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -257,6 +257,18 @@ def setup_method(self) -> None: self.kubernetes_executor = KubernetesExecutor() self.kubernetes_executor.job_id = 5 + def test_resource_version_singleton(self): + """Test that ResourceVersion returns the same instance.""" + rv1 = ResourceVersion() + rv2 = ResourceVersion() + + assert rv1 is rv2 + + rv1.resource_version["ns"] = "123" + assert rv2.resource_version["ns"] == "123" + + rv1.resource_version.clear() + @pytest.mark.skipif( AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed" )