Refactor aggregate #706

Closed · wants to merge 2 commits
81 changes: 45 additions & 36 deletions osbenchmark/aggregator.py
@@ -22,37 +22,37 @@ def __init__(self, cfg, test_executions_dict, args):
         self.loaded_workload = None
 
     def count_iterations_for_each_op(self, test_execution) -> None:
-        matching_test_procedure = next((tp for tp in self.loaded_workload.test_procedures if tp.name == self.test_procedure_name), None)
+        """Count iterations for each operation in the test execution"""
         workload_params = test_execution.workload_params if test_execution.workload_params else {}
 
         test_execution_id = test_execution.test_execution_id
         self.accumulated_iterations[test_execution_id] = {}
 
-        if matching_test_procedure:
-            for task in matching_test_procedure.schedule:
-                task_name = task.name
-                task_name_iterations = f"{task_name}_iterations"
-                if task_name_iterations in workload_params:
-                    iterations = int(workload_params[task_name_iterations])
-                else:
-                    iterations = task.iterations or 1
-                self.accumulated_iterations[test_execution_id][task_name] = iterations
-        else:
-            raise ValueError(f"Test procedure '{self.test_procedure_name}' not found in the loaded workload.")
+        for task in self.loaded_workload.find_test_procedure_or_default(self.test_procedure_name).schedule:
+            task_name = task.name
+            task_name_iterations = f"{task_name}_iterations"
+            iterations = int(workload_params.get(task_name_iterations, task.iterations or 1))
+            self.accumulated_iterations[test_execution_id][task_name] = iterations
 
-    def accumulate_results(self, test_execution: Any) -> None:
-        for item in test_execution.results.get("op_metrics", []):
-            task = item.get("task", "")
+    def accumulate_results(self, test_execution) -> None:
+        """Accumulate results from a single test execution"""
+        for operation_metric in test_execution.results.get("op_metrics", []):
+            task = operation_metric.get("task", "")
             self.accumulated_results.setdefault(task, {})
             for metric in self.metrics:
                 self.accumulated_results[task].setdefault(metric, [])
-                self.accumulated_results[task][metric].append(item.get(metric))
+                self.accumulated_results[task][metric].append(operation_metric.get(metric))
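The rename from item to operation_metric is cosmetic; the substance is the nested setdefault pattern, which collects each metric into a per-task list across test executions. A standalone sketch of that pattern (metric names and values are invented for illustration):

    # Hypothetical op_metrics entries from two test executions of the same task:
    op_metrics = [
        {"task": "index-append", "throughput": 1200, "error_rate": 0.0},
        {"task": "index-append", "throughput": 1100, "error_rate": 0.1},
    ]
    metrics = ["throughput", "error_rate"]

    accumulated = {}
    for operation_metric in op_metrics:
        task = operation_metric.get("task", "")
        accumulated.setdefault(task, {})          # one bucket per task
        for metric in metrics:
            accumulated[task].setdefault(metric, [])
            accumulated[task][metric].append(operation_metric.get(metric))

    print(accumulated)
    # {'index-append': {'throughput': [1200, 1100], 'error_rate': [0.0, 0.1]}}

The same hunk continues into aggregate_json_by_key: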

     def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
         """
         Aggregates JSON results across multiple test executions using a specified key path.
         Handles nested dictionary structures and calculates averages for numeric values
         """
         all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]
 
         # retrieve nested value from a dictionary given a key path
         def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
             """
             Retrieves a value from a nested dictionary structure using a path of keys.
             """
             for key in path:
                 if isinstance(obj, dict):
                     obj = obj.get(key, {})
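get_nested_value walks the dictionary one key at a time, substituting an empty dict whenever a key is missing so the loop never raises (its return statement falls outside the lines shown). A quick illustration with an invented results fragment:

    results = {"correctness_metrics": {"recall@1": {"mean": 0.93}}}

    obj = results
    for key in ["correctness_metrics", "recall@1", "mean"]:
        if isinstance(obj, dict):
            obj = obj.get(key, {})   # missing keys degrade to {}
    print(obj)  # 0.93

    obj = results
    for key in ["correctness_metrics", "no_such_metric", "mean"]:
        if isinstance(obj, dict):
            obj = obj.get(key, {})
    print(obj)  # {}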
@@ -66,8 +66,7 @@ def aggregate_helper(objects: List[Any]) -> Any:
             if not objects:
                 return None
             if all(isinstance(obj, (int, float)) for obj in objects):
-                avg = sum(objects) / len(objects)
-                return avg
+                return sum(objects) / len(objects)
             if all(isinstance(obj, dict) for obj in objects):
                 keys = set().union(*objects)
                 return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
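Since aggregate_helper recurses through dicts and averages numeric leaves, a worked example may be clearer than prose. The sketch below copies the two branches visible in this hunk; the fall-through for mixed or non-numeric values lies outside the hunk, so returning the first element there is an assumption made only for this sketch:

    from typing import Any, List

    def aggregate_helper(objects: List[Any]) -> Any:
        if not objects:
            return None
        if all(isinstance(obj, (int, float)) for obj in objects):
            return sum(objects) / len(objects)          # average numeric leaves
        if all(isinstance(obj, dict) for obj in objects):
            keys = set().union(*objects)
            return {key: aggregate_helper([obj.get(key) for obj in objects]) for key in keys}
        return objects[0]  # assumed fall-through (not shown in the diff)

    a = {"latency": {"p50": 10.0, "p90": 20.0}, "error_rate": 0.0}
    b = {"latency": {"p50": 14.0, "p90": 22.0}, "error_rate": 0.01}
    print(aggregate_helper([a, b]))
    # {'latency': {'p50': 12.0, 'p90': 21.0}, 'error_rate': 0.005} (key order may vary)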
@@ -82,8 +81,8 @@ def aggregate_helper(objects: List[Any]) -> Any:
         values = [get_nested_value(json, key_path) for json in all_jsons]
         return aggregate_helper(values)
 
-    def build_aggregated_results(self):
-        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
+    def build_aggregated_results_dict(self):
+        """Builds a dictionary of aggregated metrics from all test executions"""
         aggregated_results = {
             "op_metrics": [],
             "correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
@@ -147,8 +146,30 @@ def build_aggregated_results(self):
 
             aggregated_results["op_metrics"].append(op_metric)
 
-        # extract the necessary data from the first test execution, since the configurations should be identical for all test executions
+        return aggregated_results
+
+    def update_config_object(self, test_execution):
+        """
+        Updates the configuration object with values from a test execution.
+        Uses the first test execution as reference since configurations should be identical
+        """
+        current_timestamp = self.config.opts("system", "time.start")
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.names", test_execution.provision_config_instance)
+        self.config.add(config.Scope.applicationOverride, "system",
+                        "env.name", test_execution.environment_name)
+        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
+        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline)
+        self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params)
+        self.config.add(config.Scope.applicationOverride, "builder",
+                        "provision_config_instance.params", test_execution.provision_config_instance_params)
+        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params)
+        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles)
+        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles)
+
+    def build_aggregated_results(self):
+        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
+        aggregated_results = self.build_aggregated_results_dict()
 
         if hasattr(self.args, 'results_file') and self.args.results_file != "":
             normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
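The extracted update_config_object is pure config plumbing: copy fields from one reference test execution into the config under applicationOverride scope. Note the read-then-re-add of time.start, presumably so the aggregate run keeps its own start timestamp rather than inheriting one. A toy version of that pattern (ToyConfig is a stand-in for osbenchmark's real config object, which also takes a Scope argument):

    from types import SimpleNamespace

    class ToyConfig:
        def __init__(self):
            self._opts = {("system", "time.start"): "2025-01-01T00:00:00Z"}
        def add(self, section, key, value):
            self._opts[(section, key)] = value
        def opts(self, section, key):
            return self._opts[(section, key)]

    cfg = ToyConfig()
    ref = SimpleNamespace(environment_name="nightly", pipeline="from-results")

    current_timestamp = cfg.opts("system", "time.start")  # read back first...
    cfg.add("system", "env.name", ref.environment_name)
    cfg.add("test_execution", "pipeline", ref.pipeline)
    cfg.add("system", "time.start", current_timestamp)    # ...then re-apply unchanged

    print(cfg.opts("system", "time.start"))  # still 2025-01-01T00:00:00Z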
@@ -165,19 +186,7 @@ def build_aggregated_results(self):
 
         print("Aggregate test execution ID: ", test_execution_id)
 
-        # add values to the configuration object
-        self.config.add(config.Scope.applicationOverride, "builder",
-                        "provision_config_instance.names", test_exe.provision_config_instance)
-        self.config.add(config.Scope.applicationOverride, "system",
-                        "env.name", test_exe.environment_name)
-        self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
-        self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
-        self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
-        self.config.add(config.Scope.applicationOverride, "builder",
-                        "provision_config_instance.params", test_exe.provision_config_instance_params)
-        self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
-        self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
-        self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)
+        self.update_config_object(test_exe)
 
         loaded_workload = workload.load_workload(self.config)
         test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)
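Taken together, the hunks above leave build_aggregated_results as a thin orchestrator. In outline (not runnable as-is; parts of the method elided from this diff are summarized as comments):

    def build_aggregated_results(self):
        # reference execution: first ID among the submitted test executions
        test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
        aggregated_results = self.build_aggregated_results_dict()
        # ... results-file normalization and test_execution_id handling (elided) ...
        self.update_config_object(test_exe)
        loaded_workload = workload.load_workload(self.config)
        test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)
        # ... assemble and persist the aggregated test execution (below the shown hunks) ...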
1 change: 1 addition & 0 deletions tests/aggregator_test.py
@@ -50,6 +50,7 @@ def test_count_iterations_for_each_op(aggregator):
     mock_test_procedure.schedule = mock_schedule
     mock_workload.test_procedures = [mock_test_procedure]
 
+    mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)
     mock_test_execution = Mock(test_execution_id="test1", workload_params={})
 
     aggregator.loaded_workload = mock_workload
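The single test change mirrors the production change: count_iterations_for_each_op now calls find_test_procedure_or_default() instead of scanning workload.test_procedures, so the test must stub that method. A minimal reproduction of the stub (names mirror the test above; the surrounding fixture is assumed):

    from unittest.mock import Mock

    mock_test_procedure = Mock()
    mock_test_procedure.name = "mock_procedure"

    mock_workload = Mock()
    # The refactored lookup path goes through this method, so stub it directly:
    mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)

    assert mock_workload.find_test_procedure_or_default("anything") is mock_test_procedure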