diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 3d6d3b1a4b..11be8ee94c 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -4,14 +4,15 @@ import ( "context" "errors" "fmt" + "sort" + "strings" + "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" - "sort" - "strings" ) type FeatureStore struct { @@ -92,7 +93,7 @@ func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving. return nil, err } - fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(features, true, false) + fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(features, false) if len(requestedRequestFeatureViews)+len(requestedOnDemandFeatureViews) > 0 { return nil, status.Errorf(codes.InvalidArgument, "on demand feature views are currently not supported") @@ -253,22 +254,31 @@ func (fs *FeatureStore) getFeatureRefs(features *Features) ([]string, error) { retrieving all feature views. Similar argument to FeatureService applies. */ -func (fs *FeatureStore) getFeatureViewsToUse(features *Features, allowCache, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) { +func (fs *FeatureStore) getFeatureViewsToUse(features *Features, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) { fvs := make(map[string]*FeatureView) requestFvs := make(map[string]*RequestFeatureView) odFvs := make(map[string]*OnDemandFeatureView) - featureViews := fs.listFeatureViews(allowCache, hideDummyEntity) + featureViews, err := fs.listFeatureViews(hideDummyEntity) + if err != nil { + return nil, nil, nil, nil, err + } for _, featureView := range featureViews { fvs[featureView.base.name] = featureView } - requestFeatureViews := fs.registry.listRequestFeatureViews(fs.config.Project) + requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project) + if err != nil { + return nil, nil, nil, nil, err + } for _, requestFeatureView := range requestFeatureViews { requestFvs[requestFeatureView.base.name] = requestFeatureView } - onDemandFeatureViews := fs.registry.listOnDemandFeatureViews(fs.config.Project) + onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project) + if err != nil { + return nil, nil, nil, nil, err + } for _, onDemandFeatureView := range onDemandFeatureViews { odFvs[onDemandFeatureView.base.name] = onDemandFeatureView } @@ -347,7 +357,10 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews map[*FeatureView][]s var joinKeyMap map[string]string var featureView *FeatureView - entities := fs.listEntities(true, false) + entities, err := fs.listEntities(false) + if err != nil { + return nil, err + } for _, entity := range entities { entityNameToJoinKeyMap[entity.name] = entity.joinKey @@ -636,26 +649,31 @@ func (fs *FeatureStore) dropUnneededColumns(onlineFeaturesResponse *serving.GetO } } -func (fs *FeatureStore) listFeatureViews(allowCache, hideDummyEntity bool) []*FeatureView { - featureViews := fs.registry.listFeatureViews(fs.config.Project) - for _, featureView := range featureViews { - if _, ok := featureView.entities[DUMMY_ENTITY_NAME]; ok && hideDummyEntity { - featureView.entities = make(map[string]struct{}) - } +func (fs *FeatureStore) listFeatureViews(hideDummyEntity bool) ([]*FeatureView, error) { + featureViews, err := fs.registry.listFeatureViews(fs.config.Project) + if err != nil { + return featureViews, err } - return featureViews + return featureViews, nil +} + +func (fs *FeatureStore) listRequestFeatureViews() ([]*RequestFeatureView, error) { + return fs.registry.listRequestFeatureViews(fs.config.Project) } -func (fs *FeatureStore) listEntities(allowCache, hideDummyEntity bool) []*Entity { +func (fs *FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) { - allEntities := fs.registry.listEntities(fs.config.Project) + allEntities, err := fs.registry.listEntities(fs.config.Project) + if err != nil { + return allEntities, err + } entities := make([]*Entity, 0) for _, entity := range allEntities { if entity.name != DUMMY_ENTITY_NAME || !hideDummyEntity { entities = append(entities, entity) } } - return entities + return entities, nil } func (fs *FeatureStore) getFvEntityValues(fv *FeatureView, @@ -820,7 +838,7 @@ func (fs *FeatureStore) groupFeatureRefs(requestedFeatureViews map[*FeatureView] return fvFeatures, nil } -func (fs *FeatureStore) getFeatureView(project, featureViewName string, allowCache, hideDummyEntity bool) (*FeatureView, error) { +func (fs *FeatureStore) getFeatureView(project, featureViewName string, hideDummyEntity bool) (*FeatureView, error) { fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName) if err != nil { return nil, err diff --git a/go/internal/feast/registry.go b/go/internal/feast/registry.go index f8ba83c45d..8963703f51 100644 --- a/go/internal/feast/registry.go +++ b/go/internal/feast/registry.go @@ -3,9 +3,11 @@ package feast import ( "errors" "fmt" - "github.com/feast-dev/feast/go/protos/feast/core" "net/url" + "sync" "time" + + "github.com/feast-dev/feast/go/protos/feast/core" ) var REGISTRY_SCHEMA_VERSION string = "1" @@ -22,15 +24,16 @@ var REGISTRY_STORE_CLASS_FOR_SCHEME map[string]string = map[string]string{ */ type Registry struct { - registryStore RegistryStore - cachedFeatureServices map[string]map[string]*core.FeatureService - cachedEntities map[string]map[string]*core.Entity - cachedFeatureViews map[string]map[string]*core.FeatureView - cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView - cachedRequestFeatureViews map[string]map[string]*core.RequestFeatureView - - cachedRegistryProtoCreated time.Time - cachedRegistryProtoTtl time.Duration + registryStore RegistryStore + cachedFeatureServices map[string]map[string]*core.FeatureService + cachedEntities map[string]map[string]*core.Entity + cachedFeatureViews map[string]map[string]*core.FeatureView + cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView + cachedRequestFeatureViews map[string]map[string]*core.RequestFeatureView + cachedRegistry *core.Registry + cachedRegistryProtoLastUpdated time.Time + cachedRegistryProtoTtl time.Duration + mu sync.Mutex } func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, error) { @@ -41,7 +44,7 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, er } if len(registryStoreType) == 0 { - registryStore, err := getRegistryStoreFromSheme(registryPath, registryConfig, repoPath) + registryStore, err := getRegistryStoreFromScheme(registryPath, registryConfig, repoPath) if err != nil { return nil, err } @@ -58,33 +61,47 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, er } func (r *Registry) initializeRegistry() { - err := r.getRegistryProto(false) + _, err := r.getRegistryProto() if err != nil { registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION} r.registryStore.UpdateRegistryProto(registryProto) - r.load(registryProto) + go r.refreshRegistryOnInterval() + } +} + +func (r *Registry) refreshRegistryOnInterval() { + ticker := time.NewTicker(r.cachedRegistryProtoTtl) + for ; true; <-ticker.C { + err := r.refresh() + if err != nil { + return + } } } // TODO: Add a goroutine and automatically refresh every cachedRegistryProtoTtl func (r *Registry) refresh() error { - return r.getRegistryProto(false) + _, err := r.getRegistryProto() + return err } -func (r *Registry) getRegistryProto(allowCache bool) error { - expired := r.cachedFeatureServices == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoCreated.Add(r.cachedRegistryProtoTtl))) - if allowCache && !expired { - return nil +func (r *Registry) getRegistryProto() (*core.Registry, error) { + expired := r.cachedRegistry == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoLastUpdated.Add(r.cachedRegistryProtoTtl))) + if !expired { + return r.cachedRegistry, nil } registryProto, err := r.registryStore.GetRegistryProto() if err != nil { - return err + return registryProto, err } r.load(registryProto) - return nil + return registryProto, nil } func (r *Registry) load(registry *core.Registry) { + r.mu.Lock() + defer r.mu.Unlock() + r.cachedRegistry = registry r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService) r.cachedEntities = make(map[string]map[string]*core.Entity) r.cachedFeatureViews = make(map[string]map[string]*core.FeatureView) @@ -95,6 +112,7 @@ func (r *Registry) load(registry *core.Registry) { r.loadFeatureViews(registry) r.loadOnDemandFeatureViews(registry) r.loadRequestFeatureViews(registry) + r.cachedRegistryProtoLastUpdated = time.Now() } func (r *Registry) loadEntities(registry *core.Registry) { @@ -152,17 +170,17 @@ func (r *Registry) loadRequestFeatureViews(registry *core.Registry) { Returns empty list if project not found */ -func (r *Registry) listEntities(project string) []*Entity { - if entities, ok := r.cachedEntities[project]; !ok { - return []*Entity{} +func (r *Registry) listEntities(project string) ([]*Entity, error) { + if cachedEntities, ok := r.cachedEntities[project]; !ok { + return []*Entity{}, nil } else { - entityList := make([]*Entity, len(entities)) + entities := make([]*Entity, len(cachedEntities)) index := 0 - for _, entity := range entities { - entityList[index] = NewEntityFromProto(entity) + for _, entityProto := range cachedEntities { + entities[index] = NewEntityFromProto(entityProto) index += 1 } - return entityList + return entities, nil } } @@ -171,17 +189,17 @@ func (r *Registry) listEntities(project string) []*Entity { Returns empty list if project not found */ -func (r *Registry) listFeatureViews(project string) []*FeatureView { - if featureViewProtos, ok := r.cachedFeatureViews[project]; !ok { - return []*FeatureView{} +func (r *Registry) listFeatureViews(project string) ([]*FeatureView, error) { + if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { + return []*FeatureView{}, nil } else { - featureViews := make([]*FeatureView, len(featureViewProtos)) + featureViews := make([]*FeatureView, len(cachedFeatureViews)) index := 0 - for _, featureViewProto := range featureViewProtos { + for _, featureViewProto := range cachedFeatureViews { featureViews[index] = NewFeatureViewFromProto(featureViewProto) index += 1 } - return featureViews + return featureViews, nil } } @@ -190,17 +208,17 @@ func (r *Registry) listFeatureViews(project string) []*FeatureView { Returns empty list if project not found */ -func (r *Registry) listFeatureServices(project string) []*FeatureService { - if featureServiceProtos, ok := r.cachedFeatureServices[project]; !ok { - return []*FeatureService{} +func (r *Registry) listFeatureServices(project string) ([]*FeatureService, error) { + if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { + return []*FeatureService{}, nil } else { - featureServices := make([]*FeatureService, len(featureServiceProtos)) + featureServices := make([]*FeatureService, len(cachedFeatureServices)) index := 0 - for _, featureServiceProto := range featureServiceProtos { + for _, featureServiceProto := range cachedFeatureServices { featureServices[index] = NewFeatureServiceFromProto(featureServiceProto) index += 1 } - return featureServices + return featureServices, nil } } @@ -209,17 +227,17 @@ func (r *Registry) listFeatureServices(project string) []*FeatureService { Returns empty list if project not found */ -func (r *Registry) listOnDemandFeatureViews(project string) []*OnDemandFeatureView { - if onDemandFeatureViewProtos, ok := r.cachedOnDemandFeatureViews[project]; !ok { - return []*OnDemandFeatureView{} +func (r *Registry) listOnDemandFeatureViews(project string) ([]*OnDemandFeatureView, error) { + if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { + return []*OnDemandFeatureView{}, nil } else { - onDemandFeatureViews := make([]*OnDemandFeatureView, len(onDemandFeatureViewProtos)) + onDemandFeatureViews := make([]*OnDemandFeatureView, len(cachedOnDemandFeatureViews)) index := 0 - for _, onDemandFeatureViewProto := range onDemandFeatureViewProtos { + for _, onDemandFeatureViewProto := range cachedOnDemandFeatureViews { onDemandFeatureViews[index] = NewOnDemandFeatureViewFromProto(onDemandFeatureViewProto) index += 1 } - return onDemandFeatureViews + return onDemandFeatureViews, nil } } @@ -228,26 +246,26 @@ func (r *Registry) listOnDemandFeatureViews(project string) []*OnDemandFeatureVi Returns empty list if project not found */ -func (r *Registry) listRequestFeatureViews(project string) []*RequestFeatureView { - if requestFeatureViewProtos, ok := r.cachedRequestFeatureViews[project]; !ok { - return []*RequestFeatureView{} +func (r *Registry) listRequestFeatureViews(project string) ([]*RequestFeatureView, error) { + if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { + return []*RequestFeatureView{}, nil } else { - requestFeatureViews := make([]*RequestFeatureView, len(requestFeatureViewProtos)) + requestFeatureViews := make([]*RequestFeatureView, len(cachedRequestFeatureViews)) index := 0 - for _, requestFeatureViewProto := range requestFeatureViewProtos { + for _, requestFeatureViewProto := range cachedRequestFeatureViews { requestFeatureViews[index] = NewRequestFeatureViewFromProto(requestFeatureViewProto) index += 1 } - return requestFeatureViews + return requestFeatureViews, nil } } func (r *Registry) getEntity(project, entityName string) (*Entity, error) { - if entities, ok := r.cachedEntities[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getEntity", project)) + if cachedEntities, ok := r.cachedEntities[project]; !ok { + return nil, fmt.Errorf("no cached entities found for project %s", project) } else { - if entity, ok := entities[entityName]; !ok { - return nil, errors.New(fmt.Sprintf("entity %s not found inside project %s", entityName, project)) + if entity, ok := cachedEntities[entityName]; !ok { + return nil, fmt.Errorf("no cached entity %s found for project %s", entityName, project) } else { return NewEntityFromProto(entity), nil } @@ -255,11 +273,11 @@ func (r *Registry) getEntity(project, entityName string) (*Entity, error) { } func (r *Registry) getFeatureView(project, featureViewName string) (*FeatureView, error) { - if featureViews, ok := r.cachedFeatureViews[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getFeatureView", project)) + if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { + return nil, fmt.Errorf("no cached feature views found for project %s", project) } else { - if featureViewProto, ok := featureViews[featureViewName]; !ok { - return nil, errors.New(fmt.Sprintf("featureView %s not found inside project %s", featureViewName, project)) + if featureViewProto, ok := cachedFeatureViews[featureViewName]; !ok { + return nil, fmt.Errorf("no cached feature view %s found for project %s", featureViewName, project) } else { return NewFeatureViewFromProto(featureViewProto), nil } @@ -267,11 +285,11 @@ func (r *Registry) getFeatureView(project, featureViewName string) (*FeatureView } func (r *Registry) getFeatureService(project, featureServiceName string) (*FeatureService, error) { - if featureServices, ok := r.cachedFeatureServices[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getFeatureService", project)) + if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { + return nil, fmt.Errorf("no cached feature services found for project %s", project) } else { - if featureServiceProto, ok := featureServices[featureServiceName]; !ok { - return nil, errors.New(fmt.Sprintf("featureService %s not found inside project %s", featureServiceName, project)) + if featureServiceProto, ok := cachedFeatureServices[featureServiceName]; !ok { + return nil, fmt.Errorf("no cached feature service %s found for project %s", featureServiceName, project) } else { return NewFeatureServiceFromProto(featureServiceProto), nil } @@ -279,11 +297,11 @@ func (r *Registry) getFeatureService(project, featureServiceName string) (*Featu } func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName string) (*OnDemandFeatureView, error) { - if onDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getOnDemandFeatureView", project)) + if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { + return nil, fmt.Errorf("no cached on demand feature views found for project %s", project) } else { - if onDemandFeatureViewProto, ok := onDemandFeatureViews[onDemandFeatureViewName]; !ok { - return nil, errors.New(fmt.Sprintf("onDemandFeatureView %s not found inside project %s", onDemandFeatureViewName, project)) + if onDemandFeatureViewProto, ok := cachedOnDemandFeatureViews[onDemandFeatureViewName]; !ok { + return nil, fmt.Errorf("no cached on demand feature view %s found for project %s", onDemandFeatureViewName, project) } else { return NewOnDemandFeatureViewFromProto(onDemandFeatureViewProto), nil } @@ -291,18 +309,18 @@ func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName strin } func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string) (*RequestFeatureView, error) { - if requestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getRequestFeatureView", project)) + if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { + return nil, fmt.Errorf("no cached on request feature views found for project %s", project) } else { - if requestFeatureViewProto, ok := requestFeatureViews[requestFeatureViewName]; !ok { - return nil, errors.New(fmt.Sprintf("requestFeatureView %s not found inside project %s", requestFeatureViewName, project)) + if requestFeatureViewProto, ok := cachedRequestFeatureViews[requestFeatureViewName]; !ok { + return nil, fmt.Errorf("no cached request feature view %s found for project %s", requestFeatureViewName, project) } else { return NewRequestFeatureViewFromProto(requestFeatureViewProto), nil } } } -func getRegistryStoreFromSheme(registryPath string, registryConfig *RegistryConfig, repoPath string) (RegistryStore, error) { +func getRegistryStoreFromScheme(registryPath string, registryConfig *RegistryConfig, repoPath string) (RegistryStore, error) { uri, err := url.Parse(registryPath) if err != nil { return nil, err @@ -310,7 +328,7 @@ func getRegistryStoreFromSheme(registryPath string, registryConfig *RegistryConf if registryStoreType, ok := REGISTRY_STORE_CLASS_FOR_SCHEME[uri.Scheme]; ok { return getRegistryStoreFromType(registryStoreType, registryConfig, repoPath) } - return nil, errors.New(fmt.Sprintf("registry path %s has unsupported scheme %s. Supported schemes are file, s3 and gs.", registryPath, uri.Scheme)) + return nil, fmt.Errorf("registry path %s has unsupported scheme %s. Supported schemes are file, s3 and gs", registryPath, uri.Scheme) } func getRegistryStoreFromType(registryStoreType string, registryConfig *RegistryConfig, repoPath string) (RegistryStore, error) {