add params with type str to support dynamic custom runtime parameters #2764

Closed · wants to merge 1 commit
7 changes: 6 additions & 1 deletion locust/dispatch.py
@@ -94,6 +94,8 @@ def __init__(self, worker_nodes: list[WorkerNode], user_classes: list[type[User]

self._spawn_rate: float = None

self._params: str | None = ""

self._user_count_per_dispatch_iteration: int = None

self._wait_between_dispatch: float = None
@@ -192,13 +194,14 @@ def _dispatcher(self) -> Generator[dict[str, dict[str, int]], None, None]:
self._dispatch_in_progress = False

def new_dispatch(
self, target_user_count: int, spawn_rate: float, user_classes: list[type[User]] | None = None
self, target_user_count: int, spawn_rate: float, params: str | None = None, user_classes: list[type[User]] | None = None
) -> None:
"""
Initialize a new dispatch cycle.

:param target_user_count: The desired user count at the end of the dispatch cycle
:param spawn_rate: The spawn rate
:param params: Optional custom runtime parameters (raw string) for this dispatch cycle
:param user_classes: The user classes to be used for the new dispatch
"""
if user_classes is not None and self._user_classes != sorted(user_classes, key=attrgetter("__name__")):
@@ -209,6 +212,8 @@ def new_dispatch(

self._spawn_rate = spawn_rate

self._params = params

self._user_count_per_dispatch_iteration = max(1, math.floor(self._spawn_rate))

self._wait_between_dispatch = self._user_count_per_dispatch_iteration / self._spawn_rate
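
With this change new_dispatch() takes the raw parameter string alongside the target user count and spawn rate and stores it on the dispatcher. A minimal sketch of driving the updated signature directly; the DemoUser class, the single worker node, and the JSON payload are illustrative, not part of this PR:

# Sketch only: exercising the new `params` argument on new_dispatch().
from locust import User, task
from locust.dispatch import UsersDispatcher
from locust.runners import WorkerNode

class DemoUser(User):
    @task
    def noop(self):
        pass

dispatcher = UsersDispatcher(worker_nodes=[WorkerNode("worker-1")], user_classes=[DemoUser])
dispatcher.new_dispatch(
    target_user_count=10,
    spawn_rate=5.0,
    params='{"scenario": "checkout"}',  # kept on the dispatcher as self._params
)
for dispatched_users in dispatcher:
    # each iteration yields {worker node id: {user class name: user count}}
    print(dispatched_users)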
3 changes: 2 additions & 1 deletion locust/main.py
@@ -49,6 +49,7 @@
COMMON_OPTIONS = {
"num_users": "users",
"spawn_rate": "spawn-rate",
"params": "params",
"run_time": "run-time",
}

@@ -567,7 +568,7 @@ def start_automatic_run():
finally:
stop_and_optionally_quit()
else:
headless_master_greenlet = gevent.spawn(runner.start, options.num_users, options.spawn_rate)
headless_master_greenlet = gevent.spawn(runner.start, options.num_users, options.spawn_rate, options.params)
headless_master_greenlet.link_exception(greenlet_exception_handler)

if options.run_time:
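
In headless mode the parsed options.params value is now forwarded to runner.start (the --params command-line option itself is presumably registered in locust.argument_parser and is not shown in this diff). The programmatic equivalent, as a sketch; DemoUser, the payload, and the 10-second run time are illustrative, and the user class accepts the extra constructor argument this PR introduces:

# Sketch only: starting a LocalRunner with the new params argument.
from __future__ import annotations

import gevent
from locust import User, constant, task
from locust.env import Environment

class DemoUser(User):
    wait_time = constant(1)

    def __init__(self, environment, params: str | None = None):
        super().__init__(environment)
        self.params = params  # raw string handed through by the runner

    @task
    def noop(self):
        pass

env = Environment(user_classes=[DemoUser])
runner = env.create_local_runner()
runner.start(10, spawn_rate=2, params='{"tenant": "acme"}')
gevent.spawn_later(10, runner.quit)  # stop the test after 10 seconds
runner.greenlet.join()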
42 changes: 25 additions & 17 deletions locust/runners.py
@@ -118,6 +118,9 @@ def __init__(self, environment: Environment) -> None:
self.target_user_classes_count: dict[str, int] = {}
# target_user_count is set before the ramp-up/ramp-down occurs.
self.target_user_count: int = 0

self.params: str | None = ""

self.custom_messages: dict[str, tuple[Callable, bool]] = {}

self._users_dispatcher: UsersDispatcher | None = None
@@ -219,11 +222,11 @@ def spawn_users(self, user_classes_spawn_count: dict[str, int], wait: bool = Fal
f"Spawning additional {json.dumps(user_classes_spawn_count)} ({json.dumps(self.user_classes_count)} already running)..."
)

def spawn(user_class: str, spawn_count: int) -> list[User]:
def spawn(user_class: str, spawn_count: int, params: str | None = None) -> list[User]:
n = 0
new_users: list[User] = []
while n < spawn_count:
new_user = self.user_classes_by_name[user_class](self.environment)
new_user = self.user_classes_by_name[user_class](self.environment, params)
assert hasattr(
new_user, "environment"
), f"Attribute 'environment' is missing on user {user_class}. Perhaps you defined your own __init__ and forgot to call the base constructor? (super().__init__(*args, **kwargs))"
@@ -237,7 +240,7 @@ def spawn(user_class: str, spawn_count: int) -> list[User]:

new_users: list[User] = []
for user_class, spawn_count in user_classes_spawn_count.items():
new_users += spawn(user_class, spawn_count)
new_users += spawn(user_class, spawn_count, self.params)

if wait:
self.user_greenlets.join()
@@ -459,12 +462,13 @@ def on_user_error(user_instance, exception, tb):

self.environment.events.user_error.add_listener(on_user_error)

def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list | None = None) -> None:
def _start(self, user_count: int, spawn_rate: float, params: str | None = None, wait: bool = False, user_classes: list | None = None) -> None:
"""
Start running a load test

:param user_count: Total number of users to start
:param spawn_rate: Number of users to spawn per second
:param params: Optional custom runtime parameters passed to each spawned user
:param wait: If True calls to this method will block until all users are spawned.
If False (the default), a greenlet that spawns the users will be
started and the call to this method will return immediately.
@@ -473,6 +477,8 @@ def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_cl
"""
self.target_user_count = user_count

self.params = params

if self.state != STATE_RUNNING and self.state != STATE_SPAWNING:
self.stats.clear_all()
self.exceptions = {}
@@ -498,7 +504,7 @@ def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_cl

logger.info("Ramping to %d users at a rate of %.2f per second" % (user_count, spawn_rate))

self._users_dispatcher.new_dispatch(user_count, spawn_rate, user_classes)
self._users_dispatcher.new_dispatch(user_count, spawn_rate, params, user_classes)

try:
for dispatched_users in self._users_dispatcher:
@@ -541,7 +547,7 @@ def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_cl
self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))

def start(
self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list[type[User]] | None = None
self, user_count: int, spawn_rate: float, params: str | None = None, wait: bool = False, user_classes: list[type[User]] | None = None
) -> None:
if spawn_rate > 100:
logger.warning(
@@ -552,7 +558,7 @@ def start(
# kill existing spawning_greenlet before we start a new one
self.spawning_greenlet.kill(block=True)
self.spawning_greenlet = self.greenlet.spawn(
lambda: self._start(user_count, spawn_rate, wait=wait, user_classes=user_classes)
lambda: self._start(user_count, spawn_rate, params=params, wait=wait, user_classes=user_classes)
)
self.spawning_greenlet.link_exception(greenlet_exception_handler)

@@ -732,12 +738,14 @@ def cpu_log_warning(self) -> bool:
return warning_emitted

def start(
self, user_count: int, spawn_rate: float, wait=False, user_classes: list[type[User]] | None = None
self, user_count: int, spawn_rate: float, params: str | None = None, wait=False, user_classes: list[type[User]] | None = None
) -> None:
self.spawning_completed = False

self.target_user_count = user_count

self.params = params

num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)
if not num_workers:
logger.warning("You can't start a distributed test before at least one worker process has connected")
@@ -776,7 +784,7 @@ def start(
self.update_state(STATE_SPAWNING)

self._users_dispatcher.new_dispatch(
target_user_count=user_count, spawn_rate=spawn_rate, user_classes=user_classes
target_user_count=user_count, spawn_rate=spawn_rate, params=params, user_classes=user_classes
)

try:
@@ -939,7 +947,7 @@ def heartbeat_worker(self) -> NoReturn:
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(client)
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
self.start(self.target_user_count, self.spawn_rate, self.params)
if self.worker_count <= 0:
logger.info("The last worker went missing, stopping test.")
self.stop()
@@ -956,7 +964,7 @@ def heartbeat_worker(self) -> NoReturn:
# _users_dispatcher is set to none so that during redistribution the dead clients are not picked, alternative is to call self.stop() before start
self._users_dispatcher = None
# trigger redistribution after missing client removal
self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate)
self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate, params=self.params)

def reset_connection(self) -> None:
logger.info("Resetting RPC server and all worker connections.")
@@ -1027,12 +1035,12 @@ def client_listener(self) -> NoReturn:
self._users_dispatcher.add_worker(worker_node=self.clients[client_id])
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
self.start(self.target_user_count, self.spawn_rate, self.params)
logger.info(
f"Worker {client_id} (index {self.get_worker_index(client_id)}) reported as ready. {len(self.clients.ready + self.clients.running + self.clients.spawning)} workers connected."
)
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
self.start(self.target_user_count, self.spawn_rate, self.params)
# emit a warning if the worker's clock seem to be out of sync with our clock
# if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
@@ -1076,7 +1084,7 @@ def client_listener(self) -> NoReturn:
self._users_dispatcher.remove_worker(client)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
self.start(self.target_user_count, self.spawn_rate, self.params)
logger.info(
f"Worker {msg.node_id} (index {self.get_worker_index(client_id)}) reported that it has stopped, removing from running workers"
)
@@ -1091,7 +1099,7 @@ def client_listener(self) -> NoReturn:
self._users_dispatcher.add_worker(worker_node=c)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
self.start(self.target_user_count, self.spawn_rate, self.params)
c.state = client_state
c.cpu_usage = msg.data["current_cpu_usage"]
if not c.cpu_warning_emitted and c.cpu_usage > 90:
@@ -1127,7 +1135,7 @@ def client_listener(self) -> NoReturn:
self._users_dispatcher.remove_worker(client)
if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING:
# TODO: Test this situation
self.start(self.target_user_count, self.spawn_rate)
self.start(self.target_user_count, self.spawn_rate, self.params)
logger.info(
f"Worker {msg.node_id!r} (index {self.get_worker_index(msg.node_id)}) quit. {len(self.clients.ready)} workers ready."
)
@@ -1258,7 +1266,7 @@ def spawning_complete(self, user_count):
self.worker_state = STATE_RUNNING

def start(
self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list[type[User]] | None = None
self, user_count: int, spawn_rate: float, params: str | None = None, wait: bool = False, user_classes: list[type[User]] | None = None
) -> None:
raise NotImplementedError("use start_worker")
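
Because spawn() now passes self.params straight into every user constructor, a locustfile that wants the value has to accept the extra argument. A sketch under that assumption; the class name, JSON format, and target endpoint are illustrative:

# Sketch only: a user class that consumes the params string passed by spawn().
from __future__ import annotations

import json
from locust import HttpUser, task

class ParamAwareUser(HttpUser):
    host = "https://example.invalid"

    def __init__(self, environment, params: str | None = None):
        # Calling the base constructor keeps the 'environment' attribute that
        # the assert in spawn() checks for.
        super().__init__(environment)
        self.runtime_params = json.loads(params) if params else {}

    @task
    def fetch(self):
        self.client.get("/items", params=self.runtime_params)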

5 changes: 4 additions & 1 deletion locust/web.py
@@ -222,6 +222,9 @@ def swarm() -> Response:
elif key == "spawn_rate":
spawn_rate = float(value)
parsed_options_dict[key] = spawn_rate
elif key == "params":
params = value
parsed_options_dict[key] = params
elif key == "host":
# Replace < > to guard against XSS
environment.host = str(request.form["host"]).replace("<", "").replace(">", "")
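
The /swarm handler now also accepts a params form field and, as the next hunk shows, forwards it to runner.start. A sketch of posting the new field to a running master; the user_count and spawn_rate field names and the default 8089 address follow Locust's standard web form and are assumptions here, and the payload is illustrative:

# Sketch only: sending the new "params" form field to the Locust web UI.
import requests

resp = requests.post(
    "http://localhost:8089/swarm",
    data={
        "user_count": "10",
        "spawn_rate": "2",
        "params": '{"region": "eu-west-1"}',
    },
)
print(resp.json())  # expected to include "success": True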
@@ -265,7 +268,7 @@ def swarm() -> Response:
self._swarm_greenlet = None

if environment.runner is not None:
self._swarm_greenlet = gevent.spawn(environment.runner.start, user_count, spawn_rate)
self._swarm_greenlet = gevent.spawn(environment.runner.start, user_count, spawn_rate, params)
self._swarm_greenlet.link_exception(greenlet_exception_handler)
response_data = {
"success": True,