use async with Client: in tests (#6921)
graingert authored Aug 22, 2022
1 parent 11616a3 commit b7e184a
Showing 11 changed files with 570 additions and 615 deletions.
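
The pattern is the same across the touched test files: tests that previously created a LocalCluster and Client with bare await calls and tore them down in try/finally blocks now use async with, so the client and cluster are closed even when an assertion fails mid-test. A minimal sketch of the before/after shape (the test names and bodies below are illustrative, not taken from the commit):

# Illustrative only: these example tests are not part of the diff.
from distributed import Client, LocalCluster
from distributed.utils_test import gen_test


@gen_test()
async def test_example_old_style():
    # Old pattern: explicit cleanup in finally blocks.
    cluster = await LocalCluster(
        n_workers=1, processes=False, dashboard_address=":0", asynchronous=True
    )
    try:
        client = await Client(cluster, asynchronous=True)
        try:
            assert await client.submit(sum, [1, 2, 3]) == 6
        finally:
            await client.close()
    finally:
        await cluster.close()


@gen_test()
async def test_example_new_style():
    # New pattern: the context managers close the client and cluster automatically.
    async with LocalCluster(
        n_workers=1, processes=False, dashboard_address=":0", asynchronous=True
    ) as cluster, Client(cluster, asynchronous=True) as client:
        assert await client.submit(sum, [1, 2, 3]) == 6

Combining both context managers on a single async with line, as several of the updated tests do, keeps the nesting flat while still exiting the client before the cluster.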
93 changes: 42 additions & 51 deletions distributed/deploy/tests/test_adaptive.py
@@ -117,50 +117,47 @@ def scale_up(self, n, **kwargs):

 @gen_test()
 async def test_min_max():
-    cluster = await LocalCluster(
+    async with LocalCluster(
         n_workers=0,
         silence_logs=False,
         processes=False,
         dashboard_address=":0",
         asynchronous=True,
         threads_per_worker=1,
-    )
-    try:
+    ) as cluster:
         adapt = cluster.adapt(minimum=1, maximum=2, interval="20 ms", wait_count=10)
-        c = await Client(cluster, asynchronous=True)
-
-        start = time()
-        while not cluster.scheduler.workers:
-            await asyncio.sleep(0.01)
-            assert time() < start + 1
-
-        await asyncio.sleep(0.2)
-        assert len(cluster.scheduler.workers) == 1
-        assert len(adapt.log) == 1 and adapt.log[-1][1] == {"status": "up", "n": 1}
+        async with Client(cluster, asynchronous=True) as c:
+            start = time()
+            while not cluster.scheduler.workers:
+                await asyncio.sleep(0.01)
+                assert time() < start + 1

-        futures = c.map(slowinc, range(100), delay=0.1)
+            await asyncio.sleep(0.2)
+            assert len(cluster.scheduler.workers) == 1
+            assert len(adapt.log) == 1 and adapt.log[-1][1] == {"status": "up", "n": 1}

-        start = time()
-        while len(cluster.scheduler.workers) < 2:
-            await asyncio.sleep(0.01)
-            assert time() < start + 1
+            futures = c.map(slowinc, range(100), delay=0.1)

-        assert len(cluster.scheduler.workers) == 2
-        await asyncio.sleep(0.5)
-        assert len(cluster.scheduler.workers) == 2
-        assert len(cluster.workers) == 2
-        assert len(adapt.log) == 2 and all(d["status"] == "up" for _, d in adapt.log)
+            start = time()
+            while len(cluster.scheduler.workers) < 2:
+                await asyncio.sleep(0.01)
+                assert time() < start + 1
+
+            assert len(cluster.scheduler.workers) == 2
+            await asyncio.sleep(0.5)
+            assert len(cluster.scheduler.workers) == 2
+            assert len(cluster.workers) == 2
+            assert len(adapt.log) == 2 and all(
+                d["status"] == "up" for _, d in adapt.log
+            )

-        del futures
+            del futures

-        start = time()
-        while len(cluster.scheduler.workers) != 1:
-            await asyncio.sleep(0.01)
-            assert time() < start + 2
-        assert adapt.log[-1][1]["status"] == "down"
-    finally:
-        await c.close()
-        await cluster.close()
+            start = time()
+            while len(cluster.scheduler.workers) != 1:
+                await asyncio.sleep(0.01)
+                assert time() < start + 2
+            assert adapt.log[-1][1]["status"] == "down"


 @gen_test()
@@ -194,16 +191,14 @@ async def test_adapt_quickly():
     Instead we want to wait a few beats before removing a worker in case the
     user is taking a brief pause between work
     """
-    cluster = await LocalCluster(
+    async with LocalCluster(
         n_workers=0,
         asynchronous=True,
         processes=False,
         silence_logs=False,
         dashboard_address=":0",
-    )
-    client = await Client(cluster, asynchronous=True)
-    adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
-    try:
+    ) as cluster, Client(cluster, asynchronous=True) as client:
+        adapt = cluster.adapt(interval="20 ms", wait_count=5, maximum=10)
         future = client.submit(slowinc, 1, delay=0.100)
         await wait(future)
         assert len(adapt.log) == 1
@@ -241,9 +236,6 @@ async def test_adapt_quickly():

         await asyncio.sleep(0.1)
         assert len(cluster.workers) == 1
-    finally:
-        await client.close()
-        await cluster.close()


 @gen_test()
@@ -255,20 +247,19 @@ async def test_adapt_down():
         processes=False,
         silence_logs=False,
         dashboard_address=":0",
-    ) as cluster:
-        async with Client(cluster, asynchronous=True) as client:
-            cluster.adapt(interval="20ms", maximum=5)
+    ) as cluster, Client(cluster, asynchronous=True) as client:
+        cluster.adapt(interval="20ms", maximum=5)

-            futures = client.map(slowinc, range(1000), delay=0.1)
-            while len(cluster.scheduler.workers) < 5:
-                await asyncio.sleep(0.1)
+        futures = client.map(slowinc, range(1000), delay=0.1)
+        while len(cluster.scheduler.workers) < 5:
+            await asyncio.sleep(0.1)

-            cluster.adapt(maximum=2)
+        cluster.adapt(maximum=2)

-            start = time()
-            while len(cluster.scheduler.workers) != 2:
-                await asyncio.sleep(0.1)
-                assert time() < start + 60
+        start = time()
+        while len(cluster.scheduler.workers) != 2:
+            await asyncio.sleep(0.1)
+            assert time() < start + 60


 @gen_test()
64 changes: 29 additions & 35 deletions distributed/deploy/tests/test_local.py
@@ -5,11 +5,11 @@
 import sys
 from threading import Lock
 from time import sleep
+from unittest import mock
 from urllib.parse import urlparse

 import pytest
 from tornado.httpclient import AsyncHTTPClient
-from tornado.ioloop import IOLoop

 from dask.system import CPU_COUNT

@@ -245,19 +245,21 @@ def test_Client_solo(loop):
 @gen_test()
 async def test_duplicate_clients():
     pytest.importorskip("bokeh")
-    c1 = await Client(
+    async with Client(
         processes=False, silence_logs=False, dashboard_address=9876, asynchronous=True
-    )
-    with pytest.warns(Warning) as info:
-        c2 = await Client(
-            processes=False,
-            silence_logs=False,
-            dashboard_address=9876,
-            asynchronous=True,
-        )
-
-    assert "dashboard" in c1.cluster.scheduler.services
-    assert "dashboard" in c2.cluster.scheduler.services
+    ) as c1:
+        c1_services = c1.cluster.scheduler.services
+        with pytest.warns(Warning) as info:
+            async with Client(
+                processes=False,
+                silence_logs=False,
+                dashboard_address=9876,
+                asynchronous=True,
+            ) as c2:
+                c2_services = c2.cluster.scheduler.services
+
+    assert c1_services == {"dashboard": mock.ANY}
+    assert c2_services == {"dashboard": mock.ANY}

     assert any(
         all(
@@ -266,8 +268,6 @@ async def test_duplicate_clients():
         )
         for msg in info.list
     )
-    await c1.close()
-    await c2.close()


 def test_Client_kwargs(loop):
@@ -824,35 +824,29 @@ class MyCluster(LocalCluster):
         def scale_down(self, *args, **kwargs):
             pass

-    loop = IOLoop.current()
-    cluster = await MyCluster(
+    async with MyCluster(
         n_workers=0,
         processes=False,
         silence_logs=False,
         dashboard_address=":0",
-        loop=loop,
+        loop=None,
         asynchronous=True,
-    )
-    c = await Client(cluster, asynchronous=True)
-
-    assert not cluster.workers
-
-    await cluster.scale(2)
+    ) as cluster, Client(cluster, asynchronous=True) as c:
+        assert not cluster.workers

-    start = time()
-    while len(cluster.scheduler.workers) != 2:
-        await asyncio.sleep(0.01)
-        assert time() < start + 3
+        await cluster.scale(2)

-    await cluster.scale(1)
+        start = time()
+        while len(cluster.scheduler.workers) != 2:
+            await asyncio.sleep(0.01)
+            assert time() < start + 3

-    start = time()
-    while len(cluster.scheduler.workers) != 1:
-        await asyncio.sleep(0.01)
-        assert time() < start + 3
+        await cluster.scale(1)

-    await c.close()
-    await cluster.close()
+        start = time()
+        while len(cluster.scheduler.workers) != 1:
+            await asyncio.sleep(0.01)
+            assert time() < start + 3


 def test_local_tls_restart(loop):
(Diffs for the remaining 9 changed files were not loaded on this page.)
