diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java index 3390ee1743..df339c21aa 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java @@ -17,6 +17,7 @@ package com.logicalclocks.hsfs; import com.fasterxml.jackson.annotation.JsonProperty; +import com.logicalclocks.hsfs.constructor.Query; import com.logicalclocks.hsfs.engine.FeatureViewEngine; import com.logicalclocks.hsfs.engine.SparkEngine; import com.logicalclocks.hsfs.metadata.FeatureGroupApi; @@ -253,14 +254,14 @@ public FeatureGroup.FeatureGroupBuilder createFeatureGroup() { } public FeatureGroup getOrCreateFeatureGroup(String name, Integer version) throws IOException, FeatureStoreException { - return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, null, + return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, null, null, null, false, null, null, null); } public FeatureGroup getOrCreateFeatureGroup(String name, Integer version, List primaryKeys, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException { - return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, primaryKeys, + return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, primaryKeys, null, null, onlineEnabled, null, null, eventTime); } @@ -270,7 +271,7 @@ public FeatureGroup getOrCreateFeatureGroup(String name, Integer version, boolean onlineEnabled, String eventTime) throws IOException, FeatureStoreException { - return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, primaryKeys, + return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, primaryKeys, partitionKeys, null, onlineEnabled, null, null, eventTime); } @@ -281,7 +282,7 @@ public FeatureGroup getOrCreateFeatureGroup(String name, Integer version, String StatisticsConfig statisticsConfig, String eventTime) throws IOException, FeatureStoreException { - return featureGroupApi.getOrCreateFeatureGroup(this, name, version, description, primaryKeys, + return featureGroupApi.getOrCreateFeatureGroup(this, name, version, description, primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat, statisticsConfig, eventTime); } @@ -338,6 +339,36 @@ public FeatureView.FeatureViewBuilder createFeatureView() { return new FeatureView.FeatureViewBuilder(this); } + /** + * Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update + * existing feature view metadata object. + * + * @param name name of the feature view + * @param query Query object + * @param version version of the feature view + * @return FeatureView + */ + public FeatureView getOrCreateFeatureView(String name, Query query, Integer version) + throws FeatureStoreException, IOException { + return featureViewEngine.getOrCreateFeatureView(this, name, version, query, null, null); + } + + /** + * Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update + * existing feature view metadata object. + * + * @param name name of the feature view + * @param query Query object + * @param version version of the feature view + * @param description description of the feature view + * @param labels list of label features + * @return FeatureView + */ + public FeatureView getOrCreateFeatureView(String name, Query query, Integer version, String description, + List labels) throws FeatureStoreException, IOException { + return featureViewEngine.getOrCreateFeatureView(this, name, version, query, description, labels); + } + /** * Get a feature view object from the selected feature store. * diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureViewEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureViewEngine.java index df57b00f93..8fe5ef65fe 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureViewEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureViewEngine.java @@ -398,4 +398,24 @@ public Map getTags(FeatureView featureView, Integer trainingData throws FeatureStoreException, IOException { return tagsApi.get(featureView, trainingDataVersion); } + + public FeatureView getOrCreateFeatureView(FeatureStore featureStore, String name, Integer version, Query query, + String description, List labels) + throws FeatureStoreException, IOException { + FeatureView featureView = null; + try { + featureView = get(featureStore, name, version); + } catch (IOException | FeatureStoreException e) { + if (e.getMessage().contains("Error: 404") && e.getMessage().contains("\"errorCode\":270181")) { + featureView = new FeatureView.FeatureViewBuilder(featureStore) + .name(name) + .version(version) + .query(query) + .description(description) + .labels(labels) + .build(); + } + } + return featureView; + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/StreamFeatureGroupEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/StreamFeatureGroupEngine.java index 437b45ef97..c23680bfa7 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/StreamFeatureGroupEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/StreamFeatureGroupEngine.java @@ -116,7 +116,7 @@ public void insert(StreamFeatureGroup streamFeatureGroup, S featureData, public StreamFeatureGroup saveFeatureGroupMetaData(StreamFeatureGroup featureGroup, List partitionKeys, String hudiPrecombineKey, Map writeOptions, JobConfiguration sparkJobConfiguration, S featureData) - throws FeatureStoreException, IOException, ParseException { + throws FeatureStoreException, IOException { if (featureGroup.getFeatures() == null) { featureGroup.setFeatures(utils diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 292df31641..9b92e8a968 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -963,6 +963,56 @@ def create_feature_view( ) return self._feature_view_engine.save(feat_view) + def get_or_create_feature_view( + self, + name: str, + query: Query, + version: int, + description: Optional[str] = "", + labels: Optional[List[str]] = [], + transformation_functions: Optional[Dict[str, TransformationFunction]] = {}, + ): + """Get feature view metadata object or create a new one if it doesn't exist. This method doesn't update + existing feature view metadata object. + + # Arguments + name: Name of the feature view to create. + query: Feature store `Query`. + version: Version of the feature view to create. + description: A string describing the contents of the feature view to + improve discoverability for Data Scientists, defaults to empty string + `""`. + labels: A list of feature names constituting the prediction label/feature of + the feature view. When replaying a `Query` during model inference, + the label features can be omitted from the feature vector retrieval. + Defaults to `[]`, no label. + transformation_functions: A dictionary mapping tansformation functions to + to the features they should be applied to before writing out the + vector and at inference time. Defaults to `{}`, no + transformations. + + # Returns: + `FeatureView`: The feature view metadata object. + """ + + try: + return self._feature_view_engine.get(name, version) + except exceptions.RestAPIError as e: + if ( + e.response.json().get("errorCode", "") == 270181 + and e.response.status_code == 404 + ): + return self.create_feature_view( + name=name, + query=query, + version=version, + description=description, + labels=labels, + transformation_functions=transformation_functions, + ) + else: + raise e + def get_feature_view(self, name: str, version: int = None): """Get a feature view entity from the feature store. diff --git a/python/tests/fixtures/generate_backend_fixtures.ipynb b/python/tests/fixtures/generate_backend_fixtures.ipynb index 22b408dd2d..7d17b83b8f 100644 --- a/python/tests/fixtures/generate_backend_fixtures.ipynb +++ b/python/tests/fixtures/generate_backend_fixtures.ipynb @@ -354,17 +354,12 @@ " fg_2.save(df_2)\n", " \n", " # fv\n", - "\n", - " try:\n", - " self.feature_view = fs.get_feature_view('fv_test')\n", - " except RestAPIError:\n", - " query = fg_1.select_all().join(fg_2.select_all())\n", - " \n", - " self.feature_view = fs.create_feature_view(\n", - " name='fv_test',\n", - " query=query,\n", - " version=1\n", - " )\n", + " query = fg_1.select_all().join(fg_2.select_all())\n", + " self.feature_view = fs.get_or_create_feature_view(\n", + " name='fv_test',\n", + " query=query,\n", + " version=1\n", + " )\n", "\n", " def call(self):\n", " fs.get_feature_view('fv_test')\n", @@ -559,20 +554,23 @@ ], "metadata": { "kernelspec": { - "display_name": "PySpark", + "display_name": "Python 3 (ipykernel)", "language": "python", - "name": "pysparkkernel" + "name": "python3" }, "language_info": { "codemirror_mode": { - "name": "python", + "name": "ipython", "version": 3 }, + "file_extension": ".py", "mimetype": "text/x-python", - "name": "pyspark", - "pygments_lexer": "python3" + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.8.8" } }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +}