diff --git a/morpheus/_lib/src/llm/utils.cpp b/morpheus/_lib/src/llm/utils.cpp
index 8addc5c4a8..e7be60bdad 100644
--- a/morpheus/_lib/src/llm/utils.cpp
+++ b/morpheus/_lib/src/llm/utils.cpp
@@ -112,9 +112,16 @@ input_mappings_t process_input_names(user_input_mappings_t user_inputs, const st
 
     if (found_star_input_name != found_star_node_name)
     {
-        throw std::invalid_argument(
-            "LLMNode::add_node() called with a placeholder input name and node name that "
-            "do not match");
+        if (found_star_input_name)
+        {
+            throw std::invalid_argument(
+                "LLMNode::add_node() called with a placeholder external name but no placeholder internal name");
+        }
+        else
+        {
+            throw std::invalid_argument(
+                "LLMNode::add_node() called with a placeholder internal name but no placeholder external name");
+        }
     }
     else if (found_star_input_name && found_star_node_name)
     {
diff --git a/morpheus/llm/services/llm_service.py b/morpheus/llm/services/llm_service.py
index 07c777c547..2d2a3ae333 100644
--- a/morpheus/llm/services/llm_service.py
+++ b/morpheus/llm/services/llm_service.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import importlib
 import logging
 import typing
 from abc import ABC
@@ -133,3 +134,56 @@ def get_client(self, *, model_name: str, **model_kwargs) -> LLMClient:
             Additional keyword arguments to pass to the model.
         """
         pass
+
+    @typing.overload
+    @staticmethod
+    def create(service_type: typing.Literal["nemo"], *service_args,
+               **service_kwargs) -> "morpheus.llm.services.nemo_llm_service.NeMoLLMService":
+        pass
+
+    @typing.overload
+    @staticmethod
+    def create(service_type: typing.Literal["openai"], *service_args,
+               **service_kwargs) -> "morpheus.llm.services.openai_chat_service.OpenAIChatService":
+        pass
+
+    @typing.overload
+    @staticmethod
+    def create(service_type: str, *service_args, **service_kwargs) -> "LLMService":
+        pass
+
+    @staticmethod
+    def create(service_type: str, *service_args, **service_kwargs):
+        """
+        Returns a service for interacting with LLM models.
+
+        Parameters
+        ----------
+        service_type : str
+            The type of the service to create, e.g. "nemo" or "openai".
+        service_args : list
+            Additional positional arguments to pass to the service constructor.
+        service_kwargs : dict[str, typing.Any]
+            Additional keyword arguments to pass to the service.
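+
+        Examples
+        --------
+        A minimal usage sketch (the model name and API key below are placeholders, not defaults):
+
+        >>> service = LLMService.create("openai", api_key="<my-api-key>")
+        >>> client = service.get_client(model_name="gpt-3.5-turbo")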
+ """ + if service_type.lower() == 'openai': + llm_or_chat = "chat" + else: + llm_or_chat = "llm" + + module_name = f"morpheus.llm.services.{service_type.lower()}_{llm_or_chat}_service" + module = importlib.import_module(module_name) + + # Get all of the classes in the module to find the correct service class + mod_classes = dict([(name, cls) for name, cls in module.__dict__.items() if isinstance(cls, type)]) + + class_name_lower = f"{service_type}{llm_or_chat}Service".lower() + + # Find case-insensitive match for the class name + matching_classes = [name for name in mod_classes if name.lower() == class_name_lower] + + assert len(matching_classes) == 1, f"Expected to find exactly one class with name {class_name_lower} in module {module_name}, but found {matching_classes}" + + # Create the class + class_ = getattr(module, matching_classes[0]) + + instance = class_(*service_args, **service_kwargs) + + return instance diff --git a/morpheus/llm/services/nemo_llm_service.py b/morpheus/llm/services/nemo_llm_service.py index 81fb4f9b97..e00ed13434 100644 --- a/morpheus/llm/services/nemo_llm_service.py +++ b/morpheus/llm/services/nemo_llm_service.py @@ -190,7 +190,7 @@ class NeMoLLMService(LLMService): A service for interacting with NeMo LLM models, this class should be used to create a client for a specific model. """ - def __init__(self, *, api_key: str = None, org_id: str = None, retry_count=5) -> None: + def __init__(self, *, api_key: str = None, org_id: str = None, base_url: str = None, retry_count=5) -> None: """ Creates a service for interacting with NeMo LLM models. @@ -203,6 +203,9 @@ def __init__(self, *, api_key: str = None, org_id: str = None, retry_count=5) -> The organization ID for the LLM service, by default None. If `None` the organization ID will be read from the `NGC_ORG_ID` environment variable. This value is only required if the account associated with the `api_key` is a member of multiple NGC organizations., by default None + base_url : str, optional + The api host url, by default None. If `None` the url will be read from the `NGC_API_BASE` environment + variable. If neither are present an error will be raised., by default None retry_count : int, optional The number of times to retry a request before raising an exception, by default 5 @@ -214,11 +217,12 @@ def __init__(self, *, api_key: str = None, org_id: str = None, retry_count=5) -> super().__init__() api_key = api_key if api_key is not None else os.environ.get("NGC_API_KEY", None) org_id = org_id if org_id is not None else os.environ.get("NGC_ORG_ID", None) + base_url = base_url if base_url is not None else os.environ.get("NGC_API_BASE", None) self._retry_count = retry_count self._conn = nemollm.NemoLLM( - api_host=os.environ.get("NGC_API_BASE", None), + api_host=base_url, # The client must configure the authentication and authorization parameters # in accordance with the API server security policy. # Configure Bearer authorization diff --git a/morpheus/llm/services/nvfoundation_llm_service.py b/morpheus/llm/services/nvfoundation_llm_service.py new file mode 100644 index 0000000000..9be5130bf9 --- /dev/null +++ b/morpheus/llm/services/nvfoundation_llm_service.py @@ -0,0 +1,177 @@ +# Copyright (c) 2023-2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+
+from morpheus.llm.services.llm_service import LLMClient
+from morpheus.llm.services.llm_service import LLMService
+
+logger = logging.getLogger(__name__)
+
+IMPORT_EXCEPTION = None
+IMPORT_ERROR_MESSAGE = (
+    "The `langchain-nvidia-ai-endpoints` package was not found. Install it and other additional dependencies by "
+    "running the following command:\n"
+    "`conda env update --solver=libmamba -n morpheus "
+    "--file morpheus/conda/environments/dev_cuda-121_arch-x86_64.yaml --prune`")
+
+try:
+    from langchain_core.prompt_values import StringPromptValue
+    from langchain_nvidia_ai_endpoints import ChatNVIDIA
+    from langchain_nvidia_ai_endpoints._common import NVEModel
+except ImportError as import_exc:
+    IMPORT_EXCEPTION = import_exc
+
+
+class NVFoundationLLMClient(LLMClient):
+    """
+    Client for interacting with a specific model in the NVIDIA Foundation Model API. This class should be constructed
+    with the `NVFoundationLLMService.get_client` method.
+
+    Parameters
+    ----------
+    parent : NVFoundationLLMService
+        The parent service for this client.
+    model_name : str
+        The name of the model to interact with.
+    model_kwargs : dict[str, typing.Any]
+        Additional keyword arguments to pass to the model when generating text.
+    """
+
+    def __init__(self, parent: "NVFoundationLLMService", *, model_name: str, **model_kwargs) -> None:
+        if IMPORT_EXCEPTION is not None:
+            raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION
+
+        super().__init__()
+
+        assert parent is not None, "Parent service cannot be None."
+
+        self._parent = parent
+        self._model_name = model_name
+        self._model_kwargs = model_kwargs
+        self._prompt_key = "prompt"
+
+        self._client = ChatNVIDIA(client=self._parent._nve_client, model=model_name, **model_kwargs)
+
+    def get_input_names(self) -> list[str]:
+        return [self._prompt_key]
+
+    def generate(self, **input_dict) -> str:
+        """
+        Issue a request to generate a response based on a given prompt.
+
+        Parameters
+        ----------
+        input_dict : dict
+            Input containing prompt data.
+        """
+        return self.generate_batch({self._prompt_key: [input_dict[self._prompt_key]]})[0]
+
+    async def generate_async(self, **input_dict) -> str:
+        """
+        Issue an asynchronous request to generate a response based on a given prompt.
+
+        Parameters
+        ----------
+        input_dict : dict
+            Input containing prompt data.
+        """
+
+        inputs = {self._prompt_key: [input_dict[self._prompt_key]]}
+
+        input_dict.pop(self._prompt_key)
+
+        return (await self.generate_batch_async(inputs=inputs, **input_dict))[0]
+
+    def generate_batch(self, inputs: dict[str, list], **kwargs) -> list[str]:
+        """
+        Issue a request to generate a list of responses based on a list of prompts.
+
+        Parameters
+        ----------
+        inputs : dict
+            Inputs containing prompt data.
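+
+        Returns
+        -------
+        list[str]
+            A list of responses, one per input prompt and in the same order.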
+ """ + prompts = [StringPromptValue(text=p) for p in inputs[self._prompt_key]] + + responses = self._client.generate_prompt(prompts=prompts, **self._model_kwargs) # type: ignore + + return [g[0].text for g in responses.generations] + + async def generate_batch_async(self, inputs: dict[str, list], **kwargs) -> list[str]: + """ + Issue an asynchronous request to generate a list of responses based on a list of prompts. + Parameters + ---------- + inputs : dict + Inputs containing prompt data. + """ + + prompts = [StringPromptValue(text=p) for p in inputs[self._prompt_key]] + + final_kwargs = {**self._model_kwargs, **kwargs} + + responses = await self._client.agenerate_prompt(prompts=prompts, **final_kwargs) # type: ignore + + return [g[0].text for g in responses.generations] + + +class NVFoundationLLMService(LLMService): + """ + A service for interacting with NeMo LLM models, this class should be used to create a client for a specific model. + Parameters + ---------- + api_key : str, optional + The API key for the LLM service, by default None. If `None` the API key will be read from the `NGC_API_KEY` + environment variable. If neither are present an error will be raised. + org_id : str, optional + The organization ID for the LLM service, by default None. If `None` the organization ID will be read from the + `NGC_ORG_ID` environment variable. This value is only required if the account associated with the `api_key` is + a member of multiple NGC organizations. + base_url : str, optional + The api host url, by default None. If `None` the url will be read from the `NVIDIA_API_BASE` environment + variable. If neither are present `https://api.nvcf.nvidia.com/v2` will be used., by default None + """ + + def __init__(self, *, api_key: str = None, base_url: str = None) -> None: + if IMPORT_EXCEPTION is not None: + raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION + + super().__init__() + + self._api_key = api_key + if base_url is None: + self._base_url = os.getenv('NVIDIA_API_BASE', 'https://api.nvcf.nvidia.com/v2') + else: + self._base_url = base_url + + self._nve_client = NVEModel( + nvidia_api_key=self._api_key, + fetch_url_format=f"{self._base_url}/nvcf/pexec/status/", + call_invoke_base=f"{self._base_url}/nvcf/pexec/functions", + func_list_format=f"{self._base_url}/nvcf/functions", + ) # type: ignore + + def get_client(self, *, model_name: str, **model_kwargs) -> NVFoundationLLMClient: + """ + Returns a client for interacting with a specific model. This method is the preferred way to create a client. + Parameters + ---------- + model_name : str + The name of the model to create a client for. + model_kwargs : dict[str, typing.Any] + Additional keyword arguments to pass to the model when generating text. 
+ """ + + return NVFoundationLLMClient(self, model_name=model_name, **model_kwargs) diff --git a/morpheus/llm/services/openai_chat_service.py b/morpheus/llm/services/openai_chat_service.py index 3179d2474f..82bdd467ab 100644 --- a/morpheus/llm/services/openai_chat_service.py +++ b/morpheus/llm/services/openai_chat_service.py @@ -113,8 +113,8 @@ def __init__(self, self._model_kwargs = copy.deepcopy(model_kwargs) # Create the client objects for both sync and async - self._client = openai.OpenAI(max_retries=max_retries) - self._client_async = openai.AsyncOpenAI(max_retries=max_retries) + self._client = openai.OpenAI(api_key=parent._api_key, base_url=parent._base_url, max_retries=max_retries) + self._client_async = openai.AsyncOpenAI(api_key=parent._api_key, base_url=parent._base_url, max_retries=max_retries) def get_input_names(self) -> list[str]: input_names = [self._prompt_key] @@ -316,12 +316,18 @@ class OpenAIChatService(LLMService): A service for interacting with OpenAI Chat models, this class should be used to create clients. """ - def __init__(self, *, default_model_kwargs: dict = None) -> None: + def __init__(self, *, api_key: str = None, base_url: str = None, default_model_kwargs: dict = None) -> None: """ Creates a service for interacting with OpenAI Chat models, this class should be used to create clients. Parameters ---------- + api_key : str, optional + The API key for the LLM service, by default None. If `None` the API key will be read from the + `OPENAI_API_KEY` environment variable. If neither are present an error will be raised. + base_url : str, optional + The api host url, by default None. If `None` the url will be read from the `OPENAI_API_BASE` environment + variable. If neither are present the OpenAI default will be used., by default None default_model_kwargs : dict, optional Default arguments to use when creating a client via the `get_client` function. Any argument specified here will automatically be used when calling `get_client`. Arguments specified in the `get_client` function will @@ -338,6 +344,9 @@ def __init__(self, *, default_model_kwargs: dict = None) -> None: super().__init__() + self._api_key = api_key + self._base_url = base_url + self._default_model_kwargs = default_model_kwargs or {} self._logger = logging.getLogger(f"{__package__}.{OpenAIChatService.__name__}") diff --git a/morpheus/llm/services/utils/__init__.py b/morpheus/llm/services/utils/__init__.py new file mode 100644 index 0000000000..e8b752e957 --- /dev/null +++ b/morpheus/llm/services/utils/__init__.py @@ -0,0 +1,13 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/morpheus/llm/services/utils/langchain_llm_client_wrapper.py b/morpheus/llm/services/utils/langchain_llm_client_wrapper.py new file mode 100644 index 0000000000..1215ab35b8 --- /dev/null +++ b/morpheus/llm/services/utils/langchain_llm_client_wrapper.py @@ -0,0 +1,61 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. 
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import typing
+
+from morpheus.llm.services.llm_service import LLMClient
+
+IMPORT_EXCEPTION = None
+IMPORT_ERROR_MESSAGE = ("LangchainLLMClientWrapper requires the langchain package to be installed. "
+                        "Install it by running the following command:\n"
+                        "`conda env update --solver=libmamba -n morpheus "
+                        "--file morpheus/conda/environments/examples_cuda-121_arch-x86_64.yaml --prune`")
+
+try:
+    from langchain_core.callbacks import AsyncCallbackManagerForLLMRun
+    from langchain_core.callbacks import CallbackManagerForLLMRun
+    from langchain_core.language_models.llms import LLM
+except ImportError as import_exc:
+    IMPORT_EXCEPTION = import_exc
+
+
+class LangchainLLMClientWrapper(LLM):
+    """
+    Wraps a Morpheus `LLMClient` in the LangChain `LLM` interface, allowing a Morpheus client to be used inside
+    LangChain chains.
+    """
+
+    client: LLMClient
+
+    @property
+    def _llm_type(self) -> str:
+        """Return type of llm."""
+        return "morpheus"
+
+    def _call(
+        self,
+        prompt: str,
+        stop: typing.Optional[list[str]] = None,
+        run_manager: typing.Optional[CallbackManagerForLLMRun] = None,
+        **kwargs: typing.Any,
+    ) -> str:
+        """Run the LLM on the given prompt and input."""
+
+        return self.client.generate(prompt=prompt, stop=stop)
+
+    async def _acall(
+        self,
+        prompt: str,
+        stop: typing.Optional[list[str]] = None,
+        run_manager: typing.Optional[AsyncCallbackManagerForLLMRun] = None,
+        **kwargs: typing.Any,
+    ) -> str:
+        """Run the LLM on the given prompt and input."""
+        return await self.client.generate_async(prompt=prompt, stop=stop)
diff --git a/morpheus/service/vdb/faiss_vdb_service.py b/morpheus/service/vdb/faiss_vdb_service.py
new file mode 100644
index 0000000000..81f63aef5b
--- /dev/null
+++ b/morpheus/service/vdb/faiss_vdb_service.py
@@ -0,0 +1,765 @@
+# Copyright (c) 2023-2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import asyncio
+import copy
+import logging
+import time
+import typing
+
+import pandas as pd
+
+import cudf
+
+from morpheus.service.vdb.vector_db_service import VectorDBResourceService
+from morpheus.service.vdb.vector_db_service import VectorDBService
+
+logger = logging.getLogger(__name__)
+
+IMPORT_EXCEPTION = None
+IMPORT_ERROR_MESSAGE = "FaissVectorDBResourceService requires the faiss and langchain packages to be installed."
+
+try:
+    from langchain.vectorstores.faiss import FAISS
+except ImportError as import_exc:
+    IMPORT_EXCEPTION = import_exc
+
+
+class FaissVectorDBResourceService(VectorDBResourceService):
+    """
+    Represents a service for managing resources in a FAISS vector database.
+
+    Parameters
+    ----------
+    parent : FaissVectorDBService
+        The parent service that owns the local FAISS index files.
+    name : str
+        Name of the FAISS index to load.
+    """
+
+    def __init__(self, parent: "FaissVectorDBService", *, name: str) -> None:
+        if IMPORT_EXCEPTION is not None:
+            raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION
+
+        super().__init__()
+
+        self._parent = parent
+        self._name = name
+
+        self._index = FAISS.load_local(folder_path=self._parent._local_dir,
+                                       embeddings=self._parent._embeddings,
+                                       index_name=self._name,
+                                       allow_dangerous_deserialization=True)
+
+    def insert(self, data: list[list] | list[dict], **kwargs: dict[str, typing.Any]) -> dict:
+        """
+        Insert data into the vector database.
+
+        Parameters
+        ----------
+        data : list[list] | list[dict]
+            Data to be inserted into the collection.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+        """
+        raise NotImplementedError("Insert operation is not supported in FAISS")
+
+    def insert_dataframe(self, df: typing.Union[cudf.DataFrame, pd.DataFrame], **kwargs: dict[str, typing.Any]) -> dict:
+        """
+        Insert dataframe entries into the vector database.
+
+        Parameters
+        ----------
+        df : typing.Union[cudf.DataFrame, pd.DataFrame]
+            Dataframe to be inserted into the collection.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+        """
+        raise NotImplementedError("Insert operation is not supported in FAISS")
+
+    def describe(self, **kwargs: dict[str, typing.Any]) -> dict:
+        """
+        Provides a description of the collection.
+
+        Parameters
+        ----------
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+        """
+        raise NotImplementedError("Describe operation is not supported in FAISS")
+
+    def query(self, query: str, **kwargs: dict[str, typing.Any]) -> typing.Any:
+        """
+        Query data in a collection in the FAISS vector database.
+
+        Parameters
+        ----------
+        query : str, optional
+            The search query, which can be a filter expression, by default None.
+        **kwargs : dict
+            Additional keyword arguments for the search operation.
+
+        Returns
+        -------
+        typing.Any
+            The search result, which can vary depending on the query and options.
+        """
+        raise NotImplementedError("Query operation is not supported in FAISS")
+
+    async def similarity_search(self,
+                                embeddings: list[list[float]],
+                                k: int = 4,
+                                **kwargs: dict[str, typing.Any]) -> list[list[dict]]:
+        """
+        Perform a similarity search within the collection.
+
+        Parameters
+        ----------
+        embeddings : list[list[float]]
+            Embeddings for which to perform the similarity search.
+        k : int, optional
+            The number of nearest neighbors to return, by default 4.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        list[list[dict]]
+            Returns one list of result dictionaries per input embedding.
+        """
+
+        async def single_search(single_embedding):
+            docs = await self._index.asimilarity_search_by_vector(embedding=single_embedding, k=k)
+
+            return [d.dict() for d in docs]
+
+        return list(await asyncio.gather(*[single_search(embedding) for embedding in embeddings]))
+
+    def update(self, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Update data in the collection.
+
+        Parameters
+        ----------
+        data : list[typing.Any]
+            Data to be updated in the collection.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the upsert operation.
+
+        Returns
+        -------
+        dict[str, typing.Any]
+            Returns the stats of the update operation.
+        """
+        raise NotImplementedError("Update operation is not supported in FAISS")
+
+    def delete_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any:
+        """
+        Delete vectors by keys from the collection.
+
+        Parameters
+        ----------
+        keys : int | str | list
+            Primary keys to delete vectors.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        typing.Any
+            Returns result of the given keys that are deleted from the collection.
+        """
+        raise NotImplementedError("Delete by keys operation is not supported in FAISS")
+
+    def delete(self, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Delete vectors from the collection using expressions.
+
+        Parameters
+        ----------
+        expr : str
+            Delete expression.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict[str, typing.Any]
+            Returns result of the given keys that are deleted from the collection.
+        """
+        raise NotImplementedError("Delete operation is not supported in FAISS")
+
+    def retrieve_by_keys(self, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]:
+        """
+        Retrieve the inserted vectors using their primary keys.
+
+        Parameters
+        ----------
+        keys : int | str | list
+            Primary keys to get vectors for. Depending on pk_field type it can be int or str
+            or a list of either.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments for the retrieval operation.
+
+        Returns
+        -------
+        list[typing.Any]
+            Returns result rows of the given keys from the collection.
+        """
+        raise NotImplementedError("Retrieve by keys operation is not supported in FAISS")
+
+    def count(self, **kwargs: dict[str, typing.Any]) -> int:
+        """
+        Returns the number of rows/entities.
+
+        Parameters
+        ----------
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments for the count operation.
+
+        Returns
+        -------
+        int
+            Returns the number of entities in the collection.
+        """
+        raise NotImplementedError("Count operation is not supported in FAISS")
+
+    def drop(self, **kwargs: dict[str, typing.Any]) -> None:
+        """
+        Drop a collection in the FAISS vector database.
+
+        Parameters
+        ----------
+        **kwargs : dict
+            Extra keyword arguments specific to the vector database implementation.
+ """ + raise NotImplementedError("Drop operation is not supported in FAISS") + + +class FaissVectorDBService(VectorDBService): + """ + Service class for Milvus Vector Database implementation. This class provides functions for interacting + with a Milvus vector database. + + Parameters + ---------- + host : str + The hostname or IP address of the Milvus server. + port : str + The port number for connecting to the Milvus server. + alias : str, optional + Alias for the Milvus connection, by default "default". + **kwargs : dict + Additional keyword arguments specific to the Milvus connection configuration. + """ + + _collection_locks = {} + _cleanup_interval = 600 # 10mins + _last_cleanup_time = time.time() + + def __init__(self, local_dir: str, embeddings, **kwargs: dict[str, typing.Any]): + + if IMPORT_EXCEPTION is not None: + raise ImportError(IMPORT_ERROR_MESSAGE) from IMPORT_EXCEPTION + + self._local_dir = local_dir + self._embeddings = embeddings + + def load_resource(self, name: str = "index", **kwargs: dict[str, typing.Any]) -> FaissVectorDBResourceService: + + return FaissVectorDBResourceService(self, name=name, **kwargs) + + def has_store_object(self, name: str) -> bool: + """ + Check if a collection exists in the Milvus vector database. + + Parameters + ---------- + name : str + Name of the collection to check. + + Returns + ------- + bool + True if the collection exists, False otherwise. + """ + return self._client.has_collection(collection_name=name) + + def list_store_objects(self, **kwargs: dict[str, typing.Any]) -> list[str]: + """ + List the names of all collections in the Milvus vector database. + + Returns + ------- + list[str] + A list of collection names. + """ + return self._client.list_collections(**kwargs) + + def _create_schema_field(self, field_conf: dict) -> "pymilvus.FieldSchema": + + field_schema = pymilvus.FieldSchema.construct_from_dict(field_conf) + + return field_schema + + def create(self, name: str, overwrite: bool = False, **kwargs: dict[str, typing.Any]): + """ + Create a collection in the Milvus vector database with the specified name and configuration. This method + creates a new collection in the Milvus vector database with the provided name and configuration options. + If the collection already exists, it can be overwritten if the `overwrite` parameter is set to True. + + Parameters + ---------- + name : str + Name of the collection to be created. + overwrite : bool, optional + If True, the collection will be overwritten if it already exists, by default False. + **kwargs : dict + Additional keyword arguments containing collection configuration. + + Raises + ------ + ValueError + If the provided schema fields configuration is empty. + """ + logger.debug("Creating collection: %s, overwrite=%s, kwargs=%s", name, overwrite, kwargs) + + # Preserve original configuration. 
+
+    def create_from_dataframe(self,
+                              name: str,
+                              df: typing.Union[cudf.DataFrame, pd.DataFrame],
+                              overwrite: bool = False,
+                              **kwargs: dict[str, typing.Any]) -> None:
+        """
+        Create collections in the vector database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection.
+        df : Union[cudf.DataFrame, pd.DataFrame]
+            The dataframe to create the collection from.
+        overwrite : bool, optional
+            Whether to overwrite the collection if it already exists. Default is False.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+        """
+        raise NotImplementedError("Create operation is not supported in FAISS")
+
+    def insert(self, name: str, data: list[list] | list[dict], **kwargs: dict[str,
+                                                                              typing.Any]) -> dict[str, typing.Any]:
+        """
+        Insert collection-specific data into the FAISS vector database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection to insert into.
+        data : list[list] | list[dict]
+            Data to be inserted into the collection.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments containing collection configuration.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+
+        Raises
+        ------
+        RuntimeError
+            If the collection does not exist.
+        """
+
+        resource = self.load_resource(name)
+        return resource.insert(data, **kwargs)
+
+    def insert_dataframe(self,
+                         name: str,
+                         df: typing.Union[cudf.DataFrame, pd.DataFrame],
+                         **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Converts a dataframe to rows and inserts them into a collection in the FAISS vector database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection to insert into.
+        df : typing.Union[cudf.DataFrame, pd.DataFrame]
+            Dataframe to be inserted into the collection.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments containing collection configuration.
+
+        Returns
+        -------
+        dict
+            Returns response content as a dictionary.
+
+        Raises
+        ------
+        RuntimeError
+            If the collection does not exist.
+        """
+        resource = self.load_resource(name)
+
+        return resource.insert_dataframe(df=df, **kwargs)
+
+    def query(self, name: str, query: str = None, **kwargs: dict[str, typing.Any]) -> typing.Any:
+        """
+        Query data in a collection in the FAISS vector database.
+
+        This method performs a search operation in the specified collection in the FAISS vector database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection to search within.
+        query : str
+            The search query, which can be a filter expression.
+        **kwargs : dict
+            Additional keyword arguments for the search operation.
+
+        Returns
+        -------
+        typing.Any
+            The search result, which can vary depending on the query and options.
+        """
+
+        resource = self.load_resource(name)
+
+        return resource.query(query, **kwargs)
+
+    async def similarity_search(self, name: str, **kwargs: dict[str, typing.Any]) -> list[dict]:
+        """
+        Perform a similarity search within the collection.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        list[dict]
+            Returns a list of dictionaries representing the results of the similarity search.
+        """
+
+        resource = self.load_resource(name)
+
+        return await resource.similarity_search(**kwargs)
+
+    def update(self, name: str, data: list[typing.Any], **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Update data in the vector database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection.
+        data : list[typing.Any]
+            Data to be updated in the collection.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the upsert operation.
+
+        Returns
+        -------
+        dict[str, typing.Any]
+            Returns the stats of the update operation.
+        """
+
+        if not isinstance(data, list):
+            raise RuntimeError("Data is not of type list.")
+
+        resource = self.load_resource(name)
+
+        return resource.update(data=data, **kwargs)
+
+    def delete_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> typing.Any:
+        """
+        Delete vectors by keys from the collection.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection.
+        keys : int | str | list
+            Primary keys to delete vectors.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        typing.Any
+            Returns result of the given keys that are deleted from the collection.
+        """
+
+        resource = self.load_resource(name)
+
+        return resource.delete_by_keys(keys=keys, **kwargs)
+
+    def delete(self, name: str, expr: str, **kwargs: dict[str, typing.Any]) -> dict[str, typing.Any]:
+        """
+        Delete vectors from the collection using expressions.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection.
+        expr : str
+            Delete expression.
+        **kwargs : dict[str, typing.Any]
+            Extra keyword arguments specific to the vector database implementation.
+
+        Returns
+        -------
+        dict[str, typing.Any]
+            Returns result of the given keys that are deleted from the collection.
+ """ + + resource = self.load_resource(name) + result = resource.delete(expr=expr, **kwargs) + + return result + + def retrieve_by_keys(self, name: str, keys: int | str | list, **kwargs: dict[str, typing.Any]) -> list[typing.Any]: + """ + Retrieve the inserted vectors using their primary keys from the Collection. + + Parameters + ---------- + name : str + Name of the collection. + keys : int | str | list + Primary keys to get vectors for. Depending on pk_field type it can be int or str + or a list of either. + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the retrieval operation. + + Returns + ------- + list[typing.Any] + Returns result rows of the given keys from the collection. + """ + + resource = self.load_resource(name) + + result = resource.retrieve_by_keys(keys=keys, **kwargs) + + return result + + def count(self, name: str, **kwargs: dict[str, typing.Any]) -> int: + """ + Returns number of rows/entities in the given collection. + + Parameters + ---------- + name : str + Name of the collection. + **kwargs : dict[str, typing.Any] + Additional keyword arguments for the count operation. + + Returns + ------- + int + Returns number of entities in the collection. + """ + resource = self.load_resource(name) + + return resource.count(**kwargs) + + def drop(self, name: str, **kwargs: dict[str, typing.Any]) -> None: + """ + Drop a collection, index, or partition in the Milvus vector database. + + This method allows you to drop a collection, an index within a collection, + or a specific partition within a collection in the Milvus vector database. + + Parameters + ---------- + name : str + Name of the collection, index, or partition to be dropped. + **kwargs : dict + Additional keyword arguments for specifying the type and partition name (if applicable). + + Notes on Expected Keyword Arguments: + ------------------------------------ + - 'collection' (str, optional): + Specifies the type of collection to drop. Possible values: 'collection' (default), 'index', 'partition'. + + - 'partition_name' (str, optional): + Required when dropping a specific partition within a collection. Specifies the partition name to be dropped. + + - 'field_name' (str, optional): + Required when dropping an index within a collection. Specifies the field name for which the index is created. + + - 'index_name' (str, optional): + Required when dropping an index within a collection. Specifies the name of the index to be dropped. + + Raises + ------ + ValueError + If mandatory arguments are missing or if the provided 'collection' value is invalid. + """ + + logger.debug("Dropping collection: %s, kwargs=%s", name, kwargs) + + if self.has_store_object(name): + resource = kwargs.get("resource", "collection") + if resource == "collection": + self._client.drop_collection(collection_name=name) + elif resource == "partition": + if "partition_name" not in kwargs: + raise ValueError("Mandatory argument 'partition_name' is required when resource='partition'") + partition_name = kwargs["partition_name"] + if self._client.has_partition(collection_name=name, partition_name=partition_name): + # Collection need to be released before dropping the partition. 
+
+    def describe(self, name: str, **kwargs: dict[str, typing.Any]) -> dict:
+        """
+        Describe the collection in the vector database.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection.
+        **kwargs : dict[str, typing.Any]
+            Additional keyword arguments specific to the FAISS vector database.
+
+        Returns
+        -------
+        dict
+            Returns collection information.
+        """
+
+        resource = self.load_resource(name)
+
+        return resource.describe(**kwargs)
+
+    def release_resource(self, name: str) -> None:
+        """
+        Release a loaded collection from the memory.
+
+        Parameters
+        ----------
+        name : str
+            Name of the collection to release.
+        """
+        raise NotImplementedError("Release resource operation is not supported in FAISS")
+
+    def close(self) -> None:
+        """
+        Close the FAISS vector database service.
+
+        A local FAISS index holds no open connection, so there is nothing to close.
+        """
+        pass
diff --git a/morpheus/utils/logger.py b/morpheus/utils/logger.py
index 693e6574b7..7d4932edf1 100644
--- a/morpheus/utils/logger.py
+++ b/morpheus/utils/logger.py
@@ -112,49 +112,53 @@ def _configure_from_log_level(*extra_handlers: logging.Handler, log_level: int):
     # Get the root Morpheus logger
     morpheus_logger = logging.getLogger("morpheus")
 
-    # Set the level here
-    set_log_level(log_level=log_level)
+    # Prevent reconfiguration if called again
+    if (not getattr(morpheus_logger, "_configured_by_morpheus", False)):
+        setattr(morpheus_logger, "_configured_by_morpheus", True)
 
-    # Dont propagate upstream
-    morpheus_logger.propagate = False
-    morpheus_logging_queue = multiprocessing.Queue()
+        # Set the level here
+        set_log_level(log_level=log_level)
 
-    # This needs the be the only handler for morpheus logger
-    morpheus_queue_handler = logging.handlers.QueueHandler(morpheus_logging_queue)
+        # Don't propagate upstream
+        morpheus_logger.propagate = False
+        morpheus_logging_queue = multiprocessing.Queue()
 
-    # At this point, any morpheus logger will propagate upstream to the morpheus root and then be handled by the queue
-    # handler
-    morpheus_logger.addHandler(morpheus_queue_handler)
+        # This needs to be the only handler for the morpheus logger
+        morpheus_queue_handler = logging.handlers.QueueHandler(morpheus_logging_queue)
 
-    log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "morpheus.log")
+        # At this point, any morpheus logger will propagate upstream to the morpheus root and then be handled by
+        # the queue handler
+        morpheus_logger.addHandler(morpheus_queue_handler)
 
-    # Ensure the log directory exists
-    os.makedirs(os.path.dirname(log_file), exist_ok=True)
+        log_file = os.path.join(appdirs.user_log_dir(appauthor="NVIDIA", appname="morpheus"), "morpheus.log")
 
-    # Now we build all of the handlers for the queue listener
-    file_handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=5, maxBytes=1000000)
-    file_handler.setLevel(logging.DEBUG)
-    file_handler.setFormatter(
-        logging.Formatter('%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}'))
+        # Ensure the log directory exists
+        os.makedirs(os.path.dirname(log_file), exist_ok=True)
 
-    # Tqdm stream handler (avoids messing with progress bars)
-    console_handler = TqdmLoggingHandler()
+        # Now we build all of the handlers for the queue listener
+        file_handler = logging.handlers.RotatingFileHandler(filename=log_file, backupCount=5, maxBytes=1000000)
+        file_handler.setLevel(logging.DEBUG)
+        file_handler.setFormatter(
+            logging.Formatter('%(asctime)s - [%(levelname)s]: %(message)s {%(name)s, %(threadName)s}'))
 
-    # Build and run the queue listener to actually process queued messages
-    queue_listener = logging.handlers.QueueListener(morpheus_logging_queue,
-                                                    console_handler,
-                                                    file_handler,
-                                                    *extra_handlers,
-                                                    respect_handler_level=True)
-    queue_listener.start()
-    queue_listener._thread.name = "Logging Thread"
+        # Tqdm stream handler (avoids messing with progress bars)
+        console_handler = TqdmLoggingHandler()
 
-    # Register a function to kill the listener thread before shutting down. prevents error on intpreter close
-    def stop_queue_listener():
-        queue_listener.stop()
+        # Build and run the queue listener to actually process queued messages
+        queue_listener = logging.handlers.QueueListener(morpheus_logging_queue,
+                                                        console_handler,
+                                                        file_handler,
+                                                        *extra_handlers,
+                                                        respect_handler_level=True)
+        queue_listener.start()
+        queue_listener._thread.name = "Logging Thread"
 
-    import atexit
-    atexit.register(stop_queue_listener)
+        # Register a function to kill the listener thread before shutting down. Prevents an error on interpreter close
+        def stop_queue_listener():
+            queue_listener.stop()
+
+        import atexit
+        atexit.register(stop_queue_listener)
 
 
 def configure_logging(*extra_handlers: logging.Handler, log_level: int = None, log_config_file: str = None):
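
Taken together, these changes let a caller pick an LLM backend by name and reuse the resulting client inside a
LangChain chain. The sketch below is illustrative only: the model name and API key are placeholders, and it simply
chains the `LLMService.create()` factory, `get_client()`, and the new `LangchainLLMClientWrapper` introduced above.

    from morpheus.llm.services.llm_service import LLMService
    from morpheus.llm.services.utils.langchain_llm_client_wrapper import LangchainLLMClientWrapper

    # "openai" resolves to OpenAIChatService through the new LLMService.create() factory
    service = LLMService.create("openai", api_key="<my-api-key>")

    # Bind a client to a specific model; extra kwargs are forwarded to the model
    client = service.get_client(model_name="gpt-3.5-turbo")

    # Wrap the Morpheus client so it can be dropped into a LangChain chain
    langchain_llm = LangchainLLMClientWrapper(client=client)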