
Add Querying Functionality to OSB (#409)
Adds a custom parameter source that reads queries from a data set.

Also adds unit tests for the benchmarking param sources, along with a
test utility to create data sets dynamically.

Signed-off-by: John Mazanec <jmazane@amazon.com>
jmazanec15 authored Jun 21, 2022
1 parent 714ddf3 commit b59dcff
Showing 10 changed files with 1,028 additions and 232 deletions.
495 changes: 282 additions & 213 deletions benchmarks/osb/README.md

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions benchmarks/osb/extensions/data_set.py
@@ -61,7 +61,7 @@ class HDF5DataSet(DataSet):

     def __init__(self, dataset_path: str, context: Context):
         file = h5py.File(dataset_path)
-        self.data = cast(h5py.Dataset, file[self._parse_context(context)])
+        self.data = cast(h5py.Dataset, file[self.parse_context(context)])
         self.current = self.BEGINNING

     def read(self, chunk_size: int):
@@ -93,7 +93,7 @@ def reset(self):
         self.current = self.BEGINNING

     @staticmethod
-    def _parse_context(context: Context) -> str:
+    def parse_context(context: Context) -> str:
         if context == Context.NEIGHBORS:
             return "neighbors"

@@ -176,6 +176,9 @@ def reset(self):
         self.file.seek(BigANNVectorDataSet.DATA_SET_HEADER_LENGTH)
         self.current = BigANNVectorDataSet.BEGINNING

+    def __del__(self):
+        self.file.close()
+
     @staticmethod
     def _get_data_size(file_name):
         ext = file_name.split('.')[-1]
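The reader above maps a Context to an HDF5 group name; this hunk confirms the "neighbors" group, and the index and query contexts presumably map to the "train" and "test" groups of the standard ann-benchmarks layout (an assumption here, since those branches fall outside the rendered hunk). A minimal sketch for checking that a data set file contains the groups the reader will look up, using a hypothetical file name:

import h5py

# List every dataset in an ann-benchmarks-style HDF5 file. "neighbors" is
# confirmed by the hunk above; "train" and "test" are assumed group names.
with h5py.File("sift-128-euclidean.hdf5", "r") as f:  # hypothetical path
    for name in f.keys():
        print(f"{name}: shape={f[name].shape}, dtype={f[name].dtype}")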
170 changes: 155 additions & 15 deletions benchmarks/osb/extensions/param_sources.py
@@ -4,6 +4,7 @@
 # this file be licensed under the Apache-2.0 license or a
 # compatible open source license.
 import copy
+from abc import ABC, abstractmethod

 from .data_set import Context, HDF5DataSet, DataSet, BigANNVectorDataSet
 from .util import bulk_transform, parse_string_parameter, parse_int_parameter, \
@@ -15,17 +16,42 @@ def register(registry):
"bulk-from-data-set", BulkVectorsFromDataSetParamSource
)

registry.register_param_source(
"knn-query-from-data-set", QueryVectorsFromDataSetParamSource
)

class BulkVectorsFromDataSetParamSource:
def __init__(self, workload, params, **kwargs):

class VectorsFromDataSetParamSource(ABC):
""" Abstract class that can read vectors from a data set and partition the
vectors across multiple clients.
Attributes:
index_name: Name of the index to generate the query for
field_name: Name of the field to generate the query for
data_set_format: Format data set is serialized with. bigann or hdf5
data_set_path: Path to data set
context: Context the data set will be used in.
data_set: Structure containing meta data about data and ability to read
num_vectors: Number of vectors to use from the data set
total: Number of vectors for the partition
current: Current vector offset in data set
infinite: Property of param source signalling that it can be exhausted
percent_completed: Progress indicator for how exhausted data set is
offset: Offset into the data set to start at. Relevant when there are
multiple partitions
"""

def __init__(self, params, context: Context):
self.index_name: str = parse_string_parameter("index", params)
self.field_name: str = parse_string_parameter("field", params)

self.context = context
self.data_set_format = parse_string_parameter("data_set_format", params)
self.data_set_path = parse_string_parameter("data_set_path", params)
self.data_set: DataSet = self._read_data_set()
self.data_set: DataSet = self._read_data_set(self.data_set_format,
self.data_set_path,
self.context)

self.field_name: str = parse_string_parameter("field", params)
self.index_name: str = parse_string_parameter("index", params)
self.bulk_size: int = parse_int_parameter("bulk_size", params)
self.retries: int = parse_int_parameter("retries", params, 10)
self.num_vectors: int = parse_int_parameter(
"num_vectors", params, self.data_set.size()
)
@@ -35,29 +61,143 @@ def __init__(self, workload, params, **kwargs):
         self.percent_completed = 0
         self.offset = 0

-    def _read_data_set(self):
-        if self.data_set_format == HDF5DataSet.FORMAT_NAME:
-            return HDF5DataSet(self.data_set_path, Context.INDEX)
-        if self.data_set_format == BigANNVectorDataSet.FORMAT_NAME:
-            return BigANNVectorDataSet(self.data_set_path)
+    def _read_data_set(self, data_set_format: str, data_set_path: str,
+                       data_set_context: Context):
+        if data_set_format == HDF5DataSet.FORMAT_NAME:
+            return HDF5DataSet(data_set_path, data_set_context)
+        if data_set_format == BigANNVectorDataSet.FORMAT_NAME:
+            return BigANNVectorDataSet(data_set_path)
         raise ConfigurationError("Invalid data set format")

     def partition(self, partition_index, total_partitions):
-        if self.data_set.size() % total_partitions != 0:
-            raise ValueError("Data set must be divisible by number of clients")
+        """
+        Splits up the parameter source so that multiple clients can read data
+        from it.
+        Args:
+            partition_index: index of one particular partition
+            total_partitions: total number of partitions the data set is split
+                into
+        Returns:
+            The parameter source for this particular partition
+        """
+        if self.num_vectors % total_partitions != 0:
+            raise ValueError("Num vectors must be divisible by number of "
+                             "partitions")

         partition_x = copy.copy(self)
         partition_x.num_vectors = int(self.num_vectors / total_partitions)
         partition_x.offset = int(partition_index * partition_x.num_vectors)

         # We need to create a new instance of the data set for each client
-        partition_x.data_set = partition_x._read_data_set()
+        partition_x.data_set = partition_x._read_data_set(
+            self.data_set_format,
+            self.data_set_path,
+            self.context
+        )
         partition_x.data_set.seek(partition_x.offset)
         partition_x.current = partition_x.offset
         return partition_x

+    @abstractmethod
+    def params(self):
+        """
+        Returns: A single parameter from this source
+        """
+        pass
+
+
+class QueryVectorsFromDataSetParamSource(VectorsFromDataSetParamSource):
+    """ Query parameter source for k-NN. Queries are created from the data set
+    provided.
+    Attributes:
+        k: The number of results to return for the search
+        vector_batch: List of vectors to be read from the data set. Reads are
+            batched so that we do not need to go to disk for each query
+    """
+
+    VECTOR_READ_BATCH_SIZE = 100  # batch size for reading vectors from the data set
+
+    def __init__(self, workload, params, **kwargs):
+        super().__init__(params, Context.QUERY)
+        self.k = parse_int_parameter("k", params)
+        self.vector_batch = None
+
+    def params(self):
+        """
+        Returns: A query parameter with a vector from a data set
+        """
+        if self.current >= self.num_vectors + self.offset:
+            raise StopIteration
+
+        if self.vector_batch is None or len(self.vector_batch) == 0:
+            self.vector_batch = self._batch_read(self.data_set)
+            if self.vector_batch is None:
+                raise StopIteration
+        vector = self.vector_batch.pop(0)
+        self.current += 1
+        self.percent_completed = self.current / self.total
+
+        return self._build_query_body(self.index_name, self.field_name, self.k,
+                                      vector)
+
+    def _batch_read(self, data_set: DataSet):
+        return list(data_set.read(self.VECTOR_READ_BATCH_SIZE))
+
+    def _build_query_body(self, index_name: str, field_name: str, k: int,
+                          vector) -> dict:
+        """Builds a k-NN query that can be used to execute an approximate
+        nearest neighbor search against a k-NN plugin index
+        Args:
+            index_name: name of index to search
+            field_name: name of field to search
+            k: number of results to return
+            vector: vector used for query
+        Returns:
+            A dictionary containing the body used for search, a set of request
+            parameters to attach to the search and the name of the index.
+        """
+        return {
+            "index": index_name,
+            "request-params": {
+                "_source": {
+                    "exclude": [field_name]
+                }
+            },
+            "body": {
+                "size": k,
+                "query": {
+                    "knn": {
+                        field_name: {
+                            "vector": vector,
+                            "k": k
+                        }
+                    }
+                }
+            }
+        }
+
+
+class BulkVectorsFromDataSetParamSource(VectorsFromDataSetParamSource):
+    """ Create bulk index requests from a data set of vectors.
+    Attributes:
+        bulk_size: number of vectors per request
+        retries: number of times to retry the request when it fails
+    """
+
+    DEFAULT_RETRIES = 10
+
+    def __init__(self, workload, params, **kwargs):
+        super().__init__(params, Context.INDEX)
+        self.bulk_size: int = parse_int_parameter("bulk_size", params)
+        self.retries: int = parse_int_parameter("retries", params,
+                                                self.DEFAULT_RETRIES)
+
+    def params(self):
+        """
+        Returns: A bulk index parameter with vectors from a data set.
+        """
+        if self.current >= self.num_vectors + self.offset:
+            raise StopIteration

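Taken together, these classes define the param source lifecycle: OSB constructs the source from an operation's parameters, calls partition() once per client, then calls params() on each partition until it raises StopIteration. A minimal driver sketch of that contract, assuming the extensions package is importable and using hypothetical index, field, and path values:

from extensions.param_sources import QueryVectorsFromDataSetParamSource

# Keys mirror those parsed in the __init__ methods above; values are
# hypothetical. num_vectors defaults to the data set size, which must be
# divisible by total_partitions.
params = {
    "index": "target_index",
    "field": "target_field",
    "data_set_format": "hdf5",
    "data_set_path": "/tmp/sift-128-euclidean.hdf5",
    "k": 10,
}

source = QueryVectorsFromDataSetParamSource(workload=None, params=params)
client_0 = source.partition(partition_index=0, total_partitions=2)

issued = 0
try:
    while True:
        query = client_0.params()  # dict with "index", "request-params", "body"
        issued += 1
except StopIteration:
    print(f"client 0 issued {issued} queries")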
5 changes: 4 additions & 1 deletion benchmarks/osb/params/no-train-params.json
@@ -14,7 +14,10 @@
"hnsw_ef_construction": 512,
"hnsw_m": 16,


"query_k": 10,
"query_clients": 10,
"query_data_set_format": "hdf5",
"query_data_set_path": "<path to data>",

"ivf_nlists": 1,
"ivf_nprobes": 1,
7 changes: 6 additions & 1 deletion benchmarks/osb/params/train-params.json
@@ -27,5 +27,10 @@
"train_index_data_set_format": "hdf5",
"train_index_data_set_path": "<path to data>",
"train_index_num_vectors": 1000000,
"train_index_bulk_index_clients": 10
"train_index_bulk_index_clients": 10,

"query_k": 10,
"query_clients": 10,
"query_data_set_format": "hdf5",
"query_data_set_path": "<path to data>"
}
13 changes: 13 additions & 0 deletions benchmarks/osb/procedures/no-train-test.json
@@ -45,6 +45,19 @@
"index": "{{ target_index_name }}",
"retries": 100
}
},
{
"operation": {
"name": "knn-query-from-data-set",
"operation-type": "search",
"index": "{{ target_index_name }}",
"param-source": "knn-query-from-data-set",
"k": {{ query_k }},
"field": "{{ target_field_name }}",
"data_set_format": "{{ query_data_set_format }}",
"data_set_path": "{{ query_data_set_path }}"
},
"clients": {{ query_clients }}
}
]
}
13 changes: 13 additions & 0 deletions benchmarks/osb/procedures/train-test.json
@@ -99,6 +99,19 @@
"index": "{{ target_index_name }}",
"retries": 100
}
},
{
"operation": {
"name": "knn-query-from-data-set",
"operation-type": "search",
"index": "{{ target_index_name }}",
"param-source": "knn-query-from-data-set",
"k": {{ query_k }},
"field": "{{ target_field_name }}",
"data_set_format": "{{ query_data_set_format }}",
"data_set_path": "{{ query_data_set_path }}"
},
"clients": {{ query_clients }}
}
]
}
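In both procedures, the operation-level keys (k, field, data_set_format, data_set_path) are handed to the registered param source as its params dict, where the parse_* helpers imported from .util read them. Those helpers are not part of this diff; a sketch of the behavior their call sites imply, a keyed lookup with an optional default that otherwise fails loudly, might look like this:

# Assumed behavior of the .util helpers referenced in param_sources.py;
# the actual implementations are not shown in this commit.
class ConfigurationError(Exception):
    pass


def parse_string_parameter(key: str, params: dict, default: str = None) -> str:
    if key not in params:
        if default is not None:
            return default
        raise ConfigurationError(f"Required parameter '{key}' not provided")
    return str(params[key])


def parse_int_parameter(key: str, params: dict, default: int = None) -> int:
    if key not in params:
        if default is not None:
            return default
        raise ConfigurationError(f"Required parameter '{key}' not provided")
    return int(params[key])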
