From cf9fa7285f3057e20ad7c17593035522071fc9e5 Mon Sep 17 00:00:00 2001 From: davitbzh <44586065+davitbzh@users.noreply.github.com> Date: Thu, 7 Jan 2021 10:46:57 +0100 Subject: [PATCH] [HOPSWORKS-2188] Remove partition key requirements for HUDI feature groups and allow users to set precombine keys (#198) Also fixes [HOPSWORKS-2196] remove hudiEnabled. --- .../java/com/logicalclocks/hsfs/Feature.java | 4 +++ .../com/logicalclocks/hsfs/FeatureGroup.java | 20 ++++++++++--- .../hsfs/engine/FeatureGroupEngine.java | 18 ++++++++++- .../logicalclocks/hsfs/engine/HudiEngine.java | 23 ++++++++------ python/hsfs/core/feature_group_engine.py | 5 ++++ python/hsfs/core/hudi_engine.py | 30 +++++++++++++++---- python/hsfs/feature.py | 12 ++++++++ python/hsfs/feature_group.py | 25 ++++++++++++++-- python/hsfs/feature_store.py | 6 ++++ 9 files changed, 120 insertions(+), 23 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/hsfs/Feature.java b/java/src/main/java/com/logicalclocks/hsfs/Feature.java index 96e5dcc389..d8cac4b426 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/Feature.java +++ b/java/src/main/java/com/logicalclocks/hsfs/Feature.java @@ -53,6 +53,10 @@ public class Feature { @Setter private Boolean partition; + @Getter + @Setter + private Boolean hudiPrecombineKey = false; + @Getter @Setter private String defaultValue; diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java index baf002e8ae..3bcc5b553b 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java @@ -85,6 +85,10 @@ public class FeatureGroup extends FeatureGroupBase { // These are only used in the client. In the server they are aggregated in the `features` field private List partitionKeys; + @JsonIgnore + // This is only used in the client. 
In the server they are aggregated in the `features` field + private String hudiPrecombineKey; + private FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine(); private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.FEATURE_GROUP); @@ -92,15 +96,17 @@ public class FeatureGroup extends FeatureGroupBase { @Builder public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, - List primaryKeys, List partitionKeys, boolean onlineEnabled, - TimeTravelFormat timeTravelFormat, List features, Boolean statisticsEnabled, - Boolean histograms, Boolean correlations, List statisticColumns) { + List primaryKeys, List partitionKeys, String hudiPrecombineKey, + boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List features, + Boolean statisticsEnabled, Boolean histograms, Boolean correlations, + List statisticColumns) { this.featureStore = featureStore; this.name = name; this.version = version; this.description = description; this.primaryKeys = primaryKeys; this.partitionKeys = partitionKeys; + this.hudiPrecombineKey = timeTravelFormat == TimeTravelFormat.HUDI ? hudiPrecombineKey : null; this.onlineEnabled = onlineEnabled; this.timeTravelFormat = timeTravelFormat != null ? 
timeTravelFormat : TimeTravelFormat.HUDI; this.features = features; @@ -198,7 +204,8 @@ public void save(Dataset featureData) throws FeatureStoreException, IOExcep public void save(Dataset featureData, Map writeOptions) throws FeatureStoreException, IOException { - featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, writeOptions); + featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, hudiPrecombineKey, + writeOptions); if (statisticsEnabled) { statisticsEngine.computeStatistics(this, featureData); } @@ -208,6 +215,11 @@ public void insert(Dataset featureData) throws IOException, FeatureStoreExc insert(featureData, null, false); } + public void insert(Dataset featureData, Map writeOptions) + throws FeatureStoreException, IOException { + insert(featureData, null, false, null, writeOptions); + } + public void insert(Dataset featureData, Storage storage) throws IOException, FeatureStoreException { insert(featureData, storage, false, null, null); } diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java index d347d11884..ad464024fb 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java @@ -16,6 +16,7 @@ package com.logicalclocks.hsfs.engine; +import com.logicalclocks.hsfs.Feature; import com.logicalclocks.hsfs.FeatureGroup; import com.logicalclocks.hsfs.FeatureGroupCommit; import com.logicalclocks.hsfs.FeatureStoreException; @@ -57,7 +58,7 @@ public class FeatureGroupEngine { * @throws IOException */ public void saveFeatureGroup(FeatureGroup featureGroup, Dataset dataset, List primaryKeys, - List partitionKeys, Map writeOptions) + List partitionKeys, String hudiPrecombineKey, Map writeOptions) throws FeatureStoreException, IOException { if (featureGroup.getFeatures() == null) { @@ -86,6 +87,15 @@ 
public void saveFeatureGroup(FeatureGroup featureGroup, Dataset dataset, Li })); } + /* set hudi precombine key name */ + if (hudiPrecombineKey != null) { + featureGroup.getFeatures().forEach(f -> { + if (f.getName().equals(hudiPrecombineKey)) { + f.setHudiPrecombineKey(true); + } + }); + } + // Send Hopsworks the request to create a new feature group FeatureGroup apiFG = featureGroupApi.save(featureGroup); @@ -102,6 +112,12 @@ public void saveFeatureGroup(FeatureGroup featureGroup, Dataset dataset, Li featureGroup.setCorrelations(apiFG.getCorrelations()); featureGroup.setHistograms(apiFG.getHistograms()); + /* if hudi precombine key was not provided and TimeTravelFormat is HUDI, retrieve from backend and set */ + if (featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI & hudiPrecombineKey == null) { + List features = apiFG.getFeatures(); + featureGroup.setFeatures(features); + } + // Write the dataframe saveDataframe(featureGroup, dataset, null, SaveMode.Append, featureGroup.getTimeTravelFormat() == TimeTravelFormat.HUDI diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java index c84ac3272a..e933852d58 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/HudiEngine.java @@ -16,6 +16,7 @@ package com.logicalclocks.hsfs.engine; +import com.logicalclocks.hsfs.Feature; import com.logicalclocks.hsfs.FeatureGroup; import com.logicalclocks.hsfs.FeatureGroupCommit; import com.logicalclocks.hsfs.FeatureStoreException; @@ -67,6 +68,8 @@ public class HudiEngine { "hoodie.datasource.hive_sync.partition_extractor_class"; private static final String DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL = "org.apache.hudi.hive.MultiPartKeysValueExtractor"; + private static final String HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL = + "org.apache.hudi.hive.NonPartitionedExtractor"; private static final String 
HIVE_AUTO_CREATE_DATABASE_OPT_KEY = "hoodie.datasource.hive_sync.auto_create_database"; private static final String HIVE_AUTO_CREATE_DATABASE_OPT_VAL = "false"; @@ -155,24 +158,26 @@ private Map setupHudiWriteOpts(FeatureGroup featureGroup, HudiOp // primary keys Seq primaryColumns = utils.getPrimaryColumns(featureGroup); - if (primaryColumns.isEmpty()) { - throw new FeatureStoreException("For time travel enabled feature groups You must provide at least 1 primary key"); - } hudiArgs.put(HUDI_RECORD_KEY, primaryColumns.mkString(",")); + // table name + String tableName = utils.getFgName(featureGroup); + hudiArgs.put(HUDI_TABLE_NAME, tableName); + // partition keys Seq partitionColumns = utils.getPartitionColumns(featureGroup); if (!partitionColumns.isEmpty()) { hudiArgs.put(HUDI_PARTITION_FIELD, partitionColumns.mkString(":SIMPLE,") + ":SIMPLE"); - // For precombine key take 1st primary key - hudiArgs.put(HUDI_PRECOMBINE_FIELD, primaryColumns.head()); hudiArgs.put(HUDI_HIVE_SYNC_PARTITION_FIELDS, partitionColumns.mkString(",")); + hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL); + } else { + hudiArgs.put(HUDI_PARTITION_FIELD, ""); + hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL); } - // table name - String tableName = utils.getFgName(featureGroup); - hudiArgs.put(HUDI_TABLE_NAME, tableName); - hudiArgs.put(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL); + String precombineKey = featureGroup.getFeatures().stream().filter(Feature::getHudiPrecombineKey).findFirst() + .orElseThrow(() -> new FeatureStoreException("Can't find hudi precombine key")).getName(); + hudiArgs.put(HUDI_PRECOMBINE_FIELD, precombineKey); // Hive args hudiArgs.put(HUDI_HIVE_SYNC_ENABLE, "true"); diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py index bce017d389..33f0306d1c 100644 --- 
a/python/hsfs/core/feature_group_engine.py +++ b/python/hsfs/core/feature_group_engine.py @@ -38,6 +38,11 @@ def save(self, feature_group, feature_dataframe, write_options): feat.primary = True if feat.name in feature_group.partition_key: feat.partition = True + if ( + feature_group.hudi_precombine_key is not None + and feat.name == feature_group.hudi_precombine_key + ): + feat.hudi_precombine_key = True self._feature_group_api.save(feature_group) diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py index db47b13382..29a9773e45 100644 --- a/python/hsfs/core/hudi_engine.py +++ b/python/hsfs/core/hudi_engine.py @@ -42,6 +42,9 @@ class HudiEngine: DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL = ( "org.apache.hudi.hive.MultiPartKeysValueExtractor" ) + HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL = ( + "org.apache.hudi.hive.NonPartitionedExtractor" + ) HUDI_COPY_ON_WRITE = "COPY_ON_WRITE" HUDI_BULK_INSERT = "bulk_insert" HUDI_INSERT = "insert" @@ -71,9 +74,21 @@ def __init__( self._table_name = feature_group.name + "_" + str(feature_group.version) self._primary_key = ",".join(feature_group.primary_key) - self._partition_key = ",".join(feature_group.partition_key) - self._partition_path = ":SIMPLE,".join(feature_group.partition_key) + ":SIMPLE" - self._pre_combine_key = feature_group.primary_key[0] + self._partition_key = ( + ",".join(feature_group.partition_key) + if len(feature_group.partition_key) >= 1 + else "" + ) + self._partition_path = ( + ":SIMPLE,".join(feature_group.partition_key) + ":SIMPLE" + if len(feature_group.partition_key) >= 1 + else "" + ) + self._pre_combine_key = ( + feature_group.hudi_precombine_key + if feature_group.hudi_precombine_key + else feature_group.primary_key[0] + ) self._feature_group_api = feature_group_api.FeatureGroupApi(feature_store_id) self._storage_connector_api = storage_connector_api.StorageConnectorApi( @@ -108,7 +123,6 @@ def register_temporary_table( def _write_hudi_dataset(self, dataset, save_mode, 
operation, write_options): hudi_options = self._setup_hudi_write_opts(operation, write_options) - dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode( save_mode ).save(self._base_path) @@ -123,11 +137,15 @@ def _setup_hudi_write_opts(self, operation, write_options): _jdbc_url = self._get_conn_str() hudi_options = { self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL, - self.HUDI_PRECOMBINE_FIELD: self._pre_combine_key, + self.HUDI_PRECOMBINE_FIELD: self._pre_combine_key[0] + if isinstance(self._pre_combine_key, list) + else self._pre_combine_key, self.HUDI_RECORD_KEY: self._primary_key, self.HUDI_PARTITION_FIELD: self._partition_path, self.HUDI_TABLE_NAME: self._table_name, - self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL, + self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL + if len(self._partition_key) >= 1 + else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL, self.HUDI_HIVE_SYNC_ENABLE: "true", self.HUDI_HIVE_SYNC_TABLE: self._table_name, self.HUDI_HIVE_SYNC_JDBC_URL: _jdbc_url, diff --git a/python/hsfs/feature.py b/python/hsfs/feature.py index 4e29c997de..ae775ea811 100644 --- a/python/hsfs/feature.py +++ b/python/hsfs/feature.py @@ -35,6 +35,7 @@ def __init__( description=None, primary=None, partition=None, + hudi_precombine_key=None, online_type=None, default_value=None, feature_group_id=None, @@ -45,6 +46,7 @@ def __init__( self._description = description self._primary = primary or False self._partition = partition or False + self._hudi_precombine_key = hudi_precombine_key or False self._online_type = online_type self._default_value = default_value if feature_group is not None: @@ -58,6 +60,7 @@ def to_dict(self): "type": self._type, "description": self._description, "partition": self._partition, + "hudiPrecombineKey": self._hudi_precombine_key, "primary": self._primary, "onlineType": self._online_type, 
"defaultValue": self._default_value, @@ -123,6 +126,15 @@ def partition(self): def partition(self, partition): self._partition = partition + @property + def hudi_precombine_key(self): + """Whether the feature is part of the hudi precombine key of the feature group.""" + return self._hudi_precombine_key + + @hudi_precombine_key.setter + def hudi_precombine_key(self, hudi_precombine_key): + self._hudi_precombine_key = hudi_precombine_key + @property def default_value(self): """Default value of the feature as string, if the feature was appended to the diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py index e1222df24f..343e184fb5 100644 --- a/python/hsfs/feature_group.py +++ b/python/hsfs/feature_group.py @@ -250,6 +250,7 @@ def __init__( description="", partition_key=None, primary_key=None, + hudi_precombine_key=None, featurestore_name=None, created=None, creator=None, @@ -263,7 +264,6 @@ def __init__( statistic_columns=None, online_enabled=False, time_travel_format=None, - hudi_enabled=False, statistics_config=None, ): super().__init__(featurestore_id) @@ -287,7 +287,6 @@ def __init__( self._time_travel_format = ( time_travel_format.upper() if time_travel_format is not None else None ) - self._hudi_enabled = hudi_enabled if id is not None: # initialized by backend @@ -303,12 +302,23 @@ def __init__( self._partition_key = [ feat.name for feat in self._features if feat.partition is True ] - + if time_travel_format.upper() == "HUDI": + # hudi precombine key is always a single feature + self._hudi_precombine_key = [ + feat.name + for feat in self._features + if feat.hudi_precombine_key is True + ][0] + else: + self._hudi_precombine_key = None else: # initialized by user self.statistics_config = statistics_config self._primary_key = primary_key self._partition_key = partition_key + self._hudi_precombine_key = ( + hudi_precombine_key if time_travel_format.upper() == "HUDI" else None + ) self._feature_group_engine = 
feature_group_engine.FeatureGroupEngine( featurestore_id @@ -759,6 +769,11 @@ def partition_key(self): """List of features building the partition key.""" return self._partition_key + @property + def hudi_precombine_key(self): + """Feature name that is the hudi precombine key.""" + return self._hudi_precombine_key + @property def feature_store_id(self): return self._feature_store_id @@ -802,6 +817,10 @@ def primary_key(self, new_primary_key): def partition_key(self, new_partition_key): self._partition_key = new_partition_key + @hudi_precombine_key.setter + def hudi_precombine_key(self, hudi_precombine_key): + self._hudi_precombine_key = hudi_precombine_key + @online_enabled.setter def online_enabled(self, new_online_enabled): self._online_enabled = new_online_enabled diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py index 160fef4282..ad6db1c27a 100644 --- a/python/hsfs/feature_store.py +++ b/python/hsfs/feature_store.py @@ -229,6 +229,7 @@ def create_feature_group( time_travel_format: Optional[str] = "HUDI", partition_key: Optional[List[str]] = [], primary_key: Optional[List[str]] = [], + hudi_precombine_key: Optional[str] = None, features: Optional[List[feature.Feature]] = [], statistics_config: Optional[Union[StatisticsConfig, bool, dict]] = None, ): @@ -260,6 +261,10 @@ features and will be used as joining key, if not specified otherwise. Defaults to empty list `[]`, and the first column of the DataFrame will be used as primary key. + hudi_precombine_key: A feature name to be used as a precombine key for the `"HUDI"` + feature group. Defaults to `None`. If the feature group has time travel format + `"HUDI"` and no hudi precombine key was specified, then the first primary key of + the feature group will be used as the hudi precombine key. features: Optionally, define the schema of the feature group manually as a list of `Feature` objects. 
Defaults to empty list `[]` and will use the schema information of the DataFrame provided in the `save` method. @@ -282,6 +287,7 @@ def create_feature_group( time_travel_format=time_travel_format, partition_key=partition_key, primary_key=primary_key, + hudi_precombine_key=hudi_precombine_key, featurestore_id=self._id, featurestore_name=self._name, features=features,