Skip to content

Commit

Permalink
Transitions caused by worker death use old 'worker-connect' stimulus_id
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Jun 30, 2022
1 parent ff250f2 commit b9d1e1c
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 33 deletions.
29 changes: 13 additions & 16 deletions distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3553,7 +3553,7 @@ def heartbeat_worker(
@log_errors
async def add_worker(
self,
comm=None,
comm: Comm,
*,
address: str,
status: str,
Expand All @@ -3575,8 +3575,8 @@ async def add_worker(
versions: dict[str, Any] | None = None,
nanny=None,
extra=None,
stimulus_id=None,
):
stimulus_id: str,
) -> None:
"""Add a new worker to the cluster"""
address = self.coerce_address(address, resolve_address)
address = normalize_address(address)
Expand All @@ -3591,8 +3591,7 @@ async def add_worker(
f"Keys: {list(nbytes)}"
)
logger.error(err)
if comm:
await comm.write({"status": "error", "message": err, "time": time()})
await comm.write({"status": "error", "message": err, "time": time()})
return

if name in self.aliases:
Expand All @@ -3602,8 +3601,7 @@ async def add_worker(
"message": "name taken, %s" % name,
"time": time(),
}
if comm:
await comm.write(msg)
await comm.write(msg)
return

self.log_event(address, {"action": "add-worker"})
Expand Down Expand Up @@ -3682,7 +3680,6 @@ async def add_worker(
"worker-plugins": self.worker_plugins,
}

cs: ClientState
version_warning = version_module.error_message(
version_module.get_versions(),
merge(
Expand All @@ -3694,12 +3691,11 @@ async def add_worker(
)
msg.update(version_warning)

if comm:
await comm.write(msg)
await comm.write(msg)
# This will keep running until the worker is removed
await self.handle_worker(comm, address)

await self.handle_worker(comm=comm, worker=address, stimulus_id=stimulus_id)

async def add_nanny(self, comm):
async def add_nanny(self) -> dict[str, Any]:
msg = {
"status": "OK",
"nanny-plugins": self.nanny_plugins,
Expand Down Expand Up @@ -4807,7 +4803,7 @@ async def handle_request_refresh_who_has(
}
)

async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
async def handle_worker(self, comm: Comm, worker: str) -> None:
"""
Listen to responses from a single worker
Expand All @@ -4817,7 +4813,6 @@ async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
--------
Scheduler.handle_client: Equivalent coroutine for clients
"""
assert stimulus_id
comm.name = "Scheduler connection to worker"
worker_comm = self.stream_comms[worker]
worker_comm.start(comm)
Expand All @@ -4827,7 +4822,9 @@ async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
finally:
if worker in self.stream_comms:
worker_comm.abort()
await self.remove_worker(address=worker, stimulus_id=stimulus_id)
await self.remove_worker(
worker, stimulus_id=f"handle-worker-cleanup-{time()}"
)

def add_plugin(
self,
Expand Down
17 changes: 0 additions & 17 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1339,23 +1339,6 @@ async def test_scheduler_file():
await s.close()


@pytest.mark.xfail()
@gen_cluster(client=True, nthreads=[])
async def test_non_existent_worker(c, s):
with dask.config.set({"distributed.comm.timeouts.connect": "100ms"}):
await s.add_worker(
address="127.0.0.1:5738",
status="running",
nthreads=2,
nbytes={},
host_info={},
)
futures = c.map(inc, range(10))
await asyncio.sleep(0.300)
assert not s.workers
assert all(ts.state == "no-worker" for ts in s.tasks.values())


@pytest.mark.parametrize(
"host", ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"]
)
Expand Down

0 comments on commit b9d1e1c

Please sign in to comment.