Skip to content

Commit

Permalink
Added WriteToVDBParamContract to the module
Browse files Browse the repository at this point in the history
  • Loading branch information
bsuryadevara committed Jan 16, 2024
1 parent be80767 commit 18c574f
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 14 deletions.
1 change: 1 addition & 0 deletions examples/llm/common/content_extractor_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def process_content(docs: list[Document], file_meta: FileMeta, chunk_size: int,
for chunk in split_text:
processed_data.append({
'title': file_meta.file_name,
'link': 'none',
'source': f"{file_meta.file_type}:{file_meta.file_path}",
'summary': 'none',
'content': chunk
Expand Down
62 changes: 48 additions & 14 deletions morpheus/modules/output/write_to_vector_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,46 @@
from morpheus.utils.module_utils import register_module
from morpheus.utils.module_ids import WRITE_TO_VECTOR_DB
from morpheus.utils.module_ids import MORPHEUS_MODULE_NAMESPACE
from pydantic import BaseModel
from pydantic import Field
from pydantic import ValidationError
from pydantic import validator

logger = logging.getLogger(__name__)


@dataclass
class AccumulationStats:
    """Mutable record of accumulation state: a count, a timestamp and a list of frames.

    NOTE(review): field meanings below are inferred from their names;
    confirm against the code that populates this record.
    """

    # Counter of messages (presumably those accumulated since the last insert).
    msg_count: int
    # Timestamp of the most recent insert (presumably seconds; unit not shown here).
    last_insert_time: float
    # DataFrames collected so far.
    data: list[cudf.DataFrame]


class WriteToVDBParamContract(BaseModel):
    """Validated configuration for the ``write_to_vector_db`` module.

    ``service`` and ``resource_name`` are mandatory: a missing, ``None`` or
    empty value for either one fails validation with a descriptive message.
    Every other field falls back to the default declared below.

    Raises
    ------
    pydantic.ValidationError
        If ``service`` or ``resource_name`` is missing/empty, or if any value
        cannot be coerced to its declared type.
    """

    embedding_column_name: str = "embedding"
    recreate: bool = False
    # `default=None` (not `default_factory=None`, which is a no-op) so that the
    # `always=True` validators below also run when the key is absent and can
    # report the domain-specific message instead of a generic "field required".
    service: str = Field(default=None)
    is_service_serialized: bool = False
    resource_name: str = Field(default=None)
    # Mutable defaults go through `default_factory` so instances never share them.
    resource_kwargs: dict = Field(default_factory=dict)
    service_kwargs: dict = Field(default_factory=dict)
    batch_size: int = 1024
    write_time_interval: float = 3.0

    @validator('service', pre=True, always=True)
    def validate_service(cls, v):
        """Reject a missing, ``None`` or empty ``service`` value."""
        if not v:
            raise ValueError("Service must be a service name or a serialized instance of VectorDBService")
        return v

    @validator('resource_name', pre=True, always=True)
    def validate_resource_name(cls, v):
        """Reject a missing, ``None`` or empty ``resource_name`` value."""
        if not v:
            raise ValueError("Resource name must not be None or Empty.")
        return v



@register_module(WRITE_TO_VECTOR_DB, MORPHEUS_MODULE_NAMESPACE)
def _write_to_vector_db(builder: mrc.Builder):
"""
Expand Down Expand Up @@ -71,21 +101,25 @@ def _write_to_vector_db(builder: mrc.Builder):
"""

module_config = builder.get_current_module_config()
embedding_column_name = module_config.get("embedding_column_name", "embedding")
recreate = module_config.get("recreate", False)
service = module_config.get("service", None)
is_service_serialized = module_config.get("is_service_serialized", False)
resource_name = module_config.get("resource_name", None)
resource_kwargs = module_config.get("resource_kwargs", {})
service_kwargs = module_config.get("service_kwargs", {})
batch_size = module_config.get("batch_size", 1024)
write_time_interval = module_config.get("write_time_interval", 3.0)

if not resource_name:
raise ValueError("Resource name must not be None or Empty.")

if not service:
raise ValueError("Service must be a service name or a serialized instance of VectorDBService")
try:
write_to_vdb_config = WriteToVDBParamContract(**module_config)
except ValidationError as e:
# Format the error message for better readability
error_messages = '; '.join([f"{error['loc'][0]}: {error['msg']}" for error in e.errors()])
log_error_message = f"Invalid configuration for write_to_vector_db: {error_messages}"
logger.error(log_error_message)
raise ValueError(log_error_message)

embedding_column_name = write_to_vdb_config.embedding_column_name
recreate = write_to_vdb_config.recreate
service = write_to_vdb_config.service
is_service_serialized = write_to_vdb_config.is_service_serialized
resource_name = write_to_vdb_config.resource_name
resource_kwargs = write_to_vdb_config.resource_kwargs
service_kwargs = write_to_vdb_config.service_kwargs
batch_size = write_to_vdb_config.batch_size
write_time_interval = write_to_vdb_config.write_time_interval

# Check if service is serialized and convert if needed
service: VectorDBService = (
Expand Down

0 comments on commit 18c574f

Please sign in to comment.