diff --git a/locust/runners.py b/locust/runners.py index c1cd29beca..659df576fd 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -74,6 +74,7 @@ HEARTBEAT_INTERVAL = 1 HEARTBEAT_LIVENESS = 3 HEARTBEAT_DEAD_INTERNAL = -60 +MASTER_HEARTBEAT_TIMEOUT = 60 FALLBACK_INTERVAL = 5 CONNECT_TIMEOUT = 5 CONNECT_RETRY_COUNT = 60 @@ -1056,6 +1057,9 @@ def client_listener(self) -> NoReturn: ) if "current_memory_usage" in msg.data: c.memory_usage = msg.data["current_memory_usage"] + self.send_message("heartbeat", client_id=msg.node_id) + else: + logging.debug(f"Got heartbeat message from unknown worker {msg.node_id}") elif msg.type == "stats": self.environment.events.worker_report.fire(client_id=msg.node_id, data=msg.data) elif msg.type == "spawning": @@ -1149,6 +1153,7 @@ def __init__(self, environment: "Environment", master_host: str, master_port: in super().__init__(environment) self.retry = 0 self.connected = False + self.last_heartbeat_timestamp = time.time() # will be overwritten on connect, but lets keep the linter happy self.connection_event = Event() self.worker_state = STATE_INIT self.client_id = socket.gethostname() + "_" + uuid4().hex @@ -1160,6 +1165,7 @@ def __init__(self, environment: "Environment", master_host: str, master_port: in self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler) self.connect_to_master() self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler) + self.greenlet.spawn(self.heartbeat_timeout_checker).link_exception(greenlet_exception_handler) self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler) # register listener for when all users have spawned, and report it to the master node @@ -1249,6 +1255,13 @@ def heartbeat(self) -> NoReturn: self.reset_connection() gevent.sleep(HEARTBEAT_INTERVAL) + def heartbeat_timeout_checker(self) -> NoReturn: + while True: + gevent.sleep(1) + if self.connected and self.last_heartbeat_timestamp < time.time() - MASTER_HEARTBEAT_TIMEOUT: + logger.error(f"Didn't get heartbeat from master in over {MASTER_HEARTBEAT_TIMEOUT}s") + self.quit() + def reset_connection(self) -> None: logger.info("Reset connection to master") try: @@ -1328,6 +1341,8 @@ def worker(self) -> NoReturn: elif msg.type == "reconnect": logger.warning("Received reconnect message from master. Resetting RPC connection.") self.reset_connection() + elif msg.type == "heartbeat": + self.last_heartbeat_timestamp = time.time() elif msg.type in self.custom_messages: logger.debug("Received %s message from master" % msg.type) self.custom_messages[msg.type](environment=self.environment, msg=msg) @@ -1378,6 +1393,7 @@ def connect_to_master(self): raise ConnectionError() self.connect_to_master() self.connected = True + self.last_heartbeat_timestamp = time.time() def _format_user_classes_count_for_log(user_classes_count: Dict[str, int]) -> str: diff --git a/locust/test/test_main.py b/locust/test/test_main.py index 5ab533f69d..ce740103f8 100644 --- a/locust/test/test_main.py +++ b/locust/test/test_main.py @@ -1860,3 +1860,58 @@ def test_processes_ctrl_c(self): self.assertIn("(index 3) reported as ready", stderr) self.assertIn("The last worker quit, stopping test", stderr) self.assertIn("Shutting down (exit code 0)", stderr) + + def test_workers_shut_down_if_master_is_gone(self): + content = """ +from locust import HttpUser, task, constant, runners +runners.MASTER_HEARTBEAT_TIMEOUT = 3 + +class AnyUser(HttpUser): + host = "http://127.0.0.1:8089" + wait_time = constant(1) + @task + def my_task(self): + print("worker index:", self.environment.runner.worker_index) +""" + with mock_locustfile(content=content) as mocked: + master_proc = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--master", + "--headless", + "--expect-workers", + "2", + ], + stdout=PIPE, + stderr=PIPE, + text=True, + ) + + worker_parent_proc = subprocess.Popen( + [ + "locust", + "-f", + mocked.file_path, + "--worker", + "--processes", + "2", + "--headless", + ], + stdout=PIPE, + stderr=PIPE, + text=True, + ) + gevent.sleep(1) + master_proc.kill() + master_proc.wait() + try: + _, worker_stderr = worker_parent_proc.communicate(timeout=7) + except Exception: + worker_parent_proc.kill() + _, worker_stderr = worker_parent_proc.communicate() + assert False, f"worker never finished: {worker_stderr}" + + self.assertNotIn("Traceback", worker_stderr) + self.assertIn("Didn't get heartbeat from master in over ", worker_stderr)