-
Notifications
You must be signed in to change notification settings - Fork 80
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add aggregate command #638
Add aggregate command #638
Conversation
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
95cb97e
to
ba9ae77
Compare
osbenchmark/metrics.py
Outdated
@@ -1428,7 +1428,7 @@ def as_dict(self): | |||
} | |||
} | |||
if self.results: | |||
d["results"] = self.results.as_dict() | |||
d["results"] = self.results if isinstance(self.results, dict) else self.results.as_dict() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering your guys thoughts on this. I figured since it needs to be a dict anyway that we can write it to accept results that are already dicts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will there be cases where self.results is already a dict? I don't think it hurts to keep it as is and reinforce it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Reverted this change
ba9ae77
to
9526f06
Compare
osbenchmark/aggregator.py
Outdated
self.accumulated_iterations: Dict[str, int] = {} | ||
|
||
# count iterations for each operation in the workload | ||
def iterations(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Might be better to rename this to be more descriptive. Something like count_iterations
or count_iterations_for_each_op
could work. If we go something descriptive like the later, we can also remove the comment above the method
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another conditional: After we verify that all test execution ids have the same test procedure, we'll need to add test_procedure to the config as well so that we can use that here to collect all the operations belonging to that test procedure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An edge case: Line 20 is getting the default iterations from the workload but some users override this. For situations like this, we'll need to forgo the default iterations from the workload definition and count the iterations based on the count recorded in the results file:
"test_procedure": "big5",
"workload-params": {
"iterations": 200,
"search_clients": 3000,
"target_throughput": 3000
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Had a sync with Michael offline: For now, will check if user overrode iterations. If not, will grab default from workload (as the code is doing here). Will address cases like time-period
in future PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will also open issues to add configurable iterations to workloads, in order to handle workloads ran with custom iteration numbers as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another conditional: After we verify that all test execution ids have the same test procedure, we'll need to add test_procedure to the config as well so that we can use that here to collect all the operations belonging to that test procedure.
If we already verified the test procedures are the same, shouldn't the operations be identical as well? I'm not sure what I would do with the collected operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: Might be better to rename this to be more descriptive. Something like
count_iterations
orcount_iterations_for_each_op
could work. If we go something descriptive like the later, we can also remove the comment above the method
Fixed!
for id in self.test_executions.keys(): | ||
test_execution = test_store.find_by_test_execution_id(id) | ||
if test_execution: | ||
if test_execution.workload != workload: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On top of checking workload, we should also verify if the first test execution id's test_procedure matches the rest. Reason being some workloads have multiple test_procedures.
For example, NYC Taxis has 4 test procedures (3 form default.json):https://github.com/opensearch-project/opensearch-benchmark-workloads/blob/main/nyc_taxis/test_procedures/default.json
If a user aggregated a group of test execution ids that use the same workload but differ in test procedures, we could still run into the issue of comparing different operations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a check for this👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A much needed contribution. Left some comments
osbenchmark/aggregator.py
Outdated
self.accumulated_iterations: Dict[str, int] = {} | ||
|
||
# count iterations for each operation in the workload | ||
def iterations(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another conditional: After we verify that all test execution ids have the same test procedure, we'll need to add test_procedure to the config as well so that we can use that here to collect all the operations belonging to that test procedure.
osbenchmark/aggregator.py
Outdated
self.accumulated_iterations: Dict[str, int] = {} | ||
|
||
# count iterations for each operation in the workload | ||
def iterations(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An edge case: Line 20 is getting the default iterations from the workload but some users override this. For situations like this, we'll need to forgo the default iterations from the workload definition and count the iterations based on the count recorded in the results file:
"test_procedure": "big5",
"workload-params": {
"iterations": 200,
"search_clients": 3000,
"target_throughput": 3000
}
osbenchmark/aggregator.py
Outdated
# count iterations for each operation in the workload | ||
def iterations(self) -> None: | ||
loaded_workload = workload.load_workload(self.config) | ||
for task in loaded_workload.test_procedures: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think loaded_workload.test_procedures
returns a list of test procedures instead of an instance of test procedure. If that's the case, we should update line 17 to use test_procedure
instead of task. We should also update line 18 to use operation
to task
and from task.schedule
to test_procedure.schedule
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the test_procedure matches the test_procedures from the test execution ids, we'll get those tasks / operations
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the test_procedure matches the test_procedures from the test execution ids, we'll get those tasks / operations
Missed this comment, will add a check for this 👍
type=non_empty_list, | ||
required=True, | ||
help="Comma-separated list of TestExecution IDs to aggregate") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should include other common options like --test-execution-id
and --results-file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added these! Let me know if I should add others as well
|
||
aggregated_results = self.build_aggregated_results(test_execution_store) | ||
file_test_exe_store = FileTestExecutionStore(self.config) | ||
file_test_exe_store.store_test_execution(aggregated_results) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A couple of questions:
- Will we be storing the aggregated results to ~/.benchmark/benchmarks/test_executions or to a separate directory for aggregated results?
- Will we also store this in a OSTestExecutionStore if the user has their benchmark.ini file configured to use an external metrics data store? If so, let's implement this in a separate PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, we'll store the results to the benchmarks test_executions folder, but I can add a separate folder in a future PR.
I did some testing and this does store in an OSTestExecutionStore when my benchmark.ini file is configured to use it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome, sounds good!
It'd be nice to include a sample output of how this command is used in the PR description. |
b5da7f2
to
36cbcc7
Compare
Updated the description with some more detail. Let me know what you think! |
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
36cbcc7
to
9b35fcd
Compare
osbenchmark/aggregator.py
Outdated
# accumulate metrics for each task from test execution results | ||
def results(self, test_execution: Any) -> None: | ||
for test_procedure in loaded_workload.test_procedures: | ||
if test_procedure.name == self.config.opts("workload", "test_procedure.name"): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would be good practice to add error handling here in case test_procedure from the test executions is not found in the loaded_workload.
Even though all test iterations might have the same test procedure by this point, it's possible where users might be using a workload (e.g. a custom workload or modified official workload) where a test procedure isn't available.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this!
osbenchmark/aggregator.py
Outdated
for item in test_execution.results.get("op_metrics", []): | ||
task = item.get("task", "") | ||
self.accumulated_results.setdefault(task, {}) | ||
for metric in ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"]: | ||
for metric in self.statistics: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is clean!
osbenchmark/aggregator.py
Outdated
self.test_executions = test_executions_dict | ||
self.accumulated_results: Dict[str, Dict[str, List[Any]]] = {} | ||
self.accumulated_iterations: Dict[str, int] = {} | ||
self.statistics = ["throughput", "latency", "service_time", "client_processing_time", "processing_time", "error_rate", "duration"] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: we use metrics
instead of statistics
, which would standardize on official documentation terminiology: https://opensearch.org/docs/latest/benchmark/reference/metrics/metric-keys/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, updated this name
osbenchmark/aggregator.py
Outdated
if test_execution.test_procedure != test_procedure: | ||
raise ValueError( | ||
f"Incompatible test procedure: test {id} has test procedure '{test_execution.test_procedure}'\n" | ||
f"instead of '{test_procedure}'" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: It's good that we stated what's wrong but it'd also be nice to point the user in the right direction:
f"Ensure that all test ids have the same test procedure from the same workload"
This is especially useful to inexperienced users who are not familiar with how OSB works. This can be applied to both ValueErrors in line 205 and 208-209.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added this to all the error messages
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few additional comments but this is great work and overall looks good! Thanks for doing this.
Signed-off-by: Michael Oviedo <mikeovi@amazon.com>
Description
Adds the aggregate feature, allowing users to aggregate multiple test executions into one aggregated test result. Also compatible with other features such as
compare
Usage
To aggregate multiple test executions, you can use the aggregate command like so:
opensearch-benchmark aggregate --test-executions=<test_execution_id1>,<test_execution_id2>,...
Sample output:
The results will then be aggregated into one test execution and stored under the ID shown.
Additional feature flags:
--test-execution-id
: Define a unique ID for the aggregated test execution.--results-file
: Write the aggregated results to the provided file.--workload-repository
: Define the repository from where OSB will load workloads (default: default).Issues Resolved
#629 #630
Testing
Tested using
make it
andmake test
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.