Support parsing json and csv #32

Merged · 8 commits · Nov 21, 2024
65 changes: 46 additions & 19 deletions llm-service/app/ai/indexing/index.py
@@ -38,17 +38,21 @@

import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Type
from typing import Dict, Generator, List, Type

from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import BaseNode, Document, TextNode
from llama_index.readers.file import DocxReader

from ...services.utils import batch_sequence, flatten_sequence
from ...services.vector_store import VectorStore
from .readers.csv import CSVReader
from .readers.json import JSONReader
from .readers.nop import NopReader
from .readers.pdf import PDFReader

@@ -59,6 +63,8 @@
".txt": NopReader,
".md": NopReader,
".docx": DocxReader,
".csv": CSVReader,
".json": JSONReader,
}
CHUNKABLE_FILE_EXTENSIONS = {".pdf", ".txt", ".md", ".docx"}

@@ -81,7 +87,7 @@ def __init__(
self.embedding_model = embedding_model
self.chunks_vector_store = chunks_vector_store

def index_file(self, file_path: Path, file_id: str) -> None:
def index_file(self, file_path: Path, document_id: str) -> None:
logger.debug(f"Indexing file: {file_path}")

file_extension = os.path.splitext(file_path)[1]
@@ -93,7 +99,7 @@ def index_file(self, file_path: Path, file_id: str) -> None:

logger.debug(f"Parsing file: {file_path}")

documents = self._documents_in_file(reader, file_path, file_id)
documents = self._documents_in_file(reader, file_path, document_id)
if file_extension in CHUNKABLE_FILE_EXTENSIONS:
logger.debug(f"Chunking file: {file_path}")
chunks = [
@@ -104,34 +110,34 @@ def index_file(self, file_path: Path, file_id: str) -> None:
else:
chunks = documents

texts = [chunk.text for chunk in chunks]
logger.debug(f"Embedding {len(texts)} chunks")
embeddings = self.embedding_model.get_text_embedding_batch(texts)
logger.debug(f"Embedding {len(chunks)} chunks")

for chunk, embedding in zip(chunks, embeddings):
chunk.embedding = embedding
chunks_with_embeddings = flatten_sequence(self._compute_embeddings(chunks))

logger.debug(f"Adding {len(chunks)} chunks to vector store")
chunks_vector_store = self.chunks_vector_store.access_vector_store()
acc = 0
for chunk_batch in batch_sequence(chunks_with_embeddings, 1000):
acc += len(chunk_batch)
logger.debug(f"Adding {acc}/{len(chunks)} chunks to vector store")

# We have to explicitly convert here even though the types are compatible (TextNode inherits from BaseNode)
# because the "add" annotation uses List instead of Sequence. We need to use TextNode explicitly because
# we're capturing "text".
converted_chunks: List[BaseNode] = [chunk for chunk in chunks]
chunks_vector_store.add(converted_chunks)
# We have to explicitly convert here even though the types are compatible (TextNode inherits from BaseNode)
# because the "add" annotation uses List instead of Sequence. We need to use TextNode explicitly because
# we're capturing "text".
converted_chunks: List[BaseNode] = [chunk for chunk in chunk_batch]

chunks_vector_store = self.chunks_vector_store.access_vector_store()
chunks_vector_store.add(converted_chunks)

logger.debug(f"Indexing file: {file_path} completed")

def _documents_in_file(
self, reader: BaseReader, file_path: Path, file_id: str
self, reader: BaseReader, file_path: Path, document_id: str
) -> List[Document]:
documents = reader.load_data(file_path)

for i, document in enumerate(documents):
# Update the document metadata
document.id_ = file_id
document.id_ = document_id
document.metadata["file_name"] = os.path.basename(file_path)
document.metadata["document_id"] = file_id
document.metadata["document_id"] = document_id
document.metadata["document_part_number"] = i
document.metadata["data_source_id"] = self.data_source_id

@@ -155,3 +161,24 @@ def _chunks_in_document(self, document: Document) -> List[TextNode]:
converted_chunks.append(chunk)

return converted_chunks

def _compute_embeddings(
self, chunks: List[TextNode]
) -> Generator[List[TextNode], None, None]:
batched_chunks = list(batch_sequence(chunks, 100))
batched_texts = [[chunk.text for chunk in batch] for batch in batched_chunks]

with ThreadPoolExecutor(max_workers=20) as executor:
futures = [
executor.submit(
lambda b: (i, self.embedding_model.get_text_embedding_batch(b)),
batch,
)
for i, batch in enumerate(batched_texts)
]
logger.debug(f"Waiting for {len(futures)} futures")
for future in as_completed(futures):
i, batch_embeddings = future.result()
for chunk, embedding in zip(batched_chunks[i], batch_embeddings):
chunk.embedding = embedding
yield batched_chunks[i]
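
For reference, below is a minimal, self-contained sketch of the embed-then-insert flow this file now follows: chunks are embedded 100 at a time on a thread pool of up to 20 workers, and the embedded chunks are written to the store 1,000 at a time as batches complete. The Chunk dataclass, embed_batch function, and in-memory store are hypothetical stand-ins for TextNode, the embedding model, and the Qdrant-backed vector store; they are not code from this PR.

    from concurrent.futures import ThreadPoolExecutor, as_completed
    from dataclasses import dataclass
    from typing import Generator, Iterable, List, Optional

    @dataclass
    class Chunk:
        text: str
        embedding: Optional[List[float]] = None  # stand-in for TextNode.embedding

    def batch_sequence(items: Iterable[Chunk], size: int) -> Generator[List[Chunk], None, None]:
        batch: List[Chunk] = []
        for item in items:
            batch.append(item)
            if len(batch) == size:
                yield batch
                batch = []
        if batch:
            yield batch

    def flatten_sequence(batches: Iterable[List[Chunk]]) -> Generator[Chunk, None, None]:
        for sub in batches:
            yield from sub

    def embed_batch(texts: List[str]) -> List[List[float]]:
        # Hypothetical stand-in for embedding_model.get_text_embedding_batch(texts).
        return [[float(len(text))] for text in texts]

    def compute_embeddings(chunks: List[Chunk]) -> Generator[List[Chunk], None, None]:
        batched_chunks = list(batch_sequence(chunks, 100))
        batched_texts = [[chunk.text for chunk in batch] for batch in batched_chunks]
        with ThreadPoolExecutor(max_workers=20) as executor:
            # i and b are bound as default arguments so each task keeps its own batch.
            futures = [
                executor.submit(lambda i=i, b=b: (i, embed_batch(b)))
                for i, b in enumerate(batched_texts)
            ]
            for future in as_completed(futures):
                i, embeddings = future.result()
                # Batches finish out of order; i pairs each result with its input chunks.
                for chunk, embedding in zip(batched_chunks[i], embeddings):
                    chunk.embedding = embedding
                yield batched_chunks[i]

    store: List[Chunk] = []  # stand-in for chunks_vector_store.access_vector_store()
    chunks = [Chunk(text=f"chunk {n}") for n in range(2500)]
    for chunk_batch in batch_sequence(flatten_sequence(compute_embeddings(chunks)), 1000):
        store.extend(chunk_batch)  # the real code calls chunks_vector_store.add(...)
    print(len(store))  # 2500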
56 changes: 56 additions & 0 deletions llm-service/app/ai/indexing/readers/csv.py
@@ -0,0 +1,56 @@
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2024
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#

import json
from pathlib import Path
from typing import List

import pandas as pd
from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


class CSVReader(BaseReader):
def load_data(self, file_path: Path) -> List[Document]:
# Read the CSV file into a pandas DataFrame
df = pd.read_csv(file_path)
# Convert the dataframe into a list of dictionaries, one per row
rows = df.to_dict(orient="records")
# Convert each dictionary into a Document
documents = [Document(text=json.dumps(row, sort_keys=True)) for row in rows]
return documents
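
A quick usage sketch for the new reader, assuming a hypothetical CSV at data/people.csv with columns name and age: each row becomes one Document whose text is the row rendered as JSON with sorted keys.

    from pathlib import Path

    from app.ai.indexing.readers.csv import CSVReader

    # Hypothetical contents of data/people.csv:
    #   name,age
    #   Ada,36
    #   Grace,45
    reader = CSVReader()
    documents = reader.load_data(Path("data/people.csv"))
    print(len(documents))     # 2 -- one Document per row
    print(documents[0].text)  # {"age": 36, "name": "Ada"}
    # pandas 2.x to_dict(orient="records") returns native Python types, so json.dumps works here.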
51 changes: 51 additions & 0 deletions llm-service/app/ai/indexing/readers/json.py
@@ -0,0 +1,51 @@
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2024
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#

import json
from pathlib import Path
from typing import List

from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


class JSONReader(BaseReader):
def load_data(self, file_path: Path) -> List[Document]:
with open(file_path, "r") as f:
content = json.load(f)
return [Document(text=json.dumps(content, sort_keys=True))]
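
Usage mirrors the CSV reader, except the whole file becomes a single Document; the parsed content is re-serialized with sorted keys so equivalent files produce identical text. A hypothetical config.json is assumed below.

    from pathlib import Path

    from app.ai.indexing.readers.json import JSONReader

    # Hypothetical contents of config.json: {"b": 2, "a": 1}
    reader = JSONReader()
    documents = reader.load_data(Path("config.json"))
    print(len(documents))     # 1
    print(documents[0].text)  # {"a": 1, "b": 2}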
4 changes: 3 additions & 1 deletion llm-service/app/ai/indexing/readers/nop.py
@@ -36,12 +36,14 @@
# DATA.
#

from pathlib import Path
from typing import List

from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


class NopReader(BaseReader):
def load_data(self, file_path: str) -> List[Document]:
def load_data(self, file_path: Path) -> List[Document]:
with open(file_path, "r") as f:
return [Document(text=f.read())]
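
With NopReader now taking a pathlib.Path like the other readers, extension-based dispatch stays uniform. The sketch below mirrors the FILE_READERS lookup that index_file performs; the import paths are assumed from the repository layout, and the error handling is illustrative rather than taken from this PR.

    import os
    from pathlib import Path
    from typing import Dict, List, Type

    from llama_index.core.readers.base import BaseReader
    from llama_index.core.schema import Document

    from app.ai.indexing.readers.csv import CSVReader
    from app.ai.indexing.readers.json import JSONReader
    from app.ai.indexing.readers.nop import NopReader

    READERS: Dict[str, Type[BaseReader]] = {
        ".txt": NopReader,
        ".md": NopReader,
        ".csv": CSVReader,
        ".json": JSONReader,
    }

    def read_documents(file_path: Path) -> List[Document]:
        # Pick a reader class by file extension, as index_file does with FILE_READERS.
        extension = os.path.splitext(file_path)[1]
        reader_cls = READERS.get(extension)
        if reader_cls is None:
            raise ValueError(f"Unsupported file extension: {extension}")
        return reader_cls().load_data(file_path)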
27 changes: 25 additions & 2 deletions llm-service/app/services/utils.py
@@ -37,8 +37,7 @@
# ##############################################################################

import re
from typing import List, Tuple

from typing import Generator, List, Sequence, Tuple, TypeVar, Union

# TODO delete this if it's not being used

@@ -82,3 +81,27 @@ def parse_choice_select_answer_fn(

def get_last_segment(path: str) -> str:
return path.split("/")[-1]


T = TypeVar("T")


def batch_sequence(
sequence: Union[Sequence[T], Generator[T, None, None]], batch_size: int
) -> Generator[List[T], None, None]:
batch = []
for val in sequence:
batch.append(val)
if len(batch) == batch_size:
yield batch
batch = []
if batch:
yield batch


def flatten_sequence(
sequence: Union[Sequence[Sequence[T]], Generator[Sequence[T], None, None]],
) -> Generator[T, None, None]:
for sublist in sequence:
for item in sublist:
yield item
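
Both helpers are plain generators and compose with each other, which is how index_file streams embedded chunk batches into the vector store 1,000 at a time. A quick sketch of their behavior:

    from app.services.utils import batch_sequence, flatten_sequence

    print(list(batch_sequence([1, 2, 3, 4, 5], 2)))        # [[1, 2], [3, 4], [5]]
    print(list(flatten_sequence([[1, 2], [3, 4], [5]])))   # [1, 2, 3, 4, 5]

    # Re-batching a flattened stream of batches: generators in, fixed-size lists out.
    print(list(batch_sequence(flatten_sequence([[1, 2], [3]]), 2)))  # [[1, 2], [3]]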
10 changes: 6 additions & 4 deletions llm-service/app/tests/routers/index/test_data_source.py
@@ -74,7 +74,7 @@ def test_create_document(
assert document_id is not None
index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.66] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.66] * 1024)
)
assert len(vectors.nodes or []) == 1

@@ -93,7 +93,7 @@ def test_delete_data_source(

index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.66] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.66] * 1024)
)
assert len(vectors.nodes or []) == 1

@@ -122,9 +122,11 @@ def test_delete_document(

index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.2] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.2] * 1024)
)
assert len(vectors.nodes or []) == 1
print(document_id)
print("\n" * 10)

response = client.delete(
f"/data_sources/{data_source_id}/documents/{document_id}"
@@ -133,7 +135,7 @@

index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.2] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.2] * 1024)
)
assert len(vectors.nodes or []) == 0

2 changes: 1 addition & 1 deletion llm-service/pdm.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion llm-service/pyproject.toml
@@ -5,7 +5,7 @@ description = "Default template for PDM package"
authors = [
{name = "Conrado Silva Miranda", email = "csilvamiranda@cloudera.com"},
]
dependencies = ["llama-index-core==0.10.68", "llama-index-readers-file==0.1.33", "fastapi==0.111.0", "pydantic==2.8.2", "pydantic-settings==2.3.4", "boto3>=1.35.66", "llama-index-embeddings-bedrock==0.2.1", "llama-index-llms-bedrock==0.1.13", "llama-index-llms-openai==0.1.31", "llama-index-llms-mistralai==0.1.20", "llama-index-embeddings-openai==0.1.11", "llama-index-vector-stores-qdrant==0.2.17", "docx2txt>=0.8"]
dependencies = ["llama-index-core==0.10.68", "llama-index-readers-file==0.1.33", "fastapi==0.111.0", "pydantic==2.8.2", "pydantic-settings==2.3.4", "boto3>=1.35.66", "llama-index-embeddings-bedrock==0.2.1", "llama-index-llms-bedrock==0.1.13", "llama-index-llms-openai==0.1.31", "llama-index-llms-mistralai==0.1.20", "llama-index-embeddings-openai==0.1.11", "llama-index-vector-stores-qdrant==0.2.17", "docx2txt>=0.8", "pandas>=2.2.3"]
requires-python = "==3.10.*"
readme = "README.md"
license = {text = "APACHE"}