Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added list proto methods #148

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions sdk/python/feast/infra/registry/caching_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@
from feast.permissions.permission import Permission
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.saved_dataset import SavedDataset, ValidationReference
from feast.stream_feature_view import StreamFeatureView
Expand Down Expand Up @@ -467,3 +478,100 @@ def _start_thread_async_refresh(self, cache_ttl_seconds):

def _exit_handler(self):
self.registry_refresh_thread.cancel()

# Methods to improve the registry calls

@abstractmethod
def _list_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> FeatureViewProtoList:
pass

def list_feature_views_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> FeatureViewProtoList:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_views_proto(
self.cached_registry_proto, project, tags
)
return self._list_feature_views_proto(project, tags)

@abstractmethod
def _list_entities_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> EntityProtoList:
pass

def list_entities_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> EntityProtoList:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_entities_proto(
self.cached_registry_proto, project, tags
)
return self._list_entities_proto(project, tags)

@abstractmethod
def _list_data_sources_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> DataSourceProtoList:
pass

def list_data_sources_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> DataSourceProtoList:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_data_sources_proto(
self.cached_registry_proto, project, tags
)
return self._list_data_sources_proto(project, tags)

@abstractmethod
def _list_on_demand_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> OnDemandFeatureViewProtoList:
pass

def list_on_demand_feature_views_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> OnDemandFeatureViewProtoList:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_on_demand_feature_views_proto(
self.cached_registry_proto, project, tags
)
return self._list_on_demand_feature_views_proto(project, tags)

@abstractmethod
def _list_feature_services_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> FeatureServiceProtoList:
pass

def list_feature_services_proto(
self,
project: str,
allow_cache: bool = False,
tags: Optional[dict[str, str]] = None,
) -> FeatureServiceProtoList:
if allow_cache:
self._refresh_cached_registry_if_necessary()
return proto_registry_utils.list_feature_services_proto(
self.cached_registry_proto, project, tags
)
return self._list_feature_services_proto(project, tags)
76 changes: 76 additions & 0 deletions sdk/python/feast/infra/registry/proto_registry_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@
from feast.permissions.permission import Permission
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Registry_pb2 import ProjectMetadata as ProjectMetadataProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
from feast.saved_dataset import SavedDataset, ValidationReference
Expand Down Expand Up @@ -367,3 +378,68 @@ def get_project(registry_proto: RegistryProto, name: str) -> Project:
if projects_proto.spec.name == name:
return Project.from_proto(projects_proto)
raise ProjectObjectNotFoundException(name=name)


@registry_proto_cache_with_tags
def list_feature_views_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> FeatureViewProtoList:
feature_views: FeatureViewProtoList = FeatureViewProtoList()
for feature_view_proto in registry_proto.feature_views:
if feature_view_proto.spec.project == project and utils.has_all_tags(
feature_view_proto.spec.tags, tags
):
feature_views.featureviews.append(feature_view_proto)
return feature_views


@registry_proto_cache_with_tags
def list_feature_services_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> FeatureServiceProtoList:
feature_services = FeatureServiceProtoList()
for feature_service_proto in registry_proto.feature_services:
if feature_service_proto.spec.project == project and utils.has_all_tags(
feature_service_proto.spec.tags, tags
):
feature_services.featureservices.append(feature_service_proto)
return feature_services


@registry_proto_cache_with_tags
def list_on_demand_feature_views_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> OnDemandFeatureViewProtoList:
on_demand_feature_views = OnDemandFeatureViewProtoList()
for on_demand_feature_view in registry_proto.on_demand_feature_views:
if on_demand_feature_view.spec.project == project and utils.has_all_tags(
on_demand_feature_view.spec.tags, tags
):
on_demand_feature_views.ondemandfeatureviews.append(on_demand_feature_view)
return on_demand_feature_views


@registry_proto_cache_with_tags
def list_entities_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> EntityProtoList:
entities = EntityProtoList()
for entity_proto in registry_proto.entities:
if entity_proto.spec.project == project and utils.has_all_tags(
entity_proto.spec.tags, tags
):
entities.entities.append(entity_proto)
return entities


@registry_proto_cache_with_tags
def list_data_sources_proto(
registry_proto: RegistryProto, project: str, tags: Optional[dict[str, str]]
) -> DataSourceProtoList:
data_sources = DataSourceProtoList()
for data_source_proto in registry_proto.data_sources:
if data_source_proto.project == project and utils.has_all_tags(
data_source_proto.tags, tags
):
data_sources.datasources.append(data_source_proto)
return data_sources
126 changes: 125 additions & 1 deletion sdk/python/feast/infra/registry/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional, Union, cast
from typing import Any, Callable, Dict, List, Optional, Type, Union, cast

from pydantic import StrictInt, StrictStr
from sqlalchemy import ( # type: ignore
Expand Down Expand Up @@ -50,15 +50,26 @@
from feast.project import Project
from feast.project_metadata import ProjectMetadata
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
from feast.protos.feast.core.DataSource_pb2 import DataSourceList as DataSourceProtoList
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
from feast.protos.feast.core.Entity_pb2 import EntityList as EntityProtoList
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureService as FeatureServiceProto,
)
from feast.protos.feast.core.FeatureService_pb2 import (
FeatureServiceList as FeatureServiceProtoList,
)
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.protos.feast.core.FeatureView_pb2 import (
FeatureViewList as FeatureViewProtoList,
)
from feast.protos.feast.core.InfraObject_pb2 import Infra as InfraProto
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureView as OnDemandFeatureViewProto,
)
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
OnDemandFeatureViewList as OnDemandFeatureViewProtoList,
)
from feast.protos.feast.core.Permission_pb2 import Permission as PermissionProto
from feast.protos.feast.core.Project_pb2 import Project as ProjectProto
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
Expand Down Expand Up @@ -1326,3 +1337,116 @@ def get_project_metadata(
datetime.utcfromtimestamp(int(metadata_value))
)
return project_metadata_model

def get_objects_list(
self, proto_class: Type
) -> Union[
FeatureViewProtoList,
OnDemandFeatureViewProtoList,
EntityProtoList,
DataSourceProtoList,
FeatureServiceProtoList,
]:
# Define the mapping from proto_class to list type
proto_class_to_list = {
FeatureViewProto: FeatureViewProtoList,
OnDemandFeatureViewProto: OnDemandFeatureViewProtoList,
EntityProto: EntityProtoList,
DataSourceProto: DataSourceProtoList,
FeatureServiceProto: FeatureServiceProtoList,
}
proto_list = proto_class_to_list.get(proto_class, None)
if proto_list is None:
raise ValueError(f"Unsupported proto class: {proto_class}")
return proto_list()

def _list_objects_proto(
self,
table: Table,
project: str,
proto_class: Any,
proto_field_name: str,
tags: Optional[dict[str, str]] = None,
):
with self.read_engine.begin() as conn:
stmt = select(table).where(table.c.project_id == project)
rows = conn.execute(stmt).all()
if rows:
objects = self.get_objects_list(proto_class)
for row in rows:
obj = proto_class.FromString(row._mapping[proto_field_name])
if utils.has_all_tags(
dict(
obj.tags
if isinstance(objects, DataSourceProtoList)
else obj.spec.tags
),
tags,
):
if isinstance(objects, DataSourceProtoList):
objects.datasources.append(obj)
elif isinstance(objects, FeatureViewProtoList):
objects.featureviews.append(obj)
elif isinstance(objects, OnDemandFeatureViewProtoList):
objects.ondemandfeatureviews.append(obj)
elif isinstance(objects, EntityProtoList):
objects.entities.append(obj)
elif isinstance(objects, FeatureServiceProtoList):
objects.featureservices.append(obj)
return objects
return []

def _list_feature_services_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> FeatureServiceProtoList:
return self._list_objects_proto(
feature_services,
project,
FeatureServiceProto,
"feature_service_proto",
tags=tags,
)

def _list_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> FeatureViewProtoList:
return self._list_objects_proto(
feature_views,
project,
FeatureViewProto,
"feature_view_proto",
tags=tags,
)

def _list_on_demand_feature_views_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> OnDemandFeatureViewProtoList:
return self._list_objects_proto(
on_demand_feature_views,
project,
OnDemandFeatureViewProto,
"feature_view_proto",
tags=tags,
)

def _list_entities_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> EntityProtoList:
return self._list_objects_proto(
entities,
project,
EntityProto,
"entity_proto",
tags=tags,
)

def _list_data_sources_proto(
self, project: str, tags: Optional[dict[str, str]]
) -> DataSourceProtoList:
return self._list_objects_proto(
data_sources,
project,
DataSourceProto,
"data_source_proto",
tags=tags,
)
Loading