Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Transitions caused by worker death use old 'worker-connect' stimulus_id #6657

Merged: 1 commit merged on Jun 30, 2022
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
distributed/scheduler.py: 29 changes (13 additions & 16 deletions)
Columns: original file line number | diff line number | diff line change
Expand Up @@ -3553,7 +3553,7 @@ def heartbeat_worker(
@log_errors
async def add_worker(
self,
comm=None,
comm: Comm,
*,
address: str,
status: str,
Expand All @@ -3575,8 +3575,8 @@ async def add_worker(
versions: dict[str, Any] | None = None,
nanny=None,
extra=None,
stimulus_id=None,
):
stimulus_id: str,
) -> None:
"""Add a new worker to the cluster"""
address = self.coerce_address(address, resolve_address)
address = normalize_address(address)
Expand All @@ -3591,8 +3591,7 @@ async def add_worker(
f"Keys: {list(nbytes)}"
)
logger.error(err)
if comm:
await comm.write({"status": "error", "message": err, "time": time()})
await comm.write({"status": "error", "message": err, "time": time()})
return

if name in self.aliases:
Expand All @@ -3602,8 +3601,7 @@ async def add_worker(
"message": "name taken, %s" % name,
"time": time(),
}
if comm:
await comm.write(msg)
await comm.write(msg)
return

self.log_event(address, {"action": "add-worker"})
Expand Down Expand Up @@ -3682,7 +3680,6 @@ async def add_worker(
"worker-plugins": self.worker_plugins,
}

cs: ClientState
version_warning = version_module.error_message(
version_module.get_versions(),
merge(
Expand All @@ -3694,12 +3691,11 @@ async def add_worker(
)
msg.update(version_warning)

if comm:
await comm.write(msg)
await comm.write(msg)
# This will keep running until the worker is removed
await self.handle_worker(comm, address)

await self.handle_worker(comm=comm, worker=address, stimulus_id=stimulus_id)

async def add_nanny(self, comm):
async def add_nanny(self) -> dict[str, Any]:
msg = {
"status": "OK",
"nanny-plugins": self.nanny_plugins,
Expand Down Expand Up @@ -4807,7 +4803,7 @@ async def handle_request_refresh_who_has(
}
)

async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
async def handle_worker(self, comm: Comm, worker: str) -> None:
"""
Listen to responses from a single worker

Expand All @@ -4817,7 +4813,6 @@ async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
--------
Scheduler.handle_client: Equivalent coroutine for clients
"""
assert stimulus_id
comm.name = "Scheduler connection to worker"
worker_comm = self.stream_comms[worker]
worker_comm.start(comm)
Expand All @@ -4827,7 +4822,9 @@ async def handle_worker(self, comm=None, worker=None, stimulus_id=None):
finally:
if worker in self.stream_comms:
worker_comm.abort()
await self.remove_worker(address=worker, stimulus_id=stimulus_id)
await self.remove_worker(
worker, stimulus_id=f"handle-worker-cleanup-{time()}"
)

def add_plugin(
self,
Expand Down
distributed/tests/test_scheduler.py: 17 changes (0 additions & 17 deletions)
Columns: original file line number | diff line number | diff line change
Expand Up @@ -1339,23 +1339,6 @@ async def test_scheduler_file():
await s.close()


@pytest.mark.xfail()
@gen_cluster(client=True, nthreads=[])
async def test_non_existent_worker(c, s):
with dask.config.set({"distributed.comm.timeouts.connect": "100ms"}):
await s.add_worker(
address="127.0.0.1:5738",
status="running",
nthreads=2,
nbytes={},
host_info={},
)
futures = c.map(inc, range(10))
await asyncio.sleep(0.300)
assert not s.workers
assert all(ts.state == "no-worker" for ts in s.tasks.values())


@pytest.mark.parametrize(
"host", ["tcp://0.0.0.0", "tcp://127.0.0.1", "tcp://127.0.0.1:38275"]
)
Expand Down