diff --git a/benchmarks/perf-tool/README.md b/benchmarks/perf-tool/README.md
index b76fd1557..f935abf8c 100644
--- a/benchmarks/perf-tool/README.md
+++ b/benchmarks/perf-tool/README.md
@@ -219,7 +219,7 @@ Ingests a dataset of vectors into the cluster.
 | index_name | Name of index to ingest into | No default |
 | field_name | Name of field to ingest into | No default |
 | bulk_size | Documents per bulk request | 300 |
-| dataset_format | Format the dataset is in. Currently only hdf5 is supported. The hdf5 file must be organized in the same way that the ann-benchmarks organizes theirs. This will use the "train" data as the data to ingest. | 'hdf5' |
+| dataset_format | Format the dataset is in. Currently hdf5 and bigann are supported. An hdf5 file must be organized the same way that ann-benchmarks organizes its datasets. | 'hdf5' |
 | dataset_path | Path to dataset | No default |
 
 ##### Metrics
@@ -241,8 +241,10 @@ Runs a set of queries against an index.
 | index_name | Name of index to search | No default |
 | field_name | Name field to search | No default |
 | calculate_recall | Whether to calculate recall values | False |
-| dataset_format | Format the dataset is in. Currently only hdf5 is supported. The hdf5 file must be organized in the same way that the ann-benchmarks organizes theirs. This will use the "test" data as the data to use for queries. | 'hdf5' |
+| dataset_format | Format the dataset is in. Currently hdf5 and bigann are supported. An hdf5 file must be organized the same way that ann-benchmarks organizes its datasets. | 'hdf5' |
 | dataset_path | Path to dataset | No default |
+| neighbors_format | Format the neighbors dataset is in. Currently hdf5 and bigann are supported. An hdf5 file must be organized the same way that ann-benchmarks organizes its datasets. | 'hdf5' |
+| neighbors_path | Path to neighbors dataset | No default |
 
 ##### Metrics
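For reference (not part of the diff), an hdf5 file that follows the ann-benchmarks layout described in the tables above can be sanity-checked with h5py; the path below is the sample SIFT dataset referenced by the configs at the end of this change.

```python
# Quick check that an hdf5 dataset matches the ann-benchmarks layout
# (train / test / neighbors / distances groups) expected by dataset_format: hdf5.
import h5py

with h5py.File('../dataset/sift-128-euclidean.hdf5', 'r') as f:
    print(list(f.keys()))        # e.g. ['distances', 'neighbors', 'test', 'train']
    print(f['train'].shape)      # (num_vectors, dimension) -> used for ingest
    print(f['test'].shape)       # (num_queries, dimension) -> used for queries
    print(f['neighbors'].shape)  # (num_queries, k)         -> ground truth
```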
diff --git a/benchmarks/perf-tool/okpt/io/config/parsers/util.py b/benchmarks/perf-tool/okpt/io/config/parsers/util.py
index 3decd7eda..cecb9f2d0 100644
--- a/benchmarks/perf-tool/okpt/io/config/parsers/util.py
+++ b/benchmarks/perf-tool/okpt/io/config/parsers/util.py
@@ -6,30 +6,24 @@
 
 """Utility functions for parsing"""
 
-from dataclasses import dataclass
-from typing import Union, cast
-
-import h5py
-
 from okpt.io.config.parsers.base import ConfigurationError
+from okpt.io.dataset import HDF5DataSet, BigANNNeighborDataSet, \
+    BigANNVectorDataSet, DataSet, Context
 
 
-@dataclass
-class Dataset:
-    train: h5py.Dataset
-    test: h5py.Dataset
-    neighbors: h5py.Dataset
-    distances: h5py.Dataset
+def parse_dataset(dataset_format: str, dataset_path: str,
+                  context: Context) -> DataSet:
+    if dataset_format == 'hdf5':
+        return HDF5DataSet(dataset_path, context)
 
+    if dataset_format == 'bigann' and context == Context.NEIGHBORS:
+        return BigANNNeighborDataSet(dataset_path)
 
-def parse_dataset(dataset_path: str, dataset_format: str) -> Union[Dataset]:
-    if dataset_format == 'hdf5':
-        file = h5py.File(dataset_path)
-        return Dataset(train=cast(h5py.Dataset, file['train']),
-                       test=cast(h5py.Dataset, file['test']),
-                       neighbors=cast(h5py.Dataset, file['neighbors']),
-                       distances=cast(h5py.Dataset, file['distances']))
-    else:
-        raise Exception()
+    if dataset_format == 'bigann':
+        return BigANNVectorDataSet(dataset_path)
+
+    raise Exception("Unsupported data-set format")
 
 
 def parse_string_param(key: str, first_map, second_map, default) -> str:
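A rough sketch of how the new parse_dataset signature is meant to be called (module, class, and enum names are taken from this diff; the file paths are placeholders):

```python
# Illustrative only; the dataset paths below are placeholders.
from okpt.io.config.parsers.util import parse_dataset
from okpt.io.dataset import Context

# 'hdf5' + Context.QUERY -> HDF5DataSet reading the "test" group
queries = parse_dataset('hdf5', '../dataset/sift-128-euclidean.hdf5',
                        Context.QUERY)

# 'bigann' + Context.NEIGHBORS -> BigANNNeighborDataSet; any other context
# with 'bigann' falls through to BigANNVectorDataSet
neighbors = parse_dataset('bigann', '../dataset/ground-truth.bin',
                          Context.NEIGHBORS)
```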
diff --git a/benchmarks/perf-tool/okpt/io/dataset.py b/benchmarks/perf-tool/okpt/io/dataset.py
new file mode 100644
index 000000000..4f8bc22a2
--- /dev/null
+++ b/benchmarks/perf-tool/okpt/io/dataset.py
@@ -0,0 +1,218 @@
+# SPDX-License-Identifier: Apache-2.0
+#
+# The OpenSearch Contributors require contributions made to
+# this file be licensed under the Apache-2.0 license or a
+# compatible open source license.
+
+"""Defines DataSet interface and implements particular formats
+
+A DataSet exposes the basic functionality of reading data in chunks and
+resetting back to the start so it can be read again.
+
+Currently, we support the HDF5 format used by ann-benchmarks and the binary
+formats used by big-ann-benchmarks.
+
+Classes:
+    HDF5DataSet: Format used in ann-benchmarks
+    BigANNNeighborDataSet: Neighbor format for big-ann-benchmarks
+    BigANNVectorDataSet: Vector format for big-ann-benchmarks
+"""
+import os
+import struct
+from abc import ABC, ABCMeta, abstractmethod
+from enum import Enum
+from typing import cast
+
+import h5py
+import numpy as np
+
+
+class Context(Enum):
+    """DataSet context enum. Can be used to add additional context for how a
+    data-set should be interpreted.
+    """
+    INDEX = 1
+    QUERY = 2
+    NEIGHBORS = 3
+
+
+class DataSet(ABC):
+    """DataSet interface. Used for reading data-sets from files.
+
+    Methods:
+        read: Read a chunk of data from the data-set
+        size: Gets the number of items in the data-set
+        reset: Resets internal state of data-set to beginning
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def read(self, chunk_size: int):
+        pass
+
+    @abstractmethod
+    def size(self):
+        pass
+
+    @abstractmethod
+    def reset(self):
+        pass
+
+
+class HDF5DataSet(DataSet):
+    """ Data-set format corresponding to `ANN Benchmarks
+    <https://github.com/erikbern/ann-benchmarks>`_
+    """
+
+    def __init__(self, dataset_path: str, context: Context):
+        file = h5py.File(dataset_path)
+        self.data = cast(h5py.Dataset, file[self._parse_context(context)])
+        self.current = 0
+
+    def read(self, chunk_size: int):
+        if self.current >= self.size():
+            return None
+
+        end_i = self.current + chunk_size
+        if end_i > self.size():
+            end_i = self.size()
+
+        v = cast(np.ndarray, self.data[self.current:end_i])
+        self.current = end_i
+        return v
+
+    def size(self):
+        return self.data.len()
+
+    def reset(self):
+        self.current = 0
+
+    @staticmethod
+    def _parse_context(context: Context) -> str:
+        if context == Context.NEIGHBORS:
+            return "neighbors"
+
+        if context == Context.INDEX:
+            return "train"
+
+        if context == Context.QUERY:
+            return "test"
+
+        raise Exception("Unsupported context")
+
+
+class BigANNNeighborDataSet(DataSet):
+    """ Data-set format for neighbor data-sets for `Big ANN Benchmarks
+    <https://big-ann-benchmarks.com>`_"""
+
+    def __init__(self, dataset_path: str):
+        self.file = open(dataset_path, 'rb')
+        self.file.seek(0, os.SEEK_END)
+        num_bytes = self.file.tell()
+        self.file.seek(0)
+
+        if num_bytes < 8:
+            raise Exception("File is invalid")
+
+        self.num_queries = int.from_bytes(self.file.read(4), "little")
+        self.k = int.from_bytes(self.file.read(4), "little")
+
+        # According to the website, the number of bytes that will follow will
+        # be: num_queries X K x sizeof(uint32_t) bytes + num_queries X K x
+        # sizeof(float) bytes
+        if (num_bytes - 8) != 2 * (self.num_queries * self.k * 4):
+            raise Exception("File is invalid")
+
+        self.current = 0
+
+    def read(self, chunk_size: int):
+        if self.current >= self.size():
+            return None
+
+        end_i = self.current + chunk_size
+        if end_i > self.size():
+            end_i = self.size()
+
+        v = [[int.from_bytes(self.file.read(4), "little") for _ in
+              range(self.k)] for _ in range(end_i - self.current)]
+
+        self.current = end_i
+        return v
+
+    def size(self):
+        return self.num_queries
+
+    def reset(self):
+        self.file.seek(8)
+        self.current = 0
+
+
+class BigANNVectorDataSet(DataSet):
+    """ Data-set format for vector data-sets for `Big ANN Benchmarks
+    <https://big-ann-benchmarks.com>`_
+    """
+
+    def __init__(self, dataset_path: str):
+        self.file = open(dataset_path, 'rb')
+        self.file.seek(0, os.SEEK_END)
+        num_bytes = self.file.tell()
+        self.file.seek(0)
+
+        if num_bytes < 8:
+            raise Exception("File is invalid")
+
+        self.num_points = int.from_bytes(self.file.read(4), "little")
+        self.dimension = int.from_bytes(self.file.read(4), "little")
+        bytes_per_num = self._get_data_size(dataset_path)
+
+        if (num_bytes - 8) != self.num_points * self.dimension * bytes_per_num:
+            raise Exception("File is invalid")
+
+        self.reader = self._value_reader(dataset_path)
+        self.current = 0
+
+    def read(self, chunk_size: int):
+        if self.current >= self.size():
+            return None
+
+        end_i = self.current + chunk_size
+        if end_i > self.size():
+            end_i = self.size()
+
+        v = np.asarray([self._read_vector() for _ in
+                        range(end_i - self.current)])
+        self.current = end_i
+        return v
+
+    def _read_vector(self):
+        return np.asarray([self.reader(self.file) for _ in
+                           range(self.dimension)])
+
+    def size(self):
+        return self.num_points
+
+    def reset(self):
+        self.file.seek(8)  # Seek to 8 bytes to skip re-reading metadata
+        self.current = 0
+
+    @staticmethod
+    def _get_data_size(file_name):
+        ext = file_name.split('.')[-1]
+        if ext == "u8bin":
+            return 1
+
+        if ext == "fbin":
+            return 4
+
+        raise Exception("Unknown extension")
+
+    @staticmethod
+    def _value_reader(file_name):
+        ext = file_name.split('.')[-1]
+        if ext == "u8bin":
+            return lambda file: float(int.from_bytes(file.read(1), "little"))
+
+        if ext == "fbin":
+            return lambda file: struct.unpack('<f', file.read(4))[0]
+
+        raise Exception("Unknown extension")
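To make the binary layouts above concrete, here is a small illustrative script (not part of the change, file names hypothetical) that writes toy files the two BigANN readers would accept: a pair of little-endian uint32 header fields followed by the payload the constructors validate.

```python
# Illustrative helper for producing tiny big-ann-benchmarks style files that
# BigANNVectorDataSet / BigANNNeighborDataSet above will accept.
import struct

import numpy as np

num_points, dimension = 10, 4
num_queries, k = 2, 5

# Vector file (.fbin): uint32 num_points, uint32 dimension, then float32 values.
with open('toy.fbin', 'wb') as f:
    f.write(struct.pack('<II', num_points, dimension))
    f.write(np.random.rand(num_points, dimension).astype('<f4').tobytes())

# Neighbor file: uint32 num_queries, uint32 k, then num_queries * k uint32 ids
# followed by num_queries * k float32 distances.
with open('toy-neighbors.bin', 'wb') as f:
    f.write(struct.pack('<II', num_queries, k))
    f.write(np.arange(num_queries * k, dtype='<u4').tobytes())
    f.write(np.zeros(num_queries * k, dtype='<f4').tobytes())
```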
diff --git a/benchmarks/perf-tool/okpt/test/steps/steps.py b/benchmarks/perf-tool/okpt/test/steps/steps.py
--- a/benchmarks/perf-tool/okpt/test/steps/steps.py
+++ b/benchmarks/perf-tool/okpt/test/steps/steps.py
@@ -319,9 +324,17 @@ def __init__(self, step_config: StepConfig):
                                                  step_config.config, {}, False)
         dataset_format = parse_string_param('dataset_format',
                                             step_config.config, {}, 'hdf5')
-        dataset_path = parse_string_param('dataset_path', step_config.config,
-                                          {}, None)
-        self.dataset = parse_dataset(dataset_path, dataset_format)
+        dataset_path = parse_string_param('dataset_path',
+                                          step_config.config, {}, None)
+        self.dataset = parse_dataset(dataset_format, dataset_path,
+                                     Context.QUERY)
+
+        neighbors_format = parse_string_param('neighbors_format',
+                                              step_config.config, {}, 'hdf5')
+        neighbors_path = parse_string_param('neighbors_path',
+                                            step_config.config, {}, None)
+        self.neighbors = parse_dataset(neighbors_format, neighbors_path,
+                                       Context.NEIGHBORS)
         self.implicit_config = step_config.implicit_config
 
     def _action(self):
@@ -341,10 +354,13 @@ def get_body(vec):
         results = {}
         query_responses = []
-        for v in self.dataset.test:
+        while True:
+            query = self.dataset.read(1)
+            if query is None:
+                break
             query_responses.append(
-                query_index(self.opensearch, self.index_name, get_body(v),
-                            [self.field_name]))
+                query_index(self.opensearch, self.index_name,
+                            get_body(query[0]), [self.field_name]))
 
         results['took'] = [
             float(query_response['took'])
             for query_response in query_responses
@@ -355,10 +371,14 @@
         ids = [[int(hit['_id'])
                 for hit in query_response['hits']['hits']]
                for query_response in query_responses]
-        results['recall@K'] = recall_at_r(ids, self.dataset.neighbors,
+        results['recall@K'] = recall_at_r(ids, self.neighbors,
                                           self.k, self.k)
+        self.neighbors.reset()
         results[f'recall@{str(self.r)}'] = recall_at_r(
-            ids, self.dataset.neighbors, self.r, self.k)
+            ids, self.neighbors, self.r, self.k)
+        self.neighbors.reset()
+
+        self.dataset.reset()
 
         return results
 
@@ -458,7 +478,7 @@ def get_opensearch_client(endpoint: str, port: int):
     )
 
 
-def recall_at_r(results, ground_truth_set, r, k):
+def recall_at_r(results, neighbor_dataset, r, k):
     """
     Calculates the recall@R for a set of queries against a ground truth
     nearest neighbor set
@@ -466,8 +486,8 @@
         results: 2D list containing ids of results returned by OpenSearch.
         results[i][j] i refers to query, j refers to result in the query
-        ground_truth_set: 2D list containing ids of the true nearest neighbors
-        for a set of queries
+        neighbor_dataset: 2D dataset containing ids of the true nearest
+        neighbors for a set of queries
         r: number of top results to check if they are in the ground truth
         k-NN set.
         k: k value for the query
@@ -475,13 +495,18 @@
         Recall at R
     """
     correct = 0.0
-    for i, true_neighbors in enumerate(ground_truth_set):
-        true_neighbors_set = set(true_neighbors[:k])
+    query = 0
+    while True:
+        true_neighbors = neighbor_dataset.read(1)
+        if true_neighbors is None:
+            break
+        true_neighbors_set = set(true_neighbors[0][:k])
         for j in range(r):
-            if results[i][j] in true_neighbors_set:
+            if results[query][j] in true_neighbors_set:
                 correct += 1.0
+        query += 1
 
-    return correct / (r * len(ground_truth_set))
+    return correct / (r * neighbor_dataset.size())
 
 
 def get_index_size_in_kb(opensearch, index_name):
@@ -527,5 +552,5 @@ def query_index(opensearch: OpenSearch, index_name: str, body: dict,
                              _source_excludes=excluded_fields)
 
 
-def bulk_index(opensearch: OpenSearch, index_name: str, body: dict):
+def bulk_index(opensearch: OpenSearch, index_name: str, body: List):
     return opensearch.bulk(index=index_name, body=body, timeout='5m')
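A quick sanity check of the reworked recall_at_r logic, with made-up ids: two queries, r = 2 and k = 3; three of the four top-2 results fall inside their query's top-3 ground-truth set, so recall is 0.75.

```python
# Tiny recall@R walk-through with made-up ids (not from the benchmark).
results = [[7, 3], [9, 1]]             # top-2 ids returned per query
ground_truth = [[3, 5, 7], [1, 2, 4]]  # top-3 true neighbors per query

r, k = 2, 3
correct = 0
for query, true_neighbors in enumerate(ground_truth):
    true_set = set(true_neighbors[:k])
    correct += sum(1 for j in range(r) if results[query][j] in true_set)

print(correct / (r * len(ground_truth)))  # 3 hits / 4 slots -> 0.75
```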
diff --git a/benchmarks/perf-tool/sample-configs/faiss-sift-ivf/test.yml b/benchmarks/perf-tool/sample-configs/faiss-sift-ivf/test.yml
index b9dc98a33..c8fb42ec4 100644
--- a/benchmarks/perf-tool/sample-configs/faiss-sift-ivf/test.yml
+++ b/benchmarks/perf-tool/sample-configs/faiss-sift-ivf/test.yml
@@ -51,6 +51,8 @@ steps:
     field_name: target_field
     dataset_format: hdf5
     dataset_path: ../dataset/sift-128-euclidean.hdf5
+    neighbors_format: hdf5
+    neighbors_path: ../dataset/sift-128-euclidean.hdf5
 cleanup:
   - name: delete_model
     model_id: test-model
diff --git a/benchmarks/perf-tool/sample-configs/nmslib-sift-hnsw/test.yml b/benchmarks/perf-tool/sample-configs/nmslib-sift-hnsw/test.yml
index 5e933f913..deea1ad47 100644
--- a/benchmarks/perf-tool/sample-configs/nmslib-sift-hnsw/test.yml
+++ b/benchmarks/perf-tool/sample-configs/nmslib-sift-hnsw/test.yml
@@ -29,6 +29,8 @@ steps:
     field_name: target_field
     dataset_format: hdf5
    dataset_path: ../dataset/sift-128-euclidean.hdf5
+    neighbors_format: hdf5
+    neighbors_path: ../dataset/sift-128-euclidean.hdf5
 cleanup:
   - name: delete_index
     index_name: target_index
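The sample configs point neighbors_path at the same ann-benchmarks hdf5 file used for ingest and query; a rough sketch (illustrative, not part of the change) of how the query step now streams that ground truth through the DataSet interface:

```python
# Illustrative: ground-truth neighbors are streamed one query at a time
# through the DataSet interface instead of being held as an hdf5 group.
from okpt.io.dataset import HDF5DataSet, Context

neighbors = HDF5DataSet('../dataset/sift-128-euclidean.hdf5',
                        Context.NEIGHBORS)

while True:
    true_neighbors = neighbors.read(1)  # one query's ground-truth ids
    if true_neighbors is None:
        break
    # ... compare against the ids OpenSearch returned for this query ...
neighbors.reset()  # rewind before the next recall pass
```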