diff --git a/examples/llm/vdb_upload/langchain.py b/examples/llm/vdb_upload/langchain.py index f296e077d3..9b9b616bd3 100644 --- a/examples/llm/vdb_upload/langchain.py +++ b/examples/llm/vdb_upload/langchain.py @@ -20,7 +20,7 @@ from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain.vectorstores.milvus import Milvus -from examples.llm.vdb_upload.vdb_utils import build_rss_urls +from examples.llm.vdb_upload.vdb_utils import DEFAULT_RSS_URLS from morpheus.utils.logging_timer import log_time logger = logging.getLogger(__name__) @@ -28,7 +28,7 @@ def chain(model_name, save_cache): with log_time(msg="Seeding with chain took {duration} ms. {rate_per_sec} docs/sec", log_fn=logger.debug) as log: - loader = RSSFeedLoader(urls=build_rss_urls()) + loader = RSSFeedLoader(urls=DEFAULT_RSS_URLS.copy()) documents = loader.load() diff --git a/examples/llm/vdb_upload/run.py b/examples/llm/vdb_upload/run.py index b889f6c17a..c43ef91ed7 100644 --- a/examples/llm/vdb_upload/run.py +++ b/examples/llm/vdb_upload/run.py @@ -16,8 +16,7 @@ import os import click -from vdb_upload.vdb_utils import build_cli_configs -from vdb_upload.vdb_utils import build_final_config +from vdb_upload.vdb_utils import build_config from vdb_upload.vdb_utils import is_valid_service logger = logging.getLogger(__name__) @@ -144,7 +143,8 @@ def run(): default="http://localhost:19530", help="URI for connecting to Vector Database server.", ) -def pipeline(**kwargs): +@click.pass_context +def pipeline(ctx: click.Context, **kwargs): """ Configure and run the data processing pipeline based on the specified command-line options. @@ -154,6 +154,8 @@ def pipeline(**kwargs): Parameters ---------- + ctx: click.Context + Click context object. **kwargs : dict Keyword arguments containing command-line options. @@ -161,18 +163,19 @@ def pipeline(**kwargs): ------- The result of the internal pipeline function call. 
""" + vdb_config_path = kwargs.pop('vdb_config_path', None) - cli_source_conf, cli_embed_conf, cli_pipe_conf, cli_tok_conf, cli_vdb_conf = build_cli_configs(**kwargs) - final_config = build_final_config(vdb_config_path, - cli_source_conf, - cli_embed_conf, - cli_pipe_conf, - cli_tok_conf, - cli_vdb_conf) + # When a config file is provided, only merge the explicit flags set by the user + explicit_cli_args = {} + for (key, value) in kwargs.items(): + if ctx.get_parameter_source(key) is not click.core.ParameterSource.DEFAULT: + explicit_cli_args[key] = value + + config = build_config(vdb_conf_path=vdb_config_path, explicit_cli_args=explicit_cli_args, implicit_cli_args=kwargs) # Call the internal pipeline function with the final config dictionary from .pipeline import pipeline as _pipeline - return _pipeline(**final_config) + return _pipeline(**config) @run.command() diff --git a/examples/llm/vdb_upload/vdb_config.yaml b/examples/llm/vdb_upload/vdb_config.yaml index 0931665637..35ff2facb5 100644 --- a/examples/llm/vdb_upload/vdb_config.yaml +++ b/examples/llm/vdb_upload/vdb_config.yaml @@ -31,7 +31,7 @@ vdb_pipeline: sources: - type: rss - name: "rss_cve" + name: "rss" config: batch_size: 128 # Number of rss feeds per batch cache_dir: "./.cache/http" @@ -75,7 +75,6 @@ vdb_pipeline: output_batch_size: 2048 # Number of chunked documents per output batch request_timeout_sec: 2.0 run_indefinitely: true - stop_after_rec: 0 strip_markup: true web_scraper_config: chunk_overlap: 51 diff --git a/examples/llm/vdb_upload/vdb_utils.py b/examples/llm/vdb_upload/vdb_utils.py index 3aee5584ec..496df72797 100644 --- a/examples/llm/vdb_upload/vdb_utils.py +++ b/examples/llm/vdb_upload/vdb_utils.py @@ -24,6 +24,127 @@ logger = logging.getLogger(__name__) +DEFAULT_RSS_URLS = [ + "https://www.theregister.com/security/headlines.atom", + "https://isc.sans.edu/dailypodcast.xml", + "https://threatpost.com/feed/", + "http://feeds.feedburner.com/TheHackersNews?format=xml", + 
"https://www.bleepingcomputer.com/feed/", + "https://therecord.media/feed/", + "https://blog.badsectorlabs.com/feeds/all.atom.xml", + "https://krebsonsecurity.com/feed/", + "https://www.darkreading.com/rss_simple.asp", + "https://blog.malwarebytes.com/feed/", + "https://msrc.microsoft.com/blog/feed", + "https://securelist.com/feed", + "https://www.crowdstrike.com/blog/feed/", + "https://threatconnect.com/blog/rss/", + "https://news.sophos.com/en-us/feed/", + "https://www.us-cert.gov/ncas/current-activity.xml", + "https://www.csoonline.com/feed", + "https://www.cyberscoop.com/feed", + "https://research.checkpoint.com/feed", + "https://feeds.fortinet.com/fortinet/blog/threat-research", + "https://www.mcafee.com/blogs/rss", + "https://www.digitalshadows.com/blog-and-research/rss.xml", + "https://www.nist.gov/news-events/cybersecurity/rss.xml", + "https://www.sentinelone.com/blog/rss/", + "https://www.bitdefender.com/blog/api/rss/labs/", + "https://www.welivesecurity.com/feed/", + "https://unit42.paloaltonetworks.com/feed/", + "https://mandiant.com/resources/blog/rss.xml", + "https://www.wired.com/feed/category/security/latest/rss", + "https://www.wired.com/feed/tag/ai/latest/rss", + "https://blog.google/threat-analysis-group/rss/", + "https://intezer.com/feed/", +] + +DEFAULT_RSS_CONFIG = { + # RSS feeds can take a while to pull, smaller batch sizes allows the pipeline to feel more responsive + "batch_size": 32, + "output_batch_size": 2048, + "cache_dir": "./.cache/http", + "stop_after_rec": 0, + "feed_input": DEFAULT_RSS_URLS.copy(), + "strip_markup": True, + "web_scraper_config": {} +} + +DEFAULT_EMBEDDINGS_MODEL_KWARGS = {"force_convert_inputs": True, "use_shared_memory": False} + +DEFAULT_PIPELINE_CONFIG = {"edge_buffer_size": 128, "max_batch_size": 256} + +DEFAULT_TOKENIZER_CONFIG = { + "model_name": "bert-base-uncased-hash", + "model_kwargs": { + "add_special_tokens": False, + "column": "content", + "do_lower_case": True, + "truncation": True, + 
"vocab_hash_file": "data/bert-base-uncased-hash.txt", + } +} + +DEFAULT_VDB_CONFIG = { + # Vector db upload has some significant transaction overhead, batch size here should be as large as possible + 'batch_size': 16384, + 'recreate': True, + 'truncate_long_strings': True +} + +DEFAULT_MILVUS_CONFIG = { + "index_conf": { + "field_name": "embedding", + "metric_type": "L2", + "index_type": "HNSW", + "params": { + "M": 8, + "efConstruction": 64, + }, + }, + "schema_conf": { + "enable_dynamic_field": True, + "schema_fields": [ + { + "name": "id", + "dtype": "INT64", + "description": "Primary key for the collection", + "is_primary": True, + "auto_id": True + }, + { + "name": "title", "dtype": "VARCHAR", "description": "The title of the RSS Page", "max_length": 65_535 + }, + { + "name": "source", "dtype": "VARCHAR", "description": "The URL of the RSS Page", "max_length": 65_535 + }, + { + "name": "summary", + "dtype": "VARCHAR", + "description": "The summary of the RSS Page", + "max_length": 65_535 + }, + { + "name": "content", + "dtype": "VARCHAR", + "description": "A chunk of text from the RSS Page", + "max_length": 65_535 + }, + { + "name": "embedding", "dtype": "FLOAT_VECTOR", "description": "Embedding vectors", "dim": 384 + }, + ], + "description": "RSS collection schema" + } +} + +YAML_TO_CONFIG_MAPPING = { + 'embeddings': 'embeddings_config', + 'pipeline': 'pipeline_config', + 'tokenizer': 'tokenizer_config', + 'vdb': 'vdb_config' +} + def build_milvus_config(resource_schema_config: dict): schema_fields = [] @@ -87,248 +208,14 @@ def merge_dicts(dict_1, dict_2): dict The merged dictionary. 
""" + result = dict_1.copy() for key, value in dict_2.items(): - if key in dict_1 and isinstance(dict_1[key], dict) and isinstance(value, dict): - merge_dicts(dict_1[key], value) + dict_1_value = dict_1.get(key) + if isinstance(dict_1_value, dict) and isinstance(value, dict): + result[key] = merge_dicts(dict_1_value, value) else: - dict_1[key] = value - return dict_1 - - -def merge_configs(file_config, cli_config): - """ - Merge two configuration dictionaries, prioritizing the CLI configuration. - - This function merges configurations provided from a file and the CLI, with the CLI configuration taking precedence - in case of overlapping keys. Nested dictionaries are merged recursively. - - Parameters - ---------- - file_config : dict - The configuration dictionary loaded from a file. - cli_config : dict - The configuration dictionary provided via CLI arguments. - - Returns - ------- - dict - A merged dictionary with CLI configurations overriding file configurations where they overlap. - """ - return merge_dicts(file_config.copy(), {k: v for k, v in cli_config.items() if v is not None}) - - -def _build_default_rss_source(enable_cache, - enable_monitors, - interval_secs, - run_indefinitely, - stop_after, - vector_db_resource_name, - content_chunking_size, - rss_request_timeout_sec, - feed_inputs): - return { - 'type': 'rss', - 'name': 'rss-cli', - 'config': { - # RSS feeds can take a while to pull, smaller batch sizes allows the pipeline to feel more responsive - "batch_size": 32, - "output_batch_size": 2048, - "cache_dir": "./.cache/http", - "cooldown_interval_sec": interval_secs, - "stop_after_rec": stop_after or 0, - "enable_cache": enable_cache, - "enable_monitor": enable_monitors, - "feed_input": feed_inputs if feed_inputs else build_rss_urls(), - "interval_sec": interval_secs, - "request_timeout_sec": rss_request_timeout_sec, - "run_indefinitely": run_indefinitely, - "strip_markup": True, - "vdb_resource_name": vector_db_resource_name, - "web_scraper_config": { 
- "chunk_size": content_chunking_size, - "enable_cache": enable_cache, - } - } - } - - -def _build_default_filesystem_source(enable_monitors, - file_source, - pipeline_batch_size, - run_indefinitely, - vector_db_resource_name, - content_chunking_size, - num_threads): - return { - 'type': 'filesystem', - 'name': 'filesystem-cli', - 'config': { - "batch_size": pipeline_batch_size, - "enable_monitor": enable_monitors, - "extractor_config": { - "chunk_size": content_chunking_size, - "num_threads": num_threads, - }, - "filenames": file_source, - "vdb_resource_name": vector_db_resource_name, - "watch": run_indefinitely, - } - } - - -def build_cli_configs(source_type, - enable_cache, - embedding_size, - isolate_embeddings, - embedding_model_name, - enable_monitors, - file_source, - interval_secs, - pipeline_batch_size, - run_indefinitely, - stop_after, - vector_db_resource_name, - vector_db_service, - vector_db_uri, - content_chunking_size, - num_threads, - rss_request_timeout_sec, - model_max_batch_size, - model_fea_length, - triton_server_url, - feed_inputs): - """ - Create configuration dictionaries based on CLI arguments. - - Constructs individual configuration dictionaries for various components of the data processing pipeline, - such as source, embeddings, pipeline, tokenizer, and vector database configurations. - - Parameters - ---------- - source_type : list of str - Types of data sources (e.g., 'rss', 'filesystem'). - enable_cache : bool - Flag to enable caching. - embedding_size : int - Size of the embeddings. - isolate_embeddings : bool - Flag to isolate embeddings. - embedding_model_name : str - Name of the embedding model. - enable_monitors : bool - Flag to enable monitor functionality. - file_source : list of str - File sources or paths to be processed. - interval_secs : int - Interval in seconds for operations. - pipeline_batch_size : int - Batch size for the pipeline. - run_indefinitely : bool - Flag to run the process indefinitely. 
- stop_after : int - Stop after a certain number of records. - vector_db_resource_name : str - Name of the resource in the vector database. - vector_db_service : str - Name of the vector database service. - vector_db_uri : str - URI for the vector database server. - content_chunking_size : int - Size of content chunks. - num_threads : int - Number of threads to use. - rss_request_timeout_sec : float - Timeout in seconds for RSS requests. - model_max_batch_size : int - Maximum batch size for the model. - model_fea_length : int - Feature length for the model. - triton_server_url : str - URL of the Triton server. - feed_inputs : list of str - RSS feed inputs. - - Returns - ------- - tuple - A tuple containing five dictionaries for source, embeddings, pipeline, tokenizer, and vector database - configurations. - """ - - # Source Configuration - cli_source_conf = {} - if 'rss' in source_type: - cli_source_conf['rss'] = _build_default_rss_source(enable_cache, - enable_monitors, - interval_secs, - run_indefinitely, - stop_after, - vector_db_resource_name, - content_chunking_size, - rss_request_timeout_sec, - feed_inputs) - if 'filesystem' in source_type: - cli_source_conf['filesystem'] = _build_default_filesystem_source(enable_monitors, - file_source, - pipeline_batch_size, - run_indefinitely, - vector_db_resource_name, - content_chunking_size, - num_threads) - - # Embeddings Configuration - cli_embeddings_conf = { - "feature_length": model_fea_length, - "max_batch_size": model_max_batch_size, - "model_kwargs": { - "force_convert_inputs": True, - "model_name": embedding_model_name, - "server_url": triton_server_url, - "use_shared_memory": False, - }, - "num_threads": num_threads, - } - - # Pipeline Configuration - cli_pipeline_conf = { - "edge_buffer_size": 128, - "embedding_size": embedding_size, - "feature_length": model_fea_length, - "isolate_embeddings": isolate_embeddings, - "max_batch_size": 256, - "num_threads": num_threads, - "pipeline_batch_size": 
pipeline_batch_size, - } - - # Tokenizer Configuration - cli_tokenizer_conf = { - "model_name": "bert-base-uncased-hash", - "model_kwargs": { - "add_special_tokens": False, - "column": "content", - "do_lower_case": True, - "truncation": True, - "vocab_hash_file": "data/bert-base-uncased-hash.txt", - } - } - - # VDB Configuration - cli_vdb_conf = { - # Vector db upload has some significant transaction overhead, batch size here should be as large as possible - 'batch_size': 16384, - 'embedding_size': embedding_size, - 'recreate': True, - 'resource_name': vector_db_resource_name, - 'resource_schemas': { - vector_db_resource_name: - build_defualt_milvus_config(embedding_size) if (vector_db_service == 'milvus') else None, - }, - 'service': vector_db_service, - 'truncate_long_strings': True, - 'uri': vector_db_uri, - } - - return cli_source_conf, cli_embeddings_conf, cli_pipeline_conf, cli_tokenizer_conf, cli_vdb_conf + result[key] = value + return result def build_pipeline_config(pipeline_config: dict): @@ -366,204 +253,214 @@ def build_pipeline_config(pipeline_config: dict): return config -def build_final_config(vdb_conf_path, - cli_source_conf, - cli_embeddings_conf, - cli_pipeline_conf, - cli_tokenizer_conf, - cli_vdb_conf): +def _set_values_if_exists(dest_dict: dict[str, typing.Any], src_dict: dict[str, typing.Any], mapping: dict[str, str]): + """ + Set values in dest_dict if they exist in src_dict + Since a single source key can map to multiple destination keys, the mapping dictionary is in the form of: + {dest_key: src_key} + """ + for dest_key, src_key in mapping.items(): + # Explicitly using an `in` test here since `None` is a valid value + if src_key in src_dict: + dest_dict[dest_key] = src_dict[src_key] + + +def _cli_args_to_config(cli_args: dict[str, typing.Any], include_defaults: bool = False) -> dict: + """ + CLI arguments are in a flat structure, this function converts them to the nested structure used by the yaml config + allowing for easy merging of the 
two. + """ + config = {} + source_config = {} + source_type = cli_args.get('source_type', []) + if 'rss' in source_type: + if include_defaults: + rss_config = DEFAULT_RSS_CONFIG.copy() + else: + rss_config = {"web_scraper_config": {}} + + _set_values_if_exists(rss_config['web_scraper_config'], + cli_args, { + "chunk_size": "content_chunking_size", "enable_cache": "enable_cache" + }) + _set_values_if_exists( + rss_config, + cli_args, + { + "cooldown_interval_sec": "interval_secs", + "stop_after_rec": "stop_after", + "enable_cache": "enable_cache", + "enable_monitor": "enable_monitors", + "interval_sec": "interval_secs", + "request_timeout_sec": "rss_request_timeout_sec", + "run_indefinitely": "run_indefinitely", + "vdb_resource_name": "vector_db_resource_name", + }) + + # Handle feed inputs separately + if len(cli_args.get('feed_inputs', [])) > 0: + rss_config['feed_input'] = cli_args['feed_inputs'] + + source_config['rss'] = {'type': 'rss', 'name': 'rss', 'config': rss_config} + + if 'filesystem' in source_type: + fs_config = {"extractor_config": {}} + _set_values_if_exists(fs_config["extractor_config"], + cli_args, { + "chunk_size": "content_chunking_size", "num_threads": "num_threads" + }) + + _set_values_if_exists( + fs_config, + cli_args, + { + "batch_size": "pipeline_batch_size", + "enable_monitor": "enable_monitors", + "filenames": "file_source", + "vdb_resource_name": "vector_db_resource_name", + "watch": "run_indefinitely" + }) + + source_config['filesystem'] = {'type': 'filesystem', 'name': 'filesystem-cli', 'config': fs_config} + + config['source_config'] = source_config + + embeddings_model_kwargs = {} + if include_defaults: + embeddings_model_kwargs.update(DEFAULT_EMBEDDINGS_MODEL_KWARGS.copy()) + + _set_values_if_exists(embeddings_model_kwargs, + cli_args, { + "model_name": "embedding_model_name", "server_url": "triton_server_url" + }) + + embeddings_config = {"model_kwargs": embeddings_model_kwargs} + + _set_values_if_exists(embeddings_config, + 
cli_args, + { + "feature_length": "model_fea_length", + "max_batch_size": "model_max_batch_size", + "num_threads": "num_threads" + }) + + config['embeddings_config'] = embeddings_config + + # These values will be replaced later with a morpheus.config.Config object + pipeline_config = {} + if include_defaults: + pipeline_config.update(DEFAULT_PIPELINE_CONFIG.copy()) + + _set_values_if_exists( + pipeline_config, + cli_args, + { + "embedding_size": "embedding_size", + "feature_length": "model_fea_length", + "isolate_embeddings": "isolate_embeddings", + "num_threads": "num_threads", + "pipeline_batch_size": "pipeline_batch_size", + }) + + config['pipeline_config'] = pipeline_config + + if include_defaults: + config['tokenizer_config'] = DEFAULT_TOKENIZER_CONFIG.copy() + + vdb_config = {} + if include_defaults: + vdb_config.update(DEFAULT_VDB_CONFIG.copy()) + + _set_values_if_exists( + vdb_config, + cli_args, + { + 'embedding_size': 'embedding_size', + 'resource_name': 'vector_db_resource_name', + 'service': 'vector_db_service', + 'uri': 'vector_db_uri' + }) + + # Milvus configs to be built later if needed. The reason here is that we could use the default embedding size, but + # override the service type or name in other levels, we need the final resolved values of all three, for now just + # stub in the resource name + resource_name = vdb_config.get('resource_name') + if resource_name is not None: + vdb_config["resource_schemas"] = {resource_name: None} + + config['vdb_config'] = vdb_config + + return config + + +def build_config(vdb_conf_path: str | None, + explicit_cli_args: dict[str, typing.Any], + implicit_cli_args: dict[str, typing.Any]): """ Load and merge configurations from the CLI and YAML file. This function combines the configurations provided via the CLI with those specified in a YAML file. - If a YAML configuration file is specified and exists, it will merge its settings with the CLI settings, - with the YAML settings taking precedence. 
+ If a YAML configuration file is specified and exists, it will merge its settings with the CLI settings. + + The order of precedence is as follows: Explicit CLI args set by user > YAML settings > default values of CLI args. Parameters ---------- vdb_conf_path : str Path to the YAML configuration file. - cli_source_conf : dict - Source configuration provided via CLI. - cli_embeddings_conf : dict - Embeddings configuration provided via CLI. - cli_pipeline_conf : dict - Pipeline configuration provided via CLI. - cli_tokenizer_conf : dict - Tokenizer configuration provided via CLI. - cli_vdb_conf : dict - Vector Database (VDB) configuration provided via CLI. + explicit_cli_args : dict[str, typing.Any] + CLI args explicitly set by the user. + implicit_cli_args : dict[str, typing.Any] + CLI args including default values not explicitly set by the user. Returns ------- dict - A dictionary containing the final merged configuration for the pipeline, including source, embeddings, - tokenizer, and VDB configurations. - - Notes - ----- - The function prioritizes the YAML file configurations over CLI configurations. In case of overlapping - settings, the values from the YAML file will overwrite those from the CLI. + A dictionary containing the final merged configuration for the pipeline. 
""" - final_config = {} # Load and merge configurations from the YAML file if it exists if vdb_conf_path: with open(vdb_conf_path, 'r', encoding='utf-8') as file: - vdb_pipeline_config = yaml.safe_load(file).get('vdb_pipeline', {}) - - embeddings_conf = merge_configs(vdb_pipeline_config.get('embeddings', {}), cli_embeddings_conf) - pipeline_conf = merge_configs(vdb_pipeline_config.get('pipeline', {}), cli_pipeline_conf) - source_conf = vdb_pipeline_config.get('sources', []) + list(cli_source_conf.values()) - tokenizer_conf = merge_configs(vdb_pipeline_config.get('tokenizer', {}), cli_tokenizer_conf) - vdb_conf = vdb_pipeline_config.get('vdb', {}) - resource_schema = vdb_conf.pop("resource_schema", None) - - if resource_schema: - vdb_conf["resource_kwargs"] = build_milvus_config(resource_schema) - vdb_conf = merge_configs(vdb_conf, cli_vdb_conf) - - pipeline_conf['embedding_size'] = vdb_conf.get('embedding_size', 384) - - final_config.update({ - 'embeddings_config': embeddings_conf, - 'pipeline_config': build_pipeline_config(pipeline_conf), - 'source_config': source_conf, - 'tokenizer_config': tokenizer_conf, - 'vdb_config': vdb_conf, - }) - else: - # Use CLI configurations only - final_config.update({ - 'embeddings_config': cli_embeddings_conf, - 'pipeline_config': build_pipeline_config(cli_pipeline_conf), - 'source_config': list(cli_source_conf.values()), - 'tokenizer_config': cli_tokenizer_conf, - 'vdb_config': cli_vdb_conf, - }) + yaml_config = yaml.safe_load(file).get('vdb_pipeline', {}) - # If no sources are specified either via CLI or in the yaml config, add a default RSS source - if (not final_config['source_config']): - final_config['source_config'].append( - _build_default_rss_source(enable_cache=True, - enable_monitors=True, - interval_secs=60, - run_indefinitely=True, - stop_after=None, - vector_db_resource_name="RSS", - content_chunking_size=128, - rss_request_timeout_sec=30, - feed_inputs=build_rss_urls())) + # Yaml specific transforms + for (yaml_key, 
config_key) in YAML_TO_CONFIG_MAPPING.items(): + yaml_config[config_key] = yaml_config.pop(yaml_key, {}) - return final_config - - -def build_defualt_milvus_config(embedding_size: int) -> typing.Dict[str, typing.Any]: - """ - Builds the configuration for Milvus. + sources = yaml_config.pop('sources', []) + yaml_config['source_config'] = {src['name']: src for src in sources} - This function creates a dictionary configuration for a Milvus collection. - It includes the index configuration and the schema configuration, with - various fields like id, title, link, summary, page_content, and embedding. + else: + yaml_config = {} - Parameters - ---------- - embedding_size : int - The size of the embedding vector. + implicit_cli_config = _cli_args_to_config(implicit_cli_args, include_defaults=True) + explicit_cli_config = _cli_args_to_config(explicit_cli_args, include_defaults=False) - Returns - ------- - typing.Dict[str, Any] - A dictionary containing the configuration settings for Milvus. - """ + final_config = merge_dicts(implicit_cli_config, yaml_config) + final_config = merge_dicts(final_config, explicit_cli_config) - milvus_resource_kwargs = { - "index_conf": { - "field_name": "embedding", - "metric_type": "L2", - "index_type": "HNSW", - "params": { - "M": 8, - "efConstruction": 64, - }, - }, - "schema_conf": { - "enable_dynamic_field": True, - "schema_fields": [ - pymilvus.FieldSchema(name="id", - dtype=pymilvus.DataType.INT64, - description="Primary key for the collection", - is_primary=True, - auto_id=True).to_dict(), - pymilvus.FieldSchema(name="title", - dtype=pymilvus.DataType.VARCHAR, - description="The title of the RSS Page", - max_length=65_535).to_dict(), - pymilvus.FieldSchema(name="source", - dtype=pymilvus.DataType.VARCHAR, - description="The URL of the RSS Page", - max_length=65_535).to_dict(), - pymilvus.FieldSchema(name="summary", - dtype=pymilvus.DataType.VARCHAR, - description="The summary of the RSS Page", - max_length=65_535).to_dict(), - 
pymilvus.FieldSchema(name="content", - dtype=pymilvus.DataType.VARCHAR, - description="A chunk of text from the RSS Page", - max_length=65_535).to_dict(), - pymilvus.FieldSchema(name="embedding", - dtype=pymilvus.DataType.FLOAT_VECTOR, - description="Embedding vectors", - dim=embedding_size).to_dict(), - ], - "description": "Test collection schema" - } - } + # Flatten the source configs into a list + final_config['source_config'] = list(final_config.pop('source_config').values()) - return milvus_resource_kwargs + # Handle the resource schema separately, the reason is we need both the service type, resource name and the + # embedding size to all be defined, some or all of these values could be defined at any level. + vdb_config = final_config['vdb_config'] + if vdb_config.get('service') == 'milvus': + # Replace the resource schema configs with Milvus config objects + resource_schema_configs = vdb_config.pop("resource_schemas", {}) + resource_schemas = {} + for (resource_name, resource_schema) in resource_schema_configs.items(): + if resource_schema is None: + resource_schema = DEFAULT_MILVUS_CONFIG.copy() + # Update the embedding_size + resource_schema['schema_conf']['schema_fields'][-1]['dim'] = vdb_config['embedding_size'] + resource_schemas[resource_name] = build_milvus_config(resource_schema) -def build_rss_urls() -> typing.List[str]: - """ - Builds a list of RSS feed URLs. + vdb_config['resource_schemas'] = resource_schemas - Returns - ------- - typing.List[str] - A list of URLs as strings, each pointing to a different RSS feed. 
- """ + # convert the pipeline config to a morpheus.config.Config object + final_config['pipeline_config'] = build_pipeline_config(final_config['pipeline_config']) - return [ - "https://www.theregister.com/security/headlines.atom", - "https://isc.sans.edu/dailypodcast.xml", - "https://threatpost.com/feed/", - "http://feeds.feedburner.com/TheHackersNews?format=xml", - "https://www.bleepingcomputer.com/feed/", - "https://therecord.media/feed/", - "https://blog.badsectorlabs.com/feeds/all.atom.xml", - "https://krebsonsecurity.com/feed/", - "https://www.darkreading.com/rss_simple.asp", - "https://blog.malwarebytes.com/feed/", - "https://msrc.microsoft.com/blog/feed", - "https://securelist.com/feed", - "https://www.crowdstrike.com/blog/feed/", - "https://threatconnect.com/blog/rss/", - "https://news.sophos.com/en-us/feed/", - "https://www.us-cert.gov/ncas/current-activity.xml", - "https://www.csoonline.com/feed", - "https://www.cyberscoop.com/feed", - "https://research.checkpoint.com/feed", - "https://feeds.fortinet.com/fortinet/blog/threat-research", - "https://www.mcafee.com/blogs/rss", - "https://www.digitalshadows.com/blog-and-research/rss.xml", - "https://www.nist.gov/news-events/cybersecurity/rss.xml", - "https://www.sentinelone.com/blog/rss/", - "https://www.bitdefender.com/blog/api/rss/labs/", - "https://www.welivesecurity.com/feed/", - "https://unit42.paloaltonetworks.com/feed/", - "https://mandiant.com/resources/blog/rss.xml", - "https://www.wired.com/feed/category/security/latest/rss", - "https://www.wired.com/feed/tag/ai/latest/rss", - "https://blog.google/threat-analysis-group/rss/", - "https://intezer.com/feed/", - ] + return final_config diff --git a/tests/examples/llm/vdb_upload/test_vdb_utils.py b/tests/examples/llm/vdb_upload/test_vdb_utils.py index 38080f402a..e55950e4ef 100644 --- a/tests/examples/llm/vdb_upload/test_vdb_utils.py +++ b/tests/examples/llm/vdb_upload/test_vdb_utils.py @@ -28,29 +28,29 @@ def 
test_is_valid_service_with_mixed_case(import_vdb_update_utils_module): assert import_vdb_update_utils_module.is_valid_service(None, None, "MilVuS") == "milvus" -def test_merge_configs_non_overlapping(import_vdb_update_utils_module): +def test_merge_dicts_non_overlapping(import_vdb_update_utils_module): file_config = {"key1": "value1"} cli_config = {"key2": "value2"} expected = {"key1": "value1", "key2": "value2"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected -def test_merge_configs_overlapping(import_vdb_update_utils_module): +def test_merge_dicts_overlapping(import_vdb_update_utils_module): file_config = {"key1": "value1", "key2": "old_value"} cli_config = {"key2": "new_value"} expected = {"key1": "value1", "key2": "new_value"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected -def test_merge_configs_none_in_cli(import_vdb_update_utils_module): +def test_merge_dicts_none_in_cli(import_vdb_update_utils_module): file_config = {"key1": "value1", "key2": "value2"} cli_config = {"key2": None} - expected = {"key1": "value1", "key2": "value2"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + expected = {"key1": "value1", "key2": None} + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected -def test_merge_configs_empty(import_vdb_update_utils_module): +def test_merge_dicts_empty(import_vdb_update_utils_module): file_config = {} cli_config = {"key1": "value1"} expected = {"key1": "value1"} - assert import_vdb_update_utils_module.merge_configs(file_config, cli_config) == expected + assert import_vdb_update_utils_module.merge_dicts(file_config, cli_config) == expected