diff --git a/locust/dispatch.py b/locust/dispatch.py index 5542f8a117..6eb52abecf 100644 --- a/locust/dispatch.py +++ b/locust/dispatch.py @@ -94,6 +94,8 @@ def __init__(self, worker_nodes: list[WorkerNode], user_classes: list[type[User] self._spawn_rate: float = None + self._params: str = "" + self._user_count_per_dispatch_iteration: int = None self._wait_between_dispatch: float = None @@ -192,13 +194,14 @@ def _dispatcher(self) -> Generator[dict[str, dict[str, int]], None, None]: self._dispatch_in_progress = False def new_dispatch( - self, target_user_count: int, spawn_rate: float, user_classes: list[type[User]] | None = None + self, target_user_count: int, spawn_rate: float, params: str = None, user_classes: list[type[User]] | None = None ) -> None: """ Initialize a new dispatch cycle. :param target_user_count: The desired user count at the end of the dispatch cycle :param spawn_rate: The spawn rate + :param params: The optional parameters :param user_classes: The user classes to be used for the new dispatch """ if user_classes is not None and self._user_classes != sorted(user_classes, key=attrgetter("__name__")): @@ -209,6 +212,8 @@ def new_dispatch( self._spawn_rate = spawn_rate + self._params = params + self._user_count_per_dispatch_iteration = max(1, math.floor(self._spawn_rate)) self._wait_between_dispatch = self._user_count_per_dispatch_iteration / self._spawn_rate diff --git a/locust/main.py b/locust/main.py index 1644616497..a689f78c51 100644 --- a/locust/main.py +++ b/locust/main.py @@ -49,6 +49,7 @@ COMMON_OPTIONS = { "num_users": "users", "spawn_rate": "spawn-rate", + "params": "params", "run_time": "run-time", } @@ -567,7 +568,7 @@ def start_automatic_run(): finally: stop_and_optionally_quit() else: - headless_master_greenlet = gevent.spawn(runner.start, options.num_users, options.spawn_rate) + headless_master_greenlet = gevent.spawn(runner.start, options.num_users, options.spawn_rate, options.params) headless_master_greenlet.link_exception(greenlet_exception_handler) if options.run_time: diff --git a/locust/runners.py b/locust/runners.py index 4d85da3f64..b5326bb40c 100644 --- a/locust/runners.py +++ b/locust/runners.py @@ -118,6 +118,9 @@ def __init__(self, environment: Environment) -> None: self.target_user_classes_count: dict[str, int] = {} # target_user_count is set before the ramp-up/ramp-down occurs. self.target_user_count: int = 0 + + self.params: str = "" + self.custom_messages: dict[str, tuple[Callable, bool]] = {} self._users_dispatcher: UsersDispatcher | None = None @@ -219,11 +222,11 @@ def spawn_users(self, user_classes_spawn_count: dict[str, int], wait: bool = Fal f"Spawning additional {json.dumps(user_classes_spawn_count)} ({json.dumps(self.user_classes_count)} already running)..." ) - def spawn(user_class: str, spawn_count: int) -> list[User]: + def spawn(user_class: str, spawn_count: int, params: str = None) -> list[User]: n = 0 new_users: list[User] = [] while n < spawn_count: - new_user = self.user_classes_by_name[user_class](self.environment) + new_user = self.user_classes_by_name[user_class](self.environment, params) assert hasattr( new_user, "environment" ), f"Attribute 'environment' is missing on user {user_class}. Perhaps you defined your own __init__ and forgot to call the base constructor? (super().__init__(*args, **kwargs))" @@ -237,7 +240,7 @@ def spawn(user_class: str, spawn_count: int) -> list[User]: new_users: list[User] = [] for user_class, spawn_count in user_classes_spawn_count.items(): - new_users += spawn(user_class, spawn_count) + new_users += spawn(user_class, spawn_count, self.params) if wait: self.user_greenlets.join() @@ -459,12 +462,13 @@ def on_user_error(user_instance, exception, tb): self.environment.events.user_error.add_listener(on_user_error) - def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list | None = None) -> None: + def _start(self, user_count: int, spawn_rate: float, params: str = None, wait: bool = False, user_classes: list | None = None) -> None: """ Start running a load test :param user_count: Total number of users to start :param spawn_rate: Number of users to spawn per second + :param params: Optional parameters :param wait: If True calls to this method will block until all users are spawned. If False (the default), a greenlet that spawns the users will be started and the call to this method will return immediately. @@ -473,6 +477,8 @@ def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_cl """ self.target_user_count = user_count + self.params = params + if self.state != STATE_RUNNING and self.state != STATE_SPAWNING: self.stats.clear_all() self.exceptions = {} @@ -498,7 +504,7 @@ def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_cl logger.info("Ramping to %d users at a rate of %.2f per second" % (user_count, spawn_rate)) - self._users_dispatcher.new_dispatch(user_count, spawn_rate, user_classes) + self._users_dispatcher.new_dispatch(user_count, spawn_rate, params, user_classes) try: for dispatched_users in self._users_dispatcher: @@ -541,7 +547,7 @@ def _start(self, user_count: int, spawn_rate: float, wait: bool = False, user_cl self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values())) def start( - self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list[type[User]] | None = None + self, user_count: int, spawn_rate: float, params=None, wait: bool = False, user_classes: list[type[User]] | None = None ) -> None: if spawn_rate > 100: logger.warning( @@ -552,7 +558,7 @@ def start( # kill existing spawning_greenlet before we start a new one self.spawning_greenlet.kill(block=True) self.spawning_greenlet = self.greenlet.spawn( - lambda: self._start(user_count, spawn_rate, wait=wait, user_classes=user_classes) + lambda: self._start(user_count, spawn_rate, params=params, wait=wait, user_classes=user_classes) ) self.spawning_greenlet.link_exception(greenlet_exception_handler) @@ -732,12 +738,14 @@ def cpu_log_warning(self) -> bool: return warning_emitted def start( - self, user_count: int, spawn_rate: float, wait=False, user_classes: list[type[User]] | None = None + self, user_count: int, spawn_rate: float, params=None, wait=False, user_classes: list[type[User]] | None = None ) -> None: self.spawning_completed = False self.target_user_count = user_count + self.params = params + num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning) if not num_workers: logger.warning("You can't start a distributed test before at least one worker processes has connected") @@ -776,7 +784,7 @@ def start( self.update_state(STATE_SPAWNING) self._users_dispatcher.new_dispatch( - target_user_count=user_count, spawn_rate=spawn_rate, user_classes=user_classes + target_user_count=user_count, spawn_rate=spawn_rate, params=params, user_classes=user_classes ) try: @@ -939,7 +947,7 @@ def heartbeat_worker(self) -> NoReturn: if self._users_dispatcher is not None: self._users_dispatcher.remove_worker(client) if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed: - self.start(self.target_user_count, self.spawn_rate) + self.start(self.target_user_count, self.spawn_rate, self.params) if self.worker_count <= 0: logger.info("The last worker went missing, stopping test.") self.stop() @@ -956,7 +964,7 @@ def heartbeat_worker(self) -> NoReturn: # _users_dispatcher is set to none so that during redistribution the dead clients are not picked, alternative is to call self.stop() before start self._users_dispatcher = None # trigger redistribution after missing cclient removal - self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate) + self.start(user_count=self.target_user_count, spawn_rate=self.spawn_rate, params=self.params) def reset_connection(self) -> None: logger.info("Resetting RPC server and all worker connections.") @@ -1027,12 +1035,12 @@ def client_listener(self) -> NoReturn: self._users_dispatcher.add_worker(worker_node=self.clients[client_id]) if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation - self.start(self.target_user_count, self.spawn_rate) + self.start(self.target_user_count, self.spawn_rate, self.params) logger.info( f"Worker {client_id} (index {self.get_worker_index(client_id)}) reported as ready. {len(self.clients.ready + self.clients.running + self.clients.spawning)} workers connected." ) if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed: - self.start(self.target_user_count, self.spawn_rate) + self.start(self.target_user_count, self.spawn_rate, self.params) # emit a warning if the worker's clock seem to be out of sync with our clock # if abs(time() - msg.data["time"]) > 5.0: # warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.") @@ -1076,7 +1084,7 @@ def client_listener(self) -> NoReturn: self._users_dispatcher.remove_worker(client) if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation - self.start(self.target_user_count, self.spawn_rate) + self.start(self.target_user_count, self.spawn_rate, self.params) logger.info( f"Worker {msg.node_id} (index {self.get_worker_index(client_id)}) reported that it has stopped, removing from running workers" ) @@ -1091,7 +1099,7 @@ def client_listener(self) -> NoReturn: self._users_dispatcher.add_worker(worker_node=c) if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation - self.start(self.target_user_count, self.spawn_rate) + self.start(self.target_user_count, self.spawn_rate, self.params) c.state = client_state c.cpu_usage = msg.data["current_cpu_usage"] if not c.cpu_warning_emitted and c.cpu_usage > 90: @@ -1127,7 +1135,7 @@ def client_listener(self) -> NoReturn: self._users_dispatcher.remove_worker(client) if not self._users_dispatcher.dispatch_in_progress and self.state == STATE_RUNNING: # TODO: Test this situation - self.start(self.target_user_count, self.spawn_rate) + self.start(self.target_user_count, self.spawn_rate, self.params) logger.info( f"Worker {msg.node_id!r} (index {self.get_worker_index(msg.node_id)}) quit. {len(self.clients.ready)} workers ready." ) @@ -1258,7 +1266,7 @@ def spawning_complete(self, user_count): self.worker_state = STATE_RUNNING def start( - self, user_count: int, spawn_rate: float, wait: bool = False, user_classes: list[type[User]] | None = None + self, user_count: int, spawn_rate: float, params: str = None, wait: bool = False, user_classes: list[type[User]] | None = None ) -> None: raise NotImplementedError("use start_worker") diff --git a/locust/web.py b/locust/web.py index c3d16367fb..d6373cd83d 100644 --- a/locust/web.py +++ b/locust/web.py @@ -222,6 +222,9 @@ def swarm() -> Response: elif key == "spawn_rate": spawn_rate = float(value) parsed_options_dict[key] = spawn_rate + elif key == "params": + params = value + parsed_options_dict[key] = params elif key == "host": # Replace < > to guard against XSS environment.host = str(request.form["host"]).replace("<", "").replace(">", "") @@ -265,7 +268,7 @@ def swarm() -> Response: self._swarm_greenlet = None if environment.runner is not None: - self._swarm_greenlet = gevent.spawn(environment.runner.start, user_count, spawn_rate) + self._swarm_greenlet = gevent.spawn(environment.runner.start, user_count, spawn_rate, params) self._swarm_greenlet.link_exception(greenlet_exception_handler) response_data = { "success": True,