diff --git a/jinahub/indexers/query/vector/FaissIndexer/Dockerfile b/jinahub/indexers/query/vector/FaissIndexer/Dockerfile index 62f95e39..8a995b24 100644 --- a/jinahub/indexers/query/vector/FaissIndexer/Dockerfile +++ b/jinahub/indexers/query/vector/FaissIndexer/Dockerfile @@ -1,9 +1,15 @@ -FROM jinaai/jina:1.2.0 +FROM jinaai/jina:master +# install and upgrade pip +RUN apt-get update && apt-get install -y python3.7 python3.7-dev python3-pip git +RUN python3.7 -m pip install --upgrade pip + +# setup the workspace COPY . /workspace WORKDIR /workspace -RUN pip install -r requirements.txt -RUN pip install pytest && pytest tests/ -v -s +# install Jina and third-party requirements +RUN python3.7 -m pip install -r requirements.txt +RUN python3.7 -m pip install pytest -ENTRYPOINT ["jina", "pod", "--uses", "config.yml"] \ No newline at end of file +ENTRYPOINT ["jina", "pod", "--uses", "config.yml"] diff --git a/jinahub/indexers/query/vector/FaissIndexer/README.md b/jinahub/indexers/query/vector/FaissIndexer/README.md index 0f88f856..11c08f78 100644 --- a/jinahub/indexers/query/vector/FaissIndexer/README.md +++ b/jinahub/indexers/query/vector/FaissIndexer/README.md @@ -2,64 +2,3 @@ Faiss is a library for efficient similarity search and clustering of dense vectors. It contains algorithms that search in sets of vectors of any size, up to ones that possibly do not fit in RAM. It also contains supporting code for evaluation and parameter tuning. Faiss is written in C++ with complete wrappers for Python/numpy. Some of the most useful algorithms are implemented on the GPU. It is developed by Facebook AI Research. FaissIndexer is hence a Faiss powered vector indexer. For more information, refer [Facebook Research's Faiss](https://github.com/facebookresearch/faiss) GitHub repo. - -## Snippets: - -Initialise FaissIndexer: - -`FaissIndexer(index_key=key_str, train_filepath=fpath_str, max_num_training_points=train_int, requires_training=train_bool, distance='l2', normalize=norm_bool, nprobe=probe_int )` - -Users can use Pod images in several ways: - -**NOTE**: - -- `MODULE_VERSION` is the version of the FaissIndexer, in semver format. E.g. `0.0.15`. -- `JINA_VERSION` is the version of the Jina core version with which the Docker image was built. E.g. `1.0.0` - -- Flow API - - ```python - from jina.flow import Flow - f = (Flow() - .add(name='my-indexer', uses='docker://jinahub/pod.indexer.faissindexer:MODULE_VERSION-JINA_VERSION') - ``` - -- Flow YAML file - - This is the only way to provide arguments to its parameters: - - ```yaml - pods: - - name: faiss - uses: indexers/vector/FaissIndexer/config.yml - ``` - - and then in `leveldb.yml`: - - ```yaml - !FaissIndexer - with: - hostname: yourdomain.com - port: 6379 - db: 0 - ``` - -- Jina CLI - - ```bash - jina pod --uses docker://jinahub/pod.indexer.faissindexer:MODULE_VERSION-JINA_VERSION - ``` - -- Conventional local usage with `uses` argument - - ```bash - jina pod --uses indexers/vector/FaissIndexer/config.yml --port-in 55555 --port-out 55556 - ``` - -- Run with Docker (`docker run`) - - Specify the image name along with the version tag. The snippet below uses Jina version as `JINA_VERSION`. 
- - ```bash - docker run --network host docker://jinahub/pod.indexer.faissindexer:MODULE_VERSION-JINA_VERSION --port-in 55555 --port-out 55556 - ``` diff --git a/jinahub/indexers/query/vector/FaissIndexer/__init__.py b/jinahub/indexers/query/vector/FaissIndexer/__init__.py index f3cfbbef..68334fce 100644 --- a/jinahub/indexers/query/vector/FaissIndexer/__init__.py +++ b/jinahub/indexers/query/vector/FaissIndexer/__init__.py @@ -1,17 +1,17 @@ __copyright__ = "Copyright (c) 2021 Jina AI Limited. All rights reserved." __license__ = "Apache-2.0" -from typing import Tuple, Optional +import gzip +from typing import Optional, Dict import numpy as np -from jina.executors.devices import FaissDevice -from jina.executors.indexers.vector import BaseNumpyIndexer -from jina.executors.decorators import batching +from jina import Executor, DocumentArray, requests, Document +from jina_commons import get_logger +from jina_commons.indexers.dump import import_vectors -class FaissIndexer(FaissDevice, BaseNumpyIndexer): - batch_size = 512 +class FaissIndexer(Executor): """Faiss powered vector indexer For more information about the Faiss supported parameters and installation problems, please consult: @@ -19,51 +19,54 @@ class FaissIndexer(FaissDevice, BaseNumpyIndexer): .. note:: Faiss package dependency is only required at the query time. + + :param index_key: index type supported by ``faiss.index_factory`` + :param train_filepath: the training data file path, e.g ``faiss.tgz`` or `faiss.npy`. The data file is expected + to be either `.npy` file from `numpy.save()` or a `.tgz` file from `NumpyIndexer`. If none is provided, `indexed` data will be used + to train the Indexer (In that case, one must be careful when sharding is enabled, because every shard will be trained with its own part of data). + The data will only be loaded if `requires_training` is set to True. + :param max_num_training_points: Optional argument to consider only a subset of training points to training data from `train_filepath`. + The points will be selected randomly from the available points + :param requires_training: Boolean flag indicating if the index type requires training to be run before building index. + :param distance: 'l2' or 'inner_product' accepted. Determines which distances to optimize by FAISS. l2...smaller is better, inner_product...larger is better + :param normalize: whether or not to normalize the vectors e.g. for the cosine similarity https://github.com/facebookresearch/faiss/wiki/MetricType-and-distances#how-can-i-index-vectors-for-cosine-similarity + :param nprobe: Number of clusters to consider at search time. + + .. highlight:: python + .. 
code-block:: python + # generate a training file in `.tgz` + import gzip + import numpy as np + from jina.executors.indexers.vector.faiss import FaissIndexer + + train_filepath = 'faiss_train.tgz' + train_data = np.random.rand(10000, 128) + with gzip.open(train_filepath, 'wb', compresslevel=1) as f: + f.write(train_data.astype('float32')) + indexer = FaissIndexer('PCA64,FLAT', train_filepath) + + # generate a training file in `.npy` + train_filepath = 'faiss_train' + np.save(train_filepath, train_data) + indexer = FaissIndexer('PCA64,FLAT', train_filepath) """ - def __init__(self, - index_key: str, - train_filepath: Optional[str] = None, - max_num_training_points: Optional[int] = None, - requires_training: bool = True, - distance: str = 'l2', - normalize: bool = False, - nprobe: int = 1, - *args, - **kwargs): - """ - Initialize an Faiss Indexer - - :param index_key: index type supported by ``faiss.index_factory`` - :param train_filepath: the training data file path, e.g ``faiss.tgz`` or `faiss.npy`. The data file is expected - to be either `.npy` file from `numpy.save()` or a `.tgz` file from `NumpyIndexer`. If none is provided, `indexed` data will be used - to train the Indexer (In that case, one must be careful when sharding is enabled, because every shard will be trained with its own part of data). - The data will only be loaded if `requires_training` is set to True. - :param max_num_training_points: Optional argument to consider only a subset of training points to training data from `train_filepath`. - The points will be selected randomly from the available points - :param requires_training: Boolean flag indicating if the index type requires training to be run before building index. - :param distance: 'l2' or 'inner_product' accepted. Determines which distances to optimize by FAISS. l2...smaller is better, inner_product...larger is better - :param normalize: whether or not to normalize the vectors e.g. for the cosine similarity https://github.com/facebookresearch/faiss/wiki/MetricType-and-distances#how-can-i-index-vectors-for-cosine-similarity - :param nprobe: Number of clusters to consider at search time. - - .. highlight:: python - .. 
code-block:: python - # generate a training file in `.tgz` - import gzip - import numpy as np - from jina.executors.indexers.vector.faiss import FaissIndexer - - train_filepath = 'faiss_train.tgz' - train_data = np.random.rand(10000, 128) - with gzip.open(train_filepath, 'wb', compresslevel=1) as f: - f.write(train_data.astype('float32')) - indexer = FaissIndexer('PCA64,FLAT', train_filepath) - - # generate a training file in `.npy` - train_filepath = 'faiss_train' - np.save(train_filepath, train_data) - indexer = FaissIndexer('PCA64,FLAT', train_filepath) - """ + def __init__( + self, + index_key: str, + train_filepath: Optional[str] = None, + max_num_training_points: Optional[int] = None, + requires_training: bool = True, + distance: str = 'l2', + normalize: bool = False, + nprobe: int = 1, + dump_path: Optional[str] = None, + traverse_path: str = 'r', + default_top_k: int = 5, + on_gpu: bool = False, + *args, + **kwargs, + ): super().__init__(*args, **kwargs) self.index_key = index_key self.requires_training = requires_training @@ -72,8 +75,53 @@ def __init__(self, self.distance = distance self.normalize = normalize self.nprobe = nprobe + self.on_gpu = on_gpu + + self.default_top_k = default_top_k + self.traverse_path = traverse_path + + self.logger = get_logger(self) + + dump_path = dump_path or kwargs.get('runtime_args', {}).get('dump_path', None) + if dump_path is not None: + self.logger.info('Start building "FaissIndexer" from dump data') + ids, vecs = import_vectors(dump_path, str(self.runtime_args.pea_id)) + self._ids = np.array(list(ids)) + self._ext2int = {v: i for i, v in enumerate(self._ids)} + self._vecs = np.array(list(vecs)) + self.num_dim = self._vecs.shape[1] + self.dtype = self._vecs.dtype + self.index = self._build_index(self._vecs) + else: + self.logger.warning( + 'No data loaded in "FaissIndexer". Use .rolling_update() to re-initialize it...' + ) + + def device(self): + """ + Return the device on which the executors using the :mod:`faiss` library will be running. + + .. note:: + In the case of using GPUs, we only use the first GPU from the visible GPUs. To specify which GPU to use, + please use the environment variable `CUDA_VISIBLE_DEVICES`. + """ + import faiss + + # For now, consider only one GPU, do not distribute the index + return faiss.StandardGpuResources() if self.on_gpu else None - def build_advanced_index(self, vecs: 'np.ndarray'): + def to_device(self, index, *args, **kwargs): + """Move the index to the configured device.""" + import faiss + + device = self.device() + return ( + faiss.index_cpu_to_gpu(device, 0, index, None) + if device is not None + else index + ) + + def _build_index(self, vecs: 'np.ndarray'): """Build an advanced index structure from a numpy array. :param vecs: numpy array containing the vectors to index @@ -82,12 +130,18 @@ def build_advanced_index(self, vecs: 'np.ndarray'): metric = faiss.METRIC_L2 if self.distance == 'inner_product': - self.logger.warning('inner_product will be output as distance instead of similarity.') + self.logger.warning( + 'inner_product will be output as distance instead of similarity.' + ) metric = faiss.METRIC_INNER_PRODUCT if self.distance not in {'inner_product', 'l2'}: - self.logger.warning('Invalid distance metric for Faiss index construction. Defaulting to l2 distance') + self.logger.warning( + 'Invalid distance metric for Faiss index construction.
Defaulting to l2 distance' + ) - index = self.to_device(index=faiss.index_factory(self.num_dim, self.index_key, metric)) + index = self.to_device( + index=faiss.index_factory(self.num_dim, self.index_key, metric) + ) if self.requires_training: if self.train_filepath: @@ -96,14 +150,20 @@ def build_advanced_index(self, vecs: 'np.ndarray'): self.logger.info(f'Taking indexed data as training points') train_data = vecs if train_data is None: - self.logger.warning('Loading training data failed. some faiss indexes require previous training.') + self.logger.warning( + 'Loading training data failed. some faiss indexes require previous training.' + ) else: if self.max_num_training_points: - self.logger.warning(f'From train_data with num_points {train_data.shape[0]}, ' - f'sample {self.max_num_training_points} points') - random_indices = np.random.choice(train_data.shape[0], - size=min(self.max_num_training_points, train_data.shape[0]), - replace=False) + self.logger.warning( + f'From train_data with num_points {train_data.shape[0]}, ' + f'sample {self.max_num_training_points} points' + ) + random_indices = np.random.choice( + train_data.shape[0], + size=min(self.max_num_training_points, train_data.shape[0]), + replace=False, + ) train_data = train_data[random_indices, :] train_data = train_data.astype(np.float32) if self.normalize: @@ -114,36 +174,60 @@ def build_advanced_index(self, vecs: 'np.ndarray'): index.nprobe = self.nprobe return index - @batching def _build_partial_index(self, vecs: 'np.ndarray', index): vecs = vecs.astype(np.float32) if self.normalize: from faiss import normalize_L2 + normalize_L2(vecs) index.add(vecs) - def query(self, vecs: 'np.ndarray', top_k: int, *args, **kwargs) -> Tuple['np.ndarray', 'np.ndarray']: + return + + @requests(on='/search') + def query(self, docs: DocumentArray, parameters: Dict = None, *args, **kwargs): """Find the top-k vectors with smallest ``metric`` and return their ids in ascending order. 
- :param keys: numpy array containing vectors to search for - :param top_k: upper limit of responses for each search vector + + :param docs: the DocumentArray containing the documents to search with + :param parameters: """ + if parameters is None: + parameters = {} + + top_k = parameters.get('top_k', self.default_top_k) + traversal_path = parameters.get('traversal_path', self.traverse_path) + + docs_to_traverse = docs.traverse_flat(traversal_path) + + vecs = np.array(docs_to_traverse.get_attributes('embedding')) + if self.normalize: from faiss import normalize_L2 + normalize_L2(vecs) - dist, ids = self.query_handler.search(vecs, top_k) + dists, ids = self.index.search(vecs, top_k) if self.distance == 'inner_product': - dist = 1 - dist - keys = self._int2ext_id[self.valid_indices][ids] - return keys, dist + dists = 1 - dists + for doc_idx, matches in enumerate(zip(ids, dists)): + for m_info in zip(*matches): + idx, dist = m_info + match = Document(id=self._ids[idx], embedding=self._vecs[idx]) + match.score.value = dist + docs_to_traverse[doc_idx].matches.append(match) def _train(self, index, data: 'np.ndarray', *args, **kwargs) -> None: _num_samples, _num_dim = data.shape if not self.num_dim: self.num_dim = _num_dim if self.num_dim != _num_dim: - raise ValueError('training data should have the same number of features as the index, {} != {}'.format( - self.num_dim, _num_dim)) - self.logger.info(f'Training faiss Indexer with {_num_samples} points of {self.num_dim}') + raise ValueError( + 'training data should have the same number of features as the index, {} != {}'.format( + self.num_dim, _num_dim + ) + ) + self.logger.info( + f'Training faiss Indexer with {_num_samples} points of {self.num_dim}' + ) index.train(data) @@ -153,17 +237,26 @@ def _load_training_data(self, train_filepath: str) -> 'np.ndarray': try: result = self._load_gzip(train_filepath) except Exception as e: - self.logger.error('Loading training data from gzip failed, filepath={}, {}'.format(train_filepath, e)) + self.logger.error( + 'Loading training data from gzip failed, filepath={}, {}'.format( + train_filepath, e + ) + ) if result is None: try: result = np.load(train_filepath) if isinstance(result, np.lib.npyio.NpzFile): - self.logger.warning('.npz format is not supported. Please save the array in .npy format.') + self.logger.warning( + '.npz format is not supported. Please save the array in .npy format.' + ) result = None except Exception as e: self.logger.error( - 'Loading training data with np.load failed, filepath={}, {}'.format(train_filepath, e)) + 'Loading training data with np.load failed, filepath={}, {}'.format( + train_filepath, e + ) + ) if result is None: try: @@ -172,5 +265,32 @@ def _load_training_data(self, train_filepath: str) -> 'np.ndarray': result = f.read() except Exception as e: self.logger.error( - 'Loading training data from binary file failed, filepath={}, {}'.format(train_filepath, e)) + 'Loading training data from binary file failed, filepath={}, {}'.format( + train_filepath, e + ) + ) return result + + def _load_gzip(self, abspath: str, mode='rb') -> Optional['np.ndarray']: + try: + self.logger.info(f'loading index from {abspath}...') + with gzip.open(abspath, mode) as fp: + return np.frombuffer(fp.read(), dtype=self.dtype).reshape( + [-1, self.num_dim] + ) + except EOFError: + self.logger.error( + f'{abspath} is broken/incomplete, perhaps forgot to ".close()" in the last usage?' 
+ ) + + @property + def size(self): + if hasattr(self, '_ids'): + return len(self._ids) + else: + return 0 + + @requests(on='/fill_embedding') + def fill_embedding(self, query_da: DocumentArray, **kwargs): + for doc in query_da: + doc.embedding = self._vecs[self._ext2int[doc.id]] diff --git a/jinahub/indexers/query/vector/FaissIndexer/requirements.txt b/jinahub/indexers/query/vector/FaissIndexer/requirements.txt index b41920fa..5e8f9c08 100644 --- a/jinahub/indexers/query/vector/FaissIndexer/requirements.txt +++ b/jinahub/indexers/query/vector/FaissIndexer/requirements.txt @@ -1 +1,2 @@ faiss-cpu==1.6.5 +git+https://github.com/jina-ai/jina-commons diff --git a/jinahub/indexers/query/vector/FaissIndexer/tests/test_faissindexer.py b/jinahub/indexers/query/vector/FaissIndexer/tests/test_faissindexer.py index a116091c..95cde569 100644 --- a/jinahub/indexers/query/vector/FaissIndexer/tests/test_faissindexer.py +++ b/jinahub/indexers/query/vector/FaissIndexer/tests/test_faissindexer.py @@ -3,20 +3,33 @@ import gzip import os -import pytest import numpy as np -from jina.executors.indexers import BaseIndexer +import pytest +from jina import DocumentArray, Document from jina.executors.metas import get_default_metas +from jina_commons.indexers.dump import export_dump_streaming from .. import FaissIndexer + +def _get_docs_from_vecs(queries): + docs = DocumentArray() + for q in queries: + doc = Document(embedding=q) + docs.append(doc) + return docs + + # fix the seed here np.random.seed(500) retr_idx = None vec_idx = np.random.randint(0, high=100, size=[10]).astype(str) vec = np.array(np.random.random([10, 10]), dtype=np.float32) + query = np.array(np.random.random([10, 10]), dtype=np.float32) +query_docs = _get_docs_from_vecs(query) + cur_dir = os.path.dirname(os.path.abspath(__file__)) @@ -30,34 +43,52 @@ def metas(tmpdir): del os.environ['TEST_WORKSPACE'] -def test_faiss_indexer(metas): +@pytest.fixture() +def tmpdir_dump(tmpdir): + from jina_commons.indexers.dump import export_dump_streaming + + export_dump_streaming( + os.path.join(tmpdir, 'dump'), + 1, + len(vec_idx), + zip(vec_idx, vec, [b'' for _ in range(len(vec))]), + ) + return os.path.join(tmpdir, 'dump') + + +def test_faiss_indexer(metas, tmpdir_dump): + print(f'dump path = {tmpdir_dump}') + train_filepath = os.path.join(os.environ['TEST_WORKSPACE'], 'train.tgz') train_data = np.array(np.random.random([1024, 10]), dtype=np.float32) with gzip.open(train_filepath, 'wb', compresslevel=1) as f: f.write(train_data.tobytes()) - with FaissIndexer(index_filename='faiss.test.gz', index_key='IVF10,PQ2', train_filepath=train_filepath, - metas=metas) as indexer: - indexer.add(vec_idx, vec) - indexer.save() - assert os.path.exists(indexer.index_abspath) - save_abspath = indexer.save_abspath + indexer = FaissIndexer( + index_key='IVF10,PQ2', + train_filepath=train_filepath, + dump_path=tmpdir_dump, + metas=metas, + runtime_args={'pea_id': 0}, + ) + indexer.query(query_docs, parameters={'top_k': 4}) + assert len(query_docs[0].matches) == 4 + for d in query_docs: + assert d.matches[0].score.value <= d.matches[1].score.value - with BaseIndexer.load(save_abspath) as indexer: - assert isinstance(indexer, FaissIndexer) - idx, dist = indexer.query(query, top_k=4) - assert idx.shape == dist.shape - assert idx.shape == (10, 4) - -@pytest.mark.parametrize('compression_level', [0, 1, 2, 3]) @pytest.mark.parametrize('train_data', ['new', 'none', 'index']) -def test_faiss_indexer_known(metas, train_data, compression_level): - vectors = np.array([[1, 1, 1], - [10, 
10, 10], - [100, 100, 100], - [1000, 1000, 1000]], dtype=np.float32) - keys = np.array([4, 5, 6, 7]).reshape(-1, 1).astype(str) +def test_faiss_indexer_known(metas, train_data, tmpdir): + vectors = np.array( + [[1, 1, 1], [10, 10, 10], [100, 100, 100], [1000, 1000, 1000]], dtype=np.float32 + ) + keys = np.array([4, 5, 6, 7]).astype(str) + export_dump_streaming( + os.path.join(tmpdir, 'dump'), + 1, + len(keys), + zip(keys, vectors, [b'' for _ in range(len(vectors))]), + ) if train_data == 'new': train_filepath = os.path.join(os.environ['TEST_WORKSPACE'], 'train.tgz') @@ -69,94 +100,45 @@ def test_faiss_indexer_known(metas, train_data, compression_level): elif train_data == 'index': train_filepath = os.path.join(metas['workspace'], 'faiss.test.gz') - with FaissIndexer(index_filename='faiss.test.gz', - compression_level=compression_level, - index_key='Flat', - train_filepath=train_filepath, - metas=metas) as indexer: - indexer.add(keys, vectors) - indexer.save() - assert os.path.exists(indexer.index_abspath) - save_abspath = indexer.save_abspath - - queries = np.array([[1, 1, 1], - [10, 10, 10], - [100, 100, 100], - [1000, 1000, 1000]], dtype=np.float32) - with BaseIndexer.load(save_abspath) as indexer: - assert isinstance(indexer, FaissIndexer) - idx, dist = indexer.query(queries, top_k=2) - np.testing.assert_equal(idx, np.array([[4, 5], [5, 4], [6, 5], [7, 6]]).astype(str)) - assert idx.shape == dist.shape - assert idx.shape == (4, 2) - np.testing.assert_equal(indexer.query_by_key(['7', '4']), vectors[[3, 0]]) - - -def test_faiss_indexer_known_update_delete(metas): - vectors = np.array([[1, 1, 1], - [10, 10, 10], - [100, 100, 100], - [1000, 1000, 1000]], dtype=np.float32) - keys = np.array([4, 5, 6, 7]).reshape(-1, 1).astype(str) - - train_filepath = os.path.join(os.environ['TEST_WORKSPACE'], 'train.tgz') - train_data = vectors - with gzip.open(train_filepath, 'wb', compresslevel=1) as f: - f.write(train_data.tobytes()) - - with FaissIndexer(index_filename='faiss.test.gz', index_key='Flat', train_filepath=train_filepath, - metas=metas) as indexer: - indexer.add(keys, vectors) - indexer.save() - assert os.path.exists(indexer.index_abspath) - save_abspath = indexer.save_abspath - - queries = np.array([[1, 1, 1], - [10, 10, 10], - [100, 100, 100], - [1000, 1000, 1000]], dtype=np.float32) - with BaseIndexer.load(save_abspath) as indexer: - assert isinstance(indexer, FaissIndexer) - idx, dist = indexer.query(queries, top_k=2) - np.testing.assert_equal(idx, np.array([[4, 5], [5, 4], [6, 5], [7, 6]]).astype(str)) - assert idx.shape == dist.shape - assert idx.shape == (4, 2) - np.testing.assert_equal(indexer.query_by_key(['7', '4']), vectors[[3, 0]]) - - # update - with BaseIndexer.load(save_abspath) as indexer: - indexer.update(['4'], np.array([[200, 200, 200]], dtype=np.float32)) - indexer.save() - assert indexer.size == 4 - - with BaseIndexer.load(save_abspath) as indexer: - assert isinstance(indexer, FaissIndexer) - idx, dist = indexer.query(queries, top_k=3) - np.testing.assert_equal(idx, np.array([[5, 6, 4], [5, 6, 4], [6, 5, 4], [7, 4, 6]]).astype(str)) - assert idx.shape == dist.shape - assert idx.shape == (4, 3) - - # delete - with BaseIndexer.load(save_abspath) as indexer: - indexer.delete(['4']) - indexer.save() - assert indexer.size == 3 - - with BaseIndexer.load(save_abspath) as indexer: - assert isinstance(indexer, FaissIndexer) - idx, dist = indexer.query(queries, top_k=2) - np.testing.assert_equal(idx, np.array([[5, 6], [5, 6], [6, 5], [7, 6]]).astype(str)) - assert idx.shape == 
dist.shape - assert idx.shape == (4, 2) - - -def test_faiss_indexer_known_big(metas): + indexer = FaissIndexer( + index_key='Flat', + train_filepath=train_filepath, + metas=metas, + dump_path=os.path.join(tmpdir, 'dump'), + runtime_args={'pea_id': 0}, + ) + assert indexer.size == len(keys) + + queries = np.array( + [[1, 1, 1], [10, 10, 10], [100, 100, 100], [1000, 1000, 1000]], dtype=np.float32 + ) + TOP_K = 2 + docs = _get_docs_from_vecs(queries) + indexer.query(docs, parameters={'top_k': TOP_K}) + idx = docs.traverse_flat('m').get_attributes('id') + dist = docs.traverse_flat('m').get_attributes('score') + np.testing.assert_equal( + idx, np.concatenate(np.array([[4, 5], [5, 4], [6, 5], [7, 6]])).astype(str) + ) + assert len(idx) == len(dist) + assert len(idx) == len(docs) * TOP_K + + docs = DocumentArray([Document(id=id) for id in ['7', '4']]) + indexer.fill_embedding(docs) + embs = docs.traverse_flat('r').get_attributes('embedding') + + np.testing.assert_equal(embs, vectors[[3, 0]]) + + +def test_faiss_indexer_known_big(metas, tmpdir): """Let's try to have some real test. We will have an index with 10k vectors of random values between 5 and 10. - We will change tweak some specific vectors that we expect to be retrieved at query time. We will tweak vector - at index [0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000], this will also be the query vectors. - Then the keys will be assigned shifted to test the proper usage of `int2ext_id` and `ext2int_id` + We will change tweak some specific vectors that we expect to be retrieved at query time. We will tweak vector + at index [0, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000], this will also be the query vectors. + Then the keys will be assigned shifted to test the proper usage of `int2ext_id` and `ext2int_id` """ - vectors = np.random.uniform(low=5.0, high=10.0, size=(10000, 1024)).astype('float32') + vectors = np.random.uniform(low=5.0, high=10.0, size=(10000, 1024)).astype( + 'float32' + ) queries = np.empty((10, 1024), dtype=np.float32) for idx in range(0, 10000, 1000): @@ -169,32 +151,64 @@ def test_faiss_indexer_known_big(metas): with gzip.open(train_filepath, 'wb', compresslevel=1) as f: f.write(train_data.tobytes()) - keys = np.arange(10000, 20000).reshape(-1, 1).astype(str) - - with FaissIndexer(index_filename='faiss.test.gz', - index_key='Flat', - requires_training=True, - train_filepath=train_filepath, - metas=metas) as indexer: - indexer.add(keys, vectors) - indexer.save() - assert os.path.exists(indexer.index_abspath) - save_abspath = indexer.save_abspath - - with BaseIndexer.load(save_abspath) as indexer: - assert isinstance(indexer, FaissIndexer) - idx, dist = indexer.query(queries, top_k=1) - np.testing.assert_equal(idx, np.array( - [[10000], [11000], [12000], [13000], [14000], [15000], [16000], [17000], [18000], [19000]]).astype(str)) - assert idx.shape == dist.shape - assert idx.shape == (10, 1) - np.testing.assert_equal(indexer.query_by_key(['10000', '15000']), vectors[[0, 5000]]) - - -@pytest.mark.parametrize('compression_level', [0, 1, 4]) -@pytest.mark.parametrize('train_data', ['new', 'none', 'index']) + keys = np.arange(10000, 20000).astype(str) + + dump_path = os.path.join(tmpdir, 'dump') + export_dump_streaming( + dump_path, + 1, + len(keys), + zip(keys, vectors, [b'' for _ in range(len(vectors))]), + ) + indexer = FaissIndexer( + index_key='Flat', + requires_training=True, + train_filepath=train_filepath, + metas=metas, + dump_path=dump_path, + runtime_args={'pea_id': 0}, + ) + assert 
isinstance(indexer, FaissIndexer) + docs = _get_docs_from_vecs(queries) + top_k = 1 + indexer.query(docs, parameters={'top_k': top_k}) + idx = docs.traverse_flat('m').get_attributes('id') + np.testing.assert_equal( + idx, + np.concatenate( + np.array( + [ + [10000], + [11000], + [12000], + [13000], + [14000], + [15000], + [16000], + [17000], + [18000], + [19000], + ] + ) + ).astype(str), + ) + dist = docs.traverse_flat('m').get_attributes('score') + assert len(idx) == len(dist) + assert len(idx) == (10 * top_k) + + docs = DocumentArray([Document(id=id) for id in ['10000', '15000']]) + indexer.fill_embedding(docs) + embs = docs.traverse_flat('r').get_attributes('embedding') + + np.testing.assert_equal( + embs, + vectors[[0, 5000]], + ) + + +@pytest.mark.parametrize('train_data', ['new', 'none']) @pytest.mark.parametrize('max_num_points', [None, 257, 500, 10000]) -def test_indexer_train_from_index_different_compression_levels(metas, compression_level, train_data, max_num_points): +def test_indexer_train(metas, train_data, max_num_points, tmpdir): np.random.seed(500) num_data = 500 num_dim = 64 @@ -211,48 +225,65 @@ def test_indexer_train_from_index_different_compression_levels(metas, compressio f.write(train_data.tobytes()) elif train_data == 'none': train_filepath = None - elif train_data == 'index': - train_filepath = os.path.join(metas['workspace'], f'{metas["name"]}-0', 'faiss.test.gz') - - with FaissIndexer(index_filename='faiss.test.gz', - index_key='IVF10,PQ4', - train_filepath=train_filepath, - max_num_training_points=max_num_points, - requires_training=True, - compression_level=compression_level, - metas=metas) as indexer: - indexer.add(vec_idx, vec) - indexer.save() - assert os.path.exists(indexer.index_abspath) - save_abspath = indexer.save_abspath - - with BaseIndexer.load(save_abspath) as indexer: - assert isinstance(indexer, FaissIndexer) - idx, dist = indexer.query(query, top_k=4) - assert idx.shape == dist.shape - assert idx.shape == (num_query, 4) + + dump_path = os.path.join(tmpdir, 'dump') + export_dump_streaming( + dump_path, + 1, + len(vec_idx), + zip(vec_idx, vec, [b'' for _ in range(len(vec))]), + ) + indexer = FaissIndexer( + index_key='IVF10,PQ4', + train_filepath=train_filepath, + max_num_training_points=max_num_points, + requires_training=True, + metas=metas, + dump_path=dump_path, + runtime_args={'pea_id': 0}, + ) + + query_docs = _get_docs_from_vecs(query) + top_k = 4 + indexer.query(query_docs, parameters={'top_k': top_k}) + # idx, dist = + idx = query_docs.traverse_flat('m').get_attributes('id') + dist = query_docs.traverse_flat('m').get_attributes('score') + + assert len(idx) == len(dist) + assert len(idx) == num_query * top_k @pytest.mark.parametrize('distance', ['l2', 'inner_product']) -def test_faiss_normalization(metas, distance): +def test_faiss_normalization(metas, distance, tmpdir): num_data = 2 num_dims = 64 - with FaissIndexer(index_key='Flat', - distance=distance, - normalize=True, - requires_training=True, - metas=metas) as indexer: - vecs = np.zeros((num_data, num_dims)) - vecs[:, 0] = 2 - vecs[0, 1] = 3 - keys = np.arange(0, num_data).reshape(-1, 1) - indexer.add(keys, vecs) - indexer.save() - save_abspath = indexer.save_abspath - - with BaseIndexer.load(save_abspath) as indexer: - query = np.zeros((1, num_dims)) - query[0, 0] = 5 - idx, dist = indexer.query(query.astype('float32'), top_k=2) - assert dist[0, 0] == 0 + vecs = np.zeros((num_data, num_dims)) + vecs[:, 0] = 2 + vecs[0, 1] = 3 + keys = np.arange(0, num_data).astype(str) + + dump_path = 
os.path.join(tmpdir, 'dump') + export_dump_streaming( + dump_path, + 1, + len(keys), + zip(keys, vecs, [b'' for _ in range(len(vecs))]), + ) + + indexer = FaissIndexer( + index_key='Flat', + distance=distance, + normalize=True, + requires_training=True, + metas=metas, + dump_path=dump_path, + runtime_args={'pea_id': 0}, + ) + query = np.zeros((1, num_dims)) + query[0, 0] = 5 + docs = _get_docs_from_vecs(query.astype('float32')) + indexer.query(docs, parameters={'top_k': 2}) + dist = docs.traverse_flat('m').get_attributes('score') + assert dist[0].value == 0
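
Review note: the refactor above replaces the Jina 1.x `BaseIndexer.load(save_abspath)` workflow with dump-based initialization via `jina-commons`. Below is a minimal sketch of the new query path that mirrors the tests in this PR rather than documenting a final API; the import path, the temporary paths, and the choice of an untrained `Flat` index are illustrative assumptions only.

```python
import numpy as np
from jina import Document, DocumentArray
from jina_commons.indexers.dump import export_dump_streaming

# Import path is an assumption for illustration; the class lives in
# jinahub/indexers/query/vector/FaissIndexer/__init__.py in this repo.
from jinahub.indexers.query.vector.FaissIndexer import FaissIndexer

# 1. Write a single-shard dump of (id, vector, metadata) triples, as the tests do.
ids = [str(i) for i in range(100)]
vecs = np.random.random((100, 16)).astype('float32')
dump_path = '/tmp/faiss_dump'  # placeholder location
export_dump_streaming(dump_path, 1, len(ids), zip(ids, vecs, (b'' for _ in ids)))

# 2. Build the searcher from the dump; a 'Flat' index needs no training step.
indexer = FaissIndexer(
    index_key='Flat',
    requires_training=False,
    dump_path=dump_path,
    runtime_args={'pea_id': 0},  # single shard, mirroring the tests
)

# 3. Search with a DocumentArray; matches are attached to each query Document.
docs = DocumentArray([Document(embedding=vecs[0])])
indexer.query(docs, parameters={'top_k': 3})
print(docs[0].matches.get_attributes('id'))
```

With this setup the first match for the query should be the document with id '0' at distance 0, since its exact vector is in the index and `Flat` performs exhaustive search. In a Flow deployment the same executor would typically be started empty and populated later by pointing it at a new `dump_path`, as the warning branch in `__init__` suggests.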