Skip to content

Commit

Permalink
Merge pull request #22 from bsuryadevara/bhargav_fea_1453_simplify_ve…
Browse files Browse the repository at this point in the history
…ctordb_example

Create WriteToVectorDB module
  • Loading branch information
drobison00 authored Jan 16, 2024
2 parents b516233 + 18c574f commit a845cf9
Show file tree
Hide file tree
Showing 11 changed files with 358 additions and 134 deletions.
14 changes: 13 additions & 1 deletion examples/llm/common/content_extractor_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,
for chunk in split_text:
processed_data.append({
'title': file_meta.file_name,
'link': 'none',
'source': f"{file_meta.file_type}:{file_meta.file_path}",
'summary': 'none',
'content': chunk
Expand Down Expand Up @@ -272,6 +273,14 @@ def file_content_extractor(builder: mrc.Builder):
"txt": TextConverter()
}

chunk_params = {
file_type: {
"chunk_size": converters_meta.get(file_type, {}).get("chunk_size", chunk_size),
"chunk_overlap": converters_meta.get(file_type, {}).get("chunk_overlap", chunk_overlap)
}
for file_type in converters.keys()
}

def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:
data = []
with ThreadPoolExecutor(max_workers=num_threads) as executor:
Expand All @@ -292,7 +301,10 @@ def parse_files(open_files: typing.List[fsspec.core.OpenFile]) -> MessageMeta:
for file_meta, future in zip(files_meta, futures):
docs = future.result()
if docs:
result = process_content(docs, file_meta, chunk_size, chunk_overlap)
file_type_chunk_params = chunk_params[file_meta.file_type]
result = process_content(docs, file_meta,
file_type_chunk_params["chunk_size"],
file_type_chunk_params["chunk_overlap"])
if result:
data.extend(result)

Expand Down
57 changes: 12 additions & 45 deletions examples/llm/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import logging

import pymilvus
from morpheus.service.vdb.milvus_client import DATA_TYPE_MAP
from langchain.embeddings import HuggingFaceEmbeddings

from morpheus.llm.services.nemo_llm_service import NeMoLLMService
Expand Down Expand Up @@ -45,51 +46,17 @@ def build_llm_service(model_name: str, llm_service: str, tokens_to_generate: int
return llm_service.get_client(model_name, **model_kwargs)


def build_milvus_config(embedding_size: int):
    """
    Build the hard-coded Milvus resource kwargs (index + collection schema).

    Parameters
    ----------
    embedding_size : int
        Dimension used for the ``embedding`` FLOAT_VECTOR field.

    Returns
    -------
    dict
        A dictionary with two keys: ``index_conf`` (an HNSW / L2 index on the
        ``embedding`` field) and ``schema_conf`` (dynamic fields enabled, the
        serialized field schemas, and a collection description).
    """

    def _field(name, dtype, description, **extra):
        # Every field is serialized through pymilvus so the dict layout is
        # whatever ``FieldSchema.to_dict()`` produces.
        return pymilvus.FieldSchema(name=name, dtype=dtype, description=description, **extra).to_dict()

    # All text columns share the same type and maximum length.
    varchar_max_length = 65_535
    varchar_columns = (
        ("title", "Title or heading of the data entry"),
        ("source", "Source or origin of the data entry"),
        ("summary", "Brief summary or abstract of the data content"),
        ("content", "Main content or body of the data entry"),
    )

    # Field order: primary key, text columns, then the embedding vector.
    fields = [
        _field("id",
               pymilvus.DataType.INT64,
               "Primary key for the collection",
               is_primary=True,
               auto_id=True)
    ]
    for column_name, column_description in varchar_columns:
        fields.append(
            _field(column_name,
                   pymilvus.DataType.VARCHAR,
                   column_description,
                   max_length=varchar_max_length))
    fields.append(
        _field("embedding",
               pymilvus.DataType.FLOAT_VECTOR,
               "Embedding vectors representing the data entry",
               dim=embedding_size))

    index_conf = {
        "field_name": "embedding",
        "metric_type": "L2",
        "index_type": "HNSW",
        "params": {
            "M": 8,
            "efConstruction": 64,
        },
    }

    schema_conf = {
        "enable_dynamic_field": True,
        "schema_fields": fields,
        "description": "Collection schema for diverse data sources",
    }

    return {"index_conf": index_conf, "schema_conf": schema_conf}
def build_milvus_config(resource_schema_config: dict):
    """
    Build Milvus resource kwargs from a declarative schema configuration.

    Each entry of ``resource_schema_config["schema_conf"]["schema_fields"]`` is
    treated as keyword arguments for ``pymilvus.FieldSchema``, except that its
    ``dtype`` value is first translated through ``DATA_TYPE_MAP``. The resulting
    field schemas are serialized with ``FieldSchema.to_dict()``.

    Parameters
    ----------
    resource_schema_config : dict
        Configuration containing a ``schema_conf`` mapping with a
        ``schema_fields`` list. Any other keys are passed through unchanged.

    Returns
    -------
    dict
        A new dictionary equal to ``resource_schema_config`` except that
        ``schema_conf["schema_fields"]`` holds the serialized field schemas.
        The input dictionary and its field entries are not modified, so the
        same loaded configuration can safely be converted more than once.

    Raises
    ------
    KeyError
        If ``schema_conf``, ``schema_fields`` or a field's ``dtype`` is missing.
    ValueError
        If a field's ``dtype`` is not a key of ``DATA_TYPE_MAP``.
    """
    schema_conf = resource_schema_config["schema_conf"]

    schema_fields = []
    for field_data in schema_conf["schema_fields"]:
        dtype_name = field_data["dtype"]
        dtype = DATA_TYPE_MAP.get(dtype_name)
        if dtype is None:
            # Fail here with the offending field named, instead of handing
            # ``dtype=None`` to pymilvus and getting a less specific error.
            supported = ", ".join(map(str, DATA_TYPE_MAP))
            raise ValueError(f"Unsupported dtype {dtype_name!r} for schema field "
                             f"{field_data.get('name')!r}; supported dtypes: {supported}")

        # Build the kwargs on a copy so the caller's field entry keeps its
        # original (string) dtype.
        field_kwargs = {**field_data, "dtype": dtype}
        schema_fields.append(pymilvus.FieldSchema(**field_kwargs).to_dict())

    # Shallow copies of the two levels that change; everything else is shared.
    return {**resource_schema_config, "schema_conf": {**schema_conf, "schema_fields": schema_fields}}


def build_milvus_service(embedding_size: int, uri: str = "http://localhost:19530"):
Expand Down
2 changes: 1 addition & 1 deletion examples/llm/vdb_upload/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def process_vdb_sources(pipe: Pipeline, config: Config, vdb_source_config: typin
return vdb_sources


def build_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]:
def build_defualt_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]:
"""
Builds the configuration for Milvus.
Expand Down
1 change: 0 additions & 1 deletion examples/llm/vdb_upload/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def pipeline(pipeline_config: Config, source_config: typing.List, vdb_config: ty
monitor_2 = pipe.add_stage(
MonitorStage(pipeline_config, description="Inference rate", unit="events", delayed_start=True))

# TODO(Bhargav): Convert WriteToVectorDBStage to module + retain backwards compatibility.
vector_db = pipe.add_stage(WriteToVectorDBStage(pipeline_config, **vdb_config))

monitor_3 = pipe.add_stage(
Expand Down
12 changes: 9 additions & 3 deletions examples/llm/vdb_upload/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@
import yaml

from morpheus.config import Config, PipelineModes
from ..common.utils import build_milvus_config
from .common import build_defualt_milvus_config
from ..common.utils import build_rss_urls
from ..common.utils import build_milvus_config

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -223,7 +224,7 @@ def build_cli_configs(source_type, enable_cache, embedding_size, isolate_embeddi
cli_vdb_conf = {
'embedding_size': embedding_size,
'recreate': True,
'resource_kwargs': build_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
'resource_kwargs': build_defualt_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None,
'resource_name': vector_db_resource_name,
'service': vector_db_service,
'uri': vector_db_uri,
Expand Down Expand Up @@ -313,7 +314,12 @@ def build_final_config(vdb_conf_path, cli_source_conf, cli_embeddings_conf, cli_
pipeline_conf = merge_configs(vdb_pipeline_config.get('pipeline', {}), cli_pipeline_conf)
source_conf = vdb_pipeline_config.get('sources', []) + list(cli_source_conf.values())
tokenizer_conf = merge_configs(vdb_pipeline_config.get('tokenizer', {}), cli_tokenizer_conf)
vdb_conf = merge_configs(vdb_pipeline_config.get('vdb', {}), cli_vdb_conf)
vdb_conf = vdb_pipeline_config.get('vdb', {})
# The vdb section of vdb_config.yaml spells this key "resource_schema". It is
# popped so the raw schema is replaced by the converted "resource_kwargs"
# rather than being carried along in vdb_conf.
resource_schema = vdb_conf.pop("resource_schema", None)

if resource_schema:
    vdb_conf["resource_kwargs"] = build_milvus_config(resource_schema)
vdb_conf = merge_configs(vdb_conf, cli_vdb_conf)

# TODO: class labels depends on this, so it should be a pipeline level parameter, not a vdb level parameter
pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size', 384)
Expand Down
40 changes: 39 additions & 1 deletion examples/llm/vdb_upload/vdb_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,46 @@ vdb_pipeline:
model_name: "bert-base-uncased-hash"

vdb:
embedding_size: 384 # Size of the embeddings to store in the vector database
embedding_size: 384
recreate: True # Whether to recreate the resource if it already exists
resource_name: "VDB2" # Identifier for the resource in the vector database
service: "milvus" # Specify the type of vector database
uri: "http://localhost:19530" # URI for connecting to the Vector Database server
resource_schema:
index_conf:
field_name: embedding
metric_type: L2
index_type: HNSW
params:
M: 8
efConstruction: 64

schema_conf:
enable_dynamic_field: true
schema_fields:
- name: id
dtype: INT64
description: Primary key for the collection
is_primary: true
auto_id: true
- name: title
dtype: VARCHAR
description: Title or heading of the data entry
max_length: 65_535
- name: source
dtype: VARCHAR
description: Source or origin of the data entry
max_length: 65_535
- name: summary
dtype: VARCHAR
description: Brief summary or abstract of the data content
max_length: 65_535
- name: content
dtype: VARCHAR
description: Main content or body of the data entry
max_length: 65_535
- name: embedding
dtype: FLOAT_VECTOR
description: Embedding vectors representing the data entry
dim: 384 # Size of the embeddings to store in the vector database
description: Collection schema for diverse data sources
Empty file.
Loading

0 comments on commit a845cf9

Please sign in to comment.