diff --git a/milvus_cli/Cli.py b/milvus_cli/Cli.py index 5913f2e..539e120 100644 --- a/milvus_cli/Cli.py +++ b/milvus_cli/Cli.py @@ -2,6 +2,7 @@ from Database import MilvusDatabase from Collection import MilvusCollection from Index import MilvusIndex +from Data import MilvusData from pymilvus import __version__ from Types import ParameterException @@ -23,3 +24,4 @@ class MilvusCli(object): database = MilvusDatabase() collection = MilvusCollection() index = MilvusIndex() + data = MilvusData() diff --git a/milvus_cli/Data.py b/milvus_cli/Data.py new file mode 100644 index 0000000..ff7d526 --- /dev/null +++ b/milvus_cli/Data.py @@ -0,0 +1,57 @@ +from Collection import getTargetCollection +from tabulate import tabulate + + +class MilvusData: + alias = "default" + + def insert( + self, collectionName, data, partitionName=None, alias=None, timeout=None + ): + temaAlias = alias if alias else self.alias + collection = getTargetCollection(collectionName, temaAlias) + result = collection.insert(data, partition_name=partitionName, timeout=timeout) + return result + + def query(self, collectionName, queryParameters, alias=None): + temaAlias = alias if alias else self.alias + collection = getTargetCollection(collectionName, temaAlias) + res = collection.query(**queryParameters) + return res + + def delete_entities( + self, + expr, + collectionName, + partition_name=None, + alias=None, + ): + temaAlias = alias if alias else self.alias + collection = getTargetCollection(collectionName, temaAlias) + result = collection.delete(expr, partition_name=partition_name) + return result + + def search(self, collectionName, searchParameters, alias=None, prettierFormat=True): + tempAlias = alias if alias else self.alias + collection = getTargetCollection(collectionName, tempAlias) + res = collection.search(**searchParameters) + if not prettierFormat: + return res + hits = res[0] + results = [] + for hits in res: + results += [ + tabulate( + map(lambda x: [x.id, x.distance, x.score], hits), + headers=["Index", "ID", "Distance", "Score"], + tablefmt="grid", + showindex=True, + ) + ] + return tabulate( + map(lambda x: [x.id, x.distance], hits), + headers=["Index", "ID", "Distance"], + tablefmt="grid", + showindex=True, + ) + # return res diff --git a/milvus_cli/Index.py b/milvus_cli/Index.py index 1a6765d..7c3ff17 100644 --- a/milvus_cli/Index.py +++ b/milvus_cli/Index.py @@ -14,7 +14,7 @@ def create_index( indexType, metricType, params, - alias, + alias=None, ): try: tempAlias = alias if alias else self.alias @@ -44,7 +44,7 @@ def create_index( except Exception as e: raise Exception(f"Create index error!{str(e)}") - def get_index_details(self, collectionName, indexName, alias): + def get_index_details(self, collectionName, indexName, alias=None): tempAlias = alias if alias else self.alias collection = getTargetCollection(collectionName, tempAlias) @@ -62,22 +62,22 @@ def get_index_details(self, collectionName, indexName, alias): rows.append(["Params", paramsDetails]) return tabulate(rows, tablefmt="grid") - def drop_index(self, collectionName, indexName, alias, timeout=None): + def drop_index(self, collectionName, indexName, alias=None, timeout=None): tempAlias = alias if alias else self.alias collection = getTargetCollection(collectionName, tempAlias) collection.drop_index(index_name=indexName, timeout=timeout) return self.list_indexes(collectionName, tempAlias) - def has_index(self, collectionName, indexName, alias, timeout=None): + def has_index(self, collectionName, indexName, alias=None, timeout=None): tempAlias = alias if alias else self.alias collection = getTargetCollection(collectionName, tempAlias) return collection.has_index(index_name=indexName, timeout=timeout) - def get_index_build_progress(self, collectionName, indexName, alias): + def get_index_build_progress(self, collectionName, indexName, alias=None): tempAlias = alias if alias else self.alias return index_building_progress(collectionName, indexName, tempAlias) - def list_indexes(self, collectionName, alias): + def list_indexes(self, collectionName, alias=None): tempAlias = alias if alias else self.alias target = getTargetCollection(collectionName, tempAlias) result = target.indexes @@ -100,3 +100,19 @@ def list_indexes(self, collectionName, alias): tablefmt="grid", showindex=True, ) + + def get_vector_index(self, collectionName): + target = getTargetCollection(collectionName, self.alias) + try: + result = target.index() + except Exception as e: + return {} + else: + details = { + "field_name": result.field_name, + "index_type": result.params["index_type"], + "metric_type": result.params["metric_type"], + "params": result.params["params"], + } + + return details diff --git a/milvus_cli/scripts/data_cli.py b/milvus_cli/scripts/data_cli.py new file mode 100644 index 0000000..0268149 --- /dev/null +++ b/milvus_cli/scripts/data_cli.py @@ -0,0 +1,360 @@ +from helper_cli import search, query, delete, insert +from init_cli import cli + +import click +import os +import sys + +current_dir = os.path.dirname(os.path.realpath(__file__)) +parent_dir = os.path.dirname(current_dir) +sys.path.append(parent_dir) + +from Validation import validateQueryParams, validateSearchParams +from Types import ParameterException, IndexTypesMap +from Fs import readCsvFile + + +@delete.command("entities") +@click.option("-c", "--collection-name", "collectionName", help="Collection name.") +@click.option( + "-p", + "--partition", + "partitionName", + help="[Optional] - Name of partitions that contain entities.", + default=None, +) +@click.option("-a", "--alias", "alias", help="The connection alias name.", type=str) +@click.pass_obj +def delete_entities( + obj, + collectionName, + partitionName, + alias, +): + """ + Delete entities from collection. + + Example: + + milvus_cli > delete entities -c test_collection + """ + expr = click.prompt( + '''The expression to specify entities to be deleted, such as "film_id in [ 0, 1 ]"''' + ) + click.echo( + "You are trying to delete the entities of collection. This action cannot be undone!\n" + ) + if not click.confirm("Do you want to continue?"): + return + result = obj.data.delete_entities(expr, collectionName, partitionName, alias) + click.echo(result) + + +@cli.command("query") +@click.pass_obj +def query(obj): + """ + Query with a set of criteria, and results in a list of records that match the query exactly. + + Example: + + milvus_cli > query + + Collection name: car + + The query expression: color in [2000,2002] + + Name of partitions that contain entities(split by "," if multiple) []: default + + A list of fields to return(split by "," if multiple) []: id, color, brand + + timeout []: + + Guarantee timestamp. This instructs Milvus to see all operations performed before a provided timestamp. If no such timestamp is provided, then Milvus will search all operations performed to date. [0]: + + Graceful time. Only used in bounded consistency level. If graceful_time is set, PyMilvus will use current timestamp minus the graceful_time as the guarantee_timestamp. This option is 5s by default if not set. [5]: + + Travel timestamp. Users can specify a timestamp in a search to get results based on a data view at a specified point in time. [0]: 428960801420883491 + """ + collectionName = click.prompt( + "Collection name", type=click.Choice(obj.collection.list_collections()) + ) + expr = click.prompt("The query expression") + # partitionNames = click.prompt( + # f'The names of partitions to search (split by "," if multiple) {obj._list_partition_names(collectionName)}', + # default="", + # ) + outputFields = click.prompt( + f'Fields to return(split by "," if multiple) {obj.collection.list_field_names(collectionName)}', + default="", + ) + timeout = click.prompt("timeout", default="") + guarantee_timestamp = click.prompt( + "Guarantee timestamp. This instructs Milvus to see all operations performed before a provided timestamp. If no such timestamp is provided, then Milvus will search all operations performed to date.", + default=0, + type=int, + ) + graceful_time = click.prompt( + "Graceful time. Only used in bounded consistency level. If graceful_time is set, PyMilvus will use current timestamp minus the graceful_time as the guarantee_timestamp. This option is 5s by default if not set.", + default=5, + type=int, + ) + travel_timestamp = click.prompt( + "Travel timestamp. Users can specify a timestamp in a search to get results based on a data view at a specified point in time.", + default=0, + type=int, + ) + partitionNames = [] + try: + queryParameters = validateQueryParams( + expr, + partitionNames, + outputFields, + timeout, + guarantee_timestamp, + graceful_time, + travel_timestamp, + ) + except ParameterException as pe: + click.echo("Error!\n{}".format(str(pe))) + else: + click.echo(obj.data.query(collectionName, queryParameters)) + + +@insert.command("data") +@click.option( + "-c", + "--collection-name", + "collectionName", + help="The name of collection to be imported.", +) +@click.option( + "-p", + "--partition", + "partitionName", + help="[Optional] - The partition name which the data will be inserted to, if partition name is not passed, then the data will be inserted to “_default” partition.", + default=None, +) +@click.option( + "-a", "--alias", "alias", help="[Optional]-The connection alias name.", type=str +) +@click.option( + "-t", + "--timeout", + "timeout", + help="[Optional] - An optional duration of time in seconds to allow for the RPC. If timeout is not set, the client keeps waiting until the server responds or an error occurs.", + default=None, + type=float, +) +@click.argument("path") +@click.pass_obj +def insert_data(obj, collectionName, partitionName, alias, timeout, path): + """ + Import data from csv file(local or remote) with headers and insert into target collection. + + Example-1: + + milvus_cli > import -c car 'examples/import_csv/vectors.csv' + + Reading file from local path. + + Reading csv file... [####################################] 100% + + Column names are ['vector', 'color', 'brand'] + + Processed 50001 lines. + + Inserting ... + + Insert successfully. + + \b + -------------------------- ------------------ + Total insert entities: 50000 + Total collection entities: 150000 + Milvus timestamp: 428849214449254403 + -------------------------- ------------------ + + Example-2: + + milvus_cli > import -c car 'https://raw.githubusercontent.com/zilliztech/milvus_cli/main/examples/import_csv/vectors.csv' + + Reading file from remote URL. + + Reading csv file... [####################################] 100% + + Column names are ['vector', 'color', 'brand'] + + Processed 50001 lines. + + Inserting ... + + Insert successfully. + + \b + -------------------------- ------------------ + Total insert entities: 50000 + Total collection entities: 150000 + Milvus timestamp: 428849214449254403 + -------------------------- ------------------ + """ + try: + result = readCsvFile(path.replace('"', "").replace("'", "")) + data = result["data"] + result = obj.data.insert(collectionName, data, partitionName, alias, timeout) + except Exception as e: + click.echo("Error!\n{}".format(str(e))) + else: + click.echo(f"\nInserted successfully.\n") + click.echo(result) + + +@cli.command("search") +@click.pass_obj +def search(obj): + """ + Conducts a vector similarity search with an optional boolean expression as filter. + + Example-1(import a CSV file): + + Collection name (car, test_collection): car + + The vectors of search data (the length of data is number of query (nq), + the dim of every vector in data must be equal to vector field’s of + collection. You can also import a CSV file without headers): examples/import_csv/search_vectors.csv + + The vector field used to search of collection (vector): vector + + Metric type: L2 + + Search parameter nprobe's value: 10 + + The max number of returned record, also known as topk: 2 + + The boolean expression used to filter attribute []: id > 0 + + The names of partitions to search (split by "," if multiple) ['_default'] []: _default + + Example-2(collection has index): + + Collection name (car, test_collection): car + + \b + The vectors of search data (the length of data is number of query (nq), + the dim of every vector in data must be equal to vector field’s of + collection. You can also import a CSV file without headers): + [[0.71, 0.76, 0.17, 0.13, 0.42, 0.07, 0.15, 0.67, 0.58, 0.02, 0.39, + 0.47, 0.58, 0.88, 0.73, 0.31, 0.23, 0.57, 0.33, 0.2, 0.03, 0.43, + 0.78, 0.49, 0.17, 0.56, 0.76, 0.54, 0.45, 0.46, 0.05, 0.1, 0.43, + 0.63, 0.29, 0.44, 0.65, 0.01, 0.35, 0.46, 0.66, 0.7, 0.88, 0.07, + 0.49, 0.92, 0.57, 0.5, 0.16, 0.77, 0.98, 0.1, 0.44, 0.88, 0.82, + 0.16, 0.67, 0.63, 0.57, 0.55, 0.95, 0.13, 0.64, 0.43, 0.71, 0.81, + 0.43, 0.65, 0.76, 0.7, 0.05, 0.24, 0.03, 0.9, 0.46, 0.28, 0.92, + 0.25, 0.97, 0.79, 0.73, 0.97, 0.49, 0.28, 0.64, 0.19, 0.23, 0.51, + 0.09, 0.1, 0.53, 0.03, 0.23, 0.94, 0.87, 0.14, 0.42, 0.82, 0.91, + 0.11, 0.91, 0.37, 0.26, 0.6, 0.89, 0.6, 0.32, 0.11, 0.98, 0.67, + 0.12, 0.66, 0.47, 0.02, 0.15, 0.6, 0.64, 0.57, 0.14, 0.81, 0.75, + 0.11, 0.49, 0.78, 0.16, 0.63, 0.57, 0.18]] + + The vector field used to search of collection (vector): vector + + Metric type: L2 + + Search parameter nprobe's value: 10 + + The specified number of decimal places of returned distance [-1]: 5 + + The max number of returned record, also known as topk: 2 + + The boolean expression used to filter attribute []: id > 0 + + The names of partitions to search (split by "," if multiple) ['_default'] []: _default + + timeout []: + + """ + collectionName = click.prompt( + "Collection name", type=click.Choice(obj.collection.list_collections()) + ) + data = click.prompt( + "The vectors of search data (the length of data is number of query (nq), the dim of every vector in data must be equal to vector field’s of collection. You can also import a CSV file without headers)" + ) + annsField = click.prompt( + "The vector field used to search of collection", + type=click.Choice(obj.collection.list_field_names(collectionName)), + ) + indexDetails = obj.index.get_vector_index(collectionName) + hasIndex = not not indexDetails + if indexDetails: + index_type = indexDetails["index_type"] + search_parameters = IndexTypesMap[index_type]["search_parameters"] + metric_type = indexDetails["metric_type"] + click.echo(f"Metric type: {metric_type}") + metricType = metric_type + params = [] + for parameter in search_parameters: + paramInput = click.prompt(f"Search parameter {parameter}'s value") + params += [f"{parameter}:{paramInput}"] + else: + metricType = "" + params = [] + roundDecimal = click.prompt( + "The specified number of decimal places of returned distance", + default=-1, + type=int, + ) + limit = click.prompt( + "The max number of returned record, also known as topk", default=None, type=int + ) + expr = click.prompt("The boolean expression used to filter attribute", default="") + # partitionNames = click.prompt( + # f'The names of partitions to search (split by "," if multiple) {obj._list_partition_names(collectionName)}', + # default="", + # ) + partitionNames = [] + timeout = click.prompt("Timeout", default="") + guarantee_timestamp = click.prompt( + "Guarantee Timestamp(It instructs Milvus to see all operations performed before a provided timestamp. If no such timestamp is provided, then Milvus will search all operations performed to date)", + default=0, + type=int, + ) + travel_timestamp = click.prompt( + "Travel Timestamp(Specify a timestamp in a search to get results based on a data view)", + default=0, + type=int, + ) + export, exportPath = False, "" + # if click.confirm('Would you like to export results as a CSV file?'): + # export = True + # exportPath = click.prompt('Directory path to csv file') + # export = click.prompt('Would you like to export results as a CSV file?', default='n', type=click.Choice(['Y', 'n'])) + # if export: + # exportPath = click.prompt('Directory path to csv file') + try: + searchParameters = validateSearchParams( + data, + annsField, + metricType, + params, + limit, + expr, + partitionNames, + timeout, + roundDecimal, + hasIndex=hasIndex, + guarantee_timestamp=guarantee_timestamp, + travel_timestamp=travel_timestamp, + ) + except ParameterException as pe: + click.echo("Error!\n{}".format(str(pe))) + + else: + if export: + results = obj.data.search( + collectionName, searchParameters, prettierFormat=False + ) + else: + results = obj.data.search(collectionName, searchParameters) + click.echo(f"Search results:\n") + click.echo(results) + # click.echo(obj.search(collectionName, searchParameters)) diff --git a/milvus_cli/scripts/helper_cli.py b/milvus_cli/scripts/helper_cli.py index 922c92b..6a29dec 100644 --- a/milvus_cli/scripts/helper_cli.py +++ b/milvus_cli/scripts/helper_cli.py @@ -34,7 +34,7 @@ def clear(): click.clear() -@cli.group(no_args_is_help=False) +@cli.group("show", no_args_is_help=False) @click.pass_obj def show(obj): """Show connection, database,collection, loading_progress and index_progress.""" @@ -62,14 +62,14 @@ def create(obj): pass -@cli.group(no_args_is_help=False) +@cli.group("load", no_args_is_help=False) @click.pass_obj def load(obj): """Load collection, partition""" pass -@cli.group(no_args_is_help=False) +@cli.group("release", no_args_is_help=False) @click.pass_obj def release(obj): """Release collection, partition""" @@ -83,6 +83,27 @@ def delete(obj): pass +@cli.group("search", no_args_is_help=False) +@click.pass_obj +def search(obj): + """Similarity search""" + pass + + +@cli.group("query", no_args_is_help=False) +@click.pass_obj +def query(obj): + """Query entities in collection.""" + pass + + +@cli.group("insert", no_args_is_help=False) +@click.pass_obj +def insert(obj): + """Insert entities""" + pass + + @cli.command("exit") def quit_app(): """Exit the CLI.""" diff --git a/milvus_cli/scripts/milvus_cli.py b/milvus_cli/scripts/milvus_cli.py index e22cf3c..a86828a 100644 --- a/milvus_cli/scripts/milvus_cli.py +++ b/milvus_cli/scripts/milvus_cli.py @@ -4,6 +4,7 @@ from database_cli import * from collection_cli import * from index_cli import * +from data_cli import * if __name__ == "__main__": diff --git a/milvus_cli/test/test_data.py b/milvus_cli/test/test_data.py new file mode 100644 index 0000000..619e1fb --- /dev/null +++ b/milvus_cli/test/test_data.py @@ -0,0 +1,122 @@ +import unittest +import sys +import os + +current_dir = os.path.dirname(os.path.realpath(__file__)) +parent_dir = os.path.dirname(current_dir) +sys.path.append(parent_dir) +from Connection import MilvusConnection +from Collection import MilvusCollection +from Data import MilvusData +from Index import MilvusIndex + +uri = "http://localhost:19530" +tempAlias = "zilliz2" +collectionName = "test_collection" +vectorName = "title_vector" +indexName = "vec_index" + +milvusConnection = MilvusConnection() +collection = MilvusCollection() +milvusIndex = MilvusIndex() +milvusData = MilvusData() + + +class TestIndex(unittest.TestCase): + @classmethod + def setUpClass(cls): + milvusConnection.connect(uri=uri, alias=tempAlias) + fields = [ + "name:VARCHAR:128", + "title:VARCHAR:512", + ] + fields.append(f"{vectorName}:FLOAT_VECTOR:4") + + collection.create_collection( + collectionName=collectionName, + fields=fields, + alias=tempAlias, + autoId=False, + description="this is a test collection", + primaryField="name", + isDynamic=True, + consistencyLevel="Strong", + ) + milvusIndex.create_index( + collectionName=collectionName, + metricType="L2", + indexName=indexName, + alias=tempAlias, + fieldName=vectorName, + indexType="IVF_FLAT", + params=["nlist:128"], + ) + collection.load_collection(collectionName=collectionName, alias=tempAlias) + + @classmethod + def tearDownClass(cls): + collection.drop_collection(tempAlias, collectionName) + milvusConnection.disconnect(alias=tempAlias) + + def test_insert(self): + data = [ + ["1", "2"], + [ + "this is a test title1", + "this is a test title2", + ], + [ + [1, 2, 3, 4], + [0.1, 0.2, 0.3, 0.4], + ], + ] + res = milvusData.insert( + collectionName=collectionName, data=data, alias=tempAlias + ) + # print(res) + self.assertEqual(res.insert_count, 2) + self.assertEqual(res.succ_count, 2) + + def test_query(self): + queryParameters = { + "expr": "name in ['1','2']", + "output_fields": ["name", "title", vectorName], + "partition_names": None, + "timeout": None, + "alias": None, + } + res = milvusData.query( + collectionName=collectionName, + queryParameters=queryParameters, + alias=tempAlias, + ) + # print(res) + self.assertEqual(len(res), 2) + + def test_search(self): + searchParameters = { + "data": [[1, 2, 3, 4]], + "anns_field": vectorName, + "param": {"nprobe": 16}, + "limit": 10, + "round_decimal": 4, + "alias": tempAlias, + } + res = milvusData.search( + collectionName=collectionName, + searchParameters=searchParameters, + alias=tempAlias, + ) + self.assertIsInstance(res, str) + + def test_delete_entities(self): + res = milvusData.delete_entities( + collectionName=collectionName, + expr="name in ['1']", + alias=tempAlias, + ) + self.assertEqual(res.delete_count, 1) + + +if __name__ == "__main__": + unittest.main() diff --git a/milvus_cli/utils.py b/milvus_cli/utils.py index 486f8d9..f516bfa 100644 --- a/milvus_cli/utils.py +++ b/milvus_cli/utils.py @@ -679,6 +679,7 @@ class Completer(object): "exit": [], "help": [], "import": [], + "insert": [], "list": ["collections", "databases", "partitions", "indexes", "users"], "load_balance": [], "load": ["collection", "partition"],