From 6b632abe1e4d68c4a72a3c394ea93605d5f58e20 Mon Sep 17 00:00:00 2001 From: Jacob Tomlinson Date: Fri, 28 Jul 2023 17:33:15 +0100 Subject: [PATCH] Small changes form kr8s kubecluster migration (#786) --- dask_kubernetes/common/networking.py | 4 +++- dask_kubernetes/operator/_objects.py | 8 ++++++++ dask_kubernetes/operator/kubecluster/kubecluster.py | 3 +-- requirements-test.txt | 2 +- requirements.txt | 2 +- 5 files changed, 14 insertions(+), 5 deletions(-) diff --git a/dask_kubernetes/common/networking.py b/dask_kubernetes/common/networking.py index 6d05c9a72..a589a976a 100644 --- a/dask_kubernetes/common/networking.py +++ b/dask_kubernetes/common/networking.py @@ -12,6 +12,7 @@ from dask_kubernetes.common.utils import check_dependency from dask_kubernetes.aiopykube.objects import Pod +from dask_kubernetes.aiopykube import HTTPClient, KubeConfig from dask_kubernetes.exceptions import CrashLoopBackOffError @@ -193,8 +194,9 @@ async def get_scheduler_address( return address -async def wait_for_scheduler(api, cluster_name, namespace, timeout=None): +async def wait_for_scheduler(cluster_name, namespace, timeout=None): pod_start_time = None + api = HTTPClient(KubeConfig.from_env()) while True: async with kubernetes.client.api_client.ApiClient() as api_client: k8s_api = kubernetes.client.CoreV1Api(api_client) diff --git a/dask_kubernetes/operator/_objects.py b/dask_kubernetes/operator/_objects.py index f02bdcddd..3f7ce6f00 100644 --- a/dask_kubernetes/operator/_objects.py +++ b/dask_kubernetes/operator/_objects.py @@ -69,6 +69,14 @@ async def scheduler_service(self) -> Service: assert len(services) == 1 return services[0] + async def ready(self) -> bool: + await self._refresh() + return ( + "status" in self.raw + and "phase" in self.status + and self.status.phase == "Running" + ) + class DaskWorkerGroup(APIObject): version = "kubernetes.dask.org/v1" diff --git a/dask_kubernetes/operator/kubecluster/kubecluster.py b/dask_kubernetes/operator/kubecluster/kubecluster.py index 8cc5d9e79..8e1a9dfe8 100644 --- a/dask_kubernetes/operator/kubecluster/kubecluster.py +++ b/dask_kubernetes/operator/kubecluster/kubecluster.py @@ -378,7 +378,6 @@ async def _create_cluster(self): try: self._log("Waiting for scheduler pod") await wait_for_scheduler( - self.k8s_api, self.name, self.namespace, timeout=self._resource_timeout, @@ -434,7 +433,7 @@ async def _connect_cluster(self): service_name = f"{cluster_spec['metadata']['name']}-scheduler" self._log("Waiting for scheduler pod") await wait_for_scheduler( - self.k8s_api, self.name, self.namespace, timeout=self._resource_timeout + self.name, self.namespace, timeout=self._resource_timeout ) self._log("Waiting for scheduler service") await wait_for_service(core_api, service_name, self.namespace) diff --git a/requirements-test.txt b/requirements-test.txt index 41ad62fe7..c44e8bbba 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -2,7 +2,7 @@ flake8>=3.7 black>=18.9b0 dask-ctl>=2021.3.0 pytest>=7.1 -git+https://codeberg.org/hjacobs/pytest-kind.git +pytest-kind pytest-timeout pytest-rerunfailures git+https://github.com/elemental-lf/k8s-crd-resolver@v0.14.0 diff --git a/requirements.txt b/requirements.txt index 0641d0d68..76f3bf206 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1 kopf>=1.35.3 pykube-ng>=22.9.0 rich>=12.5.1 -kr8s==0.8.6 +kr8s==0.8.7