From 11509cba9c3f462bf4d56eceee4e06e0479d52f6 Mon Sep 17 00:00:00 2001 From: Kevin Hunter Kesling Date: Wed, 12 Jul 2023 16:02:33 -0400 Subject: [PATCH] Check *all* child processes for liveness every HB (#2817) Prior to this, a dead process in a pool of N processes would take, on average, (N * heartbeat) / 2 seconds to be restarted. --- parsl/executors/high_throughput/process_worker_pool.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/parsl/executors/high_throughput/process_worker_pool.py b/parsl/executors/high_throughput/process_worker_pool.py index c01c0b2ef2..f65ec560e5 100755 --- a/parsl/executors/high_throughput/process_worker_pool.py +++ b/parsl/executors/high_throughput/process_worker_pool.py @@ -370,7 +370,7 @@ def push_results(self, kill_event): logger.critical("Exiting") @wrap_with_logs - def worker_watchdog(self, kill_event): + def worker_watchdog(self, kill_event: threading.Event): """Keeps workers alive. Parameters: @@ -381,7 +381,7 @@ def worker_watchdog(self, kill_event): logger.debug("Starting worker watchdog") - while not kill_event.is_set(): + while not kill_event.wait(self.heartbeat_period): for worker_id, p in self.procs.items(): if not p.is_alive(): logger.error("Worker {} has died".format(worker_id)) @@ -409,7 +409,6 @@ def worker_watchdog(self, kill_event): name="HTEX-Worker-{}".format(worker_id)) self.procs[worker_id] = p logger.info("Worker {} has been restarted".format(worker_id)) - time.sleep(self.heartbeat_period) logger.critical("Exiting")