Support parsing json and csv (#32)
* Support parsing json and csv

* parallelize requests

* try to fix deletion

* keep document id

* revert delete changes

* remove delete_document

* fix lint
conradocloudera authored Nov 21, 2024
1 parent 5902fa1 commit f5d9350
Showing 8 changed files with 189 additions and 28 deletions.
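
Before the per-file hunks, a rough sketch of the flow this commit enables: the indexer maps a file extension to a reader, the reader turns the file into llama-index Documents, and chunking and embedding proceed as before. This is an illustration, not code from the commit; the READERS name stands in for the extension mapping in index.py, the sample path is invented, and the import path assumes the llm-service app package is importable as app.

from pathlib import Path

from app.ai.indexing.readers.csv import CSVReader
from app.ai.indexing.readers.json import JSONReader

# Stand-in for the extension-to-reader mapping in index.py; shown here only
# for the two readers added by this commit.
READERS = {
    ".csv": CSVReader,
    ".json": JSONReader,
}

def load_documents(file_path: Path):
    reader_cls = READERS.get(file_path.suffix.lower())
    if reader_cls is None:
        raise ValueError(f"Unsupported file type: {file_path.suffix}")
    # CSVReader emits one Document per row; JSONReader emits a single Document
    # holding the whole file, serialized with sorted keys.
    return reader_cls().load_data(file_path)

# Hypothetical usage:
# documents = load_documents(Path("data/records.csv"))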
65 changes: 46 additions & 19 deletions llm-service/app/ai/indexing/index.py
@@ -38,17 +38,21 @@

import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Type
from typing import Dict, Generator, List, Type

from llama_index.core.base.embeddings.base import BaseEmbedding
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import BaseNode, Document, TextNode
from llama_index.readers.file import DocxReader

from ...services.utils import batch_sequence, flatten_sequence
from ...services.vector_store import VectorStore
from .readers.csv import CSVReader
from .readers.json import JSONReader
from .readers.nop import NopReader
from .readers.pdf import PDFReader

@@ -59,6 +63,8 @@
".txt": NopReader,
".md": NopReader,
".docx": DocxReader,
".csv": CSVReader,
".json": JSONReader,
}
CHUNKABLE_FILE_EXTENSIONS = {".pdf", ".txt", ".md", ".docx"}

@@ -81,7 +87,7 @@ def __init__(
self.embedding_model = embedding_model
self.chunks_vector_store = chunks_vector_store

def index_file(self, file_path: Path, file_id: str) -> None:
def index_file(self, file_path: Path, document_id: str) -> None:
logger.debug(f"Indexing file: {file_path}")

file_extension = os.path.splitext(file_path)[1]
@@ -93,7 +99,7 @@ def index_file(self, file_path: Path, file_id: str) -> None:

logger.debug(f"Parsing file: {file_path}")

documents = self._documents_in_file(reader, file_path, file_id)
documents = self._documents_in_file(reader, file_path, document_id)
if file_extension in CHUNKABLE_FILE_EXTENSIONS:
logger.debug(f"Chunking file: {file_path}")
chunks = [
@@ -104,34 +110,34 @@ def index_file(self, file_path: Path, file_id: str) -> None:
else:
chunks = documents

texts = [chunk.text for chunk in chunks]
logger.debug(f"Embedding {len(texts)} chunks")
embeddings = self.embedding_model.get_text_embedding_batch(texts)
logger.debug(f"Embedding {len(chunks)} chunks")

for chunk, embedding in zip(chunks, embeddings):
chunk.embedding = embedding
chunks_with_embeddings = flatten_sequence(self._compute_embeddings(chunks))

logger.debug(f"Adding {len(chunks)} chunks to vector store")
chunks_vector_store = self.chunks_vector_store.access_vector_store()
acc = 0
for chunk_batch in batch_sequence(chunks_with_embeddings, 1000):
acc += len(chunk_batch)
logger.debug(f"Adding {acc}/{len(chunks)} chunks to vector store")

# We have to explicitly convert here even though the types are compatible (TextNode inherits from BaseNode)
# because the "add" annotation uses List instead of Sequence. We need to use TextNode explicitly because
# we're capturing "text".
converted_chunks: List[BaseNode] = [chunk for chunk in chunks]
chunks_vector_store.add(converted_chunks)
# We have to explicitly convert here even though the types are compatible (TextNode inherits from BaseNode)
# because the "add" annotation uses List instead of Sequence. We need to use TextNode explicitly because
# we're capturing "text".
converted_chunks: List[BaseNode] = [chunk for chunk in chunk_batch]

chunks_vector_store = self.chunks_vector_store.access_vector_store()
chunks_vector_store.add(converted_chunks)

logger.debug(f"Indexing file: {file_path} completed")

def _documents_in_file(
self, reader: BaseReader, file_path: Path, file_id: str
self, reader: BaseReader, file_path: Path, document_id: str
) -> List[Document]:
documents = reader.load_data(file_path)

for i, document in enumerate(documents):
# Update the document metadata
document.id_ = file_id
document.id_ = document_id
document.metadata["file_name"] = os.path.basename(file_path)
document.metadata["document_id"] = file_id
document.metadata["document_id"] = document_id
document.metadata["document_part_number"] = i
document.metadata["data_source_id"] = self.data_source_id

@@ -155,3 +161,24 @@ def _chunks_in_document(self, document: Document) -> List[TextNode]:
converted_chunks.append(chunk)

return converted_chunks

def _compute_embeddings(
self, chunks: List[TextNode]
) -> Generator[List[TextNode], None, None]:
batched_chunks = list(batch_sequence(chunks, 100))
batched_texts = [[chunk.text for chunk in batch] for batch in batched_chunks]

with ThreadPoolExecutor(max_workers=20) as executor:
futures = [
executor.submit(
# Bind the batch index as an argument so each worker keeps its own value of
# `i`; a bare closure over the loop variable would read whatever `i` holds
# when the lambda actually runs on the thread pool.
lambda i, b: (i, self.embedding_model.get_text_embedding_batch(b)),
i,
batch,
)
for i, batch in enumerate(batched_texts)
]
logger.debug(f"Waiting for {len(futures)} futures")
for future in as_completed(futures):
i, batch_embeddings = future.result()
for chunk, embedding in zip(batched_chunks[i], batch_embeddings):
chunk.embedding = embedding
yield batched_chunks[i]
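
The new _compute_embeddings path streams its work instead of materializing everything at once: each 100-chunk batch is embedded on a worker thread, finished batches are yielded in completion order (the batch index pairs embeddings back to the right chunks), flatten_sequence turns those batches back into a stream of chunks, and batch_sequence(..., 1000) groups the stream into vector-store inserts. A minimal standalone sketch of the same pattern, with a toy embedding function in place of the real embedding model and the app.services.utils import path assumed:

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Generator, List, Tuple

from app.services.utils import batch_sequence, flatten_sequence  # import path assumed

def fake_embed(texts: List[str]) -> List[List[float]]:
    # Toy stand-in for embedding_model.get_text_embedding_batch.
    return [[float(len(text))] for text in texts]

def compute_embeddings(texts: List[str]) -> Generator[List[Tuple[str, List[float]]], None, None]:
    batches = list(batch_sequence(texts, 2))
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Bind the batch index as an argument so each future keeps its own value of i.
        futures = [
            executor.submit(lambda i, b: (i, fake_embed(b)), i, b)
            for i, b in enumerate(batches)
        ]
        for future in as_completed(futures):
            i, embeddings = future.result()
            yield list(zip(batches[i], embeddings))

texts = ["a", "bb", "ccc", "dddd", "eeeee"]
for insert_batch in batch_sequence(flatten_sequence(compute_embeddings(texts)), 3):
    # Each insert_batch is what would be handed to the vector store's add().
    print(insert_batch)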
56 changes: 56 additions & 0 deletions llm-service/app/ai/indexing/readers/csv.py
@@ -0,0 +1,56 @@
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2024
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#

import json
from pathlib import Path
from typing import List

import pandas as pd
from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


class CSVReader(BaseReader):
def load_data(self, file_path: Path) -> List[Document]:
# Read the CSV file into a pandas DataFrame
df = pd.read_csv(file_path)
# Convert the dataframe into a list of dictionaries, one per row
rows = df.to_dict(orient="records")
# Convert each dictionary into a Document
documents = [Document(text=json.dumps(row, sort_keys=True)) for row in rows]
return documents
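
To make the output concrete, a small hedged example of what this reader produces: each CSV row becomes one Document whose text is the row serialized as a JSON object with sorted keys, so column order does not matter. The sample file and its contents are invented, and the snippet mirrors load_data with plain pandas and json rather than calling the reader class.

import json
from pathlib import Path
from tempfile import TemporaryDirectory

import pandas as pd

with TemporaryDirectory() as tmp:
    path = Path(tmp) / "people.csv"
    path.write_text("name,city\nAda,London\nGrace,Arlington\n")
    rows = pd.read_csv(path).to_dict(orient="records")
    texts = [json.dumps(row, sort_keys=True) for row in rows]
    print(texts)
    # ['{"city": "London", "name": "Ada"}', '{"city": "Arlington", "name": "Grace"}']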
51 changes: 51 additions & 0 deletions llm-service/app/ai/indexing/readers/json.py
@@ -0,0 +1,51 @@
#
# CLOUDERA APPLIED MACHINE LEARNING PROTOTYPE (AMP)
# (C) Cloudera, Inc. 2024
# All rights reserved.
#
# Applicable Open Source License: Apache 2.0
#
# NOTE: Cloudera open source products are modular software products
# made up of hundreds of individual components, each of which was
# individually copyrighted. Each Cloudera open source product is a
# collective work under U.S. Copyright Law. Your license to use the
# collective work is as provided in your written agreement with
# Cloudera. Used apart from the collective work, this file is
# licensed for your use pursuant to the open source license
# identified above.
#
# This code is provided to you pursuant a written agreement with
# (i) Cloudera, Inc. or (ii) a third-party authorized to distribute
# this code. If you do not have a written agreement with Cloudera nor
# with an authorized and properly licensed third party, you do not
# have any rights to access nor to use this code.
#
# Absent a written agreement with Cloudera, Inc. ("Cloudera") to the
# contrary, A) CLOUDERA PROVIDES THIS CODE TO YOU WITHOUT WARRANTIES OF ANY
# KIND; (B) CLOUDERA DISCLAIMS ANY AND ALL EXPRESS AND IMPLIED
# WARRANTIES WITH RESPECT TO THIS CODE, INCLUDING BUT NOT LIMITED TO
# IMPLIED WARRANTIES OF TITLE, NON-INFRINGEMENT, MERCHANTABILITY AND
# FITNESS FOR A PARTICULAR PURPOSE; (C) CLOUDERA IS NOT LIABLE TO YOU,
# AND WILL NOT DEFEND, INDEMNIFY, NOR HOLD YOU HARMLESS FOR ANY CLAIMS
# ARISING FROM OR RELATED TO THE CODE; AND (D)WITH RESPECT TO YOUR EXERCISE
# OF ANY RIGHTS GRANTED TO YOU FOR THE CODE, CLOUDERA IS NOT LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, PUNITIVE OR
# CONSEQUENTIAL DAMAGES INCLUDING, BUT NOT LIMITED TO, DAMAGES
# RELATED TO LOST REVENUE, LOST PROFITS, LOSS OF INCOME, LOSS OF
# BUSINESS ADVANTAGE OR UNAVAILABILITY, OR LOSS OR CORRUPTION OF
# DATA.
#

import json
from pathlib import Path
from typing import List

from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


class JSONReader(BaseReader):
def load_data(self, file_path: Path) -> List[Document]:
with open(file_path, "r") as f:
content = json.load(f)
return [Document(text=json.dumps(content, sort_keys=True))]
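
Unlike the CSV reader, the JSON reader turns the whole file into a single Document; re-serializing with sort_keys=True keeps the stored text stable regardless of key order in the source file. A tiny invented example of the equivalent round trip:

import json

# Invented payload; mirrors JSONReader.load_data's json.load followed by json.dumps.
content = json.loads('{"year": 2024, "title": "Quarterly report"}')
print(json.dumps(content, sort_keys=True))
# {"title": "Quarterly report", "year": 2024}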
4 changes: 3 additions & 1 deletion llm-service/app/ai/indexing/readers/nop.py
@@ -36,12 +36,14 @@
# DATA.
#

from pathlib import Path
from typing import List

from llama_index.core.readers.base import BaseReader
from llama_index.core.schema import Document


class NopReader(BaseReader):
def load_data(self, file_path: str) -> List[Document]:
def load_data(self, file_path: Path) -> List[Document]:
with open(file_path, "r") as f:
return [Document(text=f.read())]
27 changes: 25 additions & 2 deletions llm-service/app/services/utils.py
@@ -37,8 +37,7 @@
# ##############################################################################

import re
from typing import List, Tuple

from typing import Generator, List, Sequence, Tuple, TypeVar, Union

# TODO delete this if it's not being used

@@ -82,3 +81,27 @@ def parse_choice_select_answer_fn(

def get_last_segment(path: str) -> str:
return path.split("/")[-1]


T = TypeVar("T")


def batch_sequence(
sequence: Union[Sequence[T], Generator[T, None, None]], batch_size: int
) -> Generator[List[T], None, None]:
batch = []
for val in sequence:
batch.append(val)
if len(batch) == batch_size:
yield batch
batch = []
if batch:
yield batch


def flatten_sequence(
sequence: Union[Sequence[Sequence[T]], Generator[Sequence[T], None, None]],
) -> Generator[T, None, None]:
for sublist in sequence:
for item in sublist:
yield item
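
The two helpers compose as a round trip over a stream: batch_sequence groups a sequence or generator into fixed-size lists (plus a final short batch), and flatten_sequence streams the elements back out. A quick illustration, with the app.services.utils import path assumed:

from app.services.utils import batch_sequence, flatten_sequence

print(list(batch_sequence(range(5), 2)))
# [[0, 1], [2, 3], [4]]

print(list(flatten_sequence(batch_sequence(range(5), 2))))
# [0, 1, 2, 3, 4]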
10 changes: 6 additions & 4 deletions llm-service/app/tests/routers/index/test_data_source.py
@@ -74,7 +74,7 @@ def test_create_document(
assert document_id is not None
index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.66] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.66] * 1024)
)
assert len(vectors.nodes or []) == 1

@@ -93,7 +93,7 @@ def test_delete_data_source(

index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.66] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.66] * 1024)
)
assert len(vectors.nodes or []) == 1

@@ -122,9 +122,11 @@ def test_delete_document(

index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.2] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.2] * 1024)
)
assert len(vectors.nodes or []) == 1
print(document_id)
print("\n" * 10)

response = client.delete(
f"/data_sources/{data_source_id}/documents/{document_id}"
@@ -133,7 +135,7 @@

index = get_vector_store_index(data_source_id)
vectors = index.vector_store.query(
VectorStoreQuery(query_embedding=[0.2] * 1024, doc_ids=[document_id])
VectorStoreQuery(query_embedding=[0.2] * 1024)
)
assert len(vectors.nodes or []) == 0

2 changes: 1 addition & 1 deletion llm-service/pdm.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion llm-service/pyproject.toml
@@ -5,7 +5,7 @@ description = "Default template for PDM package"
authors = [
{name = "Conrado Silva Miranda", email = "csilvamiranda@cloudera.com"},
]
dependencies = ["llama-index-core==0.10.68", "llama-index-readers-file==0.1.33", "fastapi==0.111.0", "pydantic==2.8.2", "pydantic-settings==2.3.4", "boto3>=1.35.66", "llama-index-embeddings-bedrock==0.2.1", "llama-index-llms-bedrock==0.1.13", "llama-index-llms-openai==0.1.31", "llama-index-llms-mistralai==0.1.20", "llama-index-embeddings-openai==0.1.11", "llama-index-vector-stores-qdrant==0.2.17", "docx2txt>=0.8"]
dependencies = ["llama-index-core==0.10.68", "llama-index-readers-file==0.1.33", "fastapi==0.111.0", "pydantic==2.8.2", "pydantic-settings==2.3.4", "boto3>=1.35.66", "llama-index-embeddings-bedrock==0.2.1", "llama-index-llms-bedrock==0.1.13", "llama-index-llms-openai==0.1.31", "llama-index-llms-mistralai==0.1.20", "llama-index-embeddings-openai==0.1.11", "llama-index-vector-stores-qdrant==0.2.17", "docx2txt>=0.8", "pandas>=2.2.3"]
requires-python = "==3.10.*"
readme = "README.md"
license = {text = "APACHE"}
