Commit 87b6dbc
Merge pull request #1886 from tyge68/issue/1884
#1884 User distribution should happen when new workers come in
cyberw authored Sep 20, 2021
2 parents c7c49d3 + 9ec166b commit 87b6dbc
Showing 4 changed files with 120 additions and 5 deletions.
7 changes: 7 additions & 0 deletions locust/argument_parser.py
@@ -464,6 +464,13 @@ def setup_parser_arguments(parser):
dest="equal_weights",
help="Use equally distributed task weights, overriding the weights specified in the locustfile.",
)
other_group.add_argument(
"--enable-rebalancing",
action="store_true",
default=False,
dest="enable_rebalancing",
help="Allow to automatically rebalance users if new workers are added or removed during a test run.",
)

user_classes_group = parser.add_argument_group("User classes")
user_classes_group.add_argument(
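A minimal sketch of how the new option surfaces at runtime (parse_options is the same helper the new test in locust/test/test_runners.py uses below; everything else here is illustrative):

from locust.argument_parser import parse_options

# Parse only the new flag; it defaults to False when omitted.
options = parse_options(["--enable-rebalancing"])
print(options.enable_rebalancing)  # True

On the command line this corresponds to starting the master process with --enable-rebalancing; MasterRunner.rebalancing_enabled(), added in runners.py below, reads this attribute from environment.parsed_options.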
15 changes: 10 additions & 5 deletions locust/runners.py
Expand Up @@ -601,6 +601,7 @@ def __init__(self, environment, master_bind_host, master_bind_port):
self.master_bind_host = master_bind_host
self.master_bind_port = master_bind_port
self.spawn_rate: float = 0
self.spawning_completed = False

self.clients = WorkerNodes()
try:
@@ -637,6 +638,9 @@ def on_quitting(environment, **kw):

self.environment.events.quitting.add_listener(on_quitting)

def rebalancing_enabled(self) -> bool:
return self.environment.parsed_options and self.environment.parsed_options.enable_rebalancing

@property
def user_count(self) -> int:
return sum(c.user_count for c in self.clients.values())
@@ -649,6 +653,7 @@ def cpu_log_warning(self):
return warning_emitted

def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
self.spawning_completed = False
num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)
if not num_workers:
logger.warning(
@@ -744,6 +749,7 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
timeout.cancel()

self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))
self.spawning_completed = True

logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count))

@@ -831,7 +837,8 @@ def heartbeat_worker(self):
client.user_classes_count = {}
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(client)
# TODO: If status is `STATE_RUNNING`, call self.start()
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
if self.worker_count <= 0:
logger.info("The last worker went missing, stopping test.")
self.stop()
@@ -877,10 +884,8 @@ def client_listener(self):
"Client %r reported as ready. Currently %i clients ready to swarm."
% (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))
)
# if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:
# # TODO: Necessary now that UsersDispatcher handles that?
# # balance the load distribution when new client joins
# self.start(self.target_user_count, self.spawn_rate)
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
# emit a warning if the worker's clock seem to be out of sync with our clock
# if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
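Taken together, the runners.py changes make the master re-dispatch users when a worker joins or goes missing, but only after the initial ramp-up has completed. A condensed sketch of the guard, assuming a MasterRunner-like object (the helper name maybe_rebalance is hypothetical; in the diff the same check is inlined at both call sites):

from locust.runners import STATE_RUNNING

def maybe_rebalance(runner):
    # Only rebalance when the flag is set, a test is currently running, and the
    # original ramp-up is done (spawning_completed is cleared at the top of
    # start() and set again after spawning_complete fires).
    if runner.rebalancing_enabled() and runner.state == STATE_RUNNING and runner.spawning_completed:
        # Re-running start() with the unchanged targets lets the users dispatcher
        # spread target_user_count across the workers the master currently knows about.
        runner.start(runner.target_user_count, runner.spawn_rate)

In the diff, heartbeat_worker() performs this check after removing a missing worker from the dispatcher, and client_listener() performs it after a new worker reports ready.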
102 changes: 102 additions & 0 deletions locust/test/test_runners.py
@@ -47,6 +47,7 @@
User,
task,
)
from retry import retry

NETWORK_BROKEN = "network broken"

@@ -663,6 +664,107 @@ def incr_stats(self):
"For some reason the master node's stats has not come in",
)

def test_distributed_rebalanced_integration_run(self):
"""
Full integration test that starts a MasterRunner and three WorkerRunner instances,
rebalances users as workers are added and removed, and makes sure that their stats are sent to the Master.
"""

class TestUser(User):
wait_time = constant(0.1)

@task
def incr_stats(self):
self.environment.events.request.fire(
request_type="GET",
name="/",
response_time=1337,
response_length=666,
exception=None,
context={},
)

with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
# start a Master runner
options = parse_options(["--enable-rebalancing"])
master_env = Environment(user_classes=[TestUser], parsed_options=options)
master = master_env.create_master_runner("*", 0)
sleep(0)
# start 3 Worker runners
workers = []

def add_worker():
worker_env = Environment(user_classes=[TestUser])
worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)
workers.append(worker)

for i in range(3):
add_worker()

# give workers time to connect
sleep(0.1)
# issue start command that should trigger TestUsers to be spawned in the Workers
master.start(6, spawn_rate=1000)
sleep(0.1)
# check that worker nodes have started locusts
for worker in workers:
self.assertEqual(2, worker.user_count)
# give time for users to generate stats, and stats to be sent to master
# Add 1 more worker (should be 4 now)
add_worker()

@retry(AssertionError, tries=10, delay=0.5)
def check_rebalanced_true():
for worker in workers:
self.assertTrue(worker.user_count > 0)

# Check that all workers have a user count > 0 at least
check_rebalanced_true()
# Add 2 more workers (should be 6 now)
add_worker()
add_worker()

@retry(AssertionError, tries=10, delay=0.5)
def check_rebalanced_equals():
for worker in workers:
self.assertEqual(1, worker.user_count)

# Check that all workers have a user count = 1 now
check_rebalanced_equals()

# Simulate that some workers are missing by "killing" them abruptly
for i in range(3):
workers[i].greenlet.kill(block=True)

@retry(AssertionError, tries=10, delay=1)
def check_master_worker_missing_count():
self.assertEqual(3, len(master.clients.missing))

# Check that master detected the missing workers
check_master_worker_missing_count()

@retry(AssertionError, tries=10, delay=1)
def check_remaining_worker_new_user_count():
for i in range(3, 6):
self.assertEqual(2, workers[i].user_count)

# Check that remaining workers have a new user count due to rebalancing.
check_remaining_worker_new_user_count()
sleep(1)

# Finally quit and check states of remaining workers.
master.quit()
# make sure users are killed on remaining workers
for i in range(3, 6):
self.assertEqual(0, workers[i].user_count)

# check that stats are present in master
self.assertGreater(
master_env.runner.stats.total.num_requests,
20,
"For some reason the master node's stats has not come in",
)

def test_distributed_run_with_custom_args(self):
"""
Full integration test that starts both a MasterRunner and three WorkerRunner instances
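The polling helpers above rely on the third-party retry package (added to tox.ini below): the decorated function is re-run until it stops raising the given exception type. A self-contained sketch of the pattern, independent of Locust:

from retry import retry

attempts = {"count": 0}

@retry(AssertionError, tries=10, delay=0.5)
def eventually_true():
    # Re-run on AssertionError, waiting 0.5 s between attempts, up to 10 tries.
    attempts["count"] += 1
    assert attempts["count"] >= 3

eventually_true()  # succeeds on the third attempt; would raise after 10 failed tries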
1 change: 1 addition & 0 deletions tox.ini
@@ -13,6 +13,7 @@ deps =
codecov
flake8
mock
retry
pyquery
cryptography
black==21.5b2
