Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Vector Search Nested benchmark #584

Merged
merged 7 commits into from
Aug 22, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions osbenchmark/utils/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class Context(Enum):
INDEX = 1
QUERY = 2
NEIGHBORS = 3
PARENTS = 6


class DataSet(ABC):
Expand Down Expand Up @@ -141,6 +142,9 @@ def parse_context(context: Context) -> str:
if context == Context.QUERY:
return "test"

if context == Context.PARENTS:
return "parents" # used in nested benchmarks to get the parent document id associated with each vector.

raise Exception("Unsupported context")


Expand Down
192 changes: 164 additions & 28 deletions osbenchmark/workload/params.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import time
from abc import ABC, abstractmethod
from enum import Enum
from typing import List, Dict, Any
from typing import List, Dict, Any, Optional, Tuple

import numpy as np

Expand Down Expand Up @@ -884,6 +884,8 @@ class VectorDataSetPartitionParamSource(ParamSource):
def __init__(self, workload, params, context: Context, **kwargs):
super().__init__(workload, params, **kwargs)
self.field_name: str = parse_string_parameter("field", params)
self.NESTED_FIELD_SEPARATOR = "."
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make it constant

self.is_nested = self.NESTED_FIELD_SEPARATOR in self.field_name # in base class because used for both bulk ingest and queries.
self.context = context
self.data_set_format = parse_string_parameter("data_set_format", params)
self.data_set_path = parse_string_parameter("data_set_path", params, "")
Expand Down Expand Up @@ -979,6 +981,18 @@ def partition(self, partition_index, total_partitions):
partition_x.current = partition_x.offset
return partition_x

def get_split_fields(self) -> Tuple[str, str]:
fields_as_array = self.field_name.split(self.NESTED_FIELD_SEPARATOR)

# TODO: Add support to multiple levels of nesting if a future benchmark requires it.

if len(fields_as_array) != 2:
raise ValueError(
f"Field name {self.field_name} is not a nested field name. Currently we support only 1 level of nesting."
)
return fields_as_array[0], fields_as_array[1]


@abstractmethod
def params(self):
"""
Expand Down Expand Up @@ -1155,12 +1169,24 @@ def _build_vector_search_query_body(self, vector, efficient_filter=None) -> dict
query.update({
"filter": efficient_filter,
})
return {

knn_search_query = {
"knn": {
self.field_name: query,
},
}

if self.is_nested:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if more than one level nested is identified, lets raise an exception and add TODO to fix that later

outer_field_name, _inner_field_name = self.get_split_fields()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
outer_field_name, _inner_field_name = self.get_split_fields()
outer_field_name, _ = self.get_split_fields()

return {
"nested": {
"path": outer_field_name,
"query": knn_search_query
}
}

return knn_search_query


class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
""" Create bulk index requests from a data set of vectors.
Expand All @@ -1177,13 +1203,74 @@ class BulkVectorsFromDataSetParamSource(VectorDataSetPartitionParamSource):
def __init__(self, workload, params, **kwargs):
super().__init__(workload, params, Context.INDEX, **kwargs)
self.bulk_size: int = parse_int_parameter("bulk_size", params)
self.retries: int = parse_int_parameter("retries", params,
self.DEFAULT_RETRIES)
self.retries: int = parse_int_parameter("retries", params, self.DEFAULT_RETRIES)
self.index_name: str = parse_string_parameter("index", params)
self.id_field_name: str = parse_string_parameter(
self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME)
self.PARAMS_NAME_ID_FIELD_NAME, params, self.DEFAULT_ID_FIELD_NAME
)

def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
self.action_buffer = None
self.num_nested_vectors = 10

self.parent_data_set_path = parse_string_parameter(
"parents_data_set_path", params, self.data_set_path
)

self.parent_data_set_format = self.data_set_format

self.parent_data_set_corpus = self.data_set_corpus

self.logger = logging.getLogger(__name__)

def partition(self, partition_index, total_partitions):
partition = super().partition(partition_index, total_partitions)
if self.parent_data_set_corpus and not self.parent_data_set_path:
parent_data_set_path = self._get_corpora_file_paths(
self.parent_data_set_corpus, self.parent_data_set_format
)
self._validate_data_set_corpus(parent_data_set_path)
self.parent_data_set_path = parent_data_set_path[0]
if not self.parent_data_set_path:
self.parent_data_set_path = self.data_set_path
# add neighbor instance to partition
if self.is_nested:
partition.parent_data_set = get_data_set(
self.parent_data_set_format, self.parent_data_set_path, Context.PARENTS
)
partition.parent_data_set.seek(partition.offset)

return partition

def bulk_transform_non_nested(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
"""
Create bulk ingest actions for data with a non-nested field.
"""
actions = []

_ = [
actions.extend([action(self.id_field_name, i + self.current), None])
for i in range(len(partition))
]
bulk_contents = []

add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME
for vec, identifier in zip(
partition.tolist(), range(self.current, self.current + len(partition))
):
row = {self.field_name: vec}
if add_id_field_to_body:
row.update({self.id_field_name: identifier})
bulk_contents.append(row)

actions[1::2] = bulk_contents

self.logger.info("Actions: %s", actions)
return actions


def bulk_transform(
self, partition: np.ndarray, action, parents_ids: Optional[np.ndarray]
) -> List[Dict[str, Any]]:
"""Partitions and transforms a list of vectors into OpenSearch's bulk
injection format.
Args:
Expand All @@ -1193,19 +1280,63 @@ def bulk_transform(self, partition: np.ndarray, action) -> List[Dict[str, Any]]:
Returns:
An array of transformed vectors in bulk format.
"""

if not self.is_nested:
return self.bulk_transform_non_nested(partition, action)

actions = []
_ = [
actions.extend([action(self.id_field_name, i + self.current), None])
for i in range(len(partition))
]
bulk_contents = []

outer_field_name, inner_field_name = self.get_split_fields()

add_id_field_to_body = self.id_field_name != self.DEFAULT_ID_FIELD_NAME
for vec, identifier in zip(partition.tolist(), range(self.current, self.current + len(partition))):
row = {self.field_name: vec}

if self.action_buffer is None:
first_index_of_parent_ids = 0
self.action_buffer = {outer_field_name: []}
self.action_parent_id = parents_ids[first_index_of_parent_ids]
if add_id_field_to_body:
row.update({self.id_field_name: identifier})
bulk_contents.append(row)
actions[1::2] = bulk_contents
self.action_buffer.update({self.id_field_name: self.action_parent_id})

part_list = partition.tolist()
for i in range(len(partition)):

nested = {inner_field_name: part_list[i]}

current_parent_id = parents_ids[i]

if self.action_parent_id == current_parent_id:
self.action_buffer[outer_field_name].append(nested)
else:
# flush action buffer
actions.extend(
[
action(self.id_field_name, self.action_parent_id),
self.action_buffer,
]
)

self.current += len(self.action_buffer[outer_field_name])

self.action_buffer = {outer_field_name: []}
if add_id_field_to_body:

self.action_buffer.update({self.id_field_name: current_parent_id})

self.action_buffer[outer_field_name].append(nested)

self.action_parent_id = current_parent_id

max_position = self.offset + self.num_vectors
if (
self.current + len(self.action_buffer[outer_field_name]) + self.bulk_size
>= max_position
):
# final flush of remaining vectors in the last partition (for the last client)
self.current += len(self.action_buffer[outer_field_name])
actions.extend(
[action(self.id_field_name, self.action_parent_id), self.action_buffer]
)

return actions

def params(self):
Expand All @@ -1217,29 +1348,34 @@ def params(self):

def action(id_field_name, doc_id):
# support only index operation
bulk_action = 'index'
metadata = {
'_index': self.index_name
}
bulk_action = "index"
metadata = {"_index": self.index_name}
# Add id field to metadata only if it is _id
if id_field_name == self.DEFAULT_ID_FIELD_NAME:
metadata.update({id_field_name: doc_id})
return {bulk_action: metadata}

remaining_vectors_in_partition = self.num_vectors + self.offset - self.current
# update bulk size if number of vectors to read is less than actual bulk size

bulk_size = min(self.bulk_size, remaining_vectors_in_partition)

partition = self.data_set.read(bulk_size)
body = self.bulk_transform(partition, action)

if self.is_nested:
parent_ids = self.parent_data_set.read(bulk_size)
else:
parent_ids = None

body = self.bulk_transform(partition, action, parent_ids)
size = len(body) // 2
self.current += size

if not self.is_nested:
# in the nested case, we may have irregular number of vectors ingested,
# so we calculate self.current within bulk_transform method when self.is_nested.
self.current += size
self.percent_completed = self.current / self.total

return {
"body": body,
"retries": self.retries,
"size": size
}
return {"body": body, "retries": self.retries, "size": size}


def get_target(workload, params):
Expand Down
Binary file added small-nested-works.hdf5
Binary file not shown.
35 changes: 35 additions & 0 deletions tests/utils/dataset_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,12 @@ def _build_data_set(self, context: DataSetBuildContext):
# file with distance.
context.vectors.tofile(f)

def create_parent_ids(num_vectors: int, group_size: int = 10) -> np.ndarray:
num_ids = (num_vectors + group_size - 1) // group_size # Calculate total number of different IDs needed
ids = np.arange(1, num_ids + 1) # Create an array of IDs starting from 1
parent_ids = np.repeat(ids, group_size)[:num_vectors] # Repeat each ID 'group_size' times and trim to 'num_vectors'
return parent_ids


def create_random_2d_array(num_vectors: int, dimension: int) -> np.ndarray:
rng = np.random.default_rng()
Expand Down Expand Up @@ -239,6 +245,35 @@ def create_data_set(
return data_set_path


def create_parent_data_set(
num_vectors: int,
dimension: int,
extension: str,
data_set_context: Context,
data_set_dir,
file_path: str = None
) -> str:
if file_path:
data_set_path = file_path
else:
file_name_base = ''.join(random.choice(string.ascii_letters) for _ in
range(DEFAULT_RANDOM_STRING_LENGTH))
data_set_file_name = "{}.{}".format(file_name_base, extension)
data_set_path = os.path.join(data_set_dir, data_set_file_name)
context = DataSetBuildContext(
data_set_context,
create_parent_ids(num_vectors),
data_set_path)

if extension == HDF5DataSet.FORMAT_NAME:
HDF5Builder().add_data_set_build_context(context).build()
else:
BigANNVectorBuilder().add_data_set_build_context(context).build()

return data_set_path



def create_ground_truth(
num_queries: int,
k: int,
Expand Down
Loading
Loading