From f4a25eb3a5c72ce2695a04c9fb197ab0dc5f0574 Mon Sep 17 00:00:00 2001 From: fjetter Date: Wed, 18 May 2022 11:37:04 +0200 Subject: [PATCH] Remove report and safe from Worker.close --- distributed/chaos.py | 4 +- distributed/client.py | 2 +- distributed/deploy/spec.py | 2 +- .../tests/test_cluster_dump_plugin.py | 2 +- distributed/nanny.py | 12 ++--- distributed/scheduler.py | 49 +++++++------------ distributed/tests/test_client.py | 2 +- distributed/tests/test_nanny.py | 2 +- distributed/tests/test_scheduler.py | 22 ++++++--- distributed/tests/test_worker.py | 4 +- distributed/utils_test.py | 8 +-- distributed/worker.py | 37 +++++++------- 12 files changed, 68 insertions(+), 78 deletions(-) diff --git a/distributed/chaos.py b/distributed/chaos.py index 1bcba1f7950..87255f1dd67 100644 --- a/distributed/chaos.py +++ b/distributed/chaos.py @@ -56,9 +56,7 @@ async def setup(self, worker): ) def graceful(self): - asyncio.create_task( - self.worker.close(report=False, nanny=False, executor_wait=False) - ) + asyncio.create_task(self.worker.close(nanny=False, executor_wait=False)) def sys_exit(self): sys.exit(0) diff --git a/distributed/client.py b/distributed/client.py index caf42dc19ad..af52607c9df 100644 --- a/distributed/client.py +++ b/distributed/client.py @@ -1630,7 +1630,7 @@ async def _shutdown(self): else: with suppress(CommClosedError): self.status = "closing" - await self.scheduler.terminate(close_workers=True) + await self.scheduler.terminate() def shutdown(self): """Shut down the connected scheduler and workers diff --git a/distributed/deploy/spec.py b/distributed/deploy/spec.py index 96a66d63aed..778f0bf19be 100644 --- a/distributed/deploy/spec.py +++ b/distributed/deploy/spec.py @@ -411,7 +411,7 @@ async def _close(self): if self.scheduler_comm: async with self._lock: with suppress(OSError): - await self.scheduler_comm.terminate(close_workers=True) + await self.scheduler_comm.terminate() await self.scheduler_comm.close_rpc() else: logger.warning("Cluster closed without starting up") diff --git a/distributed/diagnostics/tests/test_cluster_dump_plugin.py b/distributed/diagnostics/tests/test_cluster_dump_plugin.py index 67ce815954d..b084e761603 100644 --- a/distributed/diagnostics/tests/test_cluster_dump_plugin.py +++ b/distributed/diagnostics/tests/test_cluster_dump_plugin.py @@ -14,7 +14,7 @@ async def test_cluster_dump_plugin(c, s, *workers, tmp_path): f2 = c.submit(inc, f1) assert (await f2) == 3 - await s.close(close_workers=True) + await s.close() dump = DumpArtefact.from_url(str(dump_file)) assert {f1.key, f2.key} == set(dump.scheduler_story(f1.key, f2.key).keys()) diff --git a/distributed/nanny.py b/distributed/nanny.py index 1786ae04dcf..01c49a207bd 100644 --- a/distributed/nanny.py +++ b/distributed/nanny.py @@ -471,7 +471,7 @@ async def plugin_remove(self, name=None): return {"status": "OK"} - async def restart(self, timeout=30, executor_wait=True): + async def restart(self, timeout=30): async def _(): if self.process is not None: await self.kill() @@ -557,7 +557,7 @@ def close_gracefully(self): """ self.status = Status.closing_gracefully - async def close(self, comm=None, timeout=5, report=None): + async def close(self, timeout=5): """ Close the worker process, stop all comms. """ @@ -570,9 +570,8 @@ async def close(self, comm=None, timeout=5, report=None): self.status = Status.closing logger.info( - "Closing Nanny at %r. 
Report closure to scheduler: %s", + "Closing Nanny at %r.", self.address_safe, - report, ) for preload in self.preloads: @@ -595,9 +594,8 @@ async def close(self, comm=None, timeout=5, report=None): self.process = None await self.rpc.close() self.status = Status.closed - if comm: - await comm.write("OK") await super().close() + return "OK" async def _log_event(self, topic, msg): await self.scheduler.log_event( @@ -838,9 +836,7 @@ def _run( async def do_stop(timeout=5, executor_wait=True): try: await worker.close( - report=True, nanny=False, - safe=True, # TODO: Graceful or not? executor_wait=executor_wait, timeout=timeout, ) diff --git a/distributed/scheduler.py b/distributed/scheduler.py index 835e1b3ba42..805609a19ec 100644 --- a/distributed/scheduler.py +++ b/distributed/scheduler.py @@ -3347,7 +3347,7 @@ def del_scheduler_file(): setproctitle(f"dask-scheduler [{self.address}]") return self - async def close(self, fast=False, close_workers=False): + async def close(self): """Send cleanup signal to all coroutines then wait until finished See Also @@ -3370,19 +3370,6 @@ async def close(self, fast=False, close_workers=False): for preload in self.preloads: await preload.teardown() - if close_workers: - await self.broadcast(msg={"op": "close_gracefully"}, nanny=True) - for worker in self.workers: - # Report would require the worker to unregister with the - # currently closing scheduler. This is not necessary and might - # delay shutdown of the worker unnecessarily - self.worker_send(worker, {"op": "close", "report": False}) - for i in range(20): # wait a second for send signals to clear - if self.workers: - await asyncio.sleep(0.05) - else: - break - await asyncio.gather( *[plugin.close() for plugin in list(self.plugins.values())] ) @@ -3399,15 +3386,16 @@ async def close(self, fast=False, close_workers=False): logger.info("Scheduler closing all comms") futures = [] - for w, comm in list(self.stream_comms.items()): + for _, comm in list(self.stream_comms.items()): if not comm.closed(): - comm.send({"op": "close", "report": False}) + # This closes the Worker and ensures that if a Nanny is around, + # it is closed as well + comm.send({"op": "terminate"}) comm.send({"op": "close-stream"}) with suppress(AttributeError): futures.append(comm.close()) - for future in futures: # TODO: do all at once - await future + await asyncio.gather(*futures) for comm in self.client_comms.values(): comm.abort() @@ -3431,8 +3419,8 @@ async def close_worker(self, worker: str, stimulus_id: str, safe: bool = False): """ logger.info("Closing worker %s", worker) self.log_event(worker, {"action": "close-worker"}) - # FIXME: This does not handle nannies - self.worker_send(worker, {"op": "close", "report": False}) + ws = self.workers[worker] + self.worker_send(worker, {"op": "close", "nanny": bool(ws.nanny)}) await self.remove_worker(address=worker, safe=safe, stimulus_id=stimulus_id) ########### @@ -4215,7 +4203,7 @@ async def remove_worker(self, address, stimulus_id, safe=False, close=True): logger.info("Remove worker %s", ws) if close: with suppress(AttributeError, CommClosedError): - self.stream_comms[address].send({"op": "close", "report": False}) + self.stream_comms[address].send({"op": "close"}) self.remove_resources(address) @@ -4776,7 +4764,7 @@ def handle_long_running( ws.long_running.add(ts) self.check_idle_saturated(ws) - def handle_worker_status_change( + async def handle_worker_status_change( self, status: str, worker: str, stimulus_id: str ) -> None: ws = self.workers.get(worker) @@ -4804,9 +4792,12 @@ def 
handle_worker_status_change( worker_msgs: dict = {} self._transitions(recs, client_msgs, worker_msgs, stimulus_id) self.send_all(client_msgs, worker_msgs) - - else: - self.running.discard(ws) + elif ws.status == Status.paused: + self.running.remove(ws) + elif ws.status == Status.closing: + await self.remove_worker( + address=ws.address, stimulus_id=stimulus_id, close=False + ) async def handle_worker(self, comm=None, worker=None, stimulus_id=None): """ @@ -5155,12 +5146,7 @@ async def restart(self, client=None, timeout=30): ] resps = All( - [ - nanny.restart( - close=True, timeout=timeout * 0.8, executor_wait=False - ) - for nanny in nannies - ] + [nanny.restart(close=True, timeout=timeout * 0.8) for nanny in nannies] ) try: resps = await asyncio.wait_for(resps, timeout) @@ -6053,6 +6039,7 @@ async def retire_workers( prev_status = ws.status ws.status = Status.closing_gracefully self.running.discard(ws) + # FIXME: We should send a message to the nanny first. self.stream_comms[ws.address].send( { "op": "worker-status-change", diff --git a/distributed/tests/test_client.py b/distributed/tests/test_client.py index dbf16992655..e99ea442850 100644 --- a/distributed/tests/test_client.py +++ b/distributed/tests/test_client.py @@ -3654,7 +3654,7 @@ async def hard_stop(s): except CancelledError: break - await w.close(report=False) + await w.close() await c._close(fast=True) diff --git a/distributed/tests/test_nanny.py b/distributed/tests/test_nanny.py index bef0cc04010..06b4f95c8c1 100644 --- a/distributed/tests/test_nanny.py +++ b/distributed/tests/test_nanny.py @@ -404,7 +404,7 @@ def remove_worker(self, **kwargs): @gen_cluster(client=True, nthreads=[]) -async def test_nanny_closes_cleanly_2(c, s): +async def test_nanny_closes_cleanly_if_worker_is_terminated(c, s): async with Nanny(s.address) as n: async with c.rpc(n.worker_address) as w: IOLoop.current().add_callback(w.terminate) diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py index 5c4056cfcd7..76c335af5c5 100644 --- a/distributed/tests/test_scheduler.py +++ b/distributed/tests/test_scheduler.py @@ -1824,10 +1824,12 @@ async def test_result_type(c, s, a, b): @gen_cluster() -async def test_close_workers(s, a, b): - await s.close(close_workers=True) - assert a.status == Status.closed - assert b.status == Status.closed +async def test_close_workers(s, *workers): + await s.close() + + for w in workers: + if not w.status == Status.closed: + await asyncio.sleep(0.1) @pytest.mark.skipif(not LINUX, reason="Need 127.0.0.2 to mean localhost") @@ -2718,7 +2720,7 @@ async def test_memory_is_none(c, s): @gen_cluster() async def test_close_scheduler__close_workers_Worker(s, a, b): with captured_logger("distributed.comm", level=logging.DEBUG) as log: - await s.close(close_workers=True) + await s.close() while not a.status == Status.closed: await asyncio.sleep(0.05) log = log.getvalue() @@ -2728,7 +2730,7 @@ async def test_close_scheduler__close_workers_Worker(s, a, b): @gen_cluster(Worker=Nanny) async def test_close_scheduler__close_workers_Nanny(s, a, b): with captured_logger("distributed.comm", level=logging.DEBUG) as log: - await s.close(close_workers=True) + await s.close() while not a.status == Status.closed: await asyncio.sleep(0.05) log = log.getvalue() @@ -2856,6 +2858,14 @@ async def test_rebalance_raises_missing_data3(c, s, a, b, explicit): futures = await c.scatter(range(100), workers=[a.address]) if explicit: + pytest.xfail( + reason="""Freeing keys and gathering data is using different + channels (stream vs 
explicit RPC). Therefore, the + partial-fail is very timing sensitive and subject to a race + condition. This test assumes that the data is freed before + the rebalance get_data requests come in but merely deleting + the futures is not sufficient to guarantee this""" + ) keys = [f.key for f in futures] del futures out = await s.rebalance(keys=keys) diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index 7421fed1756..1d5de4baedc 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -191,7 +191,7 @@ def g(): assert result == 123 await c.close() - await s.close(close_workers=True) + await s.close() assert not os.path.exists(os.path.join(a.local_directory, "foobar.py")) @@ -3029,7 +3029,7 @@ async def test_missing_released_zombie_tasks(c, s, a, b): while key not in b.tasks or b.tasks[key].state != "fetch": await asyncio.sleep(0.01) - await a.close(report=False) + await a.close() del f1, f2 diff --git a/distributed/utils_test.py b/distributed/utils_test.py index fc75ea18177..8d395dbae6f 100644 --- a/distributed/utils_test.py +++ b/distributed/utils_test.py @@ -132,7 +132,7 @@ def invalid_python_script(tmpdir_factory): async def cleanup_global_workers(): for worker in Worker._instances: - await worker.close(report=False, executor_wait=False) + await worker.close(executor_wait=False) @pytest.fixture @@ -877,7 +877,7 @@ async def start_cluster( await asyncio.sleep(0.01) if time() > start + 30: await asyncio.gather(*(w.close(timeout=1) for w in workers)) - await s.close(fast=True) + await s.close() check_invalid_worker_transitions(s) check_invalid_task_states(s) check_worker_fail_hard(s) @@ -931,7 +931,7 @@ async def end_cluster(s, workers): async def end_worker(w): with suppress(asyncio.TimeoutError, CommClosedError, EnvironmentError): - await w.close(report=False) + await w.close() await asyncio.gather(*(end_worker(w) for w in workers)) await s.close() # wait until scheduler stops completely @@ -1795,7 +1795,7 @@ def check_instances(): for w in Worker._instances: with suppress(RuntimeError): # closed IOLoop - w.loop.add_callback(w.close, report=False, executor_wait=False) + w.loop.add_callback(w.close, executor_wait=False) if w.status in WORKER_ANY_RUNNING: w.loop.add_callback(w.close) Worker._instances.clear() diff --git a/distributed/worker.py b/distributed/worker.py index 57a872455fd..3a9efd296d4 100644 --- a/distributed/worker.py +++ b/distributed/worker.py @@ -219,7 +219,7 @@ async def _force_close(self): 2. If it doesn't, log and kill the process """ try: - await asyncio.wait_for(self.close(nanny=False, executor_wait=False), 30) + await asyncio.wait_for(self.close(executor_wait=False), 30) except (Exception, BaseException): # <-- include BaseException here or not?? # Worker is in a very broken state if closing fails. We need to shut down immediately, # to ensure things don't get even worse and this worker potentially deadlocks the cluster. 
@@ -770,7 +770,7 @@ def __init__( "get_data": self.get_data, "update_data": self.update_data, "free_keys": self.handle_free_keys, - "terminate": self.close, + "terminate": self.terminate, "ping": pingpong, "upload_file": self.upload_file, "call_stack": self.get_call_stack, @@ -792,6 +792,7 @@ def __init__( stream_handlers = { "close": self.close, + "terminate": self.terminate, "cancel-compute": self.handle_cancel_compute, "acquire-replicas": self.handle_acquire_replicas, "compute-task": self.handle_compute_task, @@ -1217,12 +1218,12 @@ async def heartbeat(self): except CommClosedError: logger.warning("Heartbeat to scheduler failed", exc_info=True) if not self.reconnect: - await self.close(report=False) + await self.close(nanny=True) except OSError as e: # Scheduler is gone. Respect distributed.comm.timeouts.connect if "Timed out trying to connect" in str(e): logger.info("Timed out while trying to connect during heartbeat") - await self.close(report=False) + await self.close() else: logger.exception(e) raise e @@ -1241,7 +1242,7 @@ async def handle_scheduler(self, comm): "Connection to scheduler broken. Closing without reporting. Status: %s", self.status, ) - await self.close(report=False) + await self.close(nanny=True) async def upload_file(self, comm, filename=None, data=None, load=True): out_filename = os.path.join(self.local_directory, filename) @@ -1429,10 +1430,20 @@ async def start_unsafe(self): self.start_periodic_callbacks() return self + async def terminate(self, **kwargs): + return await self.close(nanny=True, **kwargs) + @log_errors async def close( - self, report=True, timeout=30, nanny=True, executor_wait=True, safe=False + self, + timeout=30, + executor_wait=True, + nanny=False, ): + # FIXME: The worker should not be allowed to close the nanny. Ownership + # is the other way round. If an external caller wants to close + # nanny+worker, the nanny must be notified first. ==> Remove kwarg + # nanny, see also Scheduler.retire_workers if self.status in (Status.closed, Status.closing): await self.finished() return @@ -1446,8 +1457,6 @@ async def close( logger.info("Stopping worker") if self.status not in WORKER_ANY_RUNNING: logger.info("Closed worker has not yet started: %s", self.status) - if not report: - logger.info("Not reporting worker closure to scheduler") if not executor_wait: logger.info("Not waiting on executor to close") self.status = Status.closing @@ -1511,16 +1520,6 @@ async def close( # otherwise c.close() - with suppress(EnvironmentError, TimeoutError): - if report and self.contact_address is not None: - await asyncio.wait_for( - self.scheduler.unregister( - address=self.contact_address, - safe=safe, - stimulus_id=f"worker-close-{time()}", - ), - timeout, - ) await self.scheduler.close_rpc() self._workdir.release() @@ -1598,7 +1597,7 @@ async def close_gracefully(self, restart=None): remove=False, stimulus_id=f"worker-close-gracefully-{time()}", ) - await self.close(safe=True, nanny=not restart) + await self.close(nanny=not restart) async def wait_until_closed(self): warnings.warn("wait_until_closed has moved to finished()")
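
Illustrative usage sketch (not part of the patch): the snippet below shows how call sites are expected to look once this commit lands. Only the close()/terminate() keyword changes reflect the diff above; the Scheduler/Worker construction details (port=0, dashboard_address=":0", running under asyncio.run) are assumptions made purely so the example is self-contained and runnable.

    # sketch.py -- hypothetical example, assumes distributed with this patch applied
    import asyncio

    from distributed import Scheduler, Worker


    async def main() -> None:
        # port=0 / dashboard_address=":0" pick free ports; illustrative only.
        async with Scheduler(port=0, dashboard_address=":0") as s:
            async with Worker(s.address) as w:
                # Before this patch:
                #     await w.close(report=False, safe=True, nanny=False)
                # After this patch, "report" and "safe" are gone. The worker no
                # longer unregisters itself with the scheduler on close, and the
                # nanny (if any) is only shut down when nanny=True is passed or
                # the new Worker.terminate() handler is used.
                await w.close(nanny=False, executor_wait=False)

            # Scheduler.close() no longer accepts close_workers=; remaining
            # workers are told to shut down via an {"op": "terminate"} message
            # on their stream comms instead.
            await s.close()


    if __name__ == "__main__":
        asyncio.run(main())

Callers that previously relied on report=/safe= or on Scheduler.close(close_workers=True) / scheduler.terminate(close_workers=True) are expected to drop those arguments and rely on the terminate stream/RPC handlers introduced above.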