Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HOPSWORKS-1982] Deequ statistics for Feature Groups/Training Datasets #96

Merged
merged 10 commits into from
Sep 25, 2020
Prev Previous commit
Next Next commit
finalize java stats
moritzmeister committed Aug 26, 2020
commit 6316fe769c0a9066a674a7e9680810118e88c35e
51 changes: 44 additions & 7 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
Original file line number Diff line number Diff line change
@@ -199,14 +199,26 @@ public void delete() throws FeatureStoreException, IOException {
featureGroupEngine.delete(this);
}

/**
* Update the statistics configuration of the feature group.
* Change the `statisticsEnabled`, `histograms`, `correlations` or `statisticColumns` attributes and persist
* the changes by calling this method.
*
* @throws FeatureStoreException
* @throws IOException
*/
public void updateStatisticsConfig() throws FeatureStoreException, IOException {
featureGroupEngine.updateStatisticsConfig(this);
}

/**
* Recompute the statistics for the feature group and save them to the feature store.
*
* @return statistics object of computed statistics
* @throws FeatureStoreException
* @throws IOException
*/
public Statistics computeStatistics() throws FeatureStoreException, IOException {
/**
* Recompute the statistics for the feature group and save them to the feature store.
*
* @return statistics object of computed statistics
* @throws FeatureStoreException
* @throws IOException
*/
if (statisticsEnabled) {
if (defaultStorage == Storage.ALL || defaultStorage == Storage.OFFLINE) {
return statisticsEngine.computeStatistics(this, read(Storage.OFFLINE));
@@ -218,6 +230,31 @@ public Statistics computeStatistics() throws FeatureStoreException, IOException
return null;
}

/**
* Get the last statistics commit for the feature group.
*
* @return statistics object of latest commit
* @throws FeatureStoreException
* @throws IOException
*/
@JsonIgnore
public Statistics getStatistics() throws FeatureStoreException, IOException {
return statisticsEngine.getLast(this);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This validates my point in the python api. Here we return an object containing commit_time, content. Which I think is good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also the object in python contains content and commit time as only accessible members

}

/**
* Get the statistics of a specific commit time for the feature group.
*
* @param commitTime commit time in the format "YYYYMMDDhhmmss"
* @return statistics object for the commit time
* @throws FeatureStoreException
* @throws IOException
*/
@JsonIgnore
public Statistics getStatistics(String commitTime) throws FeatureStoreException, IOException {
return statisticsEngine.get(this, commitTime);
}

/**
* Add a tag without value to the feature group.
*
23 changes: 23 additions & 0 deletions java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java
Original file line number Diff line number Diff line change
@@ -300,6 +300,29 @@ public Statistics computeStatistics() throws FeatureStoreException, IOException
return null;
}

/**
* Get the last statistics commit for the training dataset.
*
* @return statistics object of latest commit
* @throws FeatureStoreException
* @throws IOException
*/
public Statistics getStatistics() throws FeatureStoreException, IOException {
return statisticsEngine.getLast(this);
}

/**
* Get the statistics of a specific commit time for the training dataset.
*
* @param commitTime commit time in the format "YYYYMMDDhhmmss"
* @return statistics object for the commit time
* @throws FeatureStoreException
* @throws IOException
*/
public Statistics getStatistics(String commitTime) throws FeatureStoreException, IOException {
return statisticsEngine.get(this, commitTime);
}

/**
* Add a tag without value to the training dataset.
*
Original file line number Diff line number Diff line change
@@ -178,4 +178,12 @@ public Map<String, String> getTag(FeatureGroup featureGroup, String name) throws
public void deleteTag(FeatureGroup featureGroup, String name) throws FeatureStoreException, IOException {
tagsApi.deleteTag(featureGroup, name);
}

public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStoreException, IOException {
FeatureGroup apiFG = featureGroupApi.updateStatsConfig(featureGroup);
featureGroup.setStatisticsEnabled(apiFG.getStatisticsEnabled());
featureGroup.setCorrelations(apiFG.getCorrelations());
featureGroup.setHistograms(apiFG.getHistograms());
featureGroup.setStatisticColumns(apiFG.getStatisticColumns());
}
}
Original file line number Diff line number Diff line change
@@ -44,4 +44,20 @@ private Statistics computeStatistics(Dataset<Row> dataFrame, List<String> statis
String content = SparkEngine.getInstance().profile(dataFrame, statisticColumns, histograms, correlations);
return new Statistics(commitTime, content);
}

public Statistics get(FeatureGroup featureGroup, String commitTime) throws FeatureStoreException, IOException {
return statisticsApi.get(featureGroup, commitTime);
}

public Statistics get(TrainingDataset trainingDataset, String commitTime) throws FeatureStoreException, IOException {
return statisticsApi.get(trainingDataset, commitTime);
}

public Statistics getLast(FeatureGroup featureGroup) throws FeatureStoreException, IOException {
return statisticsApi.getLast(featureGroup);
}

public Statistics getLast(TrainingDataset trainingDataset) throws FeatureStoreException, IOException {
return statisticsApi.getLast(trainingDataset);
}
}
Original file line number Diff line number Diff line change
@@ -17,18 +17,21 @@
package com.logicalclocks.hsfs.metadata;

import com.damnhandy.uri.template.UriTemplate;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.logicalclocks.hsfs.FeatureGroup;
import com.logicalclocks.hsfs.FeatureStore;
import com.logicalclocks.hsfs.FeatureStoreException;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH;

@@ -38,7 +41,7 @@ public class FeatureGroupApi {
public static final String FEATURE_GROUP_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgName}{?version}";
public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}";
public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear";
public static final String FEATURE_GROUP_TAGS_PATH = FEATURE_GROUP_ID_PATH + "/tags{/name}{?value}";
public static final String FEATURE_GROUP_UPDATE_STATS_CONGIG = "updateStatsSettings=True";

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class);

@@ -122,4 +125,29 @@ public void deleteContent(FeatureGroup featureGroup) throws FeatureStoreExceptio
HttpPost postRequest = new HttpPost(uri);
hopsworksClient.handleRequest(postRequest);
}

public FeatureGroup updateStatsConfig(FeatureGroup featureGroup)
throws FeatureStoreException, IOException {
HopsworksClient hopsworksClient = HopsworksClient.getInstance();
String pathTemplate = PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ FEATURE_GROUP_ID_PATH
+ "?" + FEATURE_GROUP_UPDATE_STATS_CONGIG;

String uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", featureGroup.getFeatureStore().getProjectId())
.set("fsId", featureGroup.getFeatureStore().getId())
.set("fgId", featureGroup.getId())
.expand();

String featureGroupJson = hopsworksClient.getObjectMapper().writeValueAsString(featureGroup);
HttpPut putRequest = new HttpPut(uri);
putRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json");
putRequest.setEntity(new StringEntity(featureGroupJson));

LOGGER.info("Sending metadata request: " + uri);
LOGGER.info(featureGroupJson);

return hopsworksClient.handleRequest(putRequest, FeatureGroup.class);
}
}
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@
import com.logicalclocks.hsfs.TrainingDataset;
import lombok.NonNull;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.slf4j.Logger;
@@ -22,7 +23,7 @@ public class StatisticsApi {
public static final String ENTITY_ROOT_PATH = "{/entityType}";
public static final String ENTITY_ID_PATH = ENTITY_ROOT_PATH + "{/entityId}";
public static final String STATISTICS_PATH = ENTITY_ID_PATH + "/statistics";
public static final String STATISTICS_FILTER_COMMIT_TIME_EQ = "filter_by=commit_time_eq:{commitTime}";
public static final String FILTER_COMMIT_TIME_EQ = "filter_by=commit_time_eq:{commitTime}";
public static final String CONTENT_FIELD = "fields=content";
public static final String SORT_BY_COMMIT_TIME_DESC = "sort_by=commit_time:desc";
public static final String OFFSET = "offset={offset}";
@@ -70,11 +71,83 @@ private Statistics post(Integer projectId, Integer featurestoreId, Integer entit
return hopsworksClient.handleRequest(postRequest, Statistics.class);
}

public Statistics get(FeatureGroup featureGroup, String commitTime) throws FeatureStoreException {
public Statistics get(FeatureGroup featureGroup, String commitTime) throws FeatureStoreException, IOException {
return get(featureGroup.getFeatureStore().getProjectId(), featureGroup.getFeatureStore().getId(),
featureGroup.getId(), commitTime);
}

public Statistics get(TrainingDataset trainingDataset, String commitTime) throws FeatureStoreException, IOException {
return get(trainingDataset.getFeatureStore().getProjectId(), trainingDataset.getFeatureStore().getId(),
trainingDataset.getId(), commitTime);
}

private Statistics get(Integer projectId, Integer featurestoreId, Integer entityId, String commitTime)
throws FeatureStoreException, IOException {
HopsworksClient hopsworksClient = getInstance();
String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH + STATISTICS_PATH;
String pathTemplate = PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ STATISTICS_PATH + "?"
+ FILTER_COMMIT_TIME_EQ + "&"
+ CONTENT_FIELD;

String uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", projectId)
.set("fsId", featurestoreId)
.set("entityType", entityType.getValue())
.set("entityId", entityId)
.set("commitTime", commitTime)
.expand();

LOGGER.info("Sending metadata request: " + uri);
HttpGet getRequest = new HttpGet(uri);
Statistics statistics = hopsworksClient.handleRequest(getRequest, Statistics.class);

// currently getting multiple commits at the same time is not allowed
if (statistics.getItems().size() == 1) {
return statistics.getItems().get(0);
}
return null;
}

public Statistics getLast(FeatureGroup featureGroup) throws FeatureStoreException, IOException {
return getLast(featureGroup.getFeatureStore().getProjectId(), featureGroup.getFeatureStore().getId(),
featureGroup.getId());
}

public Statistics getLast(TrainingDataset trainingDataset) throws FeatureStoreException, IOException {
return getLast(trainingDataset.getFeatureStore().getProjectId(), trainingDataset.getFeatureStore().getId(),
trainingDataset.getId());
}

private Statistics getLast(Integer projectId, Integer featurestoreId, Integer entityId)
throws FeatureStoreException, IOException {
HopsworksClient hopsworksClient = getInstance();
String pathTemplate = PROJECT_PATH
+ FeatureStoreApi.FEATURE_STORE_PATH
+ STATISTICS_PATH + "?"
+ SORT_BY_COMMIT_TIME_DESC + "&"
+ OFFSET + "&"
+ LIMIT + "&"
+ CONTENT_FIELD;

String uri = UriTemplate.fromTemplate(pathTemplate)
.set("projectId", projectId)
.set("fsId", featurestoreId)
.set("entityType", entityType.getValue())
.set("entityId", entityId)
.set("offset", 0)
.set("limit", 1)
.expand();

LOGGER.info("Sending metadata request: " + uri);
HttpGet getRequest = new HttpGet(uri);
Statistics statistics = hopsworksClient.handleRequest(getRequest, Statistics.class);

// currently getting multiple commits at the same time is not allowed
if (statistics.getItems().size() == 1) {
return statistics.getItems().get(0);
}
return null;
}

}