From 7b1c5a53f7c416ea036267fa092222e25fb07e6e Mon Sep 17 00:00:00 2001
From: John Mazanec
Date: Tue, 19 Apr 2022 15:09:36 -0700
Subject: [PATCH] Address comments

Signed-off-by: John Mazanec
---
 benchmarks/osb/extensions/data_set.py      | 59 +++++++++++++---------
 benchmarks/osb/extensions/param_sources.py | 12 ++---
 benchmarks/osb/extensions/runners.py       |  8 +--
 3 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/benchmarks/osb/extensions/data_set.py b/benchmarks/osb/extensions/data_set.py
index 22af1da5d..3c05f53c5 100644
--- a/benchmarks/osb/extensions/data_set.py
+++ b/benchmarks/osb/extensions/data_set.py
@@ -33,6 +33,8 @@ class DataSet(ABC):
     """
     __metaclass__ = ABCMeta
 
+    BEGINNING = 0
+
     @abstractmethod
     def read(self, chunk_size: int):
         pass
@@ -58,23 +60,27 @@ class HDF5DataSet(DataSet):
     def __init__(self, dataset_path: str, context: Context):
         file = h5py.File(dataset_path)
         self.data = cast(h5py.Dataset, file[self._parse_context(context)])
-        self.current = 0
+        self.current = self.BEGINNING
 
     def read(self, chunk_size: int):
         if self.current >= self.size():
             return None
 
-        end_i = self.current + chunk_size
-        if end_i > self.size():
-            end_i = self.size()
+        end_offset = self.current + chunk_size
+        if end_offset > self.size():
+            end_offset = self.size()
 
-        v = cast(np.ndarray, self.data[self.current:end_i])
-        self.current = end_i
+        v = cast(np.ndarray, self.data[self.current:end_offset])
+        self.current = end_offset
         return v
 
     def seek(self, offset: int):
+
+        if offset < self.BEGINNING:
+            raise Exception("Offset must be greater than or equal to 0")
+
         if offset >= self.size():
-            raise Exception("Offset is greater than the size")
+            raise Exception("Offset must be less than the data set size")
 
         self.current = offset
 
@@ -82,7 +88,7 @@ def size(self):
         return self.data.len()
 
     def reset(self):
-        self.current = 0
+        self.current = self.BEGINNING
 
     @staticmethod
     def _parse_context(context: Context) -> str:
@@ -103,44 +109,51 @@ class BigANNVectorDataSet(DataSet):
     `_
     """
 
+    DATA_SET_HEADER_LENGTH = 8
+
     def __init__(self, dataset_path: str):
         self.file = open(dataset_path, 'rb')
-        self.file.seek(0, os.SEEK_END)
+        self.file.seek(self.BEGINNING, os.SEEK_END)
         num_bytes = self.file.tell()
-        self.file.seek(0)
+        self.file.seek(self.BEGINNING)
 
-        if num_bytes < 8:
+        if num_bytes < self.DATA_SET_HEADER_LENGTH:
             raise Exception("File is invalid")
 
         self.num_points = int.from_bytes(self.file.read(4), "little")
         self.dimension = int.from_bytes(self.file.read(4), "little")
         self.bytes_per_num = self._get_data_size(dataset_path)
 
-        if (num_bytes - 8) != self.num_points * self.dimension * \
-                self.bytes_per_num:
+        if (num_bytes - self.DATA_SET_HEADER_LENGTH) != self.num_points * \
+                self.dimension * self.bytes_per_num:
             raise Exception("File is invalid")
 
         self.reader = self._value_reader(dataset_path)
-        self.current = 0
+        self.current = self.BEGINNING
 
     def read(self, chunk_size: int):
         if self.current >= self.size():
             return None
 
-        end_i = self.current + chunk_size
-        if end_i > self.size():
-            end_i = self.size()
+        end_offset = self.current + chunk_size
+        if end_offset > self.size():
+            end_offset = self.size()
 
         v = np.asarray([self._read_vector() for _ in
-                        range(end_i - self.current)])
-        self.current = end_i
+                        range(end_offset - self.current)])
+        self.current = end_offset
         return v
 
     def seek(self, offset: int):
+
+        if offset < self.BEGINNING:
+            raise Exception("Offset must be greater than or equal to 0")
+
         if offset >= self.size():
-            raise Exception("Offset is greater than the size")
+            raise Exception("Offset must be less than the data set size")
set size") - bytes_offset = 8 + self.dimension*self.bytes_per_num*offset + bytes_offset = self.DATA_SET_HEADER_LENGTH + self.dimension * \ + self.bytes_per_num * offset self.file.seek(bytes_offset) self.current = offset @@ -152,8 +165,8 @@ def size(self): return self.num_points def reset(self): - self.file.seek(8) # Seek to 8 bytes to skip re-reading metadata - self.current = 0 + self.file.seek(self.DATA_SET_HEADER_LENGTH) + self.current = self.BEGINNING @staticmethod def _get_data_size(file_name): diff --git a/benchmarks/osb/extensions/param_sources.py b/benchmarks/osb/extensions/param_sources.py index 17e17af10..ccc36544d 100644 --- a/benchmarks/osb/extensions/param_sources.py +++ b/benchmarks/osb/extensions/param_sources.py @@ -37,12 +37,10 @@ def __init__(self, workload, params, **kwargs): def _read_data_set(self): if self.data_set_format == "hdf5": - data_set = HDF5DataSet(self.data_set_path, Context.INDEX) - elif self.data_set_format == "bigann": - data_set = BigANNVectorDataSet(self.data_set_path) - else: - raise ConfigurationError("Invalid data set format") - return data_set + return HDF5DataSet(self.data_set_path, Context.INDEX) + if self.data_set_format == "bigann": + return BigANNVectorDataSet(self.data_set_path) + raise ConfigurationError("Invalid data set format") def partition(self, partition_index, total_partitions): if self.data_set.size() % total_partitions != 0: @@ -68,7 +66,7 @@ def action(doc_id): partition = self.data_set.read(self.bulk_size) body = bulk_transform(partition, self.field_name, action, self.current) - size = int(len(body) / 2) + size = len(body) / 2 self.current += size self.percent_completed = float(self.current)/self.total diff --git a/benchmarks/osb/extensions/runners.py b/benchmarks/osb/extensions/runners.py index 3c553611c..5ed43b8c5 100644 --- a/benchmarks/osb/extensions/runners.py +++ b/benchmarks/osb/extensions/runners.py @@ -40,8 +40,8 @@ async def __call__(self, opensearch, params): except: pass - raise TimeoutError('Failed to submit bulk request in specified number ' - 'of retries') + raise TimeoutError("Failed to submit bulk request in specified number " + "of retries: {}".format(retries)) def __repr__(self, *args, **kwargs): return "custom-vector-bulk" @@ -62,8 +62,8 @@ async def __call__(self, opensearch, params): except: pass - raise TimeoutError('Failed to refresh the index in specified number ' - 'of retries') + raise TimeoutError("Failed to refresh the index in specified number " + "of retries: {}".format(retries)) def __repr__(self, *args, **kwargs): return "custom-refresh"