From acbd1b32e85e695e83b15bb9ef7272ff17566d68 Mon Sep 17 00:00:00 2001
From: Andrew Dunstall
Date: Sun, 8 May 2022 18:37:24 +0100
Subject: [PATCH] dispatch: rebalance users with a fixed count

---
 locust/dispatch.py           |   8 ++
 locust/test/test_dispatch.py | 149 +++++++++++++++++++++++++++++++++++
 2 files changed, 157 insertions(+)

diff --git a/locust/dispatch.py b/locust/dispatch.py
index 7e608fede9..6e49645ebb 100644
--- a/locust/dispatch.py
+++ b/locust/dispatch.py
@@ -221,6 +221,14 @@ def _prepare_rebalance(self) -> None:
         we started from 0 user. So, if we were currently running 500 users, then the `_distribute_users` will
         perform a fake ramp-up without any waiting and return the final distribution.
         """
+        # Reset users before recalculating, since the current user counts are used to calculate
+        # how many fixed users still need to be added.
+        self._users_on_workers = {
+            worker_node.id: {user_class.__name__: 0 for user_class in self._user_classes}
+            for worker_node in self._worker_nodes
+        }
+        self._try_dispatch_fixed = True
+
         users_on_workers, user_gen, worker_gen, active_users = self._distribute_users(self._current_user_count)
 
         self._users_on_workers = users_on_workers
diff --git a/locust/test/test_dispatch.py b/locust/test/test_dispatch.py
index 62a3cf1f19..d885c57567 100644
--- a/locust/test/test_dispatch.py
+++ b/locust/test/test_dispatch.py
@@ -2732,6 +2732,73 @@ class User3(User):
 
         self.assertFalse(users_dispatcher._rebalance)
 
+    def test_remove_worker_during_ramp_up_with_fixed_user(self):
+        class User1(User):
+            fixed_count = 2
+
+        class User2(User):
+            weight = 1
+
+        class User3(User):
+            weight = 1
+
+        user_classes = [User1, User2, User3]
+
+        worker_nodes = [WorkerNode(str(i + 1)) for i in range(3)]
+
+        users_dispatcher = UsersDispatcher(worker_nodes=worker_nodes, user_classes=user_classes)
+
+        sleep_time = 0.2  # Speed-up test
+
+        users_dispatcher.new_dispatch(target_user_count=9, spawn_rate=3)
+        users_dispatcher._wait_between_dispatch = sleep_time
+
+        # Dispatch iteration 1
+        ts = time.perf_counter()
+        dispatched_users = next(users_dispatcher)
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, delta)
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 1, "User3": 0})
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 1)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 1)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 1)
+
+        # Dispatch iteration 2
+        ts = time.perf_counter()
+        dispatched_users = next(users_dispatcher)
+        delta = time.perf_counter() - ts
+        self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 2)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2)
+
+        self.assertFalse(users_dispatcher._rebalance)
+
+        users_dispatcher.remove_worker(worker_nodes[1])
+
+        self.assertTrue(users_dispatcher._rebalance)
+
+        # Re-balance
+        ts = time.perf_counter()
+        dispatched_users = next(users_dispatcher)
+        delta = time.perf_counter() - ts
+        self.assertTrue(0 <= delta <= _TOLERANCE, f"Expected re-balance dispatch to be instantaneous but got {delta}s")
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2})
2, "User3": 2}) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3) + + self.assertFalse(users_dispatcher._rebalance) + + # Dispatch iteration 3 + ts = time.perf_counter() + dispatched_users = next(users_dispatcher) + delta = time.perf_counter() - ts + self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta) + self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 4, "User3": 3}) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 5) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4) + class TestAddWorker(unittest.TestCase): def test_add_worker_during_ramp_up(self): @@ -3176,6 +3243,88 @@ class User3(User): self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3) self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3) + def test_add_worker_during_ramp_up_with_fixed_user(self): + class User1(User): + fixed_count = 2 + + class User2(User): + weight = 1 + + class User3(User): + weight = 1 + + user_classes = [User1, User2, User3] + + worker_nodes = [ + WorkerNode("hostname1_worker1"), + WorkerNode("hostname1_worker2"), + WorkerNode("hostname2_worker1"), + ] + + users_dispatcher = UsersDispatcher(worker_nodes=[worker_nodes[0], worker_nodes[2]], user_classes=user_classes) + + sleep_time = 0.2 # Speed-up test + + users_dispatcher.new_dispatch(target_user_count=11, spawn_rate=3) + users_dispatcher._wait_between_dispatch = sleep_time + + # Dispatch iteration 1 + ts = time.perf_counter() + dispatched_users = next(users_dispatcher) + delta = time.perf_counter() - ts + self.assertTrue(0 <= delta <= _TOLERANCE, delta) + self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 1, "User3": 0}) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 2) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 1) + + # Dispatch iteration 2 + ts = time.perf_counter() + dispatched_users = next(users_dispatcher) + delta = time.perf_counter() - ts + self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta) + self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2}) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3) + + self.assertFalse(users_dispatcher._rebalance) + + users_dispatcher.add_worker(worker_nodes[1]) + + self.assertTrue(users_dispatcher._rebalance) + + # Re-balance + ts = time.perf_counter() + dispatched_users = next(users_dispatcher) + delta = time.perf_counter() - ts + self.assertTrue(0 <= delta <= _TOLERANCE, f"Expected re-balance dispatch to be instantaneous but got {delta}s") + self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 2, "User3": 2}) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 2) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 2) + self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 2) + + self.assertFalse(users_dispatcher._rebalance) + + # Dispatch iteration 3 + ts = time.perf_counter() + dispatched_users = next(users_dispatcher) + delta = time.perf_counter() - ts + self.assertTrue(sleep_time - _TOLERANCE <= 
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 4, "User3": 3})
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 3)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 3)
+
+        # Dispatch iteration 4
+        ts = time.perf_counter()
+        dispatched_users = next(users_dispatcher)
+        delta = time.perf_counter() - ts
+        self.assertTrue(sleep_time - _TOLERANCE <= delta <= sleep_time + _TOLERANCE, delta)
+        self.assertDictEqual(_aggregate_dispatched_users(dispatched_users), {"User1": 2, "User2": 5, "User3": 4})
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[0].id), 4)
+        # without host-based balancing the following two values would be reversed
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[1].id), 3)
+        self.assertEqual(_user_count_on_worker(dispatched_users, worker_nodes[2].id), 4)
+
 
 class TestRampUpUsersFromZeroWithFixed(unittest.TestCase):
     class RampUpCase:
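Below is a minimal usage sketch (not part of the patch) of the behaviour the change covers. It assumes the same API the tests above exercise (UsersDispatcher, WorkerNode, User.fixed_count, new_dispatch, remove_worker) and that next(dispatcher) yields a {worker_id: {user_class_name: count}} mapping, as the _aggregate_dispatched_users/_user_count_on_worker helpers imply; the private _wait_between_dispatch attribute is borrowed from the tests only to skip the sleep between dispatch iterations. The class names FixedUser and WeightedUser are hypothetical, introduced only for this illustration.

# Illustrative sketch only; FixedUser, WeightedUser, workers and dispatcher are made-up names.
from locust import User
from locust.dispatch import UsersDispatcher
from locust.runners import WorkerNode


class FixedUser(User):
    fixed_count = 2  # always keep exactly two of these running


class WeightedUser(User):
    weight = 1


workers = [WorkerNode("1"), WorkerNode("2"), WorkerNode("3")]
dispatcher = UsersDispatcher(worker_nodes=workers, user_classes=[FixedUser, WeightedUser])

dispatcher.new_dispatch(target_user_count=9, spawn_rate=3)
dispatcher._wait_between_dispatch = 0  # test-style shortcut: do not sleep between iterations

next(dispatcher)  # iteration 1: first 3 users dispatched
next(dispatcher)  # iteration 2: 6 users running, FixedUser already at its fixed_count

dispatcher.remove_worker(workers[1])  # flags the dispatcher for an immediate re-balance
rebalanced = next(dispatcher)         # redistributes the running users over the 2 remaining workers

# With the reset added in _prepare_rebalance, the fixed users survive the re-balance.
assert sum(per_worker.get("FixedUser", 0) for per_worker in rebalanced.values()) == 2

Without that reset, the already-running fixed users would be counted as still pending during the fake ramp-up described in _prepare_rebalance, so the recomputed distribution could leave FixedUser short of its fixed_count.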