diff --git a/app/chainlit_app.py b/app/chainlit_app.py
index d027fec..f602cbf 100644
--- a/app/chainlit_app.py
+++ b/app/chainlit_app.py
@@ -3,9 +3,8 @@
import sys, os, yaml, torch
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../')))
-from src.components import RagPipeline, load_config, AudioTranscriber
+from src.components import RagPipeline, load_config
from loguru import logger
-from io import BytesIO
APP_DIR = Path(__file__).parent.absolute() # Path.cwd().parent.absolute()
UPLOAD_DIR = APP_DIR / "upload_dir"
diff --git a/main.py b/main.py
index f699dad..8d7bb58 100644
--- a/main.py
+++ b/main.py
@@ -2,7 +2,8 @@
import asyncio
from loguru import logger
from pathlib import Path
-from src.components import RagPipeline, load_config, evaluate, Indexer
+from src.components import RagPipeline, load_config, evaluate
+from filecatcher.components import Indexer
# config_path = Path(__file__) / '.hydra_config'
config = load_config()
diff --git a/manage_collection.py b/manage_collection.py
index d030e3b..897cf90 100755
--- a/manage_collection.py
+++ b/manage_collection.py
@@ -47,12 +47,12 @@ async def main():
if args.folder:
- from src.components import Indexer
+ from filecatcher.components import Indexer
collection = config.vectordb["collection_name"]
logger.warning(f"Data will be upserted to the collection {collection}")
- indexer = Indexer(config, logger)
+ indexer = Indexer(config = config, logger = logger)
start = time.time()
await indexer.add_files2vdb(path=args.folder)
diff --git a/pyproject.toml b/pyproject.toml
index b35ed30..3de3a1c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -17,6 +17,7 @@ langchain-community = "^0.3.7"
langchain-openai = "^0.2.8"
ragatouille = "^0.0.8.post4"
whisperx = {git = "https://github.com/federicotorrielli/BetterWhisperX", branch="main"}
+filecatcher = {git = "https://github.com/OpenLLM-France/fileCatcher.git"}
pyannote-audio = "3.1.1"
ctranslate2 = "4.4.0"
qdrant-client = "^1.12.1"
diff --git a/src/components/chunker.py b/src/components/chunker.py
deleted file mode 100644
index a3962c8..0000000
--- a/src/components/chunker.py
+++ /dev/null
@@ -1,299 +0,0 @@
-import re
-import asyncio
-from .llm import LLM
-from typing import Optional
-from abc import ABCMeta, abstractmethod, ABC
-from langchain_openai import ChatOpenAI
-from langchain_core.documents.base import Document
-from langchain_core.prompts import ChatPromptTemplate
-from langchain_core.output_parsers import StrOutputParser
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings
-from langchain.callbacks import StdOutCallbackHandler
-from loguru import logger
-from langchain_core.runnables import RunnableLambda
-from omegaconf import OmegaConf
-class ABCChunker(ABC):
- @abstractmethod
- def __init__(self, *args, **kwargs) -> None:
- pass
- @abstractmethod
- def split_document(self, doc: Document):
- pass
-# templtate = """
-# {origin}
-# This is the chunk we want to situate within the document named `{source}`.
-# The document’s name itself may contain relevant information (such as "employees CV," "tutorials," etc.) that can help contextualize the content.
-# {chunk}
-# Please provide a brief, succinct context to situate this chunk within the document, specifically to improve search retrieval of this chunk.
-# Respond only with the concise context in the same language as the provided document and chunk.
-# """
-templtate = """
-Title: {source}
-# The document title may contain key metadata (e.g., "cv", "videos", "client proposals").
-First page of the document: {first_page}
-Previous chunk: {prev_chunk}
-Provide a concise, one-sentence context that situates the ** within the **, integrating relevant information from:
-1. **Title** (e.g., type, or category information encoded in the filename).
-2. **First page of the document**.
-3. **Previous chunk**.
-4. **Current chunk content**.
-**Response Format:**
-- Only provide a single, concise contextual sentence for the **.
-- Write the response in the **same language** as the current chunk to enhance retrieval quality.
-- Do not include any additional text or explanation.
-class ChunkContextualizer:
- def __init__(self, contextual_retrieval: bool = False, llm: Optional[ChatOpenAI] = None):
- self.contextual_retrieval = contextual_retrieval
- self.context_generator = None
- if self.contextual_retrieval:
- assert isinstance(llm, ChatOpenAI), f'The `llm` should be of type `ChatOpenAI` is contextual_retrieval is `True`'
- prompt = ChatPromptTemplate.from_template(
- template=templtate
- )
- self.context_generator = (prompt | llm | StrOutputParser())
- def contextualize(self, prev_chunks: list[Document], b_chunks: list[Document], pages: list[str], source: str) -> list[str]:
- if not self.contextual_retrieval:
- return [chunk.page_content for chunk in b_chunks]
- try:
- b_contexts = self.context_generator\
- .with_retry(
- retry_if_exception_type=(Exception,),
- wait_exponential_jitter=False,
- stop_after_attempt=3
- )\
- .batch([
- {
- "first_page": pages[0],
- 'prev_chunk': prev_chunk.page_content,
- 'chunk': chunk.page_content,
- 'source': source
- } for prev_chunk, chunk in zip(prev_chunks, b_chunks)
- ])
- s = "chunk's context: {chunk_context}\n\n=> chunk: {chunk}"
- return [s.format(chunk=chunk.page_content, chunk_context=chunk_context)
- for chunk, chunk_context in zip(b_chunks, b_contexts)]
- except Exception as e:
- logger.warning(f"An error occurred with document `{source}`: {e}")
- return [chunk.page_content for chunk in b_chunks]
-class RecursiveSplitter(ABCChunker):
- def __init__(
- self,
- chunk_size: int=200,
- chunk_overlap: int=20,
- contextual_retrieval: bool=True,
- llm: Optional[ChatOpenAI]=None,
- **args
- ):
- super().__init__(contextual_retrieval=contextual_retrieval, llm=llm)
- from langchain.text_splitter import RecursiveCharacterTextSplitter
- self.splitter = RecursiveCharacterTextSplitter(
- chunk_size=chunk_size,
- chunk_overlap=chunk_overlap,
- **args
- )
- self.contextualizer = ChunkContextualizer(contextual_retrieval=contextual_retrieval, llm=llm)
- def split_document(self, doc: Document):
- text = ''
- page_idx = []
- source = doc.metadata["source"]
- page_sep = doc.metadata["page_sep"]
- pages: list[str] = doc.page_content.split(sep=page_sep)
- start_index = 0
- # TODO: We can apply apply this function 'split_text' once.
- for page_num, p in enumerate(pages, start=1):
- text += ' ' + p
- c = ' '.join(
- self.splitter.split_text(text)
- )
- end_index = len(c) - 1
- page_idx.append(
- {
- "start_idx": start_index,
- "end_idx": end_index,
- "page": page_num
- }
- )
- start_index = end_index
- # split
- full_text_preprocessed = ' '.join(
- self.splitter.split_text(text)
- )
- # chunking the full text
- chunks = self.splitter.create_documents([text])
- i = 0
- batch_size = 4
- filtered_chunks = []
- chunks = [Document(page_content='')] + chunks
- for j in range(1, len(chunks), batch_size):
- b_chunks = chunks[j:j+batch_size]
- prev_chunks = chunks[j-1:j+batch_size-1]
- b_chunks_w_context = self.contextualizer.contextualize(
- prev_chunks=prev_chunks, b_chunks=b_chunks,
- pages=pages, source=source
- )
- for chunk, b_chunk_w_context in zip(b_chunks, b_chunks_w_context):
- start_idx = full_text_preprocessed.find(chunk.page_content)
- while not (page_idx[i]["start_idx"] <= start_idx <= page_idx[i]["end_idx"]) and i < len(page_idx)-1:
- i += 1
- if len(chunk.page_content.strip()) > 1:
- chunk.page_content = b_chunk_w_context
- chunk.metadata = {
- "page": page_idx[i]["page"],
- "source": source,
- }
- filtered_chunks.append(chunk)
- return filtered_chunks
-class SemanticSplitter(ABCChunker):
- def __init__(self,
- min_chunk_size: int = 1000,
- embeddings = None,
- breakpoint_threshold_amount=85,
- contextual_retrieval: bool=False, llm: Optional[ChatOpenAI]=None,
- **args
- ) -> None:
- from langchain_experimental.text_splitter import SemanticChunker
- self.splitter = SemanticChunker(
- embeddings=embeddings,
- buffer_size=1,
- breakpoint_threshold_type='percentile',
- breakpoint_threshold_amount=breakpoint_threshold_amount,
- min_chunk_size=min_chunk_size,
- add_start_index=True,
- **args
- )
- self.contextualizer = ChunkContextualizer(contextual_retrieval=contextual_retrieval, llm=llm)
- def split_document(self, doc: Document):
- text = ''
- page_idx = []
- source = doc.metadata["source"]
- page_sep = doc.metadata["page_sep"]
- pages = doc.page_content.split(sep=page_sep)
- start_index = 0
- for page_num, page_txt in enumerate(pages, start=1):
- text += ' ' + page_txt
- c = ' '.join(
- re.split(self.splitter.sentence_split_regex, text)
- )
- end_index = len(c) - 1
- page_idx.append(
- {
- "start_idx": start_index,
- "end_idx": end_index,
- "page": page_num
- }
- )
- start_index = end_index
- chunks = self.splitter.create_documents(
- [' '.join(re.split(self.splitter.sentence_split_regex, text))]
- )
- i = 0
- filtered_chunks = []
- batch_size = 4
- chunks = [Document(page_content='')] + chunks
- for j in range(1, len(chunks), batch_size):
- b_chunks = chunks[j:j+batch_size]
- prev_chunks = chunks[j-1:j+batch_size-1]
- b_chunks_w_context = self.contextualizer.contextualize(
- prev_chunks=prev_chunks, b_chunks=b_chunks,
- pages=pages, source=source
- )
- for chunk, b_chunk_w_context in zip(b_chunks, b_chunks_w_context):
- start_idx = chunk.metadata["start_index"]
- while not (page_idx[i]["start_idx"] <= start_idx <= page_idx[i]["end_idx"]) and i < len(page_idx)-1:
- i += 1
- if len(chunk.page_content.strip()) > 1:
- chunk.page_content = b_chunk_w_context
- chunk.metadata = {
- "page": page_idx[i]["page"],
- "source": source,
- }
- filtered_chunks.append(chunk)
- return filtered_chunks
-class ChunkerFactory:
- 'recursive_splitter': RecursiveSplitter,
- 'semantic_splitter': SemanticSplitter,
- }
- @staticmethod
- def create_chunker(config:OmegaConf, embedder: Optional[HuggingFaceBgeEmbeddings | HuggingFaceEmbeddings]=None) -> ABCChunker:
- # Extract parameters
- chunker_params = OmegaConf.to_container(config.chunker, resolve=True)
- name = chunker_params.pop("name")
- # Initialize and return the chunker
- chunker_class: ABCChunker = ChunkerFactory.CHUNKERS.get(name)
- if not chunker_class:
- raise ValueError(f"Chunker '{name}' is not recognized.")
- # Add embeddings if semantic splitter is selected
- if name == 'semantic_splitter':
- if embedder is not None:
- chunker_params.update({"embeddings": embedder})
- else:
- raise AttributeError(f"{name} type chunker requires the `embedder` parameter")
- # Include contextual retrieval if specified
- if chunker_params['contextual_retrieval']:
- chunker_params['llm'] = LLM(config, logger=None).client
- return chunker_class(**chunker_params)
\ No newline at end of file
diff --git a/src/components/config.py b/src/components/config.py
index 16715d7..07ec9d5 100644
--- a/src/components/config.py
+++ b/src/components/config.py
@@ -1,19 +1,9 @@
-import os
-from dotenv import load_dotenv, find_dotenv
from omegaconf import OmegaConf
from pathlib import Path
from hydra import initialize, compose
-def load_config(config_path="../../.hydra_config", overrides=None)-> OmegaConf:
- load_dotenv()
- print(overrides)
+def load_config(config_path="../../.hydra_config", overrides=None)-> OmegaConf:
with initialize(config_path=config_path, job_name="config_loader"):
config = compose(config_name="config", overrides=overrides)
config.paths.root_dir = Path(config.paths.root_dir).absolute()
- return config
-# # Example usage
-# if __name__ == "__main__":
-# config = load_config()
-# print(config)
\ No newline at end of file
+ return config
\ No newline at end of file
diff --git a/src/components/embeddings.py b/src/components/embeddings.py
deleted file mode 100644
index 33a0942..0000000
--- a/src/components/embeddings.py
+++ /dev/null
@@ -1,75 +0,0 @@
-from abc import abstractmethod, ABC
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings
-import torch
-from omegaconf import OmegaConf
-class ABCEmbedder(ABC):
- """Abstract base class defining the interface for embedder implementations.
- This class serves as a template for creating embedder classes that convert text
- into vector representations using various embedding models.
- """
- @abstractmethod
- def get_embeddings(self):
- """Return the embeddings model instance.
- Returns:
- Any: The embeddings model instance that can generate vector representations.
- """
- pass
-# Dictionary mapping embedding model types to their corresponding classes
- "huggingface_bge": HuggingFaceBgeEmbeddings,
- "huggingface": HuggingFaceEmbeddings
-class HFEmbedder(ABCEmbedder):
- """Factory class for loading and managing HuggingFace embedding models.
- This class handles the initialization and configuration of various HuggingFace
- embedding models, supporting both BGE and standard HuggingFace embeddings.
- Args:
- embedder_config (OmegaConf): Configuration object containing model parameters
- device (str, optional): Device to run the model on ('cuda' or 'cpu').
- Defaults to None, which auto-selects based on CUDA availability.
- Raises:
- ValueError: If the specified model type is not supported or if initialization fails.
- """
- def __init__(self, embedder_config: OmegaConf, device=None) -> None:
- # Extract model type from config
- model_type = embedder_config["type"]
- if model_type in HG_EMBEDDER_TYPE:
- # Auto-select device if none specified
- if device is None:
- device = 'cuda' if torch.cuda.is_available() else 'cpu'
- try:
- model_name = embedder_config["model_name"]
- self.embedding = HG_EMBEDDER_TYPE[model_type](
- model_name=model_name,
- model_kwargs={"device": device, 'trust_remote_code': True},
- encode_kwargs={"normalize_embeddings": True}
- )
- except Exception as e:
- raise ValueError(f"An error occurred during model initialization: {e}")
- else:
- raise ValueError(f"{model_type} is not a supported `model_type`")
- def get_embeddings(self) -> HuggingFaceBgeEmbeddings:
- """Retrieve the initialized embedding model.
- Returns:
- HuggingFaceBgeEmbeddings: The configured embedding model instance
- ready for generating embeddings.
- """
- return self.embedding
\ No newline at end of file
diff --git a/src/components/evaluation.py b/src/components/evaluation.py
index 44933aa..1674646 100644
--- a/src/components/evaluation.py
+++ b/src/components/evaluation.py
@@ -1,5 +1,5 @@
from pathlib import Path
-from .utils import load_sys_template
+from filecatcher.components import load_sys_template
from langchain_openai import ChatOpenAI
from langchain_core.prompts import (
diff --git a/src/components/llm.py b/src/components/llm.py
deleted file mode 100644
index 58bb314..0000000
--- a/src/components/llm.py
+++ /dev/null
@@ -1,56 +0,0 @@
-from uuid import UUID
-from langchain_core.outputs import ChatGenerationChunk, GenerationChunk, LLMResult
-from langchain_openai import ChatOpenAI
-from langchain_core.prompts import (
- MessagesPlaceholder,
- ChatPromptTemplate
-from langchain_core.messages import AIMessage, HumanMessage, BaseMessage
-from langchain_core.callbacks import AsyncCallbackHandler
-from typing import Any, AsyncIterator
-class LLM:
- def __init__(self, config, logger=None):
- self.logger = logger
- self.client: ChatOpenAI = ChatOpenAI(
- model=config.llm["name"],
- base_url=config.llm["base_url"],
- api_key=config.llm['api_key'],
- timeout=60,
- temperature=config.llm["temperature"],
- max_tokens=config.llm["max_tokens"],
- streaming=True
- )
- def run(self,
- question: str, context: str,
- chat_history: list[AIMessage | HumanMessage],
- sys_pmpt_tmpl: str
- )-> AsyncIterator[BaseMessage]:
- """This method runs the LLM given the user's input (`question`), `chat_history`, and the system prompt template (`sys_prompt_tmpl`)
- Args:
- question (str): The input from the user; not necessarily a question.
- context (str): It's the retrieved documents (formatted into a string)
- chat_history (list[AIMessage | HumanMessage]): The Chat history.
- sys_prompt_tmpl (str): The system prompt
- Returns:
- AsyncIterator[BaseMessage]
- """
- qa_prompt = ChatPromptTemplate.from_messages(
- [
- ("system", sys_pmpt_tmpl),
- MessagesPlaceholder("chat_history"),
- ("human", "{input}")
- ]
- )
- rag_chain = qa_prompt | self.client.with_retry(stop_after_attempt=3)
- input_ = {
- "input": question,
- "context": context,
- "chat_history": chat_history,
- }
- return rag_chain.astream(input_)
\ No newline at end of file
diff --git a/src/components/loader.py b/src/components/loader.py
deleted file mode 100644
index 188cea4..0000000
--- a/src/components/loader.py
+++ /dev/null
@@ -1,358 +0,0 @@
-from abc import abstractmethod, ABC
-import asyncio
-import os
-from collections import defaultdict
-import gc
-import random
-import whisperx
-from .utils import SingletonMeta
-from pydub import AudioSegment
-import torch
-from pathlib import Path
-from typing import AsyncGenerator
-from langchain_core.documents.base import Document
-from langchain_community.document_loaders import (
- PyMuPDFLoader,
- UnstructuredODTLoader,
- UnstructuredWordDocumentLoader,
- UnstructuredPowerPointLoader
-from langchain_community.document_loaders import UnstructuredWordDocumentLoader
-from langchain_community.document_loaders import TextLoader
-from langchain_community.document_loaders import UnstructuredHTMLLoader
-import pymupdf4llm
-from loguru import logger
-from aiopath import AsyncPath
-from typing import Dict
-# from langchain_community.document_loaders import UnstructuredXMLLoader, PyPDFLoader
-# from langchain_community.document_loaders.csv_loader import CSVLoader
-# from langchain_community.document_loaders.text import TextLoader
-# from langchain_community.document_loaders import UnstructuredHTMLLoader
-class BaseLoader(ABC):
- @abstractmethod
- async def aload_document(self, file_path):
- pass
-class Custompymupdf4llm(BaseLoader):
- def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None:
- self.loader_args = loader_args
- self.page_sep = page_sep
- async def aload_document(self, file_path):
- pages = pymupdf4llm.to_markdown(
- file_path,
- write_images=False,
- page_chunks=True,
- **self.loader_args
- )
- page_content = f'{self.page_sep}'.join([p['text'] for p in pages])
- return Document(
- page_content=page_content,
- metadata={
- 'source': str(file_path),
- 'page_sep': self.page_sep
- }
- )
-class AudioTranscriber(metaclass=SingletonMeta):
- def __init__(self, device='cpu', compute_type='float32', model_name='large-v2', language='fr'):
- self.model = whisperx.load_model(
- model_name,
- device=device, language=language,
- compute_type=compute_type
- )
-class VideoAudioLoader(BaseLoader):
- def __init__(self, page_sep: str='[PAGE_SEP]', batch_size=4):
- self.batch_size = batch_size
- self.page_sep = page_sep
- self.formats = [".wav", '.mp3', ".mp4"]
- self.transcriber = AudioTranscriber(
- device='cuda' if torch.cuda.is_available() else 'cpu'
- )
- def free_memory(self):
- gc.collect()
- torch.cuda.empty_cache()
- async def aload_document(self, file_path):
- path = Path(file_path)
- if path.suffix not in self.formats:
- logger.warning(
- f'This audio/video file ({path.suffix}) is not supported.'
- f'The format should be {self.formats}'
- )
- return None
- # load it in wave format
- if path.suffix == '.wav':
- audio_path_wav = path
- else:
- sound = AudioSegment.from_file(file=path, format=path.suffix[1:])
- audio_path_wav = path.with_suffix('.wav')
- # Export to wav format
- sound.export(
- audio_path_wav, format="wav",
- parameters=["-ar", "16000", "-ac", "1", "-ab", "32k"]
- )
- audio = whisperx.load_audio(audio_path_wav)
- if path.suffix != '.wav':
- os.remove(audio_path_wav)
- transcription_l = self.transcriber.model.transcribe(audio, batch_size=self.batch_size)
- content = ' '.join([tr['text'] for tr in transcription_l['segments']])
- self.free_memory()
- return Document(
- page_content=f"{content}{self.page_sep}",
- metadata={
- 'source':str(file_path),
- 'page_sep': self.page_sep
- }
- )
-class CustomPPTLoader(BaseLoader):
- doc_loaders = {
- '.pptx': UnstructuredPowerPointLoader,
- '.ppt': UnstructuredPowerPointLoader
- }
- cat2md = {
- 'Title': '#',
- 'NarrativeText': '>',
- 'ListItem': '-',
- 'UncategorizedText': '',
- 'EmailAddress': '',
- 'FigureCaption': '*'
- }
- def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None:
- self.loader_args = loader_args
- self.page_sep = page_sep
- def group_by_hierarchy(self, docs: list[Document]):
- """Group related elements within each page by parent_id and category_depth"""
- grouped_elements = defaultdict(list)
- # Sort by depth to ensure proper hierarchy
- sorted_docs = sorted(docs, key=lambda x: x.metadata.get('category_depth', 0))
- for doc in sorted_docs:
- parent_id = doc.metadata.get('parent_id', None)
- if parent_id:
- grouped_elements[parent_id].append(doc)
- else:
- grouped_elements[doc.metadata.get('element_id')].append(doc)
- return grouped_elements
- async def aload_document(self, file_path):
- path = Path(file_path)
- cls_loader = CustomPPTLoader.doc_loaders.get(path.suffix, None)
- if cls_loader is None:
- raise Exception(f"This loader only supports {CustomPPTLoader.doc_loaders.keys()} format")
- loader = cls_loader(
- file_path=str(file_path),
- mode='elements',
- **self.loader_args
- )
- elements = await loader.aload()
- # Step 1: Group elements by page_number
- grouped_by_page = defaultdict(list)
- for doc in elements:
- page_number = doc.metadata.get('page_number')
- grouped_by_page[page_number].append(doc)
- # Final structure: dictionary of pages, each containing grouped elements
- final_grouped_structure = {}
- for page, docs in grouped_by_page.items():
- final_grouped_structure[page] = self.group_by_hierarchy(docs)
- content = ''
- for page, grouped_elements in final_grouped_structure.items():
- t = ''
- for v in grouped_elements.values():
- for elem in v:
- meta = elem.metadata
- cat_md = CustomPPTLoader.cat2md.get(meta.get("category"), '')
- t += f"{cat_md} {elem.page_content}\n"
- if page is not None and content:
- t = f'{self.page_sep}' + t
- content += t
- return Document(
- page_content=content,
- metadata={
- 'source': str(file_path),
- 'page_sep': self.page_sep
- }
- )
-class CustomPyMuPDFLoader(BaseLoader):
- def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None:
- self.loader_args = loader_args
- self.page_sep = page_sep
- async def aload_document(self, file_path):
- loader = PyMuPDFLoader(
- file_path=Path(file_path),
- **self.loader_args
- )
- pages = await loader.aload()
- return Document(
- page_content=f'{self.page_sep}'.join([p.page_content for p in pages]),
- metadata={
- 'source': str(file_path),
- 'page_sep': self.page_sep
- }
- )
-class CustomTextLoader(BaseLoader):
- def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None:
- self.loader_args = loader_args
- self.page_sep = page_sep
- async def aload_document(self, file_path):
- path = Path(file_path)
- loader = TextLoader(file_path=str(path), autodetect_encoding=True)
- doc = await loader.aload()
- return Document(
- page_content=f'{self.page_sep}'.join([p.page_content for p in doc]),
- metadata={
- 'source': str(file_path),
- 'page_sep': self.page_sep
- }
- )
-class CustomHTMLLoader(BaseLoader):
- def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None:
- self.loader_args = loader_args
- self.page_sep = page_sep
- async def aload_document(self, file_path):
- path = Path(file_path)
- loader = UnstructuredHTMLLoader(file_path=str(path), autodetect_encoding=True)
- doc = await loader.aload()
- return Document(
- page_content=f'{self.page_sep}'.join([p.page_content for p in doc]),
- metadata={
- 'source': str(file_path),
- 'page_sep': self.page_sep
- }
- )
-class CustomDocLoader(BaseLoader):
- doc_loaders = {
- ".docx": UnstructuredWordDocumentLoader,
- '.doc': UnstructuredWordDocumentLoader,
- '.odt': UnstructuredODTLoader
- }
- def __init__(self, page_sep: str='[PAGE_SEP]', **loader_args) -> None:
- self.loader_args = loader_args
- self.page_sep = page_sep
- async def aload_document(self, file_path):
- path = Path(file_path)
- cls_loader = CustomDocLoader.doc_loaders.get(path.suffix, None)
- if cls_loader is None:
- raise ValueError(f"This loader only supports {CustomDocLoader.doc_loaders.keys()} format")
- loader = cls_loader(
- file_path=str(file_path),
- mode='single',
- **self.loader_args
- )
- pages = await loader.aload()
- content = ' '.join([p.page_content for p in pages])
- return Document(
- page_content=f"{content}{self.page_sep}",
- metadata={
- 'source': str(file_path),
- 'page_sep': self.page_sep
- }
- )
-class DocSerializer:
- def __init__(self, root_dir=None) -> None:
- self.root_dir = root_dir
- async def serialize_documents(self, path: str | Path, recursive=True) -> AsyncGenerator[Document, None]:
- p = AsyncPath(path)
- if await p.is_file():
- type = p.suffix
- pattern = f"**/*{type}"
- loader_cls: BaseLoader = LOADERS.get(p.suffix)
- logger.info(f'Loading {type} files.')
- doc: Document = await loader_cls().aload_document(file_path=path)
- yield doc
- is_dir = await p.is_dir()
- if is_dir:
- for type, loader_cls in LOADERS.items(): # TODO Rendre ceci async: Priority 0
- pattern = f"**/*{type}"
- logger.info(f'Loading {type} files.')
- files = get_files(path, pattern, recursive)
- async for file in files:
- loader = loader_cls()
- relative_file_path = Path(file).relative_to(self.root_dir)
- doc: Document = await loader.aload_document(file_path=relative_file_path)
- print(f"==> Serialized: {str(file)}")
- yield doc
-async def get_files(path, pattern, recursive) -> AsyncGenerator:
- p = AsyncPath(path)
- async for file in (p.rglob(pattern) if recursive else p.glob(pattern)):
- yield file
-# TODO create a Meta class that aggregates registery of supported documents from each child class
-LOADERS: Dict[str, BaseLoader] = {
- '.pdf': CustomPyMuPDFLoader,
- '.docx': CustomDocLoader,
- '.doc': CustomDocLoader,
- '.odt': CustomDocLoader,
- '.mp4': VideoAudioLoader,
- '.pptx': CustomPPTLoader,
- '.txt': CustomTextLoader,
- #'.html': CustomHTMLLoader
-# if __name__ == "__main__":
-# async def main():
-# loader = DocSerializer()
-# dir_path = "../../data/" # Replace with your actual directory path
-# docs = loader.serialize_documents(dir_path)
-# async for d in docs:
-# pass
-# asyncio.run(main())
diff --git a/src/components/pipeline.py b/src/components/pipeline.py
index 3b11e27..93f22c6 100644
--- a/src/components/pipeline.py
+++ b/src/components/pipeline.py
@@ -2,23 +2,12 @@
import sys
import torch
-from .chunker import ABCChunker, ChunkerFactory
from langchain_core.documents.base import Document
-from .llm import LLM
-from .utils import format_context, load_sys_template
from .reranker import Reranker
from .retriever import ABCRetriever, RetrieverFactory
-from .vectordb import ConnectorFactory
-from .embeddings import HFEmbedder
-from omegaconf import OmegaConf
-from .loader import DocSerializer
from loguru import logger
-from typing import AsyncGenerator
from langchain_core.output_parsers import StrOutputParser
-from langchain_core.prompts import (
- MessagesPlaceholder,
- ChatPromptTemplate
+from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import (
@@ -26,31 +15,7 @@
from pathlib import Path
from collections import deque
from .grader import Grader
-class Indexer:
- """This class bridges static files with the vector store database.
- """
- def __init__(self, config: OmegaConf, logger, device=None) -> None:
- embedder = HFEmbedder(embedder_config=config.embedder, device=device)
- self.serializer = DocSerializer(root_dir=config.paths.root_dir)
- self.chunker: ABCChunker = ChunkerFactory.create_chunker(config, embedder=embedder.get_embeddings())
- self.vectordb = ConnectorFactory.create_vdb(config, logger=logger, embeddings=embedder.get_embeddings())
- self.logger = logger
- self.logger.info("Indexer initialized...")
- async def add_files2vdb(self, path):
- """Add a files to the vector database in async mode"""
- try:
- doc_generator: AsyncGenerator[Document, None] = self.serializer.serialize_documents(path, recursive=True)
- await self.vectordb.async_add_documents(
- doc_generator=doc_generator,
- chunker=self.chunker,
- document_batch_size=4
- )
- self.logger.info(f"Documents from {path} added.")
- except Exception as e:
- raise Exception(f"An exception as occured: {e}")
+from filecatcher.components import Indexer, LLM, format_context, load_sys_template
class RagPipeline:
@@ -58,7 +23,7 @@ def __init__(self, config, device="cpu") -> None:
self.config = config
# print(self.config)
self.logger = self.set_logger(config)
- self.indexer = Indexer(config, self.logger, device=device)
+ self.indexer = Indexer(config = config, logger = self.logger, device=device)
self.reranker = None
if config.reranker["model_name"]:
diff --git a/src/components/retriever.py b/src/components/retriever.py
index a878ef2..8b6f69c 100644
--- a/src/components/retriever.py
+++ b/src/components/retriever.py
@@ -1,17 +1,12 @@
# Import necessary modules and classes
from abc import abstractmethod, ABC
-import asyncio
from pathlib import Path
-from .llm import LLM
-from .vectordb import QdrantDB, ABCVectorDB
-from loguru import logger
from langchain_core.prompts import ChatPromptTemplate
-from .utils import load_sys_template
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
-from .llm import LLM
from langchain_core.documents.base import Document
from omegaconf import OmegaConf
+from filecatcher.components import LLM, QdrantDB, ABCVectorDB, load_sys_template
CRITERIAS = ["similarity"]
diff --git a/src/components/utils.py b/src/components/utils.py
deleted file mode 100644
index 848093a..0000000
--- a/src/components/utils.py
+++ /dev/null
@@ -1,56 +0,0 @@
-from pathlib import Path
-from langchain_core.documents.base import Document
-class SingletonMeta(type):
- _instances = {}
- def __call__(cls, *args, **kwargs):
- if cls not in cls._instances:
- # print("1st creation")
- instance = super().__call__(*args, **kwargs)
- cls._instances[cls] = instance
- # else:
- # print("Same one")
- return cls._instances[cls]
-def load_sys_template(file_path: Path) -> tuple[str, str]:
- with open(file_path, mode="r") as f:
- sys_msg = f.read()
- return sys_msg
-def format_context(docs: list[Document]) -> str:
- """Build context string from list of documents."""
- if not docs:
- return 'No document found from the database', []
- sources = []
- context = "Extracted documents:\n"
- for i, doc in enumerate(docs, start=1):
- doc_id = f"[doc_{i}]"
- source = doc.metadata["source"]
- page = doc.metadata["page"]
- document = f"""
- document id: {doc_id}
- content: \n{doc.page_content.strip()}\n
- """
- # document = f"""\n{doc.page_content.strip()}\n\n"""
- # Source: {source} (Page: {page})
- context += document
- context += "=" * 40 + "\n\n"
- sources.append(
- {
- "doc_id": doc_id,
- 'source': source,
- 'page': page,
- 'content': doc.page_content
- }
- )
- return context, sources
\ No newline at end of file
diff --git a/src/components/vectordb.py b/src/components/vectordb.py
deleted file mode 100644
index 61fdeb0..0000000
--- a/src/components/vectordb.py
+++ /dev/null
@@ -1,225 +0,0 @@
-from abc import abstractmethod, ABC
-import asyncio
-from typing import Union
-from qdrant_client import QdrantClient, models
-from langchain_community.embeddings import HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings
-from langchain_core.documents.base import Document
-from langchain_qdrant import QdrantVectorStore, FastEmbedSparse, RetrievalMode
-from .chunker import ABCChunker
-# https://python-client.qdrant.tech/qdrant_client.qdrant_client
-class ABCVectorDB(ABC):
- """
- Abstract base class for a Vector Database.
- This class defines the interface for a vector database connector.
- """
- @abstractmethod
- async def async_add_documents(self, index_name, chunks, embeddings):
- pass
- @abstractmethod
- async def async_search(self, query: str, top_k: int = 5) -> list[Document]:
- pass
- @abstractmethod
- async def async_multy_query_search(self, queries: list[str], top_k_per_query: int = 5) -> list[Document]:
- pass
-class QdrantDB(ABCVectorDB):
- """
- Concrete class for a Qdrant Vector Database.
- This class implements the **`BaseVectorDd`** interface for a Qdrant database.
- """
- def __init__(
- self,
- host, port,
- embeddings: HuggingFaceBgeEmbeddings | HuggingFaceEmbeddings=None,
- collection_name: str = None, logger = None,
- hybrid_mode=True,
- ):
- """
- Initialize Qdrant_Connector.
- Args:
- host (str): The host of the Qdrant server.
- port (int): The port of the Qdrant server.
- collection_name (str): The name of the collection in the Qdrant database.
- embeddings (list): The embeddings.
- """
- self.logger = logger
- self.embeddings: Union[HuggingFaceBgeEmbeddings, HuggingFaceEmbeddings] = embeddings
- self.port = port
- self.host = host
- self.client = QdrantClient(
- port=port, host=host,
- prefer_grpc=False,
- )
- self.sparse_embeddings = FastEmbedSparse(model_name="Qdrant/bm25") if hybrid_mode else None
- self.retrieval_mode = RetrievalMode.HYBRID if hybrid_mode else RetrievalMode.DENSE
- print("RETRIEVE MODE ==>", self.retrieval_mode)
- # Initialize collection-related attributes
- self._collection_name = None
- self.vector_store = None
- # Set the initial collection name (if provided)
- if collection_name:
- self.collection_name = collection_name
- @property
- def collection_name(self):
- return self._collection_name
- @collection_name.setter
- def collection_name(self, name: str):
- if not name:
- raise ValueError("Collection name cannot be empty.")
- if self.client.collection_exists(collection_name=name):
- self.vector_store = QdrantVectorStore(
- client=self.client,
- collection_name=name,
- embedding=self.embeddings,
- sparse_embedding=self.sparse_embeddings,
- retrieval_mode=self.retrieval_mode,
- )
- self.logger.warning(f"The Collection named `{name}` loaded.")
- else:
- self.vector_store = QdrantVectorStore.construct_instance(
- embedding=self.embeddings,
- sparse_embedding=self.sparse_embeddings,
- collection_name=name,
- client_options={'port': self.port, 'host':self.host},
- retrieval_mode=self.retrieval_mode,
- )
- self.logger.info(f"As the collection `{name}` is non-existant, it's created.")
- async def async_search(self, query: str, top_k: int = 5, similarity_threshold: int=0.80) -> list[Document]:
- docs_scores = await self.vector_store.asimilarity_search_with_relevance_scores(query=query, k=top_k, score_threshold=similarity_threshold)
- docs = [doc for doc, score in docs_scores]
- return docs
- async def async_multy_query_search(self, queries: list[str], top_k_per_query: int = 5, similarity_threshold: int=0.80) -> list[Document]:
- # Gather all search tasks concurrently
- search_tasks = [self.async_search(query=query, top_k=top_k_per_query, similarity_threshold=similarity_threshold) for query in queries]
- retrieved_results = await asyncio.gather(*search_tasks)
- s = sum(retrieved_results, [])
- retrieved_chunks = {}
- # Process the retrieved documents
- for retrieved in retrieved_results:
- if retrieved:
- for document in retrieved:
- retrieved_chunks[document.metadata["_id"]] = document
- return list(retrieved_chunks.values())
- async def async_add_documents(self,
- doc_generator,
- chunker: ABCChunker,
- document_batch_size: int=6,
- max_concurrent_gpu_ops: int=5,
- max_queued_batches: int=2
- ) -> None:
- """
- Asynchronously process documents through a GPU-based chunker using a producer-consumer pattern.
- This implementation maintains high GPU utilization by preparing batches ahead of time while
- the current batch is being processed. It uses a queue system to manage document batches and
- controls GPU memory usage through semaphores.
- Args:
- doc_generator: An async iterator yielding documents to process
- chunker (BaseChunker): The chunker instance that will split documents using GPU or CPU
- document_batch_size (int): Number of documents to process in each batch. Default: 6
- max_concurrent_gpu_ops (int): Maximum number of concurrent GPU operations. Default: 5
- max_queued_batches (int): Number of batches to prepare ahead in queue. Default: 2
- """
- gpu_semaphore = asyncio.Semaphore(max_concurrent_gpu_ops) # Only allow max_concurrent_gpu_ops GPU operation at a time
- batch_queue = asyncio.Queue(maxsize=max_queued_batches)
- async def chunk(doc):
- async with gpu_semaphore:
- chunks = await asyncio.to_thread(chunker.split_document, doc) # uses GPU
- self.logger.info(f"Processed doc: {doc.metadata['source']}")
- return chunks
- async def producer():
- current_batch = []
- try:
- async for doc in doc_generator:
- current_batch.append(doc)
- if len(current_batch) == document_batch_size:
- await batch_queue.put(current_batch)
- current_batch = []
- # Put remaining documents
- if current_batch:
- await batch_queue.put(current_batch)
- finally:
- # Send one None for each consumer
- for _ in range(max_queued_batches):
- await batch_queue.put(None)
- async def consumer(consumer_id=0):
- while True:
- batch = await batch_queue.get()
- if batch is None: # End signal
- batch_queue.task_done()
- self.logger.info(f"Consumer {consumer_id} ended")
- break
- tasks = [asyncio.create_task(chunk(doc)) for doc in batch]
- chunks_list = await asyncio.gather(*tasks, return_exceptions=True)
- all_chunks = sum(chunks_list, [])
- if all_chunks:
- await self.vector_store.aadd_documents(all_chunks)
- self.logger.info("INSERTED")
- batch_queue.task_done()
- # Run producer and consumer concurrently
- producer_task = asyncio.create_task(producer())
- consumer_tasks = [asyncio.create_task(consumer(consumer_id=i)) for i in range(max_queued_batches)]
- # Wait for producer to complete and queue to be empty
- await producer_task
- await batch_queue.join()
- # Wait for all consumers to complete
- await asyncio.gather(*consumer_tasks)
-class ConnectorFactory:
- "qdrant": QdrantDB
- }
- @staticmethod
- def create_vdb(config, logger, embeddings) -> ABCVectorDB:
- # Extract parameters
- dbconfig = dict(config.vectordb)
- name = dbconfig.pop("connector_name")
- vdb_cls = ConnectorFactory.CONNECTORS.get(name)
- if not vdb_cls:
- raise ValueError(f"VectorDB '{name}' is not supported.")
- dbconfig['embeddings'] = embeddings
- dbconfig['logger'] = logger
- return vdb_cls(**dbconfig)
\ No newline at end of file