From 77d103d0469d7adff844b787432adfa4bc25f2b0 Mon Sep 17 00:00:00 2001
From: davitbzh <44586065+davitbzh@users.noreply.github.com>
Date: Mon, 9 Nov 2020 13:37:34 +0100
Subject: [PATCH] [HOPSWORKS-1374] Append: get commit details in correct order
 and make upsert default op for fg.insert (#132)

---
 README.md                                          |  7 ++-
 docs/index.md                                      |  2 +-
 .../com/logicalclocks/hsfs/FeatureGroup.java       | 45 +++++++++++--------
 .../logicalclocks/hsfs/FeatureGroupCommit.java     |  5 ++-
 .../hsfs/engine/FeatureGroupEngine.java            | 19 +++++++-
 .../logicalclocks/hsfs/engine/HudiEngine.java      |  7 ++-
 .../hsfs/metadata/FeatureGroupApi.java             | 10 +++--
 python/hsfs/core/feature_group_api.py              | 28 ++++++++++++
 python/hsfs/core/feature_group_engine.py           | 25 ++++++++++-
 python/hsfs/core/query.py                          |  9 ++--
 python/hsfs/core/tfdata_engine.py                  | 15 +++++--
 python/hsfs/feature_group.py                       | 27 ++++++++---
 python/hsfs/feature_group_commit.py                |  7 +++
 13 files changed, 161 insertions(+), 45 deletions(-)

diff --git a/README.md b/README.md
index c090771f52..30c19db492 100644
--- a/README.md
+++ b/README.md
@@ -66,7 +66,12 @@ fg.save(dataframe)
 
 Upsert new data into the feature group with `time_travel_format="HUDI"`:
 ```python
-fg.insert(upsert_df, operation="upsert")
+fg.insert(upsert_df)
+```
+
+Retrieve commit timeline metadata of the feature group with `time_travel_format="HUDI"`:
+```python
+fg.commit_details()
 ```
 
 Reading the feature group as of a specific point in time:

diff --git a/docs/index.md b/docs/index.md
index c090771f52..bb338cb65d 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -66,7 +66,7 @@ fg.save(dataframe)
 
 Upsert new data into the feature group with `time_travel_format="HUDI"`:
 ```python
-fg.insert(upsert_df, operation="upsert")
+fg.insert(upsert_df)
 ```
 
 Reading the feature group as of a specific point in time:
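For context on the shortened workflow above, a rough end-to-end sketch follows. The connection arguments, feature group name, and `upsert_df` are placeholders, and the exact read-as-of call may vary between hsfs versions:

```python
import hsfs

# Placeholders: host, project, feature group name, and upsert_df are illustrative.
connection = hsfs.connection(host="hopsworks.example.com", project="demo")
fs = connection.get_feature_store()
fg = fs.get_feature_group("example_feature_group", 1)

fg.insert(upsert_df)           # operation="upsert" is now the default
details = fg.commit_details()  # commit timeline, newest commit first

for commit_id, meta in details.items():
    print(commit_id, meta["committedOn"], meta["rowsInserted"])
```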
diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
index c2fdc737b6..1fd116cb7e 100644
--- a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
+++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
@@ -261,21 +261,6 @@ public void insert(Dataset<Row> featureData, boolean overwrite, Map<String, String> writeOptions)
-  public void insert(Dataset<Row> featureData, HudiOperationType operation, Map<String, String> writeOptions)
-      throws FeatureStoreException, IOException {
-
-    insert(featureData, null, false, operation, writeOptions);
-  }
-
   /**
    * Commit insert or upsert to time travel enabled Feature group.
    *
@@ -294,13 +279,20 @@ public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite,
       throws FeatureStoreException, IOException {
 
     // operation is only valid for time travel enabled feature group
-    if (this.timeTravelFormat == TimeTravelFormat.NONE && operation != null) {
+    if (operation != null && this.timeTravelFormat == TimeTravelFormat.NONE) {
       throw new IllegalArgumentException("operation argument is valid only for time travel enabled feature groups");
     }
 
+    if (operation == null && this.timeTravelFormat == TimeTravelFormat.HUDI) {
+      if (overwrite) {
+        operation = HudiOperationType.BULK_INSERT;
+      } else {
+        operation = HudiOperationType.UPSERT;
+      }
+    }
+
     featureGroupEngine.saveDataframe(this, featureData, storage,
-        overwrite ? SaveMode.Overwrite : SaveMode.Append, operation,
-        writeOptions);
+        overwrite ? SaveMode.Overwrite : SaveMode.Append, operation, writeOptions);
 
     computeStatistics();
   }
@@ -333,7 +325,22 @@ public void commitDeleteRecord(Dataset<Row> featureData, Map<String, String> writeOptions)
     featureGroupEngine.commitDelete(this, featureData, writeOptions);
   }
 
-  public FeatureGroupCommit[] commitDetails(Integer limit) throws IOException, FeatureStoreException {
+  /**
+   * Return commit details.
+   *
+   * @throws FeatureStoreException
+   * @throws IOException
+   */
+  public Map<String, Map<String, String>> commitDetails() throws IOException, FeatureStoreException {
+    return featureGroupEngine.commitDetails(this, null);
+  }
+
+  /**
+   * Return commit details.
+   *
+   * @param limit number of commits to return.
+   * @throws FeatureStoreException
+   * @throws IOException
+   */
+  public Map<String, Map<String, String>> commitDetails(Integer limit) throws IOException, FeatureStoreException {
     return featureGroupEngine.commitDetails(this, limit);
   }

diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroupCommit.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroupCommit.java
index 5606ed8a19..3b482845c9 100644
--- a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroupCommit.java
+++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroupCommit.java
@@ -17,6 +17,7 @@
 package com.logicalclocks.hsfs;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.logicalclocks.hsfs.metadata.RestDto;
 import lombok.Builder;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -27,7 +28,7 @@
 @AllArgsConstructor
 @NoArgsConstructor
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class FeatureGroupCommit {
+public class FeatureGroupCommit extends RestDto<FeatureGroupCommit> {
   @Getter @Setter
   private Long commitID;
   @Getter @Setter
@@ -38,4 +39,4 @@ public class FeatureGroupCommit {
   private Long rowsUpdated;
   @Getter @Setter
   private Long rowsDeleted;
-}
\ No newline at end of file
+}

diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java
index 1c9f979e2a..2b523b5ae3 100644
--- a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java
+++ b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java
@@ -35,6 +35,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -185,9 +186,23 @@ public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStoreException
     featureGroup.setHistograms(apiFG.getHistograms());
   }
 
-  public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroup, Integer limit)
+  public Map<String, Map<String, String>> commitDetails(FeatureGroup featureGroup, Integer limit)
       throws IOException, FeatureStoreException {
-    return featureGroupApi.commitDetails(featureGroup, limit);
+    List<FeatureGroupCommit> featureGroupCommits = featureGroupApi.commitDetails(featureGroup, limit);
+    if (featureGroupCommits == null) {
+      throw new FeatureStoreException("There are no commit details available for this Feature group");
+    }
+    Map<String, Map<String, String>> commitDetails = new HashMap<String, Map<String, String>>();
+    for (FeatureGroupCommit featureGroupCommit : featureGroupCommits) {
+      commitDetails.put(featureGroupCommit.getCommitID().toString(), new HashMap<String, String>() {{
+            put("committedOn", hudiEngine.timeStampToHudiFormat(featureGroupCommit.getCommitID()));
+            put("rowsUpdated", featureGroupCommit.getRowsUpdated().toString());
+            put("rowsInserted", featureGroupCommit.getRowsInserted().toString());
+            put("rowsDeleted", featureGroupCommit.getRowsDeleted().toString());
+          }}
+      );
+    }
+    return commitDetails;
   }
 
   public FeatureGroupCommit commitDelete(FeatureGroup featureGroup, Dataset<Row> dataset,
diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java
index 96471a7a5c..5f5cb225a9 100644
--- a/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java
+++ b/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java
@@ -67,6 +67,8 @@ public class HudiEngine {
       "hoodie.datasource.hive_sync.partition_extractor_class";
   private static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL =
       "org.apache.hudi.hive.MultiPartKeysValueExtractor";
+  private static final String HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database";
+  private static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false";
 
   private static final String HUDI_COPY_ON_WRITE = "COPY_ON_WRITE";
   private static final String HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type";
@@ -168,7 +170,7 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOperationType operation,
     }
 
     // table name
-    String tableName = utils.getTableName(featureGroup);
+    String tableName = utils.getFgName(featureGroup);
     hudiArgs.put(HUDI_TABLE_NAME, tableName);
 
     hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);
@@ -178,6 +180,7 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOperationType operation,
     String jdbcUrl = utils.getHiveMetastoreConnector(featureGroup);
     hudiArgs.put(HUDI_HIVE_SYNC_JDBC_URL, jdbcUrl);
     hudiArgs.put(HUDI_HIVE_SYNC_DB, featureGroup.getFeatureStore().getName());
+    hudiArgs.put(HIVE_AUTO_CREATE_DATABASE_OPT_KEY, HIVE_AUTO_CREATE_DATABASE_OPT_VAL);
 
     hudiArgs.put(HUDI_TABLE_OPERATION, operation.getValue());
 
@@ -207,7 +210,7 @@ private Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimestamp,
   }
 
   @SneakyThrows
-  private String timeStampToHudiFormat(Long commitedOnTimeStamp) {
+  public String timeStampToHudiFormat(Long commitedOnTimeStamp) {
     Date commitedOnDate = new Timestamp(commitedOnTimeStamp);
     return dateFormat.format(commitedOnDate);
   }

diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java
index 6c75386be8..70890c9379 100644
--- a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java
+++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java
@@ -31,6 +31,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 
 import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;
 
@@ -41,7 +42,7 @@ public class FeatureGroupApi {
   public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}{?updateStatsSettings,"
       + "updateMetadata}";
   public static final String FEATURE_GROUP_COMMIT_PATH = FEATURE_GROUP_ID_PATH
-      + "/commits{?limit}";
+      + "/commits{?sort_by,offset,limit}";
   public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear";
 
   private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);
@@ -174,7 +175,7 @@ public FeatureGroupCommit featureGroupCommit(FeatureGroup featureGroup, FeatureGroupCommit featureGroupCommit)
     return hopsworksClient.handleRequest(postRequest, FeatureGroupCommit.class);
   }
 
-  public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroupBase, Integer limit)
+  public List<FeatureGroupCommit> commitDetails(FeatureGroup featureGroupBase, Integer limit)
       throws IOException, FeatureStoreException {
     HopsworksClient hopsworksClient = HopsworksClient.getInstance();
     String pathTemplate = PROJECT_PATH
@@ -185,10 +186,13 @@ public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroupBase, Integer limit)
         .set("projectId", featureGroupBase.getFeatureStore().getProjectId())
         .set("fsId", featureGroupBase.getFeatureStore().getId())
         .set("fgId", featureGroupBase.getId())
+        .set("sort_by", "committed_on:desc")
+        .set("offset", 0)
         .set("limit", limit)
         .expand();
 
     LOGGER.info("Sending metadata request: " + uri);
-    return hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroupCommit[].class);
+    FeatureGroupCommit featureGroupCommit = hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroupCommit.class);
+    return featureGroupCommit.getItems();
   }
 }
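The endpoint change above delegates ordering to the backend via `sort_by=committed_on:desc` (with an explicit `offset`), which is what yields the "correct order" in the PR title. A hypothetical sketch of the raw request the client now issues — the host, resource ids, and auth header are made-up placeholders:

```python
import requests

# All ids, the host, and the API key below are placeholders, not real values.
base = "https://hopsworks.example.com/hopsworks-api/api"
url = f"{base}/project/119/featurestores/67/featuregroups/13/commits"
params = {"sort_by": "committed_on:desc", "offset": 0, "limit": 10}

response = requests.get(url, params=params, headers={"Authorization": "ApiKey <key>"})
items = response.json().get("items", [])  # commits, newest first
```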
diff --git a/python/hsfs/core/feature_group_api.py b/python/hsfs/core/feature_group_api.py
index b9526e2aaf..ce58977a46 100644
--- a/python/hsfs/core/feature_group_api.py
+++ b/python/hsfs/core/feature_group_api.py
@@ -16,6 +16,7 @@
 
 from hsfs import client
 from hsfs import feature_group
+from hsfs import feature_group_commit
 
 
 class FeatureGroupApi:
@@ -186,3 +187,30 @@ def commit(self, feature_group_instance, feature_group_commit_instance):
                 data=feature_group_commit_instance.json(),
             ),
         )
+
+    def commit_details(self, feature_group_instance, limit):
+        """
+        Get feature group commit metadata.
+        # Arguments
+            feature_group_instance: FeatureGroup, required
+                metadata object of feature group.
+            limit: number of commits to retrieve.
+        # Returns
+            `FeatureGroupCommit` or `List[FeatureGroupCommit]`.
+        """
+        _client = client.get_instance()
+        path_params = [
+            "project",
+            _client._project_id,
+            "featurestores",
+            self._feature_store_id,
+            "featuregroups",
+            feature_group_instance.id,
+            "commits",
+        ]
+        headers = {"content-type": "application/json"}
+        query_params = {"sort_by": "committed_on:desc", "offset": 0, "limit": limit}
+
+        return feature_group_commit.FeatureGroupCommit.from_response_json(
+            _client._send_request("GET", path_params, query_params, headers=headers),
+        )

diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py
index c3018a992e..8462f293b5 100644
--- a/python/hsfs/core/feature_group_engine.py
+++ b/python/hsfs/core/feature_group_engine.py
@@ -113,7 +113,7 @@ def insert(
             feature_group,
             feature_dataframe,
             self.APPEND,
-            operation,
+            "bulk_insert" if overwrite else operation,
             feature_group.online_enabled,
             storage,
             offline_write_options,
@@ -123,6 +123,29 @@ def insert(
     def delete(self, feature_group):
         self._feature_group_api.delete(feature_group)
 
+    def commit_details(self, feature_group, limit):
+        hudi_engine_instance = hudi_engine.HudiEngine(
+            feature_group.feature_store_id,
+            feature_group.feature_store_name,
+            feature_group,
+            engine.get_instance()._spark_context,
+            engine.get_instance()._spark_session,
+        )
+        feature_group_commits = self._feature_group_api.commit_details(
+            feature_group, limit
+        )
+        commit_details = {}
+        for feature_group_commit in feature_group_commits:
+            commit_details[feature_group_commit.commitid] = {
+                "committedOn": hudi_engine_instance._timestamp_to_hudiformat(
+                    feature_group_commit.commitid
+                ),
+                "rowsUpdated": feature_group_commit.rows_updated,
+                "rowsInserted": feature_group_commit.rows_inserted,
+                "rowsDeleted": feature_group_commit.rows_deleted,
+            }
+        return commit_details
+
     @staticmethod
     def commit_delete(feature_group, delete_df, write_options):
         hudi_engine_instance = hudi_engine.HudiEngine(
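Both clients now flatten the commit DTOs into a plain dictionary keyed by commit id. An illustrative sketch of the structure `commit_details` returns — the ids and counts are invented; the Java client stringifies keys and values, while the Python engine keeps them as returned by the API:

```python
# Invented example of the dict shape built by commit_details().
commit_details = {
    1604925454000: {
        "committedOn": "20201109133734",  # Hudi commit time, yyyyMMddHHmmss
        "rowsUpdated": 40,
        "rowsInserted": 10,
        "rowsDeleted": 0,
    },
}

# Because the API sorts by committed_on:desc, the first entry is the latest commit.
latest = next(iter(commit_details))
print(latest, commit_details[latest]["committedOn"])
```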
diff --git a/python/hsfs/core/query.py b/python/hsfs/core/query.py
index 0a34726390..48005ddf8d 100644
--- a/python/hsfs/core/query.py
+++ b/python/hsfs/core/query.py
@@ -68,10 +68,10 @@ def show(self, n, online=False):
         query = self._query_constructor_api.construct_query(self)
 
         if online:
-            sql_query = query["queryOnline"]
+            sql_query = query.query_online
             online_conn = self._storage_connector_api.get_online_connector()
         else:
-            sql_query = query["query"]
+            sql_query = query.query
             online_conn = None
 
         return engine.get_instance().show(
@@ -108,9 +108,8 @@ def to_dict(self):
         }
 
     def to_string(self, online=False):
-        return self._query_constructor_api.construct_query(self)[
-            "queryOnline" if online else "query"
-        ]
+        fs_query_instance = self._query_constructor_api.construct_query(self)
+        return fs_query_instance.query_online if online else fs_query_instance.query
 
     def __str__(self):
         return self._query_constructor_api.construct_query(self)
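The `query.py` change assumes `construct_query` now returns a DTO exposing `query` and `query_online` attributes rather than a raw dict. A minimal stand-in for such an object, purely to illustrate the attribute-based access — the real hsfs class name and fields may differ:

```python
# Minimal stand-in for the FsQuery-style DTO assumed by the refactor.
class FsQuery:
    def __init__(self, query, query_online):
        self.query = query
        self.query_online = query_online


fs_query = FsQuery(
    query="SELECT `fg0`.`id` FROM `fg0`",
    query_online="SELECT `fg0`.`id` FROM `fg0`",
)
online = True
print(fs_query.query_online if online else fs_query.query)
```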
diff --git a/python/hsfs/core/tfdata_engine.py b/python/hsfs/core/tfdata_engine.py
index fad28ae0c0..8f6526dd9f 100644
--- a/python/hsfs/core/tfdata_engine.py
+++ b/python/hsfs/core/tfdata_engine.py
@@ -421,7 +421,7 @@ def _get_hopsfs_dataset_files(training_dataset_location, split, filter_empty):
         for file in all_list:
             # remove empty file if any
             if filter_empty:
-                _file_size = hdfs.hdfs("default", 0).get_path_info(file)["size"]
+                _file_size = hdfs.path.getsize(file)
                 if _file_size == 0:
                     include_file = False
             else:
@@ -470,14 +470,23 @@ def _convert_to_tf_dtype(input_type):
         try:
             tf_type = TFDataEngine.SPARK_TO_TFDTYPES_MAPPINGS[input_type]
         except KeyError:
-            raise ValueError("Unknown type of value, please report to hsfs maintainers")
+            raise ValueError(
+                "Type "
+                + input_type
+                + " is not allowed here. Allowed types are '"
+                + "', '".join(
+                    [key for key in TFDataEngine.SPARK_TO_TFDTYPES_MAPPINGS.keys()]
+                )
+                + "'. Please refer to `record_defaults` in "
+                + "https://www.tensorflow.org/api_docs/python/tf/data/experimental/CsvDataset"
+            )
         return tf_type
 
     @staticmethod
     def _convert2float32(input):
         if input.dtype == tf.string:
             raise ValueError(
-                "tf.string feature is not allowed here. please provide process=False and preprocess "
+                "tf.string feature is not allowed here. Please set process=False and preprocess the "
                 "dataset accordingly"
             )
         elif input.dtype in TFDataEngine.SUPPORTED_TFDTYPES:

diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py
index 35f7c278f0..b2b4025171 100644
--- a/python/hsfs/feature_group.py
+++ b/python/hsfs/feature_group.py
@@ -150,7 +150,7 @@ def read(
                 `list`. A two-dimensional Python list.
 
         # Raises
-            `RestAPIError`.
+            `RestAPIError`. No data is available for the feature group with this commit date, if time travel is enabled.
         """
         engine.get_instance().set_job_group(
             "Fetching Feature group",
@@ -204,7 +204,7 @@ def read_changes(
             feature data.
 
         # Raises
-            `RestAPIError`.
+            `RestAPIError`. No data is available for the feature group with this commit date.
         """
 
         return (
@@ -318,7 +318,7 @@ def insert(
             List[list],
         ],
         overwrite: Optional[bool] = False,
-        operation: Optional[str] = None,
+        operation: Optional[str] = "upsert",
         storage: Optional[str] = None,
         write_options: Optional[Dict[Any, Any]] = {},
     ):
@@ -344,7 +344,7 @@ def insert(
             fs = conn.get_feature_store()
             fg = fs.get_feature_group("example_feature_group", 1)
             upsert_df = ...
-            fg.insert(upsert_df, operation="upsert")
+            fg.insert(upsert_df)
             ```
 
         # Arguments
@@ -352,7 +352,7 @@ def insert(
             overwrite: Drop all data in the feature group before inserting new data.
                 This does not affect metadata, defaults to False.
             operation: Apache Hudi operation type `"insert"` or `"upsert"`.
-                Defaults to `None`.
+                Defaults to `"upsert"`.
             storage: Overwrite the default behaviour, write to offline
                 storage only with `"offline"` or online only with `"online"`, defaults
                 to `None`.
@@ -375,6 +375,21 @@ def insert(
 
         self.compute_statistics()
 
+    def commit_details(self, limit: Optional[int] = None):
+        """Retrieves the commit timeline for this feature group.
+
+        # Arguments
+            limit: Number of commits to retrieve. Defaults to `None`.
+
+        # Returns
+            `Dict[str, Dict[str, str]]`. Dictionary of the commit metadata timeline, where the key is the commit id
+            and the value is a `Dict[str, str]` with the commit date and the number of rows updated, inserted and deleted.
+
+        # Raises
+            `RestAPIError`.
+        """
+        return self._feature_group_engine.commit_details(self, limit)
+
     def delete(self):
         """Drop the entire feature group along with its feature data.
 
@@ -393,7 +408,7 @@ def commit_delete_record(
         delete_df: TypeVar("pyspark.sql.DataFrame"),  # noqa: F821
         write_options: Optional[Dict[Any, Any]] = {},
     ):
-        """Drops records in the provided DataFrame and commits it as update to this
+        """Drops records present in the provided DataFrame and commits it as an update to this
         Feature group.
 
         # Arguments

diff --git a/python/hsfs/feature_group_commit.py b/python/hsfs/feature_group_commit.py
index 8d7c3c008c..5a7a20bdc1 100644
--- a/python/hsfs/feature_group_commit.py
+++ b/python/hsfs/feature_group_commit.py
@@ -28,6 +28,11 @@ def __init__(
         rows_inserted=None,
         rows_updated=None,
         rows_deleted=None,
+        committime=None,
+        type=None,
+        items=None,
+        count=None,
+        href=None,
     ):
         self._commitid = commitid
         self._commit_date_string = commit_date_string
@@ -38,6 +43,8 @@ def __init__(
 
     @classmethod
     def from_response_json(cls, json_dict):
         json_decamelized = humps.decamelize(json_dict)
+        if json_decamelized["count"] >= 1:
+            return [cls(**commit_dto) for commit_dto in json_decamelized["items"]]
         return cls(**json_decamelized)
 
     def update_from_response_json(self, json_dict):
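Tying the pieces together, a hedged sketch of deleting records and then inspecting the resulting commit via the new `commit_details` API; `fg` and `delete_df` are assumed to exist already:

```python
# fg and delete_df are assumed to exist; the field names come from the
# commit-details dict built by the engine above.
fg.commit_delete_record(delete_df)

details = fg.commit_details(limit=1)  # newest commit only
for commit_id, meta in details.items():
    print(f"commit {commit_id}: deleted {meta['rowsDeleted']} rows")
```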