Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add collection operations and tests #37

Merged
merged 1 commit into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions milvus_cli/Collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from pymilvus import (
list_collections,
has_collection,
utility,
Collection,
DataType,
FieldSchema,
CollectionSchema,
)
from tabulate import tabulate
from milvus_cli.Types import DataTypeByNum


class MilvusCollection(object):
    """Collection-level operations for the Milvus CLI, built on pymilvus.

    Every public method accepts an optional connection ``alias``; when it is
    omitted (or falsy) the class-level ``alias`` attribute is used instead.
    Failures from pymilvus are re-raised as plain ``Exception`` with a short
    operation-specific prefix, chained to the original error.
    """

    # Connection alias used when a caller does not pass one explicitly.
    alias = "default"

    def _resolve_alias(self, alias):
        """Return ``alias`` if it is truthy, otherwise the instance default."""
        return alias if alias else self.alias

    @staticmethod
    def _parse_field_spec(field):
        """Split a ``name:type:data`` field spec into its three parts.

        Only the first two colons act as separators, so the trailing part
        (used as the description of scalar fields) may itself contain colons.

        Returns:
            A ``(fieldName, upperFieldType, fieldData)`` tuple; the type is
            upper-cased, the other two parts are returned unchanged.

        Raises:
            ValueError: if the spec has fewer than three parts.
        """
        parts = field.split(":", 2)
        if len(parts) != 3:
            raise ValueError(
                f"Invalid field definition '{field}', expected <name>:<type>:<data>"
            )
        fieldName, fieldType, fieldData = parts
        return fieldName, fieldType.upper(), fieldData

    @staticmethod
    def _bullet_list(names):
        """Render ``names`` as one `` - name`` bullet per line.

        An empty iterable yields an empty string rather than a dangling bullet.
        """
        return "\n".join(f" - {name}" for name in names)

    def create_collection(
        self,
        collectionName,
        primaryField,
        fields,
        autoId=None,
        description=None,
        isDynamic=None,
        consistencyLevel=None,
        shardsNum=2,
        alias=None,
    ):
        """Create a collection and return its rendered details table.

        Args:
            collectionName: Name of the collection to create.
            primaryField: Name of the field to use as the primary key.
            fields: Iterable of ``name:type:data`` strings. ``data`` is the
                dimension for ``BINARY_VECTOR``/``FLOAT_VECTOR`` fields, the
                maximum length for ``VARCHAR`` fields, and the field
                description for every other type.
            autoId: Forwarded to ``CollectionSchema`` as ``auto_id``.
            description: Collection description.
            isDynamic: Forwarded as ``_enable_dynamic_field``.
            consistencyLevel: Forwarded to ``Collection`` as
                ``consistency_level``.
            shardsNum: Forwarded to ``Collection`` as ``shards_num``.
            alias: Connection alias; falls back to ``self.alias``.

        Returns:
            The string produced by ``get_collection_details`` for the new
            collection.

        Raises:
            ValueError: if a field spec is malformed or a dimension/length is
                not an integer.
            KeyError: if a field type is not a ``DataType`` member name.
        """
        fieldList = []
        for field in fields:
            fieldName, upperFieldType, fieldData = self._parse_field_spec(field)
            dtype = DataType[upperFieldType]
            if upperFieldType in ("BINARY_VECTOR", "FLOAT_VECTOR"):
                fieldSchema = FieldSchema(
                    name=fieldName,
                    dtype=dtype,
                    dim=int(fieldData),
                )
            elif upperFieldType == "VARCHAR":
                # Convert like ``dim`` so a non-numeric length fails here with
                # a clear ValueError instead of reaching the server as text.
                fieldSchema = FieldSchema(
                    name=fieldName,
                    dtype=dtype,
                    max_length=int(fieldData),
                )
            else:
                fieldSchema = FieldSchema(
                    name=fieldName,
                    dtype=dtype,
                    description=fieldData,
                )
            fieldList.append(fieldSchema)
        schema = CollectionSchema(
            fields=fieldList,
            primary_field=primaryField,
            auto_id=autoId,
            description=description,
            _enable_dynamic_field=isDynamic,
        )
        collection = Collection(
            name=collectionName,
            schema=schema,
            consistency_level=consistencyLevel,
            shards_num=shardsNum,
            using=self._resolve_alias(alias),
        )
        return self.get_collection_details(collection=collection)

    def list_collections(self, alias=None):
        """Return the result of pymilvus ``list_collections`` for the alias.

        Raises:
            Exception: wrapping any error raised by pymilvus.
        """
        try:
            return list_collections(using=self._resolve_alias(alias))
        except Exception as e:
            raise Exception(f"List collection error!{str(e)}") from e

    def getTargetCollection(self, collectionName, alias=None):
        """Return a pymilvus ``Collection`` handle for ``collectionName``.

        Raises:
            Exception: wrapping any error raised while building the handle.
        """
        try:
            return Collection(collectionName, using=self._resolve_alias(alias))
        except Exception as e:
            raise Exception(f"Get collection error!{str(e)}") from e

    def drop_collection(self, alias=None, collectionName=None):
        """Drop ``collectionName`` and return a confirmation message.

        Raises:
            Exception: wrapping any error from the lookup or the drop.
        """
        try:
            target = self.getTargetCollection(
                collectionName=collectionName, alias=alias
            )
            target.drop()
        except Exception as e:
            raise Exception(f"Delete collection error!{str(e)}") from e
        return f"Drop collection {collectionName} successfully!"

    def has_collection(self, alias=None, collectionName=None):
        """Return the result of pymilvus ``has_collection`` for the name.

        Raises:
            Exception: wrapping any error raised by pymilvus.
        """
        try:
            return has_collection(
                collection_name=collectionName, using=self._resolve_alias(alias)
            )
        except Exception as e:
            raise Exception(f"Has collection error!{str(e)}") from e

    def rename_collection(self, alias=None, collectionName=None, newName=None):
        """Rename ``collectionName`` to ``newName`` and return a confirmation.

        Raises:
            Exception: wrapping any error raised by pymilvus.
        """
        try:
            utility.rename_collection(
                old_collection_name=collectionName,
                new_collection_name=newName,
                using=self._resolve_alias(alias),
            )
        except Exception as e:
            raise Exception(f"Rename collection error!{str(e)}") from e
        return f"Rename collection {collectionName} to {newName} successfully!"

    def get_collection_details(self, collectionName="", alias=None, collection=None):
        """Render a collection's metadata as a ``tabulate`` grid string.

        Args:
            collectionName: Name to look up when ``collection`` is not given.
            alias: Connection alias used for the lookup.
            collection: An existing ``Collection`` handle; when truthy it is
                used directly and no lookup is made.

        Returns:
            A grid-formatted table with the name, description, emptiness,
            entity count, primary field, schema, partitions and indexes.

        Raises:
            Exception: wrapping any error raised while fetching the handle.
        """
        try:
            target = collection or self.getTargetCollection(
                collectionName=collectionName, alias=alias
            )
        except Exception as e:
            raise Exception(f"Get collection detail error!{str(e)}") from e
        schema = target.schema
        partitions = target.partitions
        indexes = target.indexes

        fieldLines = []
        for fieldSchema in schema.fields:
            # The primary field is marked with a leading asterisk.
            _name = f"{'*' if fieldSchema.is_primary else ''}{fieldSchema.name}"
            _type = DataTypeByNum[fieldSchema.dtype]
            _desc = fieldSchema.description
            _dim = fieldSchema.params.get("dim")
            _params_desc = f"dim: {_dim}" if _dim else ""
            fieldLines.append(f"\n - {_name} {_type} {_params_desc} {_desc}")
        fieldSchemaDetails = "".join(fieldLines)

        schemaDetails = """Description: {}\n\nAuto ID: {}\n\nFields(* is the primary field):{}""".format(
            schema.description, schema.auto_id, fieldSchemaDetails
        )
        # Every bullet uses the same " - " prefix as the field list above.
        partitionDetails = self._bullet_list(p.name for p in partitions)
        indexesDetails = self._bullet_list(i.field_name for i in indexes)

        rows = [
            ["Name", target.name],
            ["Description", target.description],
            ["Is Empty", target.is_empty],
            ["Entities", target.num_entities],
            ["Primary Field", target.primary_field.name],
            ["Schema", schemaDetails],
            ["Partitions", partitionDetails],
            ["Indexes", indexesDetails],
        ]
        return tabulate(rows, tablefmt="grid")
72 changes: 72 additions & 0 deletions milvus_cli/test/test_collection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import unittest
import sys
import os

# Append the directory one level above this file's directory to sys.path so
# the Connection and Collection modules below can be imported by bare name.
# This must run before those imports.
current_dir = os.path.dirname(os.path.realpath(__file__))
parent_dir = os.path.dirname(current_dir)
sys.path.append(parent_dir)
from Connection import MilvusConnection
from Collection import MilvusCollection

# Endpoint and connection alias passed to MilvusConnection.connect in setUp.
uri = "http://localhost:19530"
tempAlias = "zilliz2"
# Name of the collection the tests create, and the name the rename test
# renames it to.
collectionName = "test_collection"
newCollectionName = "test_collection2"

# Module-level instances shared by all test methods.
milvusConnection = MilvusConnection()
collection = MilvusCollection()


class TestCollection(unittest.TestCase):
    """Integration tests for ``MilvusCollection``; need a Milvus server at ``uri``.

    unittest runs test methods in alphabetical order of their names, not in
    definition order, so a create -> rename -> drop chain spread across
    methods does not execute in that sequence. Each test therefore creates the
    collection it needs, and leftovers are removed before and after every test.
    """

    def setUp(self):
        """Connect under ``tempAlias`` and start from a clean server state."""
        milvusConnection.connect(uri=uri, alias=tempAlias)
        # Cleanups run after tearDown and also when setUp fails part-way, so
        # the connection is always released.
        self.addCleanup(milvusConnection.disconnect, alias=tempAlias)
        self._purge_collections()

    def tearDown(self):
        """Remove any collection the test left behind."""
        self._purge_collections()

    def _purge_collections(self):
        """Drop both test collection names if they currently exist."""
        for name in (collectionName, newCollectionName):
            if collection.has_collection(collectionName=name, alias=tempAlias):
                collection.drop_collection(collectionName=name, alias=tempAlias)

    def _create_test_collection(self):
        """Create ``collectionName`` and return the details table string."""
        fields = [
            "id:VARCHAR:128",
            "title:VARCHAR:512",
            "title_vector:FLOAT_VECTOR:768",
        ]
        return collection.create_collection(
            collectionName=collectionName,
            fields=fields,
            alias=tempAlias,
            autoId=False,
            description="this is a test collection",
            primaryField="id",
            isDynamic=True,
            consistencyLevel="Strong",
        )

    def test_create_collection(self):
        result = self._create_test_collection()
        # The details table includes a "Name" row with the collection name.
        self.assertIn(collectionName, result)
        self.assertTrue(
            collection.has_collection(collectionName=collectionName, alias=tempAlias)
        )

    def test_list_collection(self):
        self._create_test_collection()
        result = collection.list_collections(alias=tempAlias)
        self.assertIsInstance(result, list)
        self.assertIn(collectionName, result)

    def test_has_collection(self):
        self._create_test_collection()
        result = collection.has_collection(
            collectionName=collectionName, alias=tempAlias
        )
        self.assertTrue(result)

    def test_rename_collection(self):
        self._create_test_collection()
        result = collection.rename_collection(
            alias=tempAlias, collectionName=collectionName, newName=newCollectionName
        )
        self.assertEqual(
            result,
            f"Rename collection {collectionName} to {newCollectionName} successfully!",
        )
        self.assertTrue(
            collection.has_collection(
                collectionName=newCollectionName, alias=tempAlias
            )
        )

    def test_drop_collection(self):
        self._create_test_collection()
        result = collection.drop_collection(
            collectionName=collectionName, alias=tempAlias
        )
        self.assertEqual(result, f"Drop collection {collectionName} successfully!")
        self.assertFalse(
            collection.has_collection(collectionName=collectionName, alias=tempAlias)
        )


# Run the test suite when this file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()