diff --git a/benchmarks/osb/README.md b/benchmarks/osb/README.md
index 95b48edfb..92272e20b 100644
--- a/benchmarks/osb/README.md
+++ b/benchmarks/osb/README.md
@@ -56,13 +56,13 @@ After all of this completes, you should be ready to run your first benchmark!
### Running a benchmark
Before running a benchmark, make sure you have the endpoint of your cluster and
- and the machine you are running the benchmarks from can access it.
+ that the machine you are running the benchmarks from can access it.
Additionally, ensure that all data has been pulled to the client.
Currently, we support 2 test procedures for the k-NN workload: train-test and
no-train-test. The train test has steps to train a model included in the
schedule, while no train does not. Both test procedures will index a data set
-of vectors into an OpenSearch index.
+of vectors into an OpenSearch index and then run a set of queries against it.
Once you have decided which test procedure you want to use, open up
[params/train-params.json](params/train-params.json) or
@@ -102,97 +102,124 @@ use an algorithm that requires training.
3. Wait for cluster to be green
4. Ingest data set into the cluster
5. Refresh the index
+6. Run queries from the data set against the cluster
#### Parameters
-| Name | Description |
-|-----------------------------------------|--------------------------------------------------------------------------|
-| target_index_name | Name of index to add vectors to |
-| target_field_name | Name of field to add vectors to |
-| target_index_body | Path to target index definition |
-| target_index_primary_shards | Target index primary shards |
-| target_index_replica_shards | Target index replica shards |
-| target_index_dimension | Dimension of target index |
-| target_index_space_type | Target index space type |
-| target_index_bulk_size | Target index bulk size |
-| target_index_bulk_index_data_set_format | Format of vector data set |
-| target_index_bulk_index_data_set_path | Path to vector data set |
-| target_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) |
-| hnsw_ef_search | HNSW ef search parameter |
-| hnsw_ef_construction | HNSW ef construction parameter |
-| hnsw_m | HNSW m parameter |
+| Name | Description |
+|-----------------------------------------|----------------------------------------------------------------------------------|
+| target_index_name | Name of index to add vectors to |
+| target_field_name | Name of field to add vectors to |
+| target_index_body | Path to target index definition |
+| target_index_primary_shards | Target index primary shards |
+| target_index_replica_shards | Target index replica shards |
+| target_index_dimension | Dimension of target index |
+| target_index_space_type | Target index space type |
+| target_index_bulk_size | Target index bulk size |
+| target_index_bulk_index_data_set_format | Format of vector data set |
+| target_index_bulk_index_data_set_path | Path to vector data set |
+| target_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) |
+| hnsw_ef_search | HNSW ef search parameter |
+| hnsw_ef_construction | HNSW ef construction parameter |
+| hnsw_m | HNSW m parameter |
+| query_k | The number of neighbors to return for the search |
+| query_clients | Number of clients to use for running queries |
+| query_data_set_format | Format of vector data set for queries |
+| query_data_set_path | Path to vector data set for queries |
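+
+For example, the query-related parameters can be set in
+[params/no-train-params.json](params/no-train-params.json) along these lines
+(the data set path below is just a placeholder; point it at your own query
+data set):
+
+```
+"query_k": 10,
+"query_clients": 10,
+"query_data_set_format": "hdf5",
+"query_data_set_path": "<path-to-query-data-set>"
+```
+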
#### Metrics
The result metrics of this procedure will look like:
```
-|---------------------------------------------------------------:|---------------------:|----------:|-------:|
-| Cumulative indexing time of primary shards | | 2.36965 | min |
-| Min cumulative indexing time across primary shards | | 0.0923333 | min |
-| Median cumulative indexing time across primary shards | | 0.732892 | min |
-| Max cumulative indexing time across primary shards | | 0.811533 | min |
-| Cumulative indexing throttle time of primary shards | | 0 | min |
-| Min cumulative indexing throttle time across primary shards | | 0 | min |
-| Median cumulative indexing throttle time across primary shards | | 0 | min |
-| Max cumulative indexing throttle time across primary shards | | 0 | min |
-| Cumulative merge time of primary shards | | 1.70392 | min |
-| Cumulative merge count of primary shards | | 13 | |
-| Min cumulative merge time across primary shards | | 0.0028 | min |
-| Median cumulative merge time across primary shards | | 0.538375 | min |
-| Max cumulative merge time across primary shards | | 0.624367 | min |
-| Cumulative merge throttle time of primary shards | | 0.407467 | min |
-| Min cumulative merge throttle time across primary shards | | 0 | min |
-| Median cumulative merge throttle time across primary shards | | 0.131758 | min |
-| Max cumulative merge throttle time across primary shards | | 0.14395 | min |
-| Cumulative refresh time of primary shards | | 1.01585 | min |
-| Cumulative refresh count of primary shards | | 55 | |
-| Min cumulative refresh time across primary shards | | 0.0084 | min |
-| Median cumulative refresh time across primary shards | | 0.330733 | min |
-| Max cumulative refresh time across primary shards | | 0.345983 | min |
-| Cumulative flush time of primary shards | | 0 | min |
-| Cumulative flush count of primary shards | | 0 | |
-| Min cumulative flush time across primary shards | | 0 | min |
-| Median cumulative flush time across primary shards | | 0 | min |
-| Max cumulative flush time across primary shards | | 0 | min |
-| Total Young Gen GC time | | 0.218 | s |
-| Total Young Gen GC count | | 5 | |
-| Total Old Gen GC time | | 0 | s |
-| Total Old Gen GC count | | 0 | |
-| Store size | | 3.18335 | GB |
-| Translog size | | 1.29415 | GB |
-| Heap used for segments | | 0.100433 | MB |
-| Heap used for doc values | | 0.0101166 | MB |
-| Heap used for terms | | 0.0339661 | MB |
-| Heap used for norms | | 0 | MB |
-| Heap used for points | | 0 | MB |
-| Heap used for stored fields | | 0.0563507 | MB |
-| Segment count | | 84 | |
-| Min Throughput | custom-vector-bulk | 32004.5 | docs/s |
-| Mean Throughput | custom-vector-bulk | 40288.7 | docs/s |
-| Median Throughput | custom-vector-bulk | 36826.6 | docs/s |
-| Max Throughput | custom-vector-bulk | 89105.4 | docs/s |
-| 50th percentile latency | custom-vector-bulk | 21.4377 | ms |
-| 90th percentile latency | custom-vector-bulk | 37.6029 | ms |
-| 99th percentile latency | custom-vector-bulk | 822.604 | ms |
-| 99.9th percentile latency | custom-vector-bulk | 1396.8 | ms |
-| 100th percentile latency | custom-vector-bulk | 1751.85 | ms |
-| 50th percentile service time | custom-vector-bulk | 21.4377 | ms |
-| 90th percentile service time | custom-vector-bulk | 37.6029 | ms |
-| 99th percentile service time | custom-vector-bulk | 822.604 | ms |
-| 99.9th percentile service time | custom-vector-bulk | 1396.8 | ms |
-| 100th percentile service time | custom-vector-bulk | 1751.85 | ms |
-| error rate | custom-vector-bulk | 0 | % |
-| Min Throughput | refresh-target-index | 0.04 | ops/s |
-| Mean Throughput | refresh-target-index | 0.04 | ops/s |
-| Median Throughput | refresh-target-index | 0.04 | ops/s |
-| Max Throughput | refresh-target-index | 0.04 | ops/s |
-| 100th percentile latency | refresh-target-index | 23522.6 | ms |
-| 100th percentile service time | refresh-target-index | 23522.6 | ms |
-| error rate | refresh-target-index | 0 | % |
+------------------------------------------------------
+ _______ __ _____
+ / ____(_)___ ____ _/ / / ___/_________ ________
+ / /_ / / __ \/ __ `/ / \__ \/ ___/ __ \/ ___/ _ \
+ / __/ / / / / / /_/ / / ___/ / /__/ /_/ / / / __/
+/_/ /_/_/ /_/\__,_/_/ /____/\___/\____/_/ \___/
+------------------------------------------------------
+
+| Metric | Task | Value | Unit |
+|---------------------------------------------------------------:|------------------------:|------------:|-------:|
+| Cumulative indexing time of primary shards | | 0.00173333 | min |
+| Min cumulative indexing time across primary shards | | 0 | min |
+| Median cumulative indexing time across primary shards | | 0 | min |
+| Max cumulative indexing time across primary shards | | 0.000616667 | min |
+| Cumulative indexing throttle time of primary shards | | 0 | min |
+| Min cumulative indexing throttle time across primary shards | | 0 | min |
+| Median cumulative indexing throttle time across primary shards | | 0 | min |
+| Max cumulative indexing throttle time across primary shards | | 0 | min |
+| Cumulative merge time of primary shards | | 0 | min |
+| Cumulative merge count of primary shards | | 0 | |
+| Min cumulative merge time across primary shards | | 0 | min |
+| Median cumulative merge time across primary shards | | 0 | min |
+| Max cumulative merge time across primary shards | | 0 | min |
+| Cumulative merge throttle time of primary shards | | 0 | min |
+| Min cumulative merge throttle time across primary shards | | 0 | min |
+| Median cumulative merge throttle time across primary shards | | 0 | min |
+| Max cumulative merge throttle time across primary shards | | 0 | min |
+| Cumulative refresh time of primary shards | | 0.00271667 | min |
+| Cumulative refresh count of primary shards | | 115 | |
+| Min cumulative refresh time across primary shards | | 0 | min |
+| Median cumulative refresh time across primary shards | | 0 | min |
+| Max cumulative refresh time across primary shards | | 0.00135 | min |
+| Cumulative flush time of primary shards | | 0 | min |
+| Cumulative flush count of primary shards | | 43 | |
+| Min cumulative flush time across primary shards | | 0 | min |
+| Median cumulative flush time across primary shards | | 0 | min |
+| Max cumulative flush time across primary shards | | 0 | min |
+| Total Young Gen GC time | | 0.849 | s |
+| Total Young Gen GC count | | 20 | |
+| Total Old Gen GC time | | 0 | s |
+| Total Old Gen GC count | | 0 | |
+| Store size | | 0.647921 | GB |
+| Translog size | | 0.00247511 | GB |
+| Heap used for segments | | 0.284451 | MB |
+| Heap used for doc values | | 0.0872688 | MB |
+| Heap used for terms | | 0.0714417 | MB |
+| Heap used for norms | | 6.10352e-05 | MB |
+| Heap used for points | | 0 | MB |
+| Heap used for stored fields | | 0.125679 | MB |
+| Segment count | | 257 | |
+| Min Throughput | custom-vector-bulk | 18018.5 | docs/s |
+| Mean Throughput | custom-vector-bulk | 18018.5 | docs/s |
+| Median Throughput | custom-vector-bulk | 18018.5 | docs/s |
+| Max Throughput | custom-vector-bulk | 18018.5 | docs/s |
+| 50th percentile latency | custom-vector-bulk | 98.5565 | ms |
+| 90th percentile latency | custom-vector-bulk | 100.033 | ms |
+| 100th percentile latency | custom-vector-bulk | 103.792 | ms |
+| 50th percentile service time | custom-vector-bulk | 98.5565 | ms |
+| 90th percentile service time | custom-vector-bulk | 100.033 | ms |
+| 100th percentile service time | custom-vector-bulk | 103.792 | ms |
+| error rate | custom-vector-bulk | 0 | % |
+| Min Throughput | refresh-target-index | 76.22 | ops/s |
+| Mean Throughput | refresh-target-index | 76.22 | ops/s |
+| Median Throughput | refresh-target-index | 76.22 | ops/s |
+| Max Throughput | refresh-target-index | 76.22 | ops/s |
+| 100th percentile latency | refresh-target-index | 12.7619 | ms |
+| 100th percentile service time | refresh-target-index | 12.7619 | ms |
+| error rate | refresh-target-index | 0 | % |
+| Min Throughput | knn-query-from-data-set | 1587.47 | ops/s |
+| Mean Throughput | knn-query-from-data-set | 1649.97 | ops/s |
+| Median Throughput | knn-query-from-data-set | 1661.79 | ops/s |
+| Max Throughput | knn-query-from-data-set | 1677.06 | ops/s |
+| 50th percentile latency | knn-query-from-data-set | 4.79125 | ms |
+| 90th percentile latency | knn-query-from-data-set | 5.38 | ms |
+| 99th percentile latency | knn-query-from-data-set | 46.8965 | ms |
+| 99.9th percentile latency | knn-query-from-data-set | 58.2049 | ms |
+| 99.99th percentile latency | knn-query-from-data-set | 59.6476 | ms |
+| 100th percentile latency | knn-query-from-data-set | 60.9245 | ms |
+| 50th percentile service time | knn-query-from-data-set | 4.79125 | ms |
+| 90th percentile service time | knn-query-from-data-set | 5.38 | ms |
+| 99th percentile service time | knn-query-from-data-set | 46.8965 | ms |
+| 99.9th percentile service time | knn-query-from-data-set | 58.2049 | ms |
+| 99.99th percentile service time | knn-query-from-data-set | 59.6476 | ms |
+| 100th percentile service time | knn-query-from-data-set | 60.9245 | ms |
+| error rate | knn-query-from-data-set | 0 | % |
--------------------------------
-[INFO] SUCCESS (took 76 seconds)
+[INFO] SUCCESS (took 46 seconds)
--------------------------------
```
@@ -212,41 +239,45 @@ algorithm that requires training.
7. Create an OpenSearch index with `knn_vector` configured to use the model
8. Ingest vectors into the target index
9. Refresh the target index
+10. Run queries from the data set against the cluster
#### Parameters
-| Name | Description |
-|-----------------------------------------|--------------------------------------------------------------------------|
-| target_index_name | Name of index to add vectors to |
-| target_field_name | Name of field to add vectors to |
-| target_index_body | Path to target index definition |
-| target_index_primary_shards | Target index primary shards |
-| target_index_replica_shards | Target index replica shards |
-| target_index_dimension | Dimension of target index |
-| target_index_space_type | Target index space type |
-| target_index_bulk_size | Target index bulk size |
-| target_index_bulk_index_data_set_format | Format of vector data set |
-| target_index_bulk_index_data_set_path | Path to vector data set |
-| target_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) |
-| ivf_nlists | IVF nlist parameter |
-| ivf_nprobes | IVF nprobe parameter |
-| pq_code_size | PQ code_size parameter |
-| pq_m | PQ m parameter |
-| train_model_method | Method to be used for model (ivf or ivfpq) |
-| train_model_id | Model ID |
-| train_index_name | Name of index to put training data into |
-| train_field_name | Name of field to put training data into |
-| train_index_body | Path to train index definition |
-| train_search_size | Search size to use when pulling training data |
-| train_timeout | Timeout to wait for training to finish |
-| train_index_primary_shards | Train index primary shards |
-| train_index_replica_shards | Train index replica shards |
-| train_index_bulk_size | Train index bulk size |
-| train_index_data_set_format | Format of vector data set |
-| train_index_data_set_path | Path to vector data set |
-| train_index_num_vectors | Number of vectors to use from vector data set for training |
-| train_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) |
-
+| Name | Description |
+|-----------------------------------------|----------------------------------------------------------------------------------|
+| target_index_name | Name of index to add vectors to |
+| target_field_name | Name of field to add vectors to |
+| target_index_body | Path to target index definition |
+| target_index_primary_shards | Target index primary shards |
+| target_index_replica_shards | Target index replica shards |
+| target_index_dimension | Dimension of target index |
+| target_index_space_type | Target index space type |
+| target_index_bulk_size | Target index bulk size |
+| target_index_bulk_index_data_set_format | Format of vector data set for ingestion |
+| target_index_bulk_index_data_set_path | Path to vector data set for ingestion |
+| target_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) |
+| ivf_nlists | IVF nlist parameter |
+| ivf_nprobes | IVF nprobe parameter |
+| pq_code_size | PQ code_size parameter |
+| pq_m | PQ m parameter |
+| train_model_method | Method to be used for model (ivf or ivfpq) |
+| train_model_id | Model ID |
+| train_index_name | Name of index to put training data into |
+| train_field_name | Name of field to put training data into |
+| train_index_body | Path to train index definition |
+| train_search_size | Search size to use when pulling training data |
+| train_timeout | Timeout to wait for training to finish |
+| train_index_primary_shards | Train index primary shards |
+| train_index_replica_shards | Train index replica shards |
+| train_index_bulk_size | Train index bulk size |
+| train_index_data_set_format | Format of vector data set for training |
+| train_index_data_set_path | Path to vector data set for training |
+| train_index_num_vectors | Number of vectors to use from vector data set for training |
+| train_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) |
+| query_k | The number of neighbors to return for the search |
+| query_clients | Number of clients to use for running queries |
+| query_data_set_format | Format of vector data set for queries |
+| query_data_set_path | Path to vector data set for queries |
#### Metrics
@@ -255,108 +286,131 @@ The result metrics of this procedure will look like:
------------------------------------------------------
_______ __ _____
/ ____(_)___ ____ _/ / / ___/_________ ________
 / /_ / / __ \/ __ `/ / \__ \/ ___/ __ \/ ___/ _ \
/ __/ / / / / / /_/ / / ___/ / /__/ /_/ / / / __/
/_/ /_/_/ /_/\__,_/_/ /____/\___/\____/_/ \___/
------------------------------------------------------
-
-| Metric | Task | Value | Unit |
-|---------------------------------------------------------------:|---------------------:|-----------:|-----------------:|
-| Cumulative indexing time of primary shards | | 1.08917 | min |
-| Min cumulative indexing time across primary shards | | 0.0923333 | min |
-| Median cumulative indexing time across primary shards | | 0.328675 | min |
-| Max cumulative indexing time across primary shards | | 0.339483 | min |
-| Cumulative indexing throttle time of primary shards | | 0 | min |
-| Min cumulative indexing throttle time across primary shards | | 0 | min |
-| Median cumulative indexing throttle time across primary shards | | 0 | min |
-| Max cumulative indexing throttle time across primary shards | | 0 | min |
-| Cumulative merge time of primary shards | | 0.44465 | min |
-| Cumulative merge count of primary shards | | 19 | |
-| Min cumulative merge time across primary shards | | 0.0028 | min |
-| Median cumulative merge time across primary shards | | 0.145408 | min |
-| Max cumulative merge time across primary shards | | 0.151033 | min |
-| Cumulative merge throttle time of primary shards | | 0.295033 | min |
-| Min cumulative merge throttle time across primary shards | | 0 | min |
-| Median cumulative merge throttle time across primary shards | | 0.0973167 | min |
-| Max cumulative merge throttle time across primary shards | | 0.1004 | min |
-| Cumulative refresh time of primary shards | | 0.07955 | min |
-| Cumulative refresh count of primary shards | | 67 | |
-| Min cumulative refresh time across primary shards | | 0.0084 | min |
-| Median cumulative refresh time across primary shards | | 0.022725 | min |
-| Max cumulative refresh time across primary shards | | 0.0257 | min |
-| Cumulative flush time of primary shards | | 0 | min |
-| Cumulative flush count of primary shards | | 0 | |
-| Min cumulative flush time across primary shards | | 0 | min |
-| Median cumulative flush time across primary shards | | 0 | min |
-| Max cumulative flush time across primary shards | | 0 | min |
-| Total Young Gen GC time | | 0.034 | s |
-| Total Young Gen GC count | | 6 | |
-| Total Old Gen GC time | | 0 | s |
-| Total Old Gen GC count | | 0 | |
-| Store size | | 1.81242 | GB |
-| Heap used for points | | 0 | MB |
-| Heap used for stored fields | | 0.041626 | MB |
-| Segment count | | 62 | |
-| Min Throughput | delete-model | 33.25 | ops/s |
-| Mean Throughput | delete-model | 33.25 | ops/s |
-| Median Throughput | delete-model | 33.25 | ops/s |
-| Max Throughput | delete-model | 33.25 | ops/s |
-| 100th percentile latency | delete-model | 29.6471 | ms |
-| 100th percentile service time | delete-model | 29.6471 | ms |
-| error rate | delete-model | 0 | % |
-| Min Throughput | train-vector-bulk | 78682.2 | docs/s |
-| Mean Throughput | train-vector-bulk | 78682.2 | docs/s |
-| Median Throughput | train-vector-bulk | 78682.2 | docs/s |
-| Max Throughput | train-vector-bulk | 78682.2 | docs/s |
-| 50th percentile latency | train-vector-bulk | 16.4609 | ms |
-| 90th percentile latency | train-vector-bulk | 21.8225 | ms |
-| 99th percentile latency | train-vector-bulk | 117.632 | ms |
-| 100th percentile latency | train-vector-bulk | 237.021 | ms |
-| 50th percentile service time | train-vector-bulk | 16.4609 | ms |
-| 90th percentile service time | train-vector-bulk | 21.8225 | ms |
-| 99th percentile service time | train-vector-bulk | 117.632 | ms |
-| 100th percentile service time | train-vector-bulk | 237.021 | ms |
-| error rate | train-vector-bulk | 0 | % |
-| Min Throughput | refresh-train-index | 149.22 | ops/s |
-| Mean Throughput | refresh-train-index | 149.22 | ops/s |
-| Median Throughput | refresh-train-index | 149.22 | ops/s |
-| Max Throughput | refresh-train-index | 149.22 | ops/s |
-| 100th percentile latency | refresh-train-index | 6.35862 | ms |
-| 100th percentile service time | refresh-train-index | 6.35862 | ms |
-| error rate | refresh-train-index | 0 | % |
-| Min Throughput | ivfpq-train-model | 0.04 | models_trained/s |
-| Mean Throughput | ivfpq-train-model | 0.04 | models_trained/s |
-| Median Throughput | ivfpq-train-model | 0.04 | models_trained/s |
-| Max Throughput | ivfpq-train-model | 0.04 | models_trained/s |
-| 100th percentile latency | ivfpq-train-model | 28123 | ms |
-| 100th percentile service time | ivfpq-train-model | 28123 | ms |
-| error rate | ivfpq-train-model | 0 | % |
-| Min Throughput | custom-vector-bulk | 71222.6 | docs/s |
-| Mean Throughput | custom-vector-bulk | 79465.5 | docs/s |
-| Median Throughput | custom-vector-bulk | 77764.4 | docs/s |
-| Max Throughput | custom-vector-bulk | 90646.3 | docs/s |
-| 50th percentile latency | custom-vector-bulk | 14.5099 | ms |
-| 90th percentile latency | custom-vector-bulk | 18.1755 | ms |
-| 99th percentile latency | custom-vector-bulk | 123.359 | ms |
-| 99.9th percentile latency | custom-vector-bulk | 171.928 | ms |
-| 100th percentile latency | custom-vector-bulk | 216.383 | ms |
-| 50th percentile service time | custom-vector-bulk | 14.5099 | ms |
-| 90th percentile service time | custom-vector-bulk | 18.1755 | ms |
-| 99th percentile service time | custom-vector-bulk | 123.359 | ms |
-| 99.9th percentile service time | custom-vector-bulk | 171.928 | ms |
-| 100th percentile service time | custom-vector-bulk | 216.383 | ms |
-| error rate | custom-vector-bulk | 0 | % |
-| Min Throughput | refresh-target-index | 64.45 | ops/s |
-| Mean Throughput | refresh-target-index | 64.45 | ops/s |
-| Median Throughput | refresh-target-index | 64.45 | ops/s |
-| Max Throughput | refresh-target-index | 64.45 | ops/s |
-| 100th percentile latency | refresh-target-index | 15.177 | ms |
-| 100th percentile service time | refresh-target-index | 15.177 | ms |
-| error rate | refresh-target-index | 0 | % |
+| Metric | Task | Value | Unit |
+|---------------------------------------------------------------:|------------------------:|------------:|-----------------:|
+| Cumulative indexing time of primary shards | | 2.92355 | min |
+| Min cumulative indexing time across primary shards | | 0 | min |
+| Median cumulative indexing time across primary shards | | 0.497817 | min |
+| Max cumulative indexing time across primary shards | | 1.37717 | min |
+| Cumulative indexing throttle time of primary shards | | 0 | min |
+| Min cumulative indexing throttle time across primary shards | | 0 | min |
+| Median cumulative indexing throttle time across primary shards | | 0 | min |
+| Max cumulative indexing throttle time across primary shards | | 0 | min |
+| Cumulative merge time of primary shards | | 1.34895 | min |
+| Cumulative merge count of primary shards | | 39 | |
+| Min cumulative merge time across primary shards | | 0 | min |
+| Median cumulative merge time across primary shards | | 0.292033 | min |
+| Max cumulative merge time across primary shards | | 0.6268 | min |
+| Cumulative merge throttle time of primary shards | | 0.62845 | min |
+| Min cumulative merge throttle time across primary shards | | 0 | min |
+| Median cumulative merge throttle time across primary shards | | 0.155617 | min |
+| Max cumulative merge throttle time across primary shards | | 0.290117 | min |
+| Cumulative refresh time of primary shards | | 0.369433 | min |
+| Cumulative refresh count of primary shards | | 96 | |
+| Min cumulative refresh time across primary shards | | 0 | min |
+| Median cumulative refresh time across primary shards | | 0.0903833 | min |
+| Max cumulative refresh time across primary shards | | 0.10365 | min |
+| Cumulative flush time of primary shards | | 0.0278667 | min |
+| Cumulative flush count of primary shards | | 2 | |
+| Min cumulative flush time across primary shards | | 0 | min |
+| Median cumulative flush time across primary shards | | 0 | min |
+| Max cumulative flush time across primary shards | | 0.0278667 | min |
+| Total Young Gen GC time | | 13.106 | s |
+| Total Young Gen GC count | | 263 | |
+| Total Old Gen GC time | | 0 | s |
+| Total Old Gen GC count | | 0 | |
+| Store size | | 2.60183 | GB |
+| Translog size | | 1.34787 | GB |
+| Heap used for segments | | 0.0646248 | MB |
+| Heap used for doc values | | 0.00899887 | MB |
+| Heap used for terms | | 0.0203552 | MB |
+| Heap used for norms | | 6.10352e-05 | MB |
+| Heap used for points | | 0 | MB |
+| Heap used for stored fields | | 0.0352097 | MB |
+| Segment count | | 71 | |
+| Min Throughput | delete-model | 10.55 | ops/s |
+| Mean Throughput | delete-model | 10.55 | ops/s |
+| Median Throughput | delete-model | 10.55 | ops/s |
+| Max Throughput | delete-model | 10.55 | ops/s |
+| 100th percentile latency | delete-model | 94.4726 | ms |
+| 100th percentile service time | delete-model | 94.4726 | ms |
+| error rate | delete-model | 0 | % |
+| Min Throughput | train-vector-bulk | 44763.1 | docs/s |
+| Mean Throughput | train-vector-bulk | 52022.4 | docs/s |
+| Median Throughput | train-vector-bulk | 52564.8 | docs/s |
+| Max Throughput | train-vector-bulk | 53833 | docs/s |
+| 50th percentile latency | train-vector-bulk | 22.3364 | ms |
+| 90th percentile latency | train-vector-bulk | 47.799 | ms |
+| 99th percentile latency | train-vector-bulk | 195.954 | ms |
+| 99.9th percentile latency | train-vector-bulk | 495.217 | ms |
+| 100th percentile latency | train-vector-bulk | 663.48 | ms |
+| 50th percentile service time | train-vector-bulk | 22.3364 | ms |
+| 90th percentile service time | train-vector-bulk | 47.799 | ms |
+| 99th percentile service time | train-vector-bulk | 195.954 | ms |
+| 99.9th percentile service time | train-vector-bulk | 495.217 | ms |
+| 100th percentile service time | train-vector-bulk | 663.48 | ms |
+| error rate | train-vector-bulk | 0 | % |
+| Min Throughput | refresh-train-index | 0.98 | ops/s |
+| Mean Throughput | refresh-train-index | 0.98 | ops/s |
+| Median Throughput | refresh-train-index | 0.98 | ops/s |
+| Max Throughput | refresh-train-index | 0.98 | ops/s |
+| 100th percentile latency | refresh-train-index | 1019.54 | ms |
+| 100th percentile service time | refresh-train-index | 1019.54 | ms |
+| error rate | refresh-train-index | 0 | % |
+| Min Throughput | ivfpq-train-model | 0.01 | models_trained/s |
+| Mean Throughput | ivfpq-train-model | 0.01 | models_trained/s |
+| Median Throughput | ivfpq-train-model | 0.01 | models_trained/s |
+| Max Throughput | ivfpq-train-model | 0.01 | models_trained/s |
+| 100th percentile latency | ivfpq-train-model | 150952 | ms |
+| 100th percentile service time | ivfpq-train-model | 150952 | ms |
+| error rate | ivfpq-train-model | 0 | % |
+| Min Throughput | custom-vector-bulk | 32367.4 | docs/s |
+| Mean Throughput | custom-vector-bulk | 36027.5 | docs/s |
+| Median Throughput | custom-vector-bulk | 35276.7 | docs/s |
+| Max Throughput | custom-vector-bulk | 41095 | docs/s |
+| 50th percentile latency | custom-vector-bulk | 22.2419 | ms |
+| 90th percentile latency | custom-vector-bulk | 70.163 | ms |
+| 99th percentile latency | custom-vector-bulk | 308.395 | ms |
+| 99.9th percentile latency | custom-vector-bulk | 548.558 | ms |
+| 100th percentile latency | custom-vector-bulk | 655.628 | ms |
+| 50th percentile service time | custom-vector-bulk | 22.2419 | ms |
+| 90th percentile service time | custom-vector-bulk | 70.163 | ms |
+| 99th percentile service time | custom-vector-bulk | 308.395 | ms |
+| 99.9th percentile service time | custom-vector-bulk | 548.558 | ms |
+| 100th percentile service time | custom-vector-bulk | 655.628 | ms |
+| error rate | custom-vector-bulk | 0 | % |
+| Min Throughput | refresh-target-index | 0.23 | ops/s |
+| Mean Throughput | refresh-target-index | 0.23 | ops/s |
+| Median Throughput | refresh-target-index | 0.23 | ops/s |
+| Max Throughput | refresh-target-index | 0.23 | ops/s |
+| 100th percentile latency | refresh-target-index | 4331.17 | ms |
+| 100th percentile service time | refresh-target-index | 4331.17 | ms |
+| error rate | refresh-target-index | 0 | % |
+| Min Throughput | knn-query-from-data-set | 455.19 | ops/s |
+| Mean Throughput | knn-query-from-data-set | 511.74 | ops/s |
+| Median Throughput | knn-query-from-data-set | 510.85 | ops/s |
+| Max Throughput | knn-query-from-data-set | 570.07 | ops/s |
+| 50th percentile latency | knn-query-from-data-set | 14.1626 | ms |
+| 90th percentile latency | knn-query-from-data-set | 30.2389 | ms |
+| 99th percentile latency | knn-query-from-data-set | 71.2793 | ms |
+| 99.9th percentile latency | knn-query-from-data-set | 104.733 | ms |
+| 99.99th percentile latency | knn-query-from-data-set | 127.298 | ms |
+| 100th percentile latency | knn-query-from-data-set | 145.229 | ms |
+| 50th percentile service time | knn-query-from-data-set | 14.1626 | ms |
+| 90th percentile service time | knn-query-from-data-set | 30.2389 | ms |
+| 99th percentile service time | knn-query-from-data-set | 71.2793 | ms |
+| 99.9th percentile service time | knn-query-from-data-set | 104.733 | ms |
+| 99.99th percentile service time | knn-query-from-data-set | 127.298 | ms |
+| 100th percentile service time | knn-query-from-data-set | 145.229 | ms |
+| error rate | knn-query-from-data-set | 0 | % |
---------------------------------
-[INFO] SUCCESS (took 108 seconds)
+[INFO] SUCCESS (took 295 seconds)
---------------------------------
```
@@ -381,9 +435,10 @@ the operations against OpenSearch.
Custom parameter sources are defined in [extensions/param_sources.py](extensions/param_sources.py).
-| Name | Description | Parameters |
-|--------------------|------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| bulk-from-data-set | Provides bulk payloads containing vectors from a data set for indexing | 1. data_set_format - (hdf5, bigann)<br>2. data_set_path - path to data set<br>3. index - name of index for bulk ingestion<br>4. field - field to place vector in<br>5. bulk_size - vectors per bulk request |
+| Name | Description | Parameters |
+|-------------------------|------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| bulk-from-data-set | Provides bulk payloads containing vectors from a data set for indexing | 1. data_set_format - (hdf5, bigann)<br>2. data_set_path - path to data set<br>3. index - name of index for bulk ingestion<br>4. field - field to place vector in<br>5. bulk_size - vectors per bulk request<br>6. num_vectors - number of vectors to use from the data set. Defaults to the whole data set. |
+| knn-query-from-data-set | Provides a query generated from a data set | 1. data_set_format - (hdf5, bigann)<br>2. data_set_path - path to data set<br>3. index - name of index to query against<br>4. field - field to query against<br>5. k - number of results to return<br>6. dimension - size of vectors to produce<br>7. num_vectors - number of vectors to use from the data set. Defaults to the whole data set. |
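+
+For reference, each parameter produced by `knn-query-from-data-set` is a
+search request shaped roughly like the sketch below (the index name, field
+name, and values here are illustrative):
+
+```
+{
+  "index": "target_index",
+  "request-params": {
+    "_source": {
+      "exclude": ["target_field"]
+    }
+  },
+  "body": {
+    "size": 10,
+    "query": {
+      "knn": {
+        "target_field": {
+          "vector": [0.1, 0.2, ...],
+          "k": 10
+        }
+      }
+    }
+  }
+}
+```
+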
### Custom Runners
@@ -397,3 +452,17 @@ Custom runners are defined in [extensions/runners.py](extensions/runners.py).
| train-model | Trains a model. | 1. body - model definition<br>2. timeout - time to wait for model to finish<br>3. model_id - ID of model |
| delete-model | Deletes a model if it exists. | 1. model_id - ID of model |
+### Testing
+
+We have a set of unit tests for our extensions in
+[tests](tests). To run all the tests, run the following
+command:
+
+```commandline
+python -m unittest discover ./tests
+```
+
+To run an individual test:
+```commandline
+python -m unittest tests.test_param_sources.VectorsFromDataSetParamSourceTestCase.test_partition_hdf5
+```
diff --git a/benchmarks/osb/extensions/data_set.py b/benchmarks/osb/extensions/data_set.py
index 4feb4c2e7..7e8058844 100644
--- a/benchmarks/osb/extensions/data_set.py
+++ b/benchmarks/osb/extensions/data_set.py
@@ -61,7 +61,7 @@ class HDF5DataSet(DataSet):
def __init__(self, dataset_path: str, context: Context):
file = h5py.File(dataset_path)
- self.data = cast(h5py.Dataset, file[self._parse_context(context)])
+ self.data = cast(h5py.Dataset, file[self.parse_context(context)])
self.current = self.BEGINNING
def read(self, chunk_size: int):
@@ -93,7 +93,7 @@ def reset(self):
self.current = self.BEGINNING
@staticmethod
- def _parse_context(context: Context) -> str:
+ def parse_context(context: Context) -> str:
if context == Context.NEIGHBORS:
return "neighbors"
@@ -176,6 +176,9 @@ def reset(self):
self.file.seek(BigANNVectorDataSet.DATA_SET_HEADER_LENGTH)
self.current = BigANNVectorDataSet.BEGINNING
+ def __del__(self):
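+        # Ensure the underlying file handle is released when this data set
+        # object is garbage collected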
+ self.file.close()
+
@staticmethod
def _get_data_size(file_name):
ext = file_name.split('.')[-1]
diff --git a/benchmarks/osb/extensions/param_sources.py b/benchmarks/osb/extensions/param_sources.py
index 36bf21f33..040490317 100644
--- a/benchmarks/osb/extensions/param_sources.py
+++ b/benchmarks/osb/extensions/param_sources.py
@@ -4,6 +4,7 @@
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
import copy
+from abc import ABC, abstractmethod
from .data_set import Context, HDF5DataSet, DataSet, BigANNVectorDataSet
from .util import bulk_transform, parse_string_parameter, parse_int_parameter, \
@@ -15,17 +16,42 @@ def register(registry):
"bulk-from-data-set", BulkVectorsFromDataSetParamSource
)
+ registry.register_param_source(
+ "knn-query-from-data-set", QueryVectorsFromDataSetParamSource
+ )
-class BulkVectorsFromDataSetParamSource:
- def __init__(self, workload, params, **kwargs):
+
+class VectorsFromDataSetParamSource(ABC):
+ """ Abstract class that can read vectors from a data set and partition the
+ vectors across multiple clients.
+
+ Attributes:
+ index_name: Name of the index to generate the query for
+ field_name: Name of the field to generate the query for
+        data_set_format: Format the data set is serialized with (bigann or hdf5)
+        data_set_path: Path to data set
+        context: Context the data set will be used in.
+        data_set: Structure containing meta data about the data set and the
+            ability to read it
+ num_vectors: Number of vectors to use from the data set
+ total: Number of vectors for the partition
+ current: Current vector offset in data set
+ infinite: Property of param source signalling that it can be exhausted
+ percent_completed: Progress indicator for how exhausted data set is
+ offset: Offset into the data set to start at. Relevant when there are
+ multiple partitions
+ """
+
+ def __init__(self, params, context: Context):
+ self.index_name: str = parse_string_parameter("index", params)
+ self.field_name: str = parse_string_parameter("field", params)
+
+ self.context = context
self.data_set_format = parse_string_parameter("data_set_format", params)
self.data_set_path = parse_string_parameter("data_set_path", params)
- self.data_set: DataSet = self._read_data_set()
+ self.data_set: DataSet = self._read_data_set(self.data_set_format,
+ self.data_set_path,
+ self.context)
- self.field_name: str = parse_string_parameter("field", params)
- self.index_name: str = parse_string_parameter("index", params)
- self.bulk_size: int = parse_int_parameter("bulk_size", params)
- self.retries: int = parse_int_parameter("retries", params, 10)
self.num_vectors: int = parse_int_parameter(
"num_vectors", params, self.data_set.size()
)
@@ -35,29 +61,143 @@ def __init__(self, workload, params, **kwargs):
self.percent_completed = 0
self.offset = 0
- def _read_data_set(self):
- if self.data_set_format == HDF5DataSet.FORMAT_NAME:
- return HDF5DataSet(self.data_set_path, Context.INDEX)
- if self.data_set_format == BigANNVectorDataSet.FORMAT_NAME:
- return BigANNVectorDataSet(self.data_set_path)
+ def _read_data_set(self, data_set_format: str, data_set_path: str,
+ data_set_context: Context):
+ if data_set_format == HDF5DataSet.FORMAT_NAME:
+ return HDF5DataSet(data_set_path, data_set_context)
+ if data_set_format == BigANNVectorDataSet.FORMAT_NAME:
+ return BigANNVectorDataSet(data_set_path)
raise ConfigurationError("Invalid data set format")
def partition(self, partition_index, total_partitions):
- if self.data_set.size() % total_partitions != 0:
- raise ValueError("Data set must be divisible by number of clients")
+ """
+        Splits up the parameter source so that multiple clients can read data
+ from it.
+ Args:
+ partition_index: index of one particular partition
+ total_partitions: total number of partitions data set is split into
+
+ Returns:
+            The parameter source for this particular partition
+ """
+ if self.num_vectors % total_partitions != 0:
+ raise ValueError("Num vectors must be divisible by number of "
+ "partitions")
partition_x = copy.copy(self)
partition_x.num_vectors = int(self.num_vectors / total_partitions)
partition_x.offset = int(partition_index * partition_x.num_vectors)
# We need to create a new instance of the data set for each client
- partition_x.data_set = partition_x._read_data_set()
+ partition_x.data_set = partition_x._read_data_set(
+ self.data_set_format,
+ self.data_set_path,
+ self.context
+ )
partition_x.data_set.seek(partition_x.offset)
partition_x.current = partition_x.offset
return partition_x
+ @abstractmethod
def params(self):
+ """
+        Returns: A single parameter from this source
+ """
+ pass
+
+
+class QueryVectorsFromDataSetParamSource(VectorsFromDataSetParamSource):
+ """ Query parameter source for k-NN. Queries are created from data set
+ provided.
+
+ Attributes:
+ k: The number of results to return for the search
+        vector_batch: List of vectors to be read from the data set. Reads are
+            batched so that we do not need to read from disk for each query
+ """
+
+ VECTOR_READ_BATCH_SIZE = 100 # batch size to read vectors from data-set
+
+ def __init__(self, workload, params, **kwargs):
+ super().__init__(params, Context.QUERY)
+ self.k = parse_int_parameter("k", params)
+ self.vector_batch = None
+ def params(self):
+ """
+ Returns: A query parameter with a vector from a data set
+ """
+ if self.current >= self.num_vectors + self.offset:
+ raise StopIteration
+
+ if self.vector_batch is None or len(self.vector_batch) == 0:
+ self.vector_batch = self._batch_read(self.data_set)
+ if self.vector_batch is None:
+ raise StopIteration
+ vector = self.vector_batch.pop(0)
+ self.current += 1
+ self.percent_completed = self.current / self.total
+
+ return self._build_query_body(self.index_name, self.field_name, self.k,
+ vector)
+
+ def _batch_read(self, data_set: DataSet):
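+        """ Reads the next batch of vectors from the data set. """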
+ return list(data_set.read(self.VECTOR_READ_BATCH_SIZE))
+
+ def _build_query_body(self, index_name: str, field_name: str, k: int,
+ vector) -> dict:
+ """Builds a k-NN query that can be used to execute an approximate nearest
+ neighbor search against a k-NN plugin index
+ Args:
+ index_name: name of index to search
+ field_name: name of field to search
+ k: number of results to return
+ vector: vector used for query
+ Returns:
+ A dictionary containing the body used for search, a set of request
+ parameters to attach to the search and the name of the index.
+ """
+ return {
+ "index": index_name,
+ "request-params": {
+ "_source": {
+ "exclude": [field_name]
+ }
+ },
+ "body": {
+ "size": k,
+ "query": {
+ "knn": {
+ field_name: {
+ "vector": vector,
+ "k": k
+ }
+ }
+ }
+ }
+ }
+
+
+class BulkVectorsFromDataSetParamSource(VectorsFromDataSetParamSource):
+ """ Create bulk index requests from a data set of vectors.
+
+ Attributes:
+ bulk_size: number of vectors per request
+ retries: number of times to retry the request when it fails
+ """
+
+ DEFAULT_RETRIES = 10
+
+ def __init__(self, workload, params, **kwargs):
+ super().__init__(params, Context.INDEX)
+ self.bulk_size: int = parse_int_parameter("bulk_size", params)
+ self.retries: int = parse_int_parameter("retries", params,
+ self.DEFAULT_RETRIES)
+
+ def params(self):
+ """
+ Returns: A bulk index parameter with vectors from a data set.
+ """
if self.current >= self.num_vectors + self.offset:
raise StopIteration
diff --git a/benchmarks/osb/params/no-train-params.json b/benchmarks/osb/params/no-train-params.json
index 988c1717b..64fe3c296 100644
--- a/benchmarks/osb/params/no-train-params.json
+++ b/benchmarks/osb/params/no-train-params.json
@@ -14,7 +14,10 @@
"hnsw_ef_construction": 512,
"hnsw_m": 16,
-
+ "query_k": 10,
+ "query_clients": 10,
+ "query_data_set_format": "hdf5",
+ "query_data_set_path": "",
"ivf_nlists": 1,
"ivf_nprobes": 1,
diff --git a/benchmarks/osb/params/train-params.json b/benchmarks/osb/params/train-params.json
index b50c235c4..4c598d25b 100644
--- a/benchmarks/osb/params/train-params.json
+++ b/benchmarks/osb/params/train-params.json
@@ -27,5 +27,10 @@
"train_index_data_set_format": "hdf5",
"train_index_data_set_path": "",
"train_index_num_vectors": 1000000,
- "train_index_bulk_index_clients": 10
+ "train_index_bulk_index_clients": 10,
+
+ "query_k": 10,
+ "query_clients": 10,
+ "query_data_set_format": "hdf5",
+ "query_data_set_path": ""
}
diff --git a/benchmarks/osb/procedures/no-train-test.json b/benchmarks/osb/procedures/no-train-test.json
index f54696360..03d72d6bd 100644
--- a/benchmarks/osb/procedures/no-train-test.json
+++ b/benchmarks/osb/procedures/no-train-test.json
@@ -45,6 +45,19 @@
"index": "{{ target_index_name }}",
"retries": 100
}
+ },
+ {
+ "operation": {
+ "name": "knn-query-from-data-set",
+ "operation-type": "search",
+ "index": "{{ target_index_name }}",
+ "param-source": "knn-query-from-data-set",
+ "k": {{ query_k }},
+ "field": "{{ target_field_name }}",
+ "data_set_format": "{{ query_data_set_format }}",
+ "data_set_path": "{{ query_data_set_path }}"
+ },
+ "clients": {{ query_clients }}
}
]
}
diff --git a/benchmarks/osb/procedures/train-test.json b/benchmarks/osb/procedures/train-test.json
index 8f5efd674..49930044a 100644
--- a/benchmarks/osb/procedures/train-test.json
+++ b/benchmarks/osb/procedures/train-test.json
@@ -99,6 +99,19 @@
"index": "{{ target_index_name }}",
"retries": 100
}
+ },
+ {
+ "operation": {
+ "name": "knn-query-from-data-set",
+ "operation-type": "search",
+ "index": "{{ target_index_name }}",
+ "param-source": "knn-query-from-data-set",
+ "k": {{ query_k }},
+ "field": "{{ target_field_name }}",
+ "data_set_format": "{{ query_data_set_format }}",
+ "data_set_path": "{{ query_data_set_path }}"
+ },
+ "clients": {{ query_clients }}
}
]
}
diff --git a/benchmarks/osb/tests/__init__.py b/benchmarks/osb/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/benchmarks/osb/tests/data_set_helper.py b/benchmarks/osb/tests/data_set_helper.py
new file mode 100644
index 000000000..2b144da49
--- /dev/null
+++ b/benchmarks/osb/tests/data_set_helper.py
@@ -0,0 +1,197 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+from abc import ABC, abstractmethod
+
+import h5py
+import numpy as np
+
+from osb.extensions.data_set import Context, HDF5DataSet, BigANNVectorDataSet
+
+""" Module containing utility classes and functions for working with data sets.
+
+Included are utilities that can be used to build data sets and write them to
+paths.
+"""
+
+
+class DataSetBuildContext:
+ """ Data class capturing information needed to build a particular data set
+
+ Attributes:
+ data_set_context: Indicator of what the data set is used for,
+ vectors: A 2D array containing vectors that are used to build data set.
+ path: string representing path where data set should be serialized to.
+ """
+ def __init__(self, data_set_context: Context, vectors: np.ndarray, path: str):
+ self.data_set_context: Context = data_set_context
+        self.vectors: np.ndarray = vectors  # TODO: Validate shape
+ self.path: str = path
+
+ def get_num_vectors(self) -> int:
+ return self.vectors.shape[0]
+
+ def get_dimension(self) -> int:
+ return self.vectors.shape[1]
+
+ def get_type(self) -> np.dtype:
+ return self.vectors.dtype
+
+
+class DataSetBuilder(ABC):
+ """ Abstract builder used to create a build a collection of data sets
+
+ Attributes:
+ data_set_build_contexts: list of data set build contexts that builder
+ will build.
+ """
+ def __init__(self):
+ self.data_set_build_contexts = list()
+
+ def add_data_set_build_context(self, data_set_build_context: DataSetBuildContext):
+ """ Adds a data set build context to list of contexts to be built.
+
+ Args:
+ data_set_build_context: DataSetBuildContext to be added to list
+
+ Returns: Updated DataSetBuilder
+
+ """
+ self._validate_data_set_context(data_set_build_context)
+ self.data_set_build_contexts.append(data_set_build_context)
+ return self
+
+ def build(self):
+ """ Builds and serializes all data sets build contexts
+
+ Returns:
+
+ """
+        for data_set_build_context in self.data_set_build_contexts:
+            self._build_data_set(data_set_build_context)
+
+ @abstractmethod
+ def _build_data_set(self, context: DataSetBuildContext):
+ """ Builds an individual data set
+
+ Args:
+ context: DataSetBuildContext of data set to be built
+
+ Returns:
+
+ """
+ pass
+
+ @abstractmethod
+ def _validate_data_set_context(self, context: DataSetBuildContext):
+ """ Validates that data set context can be added to this builder
+
+ Args:
+ context: DataSetBuildContext to be validated
+
+ Returns:
+
+ """
+ pass
+
+
+class HDF5Builder(DataSetBuilder):
+
+ def __init__(self):
+ super(HDF5Builder, self).__init__()
+ self.data_set_meta_data = dict()
+
+ def _validate_data_set_context(self, context: DataSetBuildContext):
+ if context.path not in self.data_set_meta_data.keys():
+ self.data_set_meta_data[context.path] = {
+ context.data_set_context: context
+ }
+ return
+
+ if context.data_set_context in \
+ self.data_set_meta_data[context.path].keys():
+ raise IllegalDataSetBuildContext("Path and context for data set "
+ "are already present in builder.")
+
+ self.data_set_meta_data[context.path][context.data_set_context] = \
+ context
+
+ @staticmethod
+ def _validate_extension(context: DataSetBuildContext):
+ ext = context.path.split('.')[-1]
+
+ if ext != HDF5DataSet.FORMAT_NAME:
+ raise IllegalDataSetBuildContext("Invalid file extension")
+
+ def _build_data_set(self, context: DataSetBuildContext):
+        # Multiple HDF5 data sets can be grouped in the same file, so open the
+        # file in append mode and add this data set to it
+ with h5py.File(context.path, 'a') as hf:
+ hf.create_dataset(
+ HDF5DataSet.parse_context(context.data_set_context),
+ data=context.vectors
+ )
+
+
+class BigANNBuilder(DataSetBuilder):
+
+ def _validate_data_set_context(self, context: DataSetBuildContext):
+ self._validate_extension(context)
+
+ # prevent the duplication of paths for data sets
+ data_set_paths = [c.path for c in self.data_set_build_contexts]
+ if any(data_set_paths.count(x) > 1 for x in data_set_paths):
+ raise IllegalDataSetBuildContext("Build context paths have to be "
+ "unique.")
+
+ @staticmethod
+ def _validate_extension(context: DataSetBuildContext):
+ ext = context.path.split('.')[-1]
+
+ if ext != BigANNVectorDataSet.U8BIN_EXTENSION and ext != \
+ BigANNVectorDataSet.FBIN_EXTENSION:
+ raise IllegalDataSetBuildContext("Invalid file extension")
+
+        if ext == BigANNVectorDataSet.U8BIN_EXTENSION and context.get_type() != \
+                np.uint8:
+ raise IllegalDataSetBuildContext("Invalid data type for {} ext."
+ .format(BigANNVectorDataSet
+ .U8BIN_EXTENSION))
+
+ if ext == BigANNVectorDataSet.FBIN_EXTENSION and context.get_type() != \
+ np.float32:
+ raise IllegalDataSetBuildContext("Invalid data type for {} ext."
+ .format(BigANNVectorDataSet
+ .FBIN_EXTENSION))
+
+ def _build_data_set(self, context: DataSetBuildContext):
+ num_vectors = context.get_num_vectors()
+ dimension = context.get_dimension()
+
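+        # BigANN binary format: a 4 byte little-endian vector count, then a
+        # 4 byte little-endian dimension, followed by the raw vector data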
+ with open(context.path, 'wb') as f:
+ f.write(int.to_bytes(num_vectors, 4, "little"))
+ f.write(int.to_bytes(dimension, 4, "little"))
+ context.vectors.tofile(f)
+
+
+def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
+ rng = np.random.default_rng()
+ return rng.random(size=(num_vectors, dimension), dtype=np.float32)
+
+
+class IllegalDataSetBuildContext(Exception):
+ """Exception raised when passed in DataSetBuildContext is illegal
+
+ Attributes:
+ message -- explanation of the error
+ """
+
+ def __init__(self, message: str):
+ self.message = f'{message}'
+ super().__init__(self.message)
+
diff --git a/benchmarks/osb/tests/test_param_sources.py b/benchmarks/osb/tests/test_param_sources.py
new file mode 100644
index 000000000..cda730cee
--- /dev/null
+++ b/benchmarks/osb/tests/test_param_sources.py
@@ -0,0 +1,353 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+import os
+import random
+import shutil
+import string
+import sys
+import tempfile
+import unittest
+
+# Add parent directory to path
+import numpy as np
+
+sys.path.append(os.path.abspath(os.path.join(os.getcwd(), os.pardir)))
+
+from osb.tests.data_set_helper import HDF5Builder, create_random_2d_array, \
+ DataSetBuildContext, BigANNBuilder
+from osb.extensions.data_set import Context, HDF5DataSet
+from osb.extensions.param_sources import VectorsFromDataSetParamSource, \
+ QueryVectorsFromDataSetParamSource, BulkVectorsFromDataSetParamSource
+from osb.extensions.util import ConfigurationError
+
+DEFAULT_INDEX_NAME = "test-index"
+DEFAULT_FIELD_NAME = "test-field"
+DEFAULT_CONTEXT = Context.INDEX
+DEFAULT_TYPE = HDF5DataSet.FORMAT_NAME
+DEFAULT_NUM_VECTORS = 10
+DEFAULT_DIMENSION = 10
+DEFAULT_RANDOM_STRING_LENGTH = 8
+
+
+class VectorsFromDataSetParamSourceTestCase(unittest.TestCase):
+
+ def setUp(self) -> None:
+ self.data_set_dir = tempfile.mkdtemp()
+
+ # Create a data set we know to be valid for convenience
+ self.valid_data_set_path = _create_data_set(
+ DEFAULT_NUM_VECTORS,
+ DEFAULT_DIMENSION,
+ DEFAULT_TYPE,
+ DEFAULT_CONTEXT,
+ self.data_set_dir
+ )
+
+ def tearDown(self):
+ shutil.rmtree(self.data_set_dir)
+
+ def test_missing_params(self):
+ empty_params = dict()
+ self.assertRaises(
+ ConfigurationError,
+ lambda: VectorsFromDataSetParamSourceTestCase.
+ TestVectorsFromDataSetParamSource(empty_params, DEFAULT_CONTEXT)
+ )
+
+ def test_invalid_data_set_format(self):
+ invalid_data_set_format = "invalid-data-set-format"
+
+ test_param_source_params = {
+ "index": DEFAULT_INDEX_NAME,
+ "field": DEFAULT_FIELD_NAME,
+ "data_set_format": invalid_data_set_format,
+ "data_set_path": self.valid_data_set_path,
+ }
+ self.assertRaises(
+ ConfigurationError,
+ lambda: self.TestVectorsFromDataSetParamSource(
+ test_param_source_params,
+ DEFAULT_CONTEXT
+ )
+ )
+
+ def test_invalid_data_set_path(self):
+ invalid_data_set_path = "invalid-data-set-path"
+ test_param_source_params = {
+ "index": DEFAULT_INDEX_NAME,
+ "field": DEFAULT_FIELD_NAME,
+ "data_set_format": HDF5DataSet.FORMAT_NAME,
+ "data_set_path": invalid_data_set_path,
+ }
+ self.assertRaises(
+ FileNotFoundError,
+ lambda: self.TestVectorsFromDataSetParamSource(
+ test_param_source_params,
+ DEFAULT_CONTEXT
+ )
+ )
+
+ def test_partition_hdf5(self):
+ num_vectors = 100
+
+ hdf5_data_set_path = _create_data_set(
+ num_vectors,
+ DEFAULT_DIMENSION,
+ HDF5DataSet.FORMAT_NAME,
+ DEFAULT_CONTEXT,
+ self.data_set_dir
+ )
+
+ test_param_source_params = {
+ "index": DEFAULT_INDEX_NAME,
+ "field": DEFAULT_FIELD_NAME,
+ "data_set_format": HDF5DataSet.FORMAT_NAME,
+ "data_set_path": hdf5_data_set_path,
+ }
+ test_param_source = self.TestVectorsFromDataSetParamSource(
+ test_param_source_params,
+ DEFAULT_CONTEXT
+ )
+
+ num_partitions = 10
+ vecs_per_partition = test_param_source.num_vectors // num_partitions
+
+ self._test_partition(
+ test_param_source,
+ num_partitions,
+ vecs_per_partition
+ )
+
+ def test_partition_bigann(self):
+ num_vectors = 100
+ float_extension = "fbin"
+
+ bigann_data_set_path = _create_data_set(
+ num_vectors,
+ DEFAULT_DIMENSION,
+ float_extension,
+ DEFAULT_CONTEXT,
+ self.data_set_dir
+ )
+
+ test_param_source_params = {
+ "index": DEFAULT_INDEX_NAME,
+ "field": DEFAULT_FIELD_NAME,
+ "data_set_format": "bigann",
+ "data_set_path": bigann_data_set_path,
+ }
+ test_param_source = self.TestVectorsFromDataSetParamSource(
+ test_param_source_params,
+ DEFAULT_CONTEXT
+ )
+
+ num_partitions = 10
+ vecs_per_partition = test_param_source.num_vectors // num_partitions
+
+ self._test_partition(
+ test_param_source,
+ num_partitions,
+ vecs_per_partition
+ )
+
+ def _test_partition(
+ self,
+ test_param_source: VectorsFromDataSetParamSource,
+ num_partitions: int,
+ vec_per_partition: int
+ ):
+ for i in range(num_partitions):
+ test_param_source_i = test_param_source.partition(i, num_partitions)
+ self.assertEqual(test_param_source_i.num_vectors, vec_per_partition)
+ self.assertEqual(test_param_source_i.offset, i * vec_per_partition)
+
+ class TestVectorsFromDataSetParamSource(VectorsFromDataSetParamSource):
+ """
+ Empty implementation of ABC VectorsFromDataSetParamSource so that we can
+ test the concrete methods.
+ """
+
+ def params(self):
+ pass
+
+
+class QueryVectorsFromDataSetParamSourceTestCase(unittest.TestCase):
+
+ def setUp(self) -> None:
+ self.data_set_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.data_set_dir)
+
+ def test_params(self):
+ # Create a data set
+ k = 12
+ data_set_path = _create_data_set(
+ DEFAULT_NUM_VECTORS,
+ DEFAULT_DIMENSION,
+ DEFAULT_TYPE,
+ Context.QUERY,
+ self.data_set_dir
+ )
+
+ # Create a QueryVectorsFromDataSetParamSource with relevant params
+ test_param_source_params = {
+ "index": DEFAULT_INDEX_NAME,
+ "field": DEFAULT_FIELD_NAME,
+ "data_set_format": DEFAULT_TYPE,
+ "data_set_path": data_set_path,
+ "k": k,
+ }
+ query_param_source = QueryVectorsFromDataSetParamSource(
+ None, test_param_source_params
+ )
+
+        # Check each query parameter returned
+ for i in range(DEFAULT_NUM_VECTORS):
+ self._check_params(
+ query_param_source.params(),
+ DEFAULT_INDEX_NAME,
+ DEFAULT_FIELD_NAME,
+ DEFAULT_DIMENSION,
+ k
+ )
+
+ # Assert last call creates stop iteration
+ self.assertRaises(
+ StopIteration,
+ lambda: query_param_source.params()
+ )
+
+ def _check_params(
+ self,
+ params: dict,
+ expected_index: str,
+ expected_field: str,
+ expected_dimension: int,
+ expected_k: int
+ ):
+ index_name = params.get("index")
+ self.assertEqual(expected_index, index_name)
+ body = params.get("body")
+ self.assertIsInstance(body, dict)
+ query = body.get("query")
+ self.assertIsInstance(query, dict)
+ query_knn = query.get("knn")
+ self.assertIsInstance(query_knn, dict)
+ field = query_knn.get(expected_field)
+ self.assertIsInstance(field, dict)
+ vector = field.get("vector")
+ self.assertIsInstance(vector, np.ndarray)
+ self.assertEqual(len(list(vector)), expected_dimension)
+ k = field.get("k")
+ self.assertEqual(k, expected_k)
+
+
+class BulkVectorsFromDataSetParamSourceTestCase(unittest.TestCase):
+
+ def setUp(self) -> None:
+ self.data_set_dir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ shutil.rmtree(self.data_set_dir)
+
+ def test_params(self):
+ num_vectors = 49
+ bulk_size = 10
+ data_set_path = _create_data_set(
+ num_vectors,
+ DEFAULT_DIMENSION,
+ DEFAULT_TYPE,
+ Context.INDEX,
+ self.data_set_dir
+ )
+
+ test_param_source_params = {
+ "index": DEFAULT_INDEX_NAME,
+ "field": DEFAULT_FIELD_NAME,
+ "data_set_format": DEFAULT_TYPE,
+ "data_set_path": data_set_path,
+ "bulk_size": bulk_size
+ }
+ bulk_param_source = BulkVectorsFromDataSetParamSource(
+ None, test_param_source_params
+ )
+
+ # Check each payload returned
+ vectors_consumed = 0
+ while vectors_consumed < num_vectors:
+ expected_num_vectors = min(num_vectors - vectors_consumed, bulk_size)
+ self._check_params(
+ bulk_param_source.params(),
+ DEFAULT_INDEX_NAME,
+ DEFAULT_FIELD_NAME,
+ DEFAULT_DIMENSION,
+ expected_num_vectors
+ )
+ vectors_consumed += expected_num_vectors
+
+ # Assert last call creates stop iteration
+ self.assertRaises(
+ StopIteration,
+ lambda: bulk_param_source.params()
+ )
+
+ def _check_params(
+ self,
+ params: dict,
+ expected_index: str,
+ expected_field: str,
+ expected_dimension: int,
+ expected_num_vectors_in_payload: int
+ ):
+ size = params.get("size")
+ self.assertEqual(size, expected_num_vectors_in_payload)
+ body = params.get("body")
+ self.assertIsInstance(body, list)
+ self.assertEqual(len(body) // 2, expected_num_vectors_in_payload)
+
+ # Bulk payload has 2 parts: first one is the header and the second one
+ # is the body. The header will have the index name and the body will
+ # have the vector
+ for header, req_body in zip(*[iter(body)] * 2):
+ index = header.get("index")
+ self.assertIsInstance(index, dict)
+ index_name = index.get("_index")
+ self.assertEqual(index_name, expected_index)
+
+ vector = req_body.get(expected_field)
+ self.assertIsInstance(vector, list)
+ self.assertEqual(len(vector), expected_dimension)
+
+
+def _create_data_set(
+ num_vectors: int,
+ dimension: int,
+ extension: str,
+ data_set_context: Context,
+ data_set_dir
+) -> str:
+
+ file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
+ range(DEFAULT_RANDOM_STRING_LENGTH))
+ data_set_file_name = "{}.{}".format(file_name_base, extension)
+ data_set_path = os.path.join(data_set_dir, data_set_file_name)
+ context = DataSetBuildContext(
+ data_set_context,
+ create_random_2d_array(num_vectors, dimension),
+ data_set_path)
+
+ if extension == HDF5DataSet.FORMAT_NAME:
+ HDF5Builder().add_data_set_build_context(context).build()
+ else:
+ BigANNBuilder().add_data_set_build_context(context).build()
+
+ return data_set_path
+
+
+if __name__ == '__main__':
+ unittest.main()