Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support running experimental KubeClusters and HelmClusters on the same loop as another SyncMethodMixin instance #493

Merged
merged 9 commits into from
May 18, 2022
15 changes: 4 additions & 11 deletions dask_kubernetes/classic/tests/test_async.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pytest_asyncio
import asyncio
import base64
import getpass
Expand Down Expand Up @@ -42,14 +43,6 @@ def pod_spec(docker_image):
)


@pytest.fixture(scope="module")
def event_loop(request):
"""Override function-scoped fixture in pytest-asyncio."""
loop = asyncio.new_event_loop()
yield loop
loop.close()


@pytest.fixture
def user_env():
"""The env var USER is not always set on non-linux systems."""
Expand All @@ -64,19 +57,19 @@ def user_env():
cluster_kwargs = {"asynchronous": True}


@pytest.fixture
@pytest_asyncio.fixture
async def cluster(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, **cluster_kwargs) as cluster:
yield cluster


@pytest.fixture
@pytest_asyncio.fixture
async def remote_cluster(k8s_cluster, pod_spec):
async with KubeCluster(pod_spec, deploy_mode="remote", **cluster_kwargs) as cluster:
yield cluster


@pytest.fixture
@pytest_asyncio.fixture
async def client(cluster):
async with Client(cluster, asynchronous=True) as client:
yield client
Expand Down
2 changes: 1 addition & 1 deletion dask_kubernetes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@


@pytest.fixture()
async def kopf_runner(k8s_cluster):
def kopf_runner(k8s_cluster):
yield KopfRunner(["run", "-m", "dask_kubernetes.operator", "--verbose"])


Expand Down
8 changes: 1 addition & 7 deletions dask_kubernetes/experimental/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
from distributed.utils import (
Log,
Logs,
LoopRunner,
TimeoutError,
LoopRunner,
format_dashboard_link,
)

Expand Down Expand Up @@ -128,8 +126,6 @@ def __init__(
n_workers=3,
resources={},
env=[],
loop=None,
asynchronous=False,
auth=ClusterAuth.DEFAULT,
port_forward_cluster_ip=None,
create_mode=CreateMode.CREATE_OR_CONNECT,
Expand All @@ -147,12 +143,10 @@ def __init__(
self.port_forward_cluster_ip = port_forward_cluster_ip
self.create_mode = create_mode
self.shutdown_on_close = shutdown_on_close
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.loop = self._loop_runner.loop

self._instances.add(self)

super().__init__(asynchronous=asynchronous, **kwargs)
super().__init__(**kwargs)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
Expand Down
11 changes: 11 additions & 0 deletions dask_kubernetes/experimental/tests/test_kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ def test_multiple_clusters_simultaneously(kopf_runner, docker_image):
assert client2.submit(lambda x: x + 1, 10).result() == 11


def test_multiple_clusters_simultaneously_same_loop(kopf_runner, docker_image):
with kopf_runner:
with KubeCluster(name="fizz", image=docker_image) as cluster1, KubeCluster(
name="buzz", image=docker_image, loop=cluster1.loop
) as cluster2:
with Client(cluster1) as client1, Client(cluster2) as client2:
assert cluster1.loop is cluster2.loop is client1.loop is client2.loop
assert client1.submit(lambda x: x + 1, 10).result() == 11
assert client2.submit(lambda x: x + 1, 10).result() == 11


def test_cluster_from_name(kopf_runner, docker_image):
with kopf_runner:
with KubeCluster(name="abc", image=docker_image) as firstcluster:
Expand Down
8 changes: 2 additions & 6 deletions dask_kubernetes/helm/helmcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from distributed.deploy import Cluster
from distributed.core import rpc, Status
from distributed.utils import Log, Logs, LoopRunner
from distributed.utils import Log, Logs
import kubernetes_asyncio as kubernetes

from ..common.auth import ClusterAuth
Expand Down Expand Up @@ -85,8 +85,6 @@ def __init__(
auth=ClusterAuth.DEFAULT,
namespace=None,
port_forward_cluster_ip=False,
loop=None,
asynchronous=False,
scheduler_name="scheduler",
worker_name="worker",
node_host=None,
Expand All @@ -110,14 +108,12 @@ def __init__(
self.scheduler_comm = None
self.port_forward_cluster_ip = port_forward_cluster_ip
self._supports_scaling = True
self._loop_runner = LoopRunner(loop=loop, asynchronous=asynchronous)
self.loop = self._loop_runner.loop
self.scheduler_name = scheduler_name
self.worker_name = worker_name
self.node_host = node_host
self.node_port = node_port

super().__init__(asynchronous=asynchronous, **kwargs)
super().__init__(**kwargs)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
Expand Down
13 changes: 12 additions & 1 deletion dask_kubernetes/helm/tests/test_helm.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
import pytest
import pytest_asyncio

import subprocess
import os.path

import dask.config
from distributed import Client
from distributed.core import Status
from dask_ctl.discovery import (
list_discovery_methods,
Expand Down Expand Up @@ -79,7 +81,7 @@ def release(k8s_cluster, chart_name, test_namespace, release_name, config_path):
subprocess.check_output(["helm", "delete", "-n", test_namespace, release_name])


@pytest.fixture
@pytest_asyncio.fixture
async def cluster(k8s_cluster, release, test_namespace):
from dask_kubernetes import HelmCluster

Expand Down Expand Up @@ -122,6 +124,15 @@ def test_import():
assert issubclass(HelmCluster, Cluster)


def test_loop(k8s_cluster, release, test_namespace):
from dask_kubernetes import HelmCluster

with Client(nthreads=[]) as client, HelmCluster(
release_name=release, namespace=test_namespace, loop=client.loop
) as cluster:
assert cluster.loop is client.loop


def test_raises_on_non_existant_release(k8s_cluster):
from dask_kubernetes import HelmCluster

Expand Down
2 changes: 1 addition & 1 deletion dask_kubernetes/operator/tests/test_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


@pytest.fixture()
async def gen_cluster(k8s_cluster):
def gen_cluster(k8s_cluster):
"""Yields an instantiated context manager for creating/deleting a simple cluster."""

@asynccontextmanager
Expand Down
6 changes: 3 additions & 3 deletions requirements-test.txt
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
flake8>=3.7
black>=18.9b0
dask-ctl>=2021.3.0
pytest>=5.3
pytest-asyncio>=0.10.0
pytest>=7.1
pytest-asyncio>=0.17
git+https://codeberg.org/hjacobs/pytest-kind.git
pytest-timeout
pytest-rerunfailures
git+http://github.com/elemental-lf/k8s-crd-resolver
git+http://github.com/elemental-lf/k8s-crd-resolver
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,4 @@ parentdir_prefix = dask-kubernetes-
addopts = -v --keep-cluster --durations=10
timeout = 300
reruns = 5
asyncio_mode = auto
asyncio_mode = strict