diff --git a/dask_kubernetes/classic/tests/test_async.py b/dask_kubernetes/classic/tests/test_async.py index 84d0bdf3c..5785d6dbd 100644 --- a/dask_kubernetes/classic/tests/test_async.py +++ b/dask_kubernetes/classic/tests/test_async.py @@ -1,3 +1,4 @@ +import pytest_asyncio import asyncio import base64 import getpass @@ -42,14 +43,6 @@ def pod_spec(docker_image): ) -@pytest.fixture(scope="module") -def event_loop(request): - """Override function-scoped fixture in pytest-asyncio.""" - loop = asyncio.new_event_loop() - yield loop - loop.close() - - @pytest.fixture def user_env(): """The env var USER is not always set on non-linux systems.""" @@ -64,19 +57,19 @@ def user_env(): cluster_kwargs = {"asynchronous": True} -@pytest.fixture +@pytest_asyncio.fixture async def cluster(k8s_cluster, pod_spec): async with KubeCluster(pod_spec, **cluster_kwargs) as cluster: yield cluster -@pytest.fixture +@pytest_asyncio.fixture async def remote_cluster(k8s_cluster, pod_spec): async with KubeCluster(pod_spec, deploy_mode="remote", **cluster_kwargs) as cluster: yield cluster -@pytest.fixture +@pytest_asyncio.fixture async def client(cluster): async with Client(cluster, asynchronous=True) as client: yield client diff --git a/dask_kubernetes/conftest.py b/dask_kubernetes/conftest.py index f70bd2ee4..340661e2b 100644 --- a/dask_kubernetes/conftest.py +++ b/dask_kubernetes/conftest.py @@ -17,7 +17,7 @@ @pytest.fixture() -async def kopf_runner(k8s_cluster): +def kopf_runner(k8s_cluster): yield KopfRunner(["run", "-m", "dask_kubernetes.operator", "--verbose"]) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index 34c430712..e355cdc73 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -15,9 +15,7 @@ from distributed.utils import ( Log, Logs, - LoopRunner, TimeoutError, - LoopRunner, format_dashboard_link, ) @@ -128,8 +126,6 @@ def __init__( n_workers=3, resources={}, env=[], - loop=None, - asynchronous=False, auth=ClusterAuth.DEFAULT, port_forward_cluster_ip=None, create_mode=CreateMode.CREATE_OR_CONNECT, @@ -147,12 +143,10 @@ def __init__( self.port_forward_cluster_ip = port_forward_cluster_ip self.create_mode = create_mode self.shutdown_on_close = shutdown_on_close - self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) - self.loop = self._loop_runner.loop self._instances.add(self) - super().__init__(asynchronous=asynchronous, **kwargs) + super().__init__(**kwargs) if not self.asynchronous: self._loop_runner.start() self.sync(self._start) diff --git a/dask_kubernetes/experimental/tests/test_kubecluster.py b/dask_kubernetes/experimental/tests/test_kubecluster.py index 742b59c80..eda01110b 100644 --- a/dask_kubernetes/experimental/tests/test_kubecluster.py +++ b/dask_kubernetes/experimental/tests/test_kubecluster.py @@ -39,6 +39,17 @@ def test_multiple_clusters_simultaneously(kopf_runner, docker_image): assert client2.submit(lambda x: x + 1, 10).result() == 11 +def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image): + with kopf_runner: + with KubeCluster(name="fizz", image=docker_image) as cluster1, KubeCluster( + name="buzz", image=docker_image, loop=cluster1.loop + ) as cluster2: + with Client(cluster1) as client1, Client(cluster2) as client2: + assert cluster1.loop is cluster2.loop is client1.loop is client2.loop + assert client1.submit(lambda x: x + 1, 10).result() == 11 + assert client2.submit(lambda x: x + 1, 10).result() == 11 + + def test_cluster_from_name(kopf_runner, docker_image): with kopf_runner: with KubeCluster(name="abc", image=docker_image) as firstcluster: diff --git a/dask_kubernetes/helm/helmcluster.py b/dask_kubernetes/helm/helmcluster.py index c2011bcac..1b15618bb 100644 --- a/dask_kubernetes/helm/helmcluster.py +++ b/dask_kubernetes/helm/helmcluster.py @@ -7,7 +7,7 @@ from distributed.deploy import Cluster from distributed.core import rpc, Status -from distributed.utils import Log, Logs, LoopRunner +from distributed.utils import Log, Logs import kubernetes_asyncio as kubernetes from ..common.auth import ClusterAuth @@ -85,8 +85,6 @@ def __init__( auth=ClusterAuth.DEFAULT, namespace=None, port_forward_cluster_ip=False, - loop=None, - asynchronous=False, scheduler_name="scheduler", worker_name="worker", node_host=None, @@ -110,14 +108,12 @@ def __init__( self.scheduler_comm = None self.port_forward_cluster_ip = port_forward_cluster_ip self._supports_scaling = True - self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous) - self.loop = self._loop_runner.loop self.scheduler_name = scheduler_name self.worker_name = worker_name self.node_host = node_host self.node_port = node_port - super().__init__(asynchronous=asynchronous, **kwargs) + super().__init__(**kwargs) if not self.asynchronous: self._loop_runner.start() self.sync(self._start) diff --git a/dask_kubernetes/helm/tests/test_helm.py b/dask_kubernetes/helm/tests/test_helm.py index 0e9fc052e..3bd9ebf08 100644 --- a/dask_kubernetes/helm/tests/test_helm.py +++ b/dask_kubernetes/helm/tests/test_helm.py @@ -1,9 +1,11 @@ import pytest +import pytest_asyncio import subprocess import os.path import dask.config +from distributed import Client from distributed.core import Status from dask_ctl.discovery import ( list_discovery_methods, @@ -79,7 +81,7 @@ def release(k8s_cluster, chart_name, test_namespace, release_name, config_path): subprocess.check_output(["helm", "delete", "-n", test_namespace, release_name]) -@pytest.fixture +@pytest_asyncio.fixture async def cluster(k8s_cluster, release, test_namespace): from dask_kubernetes import HelmCluster @@ -122,6 +124,15 @@ def test_import(): assert issubclass(HelmCluster, Cluster) +def test_loop(k8s_cluster, release, test_namespace): + from dask_kubernetes import HelmCluster + + with Client(nthreads=[]) as client, HelmCluster( + release_name=release, namespace=test_namespace, loop=client.loop + ) as cluster: + assert cluster.loop is client.loop + + def test_raises_on_non_existant_release(k8s_cluster): from dask_kubernetes import HelmCluster diff --git a/dask_kubernetes/operator/tests/test_operator.py b/dask_kubernetes/operator/tests/test_operator.py index 48daeea90..80ee6ca14 100644 --- a/dask_kubernetes/operator/tests/test_operator.py +++ b/dask_kubernetes/operator/tests/test_operator.py @@ -12,7 +12,7 @@ @pytest.fixture() -async def gen_cluster(k8s_cluster): +def gen_cluster(k8s_cluster): """Yields an instantiated context manager for creating/deleting a simple cluster.""" @asynccontextmanager diff --git a/requirements-test.txt b/requirements-test.txt index 4c144dc17..b0f127119 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -1,9 +1,9 @@ flake8>=3.7 black>=18.9b0 dask-ctl>=2021.3.0 -pytest>=5.3 -pytest-asyncio>=0.10.0 +pytest>=7.1 +pytest-asyncio>=0.17 git+https://codeberg.org/hjacobs/pytest-kind.git pytest-timeout pytest-rerunfailures -git+http://github.com/elemental-lf/k8s-crd-resolver \ No newline at end of file +git+http://github.com/elemental-lf/k8s-crd-resolver diff --git a/setup.cfg b/setup.cfg index dc4ada234..5b716f7fd 100644 --- a/setup.cfg +++ b/setup.cfg @@ -39,4 +39,4 @@ parentdir_prefix = dask-kubernetes- addopts = -v --keep-cluster --durations=10 timeout = 300 reruns = 5 -asyncio_mode = auto +asyncio_mode = strict