[HOPSWORKS-3235] Use Hudi snapshot query when users don't specify a… (logicalclocks#700)

* [HOPSWORKS-3235] Use Hudi snapshot query when users don't specify as_of in the query param

* Fix Python bugs and improve as_of documentation
SirOibaf authored and kennethmhc committed Nov 16, 2022
1 parent 82ddd43 commit b7114b0
Showing 7 changed files with 107 additions and 99 deletions.
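For context, a hedged sketch of how this change surfaces in the HSFS Python client (the feature group name, version, and variable names are placeholders; `as_of()` is the query method touched by this commit):

    # Sketch only: `fs` is assumed to be a connected feature store handle and
    # "transactions" a time-travel (Hudi) enabled feature group.
    fg = fs.get_feature_group("transactions", version=1)

    # Without as_of(): after this commit the query is served by a Hudi snapshot
    # query, i.e. the latest state of the table, with no begin/end instant set.
    df_latest = fg.select_all().read()

    # With as_of(): the query is still served by an incremental query bounded by
    # the commit timeline, so the time must fall inside the Hudi active timeline.
    df_past = fg.select_all().as_of("2022-11-01 00:00:00").read()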
@@ -90,13 +90,7 @@ public void registerOnDemandFeatureGroups() throws FeatureStoreException, IOExce

public void registerHudiFeatureGroups(Map<String, String> readOptions) {
for (HudiFeatureGroupAlias hudiFeatureGroupAlias : hudiCachedFeatureGroups) {
String alias = hudiFeatureGroupAlias.getAlias();
FeatureGroupBase featureGroup = hudiFeatureGroupAlias.getFeatureGroup();

SparkEngine.getInstance().registerHudiTemporaryTable(featureGroup, alias,
hudiFeatureGroupAlias.getLeftFeatureGroupStartTimestamp(),
hudiFeatureGroupAlias.getLeftFeatureGroupEndTimestamp(),
readOptions);
SparkEngine.getInstance().registerHudiTemporaryTable(hudiFeatureGroupAlias, readOptions);
}
}
}
17 changes: 12 additions & 5 deletions java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java
@@ -32,6 +32,7 @@
import com.logicalclocks.hsfs.StreamFeatureGroup;
import com.logicalclocks.hsfs.TimeTravelFormat;
import com.logicalclocks.hsfs.TrainingDataset;
import com.logicalclocks.hsfs.constructor.HudiFeatureGroupAlias;
import com.logicalclocks.hsfs.engine.hudi.HudiEngine;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
@@ -196,11 +197,17 @@ private Map<String, String> getOnDemandOptions(OnDemandFeatureGroup onDemandFeat
.collect(Collectors.toMap(OnDemandOptions::getName, OnDemandOptions::getValue));
}

public void registerHudiTemporaryTable(FeatureGroupBase featureGroup, String alias,
Long leftFeaturegroupStartTimestamp,
Long leftFeaturegroupEndTimestamp, Map<String, String> readOptions) {
hudiEngine.registerTemporaryTable(sparkSession, featureGroup, alias,
leftFeaturegroupStartTimestamp, leftFeaturegroupEndTimestamp, readOptions);
public void registerHudiTemporaryTable(HudiFeatureGroupAlias hudiFeatureGroupAlias, Map<String, String> readOptions) {
Map<String, String> hudiArgs = hudiEngine.setupHudiReadOpts(
hudiFeatureGroupAlias.getLeftFeatureGroupStartTimestamp(),
hudiFeatureGroupAlias.getLeftFeatureGroupEndTimestamp(),
readOptions);

sparkSession.read()
.format(HudiEngine.HUDI_SPARK_FORMAT)
.options(hudiArgs)
.load(hudiFeatureGroupAlias.getFeatureGroup().getLocation())
.createOrReplaceTempView(hudiFeatureGroupAlias.getAlias());
}

/**
@@ -21,8 +21,8 @@
import com.logicalclocks.hsfs.FeatureStoreException;
import com.logicalclocks.hsfs.HudiOperationType;
import com.logicalclocks.hsfs.StreamFeatureGroup;
import com.logicalclocks.hsfs.constructor.HudiFeatureGroupAlias;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.engine.SparkEngine;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;

@@ -57,8 +57,9 @@

public class HudiEngine {

public static final String HUDI_SPARK_FORMAT = "org.apache.hudi";

protected static final String HUDI_BASE_PATH = "hoodie.base.path";
protected static final String HUDI_SPARK_FORMAT = "org.apache.hudi";
protected static final String HUDI_TABLE_NAME = "hoodie.table.name";
protected static final String HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type";
protected static final String HUDI_TABLE_OPERATION = "hoodie.datasource.write.operation";
@@ -90,6 +91,7 @@ public class HudiEngine {
protected static final String HUDI_COPY_ON_WRITE = "COPY_ON_WRITE";
protected static final String HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type";
protected static final String HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental";
protected static final String HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot";
protected static final String HUDI_BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime";
protected static final String HUDI_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime";

@@ -179,13 +181,9 @@ public <S> FeatureGroupCommit deleteRecord(SparkSession sparkSession, FeatureGro
}
}

public void registerTemporaryTable(SparkSession sparkSession, FeatureGroupBase featureGroup, String alias,
Long startTimestamp, Long endTimestamp, Map<String, String> readOptions) {
Map<String, String> hudiArgs = setupHudiReadOpts(startTimestamp, endTimestamp, readOptions);
sparkSession.read()
.format(HUDI_SPARK_FORMAT)
.options(hudiArgs)
.load(featureGroup.getLocation()).createOrReplaceTempView(alias);
public void registerTemporaryTable(SparkSession sparkSession, HudiFeatureGroupAlias hudiFeatureGroupAlias,
Map<String, String> readOptions) {

}

private FeatureGroupCommit getLastCommitMetadata(SparkSession sparkSession, String basePath)
@@ -260,19 +258,24 @@ private Map<String, String> setupHudiWriteOpts(FeatureGroupBase featureGroup, Hu
return hudiArgs;
}

private Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimestamp,
public Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimestamp,
Map<String, String> readOptions) {
Map<String, String> hudiArgs = new HashMap<String, String>();

if (startTimestamp != null) {
hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, utils.timeStampToHudiFormat(startTimestamp));
Map<String, String> hudiArgs = new HashMap<>();
if (endTimestamp != null) {
// if endTimestamp was specified, trigger an incremental query.
hudiArgs.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL);
hudiArgs.put(HUDI_END_INSTANTTIME_OPT_KEY, utils.timeStampToHudiFormat(endTimestamp));

if (startTimestamp != null) {
hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, utils.timeStampToHudiFormat(startTimestamp));
} else {
hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, utils.timeStampToHudiFormat(0L));
}
} else {
hudiArgs.put(HUDI_BEGIN_INSTANTTIME_OPT_KEY, utils.timeStampToHudiFormat(0L));
// if endTimestamp was not specified, trigger a snapshot query
hudiArgs.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL);
}

hudiArgs.put(HUDI_END_INSTANTTIME_OPT_KEY, utils.timeStampToHudiFormat(endTimestamp));
hudiArgs.put(HUDI_QUERY_TYPE_OPT_KEY, HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL);

// Overwrite with user provided options if any
if (readOptions != null && !readOptions.isEmpty()) {
hudiArgs.putAll(readOptions);
@@ -282,7 +285,7 @@ private Map<String, String> setupHudiReadOpts(Long startTimestamp, Long endTimes

private void createEmptyTable(SparkSession sparkSession, StreamFeatureGroup streamFeatureGroup)
throws IOException, FeatureStoreException {
Configuration configuration = SparkEngine.getInstance().getSparkSession().sparkContext().hadoopConfiguration();
Configuration configuration = sparkSession.sparkContext().hadoopConfiguration();
Properties properties = new Properties();
properties.putAll(setupHudiWriteOpts((FeatureGroupBase) streamFeatureGroup,
HudiOperationType.BULK_INSERT, null));
10 changes: 1 addition & 9 deletions python/hsfs/constructor/hudi_feature_group_alias.py
@@ -15,7 +15,6 @@
#

import humps
import time

from hsfs import feature_group as feature_group_module

@@ -33,14 +32,7 @@ def __init__(
)
self._alias = alias
self._left_feature_group_start_timestamp = left_feature_group_start_timestamp
self._left_feature_group_end_timestamp = (
left_feature_group_end_timestamp
if left_feature_group_end_timestamp
# if there is no end commit, set the time to now.
# *1000 as Hudi deals with commit times in milliseconds and
# later on HSFS does /1000
else time.time() * 1000
)
self._left_feature_group_end_timestamp = left_feature_group_end_timestamp

@classmethod
def from_response_json(cls, json_dict):
8 changes: 8 additions & 0 deletions python/hsfs/constructor/query.py
@@ -157,6 +157,14 @@ def as_of(self, wallclock_time):
"""Perform time travel on the given Query.
This method returns a new Query object at the specified point in time.
!!! warning
The wallclock_time needs to be a time included in the Hudi active timeline.
By default, Hudi keeps the last 20 to 30 commits in the active timeline.
If you need to keep a longer active timeline, you can override the options
`hoodie.keep.min.commits` and `hoodie.keep.max.commits`
when calling the `insert()` method.
This can then either be read into a Dataframe or used further to perform joins
or construct a training dataset.
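Since the new docstring points at the Hudi active timeline, here is a small sketch of widening it at write time. The option values are only examples, and `fg`/`df` are assumed to be an existing time-travel enabled feature group and a Spark dataframe:

    # Keep a longer active timeline so that older commits stay queryable via as_of().
    fg.insert(
        df,
        write_options={
            "hoodie.keep.min.commits": "100",
            "hoodie.keep.max.commits": "110",  # must stay above keep.min.commits
        },
    )

    # A time-travel read is then valid as long as the commit is still in the active timeline:
    df_old = fg.select_all().as_of("2022-10-01 00:00:00").read()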
112 changes: 60 additions & 52 deletions python/hsfs/core/hudi_engine.py
@@ -52,6 +52,7 @@ class HudiEngine:
HUDI_QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type"
HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
HUDI_BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime"
HUDI_END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"
PAYLOAD_CLASS_OPT_KEY = "hoodie.datasource.write.payload.class"
@@ -75,30 +76,6 @@ def __init__(
self._spark_session = spark_session
self._feature_store_id = feature_store_id
self._feature_store_name = feature_store_name
self._base_path = self._feature_group.location
self._table_name = feature_group._get_online_table_name()

self._primary_key = ",".join(feature_group.primary_key)

# add event time to primary key for upserts
if feature_group.event_time is not None:
self._primary_key = self._primary_key + "," + feature_group.event_time

self._partition_key = (
",".join(feature_group.partition_key)
if len(feature_group.partition_key) >= 1
else ""
)
self._partition_path = (
":SIMPLE,".join(feature_group.partition_key) + ":SIMPLE"
if len(feature_group.partition_key) >= 1
else ""
)
self._pre_combine_key = (
feature_group.hudi_precombine_key
if feature_group.hudi_precombine_key
else feature_group.primary_key[0]
)

self._feature_group_api = feature_group_api.FeatureGroupApi(feature_store_id)
self._storage_connector_api = storage_connector_api.StorageConnectorApi(
@@ -132,46 +109,68 @@ def delete_record(self, delete_df, write_options):
)
return self._feature_group_api.commit(self._feature_group, fg_commit)

def register_temporary_table(
self, alias, start_timestamp, end_timestamp, read_options
):
hudi_options = self._setup_hudi_read_opts(
start_timestamp, end_timestamp, read_options
)
def register_temporary_table(self, hudi_fg_alias, read_options):
hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options)
self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options(
**hudi_options
).load(self._base_path).createOrReplaceTempView(alias)
).load(self._feature_group.location).createOrReplaceTempView(
hudi_fg_alias.alias
)

def _write_hudi_dataset(self, dataset, save_mode, operation, write_options):
hudi_options = self._setup_hudi_write_opts(operation, write_options)
dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode(
save_mode
).save(self._base_path)
).save(self._feature_group.location)

feature_group_commit = self._get_last_commit_metadata(
self._spark_context, self._base_path
self._spark_context, self._feature_group.location
)

return feature_group_commit

def _setup_hudi_write_opts(self, operation, write_options):
_jdbc_url = self._get_conn_str()
table_name = self._feature_group._get_online_table_name()

primary_key = ",".join(self._feature_group.primary_key)

# add event time to primary key for upserts
if self._feature_group.event_time is not None:
primary_key = primary_key + "," + self._feature_group.event_time

partition_key = (
",".join(self._feature_group.partition_key)
if len(self._feature_group.partition_key) >= 1
else ""
)
partition_path = (
":SIMPLE,".join(self._feature_group.partition_key) + ":SIMPLE"
if len(self._feature_group.partition_key) >= 1
else ""
)
pre_combine_key = (
self._feature_group.hudi_precombine_key
if self._feature_group.hudi_precombine_key
else self._feature_group.primary_key[0]
)

jdbc_url = self._get_conn_str()
hudi_options = {
self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL,
self.HUDI_PRECOMBINE_FIELD: self._pre_combine_key[0]
if isinstance(self._pre_combine_key, list)
else self._pre_combine_key,
self.HUDI_RECORD_KEY: self._primary_key,
self.HUDI_PARTITION_FIELD: self._partition_path,
self.HUDI_TABLE_NAME: self._table_name,
if isinstance(pre_combine_key, list)
else pre_combine_key,
self.HUDI_RECORD_KEY: primary_key,
self.HUDI_PARTITION_FIELD: partition_path,
self.HUDI_TABLE_NAME: table_name,
self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL
if len(self._partition_key) >= 1
if len(partition_key) >= 1
else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
self.HUDI_HIVE_SYNC_ENABLE: "true",
self.HUDI_HIVE_SYNC_TABLE: self._table_name,
self.HUDI_HIVE_SYNC_JDBC_URL: _jdbc_url,
self.HUDI_HIVE_SYNC_TABLE: table_name,
self.HUDI_HIVE_SYNC_JDBC_URL: jdbc_url,
self.HUDI_HIVE_SYNC_DB: self._feature_store_name,
self.HUDI_HIVE_SYNC_PARTITION_FIELDS: self._partition_key,
self.HUDI_HIVE_SYNC_PARTITION_FIELDS: partition_key,
self.HUDI_TABLE_OPERATION: operation,
self.HUDI_HIVE_SYNC_SUPPORT_TIMESTAMP: "true",
}
@@ -182,15 +181,24 @@ def _setup_hudi_write_opts(self, operation, write_options):

return hudi_options

def _setup_hudi_read_opts(self, start_timestamp, end_timestamp, read_options):
_hudi_commit_start_time = util.get_hudi_datestr_from_timestamp(start_timestamp)
_hudi_commit_end_time = util.get_hudi_datestr_from_timestamp(end_timestamp)

hudi_options = {
self.HUDI_QUERY_TYPE_OPT_KEY: self.HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL,
self.HUDI_BEGIN_INSTANTTIME_OPT_KEY: _hudi_commit_start_time,
self.HUDI_END_INSTANTTIME_OPT_KEY: _hudi_commit_end_time,
}
def _setup_hudi_read_opts(self, hudi_fg_alias, read_options):
if hudi_fg_alias.left_feature_group_end_timestamp:
_hudi_commit_start_time = util.get_hudi_datestr_from_timestamp(
hudi_fg_alias.left_feature_group_start_timestamp
)
_hudi_commit_end_time = util.get_hudi_datestr_from_timestamp(
hudi_fg_alias.left_feature_group_end_timestamp
)

hudi_options = {
self.HUDI_QUERY_TYPE_OPT_KEY: self.HUDI_QUERY_TYPE_INCREMENTAL_OPT_VAL,
self.HUDI_BEGIN_INSTANTTIME_OPT_KEY: _hudi_commit_start_time,
self.HUDI_END_INSTANTTIME_OPT_KEY: _hudi_commit_end_time,
}
else:
hudi_options = {
self.HUDI_QUERY_TYPE_OPT_KEY: self.HUDI_QUERY_TYPE_SNAPSHOT_OPT_VAL,
}

if read_options:
hudi_options.update(read_options)
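To make the two read paths concrete, an approximate sketch of the option dictionaries `_setup_hudi_read_opts` now produces (the instant-time strings are invented examples; the real values come from `util.get_hudi_datestr_from_timestamp`):

    # End timestamp present on the alias (as_of was used): incremental read.
    incremental_opts = {
        "hoodie.datasource.query.type": "incremental",
        "hoodie.datasource.read.begin.instanttime": "20221101000000",  # example value
        "hoodie.datasource.read.end.instanttime": "20221115000000",  # example value
    }

    # No end timestamp: plain snapshot read of the latest table state.
    snapshot_opts = {
        "hoodie.datasource.query.type": "snapshot",
    }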
10 changes: 3 additions & 7 deletions python/hsfs/engine/spark.py
@@ -111,9 +111,7 @@ def register_hudi_temporary_table(
self._spark_session,
)
hudi_engine_instance.register_temporary_table(
hudi_fg_alias.alias,
hudi_fg_alias.left_feature_group_start_timestamp,
hudi_fg_alias.left_feature_group_end_timestamp,
hudi_fg_alias,
read_options,
)

@@ -203,10 +201,8 @@ def save_dataframe(
self._save_online_dataframe(
feature_group, dataframe, online_write_options
)
except Exception:
raise FeatureStoreException(
"Error writing to offline and online feature store"
)
except Exception as e:
raise FeatureStoreException(e)

def save_stream_dataframe(
self,
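The `save_dataframe` change is mainly about error reporting; a hedged illustration of what a caller now sees (the variable names and the failing call are hypothetical, and the import path is assumed):

    from hsfs.client.exceptions import FeatureStoreException  # assumed import path

    try:
        fg.insert(df)  # hypothetical write that fails inside save_dataframe()
    except FeatureStoreException as err:
        # Previously this was always the generic message
        # "Error writing to offline and online feature store";
        # now it carries the underlying Spark/Hudi error.
        print(err)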
