diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py
index 09488969edc..f56fed08f49 100644
--- a/distributed/deploy/spec.py
+++ b/distributed/deploy/spec.py
@@ -381,13 +381,16 @@ async def _correct_state_internal(self) -> None:
         if workers:
             worker_futs = [asyncio.ensure_future(w) for w in workers]
             await asyncio.wait(worker_futs)
-            self.workers.update(dict(zip(to_open, workers)))
             for w in workers:
                 w._cluster = weakref.ref(self)
             # Collect exceptions from failed workers. This must happen after all
             # *other* workers have finished initialising, so that we can have a
             # proper teardown.
             await asyncio.gather(*worker_futs)
+            # And remember the new workers, note this doesn't
+            # handle partial failure since the gather above would
+            # raise in that case. See also #5919.
+            self.workers.update(dict(zip(to_open, workers)))
 
     def _update_worker_status(self, op, msg):
         if op == "remove":
diff --git a/distributed/deploy/tests/test_local.py b/distributed/deploy/tests/test_local.py
index 2ddb0f0bbf2..068e02dc5ea 100644
--- a/distributed/deploy/tests/test_local.py
+++ b/distributed/deploy/tests/test_local.py
@@ -1275,6 +1275,26 @@ def test_localcluster_start_exception(loop):
         pass
 
 
+def test_localcluster_scale_exception(loop):
+    with raises_with_cause(
+        RuntimeError,
+        "Nanny failed to start",
+        RuntimeError,
+        "Worker failed to start",
+        ImportError,
+        "my_nonexistent_library",
+    ):
+        with LocalCluster(
+            n_workers=0,
+            threads_per_worker=1,
+            processes=True,
+            plugins={MyPlugin()},
+            loop=loop,
+        ) as cluster:
+            cluster.scale(1)
+            cluster.sync(cluster._correct_state)
+
+
 def test_localcluster_get_client(loop):
     with LocalCluster(
         n_workers=0, asynchronous=False, dashboard_address=":0", loop=loop
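
To make the ordering argument in the spec.py comment concrete, here is a minimal, self-contained asyncio sketch. It is for illustration only: `ToyCluster`, `_start_worker`, and `scale_up` are hypothetical names, not distributed's API. It mirrors the pattern the patch establishes in `_correct_state_internal`: wait for every startup future, let `gather` re-raise any failure, and only then record the new workers, so a failed scale-up leaves nothing half-registered.

```python
# Minimal sketch, assuming a toy cluster object; not distributed's real classes.
import asyncio


class ToyCluster:
    def __init__(self):
        self.workers = {}

    async def _start_worker(self, name, fail=False):
        await asyncio.sleep(0)  # stand-in for real async startup work
        if fail:
            raise RuntimeError(f"{name} failed to start")
        return name

    async def scale_up(self, specs):
        futs = [asyncio.ensure_future(self._start_worker(n, f)) for n, f in specs]
        await asyncio.wait(futs)      # let every startup settle first
        await asyncio.gather(*futs)   # re-raises the first startup failure
        # Only reached when the whole batch came up cleanly.
        self.workers.update({name: name for name, _ in specs})


async def main():
    cluster = ToyCluster()
    try:
        await cluster.scale_up([("w1", False), ("w2", True)])
    except RuntimeError as exc:
        print("scale-up failed:", exc)
    print("registered workers:", cluster.workers)  # {} -- nothing was recorded


asyncio.run(main())
```

If the `self.workers.update(...)` call sat before the `gather`, as on the removed line, the failed batch would already be registered when the exception propagates, which is exactly the partial-failure state the new test exercises via `cluster.scale(1)` followed by `cluster.sync(cluster._correct_state)`.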