Skip to content

Commit

Permalink
support running experimental KubeClusters and HelmClusters on the sam…
Browse files Browse the repository at this point in the history
…e loop as another SyncMethodMixin instance (#493)

* test that nested experimental KubeClusters can be run on the same event loop

* remove calls to LoopRunner from expermental.kubecluster

* test that HelmCluster can run on the same loop as another SyncMethodMixin instance

* remove calls to LoopRunner from helm.helmcluster

* switch to pytest-asyncio strict

* gen_cluster is a synchrounous fixture

* Remove unused import

Co-authored-by: Jacob Tomlinson <jtomlinson@nvidia.com>
  • Loading branch information
graingert and jacobtomlinson authored May 18, 2022
1 parent a2e5aa9 commit 856d7f3
Show file tree
Hide file tree
Showing 9 changed files with 36 additions and 31 deletions.
15 changes: 4 additions & 11 deletions dask_kubernetes/classic/tests/test_async.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest_asyncio
import asyncio
import base64
import getpass
Expand Down Expand Up @@ -42,14 +43,6 @@ def pod_spec(docker_image):
)


@pytest.fixture(scope="module")
def event_loop(request):
"""Override function-scoped fixture in pytest-asyncio."""
loop = asyncio.new_event_loop()
yield loop
loop.close()


@pytest.fixture
def user_env():
"""The env var USER is not always set on non-linux systems."""
Expand All @@ -64,19 +57,19 @@ def user_env():
cluster_kwargs = {"asynchronous": True}


@pytest.fixture
@pytest_asyncio.fixture
async def cluster(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, **cluster_kwargs) as cluster:
yield cluster


@pytest.fixture
@pytest_asyncio.fixture
async def remote_cluster(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, deploy_mode="remote", **cluster_kwargs) as cluster:
yield cluster


@pytest.fixture
@pytest_asyncio.fixture
async def client(cluster):
async with Client(cluster, asynchronous=True) as client:
yield client
Expand Down
2 changes: 1 addition & 1 deletion dask_kubernetes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


@pytest.fixture()
async def kopf_runner(k8s_cluster):
def kopf_runner(k8s_cluster):
yield KopfRunner(["run", "-m", "dask_kubernetes.operator", "--verbose"])


Expand Down
8 changes: 1 addition & 7 deletions dask_kubernetes/experimental/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
from distributed.utils import (
Log,
Logs,
LoopRunner,
TimeoutError,
LoopRunner,
format_dashboard_link,
)

Expand Down Expand Up @@ -128,8 +126,6 @@ def __init__(
n_workers=3,
resources={},
env=[],
loop=None,
asynchronous=False,
auth=ClusterAuth.DEFAULT,
port_forward_cluster_ip=None,
create_mode=CreateMode.CREATE_OR_CONNECT,
Expand All @@ -147,12 +143,10 @@ def __init__(
self.port_forward_cluster_ip = port_forward_cluster_ip
self.create_mode = create_mode
self.shutdown_on_close = shutdown_on_close
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.loop = self._loop_runner.loop

self._instances.add(self)

super().__init__(asynchronous=asynchronous, **kwargs)
super().__init__(**kwargs)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
Expand Down
11 changes: 11 additions & 0 deletions dask_kubernetes/experimental/tests/test_kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ def test_multiple_clusters_simultaneously(kopf_runner, docker_image):
assert client2.submit(lambda x: x + 1, 10).result() == 11


def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image):
with kopf_runner:
with KubeCluster(name="fizz", image=docker_image) as cluster1, KubeCluster(
name="buzz", image=docker_image, loop=cluster1.loop
) as cluster2:
with Client(cluster1) as client1, Client(cluster2) as client2:
assert cluster1.loop is cluster2.loop is client1.loop is client2.loop
assert client1.submit(lambda x: x + 1, 10).result() == 11
assert client2.submit(lambda x: x + 1, 10).result() == 11


def test_cluster_from_name(kopf_runner, docker_image):
with kopf_runner:
with KubeCluster(name="abc", image=docker_image) as firstcluster:
Expand Down
8 changes: 2 additions & 6 deletions dask_kubernetes/helm/helmcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from distributed.deploy import Cluster
from distributed.core import rpc, Status
from distributed.utils import Log, Logs, LoopRunner
from distributed.utils import Log, Logs
import kubernetes_asyncio as kubernetes

from ..common.auth import ClusterAuth
Expand Down Expand Up @@ -85,8 +85,6 @@ def __init__(
auth=ClusterAuth.DEFAULT,
namespace=None,
port_forward_cluster_ip=False,
loop=None,
asynchronous=False,
scheduler_name="scheduler",
worker_name="worker",
node_host=None,
Expand All @@ -110,14 +108,12 @@ def __init__(
self.scheduler_comm = None
self.port_forward_cluster_ip = port_forward_cluster_ip
self._supports_scaling = True
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.loop = self._loop_runner.loop
self.scheduler_name = scheduler_name
self.worker_name = worker_name
self.node_host = node_host
self.node_port = node_port

super().__init__(asynchronous=asynchronous, **kwargs)
super().__init__(**kwargs)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
Expand Down
13 changes: 12 additions & 1 deletion dask_kubernetes/helm/tests/test_helm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pytest
import pytest_asyncio

import subprocess
import os.path

import dask.config
from distributed import Client
from distributed.core import Status
from dask_ctl.discovery import (
list_discovery_methods,
Expand Down Expand Up @@ -79,7 +81,7 @@ def release(k8s_cluster, chart_name, test_namespace, release_name, config_path):
subprocess.check_output(["helm", "delete", "-n", test_namespace, release_name])


@pytest.fixture
@pytest_asyncio.fixture
async def cluster(k8s_cluster, release, test_namespace):
from dask_kubernetes import HelmCluster

Expand Down Expand Up @@ -122,6 +124,15 @@ def test_import():
assert issubclass(HelmCluster, Cluster)


def test_loop(k8s_cluster, release, test_namespace):
from dask_kubernetes import HelmCluster

with Client(nthreads=[]) as client, HelmCluster(
release_name=release, namespace=test_namespace, loop=client.loop
) as cluster:
assert cluster.loop is client.loop


def test_raises_on_non_existant_release(k8s_cluster):
from dask_kubernetes import HelmCluster

Expand Down
2 changes: 1 addition & 1 deletion dask_kubernetes/operator/tests/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


@pytest.fixture()
async def gen_cluster(k8s_cluster):
def gen_cluster(k8s_cluster):
"""Yields an instantiated context manager for creating/deleting a simple cluster."""

@asynccontextmanager
Expand Down
6 changes: 3 additions & 3 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
flake8>=3.7
black>=18.9b0
dask-ctl>=2021.3.0
pytest>=5.3
pytest-asyncio>=0.10.0
pytest>=7.1
pytest-asyncio>=0.17
git+https://codeberg.org/hjacobs/pytest-kind.git
pytest-timeout
pytest-rerunfailures
git+http://github.com/elemental-lf/k8s-crd-resolver
git+http://github.com/elemental-lf/k8s-crd-resolver
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ parentdir_prefix = dask-kubernetes-
addopts = -v --keep-cluster --durations=10
timeout = 300
reruns = 5
asyncio_mode = auto
asyncio_mode = strict

0 comments on commit 856d7f3

Please sign in to comment.