Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Enable stream feature view materialization #2798

Merged
merged 7 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ func (fs *FeatureStore) listAllViews() (map[string]*model.FeatureView, map[strin
fvs[featureView.Base.Name] = featureView
}

streamFeatureViews, err := fs.ListStreamFeatureViews()
if err != nil {
return nil, nil, err
}
for _, streamFeatureView := range streamFeatureViews {
fvs[streamFeatureView.Base.Name] = streamFeatureView
}

onDemandFeatureViews, err := fs.registry.ListOnDemandFeatureViews(fs.config.Project)
if err != nil {
return nil, nil, err
Expand All @@ -242,6 +250,14 @@ func (fs *FeatureStore) ListFeatureViews() ([]*model.FeatureView, error) {
return featureViews, nil
}

func (fs *FeatureStore) ListStreamFeatureViews() ([]*model.FeatureView, error) {
streamFeatureViews, err := fs.registry.ListStreamFeatureViews(fs.config.Project)
if err != nil {
return streamFeatureViews, err
}
return streamFeatureViews, nil
}

func (fs *FeatureStore) ListEntities(hideDummyEntity bool) ([]*model.Entity, error) {

allEntities, err := fs.registry.ListEntities(fs.config.Project)
Expand Down
19 changes: 18 additions & 1 deletion go/internal/feast/model/featureview.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,24 @@ type FeatureView struct {

func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView {
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
Ttl: &(*proto.Spec.Ttl),
Ttl: proto.Spec.Ttl,
}
if len(proto.Spec.Entities) == 0 {
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}
} else {
featureView.EntityNames = proto.Spec.Entities
}
entityColumns := make([]*Field, len(proto.Spec.EntityColumns))
for i, entityColumn := range proto.Spec.EntityColumns {
entityColumns[i] = NewFieldFromProto(entityColumn)
}
featureView.EntityColumns = entityColumns
return featureView
}

func NewFeatureViewFromStreamFeatureViewProto(proto *core.StreamFeatureView) *FeatureView {
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
Ttl: proto.Spec.Ttl,
}
if len(proto.Spec.Entities) == 0 {
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}
Expand Down
46 changes: 45 additions & 1 deletion go/internal/feast/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Registry struct {
cachedFeatureServices map[string]map[string]*core.FeatureService
cachedEntities map[string]map[string]*core.Entity
cachedFeatureViews map[string]map[string]*core.FeatureView
cachedStreamFeatureViews map[string]map[string]*core.StreamFeatureView
cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView
cachedRegistry *core.Registry
cachedRegistryProtoLastUpdated time.Time
Expand Down Expand Up @@ -106,10 +107,12 @@ func (r *Registry) load(registry *core.Registry) {
r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService)
r.cachedEntities = make(map[string]map[string]*core.Entity)
r.cachedFeatureViews = make(map[string]map[string]*core.FeatureView)
r.cachedStreamFeatureViews = make(map[string]map[string]*core.StreamFeatureView)
r.cachedOnDemandFeatureViews = make(map[string]map[string]*core.OnDemandFeatureView)
r.loadEntities(registry)
r.loadFeatureServices(registry)
r.loadFeatureViews(registry)
r.loadStreamFeatureViews(registry)
r.loadOnDemandFeatureViews(registry)
r.cachedRegistryProtoLastUpdated = time.Now()
}
Expand Down Expand Up @@ -144,6 +147,16 @@ func (r *Registry) loadFeatureViews(registry *core.Registry) {
}
}

func (r *Registry) loadStreamFeatureViews(registry *core.Registry) {
streamFeatureViews := registry.StreamFeatureViews
for _, streamFeatureView := range streamFeatureViews {
if _, ok := r.cachedStreamFeatureViews[streamFeatureView.Spec.Project]; !ok {
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project] = make(map[string]*core.StreamFeatureView)
}
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project][streamFeatureView.Spec.Name] = streamFeatureView
}
}

func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) {
onDemandFeatureViews := registry.OnDemandFeatureViews
for _, onDemandFeatureView := range onDemandFeatureViews {
Expand Down Expand Up @@ -193,7 +206,26 @@ func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error
}

/*
Look up Feature Views inside project
Look up Stream Feature Views inside project
Returns empty list if project not found
*/

func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView, error) {
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
return []*model.FeatureView{}, nil
} else {
streamFeatureViews := make([]*model.FeatureView, len(cachedStreamFeatureViews))
index := 0
for _, streamFeatureViewProto := range cachedStreamFeatureViews {
streamFeatureViews[index] = model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto)
index += 1
}
return streamFeatureViews, nil
}
}

/*
Look up Feature Services inside project
Returns empty list if project not found
*/

Expand Down Expand Up @@ -254,6 +286,18 @@ func (r *Registry) GetFeatureView(project, featureViewName string) (*model.Featu
}
}

func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (*model.FeatureView, error) {
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached stream feature views found for project %s", project)
} else {
if streamFeatureViewProto, ok := cachedStreamFeatureViews[streamFeatureViewName]; !ok {
return nil, fmt.Errorf("no cached stream feature view %s found for project %s", streamFeatureViewName, project)
} else {
return model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto), nil
}
}
}

func (r *Registry) GetFeatureService(project, featureServiceName string) (*model.FeatureService, error) {
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
return nil, fmt.Errorf("no cached feature services found for project %s", project)
Expand Down
112 changes: 73 additions & 39 deletions sdk/python/feast/feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,19 @@ def _list_feature_views(
feature_views.append(fv)
return feature_views

def _list_stream_feature_views(
self, allow_cache: bool = False, hide_dummy_entity: bool = True,
) -> List[StreamFeatureView]:
stream_feature_views = []
for sfv in self._registry.list_stream_feature_views(
self.project, allow_cache=allow_cache
):
if hide_dummy_entity and sfv.entities[0] == DUMMY_ENTITY_NAME:
sfv.entities = []
sfv.entity_columns = []
stream_feature_views.append(sfv)
return stream_feature_views

@log_exceptions_and_usage
def list_on_demand_feature_views(
self, allow_cache: bool = False
Expand All @@ -289,9 +302,7 @@ def list_stream_feature_views(
Returns:
A list of stream feature views.
"""
return self._registry.list_stream_feature_views(
self.project, allow_cache=allow_cache
)
return self._list_stream_feature_views(allow_cache)

@log_exceptions_and_usage
def list_data_sources(self, allow_cache: bool = False) -> List[DataSource]:
Expand Down Expand Up @@ -558,6 +569,9 @@ def _make_inferences(
update_feature_views_with_inferred_features_and_entities(
views_to_update, entities + entities_to_update, self.config
)
update_feature_views_with_inferred_features_and_entities(
sfvs_to_update, entities + entities_to_update, self.config
)
# TODO(kevjumba): Update schema inferrence
for sfv in sfvs_to_update:
if not sfv.schema:
Expand All @@ -574,6 +588,53 @@ def _make_inferences(
for feature_service in feature_services_to_update:
feature_service.infer_features(fvs_to_update=fvs_to_update_map)

def _get_feature_views_to_materialize(
self, feature_views: Optional[List[str]],
) -> List[FeatureView]:
"""
Returns the list of feature views that should be materialized.

If no feature views are specified, all feature views will be returned.

Args:
feature_views: List of names of feature views to materialize.

Raises:
FeatureViewNotFoundException: One of the specified feature views could not be found.
ValueError: One of the specified feature views is not configured for materialization.
"""
feature_views_to_materialize: List[FeatureView] = []

if feature_views is None:
feature_views_to_materialize = self._list_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize = [
fv for fv in feature_views_to_materialize if fv.online
]
stream_feature_views_to_materialize = self._list_stream_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize += [
sfv for sfv in stream_feature_views_to_materialize if sfv.online
]
else:
for name in feature_views:
try:
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
except FeatureViewNotFoundException:
feature_view = self._get_stream_feature_view(
name, hide_dummy_entity=False
)

if not feature_view.online:
raise ValueError(
f"FeatureView {feature_view.name} is not configured to be served online."
)
feature_views_to_materialize.append(feature_view)

return feature_views_to_materialize

@log_exceptions_and_usage
def _plan(
self, desired_repo_contents: RepoContents
Expand Down Expand Up @@ -873,8 +934,8 @@ def apply(

self._get_provider().update_infra(
project=self.project,
tables_to_delete=views_to_delete if not partial else [],
tables_to_keep=views_to_update,
tables_to_delete=views_to_delete + sfvs_to_delete if not partial else [],
tables_to_keep=views_to_update + sfvs_to_update,
entities_to_delete=entities_to_delete if not partial else [],
entities_to_keep=entities_to_update,
partial=partial,
Expand Down Expand Up @@ -1151,23 +1212,9 @@ def materialize_incremental(
<BLANKLINE>
...
"""
feature_views_to_materialize: List[FeatureView] = []
if feature_views is None:
feature_views_to_materialize = self._list_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize = [
fv for fv in feature_views_to_materialize if fv.online
]
else:
for name in feature_views:
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
if not feature_view.online:
raise ValueError(
f"FeatureView {feature_view.name} is not configured to be served online."
)
feature_views_to_materialize.append(feature_view)

feature_views_to_materialize = self._get_feature_views_to_materialize(
feature_views
)
_print_materialization_log(
None,
end_date,
Expand Down Expand Up @@ -1258,23 +1305,9 @@ def materialize(
f"The given start_date {start_date} is greater than the given end_date {end_date}."
)

feature_views_to_materialize: List[FeatureView] = []
if feature_views is None:
feature_views_to_materialize = self._list_feature_views(
hide_dummy_entity=False
)
feature_views_to_materialize = [
fv for fv in feature_views_to_materialize if fv.online
]
else:
for name in feature_views:
feature_view = self._get_feature_view(name, hide_dummy_entity=False)
if not feature_view.online:
raise ValueError(
f"FeatureView {feature_view.name} is not configured to be served online."
)
feature_views_to_materialize.append(feature_view)

feature_views_to_materialize = self._get_feature_views_to_materialize(
feature_views
)
_print_materialization_log(
start_date,
end_date,
Expand Down Expand Up @@ -1327,6 +1360,7 @@ def push(
from feast.data_source import PushSource

all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)
all_fvs += self.list_stream_feature_views(allow_cache=allow_registry_cache)

fvs_with_push_sources = {
fv
Expand Down
4 changes: 4 additions & 0 deletions sdk/python/feast/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ def update_feature_views_with_inferred_features_and_entities(
other columns except designated timestamp columns are considered to be feature columns. If
the feature view already has features, feature inference is skipped.

Note that this inference logic currently does not take any transformations (either a UDF or
aggregations) into account. For example, even if a stream feature view has a transformation,
this method assumes that the batch source contains transformed data with the correct final schema.

Args:
fvs: The feature views to be updated.
entities: A list containing entities associated with the feature views.
Expand Down
24 changes: 24 additions & 0 deletions sdk/python/feast/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -1267,6 +1267,30 @@ def apply_materialization(
self.commit()
return

for idx, existing_stream_feature_view_proto in enumerate(
self.cached_registry_proto.stream_feature_views
):
if (
existing_stream_feature_view_proto.spec.name == feature_view.name
and existing_stream_feature_view_proto.spec.project == project
):
existing_stream_feature_view = StreamFeatureView.from_proto(
existing_stream_feature_view_proto
)
existing_stream_feature_view.materialization_intervals.append(
(start_date, end_date)
)
existing_stream_feature_view.last_updated_timestamp = datetime.utcnow()
stream_feature_view_proto = existing_stream_feature_view.to_proto()
stream_feature_view_proto.spec.project = project
del self.cached_registry_proto.stream_feature_views[idx]
self.cached_registry_proto.stream_feature_views.append(
stream_feature_view_proto
)
if commit:
self.commit()
return

raise FeatureViewNotFoundException(feature_view.name, project)

def list_feature_views(
Expand Down
Loading