Add command "tuning" (opensearch-project#508)
Signed-off-by: Liyun Xiu <xiliyun@amazon.com>
chishui committed Apr 28, 2024
1 parent 8b6f746 commit 62b337b
Showing 7 changed files with 527 additions and 12 deletions.
202 changes: 202 additions & 0 deletions notebooks/tuning_ingest_params.ipynb
@@ -0,0 +1,202 @@
{
"cells": [
{
"cell_type": "raw",
"id": "561e61a3-3ba6-4bc8-a747-a8b51e34c493",
"metadata": {},
"source": [
"# Run benchmark with different variables to find combination which could lead to optimal ingestion performance\n",
"When ingesting data to OpenSearch using bulk API, using different variables could result in different ingestion performance. For example, the amount of document in bulk API, how many OpenSearch clients are used to send requests etc. It's not easy for user to experiment with all the combinations of the variables and find the option which could lead to optimal ingestion performance. In OpenSearch-2.15.0, a new parameter \"batch size\" was introduced in bulk API which could significantly reduce ingestion time when using with `text_embedding` processor and `sparse_encoding` processor. However, this additional impactor could make the variable tuning even more difficult.\n",
"\n",
"This tool is to help dealing with the pain point of tuning these variables which could impact ingestion performance and automatically find the optimal combination of the variables. It utilizes the OpenSearch-Benchmark, uses different varible combination to run benchmark, collects their outputs, analyzes and visualizes the results.\n",
"\n",
"There are three variables that you can test against: bulk size, OS client number, batch size. If you already have a perferred value for certain variable, you can simply set it to a fixed value and only test other variables. For example, you always use 1 client to run bulk API with bulk size equals to 100, then you can only try different batch size to see which batch size can have the best performance."
]
},
{
"cell_type": "markdown",
"id": "e40b1cc3-a2b2-4fac-b857-6f40943ef4f8",
"metadata": {},
"source": [
"## Step 1: Import Packages"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "140f4d8b-83d3-48fc-83c5-0c631104c355",
"metadata": {},
"outputs": [],
"source": [
"import sys\n",
"from pathlib import Path\n",
"\n",
"sys.path.insert(0, \"../\")\n",
"\n",
"from osbenchmark.tuning.optimal_finder import run\n",
"from osbenchmark.benchmark import create_arg_parser"
]
},
{
"cell_type": "raw",
"id": "e8c59ff8-ac0b-4a4c-ad05-944abc3dae13",
"metadata": {},
"source": [
"## Step 2: Prepare for test inputs\n",
"### Variable Test Schedule\n",
"All three variables have two different parameters, one to set fixed value and one to set a testing schedule. The schedule has two patterns:\n",
"1. set starting value, end value, step size and trend, separated by `:`, e.g. \"10:100:1:10\" means we should test with \"10, 20, 30, 40, 50, 60, 70, 80, 90, 100\". \"20:100:-1:20\" means we should test reversely with \"100, 80, 60, 40, 20\"\n",
"2. configure testing values manually by adding a prefix symbol `@` and still separate values using `:` e.g. \"@10:20:50\" means we only test with 10, 20, 50.\n",
"\n",
"Use `BULK_SIZE` to set a fixed bulk size value, e.g. `BULK_SIZE=100`\n",
"Use `BULK_SIZE_SCHEDULE` to set a testing schedule for bulk size. e.g. `BULK_SIZE_SCHEDULE=\"@10:20:50:100\"`\n",
"\n",
"Use `BATCH_SIZE` to set a fixed batch size value, e.g. `BATCH_SIZE=100`\n",
"Use `BATCH_SIZE_SCHEDULE` to set a testing schedule for batch size. e.g. `BATCH_SIZE_SCHEDULE=\"10:100:1:10\"`\n",
"\n",
"Use `CLIENT` to set a fixed client count, e.g. `CLIENT=1`\n",
"Use `CLIENT_SCHEDULE` to set a testing schedule for client count. e.g. `CLIENT_SCHEDULE=\"@1:2:4\"`\n",
"\n",
"### Parameters shared with OpenSearch-Benchmark\n",
"We reuse these parameters with OpenSearch-Benchmark `execute-test`:\n",
"1. WORKLOAD_PATH same as \"--workload-path\" in OSB `execute-test`, \"Define the path to a workload\"\n",
"2. TARGET_HOSTS same as \"--target-hosts\" in OSB `execute-test`, \"Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' (default: localhost:9200).\"\n",
"3. CLIENT_OPTIONS same as \"--client-options\" in OSB `execute-test`, \"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch Python client (default: timeout:60).\""
]
},
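{
"cell_type": "markdown",
"id": "d4c1f0aa-0000-4000-8000-3b2a1c9e7f01",
"metadata": {},
"source": [
"The next cell is a minimal illustrative sketch, not part of the tool: a hypothetical `expand_schedule` helper showing how a schedule string could be expanded into test values under the two patterns above. The actual parsing lives in `osbenchmark.tuning`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d4c1f0aa-0000-4000-8000-3b2a1c9e7f02",
"metadata": {},
"outputs": [],
"source": [
"# Illustrative sketch only; expand_schedule is a hypothetical helper,\n",
"# not the parser shipped in osbenchmark.tuning.\n",
"def expand_schedule(schedule):\n",
"    if schedule.startswith(\"@\"):\n",
"        # explicit values, e.g. \"@10:20:50\" -> [10, 20, 50]\n",
"        return [int(v) for v in schedule[1:].split(\":\")]\n",
"    start, end, trend, step = (int(v) for v in schedule.split(\":\"))\n",
"    values = list(range(start, end + 1, step))\n",
"    # a negative trend means the values are tested in reverse order\n",
"    return values[::-1] if trend < 0 else values\n",
"\n",
"print(expand_schedule(\"10:100:1:10\"))   # [10, 20, ..., 100]\n",
"print(expand_schedule(\"20:100:-1:20\"))  # [100, 80, 60, 40, 20]"
]
},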
{
"cell_type": "code",
"execution_count": null,
"id": "7f656ab2-1659-414a-81db-76812a43132d",
"metadata": {},
"outputs": [],
"source": [
"# bulk size\n",
"BULK_SIZE=100\n",
"\n",
"# the clients used to run test in parallel\n",
"CLIENT=1\n",
"\n",
"# single test with error rate higher than this will be dropped\n",
"ALLOWED_ERROR_RATE=0\n",
"\n",
"# directory where the workload files locate\n",
"WORKLOAD_PATH=\"\"\n",
"\n",
"# remote ML server type, based on type we can recommend a set of testing parameters.\n",
"# choices are: \"sagemaker\", \"cohere\", \"openai\", \"unknown\". \n",
"REMOTE_ML_SERVER_TYPE=\"unknown\"\n",
"\n",
"# a comma-separated list of host:port pairs\n",
"TARGET_HOSTS=\"localhost:9200\"\n",
"\n",
"# a comma-separated list of client options to use\n",
"CLIENT_OPTIONS=\"timeout:60\"\n",
"\n",
"BATCH_SIZE_SCHEDULE=\"1:100:1:20\""
]
},
{
"cell_type": "markdown",
"id": "e3513460-b440-4a4e-bf9a-db2a8c9917d6",
"metadata": {},
"source": [
"## Step 3: Run tests"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "00b00450-97f1-4f58-ab76-259e112c3361",
"metadata": {},
"outputs": [],
"source": [
"# construct arguments for testing\n",
"argv = [\n",
" \"tuning\",\n",
" \"--allowed-error-rate\", str(ALLOWED_ERROR_RATE),\n",
" \"--bulk-size\", str(BULK_SIZE),\n",
" \"--client\", str(CLIENT),\n",
" \"--workload-path\", WORKLOAD_PATH,\n",
" \"--remote-ml-server-type\", REMOTE_ML_SERVER_TYPE,\n",
" \"--target-hosts\", TARGET_HOSTS,\n",
" \"--client-options\", CLIENT_OPTIONS,\n",
" \"--batch-size-schedule\", BATCH_SIZE_SCHEDULE\n",
"]\n",
"\n",
"# validate arguments\n",
"if not Path(WORKLOAD_PATH).exists():\n",
" print(\"WORKLOAD_PATH does not exist!\")\n",
"\n",
"# construct arguments to run\n",
"parser = create_arg_parser()\n",
"args = parser.parse_args(argv)\n",
"\n",
"# run tests with different arguments\n",
"results = run(args)"
]
},
{
"cell_type": "markdown",
"id": "97fdb2c2-7846-4daf-aced-67e6d2ea8aa2",
"metadata": {},
"source": [
"## Step 4: Visualize test results"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "ef26cfa8-7509-4d58-99f1-8e0bac115f96",
"metadata": {},
"outputs": [],
"source": [
"# visualize benchmark result\n",
"import matplotlib.pyplot as plt \n",
"\n",
"if results:\n",
" \n",
" batches = [int(result.batch_size) for result in results.values()]\n",
" latencies = [result.total_time for result in results.values()]\n",
" \n",
" plt.plot(batches, latencies)\n",
" \n",
" plt.xlabel('batch size')\n",
" plt.ylabel('latency')\n",
" \n",
" plt.show()\n",
"else:\n",
" print(\"please wait until last step completed!\")"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.9"
}
},
"nbformat": 4,
"nbformat_minor": 5
}
65 changes: 54 additions & 11 deletions osbenchmark/benchmark.py
@@ -40,6 +40,7 @@
from osbenchmark.builder import provision_config, builder
from osbenchmark.workload_generator import workload_generator
from osbenchmark.utils import io, convert, process, console, net, opts, versions
from osbenchmark.tuning import optimal_finder


def create_arg_parser():
@@ -221,6 +222,42 @@ def add_workload_source(subparser):
help="Whether to include the comparison in the results file.",
default=True)

tuning_parser = subparsers.add_parser("tuning", help="Run benchmarks with different combinations of "
"arguments, then compare results to find the one which "
"leads to the optimal result.")
tuning_parser.add_argument("--bulk-size",
type=int,
default=100,
help="A fixed bulk size used in benchmark")
tuning_parser.add_argument("--bulk-size-schedule",
help="A schedule for a series of bulk size used in benchmark, see "
"'https://tiny.amazon.com/zgliokjl' on the schedule pattern")
tuning_parser.add_argument("--client",
type=int,
default=1,
help="A fixed number of clients used in benchmark")
tuning_parser.add_argument("--client-schedule",
help="A schedule for a series of number of client used in benchmark, see "
"'https://tiny.amazon.com/zgliokjl' on the schedule pattern")
# hide these two variables until batch ingestion feature is launched
# https://github.com/opensearch-project/OpenSearch/issues/12457
tuning_parser.add_argument("--batch-size",
type=int,
default=1,
help=argparse.SUPPRESS)
tuning_parser.add_argument("--batch-size-schedule",
help=argparse.SUPPRESS)
tuning_parser.add_argument("--remote-ml-server-type",
default="unknown",
choices=["sagemaker", "cohere", "openai", "unknown"],
help=argparse.SUPPRESS)
tuning_parser.add_argument("--allowed-error-rate",
type=float,
default=0,
help="The maximum allowed error rate from a single test within which could indicate a "
"successful run")
add_workload_source(tuning_parser)

download_parser = subparsers.add_parser("download", help="Downloads an artifact")
download_parser.add_argument(
"--provision-config-repository",
@@ -469,20 +506,24 @@ def add_workload_source(subparser):
help="Define a comma-separated list of key:value pairs that are injected verbatim to all plugins as variables.",
default=""
)
test_execution_parser.add_argument(
"--target-hosts",
help="Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' "
"(default: localhost:9200).",
default="") # actually the default is pipeline specific and it is set later

test_execution_parser.add_argument(
"--load-worker-coordinator-hosts",
help="Define a comma-separated list of hosts which should generate load (default: localhost).",
default="localhost")
test_execution_parser.add_argument(
"--client-options",
help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch "
f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).",
default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS)

for p in [test_execution_parser, tuning_parser]:
p.add_argument(
"--target-hosts",
help="Define a comma-separated list of host:port pairs which should be targeted if using the pipeline 'benchmark-only' "
"(default: localhost:9200).",
default="") # actually the default is pipeline specific and it is set later
p.add_argument(
"--client-options",
help=f"Define a comma-separated list of client options to use. The options will be passed to the OpenSearch "
f"Python client (default: {opts.ClientOptions.DEFAULT_CLIENT_OPTIONS}).",
default=opts.ClientOptions.DEFAULT_CLIENT_OPTIONS)

test_execution_parser.add_argument("--on-error",
choices=["continue", "abort"],
help="Controls how Benchmark behaves on response errors (default: continue).",
@@ -608,7 +649,7 @@ def add_workload_source(subparser):
default=False)

for p in [list_parser, test_execution_parser, compare_parser, download_parser, install_parser,
start_parser, stop_parser, info_parser, create_workload_parser]:
start_parser, stop_parser, info_parser, create_workload_parser, tuning_parser]:
# This option is needed to support a separate configuration for the integration tests on the same machine
p.add_argument(
"--configuration-name",
@@ -934,6 +975,8 @@ def dispatch_sub_command(arg_parser, args, cfg):
elif sub_command == "info":
configure_workload_params(arg_parser, args, cfg)
workload.workload_info(cfg)
elif sub_command == "tuning":
optimal_finder.run(args)
else:
raise exceptions.SystemSetupError(f"Unknown subcommand [{sub_command}]")
return True
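
With the new subparser wired into dispatch_sub_command, the command can be invoked like the minimal sketch below; the flag values are illustrative, and `opensearch-benchmark` is assumed to be the installed OSB entry point:

opensearch-benchmark tuning \
  --workload-path /path/to/workload \
  --bulk-size 100 \
  --client-schedule "@1:2:4" \
  --allowed-error-rate 0.1 \
  --target-hosts localhost:9200 \
  --client-options timeout:60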
Empty file added osbenchmark/tuning/__init__.py
Empty file.