Refactoring of the dispatch logic to improve performance #1809

Merged · 26 commits · Jul 12, 2021

Commits (26)
694e83e  Optimization of dispatch (mboutet, Jul 7, 2021)
59f77db  WIP on new dispatch code + distribution refactor (mboutet, Jul 7, 2021)
5c996b3  [WIP] Remove unused code + remove distribution logic (mboutet, Jul 7, 2021)
be8e004  Comment print in dispatcher (mboutet, Jul 7, 2021)
4868c9a  Include `roundrobin` in the dependencies (mboutet, Jul 7, 2021)
86b7923  Change wording in TODO (mboutet, Jul 7, 2021)
3036c62  Simplify if-elif-else (mboutet, Jul 7, 2021)
dce5f30  Improve handling of ramp-down (mboutet, Jul 8, 2021)
73c8f92  Remove commented code (mboutet, Jul 8, 2021)
9f84e78  Handle disconnecting/connecting workers (mboutet, Jul 9, 2021)
7c4bd3f  Handle initialized users dispatcher when worker goes missing (mboutet, Jul 9, 2021)
a47963b  Fix ramp-down when spawn rate greater than number of users to stop (mboutet, Jul 9, 2021)
fc6f886  Improve perf of `_user_gen` by using `itertools.cycle` (mboutet, Jul 9, 2021) — see the sketch after this list
b7c2d3b  Remove TODOs that are no longer applicable (mboutet, Jul 9, 2021)
32ba957  Update some tests to work with refactor (mboutet, Jul 9, 2021)
0309453  Implement more test cases for dispatch (mboutet, Jul 9, 2021)
fd14a0b  Perf improvement for dispatch (mboutet, Jul 9, 2021)
b9c5dd4  Small script to benchmark dispatcher (mboutet, Jul 9, 2021)
f9adde1  Some scripts I use to manually test locust locally (mboutet, Jul 9, 2021)
72ed364  Improve benchmark script (mboutet, Jul 11, 2021)
a0360e2  Small perf improvement in dispatch (mboutet, Jul 11, 2021)
15b76a8  Add TODO (mboutet, Jul 11, 2021)
6a8130c  Update tests to make them pass (mboutet, Jul 11, 2021)
06d8498  Relax pass criteria (mboutet, Jul 11, 2021)
0c1bc35  Ensure there's at least one user class when instantiating the dispatcher (mboutet, Jul 11, 2021)
55f0a75  Instantiating the dispatcher in start for the LocalRunner (mboutet, Jul 11, 2021)
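
A note on commit fc6f886: replacing manual index arithmetic in `_user_gen` with `itertools.cycle` is a standard micro-optimization for round-robin iteration. A minimal sketch of the pattern (illustrative names, not locust's actual internals):

    import itertools

    # Round-robin over user class names without manual "i = (i + 1) % n"
    # bookkeeping; cycle() stores the sequence once and loops it indefinitely.
    def user_gen(user_class_names):
        return itertools.cycle(user_class_names)

    gen = user_gen(["UserA", "UserB", "UserC"])
    print([next(gen) for _ in range(5)])  # ['UserA', 'UserB', 'UserC', 'UserA', 'UserB']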
Files changed (4)
632 changes: 632 additions & 0 deletions benchmarks/dispatch.py

Large diffs are not rendered by default.
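
The benchmark script itself is not rendered. Based on the `UsersDispatcher` usage visible in the runners.py diff below, a benchmark along these lines would be plausible (a sketch, not the actual file contents; the user classes and sizes are made up):

    import time

    from locust import User
    from locust.dispatch import UsersDispatcher
    from locust.runners import WorkerNode

    # Hypothetical user classes just for the benchmark; any User subclasses do.
    class User1(User):
        weight = 1

    class User2(User):
        weight = 2

    workers = [WorkerNode(id=f"worker{i}") for i in range(1000)]
    dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=[User1, User2])
    dispatcher.new_dispatch(target_user_count=100_000, spawn_rate=10_000)

    t0 = time.perf_counter()
    batches = sum(1 for _ in dispatcher)  # each batch: {worker_id: {user_class: count}}
    print(f"{batches} batches in {time.perf_counter() - t0:.2f}s")
    # Note: the dispatcher paces batches with gevent.sleep (see the
    # KeyboardInterrupt comment in the runners.py diff), so wall clock
    # includes pacing, not just dispatch computation.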

691 changes: 192 additions & 499 deletions locust/dispatch.py

Large diffs are not rendered by default.
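
locust/dispatch.py loses 499 lines and gains 192, and the diff itself is not rendered. The call sites in the runners.py diff below imply roughly this public surface (reconstructed from usage only; not the actual source):

    class UsersDispatcher:
        def __init__(self, worker_nodes, user_classes):
            ...  # requires at least one user class (commit 0c1bc35)

        def new_dispatch(self, target_user_count, spawn_rate):
            ...  # prepare a new ramp from the current state to target_user_count

        @property
        def dispatch_in_progress(self):
            ...  # True while a ramp is still being iterated

        def add_worker(self, worker_node):
            ...  # rebalance when a worker (re)connects mid-test

        def remove_worker(self, worker_node_id):
            ...  # rebalance when a worker quits or goes missing

        def __iter__(self):
            ...  # yields batches of {worker_node_id: {user_class_name: count}}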

115 changes: 0 additions & 115 deletions locust/distribution.py

This file was deleted.

126 changes: 89 additions & 37 deletions locust/runners.py
@@ -18,6 +18,7 @@
Dict,
Iterator,
List,
Union,
ValuesView,
)
from uuid import uuid4
@@ -30,7 +31,6 @@
from . import User
from locust import __version__
from .dispatch import UsersDispatcher
from .distribution import weight_users
from .exception import RPCError
from .log import greenlet_exception_logger
from .rpc import (
@@ -91,6 +91,12 @@ def __init__(self, environment):
self.target_user_classes_count: Dict[str, int] = {}
self.custom_messages = {}

# Only when running in standalone mode (non-distributed)
self._local_worker_node = WorkerNode(id="local")
self._local_worker_node.user_classes_count = self.user_classes_count

self._users_dispatcher = None

# set up event listeners for recording requests
def on_request_success(request_type, name, response_time, response_length, **_kwargs):
self.stats.log_request(request_type, name, response_time, response_length)
@@ -299,25 +305,23 @@ def start(self, user_count: int, spawn_rate: float, wait: bool = False):
if self.environment.host is not None:
user_class.host = self.environment.host

self.target_user_classes_count = weight_users(self.user_classes, user_count)

local_worker_node = WorkerNode(id="local")
local_worker_node.user_classes_count = self.user_classes_count

if self.state != STATE_INIT and self.state != STATE_STOPPED:
self.update_state(STATE_SPAWNING)

if self._users_dispatcher is None:
self._users_dispatcher = UsersDispatcher(
worker_nodes=[self._local_worker_node], user_classes=self.user_classes
)

logger.info("Ramping to %d users using a %.2f spawn rate" % (user_count, spawn_rate))

self._users_dispatcher.new_dispatch(user_count, spawn_rate)

try:
for dispatched_users in UsersDispatcher(
worker_nodes=[local_worker_node],
user_classes_count=self.target_user_classes_count,
spawn_rate=spawn_rate,
):
for dispatched_users in self._users_dispatcher:
user_classes_spawn_count = {}
user_classes_stop_count = {}
user_classes_count = dispatched_users[local_worker_node.id]
user_classes_count = dispatched_users[self._local_worker_node.id]
logger.debug("Ramping to %s" % _format_user_classes_count_for_log(user_classes_count))
for user_class, user_class_count in user_classes_count.items():
if self.user_classes_count[user_class] > user_class_count:
@@ -335,7 +339,10 @@ def start(self, user_count: int, spawn_rate: float, wait: bool = False):
self.spawn_users(user_classes_spawn_count, wait)
self.stop_users(user_classes_stop_count)

self._local_worker_node.user_classes_count = next(iter(dispatched_users.values()))

except KeyboardInterrupt:
# TODO: Find a cleaner way to handle that
# We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in
# a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.
self.quit()
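
Each batch from the dispatcher is an absolute target per user class, not a delta; the loop above derives spawn and stop counts by diffing against what is currently running. The same logic as a standalone sketch (hypothetical helper, not a locust function):

    def user_count_deltas(current, dispatched):
        # current/dispatched: {user_class_name: count}; returns what to spawn
        # and what to stop so that "current" becomes "dispatched".
        spawn = {u: n - current.get(u, 0) for u, n in dispatched.items() if n > current.get(u, 0)}
        stop = {u: current.get(u, 0) - n for u, n in dispatched.items() if n < current.get(u, 0)}
        return spawn, stop

    spawn, stop = user_count_deltas({"UserA": 5, "UserB": 2}, {"UserA": 3, "UserB": 4})
    assert spawn == {"UserB": 2} and stop == {"UserA": 2}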
@@ -503,6 +510,7 @@ def send_message(self, msg_type, data=None):
class DistributedRunner(Runner):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._local_worker_node = None
setup_distributed_stats_event_listeners(self.environment.events, self.stats)


@@ -513,6 +521,7 @@ def __init__(self, id: str, state=STATE_INIT, heartbeat_liveness=HEARTBEAT_LIVENESS):
self.heartbeat = heartbeat_liveness
self.cpu_usage = 0
self.cpu_warning_emitted = False
# The reported users running on the worker
self.user_classes_count: Dict[str, int] = {}

@property
@@ -600,6 +609,8 @@ def __init__(self, environment, master_bind_host, master_bind_port):
else:
raise

self._users_dispatcher: Union[UsersDispatcher, None] = None

self.greenlet.spawn(self.heartbeat_worker).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.client_listener).link_exception(greenlet_exception_handler)

@@ -642,10 +653,13 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
if self.environment.host is not None:
user_class.host = self.environment.host

self.target_user_classes_count = weight_users(self.user_classes, user_count)

self.spawn_rate = spawn_rate

if self._users_dispatcher is None:
self._users_dispatcher = UsersDispatcher(
worker_nodes=list(self.clients.values()), user_classes=self.user_classes
)

logger.info(
"Sending spawn jobs of %d users at %.2f spawn rate to %d ready clients"
% (user_count, spawn_rate, num_workers)
@@ -678,12 +692,12 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:

self.update_state(STATE_SPAWNING)

self._users_dispatcher.new_dispatch(target_user_count=user_count, spawn_rate=spawn_rate)

try:
for dispatched_users in UsersDispatcher(
worker_nodes=self.clients.ready + self.clients.running + self.clients.spawning,
user_classes_count=self.target_user_classes_count,
spawn_rate=spawn_rate,
):
dispatched_users = None

for dispatched_users in self._users_dispatcher:
dispatch_greenlets = Group()
for worker_node_id, worker_user_classes_count in dispatched_users.items():
data = {
@@ -711,7 +725,11 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
"Currently spawned users: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count)
)

assert dispatched_users is not None
self.target_user_classes_count = _aggregate_dispatched_users(dispatched_users)

except KeyboardInterrupt:
# TODO: Find a cleaner way to handle that
# We need to catch keyboard interrupt. Otherwise, if KeyboardInterrupt is received while in
# a gevent.sleep inside the dispatch_users function, locust won't gracefully shutdown.
self.quit()
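
The spawn messages above fan out concurrently through a gevent Group, so one slow worker doesn't hold up the rest of the batch. Stripped to its core, the pattern looks like this (hypothetical helper name; the message payload is elided in the diff, so it stays a placeholder here):

    from gevent.pool import Group
    from locust.rpc import Message

    def send_spawn_batch(server, dispatched_users):
        group = Group()
        for worker_node_id, worker_user_classes_count in dispatched_users.items():
            data = {}  # payload elided in the diff above
            group.spawn(server.send_to_client, Message("spawn", data, worker_node_id))
        group.join()  # block until every worker has been sent its share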
@@ -760,7 +778,7 @@ def _wait_for_workers_report_after_ramp_up(self) -> float:

assert False, "not supposed to reach that"

def stop(self):
def stop(self, send_stop_to_client: bool = True):
if self.state not in [STATE_INIT, STATE_STOPPED, STATE_STOPPING]:
logger.debug("Stopping...")
self.update_state(STATE_STOPPING)
@@ -770,25 +788,28 @@ def stop(self):
self.shape_greenlet = None
self.shape_last_state = None

for client in self.clients.all:
logger.debug("Sending stop message to client %s" % client.id)
self.server.send_to_client(Message("stop", None, client.id))
self._users_dispatcher = None

# Give an additional 60s for all workers to stop
timeout = gevent.Timeout(self.environment.stop_timeout or 0 + 60)
timeout.start()
try:
while self.user_count != 0:
gevent.sleep(1)
except gevent.Timeout:
logger.error("Timeout waiting for all workers to stop")
finally:
timeout.cancel()
if send_stop_to_client:
for client in self.clients.all:
logger.debug("Sending stop message to client %s" % client.id)
self.server.send_to_client(Message("stop", None, client.id))

# Give an additional 60s for all workers to stop
timeout = gevent.Timeout(self.environment.stop_timeout or 0 + 60)
timeout.start()
try:
while self.user_count != 0:
gevent.sleep(1)
except gevent.Timeout:
logger.error("Timeout waiting for all workers to stop")
finally:
timeout.cancel()

self.environment.events.test_stop.fire(environment=self.environment)

def quit(self):
self.stop()
self.stop(send_stop_to_client=False)
logger.debug("Quitting...")
for client in self.clients.all:
logger.debug("Sending quit message to client %s" % (client.id))
@@ -816,6 +837,9 @@ def heartbeat_worker(self):
logger.info("Worker %s failed to send heartbeat, setting state to missing." % str(client.id))
client.state = STATE_MISSING
client.user_classes_count = {}
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(client.id)
# TODO: If status is `STATE_RUNNING`, call self.start()
@cyberw (Collaborator), Jul 12, 2021: lets talk about this after merge

if self.worker_count <= 0:
logger.info("The last worker went missing, stopping test.")
self.stop()
@@ -852,18 +876,29 @@ def client_listener(self):
)
worker_node_id = msg.node_id
self.clients[worker_node_id] = WorkerNode(worker_node_id, heartbeat_liveness=HEARTBEAT_LIVENESS)
if self._users_dispatcher is not None:
self._users_dispatcher.add_worker(worker_node=self.clients[worker_node_id])
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(
"Client %r reported as ready. Currently %i clients ready to swarm."
% (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))
)
if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:
# balance the load distribution when new client joins
self.start(self.target_user_count, self.spawn_rate)
# if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:
# # TODO: Necessary now that UsersDispatcher handles that?
# # balance the load distribution when new client joins
# self.start(self.target_user_count, self.spawn_rate)
# emit a warning if the worker's clock seem to be out of sync with our clock
# if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
elif msg.type == "client_stopped":
del self.clients[msg.node_id]
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(msg.node_id)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info("Removing %s client from running clients" % (msg.node_id))
elif msg.type == "heartbeat":
if msg.node_id in self.clients:
@@ -877,6 +912,11 @@ def client_listener(self):
user_classes_count = msg.data.get("user_classes_count")
if user_classes_count:
c.user_classes_count = user_classes_count
if self._users_dispatcher is not None:
self._users_dispatcher.add_worker(worker_node=c)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
c.state = client_state
c.cpu_usage = msg.data["current_cpu_usage"]
if not c.cpu_warning_emitted and c.cpu_usage > 90:
@@ -895,6 +935,11 @@ def client_listener(self):
elif msg.type == "quit":
if msg.node_id in self.clients:
del self.clients[msg.node_id]
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(msg.node_id)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
logger.info(
"Client %r quit. Currently %i clients connected." % (msg.node_id, len(self.clients.ready))
)
Expand Down Expand Up @@ -964,6 +1009,7 @@ def __init__(self, environment, master_host, master_port):
self.master_host = master_host
self.master_port = master_port
self.worker_cpu_warning_emitted = False
self._users_dispatcher = None
self.client = rpc.Client(master_host, master_port, self.client_id)
self.greenlet.spawn(self.heartbeat).link_exception(greenlet_exception_handler)
self.greenlet.spawn(self.worker).link_exception(greenlet_exception_handler)
@@ -1142,3 +1188,9 @@ def _format_user_classes_count_for_log(user_classes_count: Dict[str, int]) -> str:
json.dumps(dict(sorted(user_classes_count.items(), key=itemgetter(0)))),
sum(user_classes_count.values()),
)


def _aggregate_dispatched_users(d: Dict[str, Dict[str, int]]) -> Dict[str, int]:
# TODO: Test it
user_classes = list(next(iter(d.values())).keys())
return {u: sum(d[u] for d in d.values()) for u in user_classes}
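
For instance, a two-worker dispatch aggregates like this (note the comprehension reuses the name d: the outer d.values() is evaluated first, then each inner d is one worker's counts, so the shadowing is confusing but correct):

    d = {
        "worker1": {"UserA": 2, "UserB": 1},
        "worker2": {"UserA": 1, "UserB": 1},
    }
    assert _aggregate_dispatched_users(d) == {"UserA": 3, "UserB": 2}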