Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
Signed-off-by: John Mazanec <jmazane@amazon.com>
  • Loading branch information
jmazanec15 committed Apr 19, 2022
1 parent 934643f commit 7b1c5a5
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 34 deletions.
59 changes: 36 additions & 23 deletions benchmarks/osb/extensions/data_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class DataSet(ABC):
"""
__metaclass__ = ABCMeta

BEGINNING = 0

@abstractmethod
def read(self, chunk_size: int):
pass
Expand All @@ -58,31 +60,35 @@ class HDF5DataSet(DataSet):
def __init__(self, dataset_path: str, context: Context):
file = h5py.File(dataset_path)
self.data = cast(h5py.Dataset, file[self._parse_context(context)])
self.current = 0
self.current = self.BEGINNING

def read(self, chunk_size: int):
if self.current >= self.size():
return None

end_i = self.current + chunk_size
if end_i > self.size():
end_i = self.size()
end_offset = self.current + chunk_size
if end_offset > self.size():
end_offset = self.size()

v = cast(np.ndarray, self.data[self.current:end_i])
self.current = end_i
v = cast(np.ndarray, self.data[self.current:end_offset])
self.current = end_offset
return v

def seek(self, offset: int):

if offset < self.BEGINNING:
raise Exception("Offset must be greater than or equal to 0")

if offset >= self.size():
raise Exception("Offset is greater than the size")
raise Exception("Offset must be less than the data set size")

self.current = offset

def size(self):
return self.data.len()

def reset(self):
self.current = 0
self.current = self.BEGINNING

@staticmethod
def _parse_context(context: Context) -> str:
Expand All @@ -103,44 +109,51 @@ class BigANNVectorDataSet(DataSet):
<https://big-ann-benchmarks.com/index.html#bench-datasets>`_
"""

DATA_SET_HEADER_LENGTH = 8

def __init__(self, dataset_path: str):
self.file = open(dataset_path, 'rb')
self.file.seek(0, os.SEEK_END)
self.file.seek(self.BEGINNING, os.SEEK_END)
num_bytes = self.file.tell()
self.file.seek(0)
self.file.seek(self.BEGINNING)

if num_bytes < 8:
if num_bytes < self.DATA_SET_HEADER_LENGTH:
raise Exception("File is invalid")

self.num_points = int.from_bytes(self.file.read(4), "little")
self.dimension = int.from_bytes(self.file.read(4), "little")
self.bytes_per_num = self._get_data_size(dataset_path)

if (num_bytes - 8) != self.num_points * self.dimension * \
self.bytes_per_num:
if (num_bytes - self.DATA_SET_HEADER_LENGTH) != self.num_points * \
self.dimension * self.bytes_per_num:
raise Exception("File is invalid")

self.reader = self._value_reader(dataset_path)
self.current = 0
self.current = self.BEGINNING

def read(self, chunk_size: int):
if self.current >= self.size():
return None

end_i = self.current + chunk_size
if end_i > self.size():
end_i = self.size()
end_offset = self.current + chunk_size
if end_offset > self.size():
end_offset = self.size()

v = np.asarray([self._read_vector() for _ in
range(end_i - self.current)])
self.current = end_i
range(end_offset - self.current)])
self.current = end_offset
return v

def seek(self, offset: int):

if offset < self.BEGINNING:
raise Exception("Offset must be greater than or equal to 0")

if offset >= self.size():
raise Exception("Offset is greater than the size")
raise Exception("Offset must be less than the data set size")

bytes_offset = 8 + self.dimension*self.bytes_per_num*offset
bytes_offset = self.DATA_SET_HEADER_LENGTH + self.dimension * \
self.bytes_per_num * offset
self.file.seek(bytes_offset)
self.current = offset

Expand All @@ -152,8 +165,8 @@ def size(self):
return self.num_points

def reset(self):
self.file.seek(8) # Seek to 8 bytes to skip re-reading metadata
self.current = 0
self.file.seek(self.DATA_SET_HEADER_LENGTH)
self.current = self.BEGINNING

@staticmethod
def _get_data_size(file_name):
Expand Down
12 changes: 5 additions & 7 deletions benchmarks/osb/extensions/param_sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,10 @@ def __init__(self, workload, params, **kwargs):

def _read_data_set(self):
if self.data_set_format == "hdf5":
data_set = HDF5DataSet(self.data_set_path, Context.INDEX)
elif self.data_set_format == "bigann":
data_set = BigANNVectorDataSet(self.data_set_path)
else:
raise ConfigurationError("Invalid data set format")
return data_set
return HDF5DataSet(self.data_set_path, Context.INDEX)
if self.data_set_format == "bigann":
return BigANNVectorDataSet(self.data_set_path)
raise ConfigurationError("Invalid data set format")

def partition(self, partition_index, total_partitions):
if self.data_set.size() % total_partitions != 0:
Expand All @@ -68,7 +66,7 @@ def action(doc_id):

partition = self.data_set.read(self.bulk_size)
body = bulk_transform(partition, self.field_name, action, self.current)
size = int(len(body) / 2)
size = len(body) / 2
self.current += size
self.percent_completed = float(self.current)/self.total

Expand Down
8 changes: 4 additions & 4 deletions benchmarks/osb/extensions/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ async def __call__(self, opensearch, params):
except:
pass

raise TimeoutError('Failed to submit bulk request in specified number '
'of retries')
raise TimeoutError("Failed to submit bulk request in specified number "
"of retries: {}".format(retries))

def __repr__(self, *args, **kwargs):
return "custom-vector-bulk"
Expand All @@ -62,8 +62,8 @@ async def __call__(self, opensearch, params):
except:
pass

raise TimeoutError('Failed to refresh the index in specified number '
'of retries')
raise TimeoutError("Failed to refresh the index in specified number "
"of retries: {}".format(retries))

def __repr__(self, *args, **kwargs):
return "custom-refresh"
Expand Down

0 comments on commit 7b1c5a5

Please sign in to comment.