From 29674ae3d296a598b15f7019099dbf42cac684d5 Mon Sep 17 00:00:00 2001 From: Ivan Zubenko Date: Fri, 22 Apr 2022 19:33:49 +0300 Subject: [PATCH] refactor --- platform_api/orchestrator/kube_client.py | 14 +- .../kube_orchestrator_preemption.py | 77 ++--- .../unit/test_kube_orchestrator_preemption.py | 294 +++++++++--------- 3 files changed, 191 insertions(+), 194 deletions(-) diff --git a/platform_api/orchestrator/kube_client.py b/platform_api/orchestrator/kube_client.py index 20f0fe3ff..de3fa0bf3 100644 --- a/platform_api/orchestrator/kube_client.py +++ b/platform_api/orchestrator/kube_client.py @@ -1597,6 +1597,10 @@ def _parse_memory(cls, memory: str) -> int: return int(memory[:-2]) * 1024 raise ValueError("Memory format is not supported") + @property + def any(self) -> bool: + return self.cpu_mcores > 0 or self.memory > 0 or self.gpu > 0 + @property def cpu_mcores(self) -> int: return int(self.cpu * 1000) @@ -2510,7 +2514,7 @@ def get_pods_to_preempt( if resources.gpu: pods = [p for p in pods if p.resources and p.resources.gpu] pods_to_preempt: list[PodDescriptor] = [] - while pods and cls._has_resources(resources): + while pods and resources.any: logger.debug("Pods left: %d", len(pods)) logger.debug("Resources left: %s", resources) # max distance for a single resource is 1, 3 resources total @@ -2541,18 +2545,12 @@ def get_pods_to_preempt( resources.memory, resources.gpu or 0, ) - if cls._has_resources(resources): + if resources.any: logger.debug("Pods to preempt: []") return [] logger.debug("Pods to preempt: %s", [p.name for p in pods_to_preempt]) return pods_to_preempt - @classmethod - def _has_resources(cls, resources: NodeResources) -> bool: - return ( - resources.cpu_mcores > 0 or resources.memory > 0 or (resources.gpu or 0) > 0 - ) - @classmethod def _subtract_resources(cls, r1: NodeResources, r2: Resources) -> NodeResources: return replace( diff --git a/platform_api/orchestrator/kube_orchestrator_preemption.py b/platform_api/orchestrator/kube_orchestrator_preemption.py index 8c0a45885..1e73f8048 100644 --- a/platform_api/orchestrator/kube_orchestrator_preemption.py +++ b/platform_api/orchestrator/kube_orchestrator_preemption.py @@ -15,6 +15,7 @@ NotFoundException, PodDescriptor, PodEventHandler, + PodStatus, PodWatcher, PodWatchEvent, WatchEventType, @@ -23,51 +24,43 @@ logger = logging.getLogger(__name__) -class _Pod(dict[str, Any]): +class _Pod: + def __init__(self, payload: dict[str, Any]) -> None: + self._payload = payload + self._status = PodStatus(payload["status"]) + + @property + def payload(self) -> dict[str, Any]: + return self._payload + @property def name(self) -> str: - return self["metadata"]["name"] + return self._payload["metadata"]["name"] @property def labels(self) -> dict[str, str]: - return self["metadata"].get("labels", {}) + return self._payload["metadata"].get("labels", {}) @property def is_idle(self) -> bool: return bool(self.labels.get("platform.neuromation.io/idle")) @property - def node_name(self) -> str | None: - return self["spec"].get("nodeName") - - @property - def is_pending(self) -> bool: - return self["status"]["phase"] == "Pending" - - @property - def is_waiting_for_node(self) -> bool: - return self.is_pending and not bool(self["spec"].get("nodeName")) - - @property - def is_scheduled(self) -> bool: - return self.is_pending and bool(self.node_name) - - @property - def is_running(self) -> bool: - return self["status"]["phase"] == "Running" + def is_terminating(self) -> bool: + return bool(self._payload["metadata"].get("deletionTimestamp")) @property - def is_bound_to_node(self) -> bool: - return self.is_scheduled or self.is_running + def status(self) -> PodStatus: + return self._status @property - def is_terminating(self) -> bool: - return bool(self["metadata"].get("deletionTimestamp")) + def node_name(self) -> str: + return self._payload["spec"]["nodeName"] @property def resource_requests(self) -> NodeResources: pod_resources = NodeResources() - for container in self["spec"]["containers"]: + for container in self._payload["spec"]["containers"]: resources = container.get("resources") if not resources: continue @@ -88,25 +81,23 @@ def __init__(self, kube_client: KubeClient) -> None: async def init(self, raw_pods: list[dict[str, Any]]) -> None: for raw_pod in raw_pods: pod = _Pod(raw_pod) - if pod.is_bound_to_node: + if pod.status.is_scheduled and not pod.status.is_terminated: await self._add_pod(pod) async def handle(self, event: PodWatchEvent) -> None: pod = _Pod(event.raw_pod) - if pod.is_waiting_for_node: + if not pod.status.is_scheduled: return - if event.type == WatchEventType.DELETED: + if event.type == WatchEventType.DELETED or pod.status.is_terminated: self._remove_pod(pod) - elif pod.is_bound_to_node: - await self._add_pod(pod) else: - self._remove_pod(pod) + await self._add_pod(pod) async def _add_pod(self, pod: _Pod) -> None: pod_name = pod.name if pod_name in self._pod_names: return - node_name: str = pod.node_name # type: ignore + node_name = pod.node_name # Ignore error in case node was removed/lost but pod was not yet removed with suppress(NotFoundException): if node_name not in self._nodes: @@ -120,7 +111,7 @@ def _remove_pod(self, pod: _Pod) -> None: pod_name = pod.name if pod_name not in self._pod_names: return - node_name: str = pod.node_name # type: ignore + node_name = pod.node_name node = self._nodes.get(node_name) if node: node_free_resources = self._node_free_resources[node_name] @@ -157,33 +148,31 @@ def __init__(self) -> None: async def init(self, raw_pods: list[dict[str, Any]]) -> None: for raw_pod in raw_pods: pod = _Pod(raw_pod) - if pod.is_idle and pod.is_bound_to_node: + if pod.is_idle and pod.status.is_scheduled and not pod.status.is_terminated: self._add_pod(pod) async def handle(self, event: PodWatchEvent) -> None: pod = _Pod(event.raw_pod) - if not pod.is_idle or pod.is_waiting_for_node: + if not pod.is_idle or not pod.status.is_scheduled: return - if event.type == WatchEventType.DELETED: + if event.type == WatchEventType.DELETED or pod.status.is_terminated: self._notify_pod_terminating(pod) # in case it's force delete self._remove_pod(pod) - elif pod.is_bound_to_node: + else: self._add_pod(pod) if pod.is_terminating: self._notify_pod_terminating(pod) - else: - self._remove_pod(pod) def _add_pod(self, pod: _Pod) -> None: pod_name = pod.name - node_name: str = pod.node_name # type: ignore + node_name = pod.node_name # there is an issue in k8s, elements in items don't have kind and version - pod["kind"] = "Pod" + pod.payload["kind"] = "Pod" self._pod_names.add(pod_name) - self._pods[node_name][pod_name] = PodDescriptor.from_primitive(pod) + self._pods[node_name][pod_name] = PodDescriptor.from_primitive(pod.payload) def _remove_pod(self, pod: _Pod) -> None: - node_name: str = pod.node_name # type: ignore + node_name = pod.node_name pod_name = pod.name pods = self._pods[node_name] pods.pop(pod_name, None) diff --git a/tests/unit/test_kube_orchestrator_preemption.py b/tests/unit/test_kube_orchestrator_preemption.py index 53c759b56..baeead256 100644 --- a/tests/unit/test_kube_orchestrator_preemption.py +++ b/tests/unit/test_kube_orchestrator_preemption.py @@ -21,36 +21,61 @@ NodeResourcesHandler, ) -RawPodFactory = Callable[..., dict[str, Any]] +PodFactory = Callable[..., dict[str, Any]] @pytest.fixture -async def pod_factory() -> RawPodFactory: +def create_pod() -> PodFactory: def _create( name: str | None = None, - node_name: str | None = "minikube", + *, cpu: float = 0.1, memory: int = 128, gpu: int = 1, - phase: str = "Running", + labels: dict[str, str] | None = None, + node_name: str | None = "minikube", + is_scheduled: bool = False, + is_running: bool = False, is_terminating: bool = False, - is_idle: bool = False, + is_terminated: bool = False, ) -> dict[str, Any]: - labels = {} - if is_idle: - labels["platform.neuromation.io/idle"] = "true" pod = PodDescriptor( - name=name or f"pod-{uuid.uuid4()}", - labels=labels, + name or f"pod-{uuid.uuid4()}", + labels=labels or {}, image="gcr.io/google_containers/pause:3.1", resources=Resources(cpu=cpu, memory=memory, gpu=gpu), ) raw_pod = pod.to_primitive() raw_pod["metadata"]["creationTimestamp"] = datetime.now().isoformat() - raw_pod["status"] = {"phase": phase} + raw_pod["status"] = {"phase": "Pending"} + scheduled_condition = { + "lastProbeTime": None, + "lastTransitionTime": datetime.now().isoformat(), + "status": "True", + "type": "PodScheduled", + } + if is_scheduled: + raw_pod["status"] = { + "phase": "Pending", + "containerStatuses": [{"state": {"waiting": {}}}], + "conditions": [scheduled_condition], + } + raw_pod["spec"]["nodeName"] = node_name + if is_running or is_terminating: + raw_pod["status"] = { + "phase": "Running", + "containerStatuses": [{"state": {"running": {}}}], + "conditions": [scheduled_condition], + } + raw_pod["spec"]["nodeName"] = node_name if is_terminating: raw_pod["metadata"]["deletionTimestamp"] = datetime.now().isoformat() - if node_name: + if is_terminated: + raw_pod["status"] = { + "phase": "Succeeded", + "containerStatuses": [{"state": {"terminated": {}}}], + "conditions": [scheduled_condition], + } raw_pod["spec"]["nodeName"] = node_name return raw_pod @@ -63,11 +88,11 @@ def kube_client(self) -> KubeClient: kube_client = mock.AsyncMock(spec=KubeClient) kube_client.get_node.side_effect = [ Node( - name="minikube1", + "minikube1", allocatable_resources=NodeResources(cpu=1, memory=1024, gpu=1), ), Node( - name="minikube2", + "minikube2", allocatable_resources=NodeResources(cpu=2, memory=4096, gpu=2), ), ] @@ -78,168 +103,171 @@ def handler(self, kube_client: KubeClient) -> NodeResourcesHandler: return NodeResourcesHandler(kube_client) async def test_init_pending( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory(name="job", phase="Pending", node_name=None)] + pods = [create_pod("job")] await handler.init(pods) assert len(handler.get_nodes()) == 0 - assert handler.is_pod_bound_to_node("job") is False - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources() async def test_init_scheduled( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory(name="job", phase="Pending")] + pods = [create_pod("job", is_scheduled=True)] await handler.init(pods) assert len(handler.get_nodes()) == 1 - assert handler.is_pod_bound_to_node("job") is True - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources(0.9, 896) async def test_init_running( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: pods = [ - pod_factory(name="job", node_name="minikube1", gpu=0), - pod_factory(node_name="minikube1"), - pod_factory(node_name="minikube2"), + create_pod("job", gpu=0, is_running=True), + create_pod(is_running=True), + create_pod(node_name="minikube2", is_running=True), ] await handler.init(pods) assert len(handler.get_nodes()) == 2 - assert handler.is_pod_bound_to_node("job") is True - - resources = handler.get_node_free_resources("minikube1") + resources = handler.get_node_free_resources("minikube") assert resources == NodeResources(cpu=0.8, memory=768) - resources = handler.get_node_free_resources("minikube2") assert resources == NodeResources(cpu=1.9, memory=3968, gpu=1) async def test_init_succeeded( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory(name="job", phase="Succeeded")] + pods = [create_pod("job", is_terminated=True)] await handler.init(pods) assert len(handler.get_nodes()) == 0 - assert handler.is_pod_bound_to_node("job") is False - resources = handler.get_node_free_resources("minikube1") assert resources == NodeResources() async def test_handle_added_pending( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle( - PodWatchEvent.create_added( - pod_factory(name="job", phase="Pending", node_name=None) - ) - ) + await handler.handle(PodWatchEvent.create_added(create_pod("job"))) assert len(handler.get_nodes()) == 0 - assert handler.is_pod_bound_to_node("job") is False - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources() async def test_handle_added_running( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle(PodWatchEvent.create_added(pod_factory(name="job"))) + await handler.handle( + PodWatchEvent.create_added(create_pod("job", is_running=True)) + ) assert len(handler.get_nodes()) == 1 - assert handler.is_pod_bound_to_node("job") is True - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources(cpu=0.9, memory=896) - async def test_handle_added_multiple( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + async def test_handle_added_running_multiple_times( + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle(PodWatchEvent.create_added(pod_factory(name="job"))) - await handler.handle(PodWatchEvent.create_added(pod_factory(name="job"))) + await handler.handle( + PodWatchEvent.create_added(create_pod("job", is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_added(create_pod("job", is_running=True)) + ) assert len(handler.get_nodes()) == 1 - assert handler.is_pod_bound_to_node("job") is True - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources(cpu=0.9, memory=896) async def test_handle_modified_succeeded( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle(PodWatchEvent.create_added(pod_factory(gpu=0))) - await handler.handle(PodWatchEvent.create_added(pod_factory(name="job"))) await handler.handle( - PodWatchEvent.create_modified(pod_factory(name="job", phase="Succeeded")) + PodWatchEvent.create_added(create_pod(gpu=0, is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_added(create_pod("job", is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_modified(create_pod("job", is_terminated=True)) ) assert len(handler.get_nodes()) == 1 - assert handler.is_pod_bound_to_node("job") is False - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources(cpu=0.9, memory=896, gpu=1) async def test_handle_deleted( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle(PodWatchEvent.create_added(pod_factory(gpu=0))) - await handler.handle(PodWatchEvent.create_added(pod_factory(name="job"))) - await handler.handle(PodWatchEvent.create_deleted(pod_factory(name="job"))) + await handler.handle( + PodWatchEvent.create_added(create_pod(gpu=0, is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_added(create_pod("job", is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_deleted(create_pod("job", is_terminated=True)) + ) assert len(handler.get_nodes()) == 1 - assert handler.is_pod_bound_to_node("job") is False - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources(cpu=0.9, memory=896, gpu=1) async def test_handle_deleted_multiple_times( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle(PodWatchEvent.create_added(pod_factory(gpu=0))) - await handler.handle(PodWatchEvent.create_added(pod_factory(name="job"))) - await handler.handle(PodWatchEvent.create_deleted(pod_factory(name="job"))) - await handler.handle(PodWatchEvent.create_deleted(pod_factory(name="job"))) + await handler.handle( + PodWatchEvent.create_added(create_pod(gpu=0, is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_added(create_pod("job", is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_deleted(create_pod("job", is_terminated=True)) + ) + await handler.handle( + PodWatchEvent.create_deleted(create_pod("job", is_terminated=True)) + ) assert len(handler.get_nodes()) == 1 - assert handler.is_pod_bound_to_node("job") is False - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources(cpu=0.9, memory=896, gpu=1) async def test_handle_deleted_last( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle(PodWatchEvent.create_added(pod_factory(name="job"))) - await handler.handle(PodWatchEvent.create_deleted(pod_factory(name="job"))) + await handler.handle( + PodWatchEvent.create_added(create_pod("job", is_running=True)) + ) + await handler.handle( + PodWatchEvent.create_deleted(create_pod("job", is_terminated=True)) + ) assert len(handler.get_nodes()) == 0 - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources() async def test_handle_deleted_not_existing( - self, handler: NodeResourcesHandler, pod_factory: RawPodFactory + self, handler: NodeResourcesHandler, create_pod: PodFactory ) -> None: - await handler.handle(PodWatchEvent.create_deleted(pod_factory())) + await handler.handle( + PodWatchEvent.create_deleted(create_pod(is_terminated=True)) + ) assert len(handler.get_nodes()) == 0 - resources = handler.get_node_free_resources("minikube") assert resources == NodeResources() @@ -251,115 +279,111 @@ async def test_get_node_free_resources_unknown_node( class TestIdlePodsHandler: + @pytest.fixture + def create_pod(self, create_pod: PodFactory) -> PodFactory: + def _create(*args: Any, is_idle: bool = True, **kwargs: Any) -> dict[str, Any]: + if is_idle: + kwargs["labels"] = {"platform.neuromation.io/idle": "true"} + return create_pod(*args, **kwargs) + + return _create + @pytest.fixture def handler(self) -> IdlePodsHandler: return IdlePodsHandler() async def test_init_non_idle( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory()] + pods = [create_pod(is_idle=False)] await handler.init(pods) idle_pods = handler.get_pods("minikube") - assert len(idle_pods) == 0 async def test_init_pending( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory(is_idle=True, phase="Pending", node_name=None)] + pods = [create_pod()] await handler.init(pods) idle_pods = handler.get_pods("minikube") - assert len(idle_pods) == 0 async def test_init_scheduled( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory(is_idle=True, phase="Pending")] + pods = [create_pod(is_scheduled=True)] await handler.init(pods) idle_pods = handler.get_pods("minikube") - assert len(idle_pods) == 1 async def test_init_running( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory(is_idle=True)] + pods = [create_pod(is_running=True)] await handler.init(pods) idle_pods = handler.get_pods("minikube") - assert len(idle_pods) == 1 async def test_init_succeeded( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - pods = [pod_factory(is_idle=True, phase="Succeeded")] + pods = [create_pod(is_terminated=True)] await handler.init(pods) idle_pods = handler.get_pods("minikube") - assert len(idle_pods) == 0 async def test_handle_added_non_idle( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added(pod_factory()) + event = PodWatchEvent.create_added(create_pod(is_idle=False, is_running=True)) await handler.handle(event) pods = handler.get_pods("minikube") - assert len(pods) == 0 async def test_handle_added_pending( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added( - pod_factory(is_idle=True, phase="Pending", node_name=None) - ) + event = PodWatchEvent.create_added(create_pod()) await handler.handle(event) pods = handler.get_pods("minikube") - assert len(pods) == 0 async def test_handle_added_scheduled( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added(pod_factory(is_idle=True, phase="Pending")) + event = PodWatchEvent.create_added(create_pod(is_scheduled=True)) await handler.handle(event) pods = handler.get_pods("minikube") - assert len(pods) == 1 async def test_handle_added_running( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added(pod_factory(is_idle=True)) + event = PodWatchEvent.create_added(create_pod(is_running=True)) await handler.handle(event) pods = handler.get_pods("minikube") - assert len(pods) == 1 async def test_handle_modified_succeeded( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added( - pod_factory(name="idle-job", is_idle=True), - ) + event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True)) await handler.handle(event) pods = handler.get_pods("minikube") assert len(pods) == 1 event = PodWatchEvent.create_modified( - pod_factory(name="idle-job", is_idle=True, phase="Succeeded"), + create_pod("idle-job", is_terminated=True) ) await handler.handle(event) @@ -367,30 +391,24 @@ async def test_handle_modified_succeeded( assert len(pods) == 0 async def test_handle_deleted( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added( - pod_factory(name="idle-job", is_idle=True), - ) + event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True)) await handler.handle(event) pods = handler.get_pods("minikube") assert len(pods) == 1 - event = PodWatchEvent.create_deleted( - pod_factory(name="idle-job", is_idle=True), - ) + event = PodWatchEvent.create_deleted(create_pod("idle-job", is_terminated=True)) await handler.handle(event) pods = handler.get_pods("minikube") assert len(pods) == 0 async def test_handle_deleted_not_existing( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_deleted( - pod_factory(is_idle=True), - ) + event = PodWatchEvent.create_deleted(create_pod(is_terminated=True)) await handler.handle(event) pods = handler.get_pods("minikube") @@ -402,17 +420,15 @@ async def test_get_pods_unknown_node(self, handler: IdlePodsHandler) -> None: assert len(idle_pods) == 0 async def test_wait_for_pod_terminating( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added( - pod_factory(name="idle-job", is_idle=True), - ) + event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True)) await handler.handle(event) async def _handle_modified() -> None: await asyncio.sleep(0.1) event = PodWatchEvent.create_modified( - pod_factory(name="idle-job", is_idle=True, is_terminating=True), + create_pod("idle-job", is_terminating=True) ) await handler.handle(event) @@ -421,17 +437,15 @@ async def _handle_modified() -> None: await handler.wait_for_pod_terminating("idle-job", 1) async def test_wait_for_pod_terminating_deleted( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added( - pod_factory(name="idle-job", is_idle=True), - ) + event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True)) await handler.handle(event) async def _handle_deleted() -> None: await asyncio.sleep(0.1) event = PodWatchEvent.create_deleted( - pod_factory(name="idle-job", is_idle=True, is_terminating=True), + create_pod("idle-job", is_terminating=True) ) await handler.handle(event) @@ -440,26 +454,22 @@ async def _handle_deleted() -> None: await handler.wait_for_pod_terminating("idle-job", 1) async def test_wait_for_pod_terminating_already( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added( - pod_factory(name="idle-job", is_idle=True), - ) + event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True)) await handler.handle(event) event = PodWatchEvent.create_modified( - pod_factory(name="idle-job", is_idle=True, is_terminating=True), + create_pod("idle-job", is_terminating=True) ) await handler.handle(event) await handler.wait_for_pod_terminating("idle-job", 1) async def test_wait_for_pod_terminating_fail( - self, handler: IdlePodsHandler, pod_factory: RawPodFactory + self, handler: IdlePodsHandler, create_pod: PodFactory ) -> None: - event = PodWatchEvent.create_added( - pod_factory(name="idle-job", is_idle=True), - ) + event = PodWatchEvent.create_added(create_pod("idle-job", is_running=True)) await handler.handle(event) with pytest.raises(asyncio.TimeoutError):