From 93f7a202ea897fed9523d5dc5d621d54ba59f4d1 Mon Sep 17 00:00:00 2001
From: yihanzhao
Date: Wed, 30 Oct 2024 16:37:13 +1100
Subject: [PATCH] Remove unused add_document logics from tensor_search.py

---
 src/marqo/tensor_search/tensor_search.py | 1649 +---------------------
 tests/marqo_test.py | 3 +-
 2 files changed, 9 insertions(+), 1643 deletions(-)

diff --git a/src/marqo/tensor_search/tensor_search.py b/src/marqo/tensor_search/tensor_search.py
index 8d893e823..a04791714 100644
--- a/src/marqo/tensor_search/tensor_search.py
+++ b/src/marqo/tensor_search/tensor_search.py
@@ -30,63 +30,43 @@ won’t be searched)
 """
-import copy
-import json
-import traceback
 import typing
-import uuid
-import os
 from collections import defaultdict
-from contextlib import ExitStack
 from timeit import default_timer as timer
 from typing import List, Optional, Union, Iterable, Sequence, Dict, Any, Tuple
 import numpy as np
 import psutil
-from numpy import ndarray
 import marqo.core.unstructured_vespa_index.common as unstructured_common
 from marqo import marqo_docs
 from marqo.api import exceptions as api_exceptions
 from marqo.api import exceptions as errors
-from marqo.core.constants import MARQO_CUSTOM_VECTOR_NORMALIZATION_MINIMUM_VERSION
-from marqo.core.semi_structured_vespa_index.semi_structured_add_document_handler import \
-    SemiStructuredAddDocumentsHandler
-from marqo.core.structured_vespa_index.structured_add_document_handler import StructuredAddDocumentsHandler
-from marqo.core.unstructured_vespa_index.unstructured_add_document_handler import UnstructuredAddDocumentsHandler
-from marqo.tensor_search.models.api_models import CustomVectorQuery
-# We depend on _httprequests.py for now, but this may be replaced in the future, as
-# _httprequests.py is designed for the client
 from marqo.config import Config
 from marqo.core import constants
 from marqo.core import exceptions as core_exceptions
 from marqo.core.models.hybrid_parameters import HybridParameters
-from marqo.core.models.marqo_index import IndexType, SemiStructuredMarqoIndex
-from marqo.core.models.marqo_index import MarqoIndex, FieldType, UnstructuredMarqoIndex, StructuredMarqoIndex
+from marqo.core.models.marqo_get_documents_by_id_response import (MarqoGetDocumentsByIdsResponse,
+                                                                   MarqoGetDocumentsByIdsItem)
+from marqo.core.models.marqo_index import IndexType
+from marqo.core.models.marqo_index import MarqoIndex
 from marqo.core.models.marqo_query import MarqoTensorQuery, MarqoLexicalQuery
-from marqo.core.structured_vespa_index.structured_vespa_index import StructuredVespaIndex
 from marqo.core.structured_vespa_index.common import RANK_PROFILE_BM25, RANK_PROFILE_EMBEDDING_SIMILARITY
-from marqo.core.unstructured_vespa_index import unstructured_validation as unstructured_index_add_doc_validation
-from marqo.core.unstructured_vespa_index.unstructured_vespa_index import UnstructuredVespaIndex
 from marqo.core.vespa_index.vespa_index import for_marqo_index as vespa_index_factory
 from marqo.s2_inference import errors as s2_inference_errors
 from marqo.s2_inference import s2_inference
-from marqo.s2_inference.s2_inference import infer_modality, Modality
-from marqo.s2_inference.clip_utils import _is_image, validate_url
-from marqo.s2_inference.processing import image as image_processor
-from marqo.s2_inference.processing import text as text_processor
 from marqo.s2_inference.reranking import rerank
+from marqo.s2_inference.s2_inference import infer_modality, Modality
 from marqo.tensor_search import delete_docs
-from marqo.tensor_search import enums
 from
marqo.tensor_search import index_meta_cache -from marqo.tensor_search import utils, validation, add_docs +from marqo.tensor_search import utils, validation from marqo.tensor_search.enums import ( Device, TensorField, SearchMethod ) from marqo.tensor_search.enums import EnvVars from marqo.tensor_search.index_meta_cache import get_cache -from marqo.core.models.add_docs_params import AddDocsParams from marqo.tensor_search.models.api_models import BulkSearchQueryEntity, ScoreModifierLists +from marqo.tensor_search.models.api_models import CustomVectorQuery from marqo.tensor_search.models.delete_docs_objects import MqDeleteDocsRequest from marqo.tensor_search.models.private_models import ModelAuth from marqo.tensor_search.models.search import Qidx, JHash, SearchContext, VectorisedJobs, VectorisedJobPointer, \ @@ -94,1153 +74,11 @@ from marqo.tensor_search.telemetry import RequestMetricsStore from marqo.tensor_search.tensor_search_logging import get_logger from marqo.vespa.exceptions import VespaStatusError -from marqo.vespa.models import VespaDocument, QueryResult -from marqo.core.models.marqo_add_documents_response import MarqoAddDocumentsResponse, MarqoAddDocumentsItem -from marqo.core.models.marqo_get_documents_by_id_response import (MarqoGetDocumentsByIdsResponse, - MarqoGetDocumentsByIdsItem) +from marqo.vespa.models import QueryResult logger = get_logger(__name__) -def add_documents(config: Config, add_docs_params: AddDocsParams) -> MarqoAddDocumentsResponse: - """ - Args: - config: Config object - add_docs_params: add_documents()'s parameters - """ - try: - marqo_index = index_meta_cache.get_index( - index_management=config.index_management, index_name=add_docs_params.index_name, force_refresh=True - ) - - # TODO: raise core_exceptions.IndexNotFoundError instead (fix associated tests) - except api_exceptions.IndexNotFoundError: - raise api_exceptions.IndexNotFoundError( - f"Cannot add documents to non-existent index {add_docs_params.index_name}") - - if isinstance(marqo_index, SemiStructuredMarqoIndex): - return SemiStructuredAddDocumentsHandler(marqo_index, add_docs_params, config.vespa_client, - config.index_management).add_documents() - if isinstance(marqo_index, UnstructuredMarqoIndex): - # return _add_documents_unstructured(config, add_docs_params, marqo_index) - return UnstructuredAddDocumentsHandler(marqo_index, add_docs_params, config.vespa_client).add_documents() - elif isinstance(marqo_index, StructuredMarqoIndex): - # return _add_documents_structured(config, add_docs_params, marqo_index) - return StructuredAddDocumentsHandler(marqo_index, add_docs_params, config.vespa_client).add_documents() - else: - raise api_exceptions.InternalError(f"Unknown index type {type(marqo_index)}") - - -def _add_documents_unstructured(config: Config, add_docs_params: AddDocsParams, marqo_index: UnstructuredMarqoIndex) \ - -> MarqoAddDocumentsResponse: - # ADD DOCS TIMER-LOGGER (3) - vespa_client = config.vespa_client - unstructured_vespa_index = UnstructuredVespaIndex(marqo_index) - index_model_dimensions = marqo_index.model.get_dimension() - - RequestMetricsStore.for_request().start("add_documents.processing_before_vespa") - - unstructured_index_add_doc_validation.validate_tensor_fields(add_docs_params.tensor_fields) - - multimodal_sub_fields = [] - if add_docs_params.mappings is not None: - unstructured_index_add_doc_validation.validate_mappings_object_format(add_docs_params.mappings) - for field_name, mapping in add_docs_params.mappings.items(): - if mapping.get("type", None) == 
enums.MappingsObjectType.multimodal_combination: - multimodal_sub_fields.extend(mapping["weights"].keys()) - - t0 = timer() - bulk_parent_dicts = [] - - if len(add_docs_params.docs) == 0: - raise errors.BadRequestError(message="Received empty add documents request") - - unsuccessful_docs: List[Tuple[int, MarqoAddDocumentsItem]] = [] - total_vectorise_time = 0 - batch_size = len(add_docs_params.docs) - media_repo = {} - - text_chunk_prefix = marqo_index.model.get_text_chunk_prefix(add_docs_params.text_chunk_prefix) - - docs, doc_ids = config.document.remove_duplicated_documents(add_docs_params.docs) - - media_download_thread_count = _determine_thread_count(marqo_index, add_docs_params) - - with ExitStack() as exit_stack: - if marqo_index.treat_urls_and_pointers_as_images or marqo_index.treat_urls_and_pointers_as_media: # review this logic - with RequestMetricsStore.for_request().time( - "image_download.full_time", - lambda t: logger.debug( - f"add_documents image download: took {t:.3f}ms to concurrently download " - f"images for {batch_size} docs using {media_download_thread_count} threads" - ) - ): - # TODO - Refactor this part to make it more readable - # We need to pass the subfields to the image downloader, so that it can download the images in the - # multimodal subfields even if the subfield is not a tensor_field - tensor_fields_and_multimodal_subfields = copy.deepcopy(add_docs_params.tensor_fields) \ - if add_docs_params.tensor_fields else [] - tensor_fields_and_multimodal_subfields.extend(multimodal_sub_fields) - media_repo = exit_stack.enter_context( - add_docs.download_and_preprocess_content( - docs=docs, - thread_count=media_download_thread_count, - tensor_fields=tensor_fields_and_multimodal_subfields, - media_download_headers=add_docs_params.media_download_headers, - model_name=marqo_index.model.name, - normalize_embeddings=marqo_index.normalize_embeddings, - media_field_types_mapping=None, - model_properties=marqo_index.model.get_properties(), - device=add_docs_params.device, - model_auth=add_docs_params.model_auth, - patch_method_exists=marqo_index.image_preprocessing.patch_method is not None, - marqo_index_type=marqo_index.type, - marqo_index_model=marqo_index.model, - audio_preprocessing=marqo_index.audio_preprocessing, - video_preprocessing=marqo_index.video_preprocessing, - ) - ) - - if add_docs_params.use_existing_tensors: - existing_docs_dict: Dict[str, dict] = {} - if len(doc_ids) > 0: - existing_docs = _get_marqo_documents_by_ids(config, marqo_index.name, doc_ids, ignore_invalid_ids=True) - for doc in existing_docs: - id = doc["_id"] - if id in existing_docs_dict: - raise errors.InternalError(f"Received duplicate documents for ID {id} from Vespa") - existing_docs_dict[id] = doc - - logger.debug(f"Found {len(existing_docs_dict)} existing docs") - - for i, doc in enumerate(docs): - copied = copy.deepcopy(doc) - document_is_valid = True - doc_id = None - - try: - validation.validate_doc(doc) - - if add_docs_params.mappings and multimodal_sub_fields: - unstructured_index_add_doc_validation.validate_coupling_of_mappings_and_doc( - doc, add_docs_params.mappings, multimodal_sub_fields - ) - - if "_id" in doc: - doc_id = validation.validate_id(doc["_id"]) - del copied["_id"] - else: - doc_id = str(uuid.uuid4()) - - [unstructured_index_add_doc_validation.validate_field_name(field) for field in copied] - - except errors.__InvalidRequestError as err: - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=err.message, - 
message=err.message, - status=int(err.status_code), - code=err.code) - ) - ) - continue - - processed_tensor_fields: List[str] = [] - embeddings_list: List[str] = [] - - for field in copied: - - is_tensor_field = utils.is_tensor_field(field, add_docs_params.tensor_fields) - - try: - field_content = unstructured_vespa_index.validate_field_content( - field_content=copied[field], - is_tensor_field=is_tensor_field - ) - # Used to validate custom_vector field or any other new dict field type - if isinstance(field_content, dict): - field_content = validation.validate_dict( - field=field, field_content=field_content, - is_non_tensor_field=not is_tensor_field, - mappings=add_docs_params.mappings, index_model_dimensions=index_model_dimensions, - marqo_index_version=marqo_index.parsed_marqo_version()) - except (errors.InvalidArgError, core_exceptions.MarqoDocumentParsingError) as err: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=err.message, - message=err.message, - status=int(err.status_code), - code=err.code) - ) - ) - break - - # Proceed from here only for tensor fields - if not is_tensor_field: - continue - - # chunks generated by processing this field for this doc: - chunks: List[str] = [] - embeddings: List[List[float]] = [] - - # 4 current options for chunking/vectorisation behavior: - # A) field type is custom_vector -> no chunking or vectorisation - # B) use_existing_tensors=True and field content hasn't changed -> no chunking or vectorisation - # C) field type is standard -> chunking and vectorisation - # D) field type is multimodal -> use vectorise_multimodal_combination_field (does chunking and vectorisation) - # Do step D regardless. It will generate separate chunks for multimodal. - - # A) Calculate custom vector field logic here. It should ignore use_existing_tensors, as this step has no vectorisation. - document_dict_field_type = add_docs.determine_document_dict_field_type(field, field_content, - add_docs_params.mappings) - - if document_dict_field_type == FieldType.CustomVector: - # Generate exactly 1 chunk with the custom vector. - chunks = [f"{field}::{copied[field]['content']}"] - embeddings = [copied[field]["vector"]] - # If normalize_embeddings is true and the index version is > 2.13.0, normalize the embeddings. - # We have added version specific check here to prevent backwards compatibility issues. - if marqo_index.normalize_embeddings and marqo_index.parsed_marqo_version() >= MARQO_CUSTOM_VECTOR_NORMALIZATION_MINIMUM_VERSION: - try: - embeddings = normalize_vector(embeddings) - except core_exceptions.ZeroMagnitudeVectorError as e: - error_message = (f" Zero magnitude vector found while normalizing custom vector field. " - f"Please check `{marqo_docs.api_reference_document_body()}` for more info.") - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message + error_message, - message=e.message + error_message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code) - ) - ) - break - - # Update parent document (copied) to fit new format. Use content (text) to replace input dict - copied[field] = field_content["content"] - logger.debug(f"Custom vector field {field} added as 1 chunk.") - - # B) Use existing tensors if available and existing content did not change. 
- elif ( - add_docs_params.use_existing_tensors and - doc_id in existing_docs_dict and - field in existing_docs_dict[doc_id] and - existing_docs_dict[doc_id][field] == field_content - ): - if ( - constants.MARQO_DOC_TENSORS in existing_docs_dict[doc_id] and - field in existing_docs_dict[doc_id][constants.MARQO_DOC_TENSORS] - ): - chunks: List[str] = [f"{field}::{content}" for content in - existing_docs_dict[doc_id][constants.MARQO_DOC_TENSORS][field][ - constants.MARQO_DOC_CHUNKS]] - embeddings: List[List[float]] = [existing_docs_dict[doc_id][constants.MARQO_DOC_TENSORS][field][ - constants.MARQO_DOC_EMBEDDINGS]] - logger.debug(f"Using existing tensors for field {field} for doc {doc_id}") - else: - # Happens if this wasn't a tensor field last time we indexed this doc - logger.debug(f"Found document but not tensors for field {field} for doc {doc_id}. " - f"Is this a new tensor field?") - - # C) field type is standard - if len(chunks) == 0: # Not using existing tensors or didn't find it - modality = infer_modality(field_content) - video_audio_check = modality in [Modality.VIDEO, - Modality.AUDIO] and marqo_index.treat_urls_and_pointers_as_media - - if video_audio_check: - try: - # Check for UnsupportedModalityError in media_repo - if isinstance(media_repo[field_content], s2_inference_errors.S2InferenceError): - raise media_repo[field_content] - - media_chunks = media_repo[field_content] - for chunk_index, media_chunk in enumerate(media_chunks): - chunk_start = media_chunk['start_time'] - chunk_end = media_chunk['end_time'] - chunk_time = [chunk_start, chunk_end] - chunk_id = f"{field}::{chunk_time}" - chunks.append(chunk_id) - - start_time = timer() - with RequestMetricsStore.for_request().time(f"add_documents.create_vectors"): - vector = s2_inference.vectorise( - model_name=marqo_index.model.name, - content=[media_chunk['tensor']], - model_properties=marqo_index.model.get_properties(), - device=add_docs_params.device, - normalize_embeddings=marqo_index.normalize_embeddings, - infer=True, - model_auth=add_docs_params.model_auth, - modality=modality - ) - end_time = timer() - total_vectorise_time += (end_time - start_time) - embeddings.extend(vector) - except s2_inference_errors.S2InferenceError as e: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=str(e), - message=str(e), - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code) - ) - ) - break - - elif isinstance(field_content, str): - # 1. check if urls should be downloaded -> "treat_pointers_and_urls_as_images":True - # 2. check if it is a url or pointer - # 3. If yes in 1 and 2, download blindly (without type) - # 4. Determine media type of downloaded - # 5. load correct media type into memory -> PIL (images), videos (), audio (torchaudio) - # 6. 
if chunking -> then add the extra chunker - - if not _is_image(field_content): - # text processing pipeline: - split_by = marqo_index.text_preprocessing.split_method.value - split_length = marqo_index.text_preprocessing.split_length - split_overlap = marqo_index.text_preprocessing.split_overlap - content_chunks: List[str] = text_processor.split_text(field_content, split_by=split_by, - split_length=split_length, - split_overlap=split_overlap) - text_chunks = content_chunks - content_chunks = text_processor.prefix_text_chunks(content_chunks, text_chunk_prefix) - else: - # TODO put the logic for getting field parameters into a function and add per field options - image_method = marqo_index.image_preprocessing.patch_method - - # the chunk_image contains the no-op logic as of now - method = None will be a no-op - try: - # in the future, if we have different chunking methods, make sure we catch possible - # errors of different types generated here, too. - if isinstance(field_content, str) and marqo_index.treat_urls_and_pointers_as_images: - if not isinstance(media_repo[field_content], Exception): - image_data = media_repo[field_content] - else: - raise s2_inference_errors.S2InferenceError( - f"Could not process the media file found at `{field_content}`. \n" - f"Reason: {str(media_repo[field_content])}" - ) - else: - image_data = field_content - if image_method is not None: - content_chunks, text_chunks = image_processor.chunk_image( - image_data, device=add_docs_params.device, method=image_method.value) - else: - # if we are not chunking, then we set the chunks as 1-len lists - # content_chunk is the PIL image - # text_chunk refers to URL - content_chunks, text_chunks = [image_data], [field_content] - - except s2_inference_errors.S2InferenceError as e: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message, - message=e.message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code) - ) - ) - break - - normalize_embeddings = marqo_index.normalize_embeddings - - try: - # in the future, if we have different underlying vectorising methods, make sure we catch possible - # errors of different types generated here, too. - - # ADD DOCS TIMER-LOGGER (4) - start_time = timer() - with RequestMetricsStore.for_request().time(f"add_documents.create_vectors"): - vector_chunks = s2_inference.vectorise( - model_name=marqo_index.model.name, - model_properties=marqo_index.model.get_properties(), content=content_chunks, - device=add_docs_params.device, normalize_embeddings=normalize_embeddings, - infer=marqo_index.treat_urls_and_pointers_as_images, - model_auth=add_docs_params.model_auth, - modality=modality - ) - - end_time = timer() - total_vectorise_time += (end_time - start_time) - except (s2_inference_errors.UnknownModelError, - s2_inference_errors.InvalidModelPropertiesError, - s2_inference_errors.ModelLoadError, - s2_inference.ModelDownloadError) as model_error: - raise errors.BadRequestError( - message=f'Problem vectorising query. 
Reason: {str(model_error)}', - link=marqo_docs.list_of_models() - ) - except s2_inference_errors.S2InferenceError as e: - document_is_valid = False - unsuccessful_docs.append( - ( - i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message, - message=e.message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code - ) - ) - ) - break - - if len(vector_chunks) != len(text_chunks): - raise RuntimeError( - f"the input content after preprocessing and its vectorized counterparts must be the same length." - f"received text_chunks={len(text_chunks)} and vector_chunks={len(vector_chunks)}. " - f"check the preprocessing functions and try again. ") - - chunks: List[str] = [f"{field}::{text_chunk}" for text_chunk in text_chunks] - embeddings: List[List[float]] = vector_chunks - - assert len(chunks) == len(embeddings), "Chunks and embeddings must be the same length" - else: - raise errors.InvalidArgError(f'Invalid type {type(field_content)} for tensor field {field}') - - processed_tensor_fields.extend(chunks) - embeddings_list.extend(embeddings) - - # All the plain tensor/non-tensor fields are processed, now we process the multimodal fields - if document_is_valid and add_docs_params.mappings: - multimodal_mappings: Dict[str, Dict] = utils.extract_multimodal_mappings(add_docs_params.mappings) - - for field_name, multimodal_params in multimodal_mappings.items(): - if not utils.is_tensor_field(field_name, add_docs_params.tensor_fields): - raise errors.InvalidArgError(f"Multimodal field {field_name} must be a tensor field") - - field_content: Dict[str, str] = utils.extract_multimodal_content(copied, multimodal_params) - - combo_chunk: Optional[str] = None - - if ( - add_docs_params.use_existing_tensors and - doc_id in existing_docs_dict - ): - existing_doc = existing_docs_dict[doc_id] - current_field_contents = utils.extract_multimodal_content(existing_doc, multimodal_params) - if ( - field_content == current_field_contents and - unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS in existing_doc and - field_name in existing_doc[unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS] and - existing_doc[unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS][ - field_name] == multimodal_params and - field_name in existing_doc[constants.MARQO_DOC_TENSORS] - ): - combo_chunk = f"{field_name}::{existing_doc[constants.MARQO_DOC_TENSORS][field_name][constants.MARQO_DOC_CHUNKS][0]}" - combo_embeddings = existing_doc[constants.MARQO_DOC_TENSORS][field_name][ - constants.MARQO_DOC_EMBEDDINGS] - - if unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS not in copied: - copied[unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS] = {} - copied[unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS][field_name] = json.dumps( - multimodal_params) - processed_tensor_fields.append(combo_chunk) - embeddings_list.append(combo_embeddings) - - logger.debug( - f"Using existing tensors for multimodal combination field {field_name} for doc {doc_id}" - ) - else: - logger.debug( - f'Not using existing tensors for multimodal combination field {field_name} for ' - f'doc {doc_id} because field content or config has changed') - - # Use_existing tensor does not apply, or we didn't find it, then we vectorise - if combo_chunk is None: - - if field_content: # Check if the subfields are present - (combo_chunk, combo_embeddings, combo_document_is_valid, - unsuccessful_doc_to_append, - combo_vectorise_time_to_add) = vectorise_multimodal_combination_field_unstructured( - field_name, - field_content, i, doc_id, 
add_docs_params.device, marqo_index, - media_repo, multimodal_params, model_auth=add_docs_params.model_auth, - text_chunk_prefix=text_chunk_prefix, - ) - - total_vectorise_time = total_vectorise_time + combo_vectorise_time_to_add - if combo_document_is_valid is False: - document_is_valid = False - unsuccessful_docs.append(unsuccessful_doc_to_append) - break - else: - - if unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS not in copied: - copied[unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS] = {} - - copied[unstructured_common.MARQO_DOC_MULTIMODAL_PARAMS][field_name] = json.dumps( - multimodal_params) - processed_tensor_fields.append(combo_chunk) - embeddings_list.append(combo_embeddings) - else: - continue - - if document_is_valid: - if processed_tensor_fields: - - # Ensure embeddings_list is flat - flat_embeddings_list = [emb for sublist in embeddings_list for emb in (sublist if isinstance(sublist[0], list) else [sublist])] - - processed_marqo_embeddings = {k: v for k, v in enumerate(flat_embeddings_list)} - - assert len(processed_tensor_fields) == len( - processed_marqo_embeddings), "Chunks and embeddings must be the same length" - copied[constants.MARQO_DOC_CHUNKS] = processed_tensor_fields - copied[constants.MARQO_DOC_EMBEDDINGS] = processed_marqo_embeddings - copied[constants.MARQO_DOC_ID] = doc_id - bulk_parent_dicts.append(copied) - - total_preproc_time = 0.001 * RequestMetricsStore.for_request().stop( - "add_documents.processing_before_vespa") - logger.debug( - f" add_documents pre-processing: took {(total_preproc_time):.3f}s total for {batch_size} docs, " - f"for an average of {(total_preproc_time / batch_size):.3f}s per doc.") - - logger.debug(f" add_documents vectorise: took {(total_vectorise_time):.3f}s for {batch_size} docs, " - f"for an average of {(total_vectorise_time / batch_size):.3f}s per doc.") - - if bulk_parent_dicts: - vespa_docs = [ - VespaDocument(**unstructured_vespa_index.to_vespa_document(marqo_document=doc)) - for doc in bulk_parent_dicts - ] - # ADD DOCS TIMER-LOGGER (5) - start_time_5 = timer() - with RequestMetricsStore.for_request().time("add_documents.vespa._bulk"): - index_responses = vespa_client.feed_batch(vespa_docs, marqo_index.schema_name) - - end_time_5 = timer() - total_http_time = end_time_5 - start_time_5 - logger.debug( - f" add_documents roundtrip: took {(total_http_time):.3f}s to send {batch_size} " - f"docs (roundtrip) to vector store, " - f"for an average of {(total_http_time / batch_size):.3f}s per doc.") - else: - index_responses = None - - with RequestMetricsStore.for_request().time("add_documents.postprocess"): - t1 = timer() - - marqo_add_documents_response = config.document.translate_add_documents_response( - index_responses, index_name=add_docs_params.index_name, unsuccessful_docs=unsuccessful_docs, - add_docs_processing_time_ms=1000 * (t1 - t0) - ) - return marqo_add_documents_response - - -def _add_documents_structured(config: Config, add_docs_params: AddDocsParams, marqo_index: StructuredMarqoIndex) \ - -> MarqoAddDocumentsResponse: - # ADD DOCS TIMER-LOGGER (3) - vespa_client = config.vespa_client - vespa_index = StructuredVespaIndex(marqo_index) - index_model_dimensions = marqo_index.model.get_dimension() - - RequestMetricsStore.for_request().start("add_documents.processing_before_vespa") - - if add_docs_params.tensor_fields is not None: - raise api_exceptions.InvalidArgError("Cannot specify 'tensorFields' when adding documents to a " - "structured index. 
'tensorFields' must be defined in structured " - "index schema at index creation time") - - if add_docs_params.mappings is not None: - validation.validate_mappings_object( - add_docs_params.mappings, - marqo_index - ) - t0 = timer() - bulk_parent_dicts: List[VespaDocument] = [] - - if len(add_docs_params.docs) == 0: - raise api_exceptions.BadRequestError(message="Received empty add documents request") - - unsuccessful_docs: List[Tuple[int, MarqoAddDocumentsItem]] = [] - total_vectorise_time = 0 - batch_size = len(add_docs_params.docs) # use length before deduplication - media_repo = {} - - text_chunk_prefix = marqo_index.model.get_text_chunk_prefix(add_docs_params.text_chunk_prefix) - - # Deduplicate docs, keep the latest - docs, doc_ids = config.document.remove_duplicated_documents(add_docs_params.docs) - - # Check if model is Video/Audio. If so, manually set thread_count to 5 - media_download_thread_count = _determine_thread_count(marqo_index, add_docs_params) - - with ExitStack() as exit_stack: - media_fields = [ - field.name for field in - marqo_index.field_map_by_type[FieldType.ImagePointer] + - marqo_index.field_map_by_type[FieldType.VideoPointer] + - marqo_index.field_map_by_type[FieldType.AudioPointer] - ] - - media_field_types_mapping = {field.name: field.type for field in - marqo_index.field_map_by_type[FieldType.ImagePointer] + - marqo_index.field_map_by_type[FieldType.VideoPointer] + - marqo_index.field_map_by_type[FieldType.AudioPointer] - } - - if media_fields: - with RequestMetricsStore.for_request().time( - "image_download.full_time", - lambda t: logger.debug( - f"add_documents image download: took {t:.3f}ms to concurrently download " - f"images for {batch_size} docs using {media_download_thread_count} threads" - ) - ): - - if '_id' in media_fields: - raise api_exceptions.BadRequestError(message="`_id` field cannot be an image pointer field.") - - media_repo = exit_stack.enter_context( - add_docs.download_and_preprocess_content( - docs=docs, - thread_count=media_download_thread_count, - tensor_fields=media_fields, - media_download_headers=add_docs_params.media_download_headers, - # add non image download headers in the future - model_name=marqo_index.model.name, - normalize_embeddings=marqo_index.normalize_embeddings, - media_field_types_mapping=media_field_types_mapping, - model_properties=marqo_index.model.get_properties(), - device=add_docs_params.device, - model_auth=add_docs_params.model_auth, - patch_method_exists=marqo_index.image_preprocessing.patch_method is not None, - marqo_index_type=marqo_index.type, - marqo_index_model=marqo_index.model, - audio_preprocessing=marqo_index.audio_preprocessing, - video_preprocessing=marqo_index.video_preprocessing, - force_download=True - ) - ) - - if add_docs_params.use_existing_tensors: - existing_docs_dict: Dict[str, dict] = {} - if len(doc_ids) > 0: - existing_docs = _get_marqo_documents_by_ids(config, marqo_index.name, doc_ids, ignore_invalid_ids=True) - for doc in existing_docs: - if not isinstance(doc, dict): - continue - - id = doc["_id"] - if id in existing_docs_dict: - raise api_exceptions.InternalError(f"Received duplicate documents for ID {id} from Vespa") - existing_docs_dict[id] = doc - - logger.debug(f"Found {len(existing_docs_dict)} existing docs") - - for i, doc in enumerate(docs): - copied = copy.deepcopy(doc) - - document_is_valid = True - - doc_id = None - try: - validation.validate_doc(doc) - - if "_id" in doc: - doc_id = validation.validate_id(doc["_id"]) - del copied["_id"] - else: - doc_id = 
str(uuid.uuid4()) - - [validation.validate_field_name(field) for field in copied] - except api_exceptions.__InvalidRequestError as err: - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=err.message, - message=err.message, - status=int(err.status_code), - code=err.code) - ) - ) - continue - - processed_tensor_fields = {} - for field in copied: - marqo_field = marqo_index.field_map.get(field) - tensor_field = marqo_index.tensor_field_map.get(field) - is_tensor_field = tensor_field is not None - if not marqo_field: - message = (f"Field {field} is not a valid field for structured index {add_docs_params.index_name}. " - f"Valid fields are: {', '.join(marqo_index.field_map.keys())}") - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=message, - message=message, - status=int(api_exceptions.InvalidArgError.status_code), - code=api_exceptions.InvalidArgError.code) - ) - ) - break - if marqo_field.type == FieldType.MultimodalCombination: - message = f"Field {field} is a multimodal combination field and cannot be assigned a value." - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=message, - message=message, - status=int(api_exceptions.InvalidArgError.status_code), - code=api_exceptions.InvalidArgError.code) - ) - ) - break - - try: - field_content = validation.validate_field_content( - field_content=copied[field], - is_non_tensor_field=not is_tensor_field - ) - # Used to validate custom_vector field or any other new dict field type - if isinstance(field_content, dict): - field_content = validation.validate_dict( - field=field, field_content=field_content, - is_non_tensor_field=not is_tensor_field, - mappings=add_docs_params.mappings, index_model_dimensions=index_model_dimensions, - structured_field_type=marqo_field.type, - marqo_index_version=marqo_index.parsed_marqo_version()) - except api_exceptions.InvalidArgError as err: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=err.message, - message=err.message, - status=int(err.status_code), - code=err.code) - ) - ) - break - - # Proceed from here only for tensor fields - if not tensor_field: - continue - - # chunks generated by processing this field for this doc: - chunks = [] - embeddings = [] - - # 4 current options for chunking/vectorisation behavior: - # A) field type is custom_vector -> no chunking or vectorisation - # B) use_existing_tensors=True and field content hasn't changed -> no chunking or vectorisation - # C) field type is standard -> chunking and vectorisation - # D) field type is multimodal -> use vectorise_multimodal_combination_field (does chunking and vectorisation) - - # A) Calculate custom vector field logic here. It should ignore use_existing_tensors, as this step has no vectorisation. - if marqo_field.type == FieldType.CustomVector: - # Generate exactly 1 chunk with the custom vector. - chunks = [copied[field]['content']] - embeddings = [copied[field]["vector"]] - - # If normalize_embeddings is true and the index version is > 2.13.0, normalize the embeddings. - # We have added version specific check here to prevent backwards compatibility issues. 
- if marqo_index.normalize_embeddings and marqo_index.parsed_marqo_version() >= MARQO_CUSTOM_VECTOR_NORMALIZATION_MINIMUM_VERSION: - try: - embeddings = normalize_vector(embeddings) - except core_exceptions.ZeroMagnitudeVectorError as e: - document_is_valid = False - error_message = (f" Zero magnitude vector found while normalizing custom vector field. " - f"Please check `{marqo_docs.api_reference_document_body()}` for more info.") - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message + error_message, - message=e.message + error_message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code) - ) - ) - break - - # Update parent document (copied) to fit new format. Use content (text) to replace input dict - copied[field] = field_content["content"] - logger.debug(f"Custom vector field {field} added as 1 chunk.") - - # B) Use existing tensors if available and existing content did not change. - elif ( - add_docs_params.use_existing_tensors and - doc_id in existing_docs_dict and - field in existing_docs_dict[doc_id] and - existing_docs_dict[doc_id][field] == field_content - ): - if ( - constants.MARQO_DOC_TENSORS in existing_docs_dict[doc_id] and - field in existing_docs_dict[doc_id][constants.MARQO_DOC_TENSORS] - ): - chunks = existing_docs_dict[doc_id][constants.MARQO_DOC_TENSORS][field][ - constants.MARQO_DOC_CHUNKS] - embeddings = existing_docs_dict[doc_id][constants.MARQO_DOC_TENSORS][field][ - constants.MARQO_DOC_EMBEDDINGS] - logger.debug(f"Using existing tensors for field {field} for doc {doc_id}") - else: - # Happens if this wasn't a tensor field last time we indexed this doc - logger.debug(f"Found document but not tensors for field {field} for doc {doc_id}. 
" - f"Is this a new tensor field?") - - if len(chunks) == 0: # Not using existing tensors or didn't find it - if marqo_field.type in [FieldType.VideoPointer, FieldType.AudioPointer]: - try: - media_chunks = media_repo[field_content] - - if isinstance(media_repo[field_content], s2_inference_errors.S2InferenceError): - raise media_repo[field_content] - for chunk_index, media_chunk in enumerate(media_chunks): - chunk_start = media_chunk['start_time'] - chunk_end = media_chunk['end_time'] - chunk_time = [chunk_start, chunk_end] - chunk_id = f"{chunk_time}" - chunks.append(chunk_id) - - start_time = timer() - with RequestMetricsStore.for_request().time(f"add_documents.create_vectors"): - vector = s2_inference.vectorise( - model_name=marqo_index.model.name, - content=[media_chunk['tensor']], # Wrap in list as vectorise expects an iterable - model_properties=marqo_index.model.get_properties(), - device=add_docs_params.device, - normalize_embeddings=marqo_index.normalize_embeddings, - infer=True, - model_auth=add_docs_params.model_auth, - modality=Modality.VIDEO if marqo_field.type == FieldType.VideoPointer else Modality.AUDIO - ) - - end_time = timer() - total_vectorise_time += (end_time - start_time) - embeddings.extend(vector) # vectorise returns a list of vectors - - except s2_inference_errors.S2InferenceError as e: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=str(e), - message=str(e), - status=int(api_exceptions.InvalidArgError.status_code), - code=api_exceptions.InvalidArgError.code - )) - ) - continue - elif isinstance(field_content, str): - # C) Handle standard fields (text and images) - - # TODO: better/consistent handling of a no-op for processing (but still vectorize) - - # 1. check if urls should be downloaded -> "treat_pointers_and_urls_as_images":True - # 2. check if it is a url or pointer - # 3. If yes in 1 and 2, download blindly (without type) - # 4. Determine media type of downloaded - # 5. load correct media type into memory -> PIL (images), videos (), audio (torchaudio) - # 6. if chunking -> then add the extra chunker - - if not marqo_field.type == FieldType.ImagePointer: - # text processing pipeline: - modality = Modality.TEXT - split_by = marqo_index.text_preprocessing.split_method.value - split_length = marqo_index.text_preprocessing.split_length - split_overlap = marqo_index.text_preprocessing.split_overlap - content_chunks = text_processor.split_text(field_content, split_by=split_by, - split_length=split_length, - split_overlap=split_overlap) - text_chunks = content_chunks - content_chunks = text_processor.prefix_text_chunks(content_chunks, text_chunk_prefix) - else: - modality = Modality.IMAGE - # TODO put the logic for getting field parameters into a function and add per field options - image_method = marqo_index.image_preprocessing.patch_method - - # the chunk_image contains the no-op logic as of now - method = None will be a no-op - try: - # in the future, if we have different chunking methods, make sure we catch possible - # errors of different types generated here, too. - if isinstance(field_content, str) and field in media_fields: - if not isinstance(media_repo[field_content], Exception): - image_data = media_repo[field_content] - else: - raise s2_inference_errors.S2InferenceError( - f"Could not process the media file found at `{field_content}`. 
\n" - f"Reason: {str(media_repo[field_content])}" - ) - else: - image_data = field_content - if image_method is not None: - content_chunks, text_chunks = image_processor.chunk_image( - image_data, device=add_docs_params.device, method=image_method.value) - else: - # if we are not chunking, then we set the chunks as 1-len lists - # content_chunk is the PIL image - # text_chunk refers to URL - content_chunks, text_chunks = [image_data], [field_content] - except s2_inference_errors.S2InferenceError as e: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message, - message=e.message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code) - ) - ) - - break - - normalize_embeddings = marqo_index.normalize_embeddings - - try: - # in the future, if we have different underlying vectorising methods, make sure we catch possible - # errors of different types generated here, too. - - # ADD DOCS TIMER-LOGGER (4) - start_time = timer() - with RequestMetricsStore.for_request().time(f"add_documents.create_vectors"): - vector_chunks = s2_inference.vectorise( - model_name=marqo_index.model.name, - model_properties=marqo_index.model.get_properties(), content=content_chunks, - device=add_docs_params.device, normalize_embeddings=normalize_embeddings, - infer=marqo_field.type == FieldType.ImagePointer, - model_auth=add_docs_params.model_auth, - modality=modality - ) - - end_time = timer() - total_vectorise_time += (end_time - start_time) - except (s2_inference_errors.UnknownModelError, - s2_inference_errors.InvalidModelPropertiesError, - s2_inference_errors.ModelLoadError, - s2_inference.ModelDownloadError) as model_error: - raise api_exceptions.BadRequestError( - message=f'Problem vectorising query. Reason: {str(model_error)}', - link=marqo_docs.list_of_models() - ) - except s2_inference_errors.S2InferenceError as e: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message, - message=e.message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code) - ) - ) - break - - if len(vector_chunks) != len(text_chunks): - raise RuntimeError( - f"the input content after preprocessing and its vectorized counterparts must be the same length." - f"received text_chunks={len(text_chunks)} and vector_chunks={len(vector_chunks)}. " - f"check the preprocessing functions and try again. 
") - - chunks = text_chunks - embeddings = vector_chunks - - else: - document_is_valid = False - e = api_exceptions.InvalidArgError( - f'Invalid type {type(field_content)} for tensor field {field}') - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message, - message=e.message, - status=int(api_exceptions.InvalidArgError.status_code), - code=api_exceptions.InvalidArgError.code) - ) - ) - break - - # Add chunks_to_append along with doc metadata to total chunks - processed_tensor_fields[tensor_field.name] = {} - processed_tensor_fields[tensor_field.name][constants.MARQO_DOC_CHUNKS] = chunks - processed_tensor_fields[tensor_field.name][constants.MARQO_DOC_EMBEDDINGS] = embeddings - - # Multimodal fields haven't been processed yet, so we do that here - if document_is_valid: # No need to process multimodal fields if the document is invalid - for tensor_field in marqo_index.tensor_fields: - - marqo_field = marqo_index.field_map[tensor_field.name] - if marqo_field.type == FieldType.MultimodalCombination: - field_name = tensor_field.name - field_content = { - dependent_field: copied[dependent_field] - for dependent_field in marqo_field.dependent_fields if dependent_field in copied - } - if not field_content: - # None of the fields are present in the document, so we skip this multimodal field - continue - - if ( - add_docs_params.mappings is not None and - field_name in add_docs_params.mappings and - add_docs_params.mappings[field_name]["type"] == FieldType.MultimodalCombination - ): - mappings = add_docs_params.mappings[field_name] - # Record custom weights in the document - copied[field_name] = mappings['weights'] - logger.debug(f'Using custom weights for multimodal combination field {field_name}') - else: - mappings = { - 'weights': marqo_field.dependent_fields - } - logger.debug(f'Using default weights for multimodal combination field {field_name}: ' - f'{marqo_field.dependent_fields}') - - chunks = [] - embeddings = [] - - if ( - add_docs_params.use_existing_tensors and - doc_id in existing_docs_dict - ): - existing_doc = existing_docs_dict[doc_id] - current_field_contents = { - dependent_field: existing_doc.get(dependent_field) - for dependent_field in marqo_field.dependent_fields if dependent_field in copied - } - current_weights = existing_doc.get(field_name) or marqo_field.dependent_fields - if ( - field_content == current_field_contents and - current_weights == mappings['weights'] and - field_name in existing_doc[constants.MARQO_DOC_TENSORS] - ): - chunks = existing_doc[constants.MARQO_DOC_TENSORS][field_name][ - constants.MARQO_DOC_CHUNKS] - embeddings = existing_doc[constants.MARQO_DOC_TENSORS][field_name][ - constants.MARQO_DOC_EMBEDDINGS] - logger.debug( - f"Using existing tensors for multimodal combination field {field_name} for doc {doc_id}" - ) - else: - logger.debug( - f'Not using existing tensors for multimodal combination field {field_name} for ' - f'doc {doc_id} because field content or config has changed') - - if len(chunks) == 0: # Not using existing tensors or didn't find it - (combo_chunk, combo_document_is_valid, - unsuccessful_doc_to_append, - combo_vectorise_time_to_add) = vectorise_multimodal_combination_field_structured( - field_name, field_content, copied, i, doc_id, add_docs_params.device, marqo_index, - media_repo, mappings, model_auth=add_docs_params.model_auth, - text_chunk_prefix=text_chunk_prefix, - ) - - total_vectorise_time = total_vectorise_time + combo_vectorise_time_to_add - - if 
combo_document_is_valid is False: - document_is_valid = False - unsuccessful_docs.append(unsuccessful_doc_to_append) - break - else: - chunks = [combo_chunk[TensorField.field_content]] - embeddings = [combo_chunk[TensorField.marqo_knn_field]] - - processed_tensor_fields[tensor_field.name] = {} - processed_tensor_fields[tensor_field.name][constants.MARQO_DOC_CHUNKS] = chunks - processed_tensor_fields[tensor_field.name][constants.MARQO_DOC_EMBEDDINGS] = embeddings - - if document_is_valid: - if processed_tensor_fields: - copied[constants.MARQO_DOC_TENSORS] = processed_tensor_fields - copied[constants.MARQO_DOC_ID] = doc_id - - try: - converted_doc = VespaDocument(**vespa_index.to_vespa_document(copied)) - bulk_parent_dicts.append(converted_doc) - except core_exceptions.MarqoDocumentParsingError as e: - document_is_valid = False - unsuccessful_docs.append( - (i, MarqoAddDocumentsItem( - id=doc_id if doc_id is not None else '', - error=e.message, - message=e.message, - status=int(api_exceptions.InvalidArgError.status_code), - code=api_exceptions.InvalidArgError.code) - ) - ) - - total_preproc_time = 0.001 * RequestMetricsStore.for_request().stop( - "add_documents.processing_before_vespa") - logger.debug( - f" add_documents pre-processing: took {(total_preproc_time):.3f}s total for {batch_size} docs, " - f"for an average of {(total_preproc_time / batch_size):.3f}s per doc.") - - logger.debug(f" add_documents vectorise: took {(total_vectorise_time):.3f}s for {batch_size} docs, " - f"for an average of {(total_vectorise_time / batch_size):.3f}s per doc.") - - if bulk_parent_dicts: - # ADD DOCS TIMER-LOGGER (5) - start_time_5 = timer() - with RequestMetricsStore.for_request().time("add_documents.vespa._bulk"): - index_responses = vespa_client.feed_batch(bulk_parent_dicts, marqo_index.schema_name) - - end_time_5 = timer() - total_http_time = end_time_5 - start_time_5 - - logger.debug( - f" add_documents roundtrip: took {(total_http_time):.3f}s to send {batch_size} docs (roundtrip) to Marqo-os, " - f"for an average of {(total_http_time / batch_size):.3f}s per doc.") - else: - index_responses = None - - with RequestMetricsStore.for_request().time("add_documents.postprocess"): - t1 = timer() - - marqo_add_documents_response = config.document.translate_add_documents_response( - index_responses, index_name=add_docs_params.index_name, unsuccessful_docs=unsuccessful_docs, - add_docs_processing_time_ms=(t1 - t0) * 1000 - ) - return marqo_add_documents_response - - -def _determine_thread_count(marqo_index, add_docs_params): - model_properties = marqo_index.model.get_properties() - is_languagebind_model = model_properties.get('type') == 'languagebind' - - default_image_thread_count = 20 - default_media_thread_count = 5 - - - # Check if media_download_thread_count is set in params - if add_docs_params.media_download_thread_count is not None and add_docs_params.media_download_thread_count != default_media_thread_count: - return add_docs_params.media_download_thread_count - - env_media_thread_count = os.environ.get(EnvVars.MARQO_MEDIA_DOWNLOAD_THREAD_COUNT_PER_REQUEST) - if env_media_thread_count is not None and int(env_media_thread_count) != default_media_thread_count: - return int(env_media_thread_count) - - # If it's a LanguageBind model and no explicit setting, use 5 - if is_languagebind_model: - return 5 - - # Check if image_download_thread_count is explicitly set in params - if add_docs_params.image_download_thread_count is not None and add_docs_params.image_download_thread_count != 
default_image_thread_count: - return add_docs_params.image_download_thread_count - - # Check if environment variable is explicitly set - env_image_thread_count = os.environ.get(EnvVars.MARQO_IMAGE_DOWNLOAD_THREAD_COUNT_PER_REQUEST) - if env_image_thread_count is not None and int(env_image_thread_count) != default_image_thread_count: - return int(env_image_thread_count) - - # Default case - return default_image_thread_count - - def _get_marqo_document_by_id(config: Config, index_name: str, document_id: str): marqo_index = _get_latest_index(config, index_name) @@ -2273,449 +1111,6 @@ def get_cpu_info() -> dict: } -def vectorise_multimodal_combination_field_unstructured(field: str, - field_content: Dict[str, str], doc_index: int, - doc_id: str, device: str, marqo_index: UnstructuredMarqoIndex, - media_repo, field_map: dict, - model_auth: Optional[ModelAuth] = None, - text_chunk_prefix: str = None, - modality=None): - ''' - This function is used to vectorise multimodal combination field. - Over all this is a simplified version of the vectorise pipeline in add_documents. Specifically, - 1. we don't do any chunking here. - 2. we don't use image repo for concurrent downloading. - Args: - field_name: the name of the multimodal - field_content: the subfields name and content, e.g., - {"subfield_one" : "content-1", - "subfield_two" : "content-2"}, - unsuccessful_docs: a list to store all the unsuccessful documents - total_vectorise_time: total vectorise time in the main body - doc_index: the index of the document. This is an interator variable `i` in the main body to iterator throught the docs - doc_id: the document id - device: device from main body - index_info: index_info from main body, - model_auth: Model download authorisation information (if required) - Returns: - combo_chunk: the combo_chunk to be appended to the main body - combo_document_is_valid: if the document is a valid - unsuccessful_docs: appended unsucessful_docs - combo_total_vectorise_time: the vectorise time spent in combo field - - ''' - - combo_document_is_valid = True - combo_vectorise_time_to_add = 0 - combo_chunk = {} - combo_embeddings = [] - unsuccessful_doc_to_append = tuple() - - # Lists to store the field name and field content to vectorise. - text_field_names = [] - text_content_to_vectorise = [] - image_field_names = [] - image_content_to_vectorise = [] - video_field_names = [] - video_content_to_vectorise = [] - audio_field_names = [] - audio_content_to_vectorise = [] - - normalize_embeddings = marqo_index.normalize_embeddings - infer_if_image = marqo_index.treat_urls_and_pointers_as_images - infer_if_media = marqo_index.treat_urls_and_pointers_as_media - - if not infer_if_image and not infer_if_media: - text_field_names = list(field_content.keys()) - text_content_to_vectorise = list(field_content.values()) - else: - for sub_field_name, sub_content in field_content.items(): - modality = infer_modality(sub_content) - - if isinstance(sub_content, str) and modality == Modality.TEXT: - text_field_names.append(sub_field_name) - text_content_to_vectorise.append(sub_content) - else: - try: - if isinstance(sub_content, str): - if not isinstance(media_repo[sub_content], Exception): - media_data = media_repo[sub_content] - else: - raise s2_inference_errors.S2InferenceError( - f"Could not find media content at `{sub_content}`. 
\n" - f"Reason: {str(media_repo[sub_content])}" - ) - else: - media_data = sub_content - - if modality == Modality.IMAGE: - image_content_to_vectorise.append(media_data) - image_field_names.append(sub_field_name) - elif modality == Modality.VIDEO: - video_content_to_vectorise.append([media_data[i]['tensor'] for i in range(len(media_data))]) - video_field_names.append(sub_field_name) - elif modality == Modality.AUDIO: - audio_content_to_vectorise.append([media_data[i]['tensor'] for i in range(len(media_data))]) - audio_field_names.append(sub_field_name) - - except s2_inference_errors.S2InferenceError as e: - combo_document_is_valid = False - unsuccessful_doc_to_append = ( - doc_index, MarqoAddDocumentsItem( - id=doc_id, - error=e.message, - message=e.message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code - ) - ) - return (combo_chunk, combo_embeddings, combo_document_is_valid, unsuccessful_doc_to_append, - combo_vectorise_time_to_add) - - try: - start_time = timer() - vectors_list = [] - sub_field_name_list = [] - - if len(text_content_to_vectorise) > 0: - with RequestMetricsStore.for_request().time(f"create_vectors"): - prefixed_text_content_to_vectorise = text_processor.prefix_text_chunks(text_content_to_vectorise, - text_chunk_prefix) - text_vectors = s2_inference.vectorise( - model_name=marqo_index.model.name, - model_properties=marqo_index.model.properties, content=prefixed_text_content_to_vectorise, - device=device, normalize_embeddings=normalize_embeddings, - infer=True, model_auth=model_auth, modality=Modality.TEXT - ) - - vectors_list.extend(text_vectors) - sub_field_name_list.extend(text_field_names) - - if len(image_content_to_vectorise) > 0: - with RequestMetricsStore.for_request().time(f"create_vectors"): - image_vectors = s2_inference.vectorise( - model_name=marqo_index.model.name, - model_properties=marqo_index.model.properties, content=image_content_to_vectorise, - device=device, normalize_embeddings=normalize_embeddings, - infer=True, model_auth=model_auth, modality=Modality.IMAGE - ) - vectors_list.extend(image_vectors) - sub_field_name_list.extend(image_field_names) - - if len(video_content_to_vectorise) > 0: - with RequestMetricsStore.for_request().time(f"create_vectors"): - for video_chunks_list in video_content_to_vectorise: - video_vectors = [] - for video_chunk in video_chunks_list: - video_vector = s2_inference.vectorise( - model_name=marqo_index.model.name, - model_properties=marqo_index.model.properties, content=[video_chunk], - device=device, normalize_embeddings=normalize_embeddings, - infer=True, model_auth=model_auth, modality=Modality.VIDEO - ) - video_vectors.append(video_vector) - # Average the vectors for this video field - avg_video_vector = np.mean(video_vectors, axis=0).tolist() - vectors_list.append(avg_video_vector) - sub_field_name_list.extend(video_field_names) - - if len(audio_content_to_vectorise) > 0: - with RequestMetricsStore.for_request().time(f"create_vectors"): - for audio_chunks_list in audio_content_to_vectorise: - audio_vectors = [] - for audio_chunk in audio_chunks_list: - audio_vector = s2_inference.vectorise( - model_name=marqo_index.model.name, - model_properties=marqo_index.model.properties, content=[audio_chunk], - device=device, normalize_embeddings=normalize_embeddings, - infer=True, model_auth=model_auth, modality=Modality.AUDIO - ) - audio_vectors.extend(audio_vector) - # Average the vectors for this audio field - avg_audio_vector = np.mean(audio_vectors, axis=0).tolist() - 
vectors_list.append(avg_audio_vector) - sub_field_name_list.extend(audio_field_names) - - end_time = timer() - combo_vectorise_time_to_add += (end_time - start_time) - except (s2_inference_errors.UnknownModelError, - s2_inference_errors.InvalidModelPropertiesError, - s2_inference_errors.ModelLoadError) as model_error: - raise errors.BadRequestError( - message=f'Problem vectorising query. Reason: {str(model_error)}', - link=marqo_docs.list_of_models() - ) - except s2_inference_errors.S2InferenceError as e: - combo_document_is_valid = False - unsuccessful_doc_to_append = \ - (doc_index, MarqoAddDocumentsItem( - id=doc_id, - error=e.message, - message=e.message, - status=int(errors.InvalidArgError.status_code), - code=errors.InvalidArgError.code - ) - ) - return combo_chunk, combo_embeddings, combo_document_is_valid, unsuccessful_doc_to_append, combo_vectorise_time_to_add - - if not len(sub_field_name_list) == len(vectors_list): - raise errors.BatchInferenceSizeError( - message=f"Batch inference size does not match content for multimodal field {field}") - - vector_chunk = np.squeeze(np.mean( - [np.array(vector) * field_map["weights"][sub_field_name] for sub_field_name, vector in - zip(sub_field_name_list, vectors_list)], axis=0)) - - if normalize_embeddings is True: - vector_chunk = vector_chunk / np.linalg.norm(vector_chunk) - - combo_embeddings: List[float] = vector_chunk.tolist() - combo_chunk: str = f"{field}::{json.dumps(field_content)}" - - return combo_chunk, combo_embeddings, combo_document_is_valid, unsuccessful_doc_to_append, combo_vectorise_time_to_add - - -def vectorise_multimodal_combination_field_structured( - field: str, multimodal_object: Dict[str, dict], doc: dict, doc_index: int, - doc_id: str, device: str, marqo_index: StructuredMarqoIndex, media_repo, field_map: dict, - model_auth: Optional[ModelAuth] = None, - text_chunk_prefix: str = None -): - """ - This function is used to vectorise multimodal combination field. The field content should - have the following structure: - field_content = {"tensor_field_one" : {"weight":0.5, "parameter": "test-parameter-1"}, - "tensor_field_two" : {"weight": 0.5, parameter": "test-parameter-2"}}, - Over all this is a simplified version of the vectorise pipeline in add_documents. Specifically, - 1. we don't do any chunking here. - 2. we don't use image repo for concurrent downloading. - Args: - field_content: the field content that is a dictionary - copied: the copied document - unsuccessful_docs: a list to store all the unsuccessful documents - total_vectorise_time: total vectorise time in the main body - doc_index: the index of the document. This is an interator variable `i` in the main body to iterator throught the docs - doc_id: the document id - device: device from main body - marqo_index: index_info from main body, - model_auth: Model download authorisation information (if required) - Returns: - combo_chunk: the combo_chunk to be appended to the main body - combo_document_is_valid: if the document is a valid - unsuccessful_docs: appended unsucessful_docs - combo_total_vectorise_time: the vectorise time spent in combo field - - """ - # field_content = {"tensor_field_one" : {"weight":0.5, "parameter": "test-paramater-1"}, - # "tensor_field_two" : {"weight": 0.5, parameter": "test-parameter-2"}}, - combo_document_is_valid = True - combo_vectorise_time_to_add = 0 - combo_chunk = {} - unsuccessful_doc_to_append = tuple() - - # 4 lists to store the field name and field content to vectorise. 
-    text_field_names, image_field_names, video_field_names, audio_field_names = [], [], [], []
-    text_content_to_vectorise, image_content_to_vectorise, video_content_to_vectorise, audio_content_to_vectorise = [], [], [], []
-
-    normalize_embeddings = marqo_index.normalize_embeddings
-    image_fields = [field.name for field in marqo_index.field_map_by_type[FieldType.ImagePointer]]
-    video_fields = [field.name for field in marqo_index.field_map_by_type[FieldType.VideoPointer]]
-    audio_fields = [field.name for field in marqo_index.field_map_by_type[FieldType.AudioPointer]]
-
-    for sub_field_name, sub_content in multimodal_object.items():
-        if isinstance(sub_content, str) and sub_field_name not in image_fields + video_fields + audio_fields:
-            text_field_names.append(sub_field_name)
-            text_content_to_vectorise.append(sub_content)
-        else:
-            try:
-                if isinstance(sub_content, str):
-                    if sub_field_name in image_fields:
-                        if not isinstance(media_repo[sub_content], Exception):
-                            image_data = media_repo[sub_content]
-                        else:
-                            raise s2_inference_errors.S2InferenceError(
-                                f"Could not process image at `{sub_content}`. \n"
-                                f"Reason: {str(media_repo[sub_content])}"
-                            )
-                        image_content_to_vectorise.append(image_data)
-                        image_field_names.append(sub_field_name)
-                    elif sub_field_name in video_fields:
-                        if not isinstance(media_repo[sub_content], Exception):
-                            video_data = [media_repo[sub_content][i]['tensor'] for i in
-                                          range(len(media_repo[sub_content]))]
-                        else:
-                            raise s2_inference_errors.S2InferenceError(
-                                f"Could not process video at `{sub_content}`. \n"
-                                f"Reason: {str(media_repo[sub_content])}"
-                            )
-                        video_content_to_vectorise.append(video_data)
-                        video_field_names.append(sub_field_name)
-                    elif sub_field_name in audio_fields:
-                        if not isinstance(media_repo[sub_content], Exception):
-                            audio_data = [media_repo[sub_content][i]['tensor'] for i in
-                                          range(len(media_repo[sub_content]))]
-                        else:
-                            raise s2_inference_errors.S2InferenceError(
-                                f"Could not process audio at `{sub_content}`. \n"
-                                f"Reason: {str(media_repo[sub_content])}"
-                            )
-                        audio_content_to_vectorise.append(audio_data)
-                        audio_field_names.append(sub_field_name)
-                    else:
-                        raise s2_inference_errors.S2InferenceError(
-                            f"Unsupported field type for `{sub_field_name}`"
-                        )
-                else:
-                    # Assume it's already processed data
-                    if sub_field_name in image_fields:
-                        image_content_to_vectorise.append(sub_content)
-                        image_field_names.append(sub_field_name)
-                    elif sub_field_name in video_fields:
-                        video_content_to_vectorise.append([sub_content[i]['tensor'] for i in range(len(sub_content))])
-                        video_field_names.append(sub_field_name)
-                    elif sub_field_name in audio_fields:
-                        audio_content_to_vectorise.append([sub_content[i]['tensor'] for i in range(len(sub_content))])
-                        audio_field_names.append(sub_field_name)
-                    else:
-                        raise s2_inference_errors.S2InferenceError(
-                            f"Unsupported field type for `{sub_field_name}`"
-                        )
-            except s2_inference_errors.S2InferenceError as e:
-                combo_document_is_valid = False
-                unsuccessful_doc_to_append = (
-                    doc_index,
-                    MarqoAddDocumentsItem(
-                        id=doc_id,
-                        error=e.message,
-                        message=e.message,
-                        status=int(errors.InvalidArgError.status_code),
-                        code=api_exceptions.InvalidArgError.code
-                    )
-                )
-                return combo_chunk, combo_document_is_valid, unsuccessful_doc_to_append, combo_vectorise_time_to_add
-
-    try:
-        start_time = timer()
-        vectors_list = []
-        sub_field_name_list = []
-
-        # Process text content
-        if text_content_to_vectorise:
-            with RequestMetricsStore.for_request().time("create_vectors"):
-                prefixed_text_content = text_processor.prefix_text_chunks(text_content_to_vectorise, text_chunk_prefix)
-                text_vectors = s2_inference.vectorise(
-                    model_name=marqo_index.model.name,
-                    model_properties=marqo_index.model.get_properties(),
-                    content=prefixed_text_content,
-                    device=device,
-                    normalize_embeddings=normalize_embeddings,
-                    infer=False,
-                    model_auth=model_auth,
-                    modality=Modality.TEXT
-                )
-                vectors_list.extend(text_vectors)
-                sub_field_name_list.extend(text_field_names)
-
-        # Process image content
-        if image_content_to_vectorise:
-            with RequestMetricsStore.for_request().time("create_vectors"):
-                image_vectors = s2_inference.vectorise(
-                    model_name=marqo_index.model.name,
-                    model_properties=marqo_index.model.get_properties(),
-                    content=image_content_to_vectorise,
-                    device=device,
-                    normalize_embeddings=normalize_embeddings,
-                    infer=True,
-                    model_auth=model_auth,
-                    modality=Modality.IMAGE
-                )
-                vectors_list.extend(image_vectors)
-                sub_field_name_list.extend(image_field_names)
-
-        # Process video content
-        if video_content_to_vectorise:
-            with RequestMetricsStore.for_request().time("create_vectors"):
-
-                for video_chunks_list in video_content_to_vectorise:
-                    video_vectors = []
-                    for video_chunk in video_chunks_list:
-                        video_vector = s2_inference.vectorise(
-                            model_name=marqo_index.model.name,
-                            model_properties=marqo_index.model.properties, content=[video_chunk],
-                            device=device, normalize_embeddings=normalize_embeddings,
-                            infer=True, model_auth=model_auth, modality=Modality.VIDEO
-                        )
-                        video_vectors.append(video_vector)
-                    # Average the vectors for this video field
-                    avg_video_vector = np.mean(video_vectors, axis=0).tolist()
-                    vectors_list.append(avg_video_vector)
-                sub_field_name_list.extend(video_field_names)
-
-        # Process audio content
-        if audio_content_to_vectorise:
-            with RequestMetricsStore.for_request().time(f"create_vectors"):
-                for audio_chunks_list in audio_content_to_vectorise:
-                    audio_vectors = []
-                    for audio_chunk in audio_chunks_list:
-                        audio_vector = s2_inference.vectorise(
-                            model_name=marqo_index.model.name,
-                            model_properties=marqo_index.model.properties, content=[audio_chunk],
-                            device=device, normalize_embeddings=normalize_embeddings,
-                            infer=True, model_auth=model_auth, modality=Modality.AUDIO
-                        )
-                        audio_vectors.extend(audio_vector)
-                    # Average the vectors for this audio field
-                    avg_audio_vector = np.mean(audio_vectors, axis=0).tolist()
-                    vectors_list.append(avg_audio_vector)
-                sub_field_name_list.extend(audio_field_names)
-
-        end_time = timer()
-        combo_vectorise_time_to_add += (end_time - start_time)
-
-    except (s2_inference_errors.UnknownModelError,
-            s2_inference_errors.InvalidModelPropertiesError,
-            s2_inference_errors.ModelLoadError) as model_error:
-        raise api_exceptions.BadRequestError(
-            message=f'Problem vectorising query. Reason: {str(model_error)}',
-            link=marqo_docs.list_of_models()
-        )
-    except s2_inference_errors.S2InferenceError as e:
-        combo_document_is_valid = False
-        unsuccessful_doc_to_append = (
-            doc_index,
-            MarqoAddDocumentsItem(
-                id=doc_id,
-                error=e.message,
-                message=e.message,
-                status=int(errors.InvalidArgError.status_code),
-                code=errors.InvalidArgError.code
-            )
-        )
-        return combo_chunk, combo_document_is_valid, unsuccessful_doc_to_append, combo_vectorise_time_to_add
-
-    if not len(sub_field_name_list) == len(vectors_list):
-        raise api_exceptions.BatchInferenceSizeError(
-            message=f"Batch inference size does not match content for multimodal field {field}"
-        )
-
-    vector_chunk = np.squeeze(np.mean(
-        [np.array(vector) * field_map["weights"][sub_field_name] for sub_field_name, vector in
-         zip(sub_field_name_list, vectors_list)], axis=0))
-
-    if normalize_embeddings:
-        vector_chunk = vector_chunk / np.linalg.norm(vector_chunk)
-
-    vector_chunk = vector_chunk.tolist()
-
-    combo_chunk = {
-        TensorField.marqo_knn_field: vector_chunk,
-        TensorField.field_content: json.dumps(multimodal_object),
-        TensorField.field_name: field,
-    }
-
-    return combo_chunk, combo_document_is_valid, unsuccessful_doc_to_append, combo_vectorise_time_to_add
-
-
 def delete_documents(config: Config, index_name: str, doc_ids: List[str]):
     """Delete documents from the Marqo index with the given doc_ids """
     # Make sure the index exists
@@ -2730,32 +1125,4 @@ def delete_documents(config: Config, index_name: str, doc_ids: List[str]):
         )
     )
 
-def normalize_vector(embeddings: Union[List[List[float]], ndarray, List[float]]) -> List[List[float]]:
-    """
-    Normalizes a list of vectors (embeddings) to have unit length.
-
-    Args:
-        embeddings (Union[List[List[float]], ndarray, List[float]]): A list of vectors or a numpy ndarray of vectors to be normalized.
-
-    Returns:
-        List[List[float]]: A list of normalized vectors.
-    """
-
-    # Convert the input embeddings to a numpy array
-    if embeddings.__class__ == ndarray:
-        embeddings_array = embeddings
-    else:
-        embeddings_array = np.array(embeddings)
-
-    # Calculate the magnitude (Euclidean norm) of each vector along the last axis
-    magnitude = np.linalg.norm(embeddings_array, axis = -1, keepdims=True)
-
-    # Normalize each vector by dividing by its magnitude, handle zero magnitude case
-    if magnitude != 0:
-        embeddings_array = embeddings_array / magnitude
-    else:
-        raise core_exceptions.ZeroMagnitudeVectorError(f"Zero magnitude vector detected, cannot normalize.")
-
-    # Convert the normalized numpy array back to a list and return
-    return embeddings_array.tolist()
diff --git a/tests/marqo_test.py b/tests/marqo_test.py
index c18249ab1..0475b495c 100644
--- a/tests/marqo_test.py
+++ b/tests/marqo_test.py
@@ -101,8 +101,7 @@ def create_indexes(cls, index_requests: List[MarqoIndexRequest]) -> List[MarqoIn
 
     @classmethod
     def add_documents(cls, *args, **kwargs):
-        # TODO change to use config.document.add_documents when tensor_search.add_documents is removed
-        return tensor_search.add_documents(*args, **kwargs)
+        return cls.config.document.add_documents(add_docs_params=kwargs.get('add_docs_params'))
 
     def setUp(self) -> None:
         self.clear_indexes(self.indexes)
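
With tensor_search.add_documents removed, the tests/marqo_test.py helper above routes document ingestion through the Document core API (cls.config.document.add_documents). The same call shape applies to any other caller that previously handed an add-documents parameters object to tensor_search. A minimal sketch of that migrated call path, assuming the AddDocsParams model from marqo.core.models.add_docs_params and a Config object wired like the one in the test base class; the document fields below are illustrative only, not part of this patch:

from marqo.core.models.add_docs_params import AddDocsParams

def add_sample_docs(config, index_name: str):
    # Build the parameters object the new entry point expects
    # (index_name/docs/tensor_fields are example values; exact fields may differ by version).
    params = AddDocsParams(
        index_name=index_name,
        docs=[{"_id": "doc1", "title": "hello world"}],
        tensor_fields=["title"],
    )
    # The removed tensor_search.add_documents entry point is replaced by the Document core API.
    return config.document.add_documents(add_docs_params=params)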