From f9f8b3c1d98505565695084581408ca0c6eb511a Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Fri, 21 Aug 2020 13:52:30 +0200 Subject: [PATCH 01/10] add profile method to spark engine --- java/pom.xml | 7 ++++ .../hsfs/engine/SparkEngine.java | 34 +++++++++++++++++++ 2 files changed, 41 insertions(+) diff --git a/java/pom.xml b/java/pom.xml index 44346ab7dd..4242ea1b21 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -20,6 +20,7 @@ 1.18.6 2.6.7.1 2.4.3.2 + 1.1.0-SNAPSHOT @@ -29,6 +30,12 @@ ${lombok.version} + + com.amazon.deequ + deequ + ${deequ.version} + + org.apache.spark spark-core_2.11 diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java index 3f6e18326c..5ff6af33ef 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java @@ -16,6 +16,9 @@ package com.logicalclocks.hsfs.engine; +import com.amazon.deequ.profiles.ColumnProfilerRunBuilder; +import com.amazon.deequ.profiles.ColumnProfilerRunner; +import com.amazon.deequ.profiles.ColumnProfiles; import com.logicalclocks.hsfs.DataFormat; import com.logicalclocks.hsfs.FeatureGroup; import com.logicalclocks.hsfs.FeatureStoreException; @@ -31,6 +34,7 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; +import scala.collection.JavaConverters; import java.util.HashMap; import java.util.List; @@ -294,4 +298,34 @@ public void writeOfflineDataframe(FeatureGroup featureGroup, Dataset datase .partitionBy(utils.getPartitionColumns(featureGroup)) .saveAsTable(utils.getTableName(featureGroup)); } + + + public String profile(Dataset df, List restrictToColumns, Boolean correlation, Boolean histogram) { + // only needed for training datasets, as the backend is not setting the defaults + if (correlation == null) { + correlation = true; + } + if (histogram == null) { + histogram = 
true; + } + ColumnProfilerRunBuilder runner = + new ColumnProfilerRunner().onData(df).withCorrelation(correlation).withHistogram(histogram); + if (restrictToColumns != null && !restrictToColumns.isEmpty()) { + runner.restrictToColumns(JavaConverters.asScalaIteratorConverter(restrictToColumns.iterator()).asScala().toSeq()); + } + ColumnProfiles result = runner.run(); + return ColumnProfiles.toJson(result.profiles().values().toSeq()); + } + + public String profile(Dataset df, List restrictToColumns) { + return profile(df, restrictToColumns, true, true); + } + + public String profile(Dataset df, boolean correlation, boolean histogram) { + return profile(df, null, correlation, histogram); + } + + public String profile(Dataset df) { + return profile(df, null, true, true); + } } From d3fdd97cfaac1bcf23ba7a7674eaa66433810126 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 24 Aug 2020 10:30:25 +0200 Subject: [PATCH 02/10] Add statistics to java api --- .../com/logicalclocks/hsfs/FeatureGroup.java | 48 ++++++++++++- .../hsfs/engine/FeatureGroupEngine.java | 11 ++- .../hsfs/engine/SparkEngine.java | 1 - .../hsfs/engine/StatisticsEngine.java | 36 ++++++++++ .../hsfs/metadata/Statistics.java | 19 ++++++ .../hsfs/metadata/StatisticsApi.java | 68 +++++++++++++++++++ 6 files changed, 174 insertions(+), 9 deletions(-) create mode 100644 java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java create mode 100644 java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java create mode 100644 java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java index ef8493dcbf..002aa63746 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java @@ -18,8 +18,11 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import 
com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; import com.logicalclocks.hsfs.engine.FeatureGroupEngine; +import com.logicalclocks.hsfs.engine.StatisticsEngine; import com.logicalclocks.hsfs.metadata.Query; +import com.logicalclocks.hsfs.metadata.Statistics; import lombok.Builder; import lombok.Getter; import lombok.NonNull; @@ -27,6 +30,8 @@ import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Date; @@ -69,6 +74,21 @@ public class FeatureGroup { @Getter @Setter private String type = "cachedFeaturegroupDTO"; + @Getter @Setter + @JsonProperty("descStatsEnabled") + private Boolean statisticsEnabled; + + @Getter @Setter + @JsonProperty("featHistEnabled") + private Boolean histograms; + + @Getter @Setter + @JsonProperty("featCorrEnabled") + private Boolean correlations; + + @Getter @Setter + private List statisticColumns; + @JsonIgnore // These are only used in the client. 
In the server they are aggregated in the `features` field private List primaryKeys; @@ -78,11 +98,15 @@ public class FeatureGroup { private List partitionKeys; private FeatureGroupEngine featureGroupEngine = new FeatureGroupEngine(); + private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.FEATURE_GROUP); + + private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroup.class); @Builder public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description, - List primaryKeys, List partitionKeys, - boolean onlineEnabled, Storage defaultStorage, List features) + List primaryKeys, List partitionKeys, boolean onlineEnabled, + Storage defaultStorage, List features, Boolean statisticsEnabled, Boolean histograms, + Boolean correlations, List statisticColumns) throws FeatureStoreException { this.featureStore = featureStore; @@ -94,6 +118,10 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver this.onlineEnabled = onlineEnabled; this.defaultStorage = defaultStorage != null ? 
defaultStorage : Storage.OFFLINE; this.features = features; + this.statisticsEnabled = statisticsEnabled; + this.histograms = histograms; + this.correlations = correlations; + this.statisticColumns = statisticColumns; } public FeatureGroup() { @@ -137,6 +165,9 @@ public void save(Dataset featureData) throws FeatureStoreException, IOExcep public void save(Dataset featureData, Map writeOptions) throws FeatureStoreException, IOException { featureGroupEngine.saveFeatureGroup(this, featureData, primaryKeys, partitionKeys, defaultStorage, writeOptions); + if (statisticsEnabled) { + statisticsEngine.computeStatistics(this, featureData); + } } public void insert(Dataset featureData, Storage storage) throws IOException, FeatureStoreException { @@ -161,12 +192,25 @@ public void insert(Dataset featureData, Storage storage, boolean overwrite, throws FeatureStoreException, IOException { featureGroupEngine.saveDataframe(this, featureData, storage, overwrite ? SaveMode.Overwrite : SaveMode.Append, writeOptions); + computeStatistics(); } public void delete() throws FeatureStoreException, IOException { featureGroupEngine.delete(this); } + public Statistics computeStatistics() throws FeatureStoreException, IOException { + if (statisticsEnabled) { + if (defaultStorage == Storage.ALL || defaultStorage == Storage.OFFLINE) { + return statisticsEngine.computeStatistics(this, read(Storage.OFFLINE)); + } else { + LOGGER.info("StorageWarning: The default storage of feature group `" + name + "`, with version `" + version + + "`, is `" + defaultStorage + "`. Statistics are only computed for default storage `offline and `all`."); + } + } + return null; + } + /** * Add a tag without value to the feature group. 
* diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java index 8d7f5c2667..936701085b 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java @@ -18,14 +18,12 @@ import com.logicalclocks.hsfs.EntityEndpointType; import com.logicalclocks.hsfs.FeatureGroup; -import com.logicalclocks.hsfs.FeatureStore; import com.logicalclocks.hsfs.FeatureStoreException; import com.logicalclocks.hsfs.Storage; import com.logicalclocks.hsfs.StorageConnector; import com.logicalclocks.hsfs.metadata.StorageConnectorApi; import com.logicalclocks.hsfs.metadata.FeatureGroupApi; import com.logicalclocks.hsfs.metadata.TagsApi; -import com.logicalclocks.hsfs.util.Constants; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; @@ -33,7 +31,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.HashMap; import java.util.List; import java.util.Map; @@ -46,9 +43,6 @@ public class FeatureGroupEngine { private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupEngine.class); - //TODO: - // Compute statistics - /** * Create the metadata and write the data to the online/offline feature store. 
* @@ -102,6 +96,11 @@ public void saveFeatureGroup(FeatureGroup featureGroup, Dataset dataset, // Update the original object - Hopsworks returns the incremented version featureGroup.setVersion(apiFG.getVersion()); + featureGroup.setId(apiFG.getId()); + featureGroup.setStatisticsEnabled(apiFG.getStatisticsEnabled()); + featureGroup.setCorrelations(apiFG.getCorrelations()); + featureGroup.setHistograms(apiFG.getHistograms()); + featureGroup.setStatisticColumns(apiFG.getStatisticColumns()); // Write the dataframe saveDataframe(featureGroup, dataset, storage, SaveMode.Append, writeOptions); diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java index 5ff6af33ef..472f89966c 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/SparkEngine.java @@ -299,7 +299,6 @@ public void writeOfflineDataframe(FeatureGroup featureGroup, Dataset datase .saveAsTable(utils.getTableName(featureGroup)); } - public String profile(Dataset df, List restrictToColumns, Boolean correlation, Boolean histogram) { // only needed for training datasets, as the backend is not setting the defaults if (correlation == null) { diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java new file mode 100644 index 0000000000..2a513d1d43 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java @@ -0,0 +1,36 @@ +package com.logicalclocks.hsfs.engine; + +import com.logicalclocks.hsfs.EntityEndpointType; +import com.logicalclocks.hsfs.FeatureGroup; +import com.logicalclocks.hsfs.FeatureStoreException; +import com.logicalclocks.hsfs.metadata.Statistics; +import com.logicalclocks.hsfs.metadata.StatisticsApi; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + +public class StatisticsEngine { + + private StatisticsApi statisticsApi; + + private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsEngine.class); + + public StatisticsEngine(EntityEndpointType entityType) { + this.statisticsApi = new StatisticsApi(entityType); + } + + public Statistics computeStatistics(FeatureGroup featureGroup, Dataset dataFrame) + throws FeatureStoreException, IOException { + String commitTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")); + + String content = SparkEngine.getInstance().profile(dataFrame, featureGroup.getStatisticColumns(), + featureGroup.getHistograms(), featureGroup.getCorrelations()); + return statisticsApi.post(featureGroup, new Statistics(commitTime, content)); + } + +} diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java new file mode 100644 index 0000000000..d96345357e --- /dev/null +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java @@ -0,0 +1,19 @@ +package com.logicalclocks.hsfs.metadata; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@JsonIgnoreProperties(ignoreUnknown = true) +@NoArgsConstructor +@AllArgsConstructor +public class Statistics extends RestDto { + + @Getter @Setter + private String commitTime; + + @Getter @Setter + private String content; +} diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java new file mode 100644 index 0000000000..a222b5cc41 --- /dev/null +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java @@ -0,0 +1,68 @@ +package 
com.logicalclocks.hsfs.metadata; + +import com.damnhandy.uri.template.UriTemplate; +import com.logicalclocks.hsfs.EntityEndpointType; +import com.logicalclocks.hsfs.FeatureGroup; +import com.logicalclocks.hsfs.FeatureStoreException; +import com.logicalclocks.hsfs.TrainingDataset; +import lombok.NonNull; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH; +import static com.logicalclocks.hsfs.metadata.HopsworksClient.getInstance; + +public class StatisticsApi { + + public static final String ENTITY_ROOT_PATH = "{/entityType}"; + public static final String ENTITY_ID_PATH = ENTITY_ROOT_PATH + "{/entityId}"; + public static final String STATISTICS_PATH = ENTITY_ID_PATH + "/statistics"; + + private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsApi.class); + + private EntityEndpointType entityType; + + public StatisticsApi(@NonNull EntityEndpointType entityType) { + this.entityType = entityType; + } + + public Statistics post(FeatureGroup featureGroup, Statistics statistics) throws FeatureStoreException, IOException { + return post(featureGroup.getFeatureStore().getProjectId(), featureGroup.getFeatureStore().getId(), + featureGroup.getId(), statistics); + } + + public Statistics post(TrainingDataset trainingDataset, Statistics statistics) + throws FeatureStoreException, IOException { + return post(trainingDataset.getFeatureStore().getProjectId(), trainingDataset.getFeatureStore().getId(), + trainingDataset.getId(), statistics); + } + + private Statistics post(Integer projectId, Integer featurestoreId, Integer entityId, Statistics statistics) + throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH + 
STATISTICS_PATH; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", projectId) + .set("fsId", featurestoreId) + .set("entityType", entityType.getValue()) + .set("entityId", entityId) + .expand(); + + String statisticsJson = hopsworksClient.getObjectMapper().writeValueAsString(statistics); + HttpPost postRequest = new HttpPost(uri); + postRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + postRequest.setEntity(new StringEntity(statisticsJson)); + + LOGGER.info("Sending metadata request: " + uri); + LOGGER.info(statisticsJson); + + return hopsworksClient.handleRequest(postRequest, Statistics.class); + } + +} From ad85913959fd9b113ea65931b4944b40690fb24c Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 24 Aug 2020 13:36:04 +0200 Subject: [PATCH 03/10] training dataset --- .../com/logicalclocks/hsfs/FeatureGroup.java | 7 +++ .../logicalclocks/hsfs/TrainingDataset.java | 57 +++++++++++++++++-- .../hsfs/engine/StatisticsEngine.java | 21 +++++-- 3 files changed, 75 insertions(+), 10 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java index 002aa63746..8e400e00ba 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java @@ -200,6 +200,13 @@ public void delete() throws FeatureStoreException, IOException { } public Statistics computeStatistics() throws FeatureStoreException, IOException { + /** + * Recompute the statistics for the feature group and save them to the feature store. 
+ * + * @return statistics object of computed statistics + * @throws FeatureStoreException + * @throws IOException + */ if (statisticsEnabled) { if (defaultStorage == Storage.ALL || defaultStorage == Storage.OFFLINE) { return statisticsEngine.computeStatistics(this, read(Storage.OFFLINE)); diff --git a/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java b/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java index d985d64713..da06ebbd56 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java +++ b/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java @@ -17,8 +17,11 @@ package com.logicalclocks.hsfs; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.logicalclocks.hsfs.engine.StatisticsEngine; import com.logicalclocks.hsfs.engine.TrainingDatasetEngine; import com.logicalclocks.hsfs.metadata.Query; +import com.logicalclocks.hsfs.metadata.Statistics; import lombok.Builder; import lombok.Getter; import lombok.NoArgsConstructor; @@ -75,13 +78,30 @@ public class TrainingDataset { @Getter @Setter private List splits; + @Getter @Setter + @JsonIgnore + private Boolean statisticsEnabled = true; + + @Getter @Setter + @JsonIgnore + private Boolean histograms; + + @Getter @Setter + @JsonIgnore + private Boolean correlations; + + @Getter @Setter + @JsonIgnore + private List statisticColumns; + private TrainingDatasetEngine trainingDatasetEngine = new TrainingDatasetEngine(); + private StatisticsEngine statisticsEngine = new StatisticsEngine(EntityEndpointType.TRAINING_DATASET); @Builder - public TrainingDataset(@NonNull String name, Integer version, String description, - DataFormat dataFormat, StorageConnector storageConnector, - String location, List splits, Long seed, - FeatureStore featureStore) { + public TrainingDataset(@NonNull String name, Integer version, String description, DataFormat dataFormat, + StorageConnector storageConnector, String location, List 
splits, Long seed, + FeatureStore featureStore, Boolean statisticsEnabled, Boolean histograms, + Boolean correlations, List statisticColumns) { this.name = name; this.version = version; this.description = description; @@ -100,6 +120,10 @@ public TrainingDataset(@NonNull String name, Integer version, String description this.splits = splits; this.seed = seed; this.featureStore = featureStore; + this.statisticsEnabled = statisticsEnabled != null ? statisticsEnabled : true; /* null when unset in builder */ + this.histograms = histograms; + this.correlations = correlations; + this.statisticColumns = statisticColumns; } /** @@ -133,7 +157,11 @@ public void save(Dataset dataset) throws FeatureStoreException, IOException * @throws IOException */ public void save(Query query, Map writeOptions) throws FeatureStoreException, IOException { - trainingDatasetEngine.save(this, query.read(), writeOptions); + Dataset dataset = query.read(); + trainingDatasetEngine.save(this, dataset, writeOptions); + if (statisticsEnabled) { + statisticsEngine.computeStatistics(this, dataset); + } } /** @@ -147,6 +175,9 @@ public void save(Query query, Map writeOptions) throws FeatureSt public void save(Dataset dataset, Map writeOptions) throws FeatureStoreException, IOException { trainingDatasetEngine.save(this, dataset, writeOptions); + if (statisticsEnabled) { + statisticsEngine.computeStatistics(this, dataset); + } } /** @@ -186,6 +217,7 @@ public void insert(Query query, boolean overwrite, Map writeOpti throws FeatureStoreException, IOException { trainingDatasetEngine.insert(this, query.read(), writeOptions, overwrite ? SaveMode.Overwrite : SaveMode.Append); + computeStatistics(); } /** @@ -201,6 +233,7 @@ public void insert(Dataset dataset, boolean overwrite, Map throws FeatureStoreException, IOException { trainingDatasetEngine.insert(this, dataset, writeOptions, overwrite ?
SaveMode.Overwrite : SaveMode.Append); + computeStatistics(); } /** @@ -253,6 +286,20 @@ public void show(int numRows) { read("").show(numRows); } + /** + * Recompute the statistics for the entire training dataset and save them to the feature store. + * + * @return statistics object of computed statistics + * @throws FeatureStoreException + * @throws IOException + */ + public Statistics computeStatistics() throws FeatureStoreException, IOException { + if (statisticsEnabled) { + return statisticsEngine.computeStatistics(this, read()); + } + return null; + } + /** * Add a tag without value to the training dataset. * diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java index 2a513d1d43..67c7fcc997 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java @@ -3,6 +3,7 @@ import com.logicalclocks.hsfs.EntityEndpointType; import com.logicalclocks.hsfs.FeatureGroup; import com.logicalclocks.hsfs.FeatureStoreException; +import com.logicalclocks.hsfs.TrainingDataset; import com.logicalclocks.hsfs.metadata.Statistics; import com.logicalclocks.hsfs.metadata.StatisticsApi; import org.apache.spark.sql.Dataset; @@ -13,6 +14,7 @@ import java.io.IOException; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; +import java.util.List; public class StatisticsEngine { @@ -24,13 +26,22 @@ public StatisticsEngine(EntityEndpointType entityType) { this.statisticsApi = new StatisticsApi(entityType); } - public Statistics computeStatistics(FeatureGroup featureGroup, Dataset dataFrame) + public Statistics computeStatistics(TrainingDataset trainingDataset, Dataset dataFrame) throws FeatureStoreException, IOException { - String commitTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")); + return statisticsApi.post(trainingDataset, 
computeStatistics(dataFrame, trainingDataset.getStatisticColumns(), + trainingDataset.getHistograms(), trainingDataset.getCorrelations())); + } - String content = SparkEngine.getInstance().profile(dataFrame, featureGroup.getStatisticColumns(), - featureGroup.getHistograms(), featureGroup.getCorrelations()); - return statisticsApi.post(featureGroup, new Statistics(commitTime, content)); + public Statistics computeStatistics(FeatureGroup featureGroup, Dataset dataFrame) + throws FeatureStoreException, IOException { + return statisticsApi.post(featureGroup, computeStatistics(dataFrame, featureGroup.getStatisticColumns(), + featureGroup.getHistograms(), featureGroup.getCorrelations())); } + private Statistics computeStatistics(Dataset dataFrame, List statisticColumns, Boolean histograms, + Boolean correlations) { + String commitTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss")); + String content = SparkEngine.getInstance().profile(dataFrame, statisticColumns, correlations, histograms); + return new Statistics(commitTime, content); + } } From 98bc08a74c858cc3ca1f6138edb720f6893c662d Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 25 Aug 2020 16:30:16 +0200 Subject: [PATCH 04/10] progress get stats java --- .../logicalclocks/hsfs/metadata/StatisticsApi.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java index a222b5cc41..4c35c2409f 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java @@ -22,6 +22,11 @@ public class StatisticsApi { public static final String ENTITY_ROOT_PATH = "{/entityType}"; public static final String ENTITY_ID_PATH = ENTITY_ROOT_PATH + "{/entityId}"; public static final String STATISTICS_PATH = ENTITY_ID_PATH + "/statistics"; + public static final String
STATISTICS_FILTER_COMMIT_TIME_EQ = "filter_by=commit_time_eq:{commitTime}"; + public static final String CONTENT_FIELD = "fields=content"; + public static final String SORT_BY_COMMIT_TIME_DESC = "sort_by=commit_time:desc"; + public static final String OFFSET = "offset={offset}"; + public static final String LIMIT = "limit={limit}"; private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsApi.class); @@ -65,4 +70,11 @@ private Statistics post(Integer projectId, Integer featurestoreId, Integer entit return hopsworksClient.handleRequest(postRequest, Statistics.class); } + public Statistics get(FeatureGroup featureGroup, String commitTime) throws FeatureStoreException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH + STATISTICS_PATH; + + + } + } From 6316fe769c0a9066a674a7e9680810118e88c35e Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Wed, 26 Aug 2020 13:53:45 +0200 Subject: [PATCH 05/10] finalize java stats --- .../com/logicalclocks/hsfs/FeatureGroup.java | 51 ++++++++++-- .../logicalclocks/hsfs/TrainingDataset.java | 23 ++++++ .../hsfs/engine/FeatureGroupEngine.java | 8 ++ .../hsfs/engine/StatisticsEngine.java | 16 ++++ .../hsfs/metadata/FeatureGroupApi.java | 30 ++++++- .../hsfs/metadata/StatisticsApi.java | 79 ++++++++++++++++++- 6 files changed, 196 insertions(+), 11 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java index 8e400e00ba..025a7107aa 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java +++ b/java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java @@ -199,14 +199,26 @@ public void delete() throws FeatureStoreException, IOException { featureGroupEngine.delete(this); } + /** + * Update the statistics configuration of the feature group.
+ * Change the `statisticsEnabled`, `histograms`, `correlations` or `statisticColumns` attributes and persist + * the changes by calling this method. + * + * @throws FeatureStoreException + * @throws IOException + */ + public void updateStatisticsConfig() throws FeatureStoreException, IOException { + featureGroupEngine.updateStatisticsConfig(this); + } + + /** + * Recompute the statistics for the feature group and save them to the feature store. + * + * @return statistics object of computed statistics + * @throws FeatureStoreException + * @throws IOException + */ public Statistics computeStatistics() throws FeatureStoreException, IOException { - /** - * Recompute the statistics for the feature group and save them to the feature store. - * - * @return statistics object of computed statistics - * @throws FeatureStoreException - * @throws IOException - */ if (statisticsEnabled) { if (defaultStorage == Storage.ALL || defaultStorage == Storage.OFFLINE) { return statisticsEngine.computeStatistics(this, read(Storage.OFFLINE)); @@ -218,6 +230,31 @@ public Statistics computeStatistics() throws FeatureStoreException, IOException return null; } + /** + * Get the last statistics commit for the feature group. + * + * @return statistics object of latest commit + * @throws FeatureStoreException + * @throws IOException + */ + @JsonIgnore + public Statistics getStatistics() throws FeatureStoreException, IOException { + return statisticsEngine.getLast(this); + } + + /** + * Get the statistics of a specific commit time for the feature group. + * + * @param commitTime commit time in the format "YYYYMMDDhhmmss" + * @return statistics object for the commit time + * @throws FeatureStoreException + * @throws IOException + */ + @JsonIgnore + public Statistics getStatistics(String commitTime) throws FeatureStoreException, IOException { + return statisticsEngine.get(this, commitTime); + } + /** * Add a tag without value to the feature group.
* diff --git a/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java b/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java index da06ebbd56..6b2b1b2306 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java +++ b/java/src/main/java/com/logicalclocks/hsfs/TrainingDataset.java @@ -300,6 +300,29 @@ public Statistics computeStatistics() throws FeatureStoreException, IOException return null; } + /** + * Get the last statistics commit for the training dataset. + * + * @return statistics object of latest commit + * @throws FeatureStoreException + * @throws IOException + */ + public Statistics getStatistics() throws FeatureStoreException, IOException { + return statisticsEngine.getLast(this); + } + + /** + * Get the statistics of a specific commit time for the training dataset. + * + * @param commitTime commit time in the format "YYYYMMDDhhmmss" + * @return statistics object for the commit time + * @throws FeatureStoreException + * @throws IOException + */ + public Statistics getStatistics(String commitTime) throws FeatureStoreException, IOException { + return statisticsEngine.get(this, commitTime); + } + /** * Add a tag without value to the training dataset.
* diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java index 936701085b..35630dc36c 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java @@ -178,4 +178,12 @@ public Map getTag(FeatureGroup featureGroup, String name) throws public void deleteTag(FeatureGroup featureGroup, String name) throws FeatureStoreException, IOException { tagsApi.deleteTag(featureGroup, name); } + + public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + FeatureGroup apiFG = featureGroupApi.updateStatsConfig(featureGroup); + featureGroup.setStatisticsEnabled(apiFG.getStatisticsEnabled()); + featureGroup.setCorrelations(apiFG.getCorrelations()); + featureGroup.setHistograms(apiFG.getHistograms()); + featureGroup.setStatisticColumns(apiFG.getStatisticColumns()); + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java index 67c7fcc997..520cd98699 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java @@ -44,4 +44,20 @@ private Statistics computeStatistics(Dataset dataFrame, List statis String content = SparkEngine.getInstance().profile(dataFrame, statisticColumns, correlations, histograms); return new Statistics(commitTime, content); } + + public Statistics get(FeatureGroup featureGroup, String commitTime) throws FeatureStoreException, IOException { + return statisticsApi.get(featureGroup, commitTime); + } + + public Statistics get(TrainingDataset trainingDataset, String commitTime) throws FeatureStoreException, IOException { + return statisticsApi.get(trainingDataset, commitTime); + } + + public Statistics
getLast(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + return statisticsApi.getLast(featureGroup); + } + + public Statistics getLast(TrainingDataset trainingDataset) throws FeatureStoreException, IOException { + return statisticsApi.getLast(trainingDataset); + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java index 6bab0571ec..5487973e83 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java @@ -17,6 +17,7 @@ package com.logicalclocks.hsfs.metadata; import com.damnhandy.uri.template.UriTemplate; +import com.fasterxml.jackson.core.JsonProcessingException; import com.logicalclocks.hsfs.FeatureGroup; import com.logicalclocks.hsfs.FeatureStore; import com.logicalclocks.hsfs.FeatureStoreException; @@ -24,11 +25,13 @@ import org.apache.http.client.methods.HttpGet; import org.apache.http.HttpHeaders; import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; import org.apache.http.entity.StringEntity; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UnsupportedEncodingException; import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH; @@ -38,7 +41,7 @@ public class FeatureGroupApi { public static final String FEATURE_GROUP_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgName}{?version}"; public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}"; public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear"; - public static final String FEATURE_GROUP_TAGS_PATH = FEATURE_GROUP_ID_PATH + "/tags{/name}{?value}"; + public static final String FEATURE_GROUP_UPDATE_STATS_CONGIG = "updateStatsSettings=True"; private static final Logger LOGGER = 
LoggerFactory.getLogger(FeatureGroupApi.class); @@ -122,4 +125,29 @@ public void deleteContent(FeatureGroup featureGroup) throws FeatureStoreExceptio HttpPost postRequest = new HttpPost(uri); hopsworksClient.handleRequest(postRequest); } + + public FeatureGroup updateStatsConfig(FeatureGroup featureGroup) + throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = HopsworksClient.getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + FEATURE_GROUP_ID_PATH + + "?" + FEATURE_GROUP_UPDATE_STATS_CONGIG; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", featureGroup.getFeatureStore().getProjectId()) + .set("fsId", featureGroup.getFeatureStore().getId()) + .set("fgId", featureGroup.getId()) + .expand(); + + String featureGroupJson = hopsworksClient.getObjectMapper().writeValueAsString(featureGroup); + HttpPut putRequest = new HttpPut(uri); + putRequest.setHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + putRequest.setEntity(new StringEntity(featureGroupJson)); + + LOGGER.info("Sending metadata request: " + uri); + LOGGER.info(featureGroupJson); + + return hopsworksClient.handleRequest(putRequest, FeatureGroup.class); + } } diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java index 4c35c2409f..7b09abf5c2 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java @@ -7,6 +7,7 @@ import com.logicalclocks.hsfs.TrainingDataset; import lombok.NonNull; import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.slf4j.Logger; @@ -22,7 +23,7 @@ public class StatisticsApi { public static final String ENTITY_ROOT_PATH = "{/entityType}"; public static final String 
ENTITY_ID_PATH = ENTITY_ROOT_PATH + "{/entityId}"; public static final String STATISTICS_PATH = ENTITY_ID_PATH + "/statistics"; - public static final String STATISTICS_FILTER_COMMIT_TIME_EQ = "filter_by=commit_time_eq:{commitTime}"; + public static final String FILTER_COMMIT_TIME_EQ = "filter_by=commit_time_eq:{commitTime}"; public static final String CONTENT_FIELD = "fields=content"; public static final String SORT_BY_COMMIT_TIME_DESC = "sort_by=commit_time:desc"; public static final String OFFSET = "offset={offset}"; @@ -70,11 +71,83 @@ private Statistics post(Integer projectId, Integer featurestoreId, Integer entit return hopsworksClient.handleRequest(postRequest, Statistics.class); } - public Statistics get(FeatureGroup featureGroup, String commitTime) throws FeatureStoreException { + public Statistics get(FeatureGroup featureGroup, String commitTime) throws FeatureStoreException, IOException { + return get(featureGroup.getFeatureStore().getProjectId(), featureGroup.getFeatureStore().getId(), + featureGroup.getId(), commitTime); + } + + public Statistics get(TrainingDataset trainingDataset, String commitTime) throws FeatureStoreException, IOException { + return get(trainingDataset.getFeatureStore().getProjectId(), trainingDataset.getFeatureStore().getId(), + trainingDataset.getId(), commitTime); + } + + private Statistics get(Integer projectId, Integer featurestoreId, Integer entityId, String commitTime) + throws FeatureStoreException, IOException { HopsworksClient hopsworksClient = getInstance(); - String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH + STATISTICS_PATH; + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + STATISTICS_PATH + "?" 
+ + FILTER_COMMIT_TIME_EQ + "&" + + CONTENT_FIELD; + + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", projectId) + .set("fsId", featurestoreId) + .set("entityType", entityType.getValue()) + .set("entityId", entityId) + .set("commitTime", commitTime) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + HttpGet getRequest = new HttpGet(uri); + Statistics statistics = hopsworksClient.handleRequest(getRequest, Statistics.class); + + // currently getting multiple commits at the same time is not allowed + if (statistics.getItems().size() == 1) { + return statistics.getItems().get(0); + } + return null; + } + + public Statistics getLast(FeatureGroup featureGroup) throws FeatureStoreException, IOException { + return getLast(featureGroup.getFeatureStore().getProjectId(), featureGroup.getFeatureStore().getId(), + featureGroup.getId()); + } + + public Statistics getLast(TrainingDataset trainingDataset) throws FeatureStoreException, IOException { + return getLast(trainingDataset.getFeatureStore().getProjectId(), trainingDataset.getFeatureStore().getId(), + trainingDataset.getId()); + } + private Statistics getLast(Integer projectId, Integer featurestoreId, Integer entityId) + throws FeatureStoreException, IOException { + HopsworksClient hopsworksClient = getInstance(); + String pathTemplate = PROJECT_PATH + + FeatureStoreApi.FEATURE_STORE_PATH + + STATISTICS_PATH + "?" 
+ + SORT_BY_COMMIT_TIME_DESC + "&" + + OFFSET + "&" + + LIMIT + "&" + + CONTENT_FIELD; + String uri = UriTemplate.fromTemplate(pathTemplate) + .set("projectId", projectId) + .set("fsId", featurestoreId) + .set("entityType", entityType.getValue()) + .set("entityId", entityId) + .set("offset", 0) + .set("limit", 1) + .expand(); + + LOGGER.info("Sending metadata request: " + uri); + HttpGet getRequest = new HttpGet(uri); + Statistics statistics = hopsworksClient.handleRequest(getRequest, Statistics.class); + + // currently getting multiple commits at the same time is not allowed + if (statistics.getItems().size() == 1) { + return statistics.getItems().get(0); + } + return null; } } From 2a93b4f276e917ef2781f1067dfe950a67a1f769 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Mon, 31 Aug 2020 20:20:48 +0200 Subject: [PATCH 06/10] Add missing copyright headers --- .../hsfs/engine/StatisticsEngine.java | 16 ++++++++++++++++ .../logicalclocks/hsfs/metadata/Statistics.java | 16 ++++++++++++++++ .../hsfs/metadata/StatisticsApi.java | 16 ++++++++++++++++ 3 files changed, 48 insertions(+) diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java index 520cd98699..6dfea8098c 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/StatisticsEngine.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2020 Logical Clocks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * + * See the License for the specific language governing permissions and limitations under the License. + */ + package com.logicalclocks.hsfs.engine; import com.logicalclocks.hsfs.EntityEndpointType; diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java index d96345357e..8159645054 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/Statistics.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2020 Logical Clocks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and limitations under the License. + */ + package com.logicalclocks.hsfs.metadata; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java index 7b09abf5c2..8624974c08 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java @@ -1,3 +1,19 @@ +/* + * Copyright (c) 2020 Logical Clocks AB + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * + * See the License for the specific language governing permissions and limitations under the License. + */ + package com.logicalclocks.hsfs.metadata; import com.damnhandy.uri.template.UriTemplate; From 450e45020c63792a35c7fe8c64d717925a55733c Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 1 Sep 2020 09:10:39 +0200 Subject: [PATCH 07/10] remove unused imports from featuregroupapi --- .../java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java index 5487973e83..8be7aebef8 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java @@ -17,7 +17,6 @@ package com.logicalclocks.hsfs.metadata; import com.damnhandy.uri.template.UriTemplate; -import com.fasterxml.jackson.core.JsonProcessingException; import com.logicalclocks.hsfs.FeatureGroup; import com.logicalclocks.hsfs.FeatureStore; import com.logicalclocks.hsfs.FeatureStoreException; @@ -31,7 +30,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.UnsupportedEncodingException; import static com.logicalclocks.hsfs.metadata.HopsworksClient.PROJECT_PATH; From 0e2c756710a6b53ce71fee28313d161d8ef25d58 Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 1 Sep 2020 10:14:39 +0200 Subject: [PATCH 08/10] use UriTemplate to set query params --- .../hsfs/metadata/FeatureGroupApi.java | 7 +++--- .../hsfs/metadata/StatisticsApi.java | 22 ++++++------------- 2 files changed, 10 
insertions(+), 19 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java index 8be7aebef8..372aa9c3ce 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/FeatureGroupApi.java @@ -37,9 +37,8 @@ public class FeatureGroupApi { public static final String FEATURE_GROUP_ROOT_PATH = "/featuregroups"; public static final String FEATURE_GROUP_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgName}{?version}"; - public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}"; + public static final String FEATURE_GROUP_ID_PATH = FEATURE_GROUP_ROOT_PATH + "{/fgId}{?updateStatsSettings}"; public static final String FEATURE_GROUP_CLEAR_PATH = FEATURE_GROUP_ID_PATH + "/clear"; - public static final String FEATURE_GROUP_UPDATE_STATS_CONGIG = "updateStatsSettings=True"; private static final Logger LOGGER = LoggerFactory.getLogger(FeatureGroupApi.class); @@ -129,13 +128,13 @@ public FeatureGroup updateStatsConfig(FeatureGroup featureGroup) HopsworksClient hopsworksClient = HopsworksClient.getInstance(); String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH - + FEATURE_GROUP_ID_PATH - + "?" 
+ FEATURE_GROUP_UPDATE_STATS_CONGIG; + + FEATURE_GROUP_ID_PATH; String uri = UriTemplate.fromTemplate(pathTemplate) .set("projectId", featureGroup.getFeatureStore().getProjectId()) .set("fsId", featureGroup.getFeatureStore().getId()) .set("fgId", featureGroup.getId()) + .set("updateStatsSettings", true) .expand(); String featureGroupJson = hopsworksClient.getObjectMapper().writeValueAsString(featureGroup); diff --git a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java index 8624974c08..d20c293248 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java +++ b/java/src/main/java/com/logicalclocks/hsfs/metadata/StatisticsApi.java @@ -38,12 +38,7 @@ public class StatisticsApi { public static final String ENTITY_ROOT_PATH = "{/entityType}"; public static final String ENTITY_ID_PATH = ENTITY_ROOT_PATH + "{/entityId}"; - public static final String STATISTICS_PATH = ENTITY_ID_PATH + "/statistics"; - public static final String FILTER_COMMIT_TIME_EQ = "filter_by=commit_time_eq:{commitTime}"; - public static final String CONTENT_FIELD = "fields=content"; - public static final String SORT_BY_COMMIT_TIME_DESC = "sort_by=commit_time:desc"; - public static final String OFFSET = "offset={offset}"; - public static final String LIMIT = "limit={limit}"; + public static final String STATISTICS_PATH = ENTITY_ID_PATH + "/statistics{?filter_by,fields,sort_by,offset,limit}"; private static final Logger LOGGER = LoggerFactory.getLogger(StatisticsApi.class); @@ -102,16 +97,15 @@ private Statistics get(Integer projectId, Integer featurestoreId, Integer entity HopsworksClient hopsworksClient = getInstance(); String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH - + STATISTICS_PATH + "?" 
- + FILTER_COMMIT_TIME_EQ + "&" - + CONTENT_FIELD; + + STATISTICS_PATH; String uri = UriTemplate.fromTemplate(pathTemplate) .set("projectId", projectId) .set("fsId", featurestoreId) .set("entityType", entityType.getValue()) .set("entityId", entityId) - .set("commitTime", commitTime) + .set("filter_by", "commit_time_eq:" + commitTime) + .set("fields", "content") .expand(); LOGGER.info("Sending metadata request: " + uri); @@ -140,19 +134,17 @@ private Statistics getLast(Integer projectId, Integer featurestoreId, Integer en HopsworksClient hopsworksClient = getInstance(); String pathTemplate = PROJECT_PATH + FeatureStoreApi.FEATURE_STORE_PATH - + STATISTICS_PATH + "?" - + SORT_BY_COMMIT_TIME_DESC + "&" - + OFFSET + "&" - + LIMIT + "&" - + CONTENT_FIELD; + + STATISTICS_PATH; String uri = UriTemplate.fromTemplate(pathTemplate) .set("projectId", projectId) .set("fsId", featurestoreId) .set("entityType", entityType.getValue()) .set("entityId", entityId) + .set("sort_by", "commit_time:desc") .set("offset", 0) .set("limit", 1) + .set("fields", "content") .expand(); LOGGER.info("Sending metadata request: " + uri); From 9776866675fa9bfc1f3e9689bc29be2b1ab0417f Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Tue, 1 Sep 2020 10:20:54 +0200 Subject: [PATCH 09/10] remove setting statsenabled and columns after api call --- .../com/logicalclocks/hsfs/engine/FeatureGroupEngine.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java index 35630dc36c..7d53647ab3 100644 --- a/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java +++ b/java/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupEngine.java @@ -97,10 +97,8 @@ public void saveFeatureGroup(FeatureGroup featureGroup, Dataset dataset, // Update the original object - Hopsworks returns the incremented version featureGroup.setVersion(apiFG.getVersion()); 
featureGroup.setId(apiFG.getId()); - featureGroup.setStatisticsEnabled(apiFG.getStatisticsEnabled()); featureGroup.setCorrelations(apiFG.getCorrelations()); featureGroup.setHistograms(apiFG.getHistograms()); - featureGroup.setStatisticColumns(apiFG.getStatisticColumns()); // Write the dataframe saveDataframe(featureGroup, dataset, storage, SaveMode.Append, writeOptions); @@ -181,9 +179,7 @@ public void deleteTag(FeatureGroup featureGroup, String name) throws FeatureStor public void updateStatisticsConfig(FeatureGroup featureGroup) throws FeatureStoreException, IOException { FeatureGroup apiFG = featureGroupApi.updateStatsConfig(featureGroup); - featureGroup.setStatisticsEnabled(apiFG.getStatisticsEnabled()); featureGroup.setCorrelations(apiFG.getCorrelations()); featureGroup.setHistograms(apiFG.getHistograms()); - featureGroup.setStatisticColumns(apiFG.getStatisticColumns()); } } From c39b4496e0278939fef675905ef43d96b2c5116e Mon Sep 17 00:00:00 2001 From: moritzmeister Date: Thu, 17 Sep 2020 15:51:50 +0200 Subject: [PATCH 10/10] change deequ group id to logicalclocks --- java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/pom.xml b/java/pom.xml index 4242ea1b21..a438be1b4b 100644 --- a/java/pom.xml +++ b/java/pom.xml @@ -31,7 +31,7 @@ - com.amazon.deequ + com.logicalclocks deequ ${deequ.version}