From 8be8cc8eca4dac781a4edde657ff7977f3908636 Mon Sep 17 00:00:00 2001
From: Mardone
Date: Wed, 17 Jul 2024 16:50:24 -0300
Subject: [PATCH] Improve indexing task: move document lookup into the document_save Celery task

---
 app/celery.py                       | 27 +++++++++++++++++++--------
 app/indexer/content_bases.py        | 18 ++++++------------
 app/indexer/indexer_file_manager.py | 18 ++++++++----------
 3 files changed, 33 insertions(+), 30 deletions(-)

diff --git a/app/celery.py b/app/celery.py
index c8095a7..b3df984 100644
--- a/app/celery.py
+++ b/app/celery.py
@@ -1,12 +1,11 @@
 import os
 from celery import Celery
-from typing import Dict, List
+from typing import Dict
 
-from langchain.docstore.document import Document
+from app.indexer.indexer_file_manager import IndexerFileManager, add_file_metadata
 
-from app.store import IStorage
-from app.indexer.indexer_file_manager import IndexerFileManager
+from app.indexer.content_bases import ContentBaseIndexer
 from app.downloaders.s3 import S3FileDownloader
 from app.handlers.nexus import NexusRESTClient
 from app.text_splitters import TextSplitter, character_text_splitter
 
@@ -60,13 +59,25 @@ def index_file_data(content_base: Dict) -> bool:
     return index_result
 
 
-@celery.task(name="save_file")
-def start_save(
-    docs: List[Document],
-    search_results: List[Dict]
+@celery.task(name="document_save")
+def document_save(
+    docs: dict,
+    content_base_uuid: str
 ) -> bool:
     from app.main import main_app
+
     storage = main_app.content_base_vectorstore
+    docs = add_file_metadata(docs, content_base_uuid)
+    content_base_indexer = ContentBaseIndexer(storage)
+
+    file_uuid = docs[0].metadata["file_uuid"]
+    content_base_uuid = docs[0].metadata["content_base_uuid"]
+
+    search_results = content_base_indexer._search_docs_by_content_base_uuid(
+        content_base_uuid=content_base_uuid,
+        file_uuid=file_uuid,
+    )
+
     ids = []
     if len(search_results) > 0:
         ids = [item["_id"] for item in search_results]
diff --git a/app/indexer/content_bases.py b/app/indexer/content_bases.py
index 7998be6..0577806 100644
--- a/app/indexer/content_bases.py
+++ b/app/indexer/content_bases.py
@@ -1,8 +1,9 @@
 from langchain.docstore.document import Document
-from app.celery import start_save
+from app.celery import document_save
 from app.handlers.products import Product
 from app.indexer import IDocumentIndexer
+
 from app.store import IStorage
 from typing import List
 from uuid import UUID
 
@@ -12,19 +13,12 @@ class ContentBaseIndexer(IDocumentIndexer):
     def __init__(self, storage: IStorage):
         self.storage = storage
 
-    def index_documents(self, docs: List[Document]):
-        file_uuid = docs[0].metadata["file_uuid"]
-        content_base_uuid = docs[0].metadata["content_base_uuid"]
-        print("start _search_docs_by_content_base_uuid")
-        results = self._search_docs_by_content_base_uuid(
+    def index_documents(self, dict_docs: dict, content_base_uuid: str):
+        print("sent to task")
+        task_status = document_save.delay(
             content_base_uuid=content_base_uuid,
-            file_uuid=file_uuid,
+            docs=dict_docs
         )
-        print("end _search_docs_by_content_base_uuid")
-
-        print("start save")
-
-        task_status = start_save.delay(search_results=results, docs=docs)
         return task_status.wait()
 
     def index(self, texts: List, metadatas: dict):
diff --git a/app/indexer/indexer_file_manager.py b/app/indexer/indexer_file_manager.py
index d4be1f8..b6beb72 100644
--- a/app/indexer/indexer_file_manager.py
+++ b/app/indexer/indexer_file_manager.py
@@ -4,6 +4,7 @@
     load_file_url_and_split_text
 )
 from app.text_splitters import get_split_text
+from app.indexer.indexer_file_manager import add_file_metadata
 from typing import Dict, List
 from fastapi.logger import logger
 from langchain.schema.document import Document
@@ -53,22 +54,19 @@ def index_file_url(self, content_base, **kwargs) -> bool:
 
         docs: List[Document]
         full_content: str
-        print("Start load_file_url_and_split_text")
-        docs, full_content = load_file_url_and_split_text(
+
+        docs, full_content = load_file_url_and_split_text(
            content_base.get("file"),
            content_base.get('extension_file'),
            self.text_splitter,
            load_type=load_type
         )
-        print("End load_filk_url_and_split_text")
-        print("Start add_file_metadata")
-        document_pages: List[Document] = add_file_metadata(docs, content_base)
-        print("End add_file_metadata")
+        dict_docs = [doc.dict() for doc in docs]
+
         try:
-            print("Start index_documents")
-            self.content_base_indexer.index_documents(document_pages)
-            print("End index_documents")
-            print("Start index_doc_content")
+            print("Start index_documents_content")
+            self.content_base_indexer.index_documents(dict_docs, content_base)
+            print("Start index doc content")
             self.content_base_indexer.index_doc_content(
                 full_content=full_content,
                 content_base_uuid=str(content_base.get('content_base')),
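
Note on the new flow: index_file_url() now serializes each langchain Document with doc.dict() and hands plain dicts to the document_save Celery task, presumably because Celery's default JSON serializer cannot transport Document objects directly; inside the task, add_file_metadata() is assumed to return objects whose .metadata attribute is readable again, since the task accesses docs[0].metadata directly. Below is a minimal, self-contained sketch of this serialize -> dispatch -> rehydrate pattern; every name except Celery and Document is illustrative and not taken from the repository.

    # Sketch: how documents travel from the indexer to the Celery worker.
    # Only Celery and Document are real imports; helper names are made up.
    from celery import Celery
    from langchain.docstore.document import Document

    celery = Celery("sketch")

    @celery.task(name="document_save_sketch")
    def document_save_sketch(docs: list, content_base_uuid: str) -> bool:
        # Rebuild Document objects from the JSON-safe dicts sent by the caller,
        # so the rest of the task can keep reading doc.metadata as an attribute.
        documents = [
            Document(page_content=d["page_content"], metadata=d.get("metadata", {}))
            for d in docs
        ]
        for doc in documents:
            doc.metadata["content_base_uuid"] = content_base_uuid
        # ...look up existing chunks for the same file_uuid and reindex here...
        return True

    def index_documents_sketch(documents: list, content_base_uuid: str) -> bool:
        # Celery task arguments must be serializable, so Documents become dicts first.
        dict_docs = [doc.dict() for doc in documents]
        result = document_save_sketch.delay(docs=dict_docs,
                                            content_base_uuid=content_base_uuid)
        return result.wait()

With this split, the web process only serializes and enqueues, while the worker owns the vector-store lookup and write, which is what the patch moves out of ContentBaseIndexer.index_documents().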