-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Msudhir/add vector update functionality #14
Changes from 28 commits
d372a09
f6d3caf
d4f9158
4861af0
48e0971
315073f
870762a
478caec
1c01035
ef4ef32
0ad2d62
e4c0c9b
bef5791
928be7b
12f57a9
9527183
76270f6
c75a01f
0578b9b
8487678
5828891
4a29d33
e209770
ebe1e32
62692e0
0680c94
5c5490d
cdadb87
e1fd230
d0c4269
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,14 +1,37 @@ | ||
import logging | ||
from datetime import datetime | ||
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple | ||
|
||
from pydantic.typing import Literal | ||
from pymilvus import ( | ||
Collection, | ||
CollectionSchema, | ||
DataType, | ||
FieldSchema, | ||
connections, | ||
utility, | ||
) | ||
|
||
from feast import Entity, RepoConfig | ||
from feast.expediagroup.vectordb.vector_feature_view import VectorFeatureView | ||
from feast.expediagroup.vectordb.vector_online_store import VectorOnlineStore | ||
from feast.field import Field | ||
from feast.protos.feast.types.EntityKey_pb2 import EntityKey as EntityKeyProto | ||
from feast.protos.feast.types.Value_pb2 import Value as ValueProto | ||
from feast.repo_config import FeastConfigBaseModel | ||
from feast.types import ( | ||
Array, | ||
FeastType, | ||
Float32, | ||
Float64, | ||
Int32, | ||
Int64, | ||
Invalid, | ||
String, | ||
) | ||
from feast.usage import log_exceptions_and_usage | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class MilvusOnlineStoreConfig(FeastConfigBaseModel): | ||
|
@@ -17,13 +40,47 @@ class MilvusOnlineStoreConfig(FeastConfigBaseModel): | |
type: Literal["milvus"] = "milvus" | ||
"""Online store type selector""" | ||
|
||
alias: str = "default" | ||
""" alias for milvus connection""" | ||
|
||
host: str | ||
""" the host URL """ | ||
|
||
username: str | ||
""" username to connect to Milvus """ | ||
|
||
password: str | ||
""" password to connect to Milvus """ | ||
|
||
port: int = 19530 | ||
""" the port to connect to a Milvus instance. Should be the one used for GRPC (default: 19530) """ | ||
|
||
|
||
class MilvusConnectionManager: | ||
def __init__(self, online_config: RepoConfig): | ||
self.online_config = online_config | ||
|
||
def __enter__(self): | ||
# Connecting to Milvus | ||
logger.info( | ||
f"Connecting to Milvus with alias {self.online_config.alias} and host {self.online_config.host} and default port {self.online_config.port}." | ||
) | ||
connections.connect( | ||
host=self.online_config.host, | ||
username=self.online_config.username, | ||
password=self.online_config.password, | ||
use_secure=True, | ||
) | ||
|
||
def __exit__(self, exc_type, exc_value, traceback): | ||
# Disconnecting from Milvus | ||
logger.info("Closing the connection to Milvus") | ||
connections.disconnect(self.online_config.alias) | ||
logger.info("Connection Closed") | ||
if exc_type is not None: | ||
logger.error(f"An exception of type {exc_type} occurred: {exc_value}") | ||
|
||
|
||
class MilvusOnlineStore(VectorOnlineStore): | ||
def online_write_batch( | ||
self, | ||
|
@@ -49,6 +106,7 @@ def online_read( | |
"to be implemented in https://jira.expedia.biz/browse/EAPC-7972" | ||
) | ||
|
||
@log_exceptions_and_usage(online_store="milvus") | ||
def update( | ||
self, | ||
config: RepoConfig, | ||
|
@@ -58,9 +116,39 @@ def update( | |
entities_to_keep: Sequence[Entity], | ||
partial: bool, | ||
): | ||
raise NotImplementedError( | ||
"to be implemented in https://jira.expedia.biz/browse/EAPC-7970" | ||
) | ||
with MilvusConnectionManager(config.online_store): | ||
for table_to_keep in tables_to_keep: | ||
collection_available = utility.has_collection(table_to_keep.name) | ||
try: | ||
if collection_available: | ||
logger.info(f"Collection {table_to_keep.name} already exists.") | ||
else: | ||
schema = self._convert_featureview_schema_to_milvus_readable( | ||
table_to_keep.schema | ||
) | ||
|
||
collection = Collection(name=table_to_keep.name, schema=schema) | ||
logger.info(f"Collection name is {collection.name}") | ||
logger.info( | ||
f"Collection {table_to_keep.name} has been created successfully." | ||
) | ||
except Exception as e: | ||
logger.error(f"Collection update failed due to {e}") | ||
|
||
for table_to_delete in tables_to_delete: | ||
collection_available = utility.has_collection(table_to_delete.name) | ||
try: | ||
if collection_available: | ||
utility.drop_collection(table_to_delete.name) | ||
logger.info( | ||
f"Collection {table_to_delete.name} has been deleted successfully." | ||
) | ||
else: | ||
return logger.error( | ||
f"Collection {table_to_delete.name} does not exist or is already deleted." | ||
) | ||
except Exception as e: | ||
logger.error(f"Collection deletion failed due to {e}") | ||
|
||
def teardown( | ||
self, | ||
|
@@ -71,3 +159,65 @@ def teardown( | |
raise NotImplementedError( | ||
"to be implemented in https://jira.expedia.biz/browse/EAPC-7974" | ||
) | ||
|
||
def _convert_featureview_schema_to_milvus_readable( | ||
self, feast_schema: List[Field] | ||
) -> CollectionSchema: | ||
""" | ||
Converting a schema understood by Feast to a schema that is readable by Milvus so that it | ||
can be used when a collection is created in Milvus. | ||
|
||
Parameters: | ||
feast_schema (List[Field]): Schema stored in VectorFeatureView. | ||
|
||
Returns: | ||
(CollectionSchema): Schema readable by Milvus. | ||
|
||
""" | ||
boolean_mapping_from_string = {"True": True, "False": False} | ||
field_list = [] | ||
|
||
for field in feast_schema: | ||
data_type = self._feast_to_milvus_data_type(field.dtype) | ||
field_name = field.name | ||
description = field.tags.get("description") | ||
is_primary = boolean_mapping_from_string.get(field.tags.get("is_primary")) | ||
dimension = field.tags.get("dimension") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dimensions have to be retrieved from |
||
|
||
if dimension is not None: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why is this needed? |
||
dimension = int(field.tags.get("dimension")) | ||
# Appending the above converted values to construct a FieldSchema | ||
field_list.append( | ||
FieldSchema( | ||
name=field_name, | ||
dtype=data_type, | ||
description=description, | ||
is_primary=is_primary, | ||
dim=dimension, | ||
) | ||
) | ||
# Returning a CollectionSchema which is a list of type FieldSchema. | ||
return CollectionSchema(field_list) | ||
|
||
def _feast_to_milvus_data_type(self, feast_type: FeastType) -> DataType: | ||
""" | ||
Mapping for converting Feast data type to a data type compatible wih Milvus. | ||
|
||
Parameters: | ||
feast_type (FeastType): This is a type associated with a Feature that is stored in a VectorFeatureView, readable with Feast. | ||
|
||
Returns: | ||
DataType : DataType associated with what Milvus can understand and associate its Feature types to | ||
""" | ||
|
||
return { | ||
Int32: DataType.INT32, | ||
Int64: DataType.INT64, | ||
Float32: DataType.FLOAT, | ||
Float64: DataType.DOUBLE, | ||
String: DataType.STRING, | ||
Invalid: DataType.UNKNOWN, | ||
Array(Float32): DataType.FLOAT_VECTOR, | ||
# TODO: Need to think about list of binaries and list of bytes | ||
# FeastType.BYTES_LIST: DataType.BINARY_VECTOR | ||
}.get(feast_type, None) |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -50,6 +50,7 @@ class VectorFeatureView(BaseFeatureView): | |
|
||
# inheriting from FeatureView wouldn't work due to issue with conflicting proto classes | ||
# therefore using composition instead | ||
name: str | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. feature_view already has an attribute There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It did not reflect and threw an error when I tried to add name |
||
feature_view: FeatureView | ||
vector_field: str | ||
dimensions: int | ||
|
@@ -106,7 +107,7 @@ def __init__( | |
tags=tags, | ||
owner=owner, | ||
) | ||
|
||
self.name = name | ||
self.feature_view = feature_view | ||
self.vector_field = vector_field | ||
self.dimensions = dimensions | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove return statement