From 7305ea302cc57a6456d54653dec99f3bc61c9cbb Mon Sep 17 00:00:00 2001
From: CharleneHu-42 <37971369+CharleneHu-42@users.noreply.github.com>
Date: Fri, 20 Sep 2024 22:59:39 +0800
Subject: [PATCH] Support Poisson distributed requests for benchmark (#131)

* Add the capability of customizing the load shape
* Update the path of the custom load shape file
* Support customized parameters of custom load shapes.
* Add poisson load shape
* Parameterize query timeout
* Support warm-ups in benchmark.py
* Fix incorrect spawn_rate return value
* Test stop criteria support run-time or user_queries only.
* Use only 1 locust process for the poisson load shape because Locust has a concurrency issue.
* Support run-time limit in poisson load shape
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

* Dynamically allocate Locust processes to fit different loads and load shapes
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

---------

Co-authored-by: Yi Yao
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: LifengWang
---
 evals/benchmark/README.md                     |  12 +-
 evals/benchmark/benchmark.py                  | 165 +++++++++++++++---
 evals/benchmark/benchmark.yaml                |  12 +-
 .../benchmark/stresscli/commands/load_test.py |  74 +++++++-
 evals/benchmark/stresscli/locust/aistress.py  |  25 ++-
 .../stresscli/locust/poisson_load_shape.py    |  52 ++++++
 6 files changed, 302 insertions(+), 38 deletions(-)
 create mode 100644 evals/benchmark/stresscli/locust/poisson_load_shape.py

diff --git a/evals/benchmark/README.md b/evals/benchmark/README.md
index 9901c1ea..e9bc2b6a 100644
--- a/evals/benchmark/README.md
+++ b/evals/benchmark/README.md
@@ -63,10 +63,18 @@ test_suite_config:
   deployment_type: "k8s" # Default is "k8s", can also be "docker"
   service_ip: None # Leave as None for k8s, specify for Docker
   service_port: None # Leave as None for k8s, specify for Docker
-  concurrent_level: 4 # The concurrency level
+  load_shape: # Tenant concurrency pattern
+    name: constant # poisson or constant (locust default load shape)
+    params: # Load-shape-specific parameters
+      constant: # Constant load shape specific parameters, activate only if load_shape is constant
+        concurrent_level: 4 # If user_queries is specified, concurrent_level is the target number of requests per user. If not, it is the number of simulated users
+      poisson: # Poisson load shape specific parameters, activate only if load_shape is poisson
+        arrival-rate: 1.0 # Request arrival rate
+  warm_ups: 0 # Number of test requests for warm-up
+  run_time: 60m # Total runtime for the test suite
   user_queries: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048] # Number of test requests
+  query_timeout: 120 # Number of seconds to wait for a simulated user to complete any executing task before exiting. 120 sec by default.
   random_prompt: false # Use random prompts if true, fixed prompts if false
-  run_time: 60m # Total runtime for the test suite
   collect_service_metric: false # Enable service metrics collection
   data_visualization: false # Enable data visualization
   test_output_dir: "/home/sdp/benchmark_output" # Directory for test outputs
diff --git a/evals/benchmark/benchmark.py b/evals/benchmark/benchmark.py
index 39ef2b35..77cab096 100644
--- a/evals/benchmark/benchmark.py
+++ b/evals/benchmark/benchmark.py
@@ -40,70 +40,157 @@ def extract_test_case_data(content):
     return {
         "examples": test_suite_config.get("examples", []),
-        "concurrent_level": test_suite_config.get("concurrent_level"),
+        "warm_ups": test_suite_config.get("warm_ups", 0),
         "user_queries": test_suite_config.get("user_queries", []),
         "random_prompt": test_suite_config.get("random_prompt"),
         "test_output_dir": test_suite_config.get("test_output_dir"),
-        "run_time": test_suite_config.get("run_time"),
+        "run_time": test_suite_config.get("run_time", None),
         "collect_service_metric": test_suite_config.get("collect_service_metric"),
         "llm_model": test_suite_config.get("llm_model"),
         "deployment_type": test_suite_config.get("deployment_type"),
         "service_ip": test_suite_config.get("service_ip"),
         "service_port": test_suite_config.get("service_port"),
+        "load_shape": test_suite_config.get("load_shape"),
+        "query_timeout": test_suite_config.get("query_timeout", 120),
         "all_case_data": {
             example: content["test_cases"].get(example, {}) for example in test_suite_config.get("examples", [])
         },
     }
 
 
-def create_run_yaml_content(service, base_url, bench_target, concurrency, user_queries, test_suite_config):
+def create_run_yaml_content(service, base_url, bench_target, test_phase, num_queries, test_params):
     """Create content for the run.yaml file."""
-    return {
+
+    # If a load shape includes the parameter concurrent_level,
+    # the parameter will be passed to Locust to launch a fixed
+    # number of simulated users.
+    concurrency = 1
+    try:
+        load_shape = test_params["load_shape"]["name"]
+        load_shape_params = test_params["load_shape"]["params"][load_shape]
+        if load_shape_params and load_shape_params["concurrent_level"]:
+            if num_queries >= 0:
+                concurrency = max(1, num_queries // load_shape_params["concurrent_level"])
+            else:
+                concurrency = load_shape_params["concurrent_level"]
+    except KeyError as e:
+        # If concurrent_level is not specified, the load shape should
+        # manage concurrency and user spawn rate by itself.
+        pass
+
+    yaml_content = {
         "profile": {
-            "storage": {"hostpath": test_suite_config["test_output_dir"]},
+            "storage": {"hostpath": test_params["test_output_dir"]},
             "global-settings": {
                 "tool": "locust",
                 "locustfile": os.path.join(os.getcwd(), "stresscli/locust/aistress.py"),
                 "host": base_url,
-                "stop-timeout": 120,
+                "stop-timeout": test_params["query_timeout"],
                 "processes": 2,
                 "namespace": "default",
                 "bench-target": bench_target,
-                "run-time": test_suite_config["run_time"],
-                "service-metric-collect": test_suite_config["collect_service_metric"],
+                "service-metric-collect": test_params["collect_service_metric"],
                 "service-list": service.get("service_list", []),
-                "llm-model": test_suite_config["llm_model"],
-                "deployment-type": test_suite_config["deployment_type"],
+                "llm-model": test_params["llm_model"],
+                "deployment-type": test_params["deployment_type"],
+                "load-shape": test_params["load_shape"],
             },
-            "runs": [{"name": "benchmark", "users": concurrency, "max-request": user_queries}],
+            "runs": [{"name": test_phase, "users": concurrency, "max-request": num_queries}],
         }
     }
 
+    # The test will stop after the specified run-time only when:
+    # 1) run_time is specified in benchmark.yaml, and
+    # 2) this is not a warm-up run.
+    # TODO: According to Locust's docs, run-time should default to running forever;
+    # however, the default is 48 hours.
+    if test_params["run_time"] is not None and test_phase != "warmup":
+        yaml_content["profile"]["global-settings"]["run-time"] = test_params["run_time"]
+
+    return yaml_content
+
+
+def generate_stresscli_run_yaml(
+    example, case_type, case_params, test_params, test_phase, num_queries, base_url, ts
+) -> str:
+    """Create a stresscli configuration file and persist it on disk.
+
+    Parameters
+    ----------
+    example : str
+        The name of the example.
+    case_type : str
+        The type of the test case.
+    case_params : dict
+        The parameters of a single test case.
+    test_params : dict
+        The parameters of the test suite.
+    test_phase : str [warmup|benchmark]
+        Current phase of the test.
+    num_queries : int
+        The number of test requests sent to the SUT.
+    base_url : str
+        The root endpoint of the SUT.
+    ts : str
+        Timestamp.
+
+    Returns
+    -------
+    run_yaml_path : str
+        The path of the generated YAML file.
+    """
+    # Get the workload
+    if case_type == "e2e":
+        bench_target = f"{example}{'bench' if test_params['random_prompt'] else 'fixed'}"
+    else:
+        bench_target = f"{case_type}{'bench' if test_params['random_prompt'] else 'fixed'}"
+
+    # Generate the content of the stresscli configuration file
+    stresscli_yaml = create_run_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params)
+
+    # Dump the stresscli configuration file
+    service_name = case_params.get("service_name")
+    run_yaml_path = os.path.join(
+        test_params["test_output_dir"], f"run_{service_name}_{ts}_{test_phase}_{num_queries}.yaml"
+    )
+    with open(run_yaml_path, "w") as yaml_file:
+        yaml.dump(stresscli_yaml, yaml_file)
+
+    return run_yaml_path
+
 
 def create_and_save_run_yaml(example, deployment_type, service_type, service, base_url, test_suite_config, index):
     """Create and save the run.yaml file for the service being tested."""
     os.makedirs(test_suite_config["test_output_dir"], exist_ok=True)
-    service_name = service.get("service_name")
 
     run_yaml_paths = []
-    for user_queries in test_suite_config["user_queries"]:
-        concurrency = max(1, user_queries // test_suite_config["concurrent_level"])
-
-        if service_type == "e2e":
-            bench_target = f"{example}{'bench' if test_suite_config['random_prompt'] else 'fixed'}"
-        else:
-            bench_target = f"{service_type}{'bench' if test_suite_config['random_prompt'] else 'fixed'}"
-        run_yaml_content = create_run_yaml_content(
-            service, base_url, bench_target, concurrency, user_queries, test_suite_config
-        )
-        run_yaml_path = os.path.join(
-            test_suite_config["test_output_dir"], f"run_{service_name}_{index}_users_{user_queries}.yaml"
+    # Add YAML configuration of stresscli for warm-ups
+    warm_ups = test_suite_config["warm_ups"]
+    if warm_ups is not None and warm_ups > 0:
+        run_yaml_paths.append(
+            generate_stresscli_run_yaml(
+                example, service_type, service, test_suite_config, "warmup", warm_ups, base_url, index
+            )
         )
-        with open(run_yaml_path, "w") as yaml_file:
-            yaml.dump(run_yaml_content, yaml_file)
-        run_yaml_paths.append(run_yaml_path)
+    # Add YAML configuration of stresscli for benchmark
+    user_queries_lst = test_suite_config["user_queries"]
+    if user_queries_lst is None or len(user_queries_lst) == 0:
+        # Test stop is controlled by run time
+        run_yaml_paths.append(
+            generate_stresscli_run_yaml(
+                example, service_type, service, test_suite_config, "benchmark", -1, base_url, index
+            )
+        )
+    else:
+        # Test stop is controlled by request count
+        for user_queries in user_queries_lst:
+            run_yaml_paths.append(
+                generate_stresscli_run_yaml(
+                    example, service_type, service, test_suite_config, "benchmark", user_queries, base_url, index
+                )
+            )
 
     return run_yaml_paths
 
@@ -178,13 +265,31 @@ def process_service(example, service_type, case_data, test_suite_config):
             run_service_test(example, service_type, service, test_suite_config)
 
 
+def check_test_suite_config(test_suite_config):
+    """Check the configuration of the test suite.
+
+    Parameters
+    ----------
+    test_suite_config : dict
+        The configuration of the test suite.
+
+    Raises
+    ------
+    ValueError
+        If an incorrect configuration is detected.
+    """
+
+    # User must specify either run_time or user_queries.
+    if test_suite_config["run_time"] is None and len(test_suite_config["user_queries"]) == 0:
+        raise ValueError("Must specify either run_time or user_queries.")
+
+
 if __name__ == "__main__":
     # Load test suit configuration
     yaml_content = load_yaml("./benchmark.yaml")
     # Extract data
     parsed_data = extract_test_case_data(yaml_content)
     test_suite_config = {
-        "concurrent_level": parsed_data["concurrent_level"],
         "user_queries": parsed_data["user_queries"],
         "random_prompt": parsed_data["random_prompt"],
         "run_time": parsed_data["run_time"],
@@ -194,7 +299,11 @@ def process_service(example, service_type, case_data, test_suite_config):
         "service_ip": parsed_data["service_ip"],
         "service_port": parsed_data["service_port"],
         "test_output_dir": parsed_data["test_output_dir"],
+        "load_shape": parsed_data["load_shape"],
+        "query_timeout": parsed_data["query_timeout"],
+        "warm_ups": parsed_data["warm_ups"],
     }
+    check_test_suite_config(test_suite_config)
 
     # Mapping of example names to service types
     example_service_map = {
diff --git a/evals/benchmark/benchmark.yaml b/evals/benchmark/benchmark.yaml
index abde3c06..70ba4a61 100644
--- a/evals/benchmark/benchmark.yaml
+++ b/evals/benchmark/benchmark.yaml
@@ -6,14 +6,22 @@ test_suite_config:  # Overall configuration settings for the test suite
   deployment_type: "k8s" # Default is "k8s", can also be "docker"
   service_ip: None # Leave as None for k8s, specify for Docker
   service_port: None # Leave as None for k8s, specify for Docker
-  concurrent_level: 4 # The concurrency level, adjustable based on requirements
+  warm_ups: 0 # Number of test requests for warm-up
+  run_time: 60m # The max total run time for the test suite
   user_queries: [1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048] # Number of test requests at each concurrency level
+  query_timeout: 120 # Number of seconds to wait for a simulated user to complete any executing task before exiting. 120 sec by default.
   random_prompt: false # Use random prompts if true, fixed prompts if false
-  run_time: 60m # The max total run time for the test suite
   collect_service_metric: false # Collect service metrics if true, do not collect service metrics if false
   data_visualization: false # Generate data visualization if true, do not generate data visualization if false
   llm_model: "Intel/neural-chat-7b-v3-3" # The LLM model used for the test
   test_output_dir: "/home/sdp/benchmark_output" # The directory to store the test output
+  load_shape: # Tenant concurrency pattern
+    name: constant # poisson or constant (locust default load shape)
+    params: # Load-shape-specific parameters
+      constant: # Constant load shape specific parameters, activate only if load_shape is constant
+        concurrent_level: 4 # If user_queries is specified, concurrent_level is the target number of requests per user. If not, it is the number of simulated users
+      poisson: # Poisson load shape specific parameters, activate only if load_shape is poisson
+        arrival-rate: 1.0 # Request arrival rate
 
 test_cases:
   chatqna:
diff --git a/evals/benchmark/stresscli/commands/load_test.py b/evals/benchmark/stresscli/commands/load_test.py
index 8895e5ab..ff7f2945 100644
--- a/evals/benchmark/stresscli/commands/load_test.py
+++ b/evals/benchmark/stresscli/commands/load_test.py
@@ -3,6 +3,8 @@
 
 # stresscli/load_test.py
 
+import logging
+import math
 import os
 import subprocess
 from datetime import datetime
@@ -13,6 +15,9 @@
 from .report import export_testdata
 from .utils import dump_k8s_config, generate_lua_script, generate_random_suffix
 
+# Default load shape
+DEFAULT_LOADSHAPE = "constant"
+
 # Define default values
 locust_defaults = {
     "locustfile": "locustfile.py",
@@ -26,8 +31,11 @@
     "users": 10,
     "max-request": 100,
     "namespace": "default",
+    "load-shape": {"name": DEFAULT_LOADSHAPE},
 }
 
+console_logger = logging.getLogger("opea.eval")
+
 
 @click.command()
 # @click.option('--dataset', type=click.Path(), help='Dataset path')
@@ -108,6 +116,9 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
     runspec["stop_timeout"] = run_settings.get(
         "stop-timeout", global_settings.get("stop-timeout", locust_defaults["stop-timeout"])
     )
+    runspec["stop_timeout"] = (
+        locust_defaults["stop-timeout"] if runspec["stop_timeout"] is None else runspec["stop_timeout"]
+    )
     runspec["processes"] = run_settings.get("processes", global_settings.get("processes", locust_defaults["processes"]))
     runspec["bench-target"] = run_settings.get(
         "bench-target", global_settings.get("bench-target", locust_defaults["bench-target"])
@@ -119,6 +130,32 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
     runspec["namespace"] = run_settings.get("namespace", global_settings.get("namespace", locust_defaults["namespace"]))
 
     runspec["run_name"] = run_settings["name"]
+
+    # Specify the load shape to adjust the user distribution
+    load_shape_conf = run_settings.get("load-shape", global_settings.get("load-shape", locust_defaults["load-shape"]))
+    try:
+        load_shape = load_shape_conf["name"]
+    except KeyError:
+        load_shape = DEFAULT_LOADSHAPE
+
+    runspec["load-shape"] = load_shape
+    if load_shape == DEFAULT_LOADSHAPE:
+        # Constant load is Locust's default load shape, do nothing.
+        console_logger.info("Use default load shape.")
+    else:
+        # Load a custom load shape
+        f_custom_load_shape = os.path.join(os.getcwd(), f"stresscli/locust/{load_shape}_load_shape.py")
+        if os.path.isfile(f_custom_load_shape):
+            # Add the locust file of the custom load shape to the locustfile list
+            runspec["locustfile"] += f",{f_custom_load_shape}"
+            console_logger.info(f"Use custom load shape: {load_shape}")
+        else:
+            console_logger.error(
+                f"Unsupported load shape: {load_shape}. "
+                f"The Locust file of the custom load shape was not found: {f_custom_load_shape}. Aborted."
+            )
+            exit()
+
     # csv_output = os.path.join(output_folder, runspec['run_name'])
     # json_output = os.path.join(output_folder, f"{runspec['run_name']}_output.log")
     csv_output = os.path.join(output_folder, f"{index}")
@@ -135,7 +172,26 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
     metrics_output = os.path.join(output_folder, f"{index}_metrics.json")
 
     spawn_rate = 100 if runspec["users"] > 100 else runspec["users"]
-    processes = 10 if runspec["max_requests"] > 2000 else 5 if runspec["max_requests"] > 1000 else 2
+
+    load_shape_params = None
+    try:
+        load_shape_params = load_shape_conf["params"][load_shape]
+    except KeyError:
+        console_logger.info(f"No parameters found for the specified load shape: {load_shape}")
+
+    # Dynamically allocate Locust processes to fit different loads
+    processes = 2
+    if load_shape == "constant":
+        if runspec["max_requests"] > 0:
+            processes = 10 if runspec["max_requests"] > 2000 else 5 if runspec["max_requests"] > 1000 else processes
+        else:
+            concurrent_level = 2
+            if load_shape_params and "concurrent_level" in load_shape_params:
+                concurrent_level = int(load_shape_params["concurrent_level"])
+            processes = 10 if concurrent_level > 400 else 5 if concurrent_level > 200 else processes
+    elif load_shape == "poisson":
+        if load_shape_params and "arrival-rate" in load_shape_params:
+            processes = max(2, math.ceil(int(load_shape_params["arrival-rate"]) / 10))
 
     cmd = [
         "locust",
@@ -145,14 +201,16 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
         runspec["host"],
         "--run-time",
         runspec["runtime"],
+        "--load-shape",
+        runspec["load-shape"],
+        "--processes",
+        str(processes),
         "--users",
         str(runspec["users"]),
         "--spawn-rate",
         str(spawn_rate),
         "--max-request",
         str(runspec["max_requests"]),
-        "--processes",
-        str(processes),
         "--bench-target",
         str(runspec["bench-target"]),
         "--llm-model",
@@ -168,6 +226,16 @@ def run_locust_test(kubeconfig, global_settings, run_settings, output_folder, in
         "--json",
     ]
 
+    # Drop concurrent_level; it is consumed above and is not a Locust CLI option
    if load_shape_params and "concurrent_level" in load_shape_params:
+        del load_shape_params["concurrent_level"]
+
+    # Add the remaining load-shape-specific parameters to the locust command
+    if load_shape_params is not None:
+        for key, value in load_shape_params.items():
+            cmd.append(f"--{key}")
+            cmd.append(str(value))
+
     print(f"Running test: {' '.join(cmd)}")
 
     namespace = runspec["namespace"]
diff --git a/evals/benchmark/stresscli/locust/aistress.py b/evals/benchmark/stresscli/locust/aistress.py
index d52c738a..fb0c61d6 100644
--- a/evals/benchmark/stresscli/locust/aistress.py
+++ b/evals/benchmark/stresscli/locust/aistress.py
@@ -23,7 +23,7 @@ def _(parser):
         "--max-request",
         type=int,
         env_var="MAX_REQUEST",
-        default=10000,
+        default=-1,
         help="Stop the benchmark If exceed this request",
     )
     parser.add_argument(
@@ -43,6 +43,13 @@ def _(parser):
         default="Intel/neural-chat-7b-v3-3",
         help="LLM model name",
     )
+    parser.add_argument(
+        "--load-shape",
+        type=str,
+        env_var="OPEA_EVAL_LOAD_SHAPE",
+        default="constant",
+        help="load shape to adjust concurrency at runtime",
+    )
 
 
 reqlist = []
@@ -64,8 +71,13 @@ def __init__(self, *args, **kwargs):
 
     @task
     def bench_main(self):
+        max_request = self.environment.parsed_options.max_request
+        if max_request >= 0 and AiStressUser.request >= max_request:
+            # For the poisson load shape, a user only sends a single request before it stops.
+            # TODO: the user should not care about the load shape
+            if self.environment.parsed_options.load_shape == "poisson":
+                self.stop(force=True)
-        if AiStressUser.request >= self.environment.parsed_options.max_request:
             time.sleep(1)
             return
         with AiStressUser._lock:
@@ -145,6 +157,11 @@ def bench_main(self):
                 self.environment.runner.stats.log_request("POST", url, time.perf_counter() - start_ts, 0)
                 self.environment.runner.stats.log_error("POST", url, "Locust Request error")
 
+        # For the poisson load shape, a user only sends a single request before it stops.
+        # TODO: the user should not care about the load shape
+        if self.environment.parsed_options.load_shape == "poisson":
+            self.stop(force=True)
+
 
     # def on_stop(self) -> None:
 
@@ -155,6 +172,7 @@ def on_test_start(environment, **kwargs):
     console_logger.info(f"Max request count : {environment.parsed_options.max_request}")
     console_logger.info(f"Http timeout : {environment.parsed_options.http_timeout}\n")
     console_logger.info(f"Benchmark target : {environment.parsed_options.bench_target}\n")
+    console_logger.info(f"Load shape : {environment.parsed_options.load_shape}")
 
 
 @events.init.add_listener
@@ -202,7 +220,8 @@ def on_reqcount(msg, **kwargs):
 def checker(environment):
     while environment.runner.state not in [STATE_STOPPING, STATE_STOPPED, STATE_CLEANUP]:
         time.sleep(1)
-        if environment.runner.stats.num_requests >= environment.parsed_options.max_request:
+        max_request = environment.parsed_options.max_request
+        if max_request >= 0 and environment.runner.stats.num_requests >= max_request:
             logging.info(f"Exceed the max-request number:{environment.runner.stats.num_requests}, Exit...")
             # while environment.runner.user_count > 0:
             time.sleep(5)
diff --git a/evals/benchmark/stresscli/locust/poisson_load_shape.py b/evals/benchmark/stresscli/locust/poisson_load_shape.py
new file mode 100644
index 00000000..35a8b64c
--- /dev/null
+++ b/evals/benchmark/stresscli/locust/poisson_load_shape.py
@@ -0,0 +1,52 @@
+# Copyright (C) 2024 Intel Corporation
+# SPDX-License-Identifier: Apache-2.0
+
+import logging
+
+import locust
+import numpy as np
+from locust import LoadTestShape, events
+
+logger = logging.getLogger(__name__)
+
+
+@events.init_command_line_parser.add_listener
+def _(parser):
+    parser.add_argument(
+        "--arrival-rate",
+        type=float,
+        default=1.0,
+    )
+
+
+class PoissonLoadShape(LoadTestShape):
+    use_common_options = True
+
+    def __init__(self, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+        self.arrival_rate = locust.argument_parser.parse_options().arrival_rate
+        self.last_tick = 0
+
+    def tick(self):
+        if self.last_tick == 0:
+            logger.info("Poisson load shape arrival rate: {arrival_rate}".format(arrival_rate=self.arrival_rate))
+
+        run_time = self.get_run_time()
+        if run_time < self.runner.environment.parsed_options.run_time:
+            time_diff = run_time - self.last_tick
+            self.last_tick = run_time
+
+            new_users = np.random.poisson(lam=self.arrival_rate * time_diff)
+            current_users = self.get_current_user_count()
+            user_count = current_users + new_users
+            logger.debug(
+                "Current users: {current_users}, New users: {new_users}, Target users: {target_users}".format(
+                    current_users=current_users, new_users=new_users, target_users=user_count
+                )
+            )
+            # Avoid an illegal spawn_rate value of 0
+            spawn_rate = max(0.01, new_users)
+            return (user_count, spawn_rate)
+
+        self.runner.environment.stop_timeout = 0
+        return None
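
For reference, a minimal `test_suite_config` excerpt that exercises the new Poisson load shape could look like the sketch below. This is illustrative only and not part of the patch: the values are placeholders, and `user_queries` is left empty so that `run_time` acts as the stop criterion (per `check_test_suite_config`, at least one of the two must be set).

```yaml
test_suite_config:
  warm_ups: 10        # a few warm-up requests before the measured run
  run_time: 10m       # stop criterion, required here because user_queries is empty
  user_queries: []    # empty: request count does not limit the run
  query_timeout: 120
  load_shape:
    name: poisson
    params:
      poisson:
        arrival-rate: 2.0   # mean arrivals per second, as used by PoissonLoadShape.tick
```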
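For orientation, the run.yaml that `generate_stresscli_run_yaml` writes for a benchmark phase carries roughly the structure below. This is a sketch only: key order, paths, and values are placeholders (and `yaml.dump` will reorder keys); `run-time` is only present when `run_time` is configured and the phase is not a warm-up.

```yaml
profile:
  storage:
    hostpath: /home/sdp/benchmark_output
  global-settings:
    tool: locust
    locustfile: /path/to/stresscli/locust/aistress.py
    host: http://<service-ip>:<port>      # placeholder base URL
    stop-timeout: 120                     # from query_timeout
    processes: 2
    namespace: default
    bench-target: chatqnafixed            # example + "fixed" when random_prompt is false
    run-time: 60m
    service-metric-collect: false
    service-list: []
    llm-model: Intel/neural-chat-7b-v3-3
    deployment-type: k8s
    load-shape:
      name: constant
      params:
        constant:
          concurrent_level: 4
  runs:
    - name: benchmark
      users: 512                          # max(1, num_queries // concurrent_level), e.g. 2048 // 4
      max-request: 2048
```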
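The arrival model in `PoissonLoadShape.tick` draws the number of users to add on each tick from a Poisson distribution with mean `arrival_rate * time_diff`; because the aistress.py change makes each user send exactly one request under the poisson shape, spawned users approximate arriving requests. A standalone sketch of that math (not part of the patch; assumes numpy is installed):

```python
import numpy as np


def simulate_poisson_arrivals(arrival_rate: float, duration_s: int, seed: int = 0) -> int:
    """Count the users a Poisson load shape would spawn over `duration_s` seconds."""
    rng = np.random.default_rng(seed)
    last_tick = 0
    total_new_users = 0
    for now in range(1, duration_s + 1):  # one tick per second, mirroring tick()
        time_diff = now - last_tick
        last_tick = now
        # Same draw as PoissonLoadShape.tick: Poisson(arrival_rate * time_diff)
        total_new_users += rng.poisson(lam=arrival_rate * time_diff)
    return int(total_new_users)


# Over a long window the total approaches arrival_rate * duration_s,
# e.g. roughly 120 spawned users (~ requests) for arrival_rate=2.0 over 60 s.
print(simulate_poisson_arrivals(arrival_rate=2.0, duration_s=60))
```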