From 488e89e692d9803e5cff96f62c9a944077d68f18 Mon Sep 17 00:00:00 2001 From: htagourti Date: Thu, 28 Nov 2024 10:11:09 +0000 Subject: [PATCH] added filecatcher lib --- app/chainlit_app.py | 3 +- main.py | 3 +- manage_collection.py | 4 +- pyproject.toml | 1 + src/components/chunker.py | 299 ----------------------------- src/components/config.py | 14 +- src/components/embeddings.py | 75 -------- src/components/evaluation.py | 2 +- src/components/llm.py | 56 ------ src/components/loader.py | 358 ----------------------------------- src/components/pipeline.py | 41 +--- src/components/retriever.py | 7 +- src/components/utils.py | 56 ------ src/components/vectordb.py | 225 ---------------------- 14 files changed, 13 insertions(+), 1131 deletions(-) delete mode 100644 src/components/chunker.py delete mode 100644 src/components/embeddings.py delete mode 100644 src/components/llm.py delete mode 100644 src/components/loader.py delete mode 100644 src/components/utils.py delete mode 100644 src/components/vectordb.py diff --git a/app/chainlit_app.py b/app/chainlit_app.py index d027fec..f602cbf 100644 --- a/app/chainlit_app.py +++ b/app/chainlit_app.py @@ -3,9 +3,8 @@ import sys, os, yaml, torch sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../'))) -from src.components import RagPipeline, load_config, AudioTranscriber +from src.components import RagPipeline, load_config from loguru import logger -from io import BytesIO APP_DIR = Path(__file__).parent.absolute() # Path.cwd().parent.absolute() UPLOAD_DIR = APP_DIR / "upload_dir" diff --git a/main.py b/main.py index f699dad..8d7bb58 100644 --- a/main.py +++ b/main.py @@ -2,7 +2,8 @@ import asyncio from loguru import logger from pathlib import Path -from src.components import RagPipeline, load_config, evaluate, Indexer +from src.components import RagPipeline, load_config, evaluate +from filecatcher.components import Indexer # config_path = Path(__file__) / '.hydra_config' config = load_config() diff --git a/manage_collection.py b/manage_collection.py index d030e3b..897cf90 100755 --- a/manage_collection.py +++ b/manage_collection.py @@ -47,12 +47,12 @@ async def main(): print(config) if args.folder: - from src.components import Indexer + from filecatcher.components import Indexer collection = config.vectordb["collection_name"] logger.warning(f"Data will be upserted to the collection {collection}") - indexer = Indexer(config, logger) + indexer = Indexer(config = config, logger = logger) start = time.time() await indexer.add_files2vdb(path=args.folder) diff --git a/pyproject.toml b/pyproject.toml index b35ed30..3de3a1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ langchain-community = "^0.3.7" langchain-openai = "^0.2.8" ragatouille = "^0.0.8.post4" whisperx = {git = "https://github.com/federicotorrielli/BetterWhisperX", branch="main"} +filecatcher = {git = "https://github.com/OpenLLM-France/fileCatcher.git"} pyannote-audio = "3.1.1" ctranslate2 = "4.4.0" qdrant-client = "^1.12.1" diff --git a/src/components/chunker.py b/src/components/chunker.py deleted file mode 100644 index a3962c8..0000000 --- a/src/components/chunker.py +++ /dev/null @@ -1,299 +0,0 @@ -import re -import asyncio -from .llm import LLM -from typing import Optional -from abc import ABCMeta, abstractmethod, ABC -from langchain_openai import ChatOpenAI -from langchain_core.documents.base import Document -from langchain_core.prompts import ChatPromptTemplate -from langchain_core.output_parsers import StrOutputParser -from 
langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings -from langchain.callbacks import StdOutCallbackHandler -from loguru import logger -from langchain_core.runnables import RunnableLambda -from omegaconf import OmegaConf - - -class ABCChunker(ABC): - @abstractmethod - def __init__(self, *args, **kwargs) -> None: - pass - - @abstractmethod - def split_document(self, doc: Document): - pass - - -# templtate = """ -# -# {origin} -# - -# This is the chunk we want to situate within the document named `{source}`. -# The document’s name itself may contain relevant information (such as "employees CV," "tutorials," etc.) that can help contextualize the content. - -# -# {chunk} -# - -# Please provide a brief, succinct context to situate this chunk within the document, specifically to improve search retrieval of this chunk. -# Respond only with the concise context in the same language as the provided document and chunk. -# """ - -templtate = """ - -Title: {source} -# The document title may contain key metadata (e.g., "cv", "videos", "client proposals"). -First page of the document: {first_page} -Previous chunk: {prev_chunk} - - - -{chunk} - - -**Task:** -Provide a concise, one-sentence context that situates the ** within the **, integrating relevant information from: -1. **Title** (e.g., type, or category information encoded in the filename). -2. **First page of the document**. -3. **Previous chunk**. -4. **Current chunk content**. - -**Response Format:** -- Only provide a single, concise contextual sentence for the **. -- Write the response in the **same language** as the current chunk to enhance retrieval quality. -- Do not include any additional text or explanation. -""" - - -class ChunkContextualizer: - def __init__(self, contextual_retrieval: bool = False, llm: Optional[ChatOpenAI] = None): - self.contextual_retrieval = contextual_retrieval - self.context_generator = None - if self.contextual_retrieval: - assert isinstance(llm, ChatOpenAI), f'The `llm` should be of type `ChatOpenAI` is contextual_retrieval is `True`' - prompt = ChatPromptTemplate.from_template( - template=templtate - ) - self.context_generator = (prompt | llm | StrOutputParser()) - - def contextualize(self, prev_chunks: list[Document], b_chunks: list[Document], pages: list[str], source: str) -> list[str]: - if not self.contextual_retrieval: - return [chunk.page_content for chunk in b_chunks] - - try: - b_contexts = self.context_generator\ - .with_retry( - retry_if_exception_type=(Exception,), - wait_exponential_jitter=False, - stop_after_attempt=3 - )\ - .batch([ - { - "first_page": pages[0], - 'prev_chunk': prev_chunk.page_content, - 'chunk': chunk.page_content, - 'source': source - } for prev_chunk, chunk in zip(prev_chunks, b_chunks) - ]) - - s = "chunk's context: {chunk_context}\n\n=> chunk: {chunk}" - return [s.format(chunk=chunk.page_content, chunk_context=chunk_context) - for chunk, chunk_context in zip(b_chunks, b_contexts)] - - except Exception as e: - logger.warning(f"An error occurred with document `{source}`: {e}") - return [chunk.page_content for chunk in b_chunks] - - - -class RecursiveSplitter(ABCChunker): - def __init__( - self, - chunk_size: int=200, - chunk_overlap: int=20, - contextual_retrieval: bool=True, - llm: Optional[ChatOpenAI]=None, - **args - ): - super().__init__(contextual_retrieval=contextual_retrieval, llm=llm) - - from langchain.text_splitter import RecursiveCharacterTextSplitter - self.splitter = RecursiveCharacterTextSplitter( - chunk_size=chunk_size, - 
chunk_overlap=chunk_overlap, - **args - ) - self.contextualizer = ChunkContextualizer(contextual_retrieval=contextual_retrieval, llm=llm) - - - def split_document(self, doc: Document): - text = '' - page_idx = [] - source = doc.metadata["source"] - page_sep = doc.metadata["page_sep"] - pages: list[str] = doc.page_content.split(sep=page_sep) - - start_index = 0 - # TODO: We can apply apply this function 'split_text' once. - for page_num, p in enumerate(pages, start=1): - text += ' ' + p - c = ' '.join( - self.splitter.split_text(text) - ) - end_index = len(c) - 1 - page_idx.append( - { - "start_idx": start_index, - "end_idx": end_index, - "page": page_num - } - ) - start_index = end_index - - # split - full_text_preprocessed = ' '.join( - self.splitter.split_text(text) - ) - - # chunking the full text - chunks = self.splitter.create_documents([text]) - - i = 0 - batch_size = 4 - filtered_chunks = [] - - chunks = [Document(page_content='')] + chunks - - for j in range(1, len(chunks), batch_size): - b_chunks = chunks[j:j+batch_size] - prev_chunks = chunks[j-1:j+batch_size-1] - - b_chunks_w_context = self.contextualizer.contextualize( - prev_chunks=prev_chunks, b_chunks=b_chunks, - pages=pages, source=source - ) - - for chunk, b_chunk_w_context in zip(b_chunks, b_chunks_w_context): - start_idx = full_text_preprocessed.find(chunk.page_content) - - while not (page_idx[i]["start_idx"] <= start_idx <= page_idx[i]["end_idx"]) and i < len(page_idx)-1: - i += 1 - - if len(chunk.page_content.strip()) > 1: - chunk.page_content = b_chunk_w_context - chunk.metadata = { - "page": page_idx[i]["page"], - "source": source, - } - filtered_chunks.append(chunk) - - return filtered_chunks - - -class SemanticSplitter(ABCChunker): - def __init__(self, - min_chunk_size: int = 1000, - embeddings = None, - breakpoint_threshold_amount=85, - contextual_retrieval: bool=False, llm: Optional[ChatOpenAI]=None, - **args - ) -> None: - from langchain_experimental.text_splitter import SemanticChunker - self.splitter = SemanticChunker( - embeddings=embeddings, - buffer_size=1, - breakpoint_threshold_type='percentile', - breakpoint_threshold_amount=breakpoint_threshold_amount, - min_chunk_size=min_chunk_size, - add_start_index=True, - **args - ) - self.contextualizer = ChunkContextualizer(contextual_retrieval=contextual_retrieval, llm=llm) - - def split_document(self, doc: Document): - text = '' - page_idx = [] - source = doc.metadata["source"] - page_sep = doc.metadata["page_sep"] - pages = doc.page_content.split(sep=page_sep) - - start_index = 0 - for page_num, page_txt in enumerate(pages, start=1): - text += ' ' + page_txt - c = ' '.join( - re.split(self.splitter.sentence_split_regex, text) - ) - end_index = len(c) - 1 - page_idx.append( - { - "start_idx": start_index, - "end_idx": end_index, - "page": page_num - } - ) - start_index = end_index - - chunks = self.splitter.create_documents( - [' '.join(re.split(self.splitter.sentence_split_regex, text))] - ) - - i = 0 - filtered_chunks = [] - batch_size = 4 - - chunks = [Document(page_content='')] + chunks - for j in range(1, len(chunks), batch_size): - b_chunks = chunks[j:j+batch_size] - prev_chunks = chunks[j-1:j+batch_size-1] - - b_chunks_w_context = self.contextualizer.contextualize( - prev_chunks=prev_chunks, b_chunks=b_chunks, - pages=pages, source=source - ) - - for chunk, b_chunk_w_context in zip(b_chunks, b_chunks_w_context): - start_idx = chunk.metadata["start_index"] - - while not (page_idx[i]["start_idx"] <= start_idx <= page_idx[i]["end_idx"]) and i < 
len(page_idx)-1: - i += 1 - - if len(chunk.page_content.strip()) > 1: - chunk.page_content = b_chunk_w_context - chunk.metadata = { - "page": page_idx[i]["page"], - "source": source, - } - filtered_chunks.append(chunk) - return filtered_chunks - - -class ChunkerFactory: - CHUNKERS = { - 'recursive_splitter': RecursiveSplitter, - 'semantic_splitter': SemanticSplitter, - } - - @staticmethod - def create_chunker(config:OmegaConf, embedder: Optional[HuggingFaceBgeEmbeddings | HuggingFaceEmbeddings]=None) -> ABCChunker: - # Extract parameters - chunker_params = OmegaConf.to_container(config.chunker, resolve=True) - name = chunker_params.pop("name") - - # Initialize and return the chunker - chunker_class: ABCChunker = ChunkerFactory.CHUNKERS.get(name) - if not chunker_class: - raise ValueError(f"Chunker '{name}' is not recognized.") - - # Add embeddings if semantic splitter is selected - if name == 'semantic_splitter': - if embedder is not None: - chunker_params.update({"embeddings": embedder}) - else: - raise AttributeError(f"{name} type chunker requires the `embedder` parameter") - - # Include contextual retrieval if specified - if chunker_params['contextual_retrieval']: - chunker_params['llm'] = LLM(config, logger=None).client - - return chunker_class(**chunker_params) \ No newline at end of file diff --git a/src/components/config.py b/src/components/config.py index 16715d7..07ec9d5 100644 --- a/src/components/config.py +++ b/src/components/config.py @@ -1,19 +1,9 @@ -import os -from dotenv import load_dotenv, find_dotenv from omegaconf import OmegaConf from pathlib import Path from hydra import initialize, compose -def load_config(config_path="../../.hydra_config", overrides=None)-> OmegaConf: - load_dotenv() - print(overrides) - +def load_config(config_path="../../.hydra_config", overrides=None)-> OmegaConf: with initialize(config_path=config_path, job_name="config_loader"): config = compose(config_name="config", overrides=overrides) config.paths.root_dir = Path(config.paths.root_dir).absolute() - return config - -# # Example usage -# if __name__ == "__main__": -# config = load_config() -# print(config) \ No newline at end of file + return config \ No newline at end of file diff --git a/src/components/embeddings.py b/src/components/embeddings.py deleted file mode 100644 index 33a0942..0000000 --- a/src/components/embeddings.py +++ /dev/null @@ -1,75 +0,0 @@ -from abc import abstractmethod, ABC -from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings -import torch -from omegaconf import OmegaConf - - -class ABCEmbedder(ABC): - """Abstract base class defining the interface for embedder implementations. - - This class serves as a template for creating embedder classes that convert text - into vector representations using various embedding models. - """ - - @abstractmethod - def get_embeddings(self): - """Return the embeddings model instance. - - Returns: - Any: The embeddings model instance that can generate vector representations. - """ - pass - - -# Dictionary mapping embedding model types to their corresponding classes -HG_EMBEDDER_TYPE = { - "huggingface_bge": HuggingFaceBgeEmbeddings, - "huggingface": HuggingFaceEmbeddings -} - - -class HFEmbedder(ABCEmbedder): - """Factory class for loading and managing HuggingFace embedding models. - - This class handles the initialization and configuration of various HuggingFace - embedding models, supporting both BGE and standard HuggingFace embeddings. 
- - Args: - embedder_config (OmegaConf): Configuration object containing model parameters - device (str, optional): Device to run the model on ('cuda' or 'cpu'). - Defaults to None, which auto-selects based on CUDA availability. - - Raises: - ValueError: If the specified model type is not supported or if initialization fails. - """ - - def __init__(self, embedder_config: OmegaConf, device=None) -> None: - # Extract model type from config - model_type = embedder_config["type"] - - if model_type in HG_EMBEDDER_TYPE: - # Auto-select device if none specified - if device is None: - device = 'cuda' if torch.cuda.is_available() else 'cpu' - - try: - model_name = embedder_config["model_name"] - self.embedding = HG_EMBEDDER_TYPE[model_type]( - model_name=model_name, - model_kwargs={"device": device, 'trust_remote_code': True}, - encode_kwargs={"normalize_embeddings": True} - ) - except Exception as e: - raise ValueError(f"An error occurred during model initialization: {e}") - else: - raise ValueError(f"{model_type} is not a supported `model_type`") - - - def get_embeddings(self) -> HuggingFaceBgeEmbeddings: - """Retrieve the initialized embedding model. - - Returns: - HuggingFaceBgeEmbeddings: The configured embedding model instance - ready for generating embeddings. - """ - return self.embedding \ No newline at end of file diff --git a/src/components/evaluation.py b/src/components/evaluation.py index 44933aa..1674646 100644 --- a/src/components/evaluation.py +++ b/src/components/evaluation.py @@ -1,5 +1,5 @@ from pathlib import Path -from .utils import load_sys_template +from filecatcher.components import load_sys_template from langchain_openai import ChatOpenAI from langchain_core.prompts import ( MessagesPlaceholder, diff --git a/src/components/llm.py b/src/components/llm.py deleted file mode 100644 index 58bb314..0000000 --- a/src/components/llm.py +++ /dev/null @@ -1,56 +0,0 @@ -from uuid import UUID -from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult -from langchain_openai import ChatOpenAI -from langchain_core.prompts import ( - MessagesPlaceholder, - ChatPromptTemplate -) -from langchain_core.messages import AIMessage, HumanMessage, BaseMessage -from langchain_core.callbacks import AsyncCallbackHandler -from typing import Any, AsyncIterator - -class LLM: - def __init__(self, config, logger=None): - self.logger = logger - self.client: ChatOpenAI = ChatOpenAI( - model=config.llm["name"], - base_url=config.llm["base_url"], - api_key=config.llm['api_key'], - timeout=60, - temperature=config.llm["temperature"], - max_tokens=config.llm["max_tokens"], - streaming=True - ) - - def run(self, - question: str, context: str, - chat_history: list[AIMessage | HumanMessage], - sys_pmpt_tmpl: str - )-> AsyncIterator[BaseMessage]: - - """This method runs the LLM given the user's input (`question`), `chat_history`, and the system prompt template (`sys_prompt_tmpl`) - - Args: - question (str): The input from the user; not necessarily a question. - context (str): It's the retrieved documents (formatted into a string) - chat_history (list[AIMessage | HumanMessage]): The Chat history. 
- sys_prompt_tmpl (str): The system prompt - Returns: - AsyncIterator[BaseMessage] - """ - - qa_prompt = ChatPromptTemplate.from_messages( - [ - ("system", sys_pmpt_tmpl), - MessagesPlaceholder("chat_history"), - ("human", "{input}") - ] - ) - rag_chain = qa_prompt | self.client.with_retry(stop_after_attempt=3) - - input_ = { - "input": question, - "context": context, - "chat_history": chat_history, - } - return rag_chain.astream(input_) \ No newline at end of file diff --git a/src/components/loader.py b/src/components/loader.py deleted file mode 100644 index 188cea4..0000000 --- a/src/components/loader.py +++ /dev/null @@ -1,358 +0,0 @@ -from abc import abstractmethod, ABC -import asyncio -import os -from collections import defaultdict -import gc -import random -import whisperx -from .utils import SingletonMeta -from pydub import AudioSegment -import torch -from pathlib import Path -from typing import AsyncGenerator -from langchain_core.documents.base import Document -from langchain_community.document_loaders import ( - PyMuPDFLoader, - UnstructuredODTLoader, - UnstructuredWordDocumentLoader, - UnstructuredPowerPointLoader -) -from langchain_community.document_loaders import UnstructuredWordDocumentLoader -from langchain_community.document_loaders import TextLoader -from langchain_community.document_loaders import UnstructuredHTMLLoader - -import pymupdf4llm -from loguru import logger -from aiopath import AsyncPath -from typing import Dict - -# from langchain_community.document_loaders import UnstructuredXMLLoader, PyPDFLoader -# from langchain_community.document_loaders.csv_loader import CSVLoader -# from langchain_community.document_loaders.text import TextLoader -# from langchain_community.document_loaders import UnstructuredHTMLLoader - -class BaseLoader(ABC): - @abstractmethod - async def aload_document(self, file_path): - pass - -class Custompymupdf4llm(BaseLoader): - def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None: - self.loader_args = loader_args - self.page_sep = page_sep - - async def aload_document(self, file_path): - pages = pymupdf4llm.to_markdown( - file_path, - write_images=False, - page_chunks=True, - **self.loader_args - ) - page_content = f'{self.page_sep}'.join([p['text'] for p in pages]) - return Document( - page_content=page_content, - metadata={ - 'source': str(file_path), - 'page_sep': self.page_sep - } - ) - - -class AudioTranscriber(metaclass=SingletonMeta): - def __init__(self, device='cpu', compute_type='float32', model_name='large-v2', language='fr'): - self.model = whisperx.load_model( - model_name, - device=device, language=language, - compute_type=compute_type - ) - -class VideoAudioLoader(BaseLoader): - def __init__(self, page_sep: str='[PAGE_SEP]', batch_size=4): - self.batch_size = batch_size - self.page_sep = page_sep - self.formats = [".wav", '.mp3', ".mp4"] - - self.transcriber = AudioTranscriber( - device='cuda' if torch.cuda.is_available() else 'cpu' - ) - - def free_memory(self): - gc.collect() - torch.cuda.empty_cache() - - async def aload_document(self, file_path): - path = Path(file_path) - if path.suffix not in self.formats: - logger.warning( - f'This audio/video file ({path.suffix}) is not supported.' 
- f'The format should be {self.formats}' - ) - return None - - # load it in wave format - if path.suffix == '.wav': - audio_path_wav = path - else: - sound = AudioSegment.from_file(file=path, format=path.suffix[1:]) - audio_path_wav = path.with_suffix('.wav') - # Export to wav format - sound.export( - audio_path_wav, format="wav", - parameters=["-ar", "16000", "-ac", "1", "-ab", "32k"] - ) - - audio = whisperx.load_audio(audio_path_wav) - - if path.suffix != '.wav': - os.remove(audio_path_wav) - - transcription_l = self.transcriber.model.transcribe(audio, batch_size=self.batch_size) - content = ' '.join([tr['text'] for tr in transcription_l['segments']]) - - self.free_memory() - - return Document( - page_content=f"{content}{self.page_sep}", - metadata={ - 'source':str(file_path), - 'page_sep': self.page_sep - } - ) - - -class CustomPPTLoader(BaseLoader): - doc_loaders = { - '.pptx': UnstructuredPowerPointLoader, - '.ppt': UnstructuredPowerPointLoader - } - - cat2md = { - 'Title': '#', - 'NarrativeText': '>', - 'ListItem': '-', - 'UncategorizedText': '', - 'EmailAddress': '', - 'FigureCaption': '*' - } - def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None: - self.loader_args = loader_args - self.page_sep = page_sep - - def group_by_hierarchy(self, docs: list[Document]): - """Group related elements within each page by parent_id and category_depth""" - grouped_elements = defaultdict(list) - - # Sort by depth to ensure proper hierarchy - sorted_docs = sorted(docs, key=lambda x: x.metadata.get('category_depth', 0)) - - for doc in sorted_docs: - parent_id = doc.metadata.get('parent_id', None) - if parent_id: - grouped_elements[parent_id].append(doc) - else: - grouped_elements[doc.metadata.get('element_id')].append(doc) - return grouped_elements - - async def aload_document(self, file_path): - path = Path(file_path) - cls_loader = CustomPPTLoader.doc_loaders.get(path.suffix, None) - - if cls_loader is None: - raise Exception(f"This loader only supports {CustomPPTLoader.doc_loaders.keys()} format") - - loader = cls_loader( - file_path=str(file_path), - mode='elements', - **self.loader_args - ) - elements = await loader.aload() - - # Step 1: Group elements by page_number - grouped_by_page = defaultdict(list) - for doc in elements: - page_number = doc.metadata.get('page_number') - grouped_by_page[page_number].append(doc) - - # Final structure: dictionary of pages, each containing grouped elements - final_grouped_structure = {} - for page, docs in grouped_by_page.items(): - final_grouped_structure[page] = self.group_by_hierarchy(docs) - - content = '' - for page, grouped_elements in final_grouped_structure.items(): - t = '' - for v in grouped_elements.values(): - for elem in v: - meta = elem.metadata - cat_md = CustomPPTLoader.cat2md.get(meta.get("category"), '') - t += f"{cat_md} {elem.page_content}\n" - - if page is not None and content: - t = f'{self.page_sep}' + t - - content += t - - return Document( - page_content=content, - metadata={ - 'source': str(file_path), - 'page_sep': self.page_sep - } - ) - -class CustomPyMuPDFLoader(BaseLoader): - def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None: - self.loader_args = loader_args - self.page_sep = page_sep - - async def aload_document(self, file_path): - loader = PyMuPDFLoader( - file_path=Path(file_path), - **self.loader_args - ) - pages = await loader.aload() - return Document( - page_content=f'{self.page_sep}'.join([p.page_content for p in pages]), - metadata={ - 'source': str(file_path), - 'page_sep': 
self.page_sep - } - ) - -class CustomTextLoader(BaseLoader): - def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None: - self.loader_args = loader_args - self.page_sep = page_sep - - async def aload_document(self, file_path): - path = Path(file_path) - loader = TextLoader(file_path=str(path), autodetect_encoding=True) - doc = await loader.aload() - return Document( - page_content=f'{self.page_sep}'.join([p.page_content for p in doc]), - metadata={ - 'source': str(file_path), - 'page_sep': self.page_sep - } - ) - - -class CustomHTMLLoader(BaseLoader): - def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None: - self.loader_args = loader_args - self.page_sep = page_sep - - async def aload_document(self, file_path): - path = Path(file_path) - loader = UnstructuredHTMLLoader(file_path=str(path), autodetect_encoding=True) - doc = await loader.aload() - return Document( - page_content=f'{self.page_sep}'.join([p.page_content for p in doc]), - metadata={ - 'source': str(file_path), - 'page_sep': self.page_sep - } - ) - - - -class CustomDocLoader(BaseLoader): - doc_loaders = { - ".docx": UnstructuredWordDocumentLoader, - '.doc': UnstructuredWordDocumentLoader, - '.odt': UnstructuredODTLoader - } - def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None: - self.loader_args = loader_args - self.page_sep = page_sep - - - async def aload_document(self, file_path): - path = Path(file_path) - cls_loader = CustomDocLoader.doc_loaders.get(path.suffix, None) - - if cls_loader is None: - raise ValueError(f"This loader only supports {CustomDocLoader.doc_loaders.keys()} format") - - loader = cls_loader( - file_path=str(file_path), - mode='single', - **self.loader_args - ) - pages = await loader.aload() - content = ' '.join([p.page_content for p in pages]) - - return Document( - page_content=f"{content}{self.page_sep}", - metadata={ - 'source': str(file_path), - 'page_sep': self.page_sep - } - ) - - -class DocSerializer: - def __init__(self, root_dir=None) -> None: - self.root_dir = root_dir - - async def serialize_documents(self, path: str | Path, recursive=True) -> AsyncGenerator[Document, None]: - p = AsyncPath(path) - - if await p.is_file(): - type = p.suffix - pattern = f"**/*{type}" - loader_cls: BaseLoader = LOADERS.get(p.suffix) - logger.info(f'Loading {type} files.') - doc: Document = await loader_cls().aload_document(file_path=path) - yield doc - - - is_dir = await p.is_dir() - if is_dir: - for type, loader_cls in LOADERS.items(): # TODO Rendre ceci async: Priority 0 - pattern = f"**/*{type}" - logger.info(f'Loading {type} files.') - files = get_files(path, pattern, recursive) - - async for file in files: - loader = loader_cls() - relative_file_path = Path(file).relative_to(self.root_dir) - doc: Document = await loader.aload_document(file_path=relative_file_path) - print(f"==> Serialized: {str(file)}") - yield doc - - - - - -async def get_files(path, pattern, recursive) -> AsyncGenerator: - p = AsyncPath(path) - async for file in (p.rglob(pattern) if recursive else p.glob(pattern)): - yield file - - - -# TODO create a Meta class that aggregates registery of supported documents from each child class -LOADERS: Dict[str, BaseLoader] = { - '.pdf': CustomPyMuPDFLoader, - '.docx': CustomDocLoader, - '.doc': CustomDocLoader, - '.odt': CustomDocLoader, - - '.mp4': VideoAudioLoader, - '.pptx': CustomPPTLoader, - '.txt': CustomTextLoader, - #'.html': CustomHTMLLoader -} - - -# if __name__ == "__main__": -# async def main(): -# loader = DocSerializer() -# dir_path = 
"../../data/" # Replace with your actual directory path -# docs = loader.serialize_documents(dir_path) -# async for d in docs: -# pass - - -# asyncio.run(main()) diff --git a/src/components/pipeline.py b/src/components/pipeline.py index 3b11e27..93f22c6 100644 --- a/src/components/pipeline.py +++ b/src/components/pipeline.py @@ -2,23 +2,12 @@ import sys import torch -from .chunker import ABCChunker, ChunkerFactory from langchain_core.documents.base import Document -from .llm import LLM -from .utils import format_context, load_sys_template from .reranker import Reranker from .retriever import ABCRetriever, RetrieverFactory -from .vectordb import ConnectorFactory -from .embeddings import HFEmbedder -from omegaconf import OmegaConf -from .loader import DocSerializer from loguru import logger -from typing import AsyncGenerator from langchain_core.output_parsers import StrOutputParser -from langchain_core.prompts import ( - MessagesPlaceholder, - ChatPromptTemplate -) +from langchain_core.prompts import ChatPromptTemplate from langchain_core.messages import ( AIMessage, HumanMessage @@ -26,31 +15,7 @@ from pathlib import Path from collections import deque from .grader import Grader - -class Indexer: - """This class bridges static files with the vector store database. - """ - def __init__(self, config: OmegaConf, logger, device=None) -> None: - embedder = HFEmbedder(embedder_config=config.embedder, device=device) - self.serializer = DocSerializer(root_dir=config.paths.root_dir) - self.chunker: ABCChunker = ChunkerFactory.create_chunker(config, embedder=embedder.get_embeddings()) - self.vectordb = ConnectorFactory.create_vdb(config, logger=logger, embeddings=embedder.get_embeddings()) - self.logger = logger - self.logger.info("Indexer initialized...") - - - async def add_files2vdb(self, path): - """Add a files to the vector database in async mode""" - try: - doc_generator: AsyncGenerator[Document, None] = self.serializer.serialize_documents(path, recursive=True) - await self.vectordb.async_add_documents( - doc_generator=doc_generator, - chunker=self.chunker, - document_batch_size=4 - ) - self.logger.info(f"Documents from {path} added.") - except Exception as e: - raise Exception(f"An exception as occured: {e}") +from filecatcher.components import Indexer, LLM, format_context, load_sys_template class RagPipeline: @@ -58,7 +23,7 @@ def __init__(self, config, device="cpu") -> None: self.config = config # print(self.config) self.logger = self.set_logger(config) - self.indexer = Indexer(config, self.logger, device=device) + self.indexer = Indexer(config = config, logger = self.logger, device=device) self.reranker = None if config.reranker["model_name"]: diff --git a/src/components/retriever.py b/src/components/retriever.py index a878ef2..8b6f69c 100644 --- a/src/components/retriever.py +++ b/src/components/retriever.py @@ -1,17 +1,12 @@ # Import necessary modules and classes from abc import abstractmethod, ABC -import asyncio from pathlib import Path -from .llm import LLM -from .vectordb import QdrantDB, ABCVectorDB -from loguru import logger from langchain_core.prompts import ChatPromptTemplate -from .utils import load_sys_template from langchain_openai import ChatOpenAI from langchain_core.output_parsers import StrOutputParser -from .llm import LLM from langchain_core.documents.base import Document from omegaconf import OmegaConf +from filecatcher.components import LLM, QdrantDB, ABCVectorDB, load_sys_template CRITERIAS = ["similarity"] diff --git a/src/components/utils.py b/src/components/utils.py 
deleted file mode 100644 index 848093a..0000000 --- a/src/components/utils.py +++ /dev/null @@ -1,56 +0,0 @@ -from pathlib import Path -from langchain_core.documents.base import Document - -class SingletonMeta(type): - _instances = {} - - def __call__(cls, *args, **kwargs): - if cls not in cls._instances: - # print("1st creation") - instance = super().__call__(*args, **kwargs) - cls._instances[cls] = instance - # else: - # print("Same one") - return cls._instances[cls] - - -def load_sys_template(file_path: Path) -> tuple[str, str]: - with open(file_path, mode="r") as f: - sys_msg = f.read() - return sys_msg - - -def format_context(docs: list[Document]) -> str: - """Build context string from list of documents.""" - if not docs: - return 'No document found from the database', [] - - sources = [] - context = "Extracted documents:\n" - - for i, doc in enumerate(docs, start=1): - doc_id = f"[doc_{i}]" - source = doc.metadata["source"] - page = doc.metadata["page"] - - document = f""" - document id: {doc_id} - content: \n{doc.page_content.strip()}\n - """ - - # document = f"""\n{doc.page_content.strip()}\n\n""" - # Source: {source} (Page: {page}) - - context += document - context += "=" * 40 + "\n\n" - - sources.append( - { - "doc_id": doc_id, - 'source': source, - 'page': page, - 'content': doc.page_content - } - ) - - return context, sources \ No newline at end of file diff --git a/src/components/vectordb.py b/src/components/vectordb.py deleted file mode 100644 index 61fdeb0..0000000 --- a/src/components/vectordb.py +++ /dev/null @@ -1,225 +0,0 @@ -from abc import abstractmethod, ABC -import asyncio -from typing import Union -from qdrant_client import QdrantClient, models -from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings -from langchain_core.documents.base import Document -from langchain_qdrant import QdrantVectorStore, FastEmbedSparse, RetrievalMode -from .chunker import ABCChunker - - -# https://python-client.qdrant.tech/qdrant_client.qdrant_client - -class ABCVectorDB(ABC): - """ - Abstract base class for a Vector Database. - This class defines the interface for a vector database connector. - """ - @abstractmethod - async def async_add_documents(self, index_name, chunks, embeddings): - pass - - @abstractmethod - async def async_search(self, query: str, top_k: int = 5) -> list[Document]: - pass - - @abstractmethod - async def async_multy_query_search(self, queries: list[str], top_k_per_query: int = 5) -> list[Document]: - pass - - -class QdrantDB(ABCVectorDB): - """ - Concrete class for a Qdrant Vector Database. - This class implements the **`BaseVectorDd`** interface for a Qdrant database. - """ - - def __init__( - self, - host, port, - embeddings: HuggingFaceBgeEmbeddings | HuggingFaceEmbeddings=None, - collection_name: str = None, logger = None, - hybrid_mode=True, - ): - """ - Initialize Qdrant_Connector. - - Args: - host (str): The host of the Qdrant server. - port (int): The port of the Qdrant server. - collection_name (str): The name of the collection in the Qdrant database. - embeddings (list): The embeddings. 
- """ - - self.logger = logger - self.embeddings: Union[HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings] = embeddings - self.port = port - self.host = host - self.client = QdrantClient( - port=port, host=host, - prefer_grpc=False, - ) - - self.sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25") if hybrid_mode else None - self.retrieval_mode = RetrievalMode.HYBRID if hybrid_mode else RetrievalMode.DENSE - print("RETRIEVE MODE ==>", self.retrieval_mode) - - # Initialize collection-related attributes - self._collection_name = None - self.vector_store = None - - # Set the initial collection name (if provided) - if collection_name: - self.collection_name = collection_name - - @property - def collection_name(self): - return self._collection_name - - @collection_name.setter - def collection_name(self, name: str): - if not name: - raise ValueError("Collection name cannot be empty.") - - - if self.client.collection_exists(collection_name=name): - self.vector_store = QdrantVectorStore( - client=self.client, - collection_name=name, - embedding=self.embeddings, - sparse_embedding=self.sparse_embeddings, - retrieval_mode=self.retrieval_mode, - ) - self.logger.warning(f"The Collection named `{name}` loaded.") - else: - self.vector_store = QdrantVectorStore.construct_instance( - embedding=self.embeddings, - sparse_embedding=self.sparse_embeddings, - collection_name=name, - client_options={'port': self.port, 'host':self.host}, - retrieval_mode=self.retrieval_mode, - ) - self.logger.info(f"As the collection `{name}` is non-existant, it's created.") - - - async def async_search(self, query: str, top_k: int = 5, similarity_threshold: int=0.80) -> list[Document]: - docs_scores = await self.vector_store.asimilarity_search_with_relevance_scores(query=query, k=top_k, score_threshold=similarity_threshold) - docs = [doc for doc, score in docs_scores] - return docs - - - async def async_multy_query_search(self, queries: list[str], top_k_per_query: int = 5, similarity_threshold: int=0.80) -> list[Document]: - # Gather all search tasks concurrently - search_tasks = [self.async_search(query=query, top_k=top_k_per_query, similarity_threshold=similarity_threshold) for query in queries] - retrieved_results = await asyncio.gather(*search_tasks) - - s = sum(retrieved_results, []) - - retrieved_chunks = {} - # Process the retrieved documents - for retrieved in retrieved_results: - if retrieved: - for document in retrieved: - retrieved_chunks[document.metadata["_id"]] = document - - return list(retrieved_chunks.values()) - - - async def async_add_documents(self, - doc_generator, - chunker: ABCChunker, - document_batch_size: int=6, - max_concurrent_gpu_ops: int=5, - max_queued_batches: int=2 - ) -> None: - """ - Asynchronously process documents through a GPU-based chunker using a producer-consumer pattern. - - This implementation maintains high GPU utilization by preparing batches ahead of time while - the current batch is being processed. It uses a queue system to manage document batches and - controls GPU memory usage through semaphores. - - Args: - doc_generator: An async iterator yielding documents to process - chunker (BaseChunker): The chunker instance that will split documents using GPU or CPU - document_batch_size (int): Number of documents to process in each batch. Default: 6 - max_concurrent_gpu_ops (int): Maximum number of concurrent GPU operations. Default: 5 - max_queued_batches (int): Number of batches to prepare ahead in queue. 
Default: 2 - """ - gpu_semaphore = asyncio.Semaphore(max_concurrent_gpu_ops) # Only allow max_concurrent_gpu_ops GPU operation at a time - batch_queue = asyncio.Queue(maxsize=max_queued_batches) - - async def chunk(doc): - async with gpu_semaphore: - chunks = await asyncio.to_thread(chunker.split_document, doc) # uses GPU - self.logger.info(f"Processed doc: {doc.metadata['source']}") - return chunks - - async def producer(): - current_batch = [] - try: - async for doc in doc_generator: - current_batch.append(doc) - if len(current_batch) == document_batch_size: - await batch_queue.put(current_batch) - current_batch = [] - - # Put remaining documents - if current_batch: - await batch_queue.put(current_batch) - - finally: - # Send one None for each consumer - for _ in range(max_queued_batches): - await batch_queue.put(None) - - - async def consumer(consumer_id=0): - while True: - batch = await batch_queue.get() - if batch is None: # End signal - batch_queue.task_done() - self.logger.info(f"Consumer {consumer_id} ended") - break - - tasks = [asyncio.create_task(chunk(doc)) for doc in batch] - chunks_list = await asyncio.gather(*tasks, return_exceptions=True) - all_chunks = sum(chunks_list, []) - - if all_chunks: - await self.vector_store.aadd_documents(all_chunks) - self.logger.info("INSERTED") - - batch_queue.task_done() - - # Run producer and consumer concurrently - producer_task = asyncio.create_task(producer()) - consumer_tasks = [asyncio.create_task(consumer(consumer_id=i)) for i in range(max_queued_batches)] - - # Wait for producer to complete and queue to be empty - await producer_task - await batch_queue.join() - - # Wait for all consumers to complete - await asyncio.gather(*consumer_tasks) - - - -class ConnectorFactory: - CONNECTORS = { - "qdrant": QdrantDB - } - - @staticmethod - def create_vdb(config, logger, embeddings) -> ABCVectorDB: - # Extract parameters - dbconfig = dict(config.vectordb) - name = dbconfig.pop("connector_name") - vdb_cls = ConnectorFactory.CONNECTORS.get(name) - if not vdb_cls: - raise ValueError(f"VectorDB '{name}' is not supported.") - - dbconfig['embeddings'] = embeddings - dbconfig['logger'] = logger - - return vdb_cls(**dbconfig) \ No newline at end of file
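
Usage sketch (Python): after this patch, document ingestion is delegated to the external filecatcher package pinned in pyproject.toml, and callers build the Indexer with keyword arguments as manage_collection.py and pipeline.py now do. The sketch below assumes filecatcher.components.Indexer keeps the `add_files2vdb(path=...)` coroutine interface of the removed src/components implementation; the helper name `index_folder` and the `"data/"` path are illustrative only.

    # Minimal sketch of the post-refactor ingestion path; assumes filecatcher's
    # Indexer preserves the interface of the deleted src/components classes.
    import asyncio
    from loguru import logger

    from src.components import load_config        # config loading stays local
    from filecatcher.components import Indexer     # indexing now lives in filecatcher

    async def index_folder(folder: str) -> None:
        config = load_config()
        # Keyword-argument construction, as in manage_collection.py after this patch.
        indexer = Indexer(config=config, logger=logger)
        # Serializes, chunks, embeds and upserts supported files under `folder`
        # into the collection named by config.vectordb["collection_name"].
        await indexer.add_files2vdb(path=folder)

    if __name__ == "__main__":
        asyncio.run(index_folder("data/"))  # hypothetical folder path

RagPipeline.__init__ uses the same keyword-argument construction with an additional device= parameter, so the retrieval side of the application targets the library-provided Indexer in the same way.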