-
Notifications
You must be signed in to change notification settings - Fork 44
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HOPSWORKS-1982] Deequ statistics for Feature Groups/Training Datasets #96
Changes from 9 commits
f9f8b3c
d3fdd97
ad85913
98bc08a
6316fe7
2a93b4f
450e450
0e2c756
9776866
c39b449
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,15 +18,20 @@ | |
|
||
import com.fasterxml.jackson.annotation.JsonIgnore; | ||
import com.fasterxml.jackson.annotation.JsonIgnoreProperties; | ||
import com.fasterxml.jackson.annotation.JsonProperty; | ||
import com.logicalclocks.hsfs.engine.FeatureGroupEngine; | ||
import com.logicalclocks.hsfs.engine.StatisticsEngine; | ||
import com.logicalclocks.hsfs.metadata.Query; | ||
import com.logicalclocks.hsfs.metadata.Statistics; | ||
import lombok.Builder; | ||
import lombok.Getter; | ||
import lombok.NonNull; | ||
import lombok.Setter; | ||
import org.apache.spark.sql.Dataset; | ||
import org.apache.spark.sql.Row; | ||
import org.apache.spark.sql.SaveMode; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.Date; | ||
|
@@ -69,6 +74,21 @@ public class FeatureGroup { | |
// Type tag for this entity; initialised to "cachedFeaturegroupDTO".
// NOTE(review): presumably the DTO discriminator the backend expects - confirm.
@Getter @Setter | ||
private String type = "cachedFeaturegroupDTO"; | ||
|
||
// Statistics on/off flag, mapped to the JSON property "descStatsEnabled".
// Declared as a boxed Boolean, so null is representable; save() and computeStatistics() branch on it.
@Getter @Setter | ||
@JsonProperty("descStatsEnabled") | ||
private Boolean statisticsEnabled; | ||
|
||
// Histogram flag, mapped to the JSON property "featHistEnabled".
// NOTE(review): presumably toggles per-feature histograms in the computed statistics - confirm.
@Getter @Setter | ||
@JsonProperty("featHistEnabled") | ||
private Boolean histograms; | ||
|
||
// Correlation flag, mapped to the JSON property "featCorrEnabled".
// NOTE(review): presumably toggles feature correlations in the computed statistics - confirm.
@Getter @Setter | ||
@JsonProperty("featCorrEnabled") | ||
private Boolean correlations; | ||
|
||
// Column names for the statistics configuration (no explicit JSON name, so the field name is used).
// NOTE(review): presumably restricts statistics to these columns; null/empty semantics not visible here.
@Getter @Setter | ||
private List<String> statisticColumns; | ||
|
||
@JsonIgnore | ||
// These are only used in the client. In the server they are aggregated in the `features` field | ||
private List<String> primaryKeys; | ||
|
@@ -78,11 +98,15 @@ public class FeatureGroup { | |
private List<String> partitionKeys; | ||
|
||
// Engine this class delegates to for saving, inserting, deleting and updating the statistics config.
private FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine(); | ||
// Engine used to compute and fetch statistics; constructed for the FEATURE_GROUP endpoint type.
private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.FEATURE_GROUP); | ||
|
||
// Class-level SLF4J logger.
private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroup.class); | ||
|
||
@Builder | ||
public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, | ||
List<String> primaryKeys, List<String> partitionKeys, | ||
boolean onlineEnabled, Storage defaultStorage, List<Feature> features) | ||
List<String> primaryKeys, List<String> partitionKeys, boolean onlineEnabled, | ||
Storage defaultStorage, List<Feature> features, Boolean statisticsEnabled, Boolean histograms, | ||
Boolean correlations, List<String> statisticColumns) | ||
throws FeatureStoreException { | ||
|
||
this.featureStore = featureStore; | ||
|
@@ -94,6 +118,10 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver | |
this.onlineEnabled = onlineEnabled; | ||
this.defaultStorage = defaultStorage != null ? defaultStorage : Storage.OFFLINE; | ||
this.features = features; | ||
this.statisticsEnabled = statisticsEnabled; | ||
this.histograms = histograms; | ||
this.correlations = correlations; | ||
this.statisticColumns = statisticColumns; | ||
} | ||
|
||
public FeatureGroup() { | ||
|
@@ -137,6 +165,9 @@ public void save(Dataset<Row> featureData) throws FeatureStoreException, IOExcep | |
public void save(Dataset<Row> featureData, Map<String, String> writeOptions) | ||
throws FeatureStoreException, IOException { | ||
featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, defaultStorage, writeOptions); | ||
if (statisticsEnabled) { | ||
statisticsEngine.computeStatistics(this, featureData); | ||
} | ||
} | ||
|
||
public void insert(Dataset<Row> featureData, Storage storage) throws IOException, FeatureStoreException { | ||
|
@@ -161,12 +192,69 @@ public void insert(Dataset<Row> featureData, Storage storage, boolean overwrite, | |
throws FeatureStoreException, IOException { | ||
featureGroupEngine.saveDataframe(this, featureData, storage, | ||
overwrite ? SaveMode.Overwrite : SaveMode.Append, writeOptions); | ||
computeStatistics(); | ||
} | ||
|
||
public void delete() throws FeatureStoreException, IOException { | ||
featureGroupEngine.delete(this); | ||
} | ||
|
||
/** | ||
* Update the statistics configuration of the feature group. | ||
* Change the `statisticsEnabled`, `histograms`, `correlations` or `statisticColumns` attributes and persist | ||
* the changes by calling this method. | ||
* | ||
* @throws FeatureStoreException | ||
* @throws IOException | ||
*/ | ||
public void updateStatisticsConfig() throws FeatureStoreException, IOException { | ||
featureGroupEngine.updateStatisticsConfig(this); | ||
} | ||
|
||
/** | ||
* Recompute the statistics for the feature group and save them to the feature store. | ||
* | ||
* @return statistics object of computed statistics | ||
* @throws FeatureStoreException | ||
* @throws IOException | ||
*/ | ||
public Statistics computeStatistics() throws FeatureStoreException, IOException { | ||
if (statisticsEnabled) { | ||
if (defaultStorage == Storage.ALL || defaultStorage == Storage.OFFLINE) { | ||
return statisticsEngine.computeStatistics(this, read(Storage.OFFLINE)); | ||
} else { | ||
LOGGER.info("StorageWarning: The default storage of feature group `" + name + "`, with version `" + version | ||
+ "`, is `" + defaultStorage + "`. Statistics are only computed for default storage `offline and `all`."); | ||
} | ||
} | ||
return null; | ||
} | ||
|
||
/** | ||
* Get the last statistics commit for the feature group. | ||
* | ||
* @return statistics object of latest commit | ||
* @throws FeatureStoreException | ||
* @throws IOException | ||
*/ | ||
@JsonIgnore | ||
public Statistics getStatistics() throws FeatureStoreException, IOException { | ||
return statisticsEngine.getLast(this); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This validates my point in the python api. Here we return an object containing commit_time, content. Which I think is good There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also the object in python contains content and commit time as only accessible members |
||
} | ||
|
||
/** | ||
* Get the statistics of a specific commit time for the feature group. | ||
* | ||
* @param commitTime commit time in the format "YYYYMMDDhhmmss" | ||
* @return statistics object for the commit time | ||
* @throws FeatureStoreException | ||
* @throws IOException | ||
*/ | ||
@JsonIgnore | ||
public Statistics getStatistics(String commitTime) throws FeatureStoreException, IOException { | ||
return statisticsEngine.get(this, commitTime); | ||
} | ||
|
||
/** | ||
* Add a tag without value to the feature group. | ||
* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this call the `computeStatistics()` method? Otherwise you might end up computing feature statistics for the online feature store, which is not bad per se in this case, as you are not querying NDB, but it might confuse users.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with the confusion part; I just wanted to reuse the DataFrame, since we already have it, instead of re-reading it. I am not sure Spark is smart enough to recognize that it's already there.
On the other hand, this way the user would always have statistics from the very first creation of the feature group, even if it is purely online.