diff --git a/benchmarks/osb/README.md b/benchmarks/osb/README.md new file mode 100644 index 000000000..95b48edfb --- /dev/null +++ b/benchmarks/osb/README.md @@ -0,0 +1,399 @@ +# OpenSearch Benchmarks for k-NN
+
+## Overview
+
+This directory contains code and configurations to run k-NN benchmarking
+workloads using OpenSearch Benchmarks.
+
+The [extensions](extensions) directory contains common code shared between
+procedures. The [procedures](procedures) directory contains the individual
+test procedures for this workload.
+
+## Getting Started
+
+### OpenSearch Benchmark Background
+
+OpenSearch Benchmark is a framework for performance benchmarking an OpenSearch
+cluster. For more details, check out the
+[repo](https://github.com/opensearch-project/opensearch-benchmark/).
+
+Before getting into the benchmarks, it is helpful to know a few terms:
+1. Workload - Top-level description of a benchmark suite. A workload has a `workload.json` file that defines the different components of the tests
+2. Test Procedures - A workload can have a schedule of operations that run the test. However, a workload can also have several test procedures that define their own schedules of operations. This is helpful for sharing code between tests
+3. Operation - An action against the OpenSearch cluster
+4. Parameter source - A producer of parameters for OpenSearch operations
+5. Runners - Code that actually executes the OpenSearch operations
+
+### Setup
+
+OpenSearch Benchmark requires Python 3.8 or greater. One of the easiest ways
+to set this up is through Conda, a package and environment management system
+for Python.
+
+First, follow the
+[installation instructions](https://docs.conda.io/projects/conda/en/latest/user-guide/install/index.html)
+to install Conda on your system.
+
+Next, create a Python 3.8 environment:
+```
+conda create -n knn-osb python=3.8
+```
+
+After the environment is created, activate it:
+```
+source activate knn-osb
+```
+
+Lastly, clone the k-NN repo and install all required Python packages:
+```
+git clone https://github.com/opensearch-project/k-NN.git
+cd k-NN/benchmarks/osb
+pip install -r requirements.txt
+```
+
+After all of this completes, you should be ready to run your first benchmark!
+
+### Running a benchmark
+
+Before running a benchmark, make sure you have the endpoint of your cluster and
+that the machine you are running the benchmarks from can access it.
+Additionally, ensure that all data has been pulled to the client.
+
+Currently, we support two test procedures for the k-NN workload: train-test and
+no-train-test. The train-test procedure includes steps to train a model in its
+schedule, while no-train-test does not. Both test procedures index a data set
+of vectors into an OpenSearch index.
+
+Once you have decided which test procedure you want to use, open up
+[params/train-params.json](params/train-params.json) or
+[params/no-train-params.json](params/no-train-params.json) and
+fill out the parameters. Note that the bottom of `no-train-params.json` lists
+several parameters that relate to training. Ignore these; they must be defined
+for the workload but are not used.
+
+Once the parameters are set, export the URL and PORT of your cluster and run
+the following command to execute the test procedure.
+ +``` +export URL= +export PORT= +export PARAMS_FILE= +export PROCEDURE={no-train-test | train-test} + +opensearch-benchmark execute_test \ + --target-hosts $URL:$PORT \ + --workload-path ./workload.json \ + --workload-params ${PARAMS_FILE} \ + --test-procedure=${PROCEDURE} \ + --pipeline benchmark-only +``` + +## Current Procedures + +### No Train Test + +The No Train Test procedure is used to test `knn_vector` indices that do not +use an algorithm that requires training. + +#### Workflow + +1. Delete old resources in the cluster if they are present +2. Create an OpenSearch index with `knn_vector` configured to use the HNSW algorithm +3. Wait for cluster to be green +4. Ingest data set into the cluster +5. Refresh the index + +#### Parameters + +| Name | Description | +|-----------------------------------------|--------------------------------------------------------------------------| +| target_index_name | Name of index to add vectors to | +| target_field_name | Name of field to add vectors to | +| target_index_body | Path to target index definition | +| target_index_primary_shards | Target index primary shards | +| target_index_replica_shards | Target index replica shards | +| target_index_dimension | Dimension of target index | +| target_index_space_type | Target index space type | +| target_index_bulk_size | Target index bulk size | +| target_index_bulk_index_data_set_format | Format of vector data set | +| target_index_bulk_index_data_set_path | Path to vector data set | +| target_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) | +| hnsw_ef_search | HNSW ef search parameter | +| hnsw_ef_construction | HNSW ef construction parameter | +| hnsw_m | HNSW m parameter | + +#### Metrics + +The result metrics of this procedure will look like: +``` +|---------------------------------------------------------------:|---------------------:|----------:|-------:| +| Cumulative indexing time of primary shards | | 2.36965 | min | +| Min cumulative indexing time across primary shards | | 0.0923333 | min | +| Median cumulative indexing time across primary shards | | 0.732892 | min | +| Max cumulative indexing time across primary shards | | 0.811533 | min | +| Cumulative indexing throttle time of primary shards | | 0 | min | +| Min cumulative indexing throttle time across primary shards | | 0 | min | +| Median cumulative indexing throttle time across primary shards | | 0 | min | +| Max cumulative indexing throttle time across primary shards | | 0 | min | +| Cumulative merge time of primary shards | | 1.70392 | min | +| Cumulative merge count of primary shards | | 13 | | +| Min cumulative merge time across primary shards | | 0.0028 | min | +| Median cumulative merge time across primary shards | | 0.538375 | min | +| Max cumulative merge time across primary shards | | 0.624367 | min | +| Cumulative merge throttle time of primary shards | | 0.407467 | min | +| Min cumulative merge throttle time across primary shards | | 0 | min | +| Median cumulative merge throttle time across primary shards | | 0.131758 | min | +| Max cumulative merge throttle time across primary shards | | 0.14395 | min | +| Cumulative refresh time of primary shards | | 1.01585 | min | +| Cumulative refresh count of primary shards | | 55 | | +| Min cumulative refresh time across primary shards | | 0.0084 | min | +| Median cumulative refresh time across primary shards | | 0.330733 | min | +| Max cumulative refresh time across primary shards | | 0.345983 | min | +| Cumulative flush 
time of primary shards | | 0 | min | +| Cumulative flush count of primary shards | | 0 | | +| Min cumulative flush time across primary shards | | 0 | min | +| Median cumulative flush time across primary shards | | 0 | min | +| Max cumulative flush time across primary shards | | 0 | min | +| Total Young Gen GC time | | 0.218 | s | +| Total Young Gen GC count | | 5 | | +| Total Old Gen GC time | | 0 | s | +| Total Old Gen GC count | | 0 | | +| Store size | | 3.18335 | GB | +| Translog size | | 1.29415 | GB | +| Heap used for segments | | 0.100433 | MB | +| Heap used for doc values | | 0.0101166 | MB | +| Heap used for terms | | 0.0339661 | MB | +| Heap used for norms | | 0 | MB | +| Heap used for points | | 0 | MB | +| Heap used for stored fields | | 0.0563507 | MB | +| Segment count | | 84 | | +| Min Throughput | custom-vector-bulk | 32004.5 | docs/s | +| Mean Throughput | custom-vector-bulk | 40288.7 | docs/s | +| Median Throughput | custom-vector-bulk | 36826.6 | docs/s | +| Max Throughput | custom-vector-bulk | 89105.4 | docs/s | +| 50th percentile latency | custom-vector-bulk | 21.4377 | ms | +| 90th percentile latency | custom-vector-bulk | 37.6029 | ms | +| 99th percentile latency | custom-vector-bulk | 822.604 | ms | +| 99.9th percentile latency | custom-vector-bulk | 1396.8 | ms | +| 100th percentile latency | custom-vector-bulk | 1751.85 | ms | +| 50th percentile service time | custom-vector-bulk | 21.4377 | ms | +| 90th percentile service time | custom-vector-bulk | 37.6029 | ms | +| 99th percentile service time | custom-vector-bulk | 822.604 | ms | +| 99.9th percentile service time | custom-vector-bulk | 1396.8 | ms | +| 100th percentile service time | custom-vector-bulk | 1751.85 | ms | +| error rate | custom-vector-bulk | 0 | % | +| Min Throughput | refresh-target-index | 0.04 | ops/s | +| Mean Throughput | refresh-target-index | 0.04 | ops/s | +| Median Throughput | refresh-target-index | 0.04 | ops/s | +| Max Throughput | refresh-target-index | 0.04 | ops/s | +| 100th percentile latency | refresh-target-index | 23522.6 | ms | +| 100th percentile service time | refresh-target-index | 23522.6 | ms | +| error rate | refresh-target-index | 0 | % | + + +-------------------------------- +[INFO] SUCCESS (took 76 seconds) +-------------------------------- +``` + +### Train Test + +The Train Test procedure is used to test `knn_vector` indices that do use an +algorithm that requires training. + +#### Workflow + +1. Delete old resources in the cluster if they are present +2. Create an OpenSearch index with `knn_vector` configured to load with training data +3. Wait for cluster to be green +4. Ingest data set into the training index +5. Refresh the index +6. Train a model based on user provided input parameters +7. Create an OpenSearch index with `knn_vector` configured to use the model +8. Ingest vectors into the target index +9. 
Refresh the target index + +#### Parameters + +| Name | Description | +|-----------------------------------------|--------------------------------------------------------------------------| +| target_index_name | Name of index to add vectors to | +| target_field_name | Name of field to add vectors to | +| target_index_body | Path to target index definition | +| target_index_primary_shards | Target index primary shards | +| target_index_replica_shards | Target index replica shards | +| target_index_dimension | Dimension of target index | +| target_index_space_type | Target index space type | +| target_index_bulk_size | Target index bulk size | +| target_index_bulk_index_data_set_format | Format of vector data set | +| target_index_bulk_index_data_set_path | Path to vector data set | +| target_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) | +| ivf_nlists | IVF nlist parameter | +| ivf_nprobes | IVF nprobe parameter | +| pq_code_size | PQ code_size parameter | +| pq_m | PQ m parameter | +| train_model_method | Method to be used for model (ivf or ivfpq) | +| train_model_id | Model ID | +| train_index_name | Name of index to put training data into | +| train_field_name | Name of field to put training data into | +| train_index_body | Path to train index definition | +| train_search_size | Search size to use when pulling training data | +| train_timeout | Timeout to wait for training to finish | +| train_index_primary_shards | Train index primary shards | +| train_index_replica_shards | Train index replica shards | +| train_index_bulk_size | Train index bulk size | +| train_index_data_set_format | Format of vector data set | +| train_index_data_set_path | Path to vector data set | +| train_index_num_vectors | Number of vectors to use from vector data set for training | +| train_index_bulk_index_clients | Clients to be used for bulk ingestion (must be divisor of data set size) | + + +#### Metrics + +The result metrics of this procedure will look like: +``` +------------------------------------------------------ + _______ __ _____ + / ____(_)___ ____ _/ / / ___/_________ ________ + / /_ / / __ \/ __ `/ / \__ \/ ___/ __ \/ ___/ _ \ + / __/ / / / / / /_/ / / ___/ / /__/ /_/ / / / __/ +/_/ /_/_/ /_/\__,_/_/ /____/\___/\____/_/ \___/ +------------------------------------------------------ + +| Metric | Task | Value | Unit | +|---------------------------------------------------------------:|---------------------:|-----------:|-----------------:| +| Cumulative indexing time of primary shards | | 1.08917 | min | +| Min cumulative indexing time across primary shards | | 0.0923333 | min | +| Median cumulative indexing time across primary shards | | 0.328675 | min | +| Max cumulative indexing time across primary shards | | 0.339483 | min | +| Cumulative indexing throttle time of primary shards | | 0 | min | +| Min cumulative indexing throttle time across primary shards | | 0 | min | +| Median cumulative indexing throttle time across primary shards | | 0 | min | +| Max cumulative indexing throttle time across primary shards | | 0 | min | +| Cumulative merge time of primary shards | | 0.44465 | min | +| Cumulative merge count of primary shards | | 19 | | +| Min cumulative merge time across primary shards | | 0.0028 | min | +| Median cumulative merge time across primary shards | | 0.145408 | min | +| Max cumulative merge time across primary shards | | 0.151033 | min | +| Cumulative merge throttle time of primary shards | | 0.295033 | min | +| Min cumulative 
merge throttle time across primary shards | | 0 | min | +| Median cumulative merge throttle time across primary shards | | 0.0973167 | min | +| Max cumulative merge throttle time across primary shards | | 0.1004 | min | +| Cumulative refresh time of primary shards | | 0.07955 | min | +| Cumulative refresh count of primary shards | | 67 | | +| Min cumulative refresh time across primary shards | | 0.0084 | min | +| Median cumulative refresh time across primary shards | | 0.022725 | min | +| Max cumulative refresh time across primary shards | | 0.0257 | min | +| Cumulative flush time of primary shards | | 0 | min | +| Cumulative flush count of primary shards | | 0 | | +| Min cumulative flush time across primary shards | | 0 | min | +| Median cumulative flush time across primary shards | | 0 | min | +| Max cumulative flush time across primary shards | | 0 | min | +| Total Young Gen GC time | | 0.034 | s | +| Total Young Gen GC count | | 6 | | +| Total Old Gen GC time | | 0 | s | +| Total Old Gen GC count | | 0 | | +| Store size | | 1.81242 | GB | +| Heap used for points | | 0 | MB | +| Heap used for stored fields | | 0.041626 | MB | +| Segment count | | 62 | | +| Min Throughput | delete-model | 33.25 | ops/s | +| Mean Throughput | delete-model | 33.25 | ops/s | +| Median Throughput | delete-model | 33.25 | ops/s | +| Max Throughput | delete-model | 33.25 | ops/s | +| 100th percentile latency | delete-model | 29.6471 | ms | +| 100th percentile service time | delete-model | 29.6471 | ms | +| error rate | delete-model | 0 | % | +| Min Throughput | train-vector-bulk | 78682.2 | docs/s | +| Mean Throughput | train-vector-bulk | 78682.2 | docs/s | +| Median Throughput | train-vector-bulk | 78682.2 | docs/s | +| Max Throughput | train-vector-bulk | 78682.2 | docs/s | +| 50th percentile latency | train-vector-bulk | 16.4609 | ms | +| 90th percentile latency | train-vector-bulk | 21.8225 | ms | +| 99th percentile latency | train-vector-bulk | 117.632 | ms | +| 100th percentile latency | train-vector-bulk | 237.021 | ms | +| 50th percentile service time | train-vector-bulk | 16.4609 | ms | +| 90th percentile service time | train-vector-bulk | 21.8225 | ms | +| 99th percentile service time | train-vector-bulk | 117.632 | ms | +| 100th percentile service time | train-vector-bulk | 237.021 | ms | +| error rate | train-vector-bulk | 0 | % | +| Min Throughput | refresh-train-index | 149.22 | ops/s | +| Mean Throughput | refresh-train-index | 149.22 | ops/s | +| Median Throughput | refresh-train-index | 149.22 | ops/s | +| Max Throughput | refresh-train-index | 149.22 | ops/s | +| 100th percentile latency | refresh-train-index | 6.35862 | ms | +| 100th percentile service time | refresh-train-index | 6.35862 | ms | +| error rate | refresh-train-index | 0 | % | +| Min Throughput | ivfpq-train-model | 0.04 | models_trained/s | +| Mean Throughput | ivfpq-train-model | 0.04 | models_trained/s | +| Median Throughput | ivfpq-train-model | 0.04 | models_trained/s | +| Max Throughput | ivfpq-train-model | 0.04 | models_trained/s | +| 100th percentile latency | ivfpq-train-model | 28123 | ms | +| 100th percentile service time | ivfpq-train-model | 28123 | ms | +| error rate | ivfpq-train-model | 0 | % | +| Min Throughput | custom-vector-bulk | 71222.6 | docs/s | +| Mean Throughput | custom-vector-bulk | 79465.5 | docs/s | +| Median Throughput | custom-vector-bulk | 77764.4 | docs/s | +| Max Throughput | custom-vector-bulk | 90646.3 | docs/s | +| 50th percentile latency | custom-vector-bulk | 14.5099 | ms | +| 90th 
percentile latency | custom-vector-bulk | 18.1755 | ms |
+| 99th percentile latency | custom-vector-bulk | 123.359 | ms |
+| 99.9th percentile latency | custom-vector-bulk | 171.928 | ms |
+| 100th percentile latency | custom-vector-bulk | 216.383 | ms |
+| 50th percentile service time | custom-vector-bulk | 14.5099 | ms |
+| 90th percentile service time | custom-vector-bulk | 18.1755 | ms |
+| 99th percentile service time | custom-vector-bulk | 123.359 | ms |
+| 99.9th percentile service time | custom-vector-bulk | 171.928 | ms |
+| 100th percentile service time | custom-vector-bulk | 216.383 | ms |
+| error rate | custom-vector-bulk | 0 | % |
+| Min Throughput | refresh-target-index | 64.45 | ops/s |
+| Mean Throughput | refresh-target-index | 64.45 | ops/s |
+| Median Throughput | refresh-target-index | 64.45 | ops/s |
+| Max Throughput | refresh-target-index | 64.45 | ops/s |
+| 100th percentile latency | refresh-target-index | 15.177 | ms |
+| 100th percentile service time | refresh-target-index | 15.177 | ms |
+| error rate | refresh-target-index | 0 | % |
+
+
+---------------------------------
+[INFO] SUCCESS (took 108 seconds)
+---------------------------------
+```
+
+## Adding a procedure
+
+Adding additional benchmarks is very simple. First, place any custom parameter
+sources or runners in the [extensions](extensions) directory so that other tests
+can use them, and update the [documentation](#custom-extensions)
+accordingly.
+
+Next, create a new test procedure file and add the operations you want your test
+to run. Lastly, be sure to update the documentation.
+
+## Custom Extensions
+
+OpenSearch Benchmark is highly extensible. To fit the plugin's needs, we add
+custom parameter sources and custom runners. Parameter sources allow users to
+supply custom parameters to an operation. Runners are what actually perform
+the operations against OpenSearch.
+
+### Custom Parameter Sources
+
+Custom parameter sources are defined in [extensions/param_sources.py](extensions/param_sources.py).
+
+| Name | Description | Parameters |
+|--------------------|------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| bulk-from-data-set | Provides bulk payloads containing vectors from a data set for indexing | 1. data_set_format - (hdf5, bigann)<br>2. data_set_path - path to data set<br>3. index - name of index for bulk ingestion<br>4. field - field to place vector in<br>5. bulk_size - vectors per bulk request |
+
+
+### Custom Runners
+
+Custom runners are defined in [extensions/runners.py](extensions/runners.py).
+
+| Syntax | Description | Parameters |
+|--------------------|-----------------------------------------------------|:-------------------------------------------------------------------------------------------------------------|
+| custom-vector-bulk | Bulk index a set of vectors in an OpenSearch index. | 1. bulk-from-data-set |
+| custom-refresh | Run refresh with retry capabilities. | 1. index - name of index to refresh<br>2. retries - number of times to retry the operation |
+| train-model | Trains a model. | 1. body - model definition<br>2. timeout - time to wait for model to finish<br>
3. model_id - ID of model | +| delete-model | Deletes a model if it exists. | 1. model_id - ID of model | + diff --git a/benchmarks/osb/__init__.py b/benchmarks/osb/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarks/osb/extensions/__init__.py b/benchmarks/osb/extensions/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/benchmarks/osb/extensions/data_set.py b/benchmarks/osb/extensions/data_set.py new file mode 100644 index 000000000..4feb4c2e7 --- /dev/null +++ b/benchmarks/osb/extensions/data_set.py @@ -0,0 +1,199 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +import os +import numpy as np +from abc import ABC, ABCMeta, abstractmethod +from enum import Enum +from typing import cast +import h5py +import struct + + +class Context(Enum): + """DataSet context enum. Can be used to add additional context for how a + data-set should be interpreted. + """ + INDEX = 1 + QUERY = 2 + NEIGHBORS = 3 + + +class DataSet(ABC): + """DataSet interface. Used for reading data-sets from files. + + Methods: + read: Read a chunk of data from the data-set + seek: Get to position in the data-set + size: Gets the number of items in the data-set + reset: Resets internal state of data-set to beginning + """ + __metaclass__ = ABCMeta + + BEGINNING = 0 + + @abstractmethod + def read(self, chunk_size: int): + pass + + @abstractmethod + def seek(self, offset: int): + pass + + @abstractmethod + def size(self): + pass + + @abstractmethod + def reset(self): + pass + + +class HDF5DataSet(DataSet): + """ Data-set format corresponding to `ANN Benchmarks + `_ + """ + + FORMAT_NAME = "hdf5" + + def __init__(self, dataset_path: str, context: Context): + file = h5py.File(dataset_path) + self.data = cast(h5py.Dataset, file[self._parse_context(context)]) + self.current = self.BEGINNING + + def read(self, chunk_size: int): + if self.current >= self.size(): + return None + + end_offset = self.current + chunk_size + if end_offset > self.size(): + end_offset = self.size() + + v = cast(np.ndarray, self.data[self.current:end_offset]) + self.current = end_offset + return v + + def seek(self, offset: int): + + if offset < self.BEGINNING: + raise Exception("Offset must be greater than or equal to 0") + + if offset >= self.size(): + raise Exception("Offset must be less than the data set size") + + self.current = offset + + def size(self): + return self.data.len() + + def reset(self): + self.current = self.BEGINNING + + @staticmethod + def _parse_context(context: Context) -> str: + if context == Context.NEIGHBORS: + return "neighbors" + + if context == Context.INDEX: + return "train" + + if context == Context.QUERY: + return "test" + + raise Exception("Unsupported context") + + +class BigANNVectorDataSet(DataSet): + """ Data-set format for vector data-sets for `Big ANN Benchmarks + `_ + """ + + DATA_SET_HEADER_LENGTH = 8 + U8BIN_EXTENSION = "u8bin" + FBIN_EXTENSION = "fbin" + FORMAT_NAME = "bigann" + + BYTES_PER_U8INT = 1 + BYTES_PER_FLOAT = 4 + + def __init__(self, dataset_path: str): + self.file = open(dataset_path, 'rb') + self.file.seek(BigANNVectorDataSet.BEGINNING, os.SEEK_END) + num_bytes = self.file.tell() + self.file.seek(BigANNVectorDataSet.BEGINNING) + + if num_bytes < BigANNVectorDataSet.DATA_SET_HEADER_LENGTH: + raise Exception("File is invalid") + + self.num_points = int.from_bytes(self.file.read(4), "little") + 
self.dimension = int.from_bytes(self.file.read(4), "little") + self.bytes_per_num = self._get_data_size(dataset_path) + + if (num_bytes - BigANNVectorDataSet.DATA_SET_HEADER_LENGTH) != self.num_points * \ + self.dimension * self.bytes_per_num: + raise Exception("File is invalid") + + self.reader = self._value_reader(dataset_path) + self.current = BigANNVectorDataSet.BEGINNING + + def read(self, chunk_size: int): + if self.current >= self.size(): + return None + + end_offset = self.current + chunk_size + if end_offset > self.size(): + end_offset = self.size() + + v = np.asarray([self._read_vector() for _ in + range(end_offset - self.current)]) + self.current = end_offset + return v + + def seek(self, offset: int): + + if offset < self.BEGINNING: + raise Exception("Offset must be greater than or equal to 0") + + if offset >= self.size(): + raise Exception("Offset must be less than the data set size") + + bytes_offset = BigANNVectorDataSet.DATA_SET_HEADER_LENGTH + \ + self.dimension * self.bytes_per_num * offset + self.file.seek(bytes_offset) + self.current = offset + + def _read_vector(self): + return np.asarray([self.reader(self.file) for _ in + range(self.dimension)]) + + def size(self): + return self.num_points + + def reset(self): + self.file.seek(BigANNVectorDataSet.DATA_SET_HEADER_LENGTH) + self.current = BigANNVectorDataSet.BEGINNING + + @staticmethod + def _get_data_size(file_name): + ext = file_name.split('.')[-1] + if ext == BigANNVectorDataSet.U8BIN_EXTENSION: + return BigANNVectorDataSet.BYTES_PER_U8INT + + if ext == BigANNVectorDataSet.FBIN_EXTENSION: + return BigANNVectorDataSet.BYTES_PER_FLOAT + + raise Exception("Unknown extension") + + @staticmethod + def _value_reader(file_name): + ext = file_name.split('.')[-1] + if ext == BigANNVectorDataSet.U8BIN_EXTENSION: + return lambda file: float(int.from_bytes(file.read(BigANNVectorDataSet.BYTES_PER_U8INT), "little")) + + if ext == BigANNVectorDataSet.FBIN_EXTENSION: + return lambda file: struct.unpack('= self.num_vectors + self.offset: + raise StopIteration + + def action(doc_id): + return {'index': {'_index': self.index_name, '_id': doc_id}} + + partition = self.data_set.read(self.bulk_size) + body = bulk_transform(partition, self.field_name, action, self.current) + size = len(body) // 2 + self.current += size + self.percent_completed = self.current / self.total + + return { + "body": body, + "retries": self.retries, + "size": size + } diff --git a/benchmarks/osb/extensions/registry.py b/benchmarks/osb/extensions/registry.py new file mode 100644 index 000000000..5ce17ab6f --- /dev/null +++ b/benchmarks/osb/extensions/registry.py @@ -0,0 +1,13 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +from .param_sources import register as param_sources_register +from .runners import register as runners_register + + +def register(registry): + param_sources_register(registry) + runners_register(registry) diff --git a/benchmarks/osb/extensions/runners.py b/benchmarks/osb/extensions/runners.py new file mode 100644 index 000000000..d048f80b0 --- /dev/null +++ b/benchmarks/osb/extensions/runners.py @@ -0,0 +1,121 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. 
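+
+# Custom runners for this workload. Each runner class below implements an
+# async __call__(opensearch, params) entry point and is registered under its
+# operation-type name in register() below; workload.py wires these up through
+# extensions/registry.py when OpenSearch Benchmark loads the workload.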
+from opensearchpy.exceptions import ConnectionTimeout +from .util import parse_int_parameter, parse_string_parameter +import logging +import time + + +def register(registry): + registry.register_runner( + "custom-vector-bulk", BulkVectorsFromDataSetRunner(), async_runner=True + ) + registry.register_runner( + "custom-refresh", CustomRefreshRunner(), async_runner=True + ) + registry.register_runner( + "train-model", TrainModelRunner(), async_runner=True + ) + registry.register_runner( + "delete-model", DeleteModelRunner(), async_runner=True + ) + + +class BulkVectorsFromDataSetRunner: + + async def __call__(self, opensearch, params): + size = parse_int_parameter("size", params) + retries = parse_int_parameter("retries", params, 0) + 1 + + for _ in range(retries): + try: + await opensearch.bulk( + body=params["body"], + timeout='5m' + ) + + return size, "docs" + except ConnectionTimeout: + logging.getLogger(__name__)\ + .warning("Bulk vector ingestion timed out. Retrying") + + raise TimeoutError("Failed to submit bulk request in specified number " + "of retries: {}".format(retries)) + + def __repr__(self, *args, **kwargs): + return "custom-vector-bulk" + + +class CustomRefreshRunner: + + async def __call__(self, opensearch, params): + retries = parse_int_parameter("retries", params, 0) + 1 + + for _ in range(retries): + try: + await opensearch.indices.refresh( + index=parse_string_parameter("index", params) + ) + + return + except ConnectionTimeout: + logging.getLogger(__name__)\ + .warning("Custom refresh timed out. Retrying") + + raise TimeoutError("Failed to refresh the index in specified number " + "of retries: {}".format(retries)) + + def __repr__(self, *args, **kwargs): + return "custom-refresh" + + +class TrainModelRunner: + + async def __call__(self, opensearch, params): + # Train a model and wait for it training to complete + body = params["body"] + timeout = parse_int_parameter("timeout", params) + model_id = parse_string_parameter("model_id", params) + + method = "POST" + model_uri = "/_plugins/_knn/models/{}".format(model_id) + await opensearch.transport.perform_request(method, "{}/_train".format(model_uri), body=body) + + start_time = time.time() + while time.time() < start_time + timeout: + time.sleep(1) + model_response = await opensearch.transport.perform_request("GET", model_uri) + + if 'state' not in model_response.keys(): + continue + + if model_response['state'] == 'created': + #TODO: Return model size as well + return 1, "models_trained" + + if model_response['state'] == 'failed': + raise Exception("Failed to create model: {}".format(model_response)) + + raise Exception('Failed to create model: {} within timeout {} seconds' + .format(model_id, timeout)) + + def __repr__(self, *args, **kwargs): + return "train-model" + + +class DeleteModelRunner: + + async def __call__(self, opensearch, params): + # Delete model provided by model id + method = "DELETE" + model_id = parse_string_parameter("model_id", params) + uri = "/_plugins/_knn/models/{}".format(model_id) + + # Ignore if model doesnt exist + await opensearch.transport.perform_request(method, uri, params={"ignore": [400, 404]}) + + def __repr__(self, *args, **kwargs): + return "delete-model" diff --git a/benchmarks/osb/extensions/util.py b/benchmarks/osb/extensions/util.py new file mode 100644 index 000000000..f7f6aab62 --- /dev/null +++ b/benchmarks/osb/extensions/util.py @@ -0,0 +1,71 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed 
under the Apache-2.0 license or a +# compatible open source license. + +import numpy as np +from typing import List +from typing import Dict +from typing import Any + + +def bulk_transform(partition: np.ndarray, field_name: str, action, + offset: int) -> List[Dict[str, Any]]: + """Partitions and transforms a list of vectors into OpenSearch's bulk + injection format. + Args: + offset: to start counting from + partition: An array of vectors to transform. + field_name: field name for action + action: Bulk API action. + Returns: + An array of transformed vectors in bulk format. + """ + actions = [] + _ = [ + actions.extend([action(i + offset), None]) + for i in range(len(partition)) + ] + actions[1::2] = [{field_name: vec} for vec in partition.tolist()] + return actions + + +def parse_string_parameter(key: str, params: dict, default: str = None) -> str: + if key not in params: + if default is not None: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if type(params[key]) is str: + return params[key] + + raise ConfigurationError("Value must be a string for param {}".format(key)) + + +def parse_int_parameter(key: str, params: dict, default: int = None) -> int: + if key not in params: + if default: + return default + raise ConfigurationError( + "Value cannot be None for param {}".format(key) + ) + + if type(params[key]) is int: + return params[key] + + raise ConfigurationError("Value must be a int for param {}".format(key)) + + +class ConfigurationError(Exception): + """Exception raised for errors configuration. + + Attributes: + message -- explanation of the error + """ + + def __init__(self, message: str): + self.message = f'{message}' + super().__init__(self.message) diff --git a/benchmarks/osb/indices/faiss-index.json b/benchmarks/osb/indices/faiss-index.json new file mode 100644 index 000000000..2db4d34d4 --- /dev/null +++ b/benchmarks/osb/indices/faiss-index.json @@ -0,0 +1,27 @@ +{ + "settings": { + "index": { + "knn": true, + "number_of_shards": {{ target_index_primary_shards }}, + "number_of_replicas": {{ target_index_replica_shards }} + } + }, + "mappings": { + "properties": { + "target_field": { + "type": "knn_vector", + "dimension": {{ target_index_dimension }}, + "method": { + "name": "hnsw", + "space_type": "{{ target_index_space_type }}", + "engine": "faiss", + "parameters": { + "ef_search": {{ hnsw_ef_search }}, + "ef_construction": {{ hnsw_ef_construction }}, + "m": {{ hnsw_m }} + } + } + } + } + } +} diff --git a/benchmarks/osb/indices/model-index.json b/benchmarks/osb/indices/model-index.json new file mode 100644 index 000000000..0e92c8903 --- /dev/null +++ b/benchmarks/osb/indices/model-index.json @@ -0,0 +1,17 @@ +{ + "settings": { + "index": { + "knn": true, + "number_of_shards": {{ target_index_primary_shards | default(1) }}, + "number_of_replicas": {{ target_index_replica_shards | default(0) }} + } + }, + "mappings": { + "properties": { + "{{ target_field_name }}": { + "type": "knn_vector", + "model_id": "{{ train_model_id }}" + } + } + } +} diff --git a/benchmarks/osb/indices/nmslib-index.json b/benchmarks/osb/indices/nmslib-index.json new file mode 100644 index 000000000..4ceb57977 --- /dev/null +++ b/benchmarks/osb/indices/nmslib-index.json @@ -0,0 +1,27 @@ +{ + "settings": { + "index": { + "knn": true, + "knn.algo_param.ef_search": {{ hnsw_ef_search }}, + "number_of_shards": {{ target_index_primary_shards }}, + "number_of_replicas": {{ target_index_replica_shards }} + } + }, + "mappings": { + "properties": { + 
"target_field": { + "type": "knn_vector", + "dimension": {{ target_index_dimension }}, + "method": { + "name": "hnsw", + "space_type": "{{ target_index_space_type }}", + "engine": "nmslib", + "parameters": { + "ef_construction": {{ hnsw_ef_construction }}, + "m": {{ hnsw_m }} + } + } + } + } + } +} diff --git a/benchmarks/osb/indices/train-index.json b/benchmarks/osb/indices/train-index.json new file mode 100644 index 000000000..82af8215e --- /dev/null +++ b/benchmarks/osb/indices/train-index.json @@ -0,0 +1,16 @@ +{ + "settings": { + "index": { + "number_of_shards": {{ train_index_primary_shards }}, + "number_of_replicas": {{ train_index_replica_shards }} + } + }, + "mappings": { + "properties": { + "{{ train_field_name }}": { + "type": "knn_vector", + "dimension": {{ target_index_dimension }} + } + } + } +} diff --git a/benchmarks/osb/operations/default.json b/benchmarks/osb/operations/default.json new file mode 100644 index 000000000..ee33166f0 --- /dev/null +++ b/benchmarks/osb/operations/default.json @@ -0,0 +1,53 @@ +[ + { + "name": "ivfpq-train-model", + "operation-type": "train-model", + "model_id": "{{ train_model_id }}", + "timeout": {{ train_timeout }}, + "body": { + "training_index": "{{ train_index_name }}", + "training_field": "{{ train_field_name }}", + "dimension": {{ target_index_dimension }}, + "search_size": {{ train_search_size }}, + "max_training_vector_count": {{ train_index_num_vectors }}, + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "{{ target_index_space_type }}", + "parameters":{ + "nlist": {{ ivf_nlists }}, + "nprobes": {{ ivf_nprobes }}, + "encoder":{ + "name":"pq", + "parameters":{ + "code_size": {{ pq_code_size }}, + "m": {{ pq_m }} + } + } + } + } + } + }, + { + "name": "ivf-train-model", + "operation-type": "train-model", + "model_id": "{{ train_model_id }}", + "timeout": {{ train_timeout | default(1000) }}, + "body": { + "training_index": "{{ train_index_name }}", + "training_field": "{{ train_field_name }}", + "search_size": {{ train_search_size }}, + "dimension": {{ target_index_dimension }}, + "max_training_vector_count": {{ train_index_num_vectors }}, + "method": { + "name":"ivf", + "engine":"faiss", + "space_type": "{{ target_index_space_type }}", + "parameters":{ + "nlist": {{ ivf_nlists }}, + "nprobes": {{ ivf_nprobes }} + } + } + } + } +] diff --git a/benchmarks/osb/params/no-train-params.json b/benchmarks/osb/params/no-train-params.json new file mode 100644 index 000000000..988c1717b --- /dev/null +++ b/benchmarks/osb/params/no-train-params.json @@ -0,0 +1,35 @@ +{ + "target_index_name": "target_index", + "target_field_name": "target_field", + "target_index_body": "indices/nmslib-index.json", + "target_index_primary_shards": 3, + "target_index_replica_shards": 1, + "target_index_dimension": 128, + "target_index_space_type": "l2", + "target_index_bulk_size": 200, + "target_index_bulk_index_data_set_format": "hdf5", + "target_index_bulk_index_data_set_path": "", + "target_index_bulk_index_clients": 10, + "hnsw_ef_search": 512, + "hnsw_ef_construction": 512, + "hnsw_m": 16, + + + + "ivf_nlists": 1, + "ivf_nprobes": 1, + "pq_code_size": 1, + "pq_m": 1, + "train_model_method": "", + "train_model_id": "", + "train_index_name": "", + "train_field_name": "", + "train_index_body": "", + "train_search_size": 1, + "train_timeout": 1, + "train_index_bulk_size": 1, + "train_index_data_set_format": "", + "train_index_data_set_path": "", + "train_index_num_vectors": 1, + "train_index_bulk_index_clients": 1 +} diff --git 
a/benchmarks/osb/params/train-params.json b/benchmarks/osb/params/train-params.json new file mode 100644 index 000000000..b50c235c4 --- /dev/null +++ b/benchmarks/osb/params/train-params.json @@ -0,0 +1,31 @@ +{ + "target_index_name": "target_index", + "target_field_name": "target_field", + "target_index_body": "indices/model-index.json", + "target_index_primary_shards": 3, + "target_index_replica_shards": 1, + "target_index_dimension": 128, + "target_index_space_type": "l2", + "target_index_bulk_size": 200, + "target_index_bulk_index_data_set_format": "hdf5", + "target_index_bulk_index_data_set_path": "", + "target_index_bulk_index_clients": 10, + "ivf_nlists": 10, + "ivf_nprobes": 1, + "pq_code_size": 8, + "pq_m": 8, + "train_model_method": "ivfpq", + "train_model_id": "test-model", + "train_index_name": "train_index", + "train_field_name": "train_field", + "train_index_body": "indices/train-index.json", + "train_search_size": 500, + "train_timeout": 5000, + "train_index_primary_shards": 1, + "train_index_replica_shards": 0, + "train_index_bulk_size": 200, + "train_index_data_set_format": "hdf5", + "train_index_data_set_path": "", + "train_index_num_vectors": 1000000, + "train_index_bulk_index_clients": 10 +} diff --git a/benchmarks/osb/procedures/no-train-test.json b/benchmarks/osb/procedures/no-train-test.json new file mode 100644 index 000000000..f54696360 --- /dev/null +++ b/benchmarks/osb/procedures/no-train-test.json @@ -0,0 +1,50 @@ +{% import "benchmark.helpers" as benchmark with context %} +{ + "name": "no-train-test", + "default": true, + "schedule": [ + { + "operation": { + "name": "delete-target-index", + "operation-type": "delete-index", + "only-if-exists": true, + "index": "{{ target_index_name }}" + } + }, + { + "operation": { + "name": "create-target-index", + "operation-type": "create-index", + "index": "{{ target_index_name }}" + } + }, + { + "name": "wait-for-cluster-to-be-green", + "operation": "cluster-health", + "request-params": { + "wait_for_status": "green" + } + }, + { + "operation": { + "name": "custom-vector-bulk", + "operation-type": "custom-vector-bulk", + "param-source": "bulk-from-data-set", + "index": "{{ target_index_name }}", + "field": "{{ target_field_name }}", + "bulk_size": {{ target_index_bulk_size }}, + "data_set_format": "{{ target_index_bulk_index_data_set_format }}", + "data_set_path": "{{ target_index_bulk_index_data_set_path }}" + }, + "clients": {{ target_index_bulk_index_clients }} + }, + { + "operation": { + "name": "refresh-target-index", + "operation-type": "custom-refresh", + "index": "{{ target_index_name }}", + "retries": 100 + } + } + ] +} diff --git a/benchmarks/osb/procedures/train-test.json b/benchmarks/osb/procedures/train-test.json new file mode 100644 index 000000000..8f5efd674 --- /dev/null +++ b/benchmarks/osb/procedures/train-test.json @@ -0,0 +1,104 @@ +{% import "benchmark.helpers" as benchmark with context %} +{ + "name": "train-test", + "default": false, + "schedule": [ + { + "operation": { + "name": "delete-target-index", + "operation-type": "delete-index", + "only-if-exists": true, + "index": "{{ target_index_name }}" + } + }, + { + "operation": { + "name": "delete-train-index", + "operation-type": "delete-index", + "only-if-exists": true, + "index": "{{ train_index_name }}" + } + }, + { + "operation": { + "operation-type": "delete-model", + "name": "delete-model", + "model_id": "{{ train_model_id }}" + } + }, + { + "operation": { + "name": "create-train-index", + "operation-type": "create-index", + "index": "{{ 
train_index_name }}" + } + }, + { + "name": "wait-for-train-index-to-be-green", + "operation": "cluster-health", + "request-params": { + "wait_for_status": "green" + } + }, + { + "operation": { + "name": "train-vector-bulk", + "operation-type": "custom-vector-bulk", + "param-source": "bulk-from-data-set", + "index": "{{ train_index_name }}", + "field": "{{ train_field_name }}", + "bulk_size": {{ train_index_bulk_size }}, + "data_set_format": "{{ train_index_data_set_format }}", + "data_set_path": "{{ train_index_data_set_path }}", + "num_vectors": {{ train_index_num_vectors }} + }, + "clients": {{ train_index_bulk_index_clients }} + }, + { + "operation": { + "name": "refresh-train-index", + "operation-type": "custom-refresh", + "index": "{{ train_index_name }}", + "retries": 100 + } + }, + { + "operation": "{{ train_model_method }}-train-model" + }, + { + "operation": { + "name": "create-target-index", + "operation-type": "create-index", + "index": "{{ target_index_name }}" + } + }, + { + "name": "wait-for-target-index-to-be-green", + "operation": "cluster-health", + "request-params": { + "wait_for_status": "green" + } + }, + { + "operation": { + "name": "custom-vector-bulk", + "operation-type": "custom-vector-bulk", + "param-source": "bulk-from-data-set", + "index": "{{ target_index_name }}", + "field": "{{ target_field_name }}", + "bulk_size": {{ target_index_bulk_size }}, + "data_set_format": "{{ target_index_bulk_index_data_set_format }}", + "data_set_path": "{{ target_index_bulk_index_data_set_path }}" + }, + "clients": {{ target_index_bulk_index_clients }} + }, + { + "operation": { + "name": "refresh-target-index", + "operation-type": "custom-refresh", + "index": "{{ target_index_name }}", + "retries": 100 + } + } + ] +} diff --git a/benchmarks/osb/requirements.in b/benchmarks/osb/requirements.in new file mode 100644 index 000000000..a9e12b5d3 --- /dev/null +++ b/benchmarks/osb/requirements.in @@ -0,0 +1,4 @@ +opensearch-py +numpy +h5py +opensearch-benchmark diff --git a/benchmarks/osb/requirements.txt b/benchmarks/osb/requirements.txt new file mode 100644 index 000000000..271e8ab07 --- /dev/null +++ b/benchmarks/osb/requirements.txt @@ -0,0 +1,98 @@ +# +# This file is autogenerated by pip-compile with python 3.8 +# To update, run: +# +# pip-compile +# +aiohttp==3.8.1 + # via opensearch-py +aiosignal==1.2.0 + # via aiohttp +async-timeout==4.0.2 + # via aiohttp +attrs==21.4.0 + # via + # aiohttp + # jsonschema +cachetools==4.2.4 + # via google-auth +certifi==2021.10.8 + # via + # opensearch-benchmark + # opensearch-py +charset-normalizer==2.0.12 + # via aiohttp +frozenlist==1.3.0 + # via + # aiohttp + # aiosignal +google-auth==1.22.1 + # via opensearch-benchmark +google-crc32c==1.3.0 + # via google-resumable-media +google-resumable-media==1.1.0 + # via opensearch-benchmark +h5py==3.6.0 + # via -r requirements.in +idna==3.3 + # via yarl +ijson==2.6.1 + # via opensearch-benchmark +importlib-metadata==4.11.3 + # via jsonschema +jinja2==2.11.3 + # via opensearch-benchmark +jsonschema==3.1.1 + # via opensearch-benchmark +markupsafe==2.0.1 + # via + # jinja2 + # opensearch-benchmark +multidict==6.0.2 + # via + # aiohttp + # yarl +numpy==1.22.3 + # via + # -r requirements.in + # h5py +opensearch-benchmark==0.0.2 + # via -r requirements.in +opensearch-py[async]==1.0.0 + # via + # -r requirements.in + # opensearch-benchmark +psutil==5.8.0 + # via opensearch-benchmark +py-cpuinfo==7.0.0 + # via opensearch-benchmark +pyasn1==0.4.8 + # via + # pyasn1-modules + # rsa +pyasn1-modules==0.2.8 + # via 
google-auth +pyrsistent==0.18.1 + # via jsonschema +rsa==4.8 + # via google-auth +six==1.16.0 + # via + # google-auth + # google-resumable-media + # jsonschema +tabulate==0.8.7 + # via opensearch-benchmark +thespian==3.10.1 + # via opensearch-benchmark +urllib3==1.26.9 + # via opensearch-py +yappi==1.2.3 + # via opensearch-benchmark +yarl==1.7.2 + # via aiohttp +zipp==3.7.0 + # via importlib-metadata + +# The following packages are considered to be unsafe in a requirements file: +# setuptools \ No newline at end of file diff --git a/benchmarks/osb/workload.json b/benchmarks/osb/workload.json new file mode 100644 index 000000000..bd0d84195 --- /dev/null +++ b/benchmarks/osb/workload.json @@ -0,0 +1,17 @@ +{% import "benchmark.helpers" as benchmark with context %} +{ + "version": 2, + "description": "k-NN Plugin train workload", + "indices": [ + { + "name": "{{ target_index_name }}", + "body": "{{ target_index_body }}" + }, + { + "name": "{{ train_index_name }}", + "body": "{{ train_index_body }}" + } + ], + "operations": {{ benchmark.collect(parts="operations/*.json") }}, + "test_procedures": [{{ benchmark.collect(parts="procedures/*.json") }}] +} diff --git a/benchmarks/osb/workload.py b/benchmarks/osb/workload.py new file mode 100644 index 000000000..32e6ad02c --- /dev/null +++ b/benchmarks/osb/workload.py @@ -0,0 +1,18 @@ +# SPDX-License-Identifier: Apache-2.0 +# +# The OpenSearch Contributors require contributions made to +# this file be licensed under the Apache-2.0 license or a +# compatible open source license. + +# This code needs to be included at the top of every workload.py file. +# OpenSearch Benchmarks is not able to find other helper files unless the path +# is updated. +import os +import sys +sys.path.append(os.path.abspath(os.getcwd())) + +from extensions.registry import register as custom_register + + +def register(registry): + custom_register(registry)
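
For contributors following the "Adding a procedure" and "Custom Extensions" sections of the README, the sketch below shows roughly what a new custom runner could look like. It only illustrates the registration pattern already used in `extensions/runners.py` and `workload.py`; the `warmup-knn-index` operation name, the use of the k-NN warmup endpoint, and the returned weight/unit are assumptions made for the example and are not part of this workload.

```
# extensions/runners.py (illustrative addition, not part of this change)
from .util import parse_string_parameter


class WarmupIndexRunner:
    """Hypothetical runner that warms up a k-NN index before it is queried."""

    async def __call__(self, opensearch, params):
        index = parse_string_parameter("index", params)
        # Assumes the k-NN warmup API: GET /_plugins/_knn/warmup/<index>
        await opensearch.transport.perform_request(
            "GET", "/_plugins/_knn/warmup/{}".format(index)
        )
        return 1, "ops"

    def __repr__(self, *args, **kwargs):
        return "warmup-knn-index"


def register(registry):
    # ...existing register_runner calls stay as-is; this only adds the new one.
    registry.register_runner(
        "warmup-knn-index", WarmupIndexRunner(), async_runner=True
    )
```

A test procedure would then reference the new operation-type in its schedule the same way the existing procedures reference `custom-refresh`.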