Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fire test_start and test_stop events on worker nodes #1777

Merged
merged 7 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/extending-locust.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,27 @@ Here's an example on how to set up an event listener::
print(f"Successfully made a request to: {name})
print(f"The response was {response.text}")

When running locust in distributed mode, it may be useful to do some setup on worker nodes before running your tests.
You can check to ensure you aren't running on the master node by checking the type of the node's :py:attr:`runner <locust.env.Environment.runner>`::

from locust import events
from locust.runners import MasterRunner

@events.test_start.add_listener
def on_test_start(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
print("Beginning test setup")
else
print("Started test from Master node")

@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
if not isinstance(environment.runner, MasterRunner):
print("Cleaning up test data")
else
print("Stopped test from Master node")


.. note::

To see all available events, please see :ref:`events`.
Expand Down
2 changes: 0 additions & 2 deletions docs/writing-a-locustfile.rst
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,6 @@ events. You can set up listeners for these events at the module level of your lo
def on_test_stop(environment, **kwargs):
print("A new test is ending")

When running Locust distributed the ``test_start`` and ``test_stop`` events will only be fired in the master node.

init
----

Expand Down
8 changes: 3 additions & 5 deletions locust/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,13 @@ class Events:

test_start: EventHook
"""
Fired when a new load test is started. It's not fired again if the number of
users change during a test. When running locust distributed the event is only fired
on the master node and not on each worker node.
Fired on each node when a new load test is started. It's not fired again if the number of
users change during a test.
"""

test_stop: EventHook
"""
Fired when a load test is stopped. When running locust distributed the event
is only fired on the master node and not on each worker node.
Fired on each node when a load test is stopped.
"""

reset_stats: EventHook
Expand Down
15 changes: 7 additions & 8 deletions locust/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ def start(self, user_count: int, spawn_rate: float, wait: bool = False):
self.exceptions = {}
self.cpu_warning_emitted = False
self.worker_cpu_warning_emitted = False
self.environment.events.test_start.fire(environment=self.environment)

if wait and user_count - self.user_count > spawn_rate:
raise ValueError("wait is True but the amount of users to add is greater than the spawn rate")
Expand Down Expand Up @@ -400,6 +401,8 @@ def stop(self):
"""
Stop a running load test by stopping all running users
"""
if self.state == STATE_STOPPED:
return
logger.debug("Stopping all users")
self.update_state(STATE_CLEANUP)

Expand All @@ -420,6 +423,7 @@ def stop(self):
self.update_state(STATE_STOPPED)

self.cpu_log_warning()
self.environment.events.test_stop.fire(environment=self.environment)

def quit(self):
"""
Expand Down Expand Up @@ -473,10 +477,6 @@ def start(self, user_count: int, spawn_rate: float, wait: bool = False):
"Your selected spawn rate is very high (>100), and this is known to sometimes cause issues. Do you really need to ramp up that fast?"
)

if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
# if we're not already running we'll fire the test_start event
self.environment.events.test_start.fire(environment=self.environment)

if self.spawning_greenlet:
# kill existing spawning_greenlet before we start a new one
self.spawning_greenlet.kill(block=True)
Expand All @@ -489,7 +489,6 @@ def stop(self):
if self.state == STATE_STOPPED:
return
super().stop()
self.environment.events.test_stop.fire(environment=self.environment)

def send_message(self, msg_type, data=None):
"""
Expand Down Expand Up @@ -805,7 +804,6 @@ def stop(self, send_stop_to_client: bool = True):
logger.error("Timeout waiting for all workers to stop")
finally:
timeout.cancel()

self.environment.events.test_stop.fire(environment=self.environment)

def quit(self):
Expand Down Expand Up @@ -1060,12 +1058,14 @@ def start_worker(self, user_classes_count: Dict[str, int], **kwargs):
:param user_classes_count: Users to run
"""
self.target_user_classes_count = user_classes_count

if self.worker_state != STATE_RUNNING and self.worker_state != STATE_SPAWNING:
self.stats.clear_all()
self.exceptions = {}
self.cpu_warning_emitted = False
self.worker_cpu_warning_emitted = False
self.environment.events.test_start.fire(environment=self.environment)

self.worker_state = STATE_SPAWNING

for user_class in self.user_classes:
if self.environment.host is not None:
Expand Down Expand Up @@ -1123,7 +1123,6 @@ def worker(self):
logger.error("RPCError found when receiving from master: %s" % (e))
continue
if msg.type == "spawn":
self.worker_state = STATE_SPAWNING
self.client.send(Message("spawning", None, self.client_id))
job = msg.data
if job["timestamp"] <= last_received_spawn_timestamp:
Expand Down
171 changes: 160 additions & 11 deletions locust/test/test_runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -702,16 +702,16 @@ def _(*args, **kwargs):
# make sure users are killed
self.assertEqual(0, worker.user_count)

# check the test_stop event was called one time in master and zero times in workder
# check the test_stop event was called one time in master and one time in worker
self.assertEqual(
1,
test_stop_count["master"],
"The test_stop event was not called exactly one time in the master node",
)
self.assertEqual(
0,
1,
test_stop_count["worker"],
"The test_stop event was called in the worker node",
"The test_stop event was not called exactly one time in the worker node",
)

def test_distributed_shape(self):
Expand Down Expand Up @@ -2295,12 +2295,6 @@ def the_task(self):

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
environment = Environment()
test_start_run = [False]

@environment.events.test_start.add_listener
def on_test_start(_environment, **kw):
test_start_run[0] = True

worker = self.get_runner(environment=environment, user_classes=[MyTestUser])
self.assertEqual(1, len(client.outbox))
self.assertEqual("client_ready", client.outbox[0].type)
Expand Down Expand Up @@ -2328,8 +2322,6 @@ def on_test_start(_environment, **kw):
worker.user_greenlets.join()
# check that locust user got to finish
self.assertEqual(2, MyTestUser._test_state)
# make sure the test_start was never fired on the worker
self.assertFalse(test_start_run[0])

def test_worker_without_stop_timeout(self):
class MyTestUser(User):
Expand Down Expand Up @@ -2690,6 +2682,163 @@ def on_custom_msg(msg, **kw):
msg = self.mocked_log.warning[0]
self.assertIn("Unknown message type recieved", msg)

def test_start_event(self):
class MyTestUser(User):
_test_state = 0

@task
def the_task(self):
MyTestUser._test_state = 1
gevent.sleep(0.2)
MyTestUser._test_state = 2

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
environment = Environment()
run_count = [0]

@environment.events.test_start.add_listener
def on_test_start(*args, **kw):
run_count[0] += 1

worker = self.get_runner(environment=environment, user_classes=[MyTestUser])
self.assertEqual(1, len(client.outbox))
self.assertEqual("client_ready", client.outbox[0].type)
client.mocked_send(
Message(
"spawn",
{
"timestamp": 1605538585,
"user_classes_count": {"MyTestUser": 1},
"spawn_rate": 1,
"num_users": 1,
"host": "",
"stop_timeout": None,
},
"dummy_client_id",
)
)
# wait for worker to spawn locusts
self.assertIn("spawning", [m.type for m in client.outbox])
worker.spawning_greenlet.join()
self.assertEqual(1, len(worker.user_greenlets))
self.assertEqual(1, run_count[0])

# check that locust has started running
gevent.sleep(0.01)
self.assertEqual(1, MyTestUser._test_state)

# change number of users and check that test_start isn't fired again
client.mocked_send(
Message(
"spawn",
{
"timestamp": 1605538586,
"user_classes_count": {"MyTestUser": 1},
"spawn_rate": 1,
"num_users": 1,
"host": "",
"stop_timeout": None,
},
"dummy_client_id",
)
)
self.assertEqual(1, run_count[0])

# stop and start to make sure test_start is fired again
client.mocked_send(Message("stop", None, "dummy_client_id"))
client.mocked_send(
Message(
"spawn",
{
"timestamp": 1605538587,
"user_classes_count": {"MyTestUser": 1},
"spawn_rate": 1,
"num_users": 1,
"host": "",
"stop_timeout": None,
},
"dummy_client_id",
)
)
gevent.sleep(0.01)
self.assertEqual(2, run_count[0])

client.mocked_send(Message("stop", None, "dummy_client_id"))

def test_stop_event(self):
class MyTestUser(User):
_test_state = 0

@task
def the_task(self):
MyTestUser._test_state = 1
gevent.sleep(0.2)
MyTestUser._test_state = 2

with mock.patch("locust.rpc.rpc.Client", mocked_rpc()) as client:
environment = Environment()
run_count = [0]

@environment.events.test_stop.add_listener
def on_test_stop(*args, **kw):
run_count[0] += 1

worker = self.get_runner(environment=environment, user_classes=[MyTestUser])
self.assertEqual(1, len(client.outbox))
self.assertEqual("client_ready", client.outbox[0].type)
client.mocked_send(
Message(
"spawn",
{
"timestamp": 1605538585,
"user_classes_count": {"MyTestUser": 1},
"spawn_rate": 1,
"num_users": 1,
"host": "",
"stop_timeout": None,
},
"dummy_client_id",
)
)

# wait for worker to spawn locusts
self.assertIn("spawning", [m.type for m in client.outbox])
worker.spawning_greenlet.join()
self.assertEqual(1, len(worker.user_greenlets))

# check that locust has started running
gevent.sleep(0.01)
self.assertEqual(1, MyTestUser._test_state)

# stop and make sure test_stop is fired
client.mocked_send(Message("stop", None, "dummy_client_id"))
gevent.sleep(0.01)
self.assertEqual(1, run_count[0])

# stop while stopped and make sure the event isn't fired again
client.mocked_send(Message("stop", None, "dummy_client_id"))
gevent.sleep(0.01)
self.assertEqual(1, run_count[0])

# start and stop to check that the event is fired again
client.mocked_send(
Message(
"spawn",
{
"timestamp": 1605538586,
"user_classes_count": {"MyTestUser": 1},
"spawn_rate": 1,
"num_users": 1,
"host": "",
"stop_timeout": None,
},
"dummy_client_id",
)
)
client.mocked_send(Message("stop", None, "dummy_client_id"))
gevent.sleep(0.01)
self.assertEqual(2, run_count[0])


class TestMessageSerializing(unittest.TestCase):
def test_message_serialize(self):
Expand Down