
[HOPSWORKS-1374] Append: get commit details in correct order and make upsert default op for fg.insert #132

Merged: 17 commits, Nov 9, 2020
7 changes: 6 additions & 1 deletion README.md
@@ -66,7 +66,12 @@ fg.save(dataframe)

Upsert new data into the feature group with `time_travel_format="HUDI"`.
```python
fg.insert(upsert_df, operation="upsert")
fg.insert(upsert_df)
```
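
With this change, the `operation` argument no longer needs to be passed for the common case: the engine defaults to a Hudi upsert for HUDI-enabled feature groups and, per the `feature_group_engine.py` hunk further down, switches to a Hudi bulk insert when `overwrite=True`. A minimal usage sketch, assuming `fg` is a HUDI-enabled feature group and `upsert_df` / `full_reload_df` (the latter is a hypothetical name) are existing dataframes:
```python
# Append with Hudi UPSERT semantics; this is now the default when no operation is given
fg.insert(upsert_df)

# Overwrite the feature group; the engine maps overwrite=True to a Hudi bulk insert
fg.insert(full_reload_df, overwrite=True)
```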

Retrieve commit timeline metadata of the feature group with `time_travel_format="HUDI"`.
```python
fg.commit_details()
```
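
Both the Java and Python `commit_details` implementations added in this PR return a mapping keyed by commit id, where each entry holds the Hudi-formatted commit timestamp and the inserted/updated/deleted row counts, newest commit first. A rough sketch of the returned shape (the commit id and row counts below are hypothetical):
```python
details = fg.commit_details()
# details is roughly of the form:
# {
#     1604938000000: {                      # commit id
#         "committedOn": "20201109161320",  # commit id rendered in Hudi's timestamp format
#         "rowsInserted": 100,
#         "rowsUpdated": 0,
#         "rowsDeleted": 0,
#     },
#     ...                                   # one entry per commit, newest first
# }
```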

"Reading feature group as of specific point in time".
2 changes: 1 addition & 1 deletion docs/index.md
@@ -66,7 +66,7 @@ fg.save(dataframe)

Upsert new data into the feature group with `time_travel_format="HUDI"`.
```python
fg.insert(upsert_df, operation="upsert")
fg.insert(upsert_df)
```

"Reading feature group as of specific point in time".
45 changes: 26 additions & 19 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
@@ -261,21 +261,6 @@ public void insert(Dataset<Row> featureData, boolean overwrite, Map<String, Stri
insert(featureData, null, overwrite, null, writeOptions);
}

/**
* Commit insert or upsert to time travel enabled Feature group.
*
* @param featureData dataframe to be committed.
* @param operation commit operation type, INSERT or UPSERT.
* @param writeOptions user provided write options.
* @throws FeatureStoreException
* @throws IOException
*/
public void insert(Dataset<Row> featureData, HudiOperationType operation, Map<String, String> writeOptions)
throws FeatureStoreException, IOException {

insert(featureData, null, false, operation, writeOptions);
}

/**
* Commit insert or upsert to time travel enabled Feature group.
*
@@ -294,13 +279,20 @@ public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite,
throws FeatureStoreException, IOException {

// operation is only valid for time travel enabled feature group
if (this.timeTravelFormat == TimeTravelFormat.NONE && operation != null) {
if (operation != null && this.timeTravelFormat == TimeTravelFormat.NONE) {
throw new IllegalArgumentException("operation argument is only valid for time travel enabled feature groups");
}

if (operation == null && this.timeTravelFormat == TimeTravelFormat.HUDI) {
if (overwrite) {
operation = HudiOperationType.BULK_INSERT;
} else {
operation = HudiOperationType.UPSERT;
}
}

featureGroupEngine.saveDataframe(this, featureData, storage,
overwrite ? SaveMode.Overwrite : SaveMode.Append, operation,
writeOptions);
overwrite ? SaveMode.Overwrite : SaveMode.Append, operation, writeOptions);

computeStatistics();
}
@@ -333,7 +325,22 @@ public void commitDeleteRecord(Dataset<Row> featureData, Map<String, String> wri
featureGroupEngine.commitDelete(this, featureData, writeOptions);
}

public FeatureGroupCommit[] commitDetails(Integer limit) throws IOException, FeatureStoreException {
/**
* Return commit details.
* @throws FeatureStoreException
* @throws IOException
*/
public Map<String, Map<String,String>> commitDetails() throws IOException, FeatureStoreException {
return featureGroupEngine.commitDetails(this, null);
}

/**
* Return commit details.
* @param limit number of commits to return.
* @throws FeatureStoreException
* @throws IOException
*/
public Map<String, Map<String,String>> commitDetails(Integer limit) throws IOException, FeatureStoreException {
return featureGroupEngine.commitDetails(this, limit);
}

@@ -17,6 +17,7 @@
package com.logicalclocks.hsfs;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.metadata.RestDto;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
@@ -27,7 +28,7 @@
@AllArgsConstructor
@NoArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class FeatureGroupCommit {
public class FeatureGroupCommit extends RestDto<FeatureGroupCommit> {
@Getter @Setter
private Long commitID;
@Getter @Setter
@@ -38,4 +39,4 @@ public class FeatureGroupCommit {
private Long rowsUpdated;
@Getter @Setter
private Long rowsDeleted;
}
}
@@ -35,6 +35,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@@ -185,9 +186,23 @@ public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStor
featureGroup.setHistograms(apiFG.getHistograms());
}

public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroup, Integer limit)
public Map<String, Map<String,String>> commitDetails(FeatureGroup featureGroup, Integer limit)
throws IOException, FeatureStoreException {
return featureGroupApi.commitDetails(featureGroup, limit);
List<FeatureGroupCommit> featureGroupCommits = featureGroupApi.commitDetails(featureGroup, limit);
Reviewer comment (Contributor): If there are no commits, this is going to be null and the next part of the code will trigger a NullPointerException. Check for it.

if (featureGroupCommits == null) {
throw new FeatureStoreException("There are no commit details available for this Feature group");
}
Map<String, Map<String,String>> commitDetails = new HashMap<String, Map<String,String>>();
for (FeatureGroupCommit featureGroupCommit : featureGroupCommits) {
commitDetails.put(featureGroupCommit.getCommitID().toString(), new HashMap<String, String>() {{
put("committedOn", hudiEngine.timeStampToHudiFormat(featureGroupCommit.getCommitID()));
put("rowsUpdated", featureGroupCommit.getRowsUpdated().toString());
put("rowsInserted", featureGroupCommit.getRowsInserted().toString());
put("rowsDeleted", featureGroupCommit.getRowsDeleted().toString());
}}
);
}
return commitDetails;
}

public FeatureGroupCommit commitDelete(FeatureGroup featureGroup, Dataset<Row> dataset,
@@ -67,6 +67,8 @@ public class HudiEngine {
"hoodie.datasource.hive_sync.partition_extractor_class";
private static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL =
"org.apache.hudi.hive.MultiPartKeysValueExtractor";
private static final String HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database";
private static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false";

private static final String HUDI_COPY_ON_WRITE = "COPY_ON_WRITE";
private static final String HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type";
@@ -168,7 +170,7 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOp
}

// table name
String tableName = utils.getTableName(featureGroup);
String tableName = utils.getFgName(featureGroup);
hudiArgs.put(HUDI_TABLE_NAME, tableName);
hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL);

@@ -178,6 +180,7 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroup featureGroup, HudiOp
String jdbcUrl = utils.getHiveMetastoreConnector(featureGroup);
hudiArgs.put(HUDI_HIVE_SYNC_JDBC_URL, jdbcUrl);
hudiArgs.put(HUDI_HIVE_SYNC_DB, featureGroup.getFeatureStore().getName());
hudiArgs.put(HIVE_AUTO_CREATE_DATABASE_OPT_KEY, HIVE_AUTO_CREATE_DATABASE_OPT_VAL);

hudiArgs.put(HUDI_TABLE_OPERATION,operation.getValue());

@@ -207,7 +210,7 @@ private Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimes
}

@SneakyThrows
private String timeStampToHudiFormat(Long commitedOnTimeStamp) {
public String timeStampToHudiFormat(Long commitedOnTimeStamp) {
Date commitedOnDate = new Timestamp(commitedOnTimeStamp);
return dateFormat.format(commitedOnDate);
}
@@ -31,6 +31,7 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.List;

import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;

Expand All @@ -41,7 +42,7 @@ public class FeatureGroupApi {
public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}{?updateStatsSettings,"
+ "updateMetadata}";
public static final String FEATURE_GROUP_COMMIT_PATH = FEATURE_GROUP_ID_PATH
+ "/commits{?limit}";
+ "/commits{?sort_by,offset,limit}";
public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear";

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);
@@ -174,7 +175,7 @@ public FeatureGroupCommit featureGroupCommit(FeatureGroup featureGroup, FeatureG
return hopsworksClient.handleRequest(postRequest, FeatureGroupCommit.class);
}

public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroupBase, Integer limit)
public List<FeatureGroupCommit> commitDetails(FeatureGroup featureGroupBase, Integer limit)
throws IOException, FeatureStoreException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = PROJECT_PATH
@@ -185,10 +186,13 @@ public FeatureGroupCommit[] commitDetails(FeatureGroup featureGroupBase, Integer
.set("projectId", featureGroupBase.getFeatureStore().getProjectId())
.set("fsId", featureGroupBase.getFeatureStore().getId())
.set("fgId", featureGroupBase.getId())
.set("sort_by","committed_on:desc")
.set("offset", 0)
.set("limit", limit)
.expand();

LOGGER.info("Sending metadata request: " + uri);
return hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroupCommit[].class);
FeatureGroupCommit featureGroupCommit = hopsworksClient.handleRequest(new HttpGet(uri), FeatureGroupCommit.class);
return featureGroupCommit.getItems();
}
}
28 changes: 28 additions & 0 deletions python/hsfs/core/feature_group_api.py
@@ -16,6 +16,7 @@

from hsfs import client
from hsfs import feature_group
from hsfs import feature_group_commit


class FeatureGroupApi:
@@ -186,3 +187,30 @@ def commit(self, feature_group_instance, feature_group_commit_instance):
data=feature_group_commit_instance.json(),
),
)

def commit_details(self, feature_group_instance, limit):
"""
Get feature group commit metadata.
# Arguments
feature_group_instance: FeatureGroup, required
metadata object of feature group.
limit: number of commits to retrieve
# Returns
`FeatureGroupCommit`.
"""
_client = client.get_instance()
path_params = [
"project",
_client._project_id,
"featurestores",
self._feature_store_id,
"featuregroups",
feature_group_instance.id,
"commits",
]
headers = {"content-type": "application/json"}
query_params = {"sort_by": "committed_on:desc", "offset": 0, "limit": limit}

return feature_group_commit.FeatureGroupCommit.from_response_json(
_client._send_request("GET", path_params, query_params, headers=headers),
)
25 changes: 24 additions & 1 deletion python/hsfs/core/feature_group_engine.py
@@ -113,7 +113,7 @@ def insert(
feature_group,
feature_dataframe,
self.APPEND,
operation,
"bulk_insert" if overwrite else operation,
feature_group.online_enabled,
storage,
offline_write_options,
@@ -123,6 +123,29 @@
def delete(self, feature_group):
self._feature_group_api.delete(feature_group)

def commit_details(self, feature_group, limit):
hudi_engine_instance = hudi_engine.HudiEngine(
feature_group.feature_store_id,
feature_group.feature_store_name,
feature_group,
engine.get_instance()._spark_context,
engine.get_instance()._spark_session,
)
feature_group_commits = self._feature_group_api.commit_details(
feature_group, limit
)
commit_details = {}
for feature_group_commit in feature_group_commits:
commit_details[feature_group_commit.commitid] = {
"committedOn": hudi_engine_instance._timestamp_to_hudiformat(
feature_group_commit.commitid
),
"rowsUpdated": feature_group_commit.rows_updated,
"rowsInserted": feature_group_commit.rows_inserted,
"rowsDeleted": feature_group_commit.rows_deleted,
}
return commit_details

@staticmethod
def commit_delete(feature_group, delete_df, write_options):
hudi_engine_instance = hudi_engine.HudiEngine(
9 changes: 4 additions & 5 deletions python/hsfs/core/query.py
@@ -68,10 +68,10 @@ def show(self, n, online=False):
query = self._query_constructor_api.construct_query(self)

if online:
sql_query = query["queryOnline"]
sql_query = query.query_online
online_conn = self._storage_connector_api.get_online_connector()
else:
sql_query = query["query"]
sql_query = query.query
online_conn = None

return engine.get_instance().show(
@@ -108,9 +108,8 @@ def to_dict(self):
}

def to_string(self, online=False):
return self._query_constructor_api.construct_query(self)[
"queryOnline" if online else "query"
]
fs_query_instance = self._query_constructor_api.construct_query(self)
return fs_query_instance.query_online if online else fs_query_instance.query

def __str__(self):
return self._query_constructor_api.construct_query(self)
15 changes: 12 additions & 3 deletions python/hsfs/core/tfdata_engine.py
@@ -421,7 +421,7 @@ def _get_hopsfs_dataset_files(training_dataset_location, split, filter_empty):
for file in all_list:
# remove empty file if any
if filter_empty:
_file_size = hdfs.hdfs("default", 0).get_path_info(file)["size"]
_file_size = hdfs.path.getsize(file)
if _file_size == 0:
include_file = False
else:
@@ -470,14 +470,23 @@ def _convert_to_tf_dtype(input_type):
try:
tf_type = TFDataEngine.SPARK_TO_TFDTYPES_MAPPINGS[input_type]
except KeyError:
raise ValueError("Unknown type of value, please report to hsfs maintainers")
raise ValueError(
"Type "
+ input_type
+ " is not allowed here. allowed types are '"
+ "', '".join(
[key for key in TFDataEngine.SPARK_TO_TFDTYPES_MAPPINGS.keys()]
)
+ "'. Please refer to `record_defaults` in "
+ "https://www.tensorflow.org/api_docs/python/tf/data/experimental/CsvDataset"
)
return tf_type

@staticmethod
def _convert2float32(input):
if input.dtype == tf.string:
raise ValueError(
"tf.string feature is not allowed here. please provide process=False and preprocess "
"tf.string feature is not allowed here. please set process=False and preprocess "
"dataset accordingly"
)
elif input.dtype in TFDataEngine.SUPPORTED_TFDTYPES: