Skip to content

Commit

Permalink
refactor aggregator class + fix unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
  • Loading branch information
OVI3D0 committed Dec 9, 2024
1 parent 402c97d commit f9159fd
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 24 deletions.
64 changes: 40 additions & 24 deletions osbenchmark/aggregator.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def __init__(self, cfg, test_executions_dict, args):
self.loaded_workload = None

def count_iterations_for_each_op(self, test_execution) -> None:
"""Count iterations for each operation in the test execution."""
"""Count iterations for each operation in the test execution"""
workload_params = test_execution.workload_params if test_execution.workload_params else {}
test_execution_id = test_execution.test_execution_id
self.accumulated_iterations[test_execution_id] = {}
Expand All @@ -33,20 +33,26 @@ def count_iterations_for_each_op(self, test_execution) -> None:
iterations = int(workload_params.get(task_name_iterations, task.iterations or 1))
self.accumulated_iterations[test_execution_id][task_name] = iterations

def accumulate_results(self, test_execution: Any) -> None:
"""Accumulate results from a single test execution."""
for item in test_execution.results.get("op_metrics", []):
task = item.get("task", "")
def accumulate_results(self, test_execution) -> None:
"""Accumulate results from a single test execution"""
for operation_metric in test_execution.results.get("op_metrics", []):
task = operation_metric.get("task", "")
self.accumulated_results.setdefault(task, {})
for metric in self.metrics:
self.accumulated_results[task].setdefault(metric, [])
self.accumulated_results[task][metric].append(item.get(metric))
self.accumulated_results[task][metric].append(operation_metric.get(metric))

def aggregate_json_by_key(self, key_path: Union[str, List[str]]) -> Any:
"""Aggregate JSON results by a given key path."""
"""
Aggregates JSON results across multiple test executions using a specified key path.
Handles nested dictionary structures and calculates averages for numeric values
"""
all_jsons = [self.test_store.find_by_test_execution_id(id).results for id in self.test_executions.keys()]

def get_nested_value(obj: Dict[str, Any], path: List[str]) -> Any:
"""
Retrieves a value from a nested dictionary structure using a path of keys.
"""
for key in path:
if isinstance(obj, dict):
obj = obj.get(key, {})
Expand Down Expand Up @@ -75,8 +81,8 @@ def aggregate_helper(objects: List[Any]) -> Any:
values = [get_nested_value(json, key_path) for json in all_jsons]
return aggregate_helper(values)

def build_aggregated_results(self):
test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
def build_aggregated_results_dict(self):
"""Builds a dictionary of aggregated metrics from all test executions"""
aggregated_results = {
"op_metrics": [],
"correctness_metrics": self.aggregate_json_by_key("correctness_metrics"),
Expand Down Expand Up @@ -139,9 +145,31 @@ def build_aggregated_results(self):
op_metric[f"{metric}_rsd"] = rsd

aggregated_results["op_metrics"].append(op_metric)

# extract the necessary data from the first test execution, since the configurations should be identical for all test executions

return aggregated_results

def update_config_object(self, test_execution):
"""
Updates the configuration object with values from a test execution.
Uses the first test execution as reference since configurations should be identical
"""
current_timestamp = self.config.opts("system", "time.start")
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.names", test_execution.provision_config_instance)
self.config.add(config.Scope.applicationOverride, "system",
"env.name", test_execution.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_execution.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", test_execution.workload_params)
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.params", test_execution.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_execution.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_execution.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_execution.throughput_percentiles)

def build_aggregated_results(self):
test_exe = self.test_store.find_by_test_execution_id(list(self.test_executions.keys())[0])
aggregated_results = self.build_aggregated_results_dict()

if hasattr(self.args, 'results_file') and self.args.results_file != "":
normalized_results_file = rio.normalize_path(self.args.results_file, self.cwd)
Expand All @@ -158,19 +186,7 @@ def build_aggregated_results(self):

print("Aggregate test execution ID: ", test_execution_id)

# add values to the configuration object
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.names", test_exe.provision_config_instance)
self.config.add(config.Scope.applicationOverride, "system",
"env.name", test_exe.environment_name)
self.config.add(config.Scope.applicationOverride, "system", "time.start", current_timestamp)
self.config.add(config.Scope.applicationOverride, "test_execution", "pipeline", test_exe.pipeline)
self.config.add(config.Scope.applicationOverride, "workload", "params", test_exe.workload_params)
self.config.add(config.Scope.applicationOverride, "builder",
"provision_config_instance.params", test_exe.provision_config_instance_params)
self.config.add(config.Scope.applicationOverride, "builder", "plugin.params", test_exe.plugin_params)
self.config.add(config.Scope.applicationOverride, "workload", "latency.percentiles", test_exe.latency_percentiles)
self.config.add(config.Scope.applicationOverride, "workload", "throughput.percentiles", test_exe.throughput_percentiles)
self.update_config_object(test_exe)

loaded_workload = workload.load_workload(self.config)
test_procedure_object = loaded_workload.find_test_procedure_or_default(self.test_procedure_name)
Expand Down
1 change: 1 addition & 0 deletions tests/aggregator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ def test_count_iterations_for_each_op(aggregator):
mock_test_procedure.schedule = mock_schedule
mock_workload.test_procedures = [mock_test_procedure]

mock_workload.find_test_procedure_or_default = Mock(return_value=mock_test_procedure)
mock_test_execution = Mock(test_execution_id="test1", workload_params={})

aggregator.loaded_workload = mock_workload
Expand Down

0 comments on commit f9159fd

Please sign in to comment.