Skip to content

Commit

Permalink
Add SFVs to Go serving path
Browse files Browse the repository at this point in the history
Signed-off-by: Felix Wang <wangfelix98@gmail.com>
  • Loading branch information
felixwang9817 committed Jun 16, 2022
1 parent 1f92e00 commit 47d1c45
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 2 deletions.
16 changes: 16 additions & 0 deletions go/internal/feast/featurestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,14 @@ func (fs *FeatureStore) listAllViews() (map[string]*model.FeatureView, map[strin
fvs[featureView.Base.Name] = featureView
}

streamFeatureViews, err := fs.ListStreamFeatureViews()
if err != nil {
return nil, nil, err
}
for _, streamFeatureView := range streamFeatureViews {
fvs[streamFeatureView.Base.Name] = streamFeatureView
}

onDemandFeatureViews, err := fs.registry.ListOnDemandFeatureViews(fs.config.Project)
if err != nil {
return nil, nil, err
Expand All @@ -242,6 +250,14 @@ func (fs *FeatureStore) ListFeatureViews() ([]*model.FeatureView, error) {
return featureViews, nil
}

func (fs *FeatureStore) ListStreamFeatureViews() ([]*model.FeatureView, error) {
streamFeatureViews, err := fs.registry.ListStreamFeatureViews(fs.config.Project)
if err != nil {
return streamFeatureViews, err
}
return streamFeatureViews, nil
}

func (fs *FeatureStore) ListEntities(hideDummyEntity bool) ([]*model.Entity, error) {

allEntities, err := fs.registry.ListEntities(fs.config.Project)
Expand Down
19 changes: 18 additions & 1 deletion go/internal/feast/model/featureview.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,24 @@ type FeatureView struct {

func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView {
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
Ttl: &(*proto.Spec.Ttl),
Ttl: proto.Spec.Ttl,
}
if len(proto.Spec.Entities) == 0 {
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}
} else {
featureView.EntityNames = proto.Spec.Entities
}
entityColumns := make([]*Field, len(proto.Spec.EntityColumns))
for i, entityColumn := range proto.Spec.EntityColumns {
entityColumns[i] = NewFieldFromProto(entityColumn)
}
featureView.EntityColumns = entityColumns
return featureView
}

func NewFeatureViewFromStreamFeatureViewProto(proto *core.StreamFeatureView) *FeatureView {
featureView := &FeatureView{Base: NewBaseFeatureView(proto.Spec.Name, proto.Spec.Features),
Ttl: proto.Spec.Ttl,
}
if len(proto.Spec.Entities) == 0 {
featureView.EntityNames = []string{DUMMY_ENTITY_NAME}
Expand Down
46 changes: 45 additions & 1 deletion go/internal/feast/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Registry struct {
cachedFeatureServices map[string]map[string]*core.FeatureService
cachedEntities map[string]map[string]*core.Entity
cachedFeatureViews map[string]map[string]*core.FeatureView
cachedStreamFeatureViews map[string]map[string]*core.StreamFeatureView
cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView
cachedRegistry *core.Registry
cachedRegistryProtoLastUpdated time.Time
Expand Down Expand Up @@ -106,10 +107,12 @@ func (r *Registry) load(registry *core.Registry) {
r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService)
r.cachedEntities = make(map[string]map[string]*core.Entity)
r.cachedFeatureViews = make(map[string]map[string]*core.FeatureView)
r.cachedStreamFeatureViews = make(map[string]map[string]*core.StreamFeatureView)
r.cachedOnDemandFeatureViews = make(map[string]map[string]*core.OnDemandFeatureView)
r.loadEntities(registry)
r.loadFeatureServices(registry)
r.loadFeatureViews(registry)
r.loadStreamFeatureViews(registry)
r.loadOnDemandFeatureViews(registry)
r.cachedRegistryProtoLastUpdated = time.Now()
}
Expand Down Expand Up @@ -144,6 +147,16 @@ func (r *Registry) loadFeatureViews(registry *core.Registry) {
}
}

func (r *Registry) loadStreamFeatureViews(registry *core.Registry) {
streamFeatureViews := registry.StreamFeatureViews
for _, streamFeatureView := range streamFeatureViews {
if _, ok := r.cachedStreamFeatureViews[streamFeatureView.Spec.Project]; !ok {
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project] = make(map[string]*core.StreamFeatureView)
}
r.cachedStreamFeatureViews[streamFeatureView.Spec.Project][streamFeatureView.Spec.Name] = streamFeatureView
}
}

func (r *Registry) loadOnDemandFeatureViews(registry *core.Registry) {
onDemandFeatureViews := registry.OnDemandFeatureViews
for _, onDemandFeatureView := range onDemandFeatureViews {
Expand Down Expand Up @@ -193,7 +206,26 @@ func (r *Registry) ListFeatureViews(project string) ([]*model.FeatureView, error
}

/*
Look up Feature Views inside project
Look up Stream Feature Views inside project
Returns empty list if project not found
*/

func (r *Registry) ListStreamFeatureViews(project string) ([]*model.FeatureView, error) {
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
return []*model.FeatureView{}, nil
} else {
streamFeatureViews := make([]*model.FeatureView, len(cachedStreamFeatureViews))
index := 0
for _, streamFeatureViewProto := range cachedStreamFeatureViews {
streamFeatureViews[index] = model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto)
index += 1
}
return streamFeatureViews, nil
}
}

/*
Look up Feature Services inside project
Returns empty list if project not found
*/

Expand Down Expand Up @@ -254,6 +286,18 @@ func (r *Registry) GetFeatureView(project, featureViewName string) (*model.Featu
}
}

func (r *Registry) GetStreamFeatureView(project, streamFeatureViewName string) (*model.FeatureView, error) {
if cachedStreamFeatureViews, ok := r.cachedStreamFeatureViews[project]; !ok {
return nil, fmt.Errorf("no cached stream feature views found for project %s", project)
} else {
if streamFeatureViewProto, ok := cachedStreamFeatureViews[streamFeatureViewName]; !ok {
return nil, fmt.Errorf("no cached stream feature view %s found for project %s", streamFeatureViewName, project)
} else {
return model.NewFeatureViewFromStreamFeatureViewProto(streamFeatureViewProto), nil
}
}
}

func (r *Registry) GetFeatureService(project, featureServiceName string) (*model.FeatureService, error) {
if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok {
return nil, fmt.Errorf("no cached feature services found for project %s", project)
Expand Down

0 comments on commit 47d1c45

Please sign in to comment.