#1884 User distribution should happen when new workers come in #1886

Merged (2 commits, Sep 20, 2021)
Changes from all commits
7 changes: 7 additions & 0 deletions locust/argument_parser.py
@@ -464,6 +464,13 @@ def setup_parser_arguments(parser):
dest="equal_weights",
help="Use equally distributed task weights, overriding the weights specified in the locustfile.",
)
other_group.add_argument(
"--enable-rebalancing",
action="store_true",
default=False,
dest="enable_rebalancing",
help="Allow to automatically rebalance users if new workers are added or removed during a test run.",
)

user_classes_group = parser.add_argument_group("User classes")
user_classes_group.add_argument(
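For context, a minimal sketch of how the new flag reaches the master runner (assuming the parse_options/Environment wiring that the new test in locust/test/test_runners.py below also uses; the standalone script itself is illustrative):

from locust.argument_parser import parse_options
from locust.env import Environment

# --enable-rebalancing is stored on the parsed options namespace.
options = parse_options(["--enable-rebalancing"])
assert options.enable_rebalancing is True

# The master runner later reads it back through environment.parsed_options
# in the new rebalancing_enabled() helper added to locust/runners.py.
env = Environment(user_classes=[], parsed_options=options)
print(env.parsed_options.enable_rebalancing)  # True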
15 changes: 10 additions & 5 deletions locust/runners.py
@@ -601,6 +601,7 @@ def __init__(self, environment, master_bind_host, master_bind_port):
self.master_bind_host = master_bind_host
self.master_bind_port = master_bind_port
self.spawn_rate: float = 0
self.spawning_completed = False

self.clients = WorkerNodes()
try:
@@ -637,6 +638,9 @@ def on_quitting(environment, **kw):

self.environment.events.quitting.add_listener(on_quitting)

def rebalancing_enabled(self) -> bool:
return self.environment.parsed_options and self.environment.parsed_options.enable_rebalancing

@property
def user_count(self) -> int:
return sum(c.user_count for c in self.clients.values())
@@ -649,6 +653,7 @@ def cpu_log_warning(self):
return warning_emitted

def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
self.spawning_completed = False
num_workers = len(self.clients.ready) + len(self.clients.running) + len(self.clients.spawning)
if not num_workers:
logger.warning(
Expand Down Expand Up @@ -744,6 +749,7 @@ def start(self, user_count: int, spawn_rate: float, **kwargs) -> None:
timeout.cancel()

self.environment.events.spawning_complete.fire(user_count=sum(self.target_user_classes_count.values()))
self.spawning_completed = True

logger.info("All users spawned: %s" % _format_user_classes_count_for_log(self.reported_user_classes_count))

@@ -831,7 +837,8 @@ def heartbeat_worker(self):
client.user_classes_count = {}
if self._users_dispatcher is not None:
self._users_dispatcher.remove_worker(client)
# TODO: If status is `STATE_RUNNING`, call self.start()
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
if self.worker_count <= 0:
logger.info("The last worker went missing, stopping test.")
self.stop()
@@ -877,10 +884,8 @@ def client_listener(self):
"Client %r reported as ready. Currently %i clients ready to swarm."
% (worker_node_id, len(self.clients.ready + self.clients.running + self.clients.spawning))
)
# if self.state == STATE_RUNNING or self.state == STATE_SPAWNING:
# # TODO: Necessary now that UsersDispatcher handles that?
# # balance the load distribution when new client joins
# self.start(self.target_user_count, self.spawn_rate)
if self.rebalancing_enabled() and self.state == STATE_RUNNING and self.spawning_completed:
self.start(self.target_user_count, self.spawn_rate)
# emit a warning if the worker's clock seems to be out of sync with our clock
# if abs(time() - msg.data["time"]) > 5.0:
# warnings.warn("The worker node's clock seem to be out of sync. For the statistics to be correct the different locust servers need to have synchronized clocks.")
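Both call sites above guard the re-dispatch the same way. Roughly, as a consolidating sketch with a hypothetical helper name (the PR itself inlines this condition in heartbeat_worker() and client_listener()):

# Re-dispatch only when the feature is opted in, a test is actively running,
# and the previous ramp-up has finished, so start() is not re-entered while
# users are still being spawned.
def maybe_rebalance(master):
    if master.rebalancing_enabled() and master.state == STATE_RUNNING and master.spawning_completed:
        # start() recomputes the per-worker user allocation over the current
        # set of workers and sends fresh spawn instructions to each of them.
        master.start(master.target_user_count, master.spawn_rate)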
102 changes: 102 additions & 0 deletions locust/test/test_runners.py
@@ -47,6 +47,7 @@
User,
task,
)
from retry import retry

NETWORK_BROKEN = "network broken"

@@ -663,6 +664,107 @@ def incr_stats(self):
"For some reason the master node's stats has not come in",
)

def test_distributed_rebalanced_integration_run(self):
"""
Full integration test that starts both a MasterRunner and three WorkerRunner instances
and makes sure that their stats is sent to the Master.
"""

class TestUser(User):
wait_time = constant(0.1)

@task
def incr_stats(self):
self.environment.events.request.fire(
request_type="GET",
name="/",
response_time=1337,
response_length=666,
exception=None,
context={},
)

with mock.patch("locust.runners.WORKER_REPORT_INTERVAL", new=0.3):
# start a Master runner
options = parse_options(["--enable-rebalancing"])
master_env = Environment(user_classes=[TestUser], parsed_options=options)
master = master_env.create_master_runner("*", 0)
sleep(0)
# start 3 Worker runners
workers = []

def add_worker():
worker_env = Environment(user_classes=[TestUser])
worker = worker_env.create_worker_runner("127.0.0.1", master.server.port)
workers.append(worker)

for i in range(3):
add_worker()

# give workers time to connect
sleep(0.1)
# issue start command that should trigger TestUsers to be spawned in the Workers
master.start(6, spawn_rate=1000)
sleep(0.1)
# check that worker nodes have started locusts
for worker in workers:
self.assertEqual(2, worker.user_count)
# give time for users to generate stats, and stats to be sent to master
# Add 1 more worker (should be 4 now)
add_worker()

@retry(AssertionError, tries=10, delay=0.5)
def check_rebalanced_true():
for worker in workers:
self.assertTrue(worker.user_count > 0)

# Check that all workers have a user count > 0 at least
check_rebalanced_true()
# Add 2 more workers (should be 6 now)
add_worker()
add_worker()

@retry(AssertionError, tries=10, delay=0.5)
def check_rebalanced_equals():
for worker in workers:
self.assertEqual(1, worker.user_count)

# Check that all workers have a user count = 1 now
check_rebalanced_equals()

# Simulate that some workers are missing by "killing" them abruptly
for i in range(3):
workers[i].greenlet.kill(block=True)

@retry(AssertionError, tries=10, delay=1)
def check_master_worker_missing_count():
self.assertEqual(3, len(master.clients.missing))

# Check that master detected the missing workers
check_master_worker_missing_count()

@retry(AssertionError, tries=10, delay=1)
def check_remaining_worker_new_user_count():
for i in range(3, 6):
self.assertEqual(2, workers[i].user_count)

# Check that the remaining workers have a new user count due to rebalancing.
check_remaining_worker_new_user_count()
sleep(1)

# Finally quit and check states of remaining workers.
master.quit()
# make sure users are killed on remaining workers
for i in range(3, 6):
self.assertEqual(0, workers[i].user_count)

# check that stats are present in master
self.assertGreater(
master_env.runner.stats.total.num_requests,
20,
"For some reason the master node's stats has not come in",
)

def test_distributed_run_with_custom_args(self):
"""
Full integration test that starts both a MasterRunner and three WorkerRunner instances
1 change: 1 addition & 0 deletions tox.ini
@@ -13,6 +13,7 @@ deps =
codecov
flake8
mock
retry
pyquery
cryptography
black==21.5b2
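A note on the retry dependency added above: the new integration test uses the retry package's decorator to poll distributed state until it converges, instead of sleeping for a fixed amount of time. A minimal sketch of the pattern, with a hypothetical function and parameter names:

from retry import retry

# The decorated check is re-invoked for as long as it raises AssertionError,
# up to `tries` attempts with `delay` seconds between them, so the test passes
# as soon as the master has rebalanced users onto the workers.
@retry(AssertionError, tries=10, delay=0.5)
def check_user_counts(workers, expected_per_worker):
    for worker in workers:
        assert worker.user_count == expected_per_worker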