Skip to content

Commit

Permalink
move auto-aggregation logic to execute-test subcommand
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
  • Loading branch information
OVI3D0 committed Sep 27, 2024
1 parent ab31ed0 commit c4012dd
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 80 deletions.
1 change: 1 addition & 0 deletions osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def build_aggregated_results(self):
test_procedure = loaded_workload.find_test_procedure_or_default(test_exe.test_procedure)

test_execution = metrics.create_test_execution(self.config, loaded_workload, test_procedure, test_exe.workload_revision)
test_execution.user_tags = list(self.test_executions.keys())
test_execution.add_results(AggregatedResults(aggregated_results))
test_execution.distribution_version = test_exe.distribution_version
test_execution.revision = test_exe.revision
Expand Down
170 changes: 90 additions & 80 deletions osbenchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import os
import platform
import sys
import threading
import time
import uuid

Expand Down Expand Up @@ -615,6 +614,25 @@ def add_workload_source(subparser):
f"high values favor the most common queries. "
f"Ignored if randomization is off (default: {workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA}).",
default=workload.loader.QueryRandomizerWorkloadProcessor.DEFAULT_ALPHA)
test_execution_parser.add_argument(
"--test-iterations",
help="The number of times to run the workload (default: 1).",
default=1)
test_execution_parser.add_argument(
"--aggregate",
type=lambda x: (str(x).lower() in ['true', '1', 'yes', 'y']),
help="Aggregate the results of multiple test executions (default: true).",
default=True)
test_execution_parser.add_argument(
"--sleep-timer",
help="Sleep for the specified number of seconds before starting the next test execution (default: 5).",
default=5)
test_execution_parser.add_argument(
"--cancel-on-error",
action="store_true",
help="Stop executing tests if an error occurs in one of the test iterations (default: false).",
default=False
)

###############################################################################
#
Expand All @@ -635,17 +653,7 @@ def add_workload_source(subparser):
action="store_true",
default=False)

auto_aggregate_parser = subparsers.add_parser("auto-aggregate",
add_help=False,
help="Run multiple test executions with the same configuration and aggregate the results",
parents=[test_execution_parser])
auto_aggregate_parser.add_argument(
"--test-iterations",
type=int,
required=True,
help="Number of test executions to run and aggregate")

for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser, auto_aggregate_parser,
for p in [list_parser, test_execution_parser, compare_parser, aggregate_parser,
download_parser, install_parser, start_parser, stop_parser, info_parser, create_workload_parser]:
# This option is needed to support a separate configuration for the integration tests on the same machine
p.add_argument(
Expand Down Expand Up @@ -874,29 +882,48 @@ def prepare_test_executions_dict(args, cfg):
test_executions_dict[execution] = None
return test_executions_dict

def run_and_aggregate(arg_parser, args, cfg):
semaphore = threading.Semaphore(1)
test_exes = []
args.subcommand = "execute-test"
aggregate_id = args.test_execution_id

for _ in range(args.test_iterations):
console.info(f"Running test {_ + 1}...")
args.test_execution_id = str(uuid.uuid4()) # we reuse the same args for each test so need to refresh the id
test_exes.append(args.test_execution_id)
with semaphore:
dispatch_sub_command(arg_parser, args, cfg)

console.info(f"Test executions: {', '.join(test_exes)}")
console.info("Aggregating results...")
aggregate_args = arg_parser.parse_args([
"aggregate",
f"--test-executions={','.join(test_exes)}",
f"--test-execution-id={aggregate_id}",
f"--results-file={args.results_file}",
f"--workload-repository={args.workload_repository}"
])
dispatch_sub_command(arg_parser, aggregate_args, cfg)
def configure_test(arg_parser, args, cfg):
# As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
# to run the actual benchmark (i.e. generating load).
print_test_execution_id(args)
if args.effective_start_date:
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id)
# use the test_execution id implicitly also as the install id.
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id)
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
configure_builder_params(args, cfg)
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
cfg.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.plugins", opts.csv_to_list(
args.opensearch_plugins))
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

configure_results_publishing_params(args, cfg)

def print_test_execution_id(args):
console.info(f"[Test Execution ID]: {args.test_execution_id}")
Expand All @@ -916,8 +943,6 @@ def dispatch_sub_command(arg_parser, args, cfg):
test_executions_dict = prepare_test_executions_dict(args, cfg)
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
aggregator_instance.aggregate()
elif sub_command == "auto-aggregate":
run_and_aggregate(arg_parser, args, cfg)
elif sub_command == "list":
cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
cfg.add(config.Scope.applicationOverride, "system", "list.test_executions.max_results", args.limit)
Expand Down Expand Up @@ -957,49 +982,34 @@ def dispatch_sub_command(arg_parser, args, cfg):
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.installation_id)
builder.stop(cfg)
elif sub_command == "execute-test":
# As the execute-test command is doing more work than necessary at the moment, we duplicate several parameters
# in this section that actually belong to dedicated subcommands (like install, start or stop). Over time
# these duplicated parameters will vanish as we move towards dedicated subcommands and use "execute-test" only
# to run the actual benchmark (i.e. generating load).
print_test_execution_id(args)
if args.effective_start_date:
cfg.add(config.Scope.applicationOverride, "system", "time.start", args.effective_start_date)
cfg.add(config.Scope.applicationOverride, "system", "test_execution.id", args.test_execution_id)
# use the test_execution id implicitly also as the install id.
cfg.add(config.Scope.applicationOverride, "system", "install.id", args.test_execution_id)
cfg.add(config.Scope.applicationOverride, "test_execution", "pipeline", args.pipeline)
cfg.add(config.Scope.applicationOverride, "test_execution", "user.tag", args.user_tag)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "profiling", args.enable_worker_coordinator_profiling)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "assertions", args.enable_assertions)
cfg.add(config.Scope.applicationOverride, "worker_coordinator", "on.error", args.on_error)
cfg.add(
config.Scope.applicationOverride,
"worker_coordinator",
"load_worker_coordinator_hosts",
opts.csv_to_list(args.load_worker_coordinator_hosts))
cfg.add(config.Scope.applicationOverride, "workload", "test.mode.enabled", args.test_mode)
cfg.add(config.Scope.applicationOverride, "workload", "latency.percentiles", args.latency_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", args.throughput_percentiles)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.enabled", args.randomization_enabled)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.repeat_frequency", args.randomization_repeat_frequency)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.n", args.randomization_n)
cfg.add(config.Scope.applicationOverride, "workload", "randomization.alpha", args.randomization_alpha)
configure_workload_params(arg_parser, args, cfg)
configure_connection_params(arg_parser, args, cfg)
configure_telemetry_params(args, cfg)
configure_builder_params(args, cfg)
cfg.add(config.Scope.applicationOverride, "builder", "runtime.jdk", args.runtime_jdk)
cfg.add(config.Scope.applicationOverride, "builder", "source.revision", args.revision)
cfg.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.plugins", opts.csv_to_list(
args.opensearch_plugins))
cfg.add(config.Scope.applicationOverride, "builder", "plugin.params", opts.to_dict(args.plugin_params))
cfg.add(config.Scope.applicationOverride, "builder", "preserve.install", convert.to_bool(args.preserve_install))
cfg.add(config.Scope.applicationOverride, "builder", "skip.rest.api.check", convert.to_bool(args.skip_rest_api_check))

configure_results_publishing_params(args, cfg)

execute_test(cfg, args.kill_running_processes)
iterations = int(args.test_iterations)
if iterations > 1:
test_exes = []
for _ in range(iterations):
try:
test_exes.append(args.test_execution_id)
configure_test(arg_parser, args, cfg)
execute_test(cfg, args.kill_running_processes)
time.sleep(int(args.sleep_timer))
args.test_execution_id = str(uuid.uuid4())
except Exception as e:
console.error(f"Error occurred during test execution {_+1}: {str(e)}")
if args.cancel_on_error:
console.info("Cancelling remaining test executions.")
break
else:
console.info("Continuing with next test execution.")

if args.aggregate:
args.test_executions = test_exes
test_executions_dict = prepare_test_executions_dict(args, cfg)
aggregator_instance = aggregator.Aggregator(cfg, test_executions_dict, args)
aggregator_instance.aggregate()
elif args.test_iterations == 1:
configure_test(arg_parser, args, cfg)
execute_test(cfg, args.kill_running_processes)
else:
console.info("Please enter a valid number of test iterations")
elif sub_command == "create-workload":
cfg.add(config.Scope.applicationOverride, "generator", "indices", args.indices)
cfg.add(config.Scope.applicationOverride, "generator", "number_of_docs", args.number_of_docs)
Expand Down

0 comments on commit c4012dd

Please sign in to comment.