From a6e6ccbba8815aea2d50a0d996b41f899a584f81 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Fri, 4 Jun 2021 13:31:41 -0400 Subject: [PATCH 1/6] Toggled test condition and fixed broken event handler --- locust/test/test_runners.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index d5b6ae3dba..5ba46e57a2 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -1352,7 +1352,7 @@ def the_task(self): test_start_run = [False] @environment.events.test_start.add_listener - def on_test_start(_environment, **kw): + def on_test_start(*args, **kw): test_start_run[0] = True worker = self.get_runner(environment=environment, user_classes=[MyTestUser]) @@ -1383,8 +1383,8 @@ def on_test_start(_environment, **kw): worker.user_greenlets.join() # check that locust user got to finish self.assertEqual(2, MyTestUser._test_state) - # make sure the test_start was never fired on the worker - self.assertFalse(test_start_run[0]) + # make sure the test_start was fired on the worker + self.assertTrue(test_start_run[0]) def test_worker_without_stop_timeout(self): class MyTestUser(User): From ed13b09e27d0667e05694fc7dd5ebc6b16d6edf6 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Fri, 4 Jun 2021 13:58:17 -0400 Subject: [PATCH 2/6] Added event to base Runner.start() method --- locust/runners.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 0b07e4be61..ea6b978e28 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -305,6 +305,7 @@ def start(self, user_count, spawn_rate, wait=False): self.cpu_warning_emitted = False self.worker_cpu_warning_emitted = False self.target_user_count = user_count + self.environment.events.test_start.fire(environment=self.environment) if self.state != STATE_INIT and self.state != STATE_STOPPED: logger.debug( @@ -407,10 +408,6 @@ def start(self, user_count, spawn_rate, wait=False): "Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?" ) - if self.state != STATE_RUNNING and self.state != STATE_SPAWNING: - # if we're not already running we'll fire the test_start event - self.environment.events.test_start.fire(environment=self.environment) - if self.spawning_greenlet: # kill existing spawning_greenlet before we start a new one self.spawning_greenlet.kill(block=True) From 870ecda931d0435146999d440e24b562cb4f0500 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Fri, 4 Jun 2021 15:34:25 -0400 Subject: [PATCH 3/6] added dedicated tests for worker event changes --- locust/test/test_runners.py | 161 +++++++++++++++++++++++++++++++++--- 1 file changed, 150 insertions(+), 11 deletions(-) diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index 5ba46e57a2..0cd088ebf8 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -537,16 +537,16 @@ def _(*args, **kwargs): # make sure users are killed self.assertEqual(0, worker.user_count) - # check the test_stop event was called one time in master and zero times in workder + # check the test_stop event was called one time in master and one time in worker self.assertEqual( 1, test_stop_count["master"], "The test_stop event was not called exactly one time in the master node", ) self.assertEqual( - 0, + 1, test_stop_count["worker"], - "The test_stop event was called in the worker node", + "The test_stop event was not called exactly one time in the worker node", ) def test_distributed_shape(self): @@ -1349,12 +1349,6 @@ def the_task(self): with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client: environment = Environment() - test_start_run = [False] - - @environment.events.test_start.add_listener - def on_test_start(*args, **kw): - test_start_run[0] = True - worker = self.get_runner(environment=environment, user_classes=[MyTestUser]) self.assertEqual(1, len(client.outbox)) self.assertEqual("client_ready", client.outbox[0].type) @@ -1383,8 +1377,6 @@ def on_test_start(*args, **kw): worker.user_greenlets.join() # check that locust user got to finish self.assertEqual(2, MyTestUser._test_state) - # make sure the test_start was fired on the worker - self.assertTrue(test_start_run[0]) def test_worker_without_stop_timeout(self): class MyTestUser(User): @@ -1470,6 +1462,153 @@ def my_task(self): self.assertEqual(9, len(worker.user_greenlets)) worker.quit() + def test_start_event(self): + class MyTestUser(User): + _test_state = 0 + + @task + def the_task(self): + MyTestUser._test_state = 1 + gevent.sleep(0.2) + MyTestUser._test_state = 2 + + with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client: + environment = Environment() + run_count = [0] + + @environment.events.test_start.add_listener + def on_test_start(*args, **kw): + run_count[0] += 1 + + worker = self.get_runner(environment=environment, user_classes=[MyTestUser]) + self.assertEqual(1, len(client.outbox)) + self.assertEqual("client_ready", client.outbox[0].type) + client.mocked_send( + Message( + "spawn", + { + "spawn_rate": 1, + "num_users": 1, + "host": "", + "stop_timeout": None, + }, + "dummy_client_id", + ) + ) + # wait for worker to spawn locusts + self.assertIn("spawning", [m.type for m in client.outbox]) + worker.spawning_greenlet.join() + self.assertEqual(1, len(worker.user_greenlets)) + self.assertEqual(1, run_count[0]) + + # check that locust has started running + gevent.sleep(0.01) + self.assertEqual(1, MyTestUser._test_state) + + # change number of users and check that test_start isn't fired again + client.mocked_send( + Message( + "spawn", + { + "spawn_rate": 1, + "num_users": 1, + "host": "", + "stop_timeout": None, + }, + "dummy_client_id", + ) + ) + self.assertEqual(1, run_count[0]) + + # stop and start to make sure test_start is fired again + client.mocked_send(Message("stop", None, "dummy_client_id")) + client.mocked_send( + Message( + "spawn", + { + "spawn_rate": 1, + "num_users": 1, + "host": "", + "stop_timeout": None, + }, + "dummy_client_id", + ) + ) + gevent.sleep(0.01) + self.assertEqual(2, run_count[0]) + + client.mocked_send(Message("stop", None, "dummy_client_id")) + + def test_stop_event(self): + class MyTestUser(User): + _test_state = 0 + + @task + def the_task(self): + MyTestUser._test_state = 1 + gevent.sleep(0.2) + MyTestUser._test_state = 2 + + with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client: + environment = Environment() + run_count = [0] + + @environment.events.test_stop.add_listener + def on_test_stop(*args, **kw): + run_count[0] += 1 + + worker = self.get_runner(environment=environment, user_classes=[MyTestUser]) + self.assertEqual(1, len(client.outbox)) + self.assertEqual("client_ready", client.outbox[0].type) + client.mocked_send( + Message( + "spawn", + { + "spawn_rate": 1, + "num_users": 1, + "host": "", + "stop_timeout": None, + }, + "dummy_client_id", + ) + ) + + # wait for worker to spawn locusts + self.assertIn("spawning", [m.type for m in client.outbox]) + worker.spawning_greenlet.join() + self.assertEqual(1, len(worker.user_greenlets)) + + # check that locust has started running + gevent.sleep(0.01) + self.assertEqual(1, MyTestUser._test_state) + + # stop and make sure test_stop is fired + client.mocked_send(Message("stop", None, "dummy_client_id")) + gevent.sleep(0.01) + self.assertEqual(1, run_count[0]) + + # stop while stopped and make sure the event isn't fired again + client.mocked_send(Message("stop", None, "dummy_client_id")) + gevent.sleep(0.01) + self.assertEqual(1, run_count[0]) + + # start and stop to check that the event is fired again + client.mocked_send( + Message( + "spawn", + { + "spawn_rate": 1, + "num_users": 1, + "host": "", + "stop_timeout": None, + }, + "dummy_client_id", + ) + ) + client.mocked_send(Message("stop", None, "dummy_client_id")) + gevent.sleep(0.01) + self.assertEqual(2, run_count[0]) + class TestMessageSerializing(unittest.TestCase): def test_message_serialize(self): From edd1bacd09afca5d13d2101fdf39c3637c0f906d Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Fri, 4 Jun 2021 15:35:23 -0400 Subject: [PATCH 4/6] Updated test_stop event --- locust/runners.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index ea6b978e28..87d20e5118 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -359,6 +359,8 @@ def stop(self): """ Stop a running load test by stopping all running users """ + if self.state == STATE_STOPPED: + return logger.debug("Stopping all users") self.update_state(STATE_CLEANUP) # if we are currently spawning users we need to kill the spawning greenlet first @@ -367,6 +369,7 @@ def stop(self): self.stop_users(self.user_count) self.update_state(STATE_STOPPED) self.cpu_log_warning() + self.environment.events.test_stop.fire(environment=self.environment) def quit(self): """ @@ -416,12 +419,6 @@ def start(self, user_count, spawn_rate, wait=False): ) self.spawning_greenlet.link_exception(greenlet_exception_handler) - def stop(self): - if self.state == STATE_STOPPED: - return - super().stop() - self.environment.events.test_stop.fire(environment=self.environment) - class DistributedRunner(Runner): def __init__(self, *args, **kwargs): From 54aede3cc58badada0081f4baf8f3bf4e55e23be Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Fri, 4 Jun 2021 16:13:49 -0400 Subject: [PATCH 5/6] Updated documentation --- docs/extending-locust.rst | 21 +++++++++++++++++++++ docs/writing-a-locustfile.rst | 2 -- locust/event.py | 8 +++----- 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/docs/extending-locust.rst b/docs/extending-locust.rst index 3db3696c03..0a2ce645c3 100644 --- a/docs/extending-locust.rst +++ b/docs/extending-locust.rst @@ -23,6 +23,27 @@ Here's an example on how to set up an event listener:: print(f"Successfully made a request to: {name}) print(f"The response was {response.text}") +When running locust in distributed mode, it may be useful to do some setup on worker nodes before running your tests. +You can check to ensure you aren't running on the master node by checking the type of the node's :py:attr:`runner `:: + + from locust import events + from locust.runners import MasterRunner + + @events.test_start.add_listener + def on_test_start(environment, **kwargs): + if not isinstance(environment.runner, MasterRunner): + print("Beginning test setup") + else + print("Started test from Master node") + + @events.test_stop.add_listener + def on_test_stop(environment, **kwargs): + if not isinstance(environment.runner, MasterRunner): + print("Cleaning up test data") + else + print("Stopped test from Master node") + + .. note:: To see all available events, please see :ref:`events`. diff --git a/docs/writing-a-locustfile.rst b/docs/writing-a-locustfile.rst index 33759778dd..e1b8a3db6a 100644 --- a/docs/writing-a-locustfile.rst +++ b/docs/writing-a-locustfile.rst @@ -283,8 +283,6 @@ events. You can set up listeners for these events at the module level of your lo def on_test_stop(environment, **kwargs): print("A new test is ending") -When running Locust distributed the ``test_start`` and ``test_stop`` events will only be fired in the master node. - init ---- diff --git a/locust/event.py b/locust/event.py index 9a1107c4b8..6bacbe1c30 100644 --- a/locust/event.py +++ b/locust/event.py @@ -179,15 +179,13 @@ class Events: test_start: EventHook """ - Fired when a new load test is started. It's not fired again if the number of - users change during a test. When running locust distributed the event is only fired - on the master node and not on each worker node. + Fired on each node when a new load test is started. It's not fired again if the number of + users change during a test. """ test_stop: EventHook """ - Fired when a load test is stopped. When running locust distributed the event - is only fired on the master node and not on each worker node. + Fired on each node when a load test is stopped. """ reset_stats: EventHook From 59936ac3306222153bc3aac52de97663e7d84c08 Mon Sep 17 00:00:00 2001 From: Nathan Beam Date: Mon, 12 Jul 2021 16:15:05 -0400 Subject: [PATCH 6/6] Fix test_start and test_stop issues with new code --- locust/runners.py | 7 +++---- locust/test/test_runners.py | 10 ++++++++++ 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/locust/runners.py b/locust/runners.py index 7e3cee905a..c8fc297299 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -489,7 +489,6 @@ def stop(self): if self.state == STATE_STOPPED: return super().stop() - self.environment.events.test_stop.fire(environment=self.environment) def send_message(self, msg_type, data=None): """ @@ -805,7 +804,6 @@ def stop(self, send_stop_to_client: bool = True): logger.error("Timeout waiting for all workers to stop") finally: timeout.cancel() - self.environment.events.test_stop.fire(environment=self.environment) def quit(self): @@ -1060,12 +1058,14 @@ def start_worker(self, user_classes_count: Dict[str, int], **kwargs): :param user_classes_count: Users to run """ self.target_user_classes_count = user_classes_count - if self.worker_state != STATE_RUNNING and self.worker_state != STATE_SPAWNING: self.stats.clear_all() self.exceptions = {} self.cpu_warning_emitted = False self.worker_cpu_warning_emitted = False + self.environment.events.test_start.fire(environment=self.environment) + + self.worker_state = STATE_SPAWNING for user_class in self.user_classes: if self.environment.host is not None: @@ -1123,7 +1123,6 @@ def worker(self): logger.error("RPCError found when receiving from master: %s" % (e)) continue if msg.type == "spawn": - self.worker_state = STATE_SPAWNING self.client.send(Message("spawning", None, self.client_id)) job = msg.data if job["timestamp"] <= last_received_spawn_timestamp: diff --git a/locust/test/test_runners.py b/locust/test/test_runners.py index f86be04bd7..73e08eec3a 100644 --- a/locust/test/test_runners.py +++ b/locust/test/test_runners.py @@ -2707,6 +2707,8 @@ def on_test_start(*args, **kw): Message( "spawn", { + "timestamp": 1605538585, + "user_classes_count": {"MyTestUser": 1}, "spawn_rate": 1, "num_users": 1, "host": "", @@ -2730,6 +2732,8 @@ def on_test_start(*args, **kw): Message( "spawn", { + "timestamp": 1605538586, + "user_classes_count": {"MyTestUser": 1}, "spawn_rate": 1, "num_users": 1, "host": "", @@ -2746,6 +2750,8 @@ def on_test_start(*args, **kw): Message( "spawn", { + "timestamp": 1605538587, + "user_classes_count": {"MyTestUser": 1}, "spawn_rate": 1, "num_users": 1, "host": "", @@ -2784,6 +2790,8 @@ def on_test_stop(*args, **kw): Message( "spawn", { + "timestamp": 1605538585, + "user_classes_count": {"MyTestUser": 1}, "spawn_rate": 1, "num_users": 1, "host": "", @@ -2817,6 +2825,8 @@ def on_test_stop(*args, **kw): Message( "spawn", { + "timestamp": 1605538586, + "user_classes_count": {"MyTestUser": 1}, "spawn_rate": 1, "num_users": 1, "host": "",