Add Querying Functionality to OSB #409

Merged: 13 commits, Jun 21, 2022
495 changes: 282 additions & 213 deletions benchmarks/osb/README.md

Large diffs are not rendered by default.

7 changes: 5 additions & 2 deletions benchmarks/osb/extensions/data_set.py
@@ -61,7 +61,7 @@ class HDF5DataSet(DataSet):

def __init__(self, dataset_path: str, context: Context):
file = h5py.File(dataset_path)
self.data = cast(h5py.Dataset, file[self._parse_context(context)])
self.data = cast(h5py.Dataset, file[self.parse_context(context)])
self.current = self.BEGINNING

def read(self, chunk_size: int):
@@ -93,7 +93,7 @@ def reset(self):
self.current = self.BEGINNING

@staticmethod
def _parse_context(context: Context) -> str:
def parse_context(context: Context) -> str:
if context == Context.NEIGHBORS:
return "neighbors"

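The rename from _parse_context to parse_context makes the helper public, presumably so tests and other callers can map a Context onto its HDF5 group name directly. A minimal illustrative call; the extensions module path is an assumption from the file layout above:

from extensions.data_set import Context, HDF5DataSet

# The enum resolves to the name of the HDF5 group holding ground-truth neighbors.
group = HDF5DataSet.parse_context(Context.NEIGHBORS)  # returns "neighbors"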
@@ -176,6 +176,9 @@ def reset(self):
self.file.seek(BigANNVectorDataSet.DATA_SET_HEADER_LENGTH)
self.current = BigANNVectorDataSet.BEGINNING

def __del__(self):
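# release the underlying file handle when this data set object is collected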
self.file.close()

@staticmethod
def _get_data_size(file_name):
ext = file_name.split('.')[-1]
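The new __del__ closes the raw file handle that BigANNVectorDataSet keeps open. For orientation, a minimal standalone reader for the big-ann float32 binary layout; the 8-byte header of two little-endian uint32s (vector count, then dimension) is an assumption taken from the public big-ann-benchmarks spec, consistent with the DATA_SET_HEADER_LENGTH seek above:

import struct

import numpy as np

def read_fbin(path: str) -> np.ndarray:
    # Assumed layout: uint32 count, uint32 dimension, then count * dim float32s.
    with open(path, "rb") as f:
        num_vectors, dim = struct.unpack("<II", f.read(8))
        data = np.fromfile(f, dtype=np.float32, count=num_vectors * dim)
    return data.reshape(num_vectors, dim)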
170 changes: 155 additions & 15 deletions benchmarks/osb/extensions/param_sources.py
@@ -4,6 +4,7 @@
# this file be licensed under the Apache-2.0 license or a
# compatible open source license.
import copy
from abc import ABC, abstractmethod

from .data_set import Context, HDF5DataSet, DataSet, BigANNVectorDataSet
from .util import bulk_transform, parse_string_parameter, parse_int_parameter, \
@@ -15,17 +16,42 @@ def register(registry):
"bulk-from-data-set", BulkVectorsFromDataSetParamSource
)

registry.register_param_source(
"knn-query-from-data-set", QueryVectorsFromDataSetParamSource
)

class BulkVectorsFromDataSetParamSource:
def __init__(self, workload, params, **kwargs):

class VectorsFromDataSetParamSource(ABC):
""" Abstract class that can read vectors from a data set and partition the
vectors across multiple clients.

Attributes:
index_name: Name of the index to generate the query for
field_name: Name of the field to generate the query for
data_set_format: Format the data set is serialized in: bigann or hdf5
data_set_path: Path to data set
context: Context the data set will be used in.
data_set: Structure containing metadata about the data and the ability to read it
num_vectors: Number of vectors to use from the data set
total: Number of vectors for the partition
current: Current vector offset in data set
infinite: Property of the param source signalling whether it can be exhausted
percent_completed: Progress indicator for how much of the data set has been consumed
offset: Offset into the data set to start at. Relevant when there are
multiple partitions
"""

def __init__(self, params, context: Context):
self.index_name: str = parse_string_parameter("index", params)
self.field_name: str = parse_string_parameter("field", params)

self.context = context
self.data_set_format = parse_string_parameter("data_set_format", params)
self.data_set_path = parse_string_parameter("data_set_path", params)
self.data_set: DataSet = self._read_data_set()
self.data_set: DataSet = self._read_data_set(self.data_set_format,
self.data_set_path,
self.context)

self.field_name: str = parse_string_parameter("field", params)
self.index_name: str = parse_string_parameter("index", params)
self.bulk_size: int = parse_int_parameter("bulk_size", params)
self.retries: int = parse_int_parameter("retries", params, 10)
self.num_vectors: int = parse_int_parameter(
"num_vectors", params, self.data_set.size()
)
@@ -35,29 +61,143 @@ def __init__(self, workload, params, **kwargs):
self.percent_completed = 0
self.offset = 0

def _read_data_set(self):
if self.data_set_format == HDF5DataSet.FORMAT_NAME:
return HDF5DataSet(self.data_set_path, Context.INDEX)
if self.data_set_format == BigANNVectorDataSet.FORMAT_NAME:
return BigANNVectorDataSet(self.data_set_path)
def _read_data_set(self, data_set_format: str, data_set_path: str,
data_set_context: Context):
if data_set_format == HDF5DataSet.FORMAT_NAME:
return HDF5DataSet(data_set_path, data_set_context)
if data_set_format == BigANNVectorDataSet.FORMAT_NAME:
return BigANNVectorDataSet(data_set_path)
raise ConfigurationError("Invalid data set format")

def partition(self, partition_index, total_partitions):
if self.data_set.size() % total_partitions != 0:
raise ValueError("Data set must be divisible by number of clients")
"""
Splits up the parameters source so that multiple clients can read data
from it.
Args:
partition_index: index of one particular partition
total_partitions: total number of partitions data set is split into

Returns:
The parameter source for this particular partion
"""
if self.num_vectors % total_partitions != 0:
Review comment (Member): Does this mean that the data set size must be divisible by the number of parallel clients? If so, I think in the next revision we need to relax this requirement and divide evenly, except for the last client, which takes the remainder.

Reply (Member, Author): Yes, that's a good point. I can update this in a future PR. Will create an issue when the PR is merged.

raise ValueError("Num vectors must be divisible by number of "
"partitions")

partition_x = copy.copy(self)
partition_x.num_vectors = int(self.num_vectors / total_partitions)
partition_x.offset = int(partition_index * partition_x.num_vectors)

# We need to create a new instance of the data set for each client
partition_x.data_set = partition_x._read_data_set()
partition_x.data_set = partition_x._read_data_set(
self.data_set_format,
self.data_set_path,
self.context
)
partition_x.data_set.seek(partition_x.offset)
partition_x.current = partition_x.offset
return partition_x
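
As a follow-up to the review thread above, a sketch of the relaxed partitioning scheme the reviewer suggests; this is not part of the PR, just the proposed future behavior, with the last client taking the remainder:

def partition_bounds(num_vectors: int, partition_index: int,
                     total_partitions: int):
    # Each client gets num_vectors // total_partitions vectors; the last
    # client also absorbs the remainder, so no divisibility check is needed.
    base = num_vectors // total_partitions
    offset = partition_index * base
    size = base
    if partition_index == total_partitions - 1:
        size += num_vectors % total_partitions
    return offset, size

# Example: 10 vectors over 3 clients -> (0, 3), (3, 3), (6, 4)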

@abstractmethod
def params(self):
"""
Returns: A single parameter from this source
"""
pass


class QueryVectorsFromDataSetParamSource(VectorsFromDataSetParamSource):
""" Query parameter source for k-NN. Queries are created from data set
provided.

Attributes:
k: The number of results to return for the search
vector_batch: List of vectors to be read from the data set. Reads are
batched so that we do not need to read from disk for each query
"""

VECTOR_READ_BATCH_SIZE = 100 # batch size to read vectors from data-set

def __init__(self, workload, params, **kwargs):
super().__init__(params, Context.QUERY)
self.k = parse_int_parameter("k", params)
self.vector_batch = None

def params(self):
"""
Returns: A query parameter with a vector from a data set
"""
if self.current >= self.num_vectors + self.offset:
raise StopIteration

if self.vector_batch is None or len(self.vector_batch) == 0:
self.vector_batch = self._batch_read(self.data_set)
if self.vector_batch is None:
raise StopIteration
vector = self.vector_batch.pop(0)
self.current += 1
self.percent_completed = self.current / self.total

return self._build_query_body(self.index_name, self.field_name, self.k,
vector)

def _batch_read(self, data_set: DataSet):
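# materialize the next VECTOR_READ_BATCH_SIZE vectors in one read so params() pops from memory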
return list(data_set.read(self.VECTOR_READ_BATCH_SIZE))

def _build_query_body(self, index_name: str, field_name: str, k: int,
vector) -> dict:
"""Builds a k-NN query that can be used to execute an approximate nearest
neighbor search against a k-NN plugin index
Args:
index_name: name of index to search
field_name: name of field to search
k: number of results to return
vector: vector used for query
Returns:
A dictionary containing the body used for search, a set of request
parameters to attach to the search and the name of the index.
"""
return {
"index": index_name,
"request-params": {
"_source": {
"exclude": [field_name]
}
},
"body": {
"size": k,
"query": {
"knn": {
field_name: {
"vector": vector,
"k": k
}
}
}
}
}
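
For concreteness, a sketch of the dictionary a single params() call yields for k=2 and a toy two-dimensional vector; the index name, field name, and vector values are illustrative only:

{
    "index": "target_index",
    "request-params": {
        "_source": {
            "exclude": ["target_field"]
        }
    },
    "body": {
        "size": 2,
        "query": {
            "knn": {
                "target_field": {"vector": [0.12, 0.98], "k": 2}
            }
        }
    }
}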


class BulkVectorsFromDataSetParamSource(VectorsFromDataSetParamSource):
""" Create bulk index requests from a data set of vectors.

Attributes:
bulk_size: number of vectors per request
retries: number of times to retry the request when it fails
"""

DEFAULT_RETRIES = 10

def __init__(self, workload, params, **kwargs):
super().__init__(params, Context.INDEX)
self.bulk_size: int = parse_int_parameter("bulk_size", params)
self.retries: int = parse_int_parameter("retries", params,
self.DEFAULT_RETRIES)

def params(self):
"""
Returns: A bulk index parameter with vectors from a data set.
"""
if self.current >= self.num_vectors + self.offset:
raise StopIteration

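The remainder of params() is elided in this view; per the imports above it assembles the request body with bulk_transform. For orientation, an OpenSearch bulk body for two vectors looks roughly like the sketch below; whether bulk_transform emits explicit _id values is an assumption here:

[
    {"index": {"_index": "target_index", "_id": 0}},
    {"target_field": [0.12, 0.98]},
    {"index": {"_index": "target_index", "_id": 1}},
    {"target_field": [0.45, 0.67]},
]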
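Putting the pieces together, a sketch of how OpenSearch Benchmark drives one of these param sources: partition once per client, then call params() until exhaustion. The params keys mirror the procedures JSON below, and passing None for the unused workload argument is an assumption:

from extensions.param_sources import QueryVectorsFromDataSetParamSource

params = {
    "index": "target_index",
    "field": "target_field",
    "data_set_format": "hdf5",
    "data_set_path": "/path/to/data-set.hdf5",
    "k": 10,
}
source = QueryVectorsFromDataSetParamSource(None, params)
client_source = source.partition(partition_index=0, total_partitions=2)
try:
    while True:
        query = client_source.params()  # one search request body per call
except StopIteration:
    pass  # this client's share of the data set is exhausted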
5 changes: 4 additions & 1 deletion benchmarks/osb/params/no-train-params.json
@@ -14,7 +14,10 @@
"hnsw_ef_construction": 512,
"hnsw_m": 16,


"query_k": 10,
"query_clients": 10,
"query_data_set_format": "hdf5",
"query_data_set_path": "<path to data>",

"ivf_nlists": 1,
"ivf_nprobes": 1,
7 changes: 6 additions & 1 deletion benchmarks/osb/params/train-params.json
@@ -27,5 +27,10 @@
"train_index_data_set_format": "hdf5",
"train_index_data_set_path": "<path to data>",
"train_index_num_vectors": 1000000,
"train_index_bulk_index_clients": 10
"train_index_bulk_index_clients": 10,

"query_k": 10,
"query_clients": 10,
"query_data_set_format": "hdf5",
"query_data_set_path": "<path to data>"
}
13 changes: 13 additions & 0 deletions benchmarks/osb/procedures/no-train-test.json
@@ -45,6 +45,19 @@
"index": "{{ target_index_name }}",
"retries": 100
}
},
{
"operation": {
"name": "knn-query-from-data-set",
"operation-type": "search",
"index": "{{ target_index_name }}",
"param-source": "knn-query-from-data-set",
"k": {{ query_k }},
"field": "{{ target_field_name }}",
"data_set_format": "{{ query_data_set_format }}",
"data_set_path": "{{ query_data_set_path }}"
},
"clients": {{ query_clients }}
}
]
}
13 changes: 13 additions & 0 deletions benchmarks/osb/procedures/train-test.json
@@ -99,6 +99,19 @@
"index": "{{ target_index_name }}",
"retries": 100
}
},
{
"operation": {
"name": "knn-query-from-data-set",
"operation-type": "search",
"index": "{{ target_index_name }}",
"param-source": "knn-query-from-data-set",
"k": {{ query_k }},
"field": "{{ target_field_name }}",
"data_set_format": "{{ query_data_set_format }}",
"data_set_path": "{{ query_data_set_path }}"
},
"clients": {{ query_clients }}
}
]
}