Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor tag methods to infer created, deleted, and kept repo objects #2142

Merged
merged 7 commits into from
Dec 15, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions sdk/python/feast/diff/FcoDiff.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
from dataclasses import dataclass
from enum import Enum
from typing import Any, List, Set, Tuple, Union

from feast.entity import Entity
from feast.feature_service import FeatureService
from feast.feature_table import FeatureTable
from feast.feature_view import FeatureView
from feast.on_demand_feature_view import OnDemandFeatureView
from feast.request_feature_view import RequestFeatureView


@dataclass
class PropertyDiff:
    """A single property together with its existing and declared values.

    NOTE(review): the field names suggest ``val_existing`` is the value
    currently stored and ``val_declared`` the value from the repo
    definition -- confirm against the code that populates this class.
    """

    # Name of the property being compared.
    property_name: str
    # Value carried by the existing object.
    val_existing: str
    # Value carried by the declared object.
    val_declared: str


class TransitionType(Enum):
    """Kind of transition recorded for an object in a registry diff."""

    # Fallback for a transition that has not been classified.
    UNKNOWN = 0
    # The object is being created.
    CREATE = 1
    # The object is being deleted.
    DELETE = 2
    # The object is being updated.
    UPDATE = 3
    # The object is unchanged.
    UNCHANGED = 4


@dataclass
class FcoDiff:
    """Diff record for one object: its current state, new state and transition."""

    # Object as it currently exists; ``None`` when there is no current object
    # (e.g. for a CREATE transition).
    current_fco: Any
    # Object as it should exist afterwards; ``None`` when there is no new
    # object (e.g. for a DELETE transition).
    new_fco: Any
    # Per-property differences between the two objects (may be empty).
    fco_property_diffs: List[PropertyDiff]
    # How the object moves from the current state to the new state.
    transition_type: TransitionType


class RegistryDiff:
    """Accumulates the individual ``FcoDiff`` records of a registry comparison."""

    # Diff records in the order they were added.
    fco_diffs: List[FcoDiff]

    def __init__(self):
        # A fresh list per instance so that diffs are never shared.
        self.fco_diffs = []

    def add_fco_diff(self, fco_diff: FcoDiff):
        """Append one diff record to this registry diff.

        Args:
            fco_diff: The record to append.
        """
        self.fco_diffs.append(fco_diff)

    def __repr__(self) -> str:
        # Without this, logging or printing a RegistryDiff only shows the
        # default "<... object at 0x...>" form and hides the collected diffs.
        return f"{type(self).__name__}(fco_diffs={self.fco_diffs!r})"


def _partition_by_name(
    existing: Set[Any], desired: Set[Any]
) -> Tuple[Set[Any], Set[Any], Set[Any]]:
    """Split two sets of named objects into keep / delete / add buckets.

    Objects are matched only on their ``name`` attribute.

    Args:
        existing: Objects that are currently present.
        desired: Objects that should be present afterwards.

    Returns:
        A ``(to_keep, to_delete, to_add)`` tuple where ``to_keep`` holds the
        *desired* objects whose name also occurs in ``existing``,
        ``to_delete`` holds the existing objects whose name no longer occurs
        in ``desired``, and ``to_add`` holds the desired objects whose name
        does not occur in ``existing``.
    """
    existing_names = {obj.name for obj in existing}
    desired_names = {obj.name for obj in desired}

    to_keep = {obj for obj in desired if obj.name in existing_names}
    to_delete = {obj for obj in existing if obj.name not in desired_names}
    to_add = {obj for obj in desired if obj.name not in existing_names}
    return to_keep, to_delete, to_add


def _tag_registry_entities_for_keep_delete(
    existing_entities: Set[Entity], desired_entities: Set[Entity]
) -> Tuple[Set[Entity], Set[Entity], Set[Entity]]:
    """Partition entities by name.

    Args:
        existing_entities: Entities that are currently present.
        desired_entities: Entities that should be present afterwards.

    Returns:
        ``(entities_to_keep, entities_to_delete, entities_to_add)``; kept
        entities are taken from ``desired_entities``.
    """
    return _partition_by_name(existing_entities, desired_entities)


def _tag_registry_views_for_keep_delete(
    existing_views: Union[
        Set[FeatureView], Set[RequestFeatureView], Set[OnDemandFeatureView]
    ],
    desired_views: Union[
        Set[FeatureView], Set[RequestFeatureView], Set[OnDemandFeatureView]
    ],
) -> Tuple[
    Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]],
    Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]],
    Set[Union[FeatureView, RequestFeatureView, OnDemandFeatureView]],
]:
    """Partition feature views (of any supported flavor) by name.

    Args:
        existing_views: Views that are currently present.
        desired_views: Views that should be present afterwards.

    Returns:
        ``(views_to_keep, views_to_delete, views_to_add)``; kept views are
        taken from ``desired_views``.
    """
    return _partition_by_name(existing_views, desired_views)


def _tag_registry_tables_for_keep_delete(
    existing_tables: Set[FeatureTable], desired_tables: Set[FeatureTable]
) -> Tuple[Set[FeatureTable], Set[FeatureTable], Set[FeatureTable]]:
    """Partition feature tables by name.

    Args:
        existing_tables: Tables that are currently present.
        desired_tables: Tables that should be present afterwards.

    Returns:
        ``(tables_to_keep, tables_to_delete, tables_to_add)``; kept tables
        are taken from ``desired_tables``.
    """
    return _partition_by_name(existing_tables, desired_tables)


def _tag_registry_services_for_keep_delete(
    existing_service: Set[FeatureService], desired_service: Set[FeatureService]
) -> Tuple[Set[FeatureService], Set[FeatureService], Set[FeatureService]]:
    """Partition feature services by name.

    Args:
        existing_service: Services that are currently present.
        desired_service: Services that should be present afterwards.

    Returns:
        ``(services_to_keep, services_to_delete, services_to_add)``; kept
        services are taken from ``desired_service``.
    """
    return _partition_by_name(existing_service, desired_service)
Empty file.
34 changes: 18 additions & 16 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from feast.feature_service import FeatureService
from feast.feature_table import FeatureTable
from feast.feature_view import (
DUMMY_ENTITY,
DUMMY_ENTITY_ID,
DUMMY_ENTITY_NAME,
DUMMY_ENTITY_VAL,
Expand All @@ -61,7 +62,6 @@
from feast.request_feature_view import RequestFeatureView
from feast.type_map import python_value_to_proto_value
from feast.usage import log_exceptions, log_exceptions_and_usage, set_usage_attribute
from feast.value_type import ValueType
from feast.version import get_version

warnings.simplefilter("once", DeprecationWarning)
Expand Down Expand Up @@ -379,16 +379,18 @@ def apply(
]
],
],
objects_to_delete: List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
FeatureTable,
objects_to_delete: Optional[
List[
Union[
FeatureView,
OnDemandFeatureView,
RequestFeatureView,
Entity,
FeatureService,
FeatureTable,
]
]
] = [],
] = None,
partial: bool = True,
):
"""Register objects to metadata store and update related infrastructure.
Expand Down Expand Up @@ -435,6 +437,9 @@ def apply(

assert isinstance(objects, list)

if not objects_to_delete:
objects_to_delete = []

# Separate all objects into entities, feature services, and different feature view types.
entities_to_update = [ob for ob in objects if isinstance(ob, Entity)]
views_to_update = [ob for ob in objects if isinstance(ob, FeatureView)]
Expand Down Expand Up @@ -484,11 +489,6 @@ def apply(
odfv.infer_features()

# Handle all entityless feature views by using DUMMY_ENTITY as a placeholder entity.
DUMMY_ENTITY = Entity(
name=DUMMY_ENTITY_NAME,
join_key=DUMMY_ENTITY_ID,
value_type=ValueType.INT32,
)
entities_to_update.append(DUMMY_ENTITY)

# Add all objects to the registry and update the provider's infrastructure.
Expand Down Expand Up @@ -1560,7 +1560,9 @@ def _validate_feature_views(feature_views: List[BaseFeatureView]):
case_insensitive_fv_name = fv.name.lower()
if case_insensitive_fv_name in fv_names:
raise ValueError(
f"More than one feature view with name {case_insensitive_fv_name} found. Please ensure that all feature view names are case-insensitively unique. It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
f"More than one feature view with name {case_insensitive_fv_name} found. "
f"Please ensure that all feature view names are case-insensitively unique. "
f"It may be necessary to ignore certain files in your feature repository by using a .feastignore file."
)
else:
fv_names.add(case_insensitive_fv_name)
4 changes: 4 additions & 0 deletions sdk/python/feast/feature_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from feast import utils
from feast.base_feature_view import BaseFeatureView
from feast.data_source import DataSource
from feast.entity import Entity
from feast.feature import Feature
from feast.feature_view_projection import FeatureViewProjection
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
Expand All @@ -42,6 +43,9 @@
DUMMY_ENTITY_ID = "__dummy_id"
DUMMY_ENTITY_NAME = "__dummy"
DUMMY_ENTITY_VAL = ""
DUMMY_ENTITY = Entity(
name=DUMMY_ENTITY_NAME, join_key=DUMMY_ENTITY_ID, value_type=ValueType.INT32,
)


class FeatureView(BaseFeatureView):
Expand Down
77 changes: 64 additions & 13 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from collections import defaultdict
from datetime import datetime, timedelta
from pathlib import Path
Expand All @@ -24,6 +24,12 @@

from feast import importer
from feast.base_feature_view import BaseFeatureView
from feast.diff.FcoDiff import (
FcoDiff,
RegistryDiff,
TransitionType,
_tag_registry_entities_for_keep_delete,
)
from feast.entity import Entity
from feast.errors import (
ConflictingFeatureViewNames,
Expand Down Expand Up @@ -57,6 +63,8 @@
"": "LocalRegistryStore",
}

logger = logging.getLogger(__name__)


def get_registry_store_class_from_type(registry_store_type: str):
if not registry_store_type.endswith("RegistryStore"):
Expand Down Expand Up @@ -95,7 +103,9 @@ class Registry:
cached_registry_proto_ttl: timedelta
cache_being_updated: bool = False

def __init__(self, registry_config: RegistryConfig, repo_path: Path):
def __init__(
self, registry_config: Optional[RegistryConfig], repo_path: Optional[Path]
):
"""
Create the Registry object.

Expand All @@ -104,20 +114,60 @@ def __init__(self, registry_config: RegistryConfig, repo_path: Path):
repo_path: Path to the base of the Feast repository
or where it will be created if it does not exist yet.
"""
registry_store_type = registry_config.registry_store_type
registry_path = registry_config.path
if registry_store_type is None:
cls = get_registry_store_class_from_scheme(registry_path)
else:
cls = get_registry_store_class_from_type(str(registry_store_type))

self._registry_store = cls(registry_config, repo_path)
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
if registry_config:
registry_store_type = registry_config.registry_store_type
registry_path = registry_config.path
if registry_store_type is None:
cls = get_registry_store_class_from_scheme(registry_path)
else:
cls = get_registry_store_class_from_type(str(registry_store_type))

self._registry_store = cls(registry_config, repo_path)
self.cached_registry_proto_ttl = timedelta(
seconds=registry_config.cache_ttl_seconds
if registry_config.cache_ttl_seconds is not None
else 0
)

@classmethod
def from_proto(cls, regsitry_proto: RegistryProto):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: `regsitry` is misspelled

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q: why do we need a from_proto for the registry? I thought the registry contents would always be persisted by the registry store

to clarify, even when we want to compare two registries, we will be comparing protos; I don't think we'll need to reconstruct a registry from a proto?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah fair, I can change this to operate on the protos themselves. I think I misunderstood earlier

registry = cls(None, None)
registry.cached_registry_proto = regsitry_proto
registry.cached_registry_proto_created = datetime.utcnow()
registry.cached_registry_proto_ttl = timedelta(days=1)
registry.cache_being_updated = True
return registry

# TODO(achals): This method needs to be filled out and used in the feast plan/apply methods.
@staticmethod
def diff_between(
    project: str, current_registry: "Registry", new_registry: "Registry"
) -> RegistryDiff:
    """Compute the difference between two registries for one project.

    Only entities are compared so far; the remaining object types are still
    to be handled (see the TODO above).

    Args:
        project: Name of the project whose objects are compared.
        current_registry: Registry describing the current state.
        new_registry: Registry describing the new state.

    Returns:
        A RegistryDiff with one CREATE record per entity found only in
        ``new_registry`` and one DELETE record per entity found only in
        ``current_registry``. Records are emitted in name order.
    """
    diff = RegistryDiff()

    # Handle Entities
    (
        _entities_to_keep,  # not used yet: kept entities produce no record
        entities_to_delete,
        entities_to_add,
    ) = _tag_registry_entities_for_keep_delete(
        set(current_registry.list_entities(project=project, allow_cache=True)),
        set(new_registry.list_entities(project=project, allow_cache=True)),
    )

    # Sets have no stable iteration order; sort by name so the resulting diff
    # is reproducible from run to run.
    for e in sorted(entities_to_add, key=lambda entity: entity.name):
        diff.add_fco_diff(FcoDiff(None, e, [], TransitionType.CREATE))
    for e in sorted(entities_to_delete, key=lambda entity: entity.name):
        diff.add_fco_diff(FcoDiff(e, None, [], TransitionType.DELETE))

    # Handle Feature Views
    # Handle On Demand Feature Views
    # Handle Request Feature Views
    # Handle Feature Services

    # Lazy %-style arguments: the diff is only rendered if INFO is enabled.
    logger.info("Diff: %s", diff)
    return diff

def _initialize_registry(self):
"""Explicitly initializes the registry with an empty proto if it doesn't exist."""
try:
Expand Down Expand Up @@ -752,6 +802,7 @@ def _get_registry_proto(self, allow_cache: bool = False) -> RegistryProto:
> (self.cached_registry_proto_created + self.cached_registry_proto_ttl)
)
)

if allow_cache and (not expired or self.cache_being_updated):
assert isinstance(self.cached_registry_proto, RegistryProto)
return self.cached_registry_proto
Expand Down
Loading