Skip to content

Commit

Permalink
Merge pull request #2474 from locustio/shut-down-workers-if-master-go…
Browse files Browse the repository at this point in the history
…es-missing-for-too-long

Shut down workers if master goes missing for too long
  • Loading branch information
cyberw authored Nov 19, 2023
2 parents 340c9cb + dca3258 commit b930a90
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 0 deletions.
16 changes: 16 additions & 0 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
# Heartbeat / connection tuning constants for the distributed runners.
# NOTE(review): only HEARTBEAT_INTERVAL and MASTER_HEARTBEAT_TIMEOUT have visible uses in this excerpt;
# the units and meaning of the other constants should be confirmed against their call sites.

# Seconds the worker's heartbeat loop sleeps between iterations.
HEARTBEAT_INTERVAL = 1
HEARTBEAT_LIVENESS = 3
HEARTBEAT_DEAD_INTERNAL = -60
# Seconds a connected worker tolerates without receiving a "heartbeat" message from the master
# before it logs an error and quits (see heartbeat_timeout_checker).
MASTER_HEARTBEAT_TIMEOUT = 60
FALLBACK_INTERVAL = 5
CONNECT_TIMEOUT = 5
CONNECT_RETRY_COUNT = 60
Expand Down Expand Up @@ -1056,6 +1057,9 @@ def client_listener(self) -> NoReturn:
)
if "current_memory_usage" in msg.data:
c.memory_usage = msg.data["current_memory_usage"]
self.send_message("heartbeat", client_id=msg.node_id)
else:
logging.debug(f"Got heartbeat message from unknown worker {msg.node_id}")
elif msg.type == "stats":
self.environment.events.worker_report.fire(client_id=msg.node_id, data=msg.data)
elif msg.type == "spawning":
Expand Down Expand Up @@ -1149,6 +1153,7 @@ def __init__(self, environment: "Environment", master_host: str, master_port: in
super().__init__(environment)
self.retry = 0
self.connected = False
self.last_heartbeat_timestamp = time.time() # will be overwritten on connect, but lets keep the linter happy
self.connection_event = Event()
self.worker_state = STATE_INIT
self.client_id = socket.gethostname() + "_" + uuid4().hex
Expand All @@ -1160,6 +1165,7 @@ def __init__(self, environment: "Environment", master_host: str, master_port: in
self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler)
self.connect_to_master()
self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.heartbeat_timeout_checker).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.stats_reporter).link_exception(greenlet_exception_handler)

# register listener for when all users have spawned, and report it to the master node
Expand Down Expand Up @@ -1249,6 +1255,13 @@ def heartbeat(self) -> NoReturn:
self.reset_connection()
gevent.sleep(HEARTBEAT_INTERVAL)

def heartbeat_timeout_checker(self) -> NoReturn:
    """Watch for a silent master and shut this worker down when it stays silent too long.

    Loops forever, waking once per second. While the worker is marked as
    connected, it compares ``last_heartbeat_timestamp`` against the current
    time minus ``MASTER_HEARTBEAT_TIMEOUT``; if the last heartbeat is older
    than that, an error is logged and ``self.quit()`` is called.
    """
    while True:
        gevent.sleep(1)
        # Nothing to check until a connection has been established.
        if not self.connected:
            continue
        oldest_acceptable = time.time() - MASTER_HEARTBEAT_TIMEOUT
        if self.last_heartbeat_timestamp < oldest_acceptable:
            logger.error(f"Didn't get heartbeat from master in over {MASTER_HEARTBEAT_TIMEOUT}s")
            self.quit()

def reset_connection(self) -> None:
logger.info("Reset connection to master")
try:
Expand Down Expand Up @@ -1328,6 +1341,8 @@ def worker(self) -> NoReturn:
elif msg.type == "reconnect":
logger.warning("Received reconnect message from master. Resetting RPC connection.")
self.reset_connection()
elif msg.type == "heartbeat":
self.last_heartbeat_timestamp = time.time()
elif msg.type in self.custom_messages:
logger.debug("Received %s message from master" % msg.type)
self.custom_messages[msg.type](environment=self.environment, msg=msg)
Expand Down Expand Up @@ -1378,6 +1393,7 @@ def connect_to_master(self):
raise ConnectionError()
self.connect_to_master()
self.connected = True
self.last_heartbeat_timestamp = time.time()


def _format_user_classes_count_for_log(user_classes_count: Dict[str, int]) -> str:
Expand Down
55 changes: 55 additions & 0 deletions locust/test/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -1860,3 +1860,58 @@ def test_processes_ctrl_c(self):
self.assertIn("(index 3) reported as ready", stderr)
self.assertIn("The last worker quit, stopping test", stderr)
self.assertIn("Shutting down (exit code 0)", stderr)

def test_workers_shut_down_if_master_is_gone(self):
    """Workers must exit on their own once the master stops sending heartbeats.

    Starts a headless master expecting two workers and a worker parent
    process with two worker processes, kills the master after one second,
    and then expects the worker parent to terminate within seven seconds
    with the heartbeat-timeout error in its stderr and no traceback.
    """
    # The locustfile lowers MASTER_HEARTBEAT_TIMEOUT to 3s so the workers give up
    # well inside the communicate() timeout used below. The class and method
    # bodies must be indented, otherwise the generated locustfile is not valid Python.
    content = """
from locust import HttpUser, task, constant, runners
runners.MASTER_HEARTBEAT_TIMEOUT = 3
class AnyUser(HttpUser):
    host = "http://127.0.0.1:8089"
    wait_time = constant(1)
    @task
    def my_task(self):
        print("worker index:", self.environment.runner.worker_index)
"""
    with mock_locustfile(content=content) as mocked:
        master_proc = subprocess.Popen(
            [
                "locust",
                "-f",
                mocked.file_path,
                "--master",
                "--headless",
                "--expect-workers",
                "2",
            ],
            stdout=PIPE,
            stderr=PIPE,
            text=True,
        )

        worker_parent_proc = subprocess.Popen(
            [
                "locust",
                "-f",
                mocked.file_path,
                "--worker",
                "--processes",
                "2",
                "--headless",
            ],
            stdout=PIPE,
            stderr=PIPE,
            text=True,
        )
        gevent.sleep(1)
        # Simulate the master disappearing without a clean shutdown.
        master_proc.kill()
        master_proc.wait()
        try:
            _, worker_stderr = worker_parent_proc.communicate(timeout=7)
        except subprocess.TimeoutExpired:
            # Only a timeout means "the workers never shut down"; any other error
            # should surface as itself rather than be reported as a hang.
            worker_parent_proc.kill()
            _, worker_stderr = worker_parent_proc.communicate()
            # self.fail() is used instead of `assert False`, which is removed under `python -O`.
            self.fail(f"worker never finished: {worker_stderr}")

        self.assertNotIn("Traceback", worker_stderr)
        self.assertIn("Didn't get heartbeat from master in over ", worker_stderr)

0 comments on commit b930a90

Please sign in to comment.