diff --git a/platform_api/kube_cluster.py b/platform_api/kube_cluster.py
index e0023d700..d857b4faf 100644
--- a/platform_api/kube_cluster.py
+++ b/platform_api/kube_cluster.py
@@ -8,6 +8,7 @@
 from .cluster import Cluster
 from .cluster_config import ClusterConfig
 from .config import RegistryConfig, StorageConfig
+from .orchestrator.kube_client import KubeClient, PodWatcher
 from .orchestrator.kube_config import KubeConfig
 from .orchestrator.kube_orchestrator import KubeOrchestrator, Orchestrator
 
@@ -47,15 +48,33 @@ async def init(self) -> None:
 
     async def _init_orchestrator(self) -> None:
         logger.info(f"Cluster '{self.name}': initializing Orchestrator")
+        kube_client = KubeClient(
+            base_url=self._kube_config.endpoint_url,
+            cert_authority_data_pem=self._kube_config.cert_authority_data_pem,
+            cert_authority_path=self._kube_config.cert_authority_path,
+            auth_type=self._kube_config.auth_type,
+            auth_cert_path=self._kube_config.auth_cert_path,
+            auth_cert_key_path=self._kube_config.auth_cert_key_path,
+            token=self._kube_config.token,
+            token_path=self._kube_config.token_path,
+            namespace=self._kube_config.namespace,
+            conn_timeout_s=self._kube_config.client_conn_timeout_s,
+            read_timeout_s=self._kube_config.client_read_timeout_s,
+            conn_pool_size=self._kube_config.client_conn_pool_size,
+            trace_configs=self._trace_configs,
+        )
+        pod_watcher = PodWatcher(kube_client)
         orchestrator = KubeOrchestrator(
             cluster_name=self.name,
             storage_configs=self._storage_configs,
             registry_config=self._registry_config,
             orchestrator_config=self._cluster_config.orchestrator,
             kube_config=self._kube_config,
-            trace_configs=self._trace_configs,
+            kube_client=kube_client,
         )
-        await self._exit_stack.enter_async_context(orchestrator)
+        orchestrator.register(pod_watcher)
+        await self._exit_stack.enter_async_context(kube_client)
+        await self._exit_stack.enter_async_context(pod_watcher)
         self._orchestrator = orchestrator
 
     async def close(self) -> None:
diff --git a/platform_api/orchestrator/base.py b/platform_api/orchestrator/base.py
index 1e3c893c5..4243a5847 100644
--- a/platform_api/orchestrator/base.py
+++ b/platform_api/orchestrator/base.py
@@ -17,6 +17,16 @@ async def get_job_status(self, job: Job) -> JobStatusItem:
     async def delete_job(self, job: Job) -> JobStatus:
         pass
 
+    @abstractmethod
+    async def preempt_jobs(
+        self, jobs_to_schedule: list[Job], preemptible_jobs: list[Job]
+    ) -> list[Job]:
+        pass
+
+    @abstractmethod
+    async def preempt_idle_jobs(self, jobs_to_schedule: list[Job]) -> bool:
+        pass
+
     @abstractmethod
     async def get_missing_secrets(
         self, secret_path: str, secret_names: list[str]
diff --git a/platform_api/orchestrator/kube_client.py b/platform_api/orchestrator/kube_client.py
index 3b4a43563..2f8a7cd3c 100644
--- a/platform_api/orchestrator/kube_client.py
+++ b/platform_api/orchestrator/kube_client.py
@@ -1597,6 +1597,10 @@ def _parse_memory(cls, memory: str) -> int:
             return int(memory[:-2]) * 1024
         raise ValueError("Memory format is not supported")
 
+    @property
+    def any(self) -> bool:
+        return self.cpu_mcores > 0 or self.memory > 0 or self.gpu > 0
+
     @property
     def cpu_mcores(self) -> int:
         return int(self.cpu * 1000)
@@ -2313,6 +2317,17 @@ async def wait_pod_is_terminated(
                 return
             await asyncio.sleep(interval_s)
 
+    async def wait_pod_is_deleted(
+        self, pod_name: str, timeout_s: float = 10.0 * 60, interval_s: float = 1.0
+    ) -> None:
+        async with timeout(timeout_s):
+            while True:
+                try:
+                    await self.get_pod(pod_name)
+                    await asyncio.sleep(interval_s)
+                except JobNotFoundException:
+                    return
+
     async def create_default_network_policy(
         self,
         name: str,
@@ -2460,6 +2475,8 @@ async def stop(self) -> None:
         self._watcher_task.cancel()
         with suppress(asyncio.CancelledError):
             await self._watcher_task
+        self._watcher_task = None
+        self._handlers.clear()
 
     async def _run(self, resource_version: str) -> None:
         while True:
@@ -2497,7 +2514,7 @@ def get_pods_to_preempt(
         if resources.gpu:
             pods = [p for p in pods if p.resources and p.resources.gpu]
         pods_to_preempt: list[PodDescriptor] = []
-        while pods and cls._has_resources(resources):
+        while pods and resources.any:
             logger.debug("Pods left: %d", len(pods))
             logger.debug("Resources left: %s", resources)
             # max distance for a single resource is 1, 3 resources total
@@ -2528,18 +2545,12 @@ def get_pods_to_preempt(
                 resources.memory,
                 resources.gpu or 0,
             )
-        if cls._has_resources(resources):
+        if resources.any:
             logger.debug("Pods to preempt: []")
             return []
         logger.debug("Pods to preempt: %s", [p.name for p in pods_to_preempt])
         return pods_to_preempt
 
-    @classmethod
-    def _has_resources(cls, resources: NodeResources) -> bool:
-        return (
-            resources.cpu_mcores > 0 or resources.memory > 0 or (resources.gpu or 0) > 0
-        )
-
     @classmethod
     def _subtract_resources(cls, r1: NodeResources, r2: Resources) -> NodeResources:
         return replace(
diff --git a/platform_api/orchestrator/kube_orchestrator.py b/platform_api/orchestrator/kube_orchestrator.py
index aa4fbad2c..a1d81def5 100644
--- a/platform_api/orchestrator/kube_orchestrator.py
+++ b/platform_api/orchestrator/kube_orchestrator.py
@@ -9,8 +9,6 @@
 from pathlib import PurePath
 from typing import Any, Optional, Union
 
-import aiohttp
-
 from platform_api.cluster_config import OrchestratorConfig
 from platform_api.config import RegistryConfig, StorageConfig
 from platform_api.resource import ResourcePoolType
@@ -42,6 +40,7 @@
     PodExec,
     PodRestartPolicy,
    PodStatus,
+    PodWatcher,
     PVCVolume,
     SecretVolume,
     Service,
@@ -50,6 +49,7 @@
     VolumeMount,
 )
 from .kube_config import KubeConfig
+from .kube_orchestrator_preemption import KubeOrchestratorPreemption
 
 logger = logging.getLogger(__name__)
 
@@ -122,7 +122,7 @@ def __init__(
         registry_config: RegistryConfig,
         orchestrator_config: OrchestratorConfig,
         kube_config: KubeConfig,
-        trace_configs: Optional[list[aiohttp.TraceConfig]] = None,
+        kube_client: KubeClient,
     ) -> None:
         self._loop = asyncio.get_event_loop()
         self._cluster_name = cluster_name
@@ -130,25 +130,8 @@ def __init__(
         self._registry_config = registry_config
         self._orchestrator_config = orchestrator_config
         self._kube_config = kube_config
-
-        # TODO (A Danshyn 05/21/18): think of the namespace life-time;
-        # should we ensure it does exist before continuing
-
-        self._client = KubeClient(
-            base_url=kube_config.endpoint_url,
-            cert_authority_data_pem=kube_config.cert_authority_data_pem,
-            cert_authority_path=kube_config.cert_authority_path,
-            auth_type=kube_config.auth_type,
-            auth_cert_path=kube_config.auth_cert_path,
-            auth_cert_key_path=kube_config.auth_cert_key_path,
-            token=kube_config.token,
-            token_path=kube_config.token_path,
-            namespace=kube_config.namespace,
-            conn_timeout_s=kube_config.client_conn_timeout_s,
-            read_timeout_s=kube_config.client_read_timeout_s,
-            conn_pool_size=kube_config.client_conn_pool_size,
-            trace_configs=trace_configs,
-        )
+        self._client = kube_client
+        self._preemption = KubeOrchestratorPreemption(kube_client)
 
         # TODO (A Danshyn 11/16/18): make this configurable at some point
         self._docker_secret_name_prefix = "neurouser-"
@@ -167,12 +150,13 @@ def orchestrator_config(self) -> OrchestratorConfig:
     def kube_config(self) -> KubeConfig:
         return self._kube_config
 
+    # TODO: remove
     async def __aenter__(self) -> "KubeOrchestrator":
-        await self._client.__aenter__()
         return self
 
+    # TODO: remove
     async def __aexit__(self, *args: Any) -> None:
-        await self._client.close()
+        pass
 
     @property
     def _main_storage_config(self) -> StorageConfig:
@@ -189,6 +173,9 @@ def _extra_storage_configs(self) -> list[StorageConfig]:
                 result.append(sc)
         return result
 
+    def register(self, pod_watcher: PodWatcher) -> None:
+        self._preemption.register(pod_watcher)
+
     def create_storage_volumes(
         self, container_volume: ContainerVolume
     ) -> Sequence[PathVolume]:
@@ -333,7 +320,7 @@ async def _delete_pod_network_policy(self, job: Job) -> None:
         except Exception as e:
             logger.warning(f"Failed to remove network policy {name}: {e}")
 
-    async def _create_pod_descriptor(
+    def _create_pod_descriptor(
         self, job: Job, tolerate_unreachable_node: bool = False
     ) -> PodDescriptor:
         pool_types = self._get_cheapest_pool_types(job)
@@ -541,7 +528,7 @@ async def start_job(
         try:
             await self._create_pod_network_policy(job)
 
-            descriptor = await self._create_pod_descriptor(
+            descriptor = self._create_pod_descriptor(
                 job, tolerate_unreachable_node=tolerate_unreachable_node
             )
             pod = await self._client.create_pod(descriptor)
@@ -744,7 +731,7 @@ async def _check_restartable_job_pod(self, job: Job) -> PodDescriptor:
         if do_recreate_pod:
             logger.info(f"Recreating preempted pod '{pod_name}'. Job '{job.id}'")
-            descriptor = await self._create_pod_descriptor(job)
+            descriptor = self._create_pod_descriptor(job)
             try:
                 pod = await self._client.create_pod(descriptor)
             except JobError:
@@ -875,3 +862,23 @@ async def delete_all_job_resources(self, job_id: str) -> None:
         await self._client.delete_all_ingresses(labels=labels)
         await self._client.delete_all_services(labels=labels)
         await self._client.delete_all_network_policies(labels=labels)
+
+    async def preempt_jobs(
+        self, jobs_to_schedule: list[Job], preemptible_jobs: list[Job]
+    ) -> list[Job]:
+        job_pods_to_schedule = [
+            self._create_pod_descriptor(job) for job in jobs_to_schedule
+        ]
+        preemptible_job_pods = [
+            self._create_pod_descriptor(job) for job in preemptible_jobs
+        ]
+        preempted_pods = await self._preemption.preempt_pods(
+            job_pods_to_schedule, preemptible_job_pods
+        )
+        preempted_pod_names = {pod.name for pod in preempted_pods}
+        return [job for job in preemptible_jobs if job.id in preempted_pod_names]
+
+    async def preempt_idle_jobs(self, jobs_to_schedule: list[Job]) -> bool:
+        job_pods = [self._create_pod_descriptor(job) for job in jobs_to_schedule]
+        preempted_pods = await self._preemption.preempt_idle_pods(job_pods)
+        return len(preempted_pods) > 0
diff --git a/platform_api/orchestrator/kube_orchestrator_preemption.py b/platform_api/orchestrator/kube_orchestrator_preemption.py
new file mode 100644
index 000000000..b5c5fa585
--- /dev/null
+++ b/platform_api/orchestrator/kube_orchestrator_preemption.py
@@ -0,0 +1,317 @@
+from __future__ import annotations
+
+import asyncio
+import logging
+from collections import defaultdict
+from collections.abc import Callable, Iterable
+from contextlib import suppress
+from typing import Any
+
+from .kube_client import (
+    KubeClient,
+    KubePreemption,
+    Node,
+    NodeResources,
+    NotFoundException,
+    PodDescriptor,
+    PodEventHandler,
+    PodStatus,
+    PodWatcher,
+    PodWatchEvent,
+    WatchEventType,
+)
+
+logger = logging.getLogger(__name__)
+
+
+class _Pod:
+    def __init__(self, payload: dict[str, Any]) -> None:
+        self._payload = payload
+        self._status = PodStatus(payload["status"])
+
+    @property
+    def payload(self) -> dict[str, Any]:
+        return self._payload
+
+    @property
+    def name(self) -> str:
+        return self._payload["metadata"]["name"]
+
+    @property
+    def labels(self) -> dict[str, str]:
+        return self._payload["metadata"].get("labels", {})
+
+    @property
+    def is_idle(self) -> bool:
+        return bool(self.labels.get("platform.neuromation.io/idle"))
+
+    @property
+    def status(self) -> PodStatus:
+        return self._status
+
+    @property
+    def node_name(self) -> str:
+        return self._payload["spec"]["nodeName"]
+
+    @property
+    def resource_requests(self) -> NodeResources:
+        pod_resources = NodeResources()
+        for container in self._payload["spec"]["containers"]:
+            resources = container.get("resources")
+            if not resources:
+                continue
+            requests = resources.get("requests")
+            if not requests:
+                continue
+            pod_resources += NodeResources.from_primitive(requests)
+        return pod_resources
+
+
+class NodeResourcesHandler(PodEventHandler):
+    def __init__(self, kube_client: KubeClient) -> None:
+        self._kube_client = kube_client
+        self._nodes: dict[str, Node] = {}
+        self._node_free_resources: dict[str, NodeResources] = {}
+        self._pod_node_names: dict[str, str] = {}
+
+    async def init(self, raw_pods: list[dict[str, Any]]) -> None:
+        for raw_pod in raw_pods:
+            pod = _Pod(raw_pod)
+            if pod.status.is_scheduled and not pod.status.is_terminated:
+                await self._add_pod(pod)
+
+    async def handle(self, event: PodWatchEvent) -> None:
+        pod = _Pod(event.raw_pod)
+        if not pod.status.is_scheduled:
+            return
+        if event.type == WatchEventType.DELETED or pod.status.is_terminated:
+            self._remove_pod(pod)
+        else:
+            await self._add_pod(pod)
+
+    async def _add_pod(self, pod: _Pod) -> None:
+        pod_name = pod.name
+        if pod_name in self._pod_node_names:
+            return
+        node_name = pod.node_name
+        # Ignore error in case node was removed/lost but pod was not yet removed
+        with suppress(NotFoundException):
+            if node_name not in self._nodes:
+                node = await self._kube_client.get_node(node_name)
+                self._nodes[node_name] = node
+                self._node_free_resources[node_name] = node.allocatable_resources
+            self._pod_node_names[pod_name] = node_name
+            self._node_free_resources[node_name] -= pod.resource_requests
+
+    def _remove_pod(self, pod: _Pod) -> None:
+        pod_name = pod.name
+        if pod_name not in self._pod_node_names:
+            return
+        node_name = pod.node_name
+        node = self._nodes.get(node_name)
+        if node:
+            node_free_resources = self._node_free_resources[node_name]
+            node_free_resources += pod.resource_requests
+            if node.allocatable_resources == node_free_resources:
+                self._node_free_resources.pop(node_name, None)
+                self._nodes.pop(node_name, None)
+            else:
+                self._node_free_resources[node_name] = node_free_resources
+        self._pod_node_names.pop(pod_name, None)
+
+    def get_nodes(self) -> list[Node]:
+        return list(self._nodes.values())
+
+    def get_node_free_resources(self, node_name: str) -> NodeResources:
+        return self._node_free_resources.get(node_name) or NodeResources()
+
+    def get_pod_node_name(self, pod_name: str) -> str | None:
+        """
+        Get the name of the node which runs the pod.
+        Return None if the pod is not in a Running state.
+        """
+        return self._pod_node_names.get(pod_name)
+
+    def _get_node_allocatable_resources(self, node_name: str) -> NodeResources:
+        node = self._nodes.get(node_name)
+        return node.allocatable_resources if node else NodeResources()
+
+
+class IdlePodsHandler(PodEventHandler):
+    def __init__(self) -> None:
+        self._pods: dict[str, dict[str, PodDescriptor]] = defaultdict(dict)
+
+    async def init(self, raw_pods: list[dict[str, Any]]) -> None:
+        for raw_pod in raw_pods:
+            pod = _Pod(raw_pod)
+            if pod.is_idle and pod.status.is_scheduled and not pod.status.is_terminated:
+                self._add_pod(pod)
+
+    async def handle(self, event: PodWatchEvent) -> None:
+        pod = _Pod(event.raw_pod)
+        if not pod.is_idle or not pod.status.is_scheduled:
+            return
+        if event.type == WatchEventType.DELETED or pod.status.is_terminated:
+            self._remove_pod(pod)
+        else:
+            self._add_pod(pod)
+
+    def _add_pod(self, pod: _Pod) -> None:
+        pod_name = pod.name
+        node_name = pod.node_name
+        # k8s issue: elements in list `items` don't have kind and version, so set kind
+        pod.payload["kind"] = "Pod"
+        self._pods[node_name][pod_name] = PodDescriptor.from_primitive(pod.payload)
+
+    def _remove_pod(self, pod: _Pod) -> None:
+        node_name = pod.node_name
+        pod_name = pod.name
+        pods = self._pods[node_name]
+        pods.pop(pod_name, None)
+        if not pods:
+            self._pods.pop(node_name, None)
+
+    def get_pods(self, node_name: str) -> list[PodDescriptor]:
+        pods = self._pods.get(node_name)
+        return list(pods.values()) if pods else []
+
+
+class KubeOrchestratorPreemption:
+    def __init__(self, kube_client: KubeClient) -> None:
+        self._kube_client = kube_client
+        self._kube_preemption = KubePreemption()
+        self._node_resources_handler = NodeResourcesHandler(kube_client)
+        self._idle_pods_handler = IdlePodsHandler()
+
+    def register(self, pod_watcher: PodWatcher) -> None:
+        pod_watcher.subscribe(self._node_resources_handler)
+        pod_watcher.subscribe(self._idle_pods_handler)
+
+    async def preempt_pods(
+        self,
+        pods_to_schedule: list[PodDescriptor],
+        preemptible_pods: list[PodDescriptor],
+    ) -> list[PodDescriptor]:
+        preemptible_pods_by_node = defaultdict(list)
+        for pod in preemptible_pods:
+            node_name = self._node_resources_handler.get_pod_node_name(pod.name)
+            if node_name:
+                preemptible_pods_by_node[node_name].append(pod)
+        preempted_pods = await self._preempt_pods(
+            pods_to_schedule, get_preemptible_pods=preemptible_pods_by_node.__getitem__
+        )
+        return preempted_pods
+
+    async def preempt_idle_pods(
+        self, pods_to_schedule: list[PodDescriptor]
+    ) -> list[PodDescriptor]:
+        preempted_pods = await self._preempt_pods(
+            pods_to_schedule, get_preemptible_pods=self._idle_pods_handler.get_pods
+        )
+        return preempted_pods
+
+    async def _preempt_pods(
+        self,
+        pods_to_schedule: list[PodDescriptor],
+        get_preemptible_pods: Callable[[str], list[PodDescriptor]],
+    ) -> list[PodDescriptor]:
+        nodes_to_preempt: set[Node] = set()
+        pods_to_preempt: list[PodDescriptor] = []
+        for pod in self._get_pods_to_schedule(pods_to_schedule):
+            # Handle one node per api poller iteration.
+            # Exclude nodes preempted in previous steps
+            # to avoid node resources tracking complexity.
+            node, pods = self._get_pods_to_preempt(
+                pod,
+                get_preemptible_pods=get_preemptible_pods,
+                exclude_nodes=nodes_to_preempt,
+            )
+            if node:
+                nodes_to_preempt.add(node)
+                pods_to_preempt.extend(pods)
+        await self._delete_pods(pods_to_preempt)
+        return pods_to_preempt
+
+    def _get_pods_to_schedule(
+        self, pods: Iterable[PodDescriptor]
+    ) -> list[PodDescriptor]:
+        def _create_key(pod: PodDescriptor) -> tuple[int, int, float]:
+            r = pod.resources
+            if not r:
+                return (0, 0, 0.0)
+            return (r.gpu or 0, r.memory, r.cpu)
+
+        pods_to_schedule = []
+        for pod in pods:
+            if not self._node_resources_handler.get_pod_node_name(pod.name):
+                pods_to_schedule.append(pod)
+        pods_to_schedule.sort(
+            key=_create_key
+        )  # Try to find preemptible pods for the smallest pods first
+        return pods_to_schedule
+
+    def _get_pods_to_preempt(
+        self,
+        pod_to_schedule: PodDescriptor,
+        get_preemptible_pods: Callable[[str], list[PodDescriptor]],
+        exclude_nodes: Iterable[Node],
+    ) -> tuple[Node | None, list[PodDescriptor]]:
+        for node in self._get_nodes(exclude_nodes):
+            if not pod_to_schedule.can_be_scheduled(node.labels):
+                continue
+            preemptible_pods = get_preemptible_pods(node.name)
+            if not preemptible_pods:
+                logger.debug("Node %r doesn't have pods to preempt", node.name)
+                continue
+            logger.debug("Looking for pods to preempt on node %r", node.name)
+            resources = self._get_resources_to_preempt(pod_to_schedule, node)
+            logger.debug("Resources to preempt on node %r: %s", node.name, resources)
+            pods_to_preempt = self._kube_preemption.get_pods_to_preempt(
+                resources, preemptible_pods
+            )
+            if pods_to_preempt:
+                logger.info(
+                    "Pods to preempt on node %r for pod %r: %r",
+                    node.name,
+                    pod_to_schedule.name,
+                    [p.name for p in pods_to_preempt],
+                )
+                return node, pods_to_preempt
+            logger.debug(
+                "Not enough resources on node %r for pod %r",
+                node.name,
+                pod_to_schedule.name,
+            )
+        return None, []
+
+    def _get_nodes(self, exclude: Iterable[Node]) -> list[Node]:
+        def _create_key(node: Node) -> tuple[int, int, float]:
+            r = self._node_resources_handler.get_node_free_resources(node.name)
+            if not r:
+                return (0, 0, 0.0)
+            return (r.gpu or 0, r.memory, r.cpu)
+
+        nodes = self._node_resources_handler.get_nodes()
+        nodes = [n for n in nodes if n not in exclude]
+        nodes.sort(key=_create_key)  # Prefer nodes with fewer free resources first
+        return nodes
+
+    def _get_resources_to_preempt(
+        self, pod_to_schedule: PodDescriptor, node: Node
+    ) -> NodeResources:
+        free = self._node_resources_handler.get_node_free_resources(node.name)
+        required = pod_to_schedule.resources
+        if not required:
+            return NodeResources()
+        return NodeResources(
+            cpu=max(0, required.cpu - free.cpu),
+            memory=max(0, required.memory - free.memory),
+            gpu=max(0, (required.gpu or 0) - free.gpu),
+        )
+
+    async def _delete_pods(self, pods: Iterable[PodDescriptor]) -> None:
+        tasks = [
+            asyncio.create_task(self._kube_client.delete_pod(pod.name)) for pod in pods
+        ]
+        if tasks:
+            await asyncio.wait(tasks)
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 207f66c7f..ec82fa433 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -569,7 +569,7 @@ async def kube_orchestrator_factory(
     registry_config: RegistryConfig,
     orchestrator_config: OrchestratorConfig,
     kube_config: KubeConfig,
-    event_loop: Any,
+    kube_client: KubeClient,
 ) -> Callable[..., KubeOrchestrator]:
     def _f(**kwargs: Any) -> KubeOrchestrator:
         defaults = dict(
@@ -578,6 +578,7 @@ def _f(**kwargs: Any) -> KubeOrchestrator:
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         kwargs = {**defaults, **kwargs}
         return KubeOrchestrator(**kwargs)
@@ -588,47 +589,24 @@ def _f(**kwargs: Any) -> KubeOrchestrator:
 @pytest.fixture
 async def kube_orchestrator(
     kube_orchestrator_factory: Callable[..., KubeOrchestrator],
-) -> AsyncIterator[KubeOrchestrator]:
-    async with kube_orchestrator_factory() as kube_orchestrator:
-        yield kube_orchestrator
+) -> KubeOrchestrator:
+    return kube_orchestrator_factory()
 
 
 @pytest.fixture
 async def kube_orchestrator_nfs(
+    kube_orchestrator_factory: Callable[..., KubeOrchestrator],
     storage_config_nfs: StorageConfig,
-    registry_config: RegistryConfig,
-    orchestrator_config: OrchestratorConfig,
-    kube_config: KubeConfig,
-    event_loop: Any,
-) -> AsyncIterator[KubeOrchestrator]:
-    orchestrator = KubeOrchestrator(
-        cluster_name="default",
-        storage_configs=[storage_config_nfs],
-        registry_config=registry_config,
-        orchestrator_config=orchestrator_config,
-        kube_config=kube_config,
-    )
-    async with orchestrator:
-        yield orchestrator
+) -> KubeOrchestrator:
+    return kube_orchestrator_factory(storage_configs=[storage_config_nfs])
 
 
 @pytest.fixture
 async def kube_orchestrator_pvc(
+    kube_orchestrator_factory: Callable[..., KubeOrchestrator],
     storage_config_pvc: StorageConfig,
-    registry_config: RegistryConfig,
-    orchestrator_config: OrchestratorConfig,
-    kube_config: KubeConfig,
-    event_loop: Any,
-) -> AsyncIterator[KubeOrchestrator]:
-    orchestrator = KubeOrchestrator(
-        cluster_name="default",
-        storage_configs=[storage_config_pvc],
-        registry_config=registry_config,
-        orchestrator_config=orchestrator_config,
-        kube_config=kube_config,
-    )
-    async with orchestrator:
-        yield orchestrator
+) -> KubeOrchestrator:
+    return kube_orchestrator_factory(storage_configs=[storage_config_pvc])
 
 
 @pytest.fixture
@@ -682,7 +660,6 @@ async def kube_node_gpu(
 
 @pytest.fixture
 async def kube_node_tpu(
-    kube_config: KubeConfig,
     kube_client: MyKubeClient,
     delete_node_later: Callable[[str], Awaitable[None]],
 ) -> AsyncIterator[str]:
diff --git a/tests/integration/test_kube_orchestrator.py b/tests/integration/test_kube_orchestrator.py
index 274f812e3..7950a3aa2 100644
--- a/tests/integration/test_kube_orchestrator.py
+++ b/tests/integration/test_kube_orchestrator.py
@@ -44,10 +44,12 @@
     IngressRule,
     KubeClient,
     NodeAffinity,
+    NodeResources,
     NodeSelectorRequirement,
     NodeSelectorTerm,
     NotFoundException,
     PodDescriptor,
+    PodWatcher,
     SecretRef,
     Service,
     StatusException,
@@ -60,8 +62,8 @@
 )
 from platform_api.resource import GKEGPUModels, ResourcePoolType, TPUResource
 
-from .conftest import ApiRunner, MyKubeClient
 from tests.conftest import random_str
+from tests.integration.conftest import ApiRunner, MyKubeClient
 from tests.integration.test_api import ApiConfig
 
@@ -2137,6 +2139,7 @@ async def kube_orchestrator(
     registry_config: RegistryConfig,
     orchestrator_config: OrchestratorConfig,
     kube_config: KubeConfig,
+    kube_client: KubeClient,
     kube_job_nodes_factory: Callable[
         [OrchestratorConfig, KubeConfig], Awaitable[None]
     ],
@@ -2201,6 +2204,7 @@ async def kube_orchestrator(
         registry_config=registry_config,
         orchestrator_config=orchestrator_config,
         kube_config=kube_config,
+        kube_client=kube_client,
     ) as kube_orchestrator:
         yield kube_orchestrator
 
@@ -2916,3 +2920,201 @@ async def test_restart_always_succeeded(
             ),
             JobStatusItem.create(status=JobStatus.RUNNING),
         )
+
+
+class TestJobsPreemption:
+    @pytest.fixture(autouse=True)
+    async def start_pod_watcher(
+        self, kube_client: MyKubeClient, kube_orchestrator: KubeOrchestrator
+    ) -> AsyncIterator[None]:
+        watcher = PodWatcher(kube_client)
+        kube_orchestrator.register(watcher)
+        async with watcher:
+            yield
+
+    @pytest.fixture
+    async def pod_factory(
+        self, pod_factory: Callable[..., Awaitable[PodDescriptor]]
+    ) -> Callable[..., Awaitable[PodDescriptor]]:
+        async def _create(
+            cpu: float = 0.1,
+            memory: int = 64,
+            wait: bool = True,
+            idle: bool = True,
+        ) -> PodDescriptor:
+            return await pod_factory(
+                image="gcr.io/google_containers/pause:3.1",
+                cpu=cpu,
+                memory=memory,
+                wait=wait,
+                idle=idle,
+            )
+
+        return _create
+
+    @pytest.fixture
+    async def job_factory(
+        self,
+        kube_client: MyKubeClient,
+        kube_orchestrator: KubeOrchestrator,
+        delete_job_later: Callable[[Job], Awaitable[None]],
+    ) -> Callable[..., Awaitable[Job]]:
+        async def _create(
+            cpu: float = 0.1,
+            memory: int = 128,
+            wait: bool = False,
+            wait_timeout_s: float = 60,
+        ) -> Job:
+            container = Container(
+                image="gcr.io/google_containers/pause:3.1",
+                resources=ContainerResources(cpu=cpu, memory_mb=memory),
+            )
+            job = MyJob(
+                orchestrator=kube_orchestrator,
+                record=JobRecord.create(
+                    owner="owner1",
+                    request=JobRequest.create(container),
+                    cluster_name="test-cluster",
+                ),
+            )
+            await kube_orchestrator.start_job(job)
+            await delete_job_later(job)
+            if wait:
+                await kube_client.wait_pod_is_running(job.id, timeout_s=wait_timeout_s)
+            return job
+
+        return _create
+
+    @pytest.fixture
+    async def node_resources(
+        self, kube_client: KubeClient, kube_node: str
+    ) -> NodeResources:
+        node = await kube_client.get_node(kube_node)
+        return node.allocatable_resources
+
+    async def test_preempt_jobs(
+        self,
+        kube_client: MyKubeClient,
+        kube_orchestrator: KubeOrchestrator,
+        job_factory: Callable[..., Awaitable[Job]],
+        node_resources: NodeResources,
+    ) -> None:
+        preemptible_job = await job_factory(cpu=node_resources.cpu / 2, wait=True)
+        # Node should have less than cpu / 2 left
+        job = await job_factory(cpu=node_resources.cpu / 2)
+        preempted_jobs = await kube_orchestrator.preempt_jobs(
+            jobs_to_schedule=[job], preemptible_jobs=[preemptible_job]
+        )
+
+        assert preempted_jobs == [preemptible_job]
+
+        await kube_client.wait_pod_is_deleted(
+            preemptible_job.id, timeout_s=60, interval_s=0.1
+        )
+
+    async def test_preempt_idle_jobs(
+        self,
+        kube_client: MyKubeClient,
+        kube_orchestrator: KubeOrchestrator,
+        pod_factory: Callable[..., Awaitable[PodDescriptor]],
+        job_factory: Callable[..., Awaitable[Job]],
+        node_resources: NodeResources,
+    ) -> None:
+        idle_pod = await pod_factory(cpu=node_resources.cpu / 2)
+        # Node should have less than cpu / 2 left
+        job = await job_factory(cpu=node_resources.cpu / 2)
+        preempted = await kube_orchestrator.preempt_idle_jobs([job])
+
+        assert preempted is True
+
+        await kube_client.wait_pod_is_deleted(
+            idle_pod.name, timeout_s=60, interval_s=0.1
+        )
+
+    async def test_cannot_be_scheduled(
+        self,
+        kube_orchestrator_factory: Callable[..., KubeOrchestrator],
+        kube_config_factory: Callable[..., KubeConfig],
+        kube_client_factory: Callable[..., MyKubeClient],
+        pod_factory: Callable[..., Awaitable[PodDescriptor]],
+        job_factory: Callable[..., Awaitable[Job]],
+        node_resources: NodeResources,
+    ) -> None:
+        kube_config = kube_config_factory(node_label_node_pool="nodepool")
+        async with kube_client_factory(kube_config) as kube_client:
+            kube_orchestrator = kube_orchestrator_factory(
+                kube_config=kube_config, kube_client=kube_client
+            )
+            await pod_factory(cpu=node_resources.cpu / 2)
+            # Node should have less than cpu / 2 left
+            job = await job_factory(cpu=node_resources.cpu / 2)
+            preempted = await kube_orchestrator.preempt_idle_jobs([job])
+
+            assert preempted is False
+
+            job_pod = await kube_client.get_pod(job.id)
+            assert job_pod.status and job_pod.status.is_phase_pending
+
+    async def test_not_enough_resources(
+        self,
+        kube_client: MyKubeClient,
+        kube_orchestrator: KubeOrchestrator,
+        pod_factory: Callable[..., Awaitable[PodDescriptor]],
+        job_factory: Callable[..., Awaitable[Job]],
+        node_resources: NodeResources,
+    ) -> None:
+        await pod_factory(cpu=node_resources.cpu / 2)
+        # Node should have less than cpu / 2 left
+        job = await job_factory(cpu=node_resources.cpu)
+        preempted = await kube_orchestrator.preempt_idle_jobs([job])
+
+        assert preempted is False
+
+        job_pod = await kube_client.get_pod(job.id)
+        assert job_pod.status and job_pod.status.is_phase_pending
+
+    async def test_running_jobs_ignored(
+        self,
+        kube_client: MyKubeClient,
+        kube_orchestrator: KubeOrchestrator,
+        pod_factory: Callable[..., Awaitable[PodDescriptor]],
+        job_factory: Callable[..., Awaitable[Job]],
+    ) -> None:
+        job = await job_factory()
+        idle_pod = await pod_factory()
+        preempted = await kube_orchestrator.preempt_idle_jobs([job])
+
+        assert preempted is False
+
+        idle_pod = await kube_client.get_pod(idle_pod.name)
+        assert idle_pod.status and idle_pod.status.is_scheduled
+
+    async def test_no_idle_jobs(
+        self,
+        kube_client: MyKubeClient,
+        kube_orchestrator: KubeOrchestrator,
+        job_factory: Callable[..., Awaitable[Job]],
+        node_resources: NodeResources,
+    ) -> None:
+        # Should not be scheduled
+        job = await job_factory(cpu=node_resources.cpu)
+        preempted = await kube_orchestrator.preempt_idle_jobs([job])
+
+        assert preempted is False
+
+        job_pod = await kube_client.get_pod(job.id)
+        assert job_pod.status and job_pod.status.is_phase_pending
+
+    async def test_no_jobs(
+        self,
+        kube_client: MyKubeClient,
+        kube_orchestrator: KubeOrchestrator,
+        pod_factory: Callable[..., Awaitable[PodDescriptor]],
+    ) -> None:
+        idle_pod = await pod_factory()
+        preempted = await kube_orchestrator.preempt_idle_jobs([])
+
+        assert preempted is False
+
+        idle_pod = await kube_client.get_pod(idle_pod.name)
+        assert idle_pod.status and idle_pod.status.is_scheduled
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index af8c87f1b..cd60166fb 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -79,6 +79,8 @@ def __init__(self, config: ClusterConfig) -> None:
             datetime.now, timezone.utc
         )
         self._successfully_deleted_jobs: list[Job] = []
+        self._idle_jobs: list[Job] = []
+        self._preempted_jobs: list[Job] = []
 
     @property
     def config(self) -> OrchestratorConfig:
@@ -161,6 +163,26 @@ async def get_missing_secrets(
     async def get_missing_disks(self, disks: list[Disk]) -> list[Disk]:
         pass
 
+    def start_idle_job(self, job: Job) -> None:
+        self._idle_jobs.append(job)
+
+    def get_idle_jobs(self) -> list[Job]:
+        return list(self._idle_jobs)
+
+    def get_preempted_jobs(self) -> list[Job]:
+        return list(self._preempted_jobs)
+
+    async def preempt_jobs(
+        self, jobs_to_schedule: list[Job], preemptible_jobs: list[Job]
+    ) -> list[Job]:
+        self._preempted_jobs.extend(preemptible_jobs)
+        return preemptible_jobs
+
+    async def preempt_idle_jobs(self, jobs_to_schedule: list[Job]) -> bool:
+        self._preempted_jobs.extend(self._idle_jobs)
+        self._idle_jobs.clear()
+        return True
+
 
 class MockJobsStorage(InMemoryJobsStorage):
     def __init__(self) -> None:
diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py
index 3b51b19b5..c80f6ca31 100644
--- a/tests/unit/test_config.py
+++ b/tests/unit/test_config.py
@@ -1,6 +1,7 @@
 from datetime import timedelta
 from decimal import Decimal
 from pathlib import PurePath
+from unittest import mock
 
 import pytest
 from yarl import URL
@@ -15,7 +16,7 @@
 )
 from platform_api.config_factory import EnvironConfigFactory
 from platform_api.orchestrator.job_request import ContainerVolume
-from platform_api.orchestrator.kube_client import SecretVolume
+from platform_api.orchestrator.kube_client import KubeClient, SecretVolume
 from platform_api.orchestrator.kube_config import KubeClientAuthType
 from platform_api.orchestrator.kube_orchestrator import (
     HostVolume,
@@ -27,6 +28,11 @@
 from platform_api.resource import ResourcePoolType
 
 
+@pytest.fixture
+def kube_client() -> KubeClient:
+    return mock.AsyncMock(spec=KubeClient)
+
+
 class TestStorageConfig:
     def test_missing_nfs_settings(self) -> None:
         with pytest.raises(ValueError, match="Missing NFS settings"):
@@ -60,7 +66,9 @@ def test_is_nfs(self) -> None:
 
 
 class TestStorageVolume:
-    def test_create_storage_volume_nfs(self, registry_config: RegistryConfig) -> None:
+    def test_create_storage_volume_nfs(
+        self, kube_client: KubeClient, registry_config: RegistryConfig
+    ) -> None:
         storage_config = StorageConfig(
             host_mount_path=PurePath("/tmp"),
             type=StorageType.NFS,
@@ -80,6 +88,7 @@ def test_create_storage_volume_nfs(self, registry_config: RegistryConfig) -> Non
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         container_volume = ContainerVolume.create(
             "storage://neuromation/public",
@@ -96,7 +105,9 @@ def test_create_storage_volume_nfs(self, registry_config: RegistryConfig) -> Non
             )
         ]
 
-    def test_create_storage_volume_host(self, registry_config: RegistryConfig) -> None:
+    def test_create_storage_volume_host(
+        self, kube_client: KubeClient, registry_config: RegistryConfig
+    ) -> None:
         storage_config = StorageConfig(
             host_mount_path=PurePath("/tmp"), type=StorageType.HOST
         )
@@ -113,6 +124,7 @@ def test_create_storage_volume_host(self, registry_config: RegistryConfig) -> No
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         container_volume = ContainerVolume.create(
             "storage://neuromation/public",
@@ -124,7 +136,9 @@ def test_create_storage_volume_host(self, registry_config: RegistryConfig) -> No
             HostVolume(name="storage", path=None, host_path=PurePath("/tmp"))
         ]
 
-    def test_create_storage_volume_pvc(self, registry_config: RegistryConfig) -> None:
+    def test_create_storage_volume_pvc(
+        self, kube_client: KubeClient, registry_config: RegistryConfig
+    ) -> None:
         storage_config = StorageConfig(
             host_mount_path=PurePath("/tmp"), type=StorageType.PVC, pvc_name="testclaim"
         )
@@ -141,6 +155,7 @@ def test_create_storage_volume_pvc(self, registry_config: RegistryConfig) -> Non
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         container_volume = ContainerVolume.create(
             "storage://neuromation/public",
@@ -150,7 +165,9 @@ def test_create_storage_volume_pvc(self, registry_config: RegistryConfig) -> Non
         volumes = kube_orchestrator.create_storage_volumes(container_volume)
         assert volumes == [PVCVolume(name="storage", path=None, claim_name="testclaim")]
 
-    def test_create_main_storage_volume(self, registry_config: RegistryConfig) -> None:
+    def test_create_main_storage_volume(
+        self, kube_client: KubeClient, registry_config: RegistryConfig
+    ) -> None:
         main_storage_config = StorageConfig(
             host_mount_path=PurePath("/tmp"),
             type=StorageType.PVC,
@@ -175,6 +192,7 @@ def test_create_main_storage_volume(self, registry_config: RegistryConfig) -> No
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         container_volume = ContainerVolume.create(
             "storage://cluster/user",
@@ -186,7 +204,9 @@ def test_create_main_storage_volume(self, registry_config: RegistryConfig) -> No
             PVCVolume(name="storage", path=None, claim_name="main-claim")
         ]
 
-    def test_create_extra_storage_volume(self, registry_config: RegistryConfig) -> None:
+    def test_create_extra_storage_volume(
+        self, kube_client: KubeClient, registry_config: RegistryConfig
+    ) -> None:
         main_storage_config = StorageConfig(
             host_mount_path=PurePath("/tmp"),
             type=StorageType.PVC,
@@ -211,6 +231,7 @@ def test_create_extra_storage_volume(self, registry_config: RegistryConfig) -> N
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         container_volume = ContainerVolume.create(
             "storage://cluster/isolated/dir",
@@ -226,7 +247,9 @@ def test_create_extra_storage_volume(self, registry_config: RegistryConfig) -> N
             )
         ]
 
-    def test_create_all_storage_volumes(self, registry_config: RegistryConfig) -> None:
+    def test_create_all_storage_volumes(
+        self, kube_client: KubeClient, registry_config: RegistryConfig
+    ) -> None:
         main_storage_config = StorageConfig(
             host_mount_path=PurePath("/tmp"),
             type=StorageType.PVC,
@@ -251,6 +274,7 @@ def test_create_all_storage_volumes(self, registry_config: RegistryConfig) -> No
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         container_volume = ContainerVolume.create(
             "storage://cluster",
@@ -269,7 +293,9 @@ def test_create_all_storage_volumes(self, registry_config: RegistryConfig) -> No
 
 
 class TestSecretVolume:
-    def test_create_secret_volume(self, registry_config: RegistryConfig) -> None:
+    def test_create_secret_volume(
+        self, kube_client: KubeClient, registry_config: RegistryConfig
+    ) -> None:
         storage_config = StorageConfig(
             host_mount_path=PurePath("/tmp"), type=StorageType.PVC, pvc_name="testclaim"
         )
@@ -286,6 +312,7 @@ def test_create_secret_volume(self, registry_config: RegistryConfig) -> None:
             registry_config=registry_config,
             orchestrator_config=orchestrator_config,
             kube_config=kube_config,
+            kube_client=kube_client,
         )
         user_name = "test-user"
         volume = kube_orchestrator.create_secret_volume(user_name)
diff --git a/tests/unit/test_kube_orchestrator.py b/tests/unit/test_kube_orchestrator.py
index 5186feada..e68ce2455 100644
--- a/tests/unit/test_kube_orchestrator.py
+++ b/tests/unit/test_kube_orchestrator.py
@@ -25,6 +25,7 @@
     AlreadyExistsException,
     ContainerStatus,
     Ingress,
+    KubeClient,
     PathVolume,
     Resources,
     SecretEnvVar,
@@ -1757,6 +1758,7 @@ def test_is_terminated(self) -> None:
 class TestKubeOrchestrator:
     @pytest.fixture
     def orchestrator(self) -> KubeOrchestrator:
+        kube_client = mock.AsyncMock(spec=KubeClient)
         return KubeOrchestrator(
             cluster_name="default",
             storage_configs=[
@@ -1780,6 +1782,7 @@ def orchestrator(self) -> KubeOrchestrator:
                 presets=[],
             ),
             kube_config=KubeConfig(endpoint_url="https://kuberrnetes.svc"),
+            kube_client=kube_client,
         )
 
     def test_create_main_storage_volumes(self, orchestrator: KubeOrchestrator) -> None:
diff --git a/tests/unit/test_kube_orchestrator_preemption.py b/tests/unit/test_kube_orchestrator_preemption.py
new file mode 100644
index 000000000..a7bdbad3a
--- /dev/null
+++ b/tests/unit/test_kube_orchestrator_preemption.py
@@ -0,0 +1,416 @@
+from __future__ import annotations
+
+import uuid
+from datetime import datetime
+from typing import Any, Callable
+from unittest import mock
+
+import pytest
+
+from platform_api.orchestrator.kube_client import (
+    KubeClient,
+    Node,
+    NodeResources,
+    PodDescriptor,
+    PodWatchEvent,
+    Resources,
+)
+from platform_api.orchestrator.kube_orchestrator_preemption import (
+    IdlePodsHandler,
+    NodeResourcesHandler,
+)
+
+PodFactory = Callable[..., dict[str, Any]]
+
+
+@pytest.fixture
+def create_pod() -> PodFactory:
+    def _create(
+        name: str | None = None,
+        *,
+        cpu: float = 0.1,
+        memory: int = 128,
+        gpu: int = 1,
+        labels: dict[str, str] | None = None,
+        node_name: str | None = "minikube",
+        is_scheduled: bool = False,
+        is_running: bool = False,
+        is_terminated: bool = False,
+    ) -> dict[str, Any]:
+        pod = PodDescriptor(
+            name or f"pod-{uuid.uuid4()}",
+            labels=labels or {},
+            image="gcr.io/google_containers/pause:3.1",
+            resources=Resources(cpu=cpu, memory=memory, gpu=gpu),
+        )
+        raw_pod = pod.to_primitive()
+        raw_pod["metadata"]["creationTimestamp"] = datetime.now().isoformat()
+        raw_pod["status"] = {"phase": "Pending"}
+        scheduled_condition = {
+            "lastProbeTime": None,
+            "lastTransitionTime": datetime.now().isoformat(),
+            "status": "True",
+            "type": "PodScheduled",
+        }
+        if is_scheduled:
+            raw_pod["status"] = {
+                "phase": "Pending",
+                "containerStatuses": [{"state": {"waiting": {}}}],
+                "conditions": [scheduled_condition],
+            }
+            raw_pod["spec"]["nodeName"] = node_name
+        if is_running:
+            raw_pod["status"] = {
+                "phase": "Running",
+                "containerStatuses": [{"state": {"running": {}}}],
+                "conditions": [scheduled_condition],
+            }
+            raw_pod["spec"]["nodeName"] = node_name
+        if is_terminated:
+            raw_pod["status"] = {
+                "phase": "Succeeded",
+                "containerStatuses": [{"state": {"terminated": {}}}],
+                "conditions": [scheduled_condition],
+            }
+            raw_pod["spec"]["nodeName"] = node_name
+        return raw_pod
+
+    return _create
+
+
+class TestNodeResourcesHandler:
+    @pytest.fixture
+    def kube_client(self) -> KubeClient:
+        kube_client = mock.AsyncMock(spec=KubeClient)
+        kube_client.get_node.side_effect = [
+            Node(
+                "minikube1",
+                allocatable_resources=NodeResources(cpu=1, memory=1024, gpu=1),
+            ),
+            Node(
+                "minikube2",
+                allocatable_resources=NodeResources(cpu=2, memory=4096, gpu=2),
+            ),
+        ]
+        return kube_client
+
+    @pytest.fixture
+    def handler(self, kube_client: KubeClient) -> NodeResourcesHandler:
+        return NodeResourcesHandler(kube_client)
+
+    async def test_init_pending(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod("job")]
+        await handler.init(pods)
+
+        assert len(handler.get_nodes()) == 0
+        assert handler.get_pod_node_name("job") is None
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources()
+
+    async def test_init_scheduled(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod("job", is_scheduled=True)]
+        await handler.init(pods)
+
+        assert len(handler.get_nodes()) == 1
+        assert handler.get_pod_node_name("job") == "minikube"
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources(0.9, 896)
+
+    async def test_init_running(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [
+            create_pod("job", gpu=0, is_running=True),
+            create_pod(is_running=True),
+            create_pod(node_name="minikube2", is_running=True),
+        ]
+        await handler.init(pods)
+
+        assert len(handler.get_nodes()) == 2
+        assert handler.get_pod_node_name("job") == "minikube"
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources(cpu=0.8, memory=768)
+        resources = handler.get_node_free_resources("minikube2")
+        assert resources == NodeResources(cpu=1.9, memory=3968, gpu=1)
+
+    async def test_init_succeeded(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod("job", is_terminated=True)]
+        await handler.init(pods)
+
+        assert len(handler.get_nodes()) == 0
+        assert handler.get_pod_node_name("job") is None
+        resources = handler.get_node_free_resources("minikube1")
+        assert resources == NodeResources()
+
+    async def test_handle_added_pending(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(PodWatchEvent.create_added(create_pod("job")))
+
+        assert len(handler.get_nodes()) == 0
+        assert handler.get_pod_node_name("job") is None
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources()
+
+    async def test_handle_added_running(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod("job", is_running=True))
+        )
+
+        assert len(handler.get_nodes()) == 1
+        assert handler.get_pod_node_name("job") == "minikube"
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources(cpu=0.9, memory=896)
+
+    async def test_handle_added_running_multiple_times(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod("job", is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod("job", is_running=True))
+        )
+
+        assert len(handler.get_nodes()) == 1
+        assert handler.get_pod_node_name("job") == "minikube"
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources(cpu=0.9, memory=896)
+
+    async def test_handle_modified_succeeded(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod(gpu=0, is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod("job", is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_modified(create_pod("job", is_terminated=True))
+        )
+
+        assert len(handler.get_nodes()) == 1
+        assert handler.get_pod_node_name("job") is None
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources(cpu=0.9, memory=896, gpu=1)
+
+    async def test_handle_deleted(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod(gpu=0, is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod("job", is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_deleted(create_pod("job", is_terminated=True))
+        )
+
+        assert len(handler.get_nodes()) == 1
+        assert handler.get_pod_node_name("job") is None
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources(cpu=0.9, memory=896, gpu=1)
+
+    async def test_handle_deleted_multiple_times(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod(gpu=0, is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod("job", is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_deleted(create_pod("job", is_terminated=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_deleted(create_pod("job", is_terminated=True))
+        )
+
+        assert len(handler.get_nodes()) == 1
+        assert handler.get_pod_node_name("job") is None
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources(cpu=0.9, memory=896, gpu=1)
+
+    async def test_handle_deleted_last(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(
+            PodWatchEvent.create_added(create_pod("job", is_running=True))
+        )
+        await handler.handle(
+            PodWatchEvent.create_deleted(create_pod("job", is_terminated=True))
+        )
+
+        assert len(handler.get_nodes()) == 0
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources()
+
+    async def test_handle_deleted_not_existing(
+        self, handler: NodeResourcesHandler, create_pod: PodFactory
+    ) -> None:
+        await handler.handle(
+            PodWatchEvent.create_deleted(create_pod(is_terminated=True))
+        )
+
+        assert len(handler.get_nodes()) == 0
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources()
+
+    async def test_get_node_free_resources_unknown_node(
+        self, handler: NodeResourcesHandler
+    ) -> None:
+        resources = handler.get_node_free_resources("minikube")
+        assert resources == NodeResources()
+
+
+class TestIdlePodsHandler:
+    @pytest.fixture
+    def create_pod(self, create_pod: PodFactory) -> PodFactory:
+        def _create(*args: Any, is_idle: bool = True, **kwargs: Any) -> dict[str, Any]:
+            if is_idle:
+                kwargs["labels"] = {"platform.neuromation.io/idle": "true"}
+            return create_pod(*args, **kwargs)
+
+        return _create
+
+    @pytest.fixture
+    def handler(self) -> IdlePodsHandler:
+        return IdlePodsHandler()
+
+    async def test_init_non_idle(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod(is_idle=False)]
+        await handler.init(pods)
+
+        idle_pods = handler.get_pods("minikube")
+        assert len(idle_pods) == 0
+
+    async def test_init_pending(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod()]
+        await handler.init(pods)
+
+        idle_pods = handler.get_pods("minikube")
+        assert len(idle_pods) == 0
+
+    async def test_init_scheduled(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod(is_scheduled=True)]
+        await handler.init(pods)
+
+        idle_pods = handler.get_pods("minikube")
+        assert len(idle_pods) == 1
+
+    async def test_init_running(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod(is_running=True)]
+        await handler.init(pods)
+
+        idle_pods = handler.get_pods("minikube")
+        assert len(idle_pods) == 1
+
+    async def test_init_succeeded(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        pods = [create_pod(is_terminated=True)]
+        await handler.init(pods)
+
+        idle_pods = handler.get_pods("minikube")
+        assert len(idle_pods) == 0
+
+    async def test_handle_added_non_idle(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        event = PodWatchEvent.create_added(create_pod(is_idle=False, is_running=True))
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 0
+
+    async def test_handle_added_pending(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        event = PodWatchEvent.create_added(create_pod())
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 0
+
+    async def test_handle_added_scheduled(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        event = PodWatchEvent.create_added(create_pod(is_scheduled=True))
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 1
+
+    async def test_handle_added_running(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        event = PodWatchEvent.create_added(create_pod(is_running=True))
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 1
+
+    async def test_handle_modified_succeeded(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True))
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 1
+
+        event = PodWatchEvent.create_modified(
+            create_pod("idle-job", is_terminated=True)
+        )
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 0
+
+    async def test_handle_deleted(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True))
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 1
+
+        event = PodWatchEvent.create_deleted(create_pod("idle-job", is_terminated=True))
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 0
+
+    async def test_handle_deleted_not_existing(
+        self, handler: IdlePodsHandler, create_pod: PodFactory
+    ) -> None:
+        event = PodWatchEvent.create_deleted(create_pod(is_terminated=True))
+        await handler.handle(event)
+
+        pods = handler.get_pods("minikube")
+        assert len(pods) == 0
+
+    async def test_get_pods_unknown_node(self, handler: IdlePodsHandler) -> None:
+        idle_pods = handler.get_pods("minikube")
+
+        assert len(idle_pods) == 0
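
The new Orchestrator.preempt_jobs / preempt_idle_jobs hooks introduced by this diff are meant to be driven by a polling loop. A minimal sketch of such a caller, assuming the orchestrator is wired as in KubeCluster._init_orchestrator above; the function name schedule_pending, its parameters, and the Job import path are assumptions for illustration, not part of this change:

    from __future__ import annotations

    import logging

    from platform_api.orchestrator.kube_orchestrator import Orchestrator
    from platform_api.orchestrator.job import Job  # import path assumed

    logger = logging.getLogger(__name__)


    async def schedule_pending(
        orchestrator: Orchestrator,
        pending_jobs: list[Job],
        preemptible_jobs: list[Job],
    ) -> None:
        # First try to free capacity by deleting idle placeholder pods;
        # preempt_idle_jobs returns True if any idle pod was preempted.
        if await orchestrator.preempt_idle_jobs(pending_jobs):
            return  # pods are deleted asynchronously; retry on the next poll
        # Otherwise preempt running preemptible jobs; the return value is the
        # subset of preemptible_jobs whose pods were actually deleted.
        preempted = await orchestrator.preempt_jobs(pending_jobs, preemptible_jobs)
        logger.info("Preempted %d jobs", len(preempted))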