Skip to content

Commit

Permalink
Fix flaky test_broken_worker
Browse files (browse the repository at this point in the history)
  • Loading branch information
crusaderky committed Oct 4, 2023
1 parent dfc4358 commit 531b1b4
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/tests.yaml
Original file line number | Diff line number | Diff line change
Expand Up @@ -203,7 +203,7 @@ jobs:
set -o pipefail
mkdir reports
pytest distributed \
pytest distributed/deploy/tests/test_spec_cluster.py \ # DNM
-m "not avoid_ci and ${{ matrix.partition }}" --runslow \
--leaks=fds,processes,threads \
--junitxml reports/pytest.xml -o junit_suite_name=$TEST_ID \
Expand Down
24 changes: 16 additions & 8 deletions distributed/deploy/spec.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -379,13 +379,17 @@ async def _correct_state_internal(self) -> None:
self._created.add(worker)
workers.append(worker)
if workers:
await asyncio.wait(
[asyncio.create_task(_wrap_awaitable(w)) for w in workers]
)
worker_futs = [asyncio.ensure_future(w) for w in workers]
await asyncio.wait(worker_futs)
self.workers.update(dict(zip(to_open, workers)))
for w in workers:
w._cluster = weakref.ref(self)
# Collect exceptions from failed workers. This must happen after all
# *other* workers have finished initialising, so that we can have a
# proper teardown.
await asyncio.gather(*worker_futs)
for w in workers:
await w # for tornado gen.coroutine support
self.workers.update(dict(zip(to_open, workers)))

def _update_worker_status(self, op, msg):
if op == "remove":
Expand Down Expand Up @@ -467,10 +471,14 @@ async def _close(self):
await super()._close()

async def __aenter__(self):
await self
await self._correct_state()
assert self.status == Status.running
return self
try:
await self
await self._correct_state()
assert self.status == Status.running
return self
except Exception:
await self.close()
raise

def _threads_per_worker(self) -> int:
"""Return the number of threads per worker for new workers"""
Expand Down
13 changes: 4 additions & 9 deletions distributed/deploy/tests/test_spec_cluster.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -207,7 +207,7 @@ async def test_restart():
await asyncio.sleep(0.01)


@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
@pytest.mark.repeat(50) # DNM
@gen_test()
async def test_broken_worker():
class BrokenWorkerException(Exception):
Expand All @@ -216,7 +216,6 @@ class BrokenWorkerException(Exception):
class BrokenWorker(Worker):
def __await__(self):
async def _():
self.status = Status.closed
raise BrokenWorkerException("Worker Broken")

return _().__await__()
Expand All @@ -226,13 +225,9 @@ async def _():
workers={"good": {"cls": Worker}, "bad": {"cls": BrokenWorker}},
scheduler=scheduler,
)
try:
with pytest.raises(BrokenWorkerException, match=r"Worker Broken"):
async with cluster:
pass
finally:
# FIXME: SpecCluster leaks if SpecCluster.__aenter__ raises
await cluster.close()
with pytest.raises(BrokenWorkerException, match=r"Worker Broken"):
async with cluster:
pass


@pytest.mark.skipif(WINDOWS, reason="HTTP Server doesn't close out")
Expand Down

0 comments on commit 531b1b4

Please sign in to comment.