From 287302ff588664016f2e9856b10b6adb36b7d041 Mon Sep 17 00:00:00 2001
From: yangxuan
Date: Mon, 4 Mar 2024 12:02:28 +0800
Subject: [PATCH] fix: Unable to download some files from AliyunOSS

Fixes: #267, #275, #285

Signed-off-by: yangxuan
---
 tests/test_data_source.py             |  84 ++++---------------
 tests/test_dataset.py                 |  54 ++++++------
 tests/test_utils.py                   |  27 ++++++
 vectordb_bench/backend/dataset.py     | 114 +++++++++++---------------
 vectordb_bench/backend/task_runner.py |   4 +-
 vectordb_bench/backend/utils.py       |  30 +++++++
 6 files changed, 149 insertions(+), 164 deletions(-)

diff --git a/tests/test_data_source.py b/tests/test_data_source.py
index a2b1d0ed8..07e5b1878 100644
--- a/tests/test_data_source.py
+++ b/tests/test_data_source.py
@@ -1,78 +1,28 @@
 import logging
-import pathlib
 import pytest
-from vectordb_bench.backend.data_source import AliyunOSSReader, AwsS3Reader
-from vectordb_bench.backend.dataset import Dataset, DatasetManager
+from vectordb_bench.backend.data_source import DatasetSource
+from vectordb_bench.backend.cases import type2case

-log = logging.getLogger(__name__)
+log = logging.getLogger("vectordb_bench")


 class TestReader:
-    @pytest.mark.parametrize("size", [
-        100_000,
-        1_000_000,
-        10_000_000,
+    @pytest.mark.parametrize("type_case", [
+        (k, v) for k, v in type2case.items()
     ])
-    def test_cohere(self, size):
-        cohere = Dataset.COHERE.manager(size)
-        self.per_dataset_test(cohere)
+    def test_type_cases(self, type_case):
+        self.per_case_test(type_case)

-    @pytest.mark.parametrize("size", [
-        100_000,
-        1_000_000,
-    ])
-    def test_gist(self, size):
-        gist = Dataset.GIST.manager(size)
-        self.per_dataset_test(gist)
-
-    @pytest.mark.parametrize("size", [
-        1_000_000,
-    ])
-    def test_glove(self, size):
-        glove = Dataset.GLOVE.manager(size)
-        self.per_dataset_test(glove)
-
-    @pytest.mark.parametrize("size", [
-        500_000,
-        5_000_000,
-        # 50_000_000,
-    ])
-    def test_sift(self, size):
-        sift = Dataset.SIFT.manager(size)
-        self.per_dataset_test(sift)
-
-    @pytest.mark.parametrize("size", [
-        50_000,
-        500_000,
-        5_000_000,
-    ])
-    def test_openai(self, size):
-        openai = Dataset.OPENAI.manager(size)
-        self.per_dataset_test(openai)
-
-
-    def per_dataset_test(self, dataset: DatasetManager):
-        s3_reader = AwsS3Reader()
-        all_files = s3_reader.ls_all(dataset.data.dir_name)
-
-
-        remote_f_names = []
-        for file in all_files:
-            remote_f = pathlib.Path(file).name
-            if dataset.data.use_shuffled and remote_f.startswith("train"):
-                continue
-
-            elif (not dataset.data.use_shuffled) and remote_f.startswith("shuffle"):
-                continue
-
-            remote_f_names.append(remote_f)
+    def per_case_test(self, type_case):
+        t, ca_cls = type_case
+        ca = ca_cls()
+        log.info(f"test case: {t.name}, {ca.name}")

-        assert set(dataset.data.files) == set(remote_f_names)
+        filters = ca.filter_rate
+        ca.dataset.prepare(source=DatasetSource.AliyunOSS, check=False, filters=filters)
+        ali_trains = ca.dataset.train_files

-        aliyun_reader = AliyunOSSReader()
-        for fname in dataset.data.files:
-            p = pathlib.Path("benchmark", dataset.data.dir_name, fname)
-            assert aliyun_reader.bucket.object_exists(p.as_posix())
+        ca.dataset.prepare(check=False, filters=filters)
+        s3_trains = ca.dataset.train_files

-        log.info(f"downloading to {dataset.data_dir}")
-        aliyun_reader.read(dataset.data.dir_name.lower(), dataset.data.files, dataset.data_dir)
+        assert ali_trains == s3_trains
diff --git a/tests/test_dataset.py b/tests/test_dataset.py
index f6ba05036..60a219e25 100644
--- a/tests/test_dataset.py
+++ b/tests/test_dataset.py
@@ -1,7 +1,8 @@
-from vectordb_bench.backend.dataset import Dataset, get_files
+from vectordb_bench.backend.dataset import Dataset
 import logging
 import pytest
 from pydantic import ValidationError
+from vectordb_bench.backend.data_source import DatasetSource


 log = logging.getLogger("vectordb_bench")
@@ -35,34 +36,31 @@ def test_iter_cohere(self):
         dur_iter = time.time() - before
         log.warning(f"iter through cohere_10m cost={dur_iter/60}min")

+    # pytest -sv tests/test_dataset.py::TestDataSet::test_iter_laion
+    def test_iter_laion(self):
+        laion_100m = Dataset.LAION.manager(100_000_000)
+        from vectordb_bench.backend.data_source import DatasetSource
+        laion_100m.prepare(source=DatasetSource.AliyunOSS, check=False)

-class TestGetFiles:
-    @pytest.mark.parametrize("train_count", [
-        1,
-        10,
-        50,
-        100,
-    ])
-    @pytest.mark.parametrize("with_gt", [True, False])
-    def test_train_count(self, train_count, with_gt):
-        files = get_files(train_count, True, with_gt)
-        log.info(files)
+        import time
+        before = time.time()
+        for i in laion_100m:
+            log.debug(i.head(1))

-        if with_gt:
-            assert len(files) - 4 == train_count
-        else:
-            assert len(files) - 1 == train_count
+        dur_iter = time.time() - before
+        log.warning(f"iter through laion_100m cost={dur_iter/60}min")

-    @pytest.mark.parametrize("use_shuffled", [True, False])
-    def test_use_shuffled(self, use_shuffled):
-        files = get_files(1, use_shuffled, True)
-        log.info(files)
+    # https://github.com/zilliztech/VectorDBBench/issues/285
+    # TODO: ok
+    def test_iter_openai(self):
+
+        openai_500k = Dataset.OPENAI.manager(500_000)
+        openai_500k.prepare(source=DatasetSource.AliyunOSS, check=False)

-        trains = [f for f in files if "train" in f]
-        if use_shuffled:
-            for t in trains:
-                assert "shuffle_train" in t
-        else:
-            for t in trains:
-                assert "shuffle" not in t
-                assert "train" in t
+        import time
+        before = time.time()
+        for i in openai_500k:
+            log.debug(i.head(1))
+
+        dur_iter = time.time() - before
+        log.warning(f"iter through openai 500K cost={dur_iter/60}min, source=AliyunOSS")
diff --git a/tests/test_utils.py b/tests/test_utils.py
index f1a8e1f3d..df3fa6ffe 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -37,3 +37,30 @@ def test_recall(self, got_expected):

         log.info(f"recall: {res}, expected: {expected}")
         assert res == expected
+
+class TestGetFiles:
+    @pytest.mark.parametrize("train_count", [
+        1,
+        10,
+        50,
+        100,
+    ])
+    def test_train_count(self, train_count):
+        files = utils.compose_train_files(train_count, True)
+        log.info(files)
+
+        assert len(files) == train_count
+
+    @pytest.mark.parametrize("use_shuffled", [True, False])
+    def test_use_shuffled(self, use_shuffled):
+        files = utils.compose_train_files(1, use_shuffled)
+        log.info(files)
+
+        trains = [f for f in files if "train" in f]
+        if use_shuffled:
+            for t in trains:
+                assert "shuffle_train" in t
+        else:
+            for t in trains:
+                assert "shuffle" not in t
+                assert "train" in t
diff --git a/vectordb_bench/backend/dataset.py b/vectordb_bench/backend/dataset.py
index a227b4147..ffe5b10f0 100644
--- a/vectordb_bench/backend/dataset.py
+++ b/vectordb_bench/backend/dataset.py
@@ -22,7 +22,7 @@
 log = logging.getLogger(__name__)


-SizeLabel = namedtuple('SizeLabel', ['size', 'label', 'files'])
+SizeLabel = namedtuple('SizeLabel', ['size', 'label', 'file_count'])


 class BaseDataset(BaseModel):
@@ -31,6 +31,7 @@ class BaseDataset(BaseModel):
     dim: int
     metric_type: MetricType
     use_shuffled: bool
+    with_gt: bool = False
     _size_label: dict[int, SizeLabel] = PrivateAttr()

     @validator("size")
@@ -48,34 +49,8 @@ def dir_name(self) -> str:
         return f"{self.name}_{self.label}_{utils.numerize(self.size)}".lower()

     @property
-    def files(self) -> str:
-        return self._size_label.get(self.size).files
-
-
-def get_files(train_count: int, use_shuffled: bool, with_gt: bool = True) -> list[str]:
-    prefix = "shuffle_train" if use_shuffled else "train"
-    middle = f"of-{train_count}"
-    surfix = "parquet"
-
-    train_files = []
-    if train_count > 1:
-        just_size = len(str(train_count))
-        for i in range(train_count):
-            sub_file = f"{prefix}-{str(i).rjust(just_size, '0')}-{middle}.{surfix}"
-            train_files.append(sub_file)
-    else:
-        train_files.append(f"{prefix}.{surfix}")
-
-    files = ['test.parquet']
-    if with_gt:
-        files.extend([
-            'neighbors.parquet',
-            'neighbors_tail_1p.parquet',
-            'neighbors_head_1p.parquet',
-        ])
-
-    files.extend(train_files)
-    return files
+    def file_count(self) -> int:
+        return self._size_label.get(self.size).file_count


 class LAION(BaseDataset):
@@ -83,8 +58,9 @@ class LAION(BaseDataset):
     dim: int = 768
     metric_type: MetricType = MetricType.L2
     use_shuffled: bool = False
+    with_gt: bool = True
     _size_label: dict = {
-        100_000_000: SizeLabel(100_000_000, "LARGE", get_files(100, False)),
+        100_000_000: SizeLabel(100_000_000, "LARGE", 100),
     }


@@ -94,8 +70,8 @@ class GIST(BaseDataset):
     dim: int = 960
     metric_type: MetricType = MetricType.L2
     use_shuffled: bool = False
     _size_label: dict = {
-        100_000: SizeLabel(100_000, "SMALL", get_files(1, False, False)),
-        1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, False, False)),
+        100_000: SizeLabel(100_000, "SMALL", 1),
+        1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
     }

@@ -104,10 +80,11 @@ class Cohere(BaseDataset):
     dim: int = 768
     metric_type: MetricType = MetricType.COSINE
     use_shuffled: bool = config.USE_SHUFFLED_DATA
+    with_gt: bool = True
     _size_label: dict = {
-        100_000: SizeLabel(100_000, "SMALL", get_files(1, config.USE_SHUFFLED_DATA)),
-        1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, config.USE_SHUFFLED_DATA)),
-        10_000_000: SizeLabel(10_000_000, "LARGE", get_files(10, config.USE_SHUFFLED_DATA)),
+        100_000: SizeLabel(100_000, "SMALL", 1),
+        1_000_000: SizeLabel(1_000_000, "MEDIUM", 1),
+        10_000_000: SizeLabel(10_000_000, "LARGE", 10),
     }

@@ -116,7 +93,7 @@ class Glove(BaseDataset):
     dim: int = 200
     metric_type: MetricType = MetricType.COSINE
     use_shuffled: bool = False
-    _size_label: dict = {1_000_000: SizeLabel(1_000_000, "MEDIUM", get_files(1, False, False))}
+    _size_label: dict = {1_000_000: SizeLabel(1_000_000, "MEDIUM", 1)}


 class SIFT(BaseDataset):
@@ -125,9 +102,9 @@ class SIFT(BaseDataset):
     dim: int = 128
     metric_type: MetricType = MetricType.L2
     use_shuffled: bool = False
     _size_label: dict = {
-        500_000: SizeLabel(500_000, "SMALL", get_files(1, False, False)),
-        5_000_000: SizeLabel(5_000_000, "MEDIUM", get_files(1, False, False)),
-        # 50_000_000: SizeLabel(50_000_000, "LARGE", get_files(50, False, False)),
+        500_000: SizeLabel(500_000, "SMALL", 1),
+        5_000_000: SizeLabel(5_000_000, "MEDIUM", 1),
+        # 50_000_000: SizeLabel(50_000_000, "LARGE", 50),
     }

@@ -136,10 +113,11 @@ class OpenAI(BaseDataset):
     dim: int = 1536
     metric_type: MetricType = MetricType.COSINE
     use_shuffled: bool = config.USE_SHUFFLED_DATA
+    with_gt: bool = True
     _size_label: dict = {
-        50_000: SizeLabel(50_000, "SMALL", get_files(1, config.USE_SHUFFLED_DATA)),
-        500_000: SizeLabel(500_000, "MEDIUM", get_files(1, config.USE_SHUFFLED_DATA)),
-        5_000_000: SizeLabel(5_000_000, "LARGE", get_files(10, config.USE_SHUFFLED_DATA)),
+        50_000: SizeLabel(50_000, "SMALL", 1),
+        500_000: SizeLabel(500_000, "MEDIUM", 1),
+        5_000_000: SizeLabel(5_000_000, "LARGE", 10),
     }


@@ -155,6 +133,7 @@ class DatasetManager(BaseModel):
     """
     data: BaseDataset
     test_data: pd.DataFrame | None = None
+    gt_data: pd.DataFrame | None = None
     train_files : list[str] = []
     reader: DatasetReader | None = None

@@ -180,50 +159,51 @@ def data_dir(self) -> pathlib.Path:
     def __iter__(self):
         return DataSetIterator(self)

-    def prepare(self, source: DatasetSource=DatasetSource.S3, check: bool=True) -> bool:
+    # TODO passing use_shuffle from outside
+    def prepare(self,
+                source: DatasetSource=DatasetSource.S3,
+                check: bool=True,
+                filters: int | float | str | None = None,
+                ) -> bool:
         """Download the dataset from DatasetSource
          url = f"{source}/{self.data.dir_name}"

-        download files from url to self.data_dir, there'll be 4 types of files in the data_dir
-            - train*.parquet: for training
-            - test.parquet: for testing
-            - neighbors.parquet: ground_truth of the test.parquet
-            - neighbors_head_1p.parquet: ground_truth of the test.parquet after filtering 1% data
-            - neighbors_99p.parquet: ground_truth of the test.parquet after filtering 99% data
-
         Args:
            source(DatasetSource): S3 or AliyunOSS, default as S3
-            check(bool): Whether to do etags check
+            check(bool): Whether to do etags check, default as true
+            filters(Optional[int | float | str]): combined with dataset's with_gt to
+              compose the correct ground_truth file

         Returns:
            bool: whether the dataset is successfully prepared
        """
+        file_count, use_shuffled = self.data.file_count, self.data.use_shuffled
+
+        train_files = utils.compose_train_files(file_count, use_shuffled)
+        all_files = train_files
+
+        gt_file, test_file = None, None
+        if self.data.with_gt:
+            gt_file, test_file = utils.compose_gt_file(filters), "test.parquet"
+            all_files.extend([gt_file, test_file])
+
         source.reader().read(
             dataset=self.data.dir_name.lower(),
-            files=self.data.files,
+            files=all_files,
             local_ds_root=self.data_dir,
             check_etag=check,
         )

-        prefix = "shuffle_train" if self.data.use_shuffled else "train"
+        if gt_file is not None and test_file is not None:
+            self.test_data = self._read_file(test_file)
+            self.gt_data = self._read_file(gt_file)
+
+        prefix = "shuffle_train" if use_shuffled else "train"
         self.train_files = sorted([f.name for f in self.data_dir.glob(f'{prefix}*.parquet')])
         log.debug(f"{self.data.name}: available train files {self.train_files}")
-        self.test_data = self._read_file("test.parquet")
-
-        return True

-    def get_ground_truth(self, filters: int | float | None = None) -> pd.DataFrame:
-
-        file_name = ""
-        if filters is None:
-            file_name = "neighbors.parquet"
-        elif filters == 0.01:
-            file_name = "neighbors_head_1p.parquet"
-        elif filters == 0.99:
-            file_name = "neighbors_tail_1p.parquet"
-        else:
-            raise ValueError(f"Filters not supported: {filters}")
-        return self._read_file(file_name)
+        return True

     def _read_file(self, file_name: str) -> pd.DataFrame:
         """read one file from disk into memory"""
diff --git a/vectordb_bench/backend/task_runner.py b/vectordb_bench/backend/task_runner.py
index 80c5ac1df..a5f4e317f 100644
--- a/vectordb_bench/backend/task_runner.py
+++ b/vectordb_bench/backend/task_runner.py
@@ -84,7 +84,7 @@ def init_db(self, drop_old: bool = True) -> None:
     def _pre_run(self, drop_old: bool = True):
         try:
             self.init_db(drop_old)
-            self.ca.dataset.prepare(self.dataset_source)
+            self.ca.dataset.prepare(self.dataset_source, filters=self.ca.filter_rate)
         except ModuleNotFoundError as e:
             log.warning(f"pre run case error: please install client for db: {self.config.db}, error={e}")
             raise e from None
@@ -215,7 +215,7 @@ def _init_search_runner(self):
         test_emb = test_emb / np.linalg.norm(test_emb, axis=1)[:, np.newaxis]
         self.test_emb = test_emb.tolist()

-        gt_df = self.ca.dataset.get_ground_truth(self.ca.filter_rate)
+        gt_df = self.ca.dataset.gt_data

         self.serial_search_runner = SerialSearchRunner(
             db=self.db,
diff --git a/vectordb_bench/backend/utils.py b/vectordb_bench/backend/utils.py
index d53da31a6..690cff3e8 100644
--- a/vectordb_bench/backend/utils.py
+++ b/vectordb_bench/backend/utils.py
@@ -42,3 +42,33 @@ def inner(*args, **kwargs):
         delta = time.perf_counter() - pref
         return result, delta
     return inner
+
+
+def compose_train_files(train_count: int, use_shuffled: bool) -> list[str]:
+    prefix = "shuffle_train" if use_shuffled else "train"
+    middle = f"of-{train_count}"
+    suffix = "parquet"
+
+    train_files = []
+    if train_count > 1:
+        just_size = 2
+        for i in range(train_count):
+            sub_file = f"{prefix}-{str(i).rjust(just_size, '0')}-{middle}.{suffix}"
+            train_files.append(sub_file)
+    else:
+        train_files.append(f"{prefix}.{suffix}")
+
+    return train_files
+
+
+def compose_gt_file(filters: int | float | str | None = None) -> str:
+    if filters is None:
+        return "neighbors.parquet"
+
+    if filters == 0.01:
+        return "neighbors_head_1p.parquet"
+
+    if filters == 0.99:
+        return "neighbors_tail_1p.parquet"
+
+    raise ValueError(f"Filters not supported: {filters}")
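
A minimal usage sketch of the reworked download flow, assuming the patch above is applied; the Cohere size and filter rate below are illustrative values, not taken from the diff:

    # Hedged sketch: prepare() now composes the file list on the fly instead of
    # reading it from each SizeLabel; Cohere/1M and filters=0.01 are example inputs.
    from vectordb_bench.backend.dataset import Dataset
    from vectordb_bench.backend.data_source import DatasetSource

    cohere_1m = Dataset.COHERE.manager(1_000_000)

    # Downloads the train file(s) plus test.parquet and the ground-truth file
    # selected by `filters` (0.01 -> neighbors_head_1p.parquet).
    cohere_1m.prepare(
        source=DatasetSource.AliyunOSS,  # default is DatasetSource.S3
        check=False,                     # skip etag verification
        filters=0.01,
    )

    print(cohere_1m.train_files)        # e.g. ["shuffle_train.parquet"] when shuffled data is used
    print(cohere_1m.test_data.head(1))  # test.parquet, loaded during prepare()
    print(cohere_1m.gt_data.head(1))    # ground truth, replacing the removed get_ground_truth()

Note that the ground-truth frame is now loaded during prepare() and read from gt_data, which is why task_runner passes filters=self.ca.filter_rate up front instead of resolving the filter at search time.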