Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,23 @@
from airflow.providers.cncf.kubernetes.pod_generator import PodGenerator, workload_to_command_args
from airflow.providers.common.compat.sdk import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.singleton import Singleton
from airflow.utils.state import TaskInstanceState

if TYPE_CHECKING:
from kubernetes.client import Configuration, models as k8s


class ResourceVersion(metaclass=Singleton):
class ResourceVersion:
"""Singleton for tracking resourceVersion from Kubernetes."""

_instance: ResourceVersion | None = None
resource_version: dict[str, str] = {}

def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance


class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
"""Watches for Kubernetes jobs."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,18 @@ def setup_method(self) -> None:
self.kubernetes_executor = KubernetesExecutor()
self.kubernetes_executor.job_id = 5

def test_resource_version_singleton(self):
"""Test that ResourceVersion returns the same instance."""
rv1 = ResourceVersion()
rv2 = ResourceVersion()

assert rv1 is rv2

rv1.resource_version["ns"] = "123"
assert rv2.resource_version["ns"] == "123"

rv1.resource_version.clear()

@pytest.mark.skipif(
AirflowKubernetesScheduler is None, reason="kubernetes python package is not installed"
)
Expand Down