Skip to content

Commit

Permalink
[HOPSWORKS-1374] Append: get commit details in correct order and make…
Browse files Browse the repository at this point in the history
… upsert default op for fg.insert (#132)
  • Loading branch information
davitbzh authored Nov 9, 2020
1 parent 7c704e0 commit 77d103d
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 45 deletions.
7 changes: 6 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ fg.save(dataframe)

Upsert new data into the feature group with `time_travel_format="HUDI"`.
```python
fg.insert(upsert_df, operation="upsert")
fg.insert(upsert_df)
```

Retrieve commit timeline metadata of the feature group with `time_travel_format="HUDI"`.
```python
fg.commit_details()
```

"Reading feature group as of specific point in time".
Expand Down
2 changes: 1 addition & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fg.save(dataframe)

Upsert new data into the feature group with `time_travel_format="HUDI"`.
```python
fg.insert(upsert_df, operation="upsert")
fg.insert(upsert_df)
```

"Reading feature group as of specific point in time".
Expand Down
45 changes: 26 additions & 19 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -261,21 +261,6 @@ public void insert(Dataset<Row> featureData, boolean overwrite, Map<String, Stri
insert(featureData, null, overwrite, null, writeOptions);
}

/**
* Commit insert or upsert to time travel enabled Feature group.
*
* @param featureData dataframe to be committed.
* @param operation commit operation type, INSERT or UPSERT.
* @param writeOptions user provided write options.
* @throws FeatureStoreException
* @throws IOException
*/
public void insert(Dataset<Row> featureData, HudiOperationType operation, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {

insert(featureData, null, false, operation, writeOptions);
}

/**
* Commit insert or upsert to time travel enabled Feature group.
*
Expand All @@ -294,13 +279,20 @@ public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite,
throws FeatureStoreException, IOException {

// operation is only valid for time travel enabled feature group
if (this.timeTravelFormat == TimeTravelFormat.NONE && operation != null) {
if (operation != null && this.timeTravelFormat == TimeTravelFormat.NONE) {
throw new IllegalArgumentException("operation argument is valid only for time travel enable feature groups");
}

if (operation == null && this.timeTravelFormat == TimeTravelFormat.HUDI) {
if (overwrite) {
operation = HudiOperationType.BULK_INSERT;
} else {
operation = HudiOperationType.UPSERT;
}
}

featureGroupEngine.saveDataframe(this, featureData, storage,
overwrite ? SaveMode.Overwrite : SaveMode.Append, operation,
writeOptions);
overwrite ? SaveMode.Overwrite : SaveMode.Append, operation, writeOptions);

computeStatistics();
}
Expand Down Expand Up @@ -333,7 +325,22 @@ public void commitDeleteRecord(Dataset<Row> featureData, Map<String, String> wri
featureGroupEngine.commitDelete(this, featureData, writeOptions);
}

public FeatureGroupCommit[] commitDetails(Integer limit) throws IOException, FeatureStoreException {
/**
 * Return commit details of this feature group, passing no limit ({@code null}) to the engine.
 *
 * @return the commit details map produced by the feature group engine.
 * @throws FeatureStoreException propagated from the feature group engine.
 * @throws IOException propagated from the feature group engine.
 */
public Map<String, Map<String,String>> commitDetails() throws IOException, FeatureStoreException {
  // No limit is supplied by this overload; the engine receives null.
  final Integer limit = null;
  return featureGroupEngine.commitDetails(this, limit);
}

/**
 * Return commit details of this feature group.
 *
 * @param limit number of commits to return; forwarded unchanged to the feature group engine.
 * @return the commit details map produced by the feature group engine.
 * @throws FeatureStoreException propagated from the feature group engine.
 * @throws IOException propagated from the feature group engine.
 */
public Map<String, Map<String,String>> commitDetails(Integer limit) throws IOException, FeatureStoreException {
  final Map<String, Map<String,String>> details = featureGroupEngine.commitDetails(this, limit);
  return details;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.logicalclocks.hsfs;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.metadata.RestDto;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
Expand All @@ -27,7 +28,7 @@
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class FeatureGroupCommit {
public class FeatureGroupCommit extends RestDto<FeatureGroupCommit> {
@Getter @Setter
private Long commitID;
@Getter @Setter
Expand All @@ -38,4 +39,4 @@ public class FeatureGroupCommit {
private Long rowsUpdated;
@Getter @Setter
private Long rowsDeleted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -185,9 +186,23 @@ public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStor
featureGroup.setHistograms(apiFG.getHistograms());
}

public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroup, Integer limit)
public Map<String, Map<String,String>> commitDetails(FeatureGroup featureGroup, Integer limit)
throws IOException, FeatureStoreException {
return featureGroupApi.commitDetails(featureGroup, limit);
List<FeatureGroupCommit> featureGroupCommits = featureGroupApi.commitDetails(featureGroup, limit);
if (featureGroupCommits == null) {
throw new FeatureStoreException("There are no commit details available for this Feature group");
}
Map<String, Map<String,String>> commitDetails = new HashMap<String, Map<String,String>>();
for (FeatureGroupCommit featureGroupCommit : featureGroupCommits) {
commitDetails.put(featureGroupCommit.getCommitID().toString(), new HashMap<String, String>() {{
put("committedOn", hudiEngine.timeStampToHudiFormat(featureGroupCommit.getCommitID()));
put("rowsUpdated", featureGroupCommit.getRowsUpdated().toString());
put("rowsInserted", featureGroupCommit.getRowsInserted().toString());
put("rowsDeleted", featureGroupCommit.getRowsDeleted().toString());
}}
);
}
return commitDetails;
}

public FeatureGroupCommit commitDelete(FeatureGroup featureGroup, Dataset<Row> dataset,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ public class HudiEngine {
"hoodie.datasource.hive_sync.partition_extractor_class";
private static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL =
"org.apache.hudi.hive.MultiPartKeysValueExtractor";
private static final String HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database";
private static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false";

private static final String HUDI_COPY_ON_WRITE = "COPY_ON_WRITE";
private static final String HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type";
Expand Down Expand Up @@ -168,7 +170,7 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOp
}

// table name
String tableName = utils.getTableName(featureGroup);
String tableName = utils.getFgName(featureGroup);
hudiArgs.put(HUDI_TABLE_NAME, tableName);
hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);

Expand All @@ -178,6 +180,7 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOp
String jdbcUrl = utils.getHiveMetastoreConnector(featureGroup);
hudiArgs.put(HUDI_HIVE_SYNC_JDBC_URL, jdbcUrl);
hudiArgs.put(HUDI_HIVE_SYNC_DB, featureGroup.getFeatureStore().getName());
hudiArgs.put(HIVE_AUTO_CREATE_DATABASE_OPT_KEY, HIVE_AUTO_CREATE_DATABASE_OPT_VAL);

hudiArgs.put(HUDI_TABLE_OPERATION,operation.getValue());

Expand Down Expand Up @@ -207,7 +210,7 @@ private Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimes
}

@SneakyThrows
private String timeStampToHudiFormat(Long commitedOnTimeStamp) {
public String timeStampToHudiFormat(Long commitedOnTimeStamp) {
Date commitedOnDate = new Timestamp(commitedOnTimeStamp);
return dateFormat.format(commitedOnDate);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;

Expand All @@ -41,7 +42,7 @@ public class FeatureGroupApi {
public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}{?updateStatsSettings,"
+ "updateMetadata}";
public static final String FEATURE_GROUP_COMMIT_PATH = FEATURE_GROUP_ID_PATH
+ "/commits{?limit}";
+ "/commits{?sort_by,offset,limit}";
public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear";

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);
Expand Down Expand Up @@ -174,7 +175,7 @@ public FeatureGroupCommit featureGroupCommit(FeatureGroup featureGroup, FeatureG
return hopsworksClient.handleRequest(postRequest, FeatureGroupCommit.class);
}

public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroupBase, Integer limit)
public List<FeatureGroupCommit> commitDetails(FeatureGroup featureGroupBase, Integer limit)
throws IOException, FeatureStoreException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = PROJECT_PATH
Expand All @@ -185,10 +186,13 @@ public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroupBase, Integer
.set("projectId", featureGroupBase.getFeatureStore().getProjectId())
.set("fsId", featureGroupBase.getFeatureStore().getId())
.set("fgId", featureGroupBase.getId())
.set("sort_by","committed_on:desc")
.set("offset", 0)
.set("limit", limit)
.expand();

LOGGER.info("Sending metadata request: " + uri);
return hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroupCommit[].class);
FeatureGroupCommit featureGroupCommit = hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroupCommit.class);
return featureGroupCommit.getItems();
}
}
28 changes: 28 additions & 0 deletions python/hsfs/core/feature_group_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from hsfs import client
from hsfs import feature_group
from hsfs import feature_group_commit


class FeatureGroupApi:
Expand Down Expand Up @@ -186,3 +187,30 @@ def commit(self, feature_group_instance, feature_group_commit_instance):
data=feature_group_commit_instance.json(),
),
)

def commit_details(self, feature_group_instance, limit):
    """Get feature group commit metadata.

    Sends a GET request to the `commits` resource of the feature group,
    asking for results sorted by `committed_on` descending from offset 0.

    # Arguments
        feature_group_instance: FeatureGroup, required
            metadata object of feature group.
        limit: number of commits to retrieve

    # Returns
        The result of `FeatureGroupCommit.from_response_json` applied to
        the response.
    """
    _client = client.get_instance()
    response = _client._send_request(
        "GET",
        [
            "project",
            _client._project_id,
            "featurestores",
            self._feature_store_id,
            "featuregroups",
            feature_group_instance.id,
            "commits",
        ],
        {"sort_by": "committed_on:desc", "offset": 0, "limit": limit},
        headers={"content-type": "application/json"},
    )
    return feature_group_commit.FeatureGroupCommit.from_response_json(response)
25 changes: 24 additions & 1 deletion python/hsfs/core/feature_group_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def insert(
feature_group,
feature_dataframe,
self.APPEND,
operation,
"bulk_insert" if overwrite else operation,
feature_group.online_enabled,
storage,
offline_write_options,
Expand All @@ -123,6 +123,29 @@ def insert(
def delete(self, feature_group):
    """Delete the given feature group by delegating to the feature group API."""
    api = self._feature_group_api
    api.delete(feature_group)

def commit_details(self, feature_group, limit):
    """Summarize the commits of a feature group.

    Fetches the commits through the feature group API and returns a dict
    keyed by commit id. Each value holds `committedOn` (the commit id
    formatted by the Hudi engine) and the `rowsUpdated`, `rowsInserted`
    and `rowsDeleted` values of that commit.

    # Arguments
        feature_group: feature group whose commits are looked up.
        limit: number of commits to retrieve; forwarded to the API.

    # Returns
        `dict` mapping commit id to its details.
    """
    # The Hudi engine is only used here to format commit ids as timestamps.
    hudi = hudi_engine.HudiEngine(
        feature_group.feature_store_id,
        feature_group.feature_store_name,
        feature_group,
        engine.get_instance()._spark_context,
        engine.get_instance()._spark_session,
    )
    commits = self._feature_group_api.commit_details(feature_group, limit)
    return {
        commit.commitid: {
            "committedOn": hudi._timestamp_to_hudiformat(commit.commitid),
            "rowsUpdated": commit.rows_updated,
            "rowsInserted": commit.rows_inserted,
            "rowsDeleted": commit.rows_deleted,
        }
        for commit in commits
    }

@staticmethod
def commit_delete(feature_group, delete_df, write_options):
hudi_engine_instance = hudi_engine.HudiEngine(
Expand Down
9 changes: 4 additions & 5 deletions python/hsfs/core/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ def show(self, n, online=False):
query = self._query_constructor_api.construct_query(self)

if online:
sql_query = query["queryOnline"]
sql_query = query.query_online
online_conn = self._storage_connector_api.get_online_connector()
else:
sql_query = query["query"]
sql_query = query.query
online_conn = None

return engine.get_instance().show(
Expand Down Expand Up @@ -108,9 +108,8 @@ def to_dict(self):
}

def to_string(self, online=False):
    """Return the constructed query text.

    # Arguments
        online: if True return the `query_online` attribute of the
            constructed query, otherwise its `query` attribute.
    """
    constructed = self._query_constructor_api.construct_query(self)
    if online:
        return constructed.query_online
    return constructed.query

def __str__(self):
    """Return the query text as produced by `to_string()` with its default arguments.

    `construct_query` returns an object exposing `query` / `query_online`
    attributes rather than a string, and `__str__` must return a `str`,
    so the string form is obtained through `to_string()`.
    """
    return self.to_string()
Expand Down
15 changes: 12 additions & 3 deletions python/hsfs/core/tfdata_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ def _get_hopsfs_dataset_files(training_dataset_location, split, filter_empty):
for file in all_list:
# remove empty file if any
if filter_empty:
_file_size = hdfs.hdfs("default", 0).get_path_info(file)["size"]
_file_size = hdfs.path.getsize(file)
if _file_size == 0:
include_file = False
else:
Expand Down Expand Up @@ -470,14 +470,23 @@ def _convert_to_tf_dtype(input_type):
try:
tf_type = TFDataEngine.SPARK_TO_TFDTYPES_MAPPINGS[input_type]
except KeyError:
raise ValueError("Unknown type of value, please report to hsfs maintainers")
raise ValueError(
"Type "
+ input_type
+ " is not allowed here. allowed types are '"
+ "', '".join(
[key for key in TFDataEngine.SPARK_TO_TFDTYPES_MAPPINGS.keys()]
)
+ "'. Please refer to `record_defaults` in "
+ "https://www.tensorflow.org/api_docs/python/tf/data/experimental/CsvDataset"
)
return tf_type

@staticmethod
def _convert2float32(input):
if input.dtype == tf.string:
raise ValueError(
"tf.string feature is not allowed here. please provide process=False and preprocess "
"tf.string feature is not allowed here. please set process=False and preprocess "
"dataset accordingly"
)
elif input.dtype in TFDataEngine.SUPPORTED_TFDTYPES:
Expand Down
Loading

0 comments on commit 77d103d

Please sign in to comment.