Skip to content

Commit

Permalink
[HOPSWORKS-3178] Remove Deequ based data validation in favour of GE (l…
Browse files Browse the repository at this point in the history
  • Loading branch information
moritzmeister authored and kennethmhc committed Nov 16, 2022
1 parent 1de2987 commit 9346fe2
Show file tree
Hide file tree
Showing 53 changed files with 180 additions and 3,552 deletions.
70 changes: 0 additions & 70 deletions auto_doc.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,38 +140,6 @@
"hsfs.statistics_config.StatisticsConfig"
),
},
"feature_validation.md": {
"rule": ["hsfs.rule.Rule"],
"rule_properties": keras_autodoc.get_properties("hsfs.rule.Rule"),
"ruledefinition": ["hsfs.ruledefinition.RuleDefinition"],
"ruledefinition_getall": ["hsfs.connection.Connection.get_rules"],
"ruledefinition_get": ["hsfs.connection.Connection.get_rule"],
"ruledefinition_properties": keras_autodoc.get_properties(
"hsfs.ruledefinition.RuleDefinition"
),
"expectation": ["hsfs.expectation.Expectation"],
"expectation_properties": keras_autodoc.get_properties(
"hsfs.expectation.Expectation"
),
"expectation_methods": keras_autodoc.get_methods(
"hsfs.expectation.Expectation",
exclude=[
"from_response_json",
"update_from_response_json",
"json",
"to_dict",
],
),
"expectation_create": ["hsfs.feature_store.FeatureStore.create_expectation"],
"expectation_get": ["hsfs.feature_store.FeatureStore.get_expectation"],
"expectation_getall": ["hsfs.feature_store.FeatureStore.get_expectations"],
"validation_result": ["hsfs.validation_result.ValidationResult"],
"validation_result_properties": keras_autodoc.get_properties(
"hsfs.validation_result.ValidationResult"
),
"validate": ["hsfs.feature_group.FeatureGroup.validate"],
"validation_result_get": ["hsfs.feature_group.FeatureGroup.get_validations"],
},
"tags.md": {
"fg_tag_add": ["hsfs.feature_group.FeatureGroupBase.add_tag"],
"fg_tag_get": ["hsfs.feature_group.FeatureGroupBase.get_tag"],
Expand Down Expand Up @@ -308,44 +276,6 @@
"hsfs.statistics_config.StatisticsConfig"
),
},
"api/rule_api.md": {
"rule": ["hsfs.rule.Rule"],
"rule_properties": keras_autodoc.get_properties("hsfs.rule.Rule"),
},
"api/rule_definition_api.md": {
"ruledefinition": ["hsfs.ruledefinition.RuleDefinition"],
"ruledefinition_getall": ["hsfs.connection.Connection.get_rules"],
"ruledefinition_get": ["hsfs.connection.Connection.get_rule"],
"ruledefinition_properties": keras_autodoc.get_properties(
"hsfs.ruledefinition.RuleDefinition"
),
},
"api/expectation_api.md": {
"expectation": ["hsfs.expectation.Expectation"],
"expectation_properties": keras_autodoc.get_properties(
"hsfs.expectation.Expectation"
),
"expectation_methods": keras_autodoc.get_methods(
"hsfs.expectation.Expectation",
exclude=[
"from_response_json",
"update_from_response_json",
"json",
"to_dict",
],
),
"expectation_create": ["hsfs.feature_store.FeatureStore.create_expectation"],
"expectation_get": ["hsfs.feature_store.FeatureStore.get_expectation"],
"expectation_getall": ["hsfs.feature_store.FeatureStore.get_expectations"],
},
"api/validation_api.md": {
"validation_result": ["hsfs.validation_result.ValidationResult"],
"validation_result_properties": keras_autodoc.get_properties(
"hsfs.validation_result.ValidationResult"
),
"validate": ["hsfs.feature_group.FeatureGroup.validate"],
"validation_result_get": ["hsfs.feature_group.FeatureGroup.get_validations"],
},
"api/transformation_functions_api.md": {
"transformation_function": [
"hsfs.transformation_function.TransformationFunction"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,17 @@
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.logicalclocks.hsfs.engine.ExternalFeatureGroupEngine;
import com.logicalclocks.hsfs.engine.CodeEngine;
import com.logicalclocks.hsfs.metadata.Expectation;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;
import com.logicalclocks.hsfs.metadata.OnDemandOptions;
import com.logicalclocks.hsfs.metadata.validation.ValidationType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import scala.collection.JavaConverters;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -73,9 +69,7 @@ public class ExternalFeatureGroup extends FeatureGroupBase {
public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String query,
ExternalDataFormat dataFormat, String path, Map<String, String> options,
@NonNull StorageConnector storageConnector, String description, List<String> primaryKeys,
List<Feature> features, StatisticsConfig statisticsConfig,
scala.collection.Seq<Expectation> expectations,
ValidationType validationType, String eventTime) {
List<Feature> features, StatisticsConfig statisticsConfig, String eventTime) {
this.featureStore = featureStore;
this.name = name;
this.version = version;
Expand All @@ -93,12 +87,6 @@ public ExternalFeatureGroup(FeatureStore featureStore, @NonNull String name, Int
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.eventTime = eventTime;
this.validationType = validationType != null ? validationType : ValidationType.NONE;
if (expectations != null && !expectations.isEmpty()) {
this.expectationsNames = new ArrayList<>();
this.expectations = JavaConverters.seqAsJavaListConverter(expectations).asJava();
this.expectations.forEach(expectation -> this.expectationsNames.add(expectation.getName()));
}
}

public ExternalFeatureGroup() {
Expand Down
13 changes: 1 addition & 12 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.FeatureGroupBase;
import com.logicalclocks.hsfs.engine.StatisticsEngine;
import com.logicalclocks.hsfs.metadata.Expectation;
import com.logicalclocks.hsfs.metadata.validation.ValidationType;
import com.logicalclocks.hsfs.metadata.Statistics;
import lombok.AllArgsConstructor;
import lombok.Builder;
Expand All @@ -40,11 +38,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.avro.Schema;
import scala.collection.JavaConverters;

import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -96,8 +92,7 @@ public class FeatureGroup extends FeatureGroupBase {
public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
boolean onlineEnabled, TimeTravelFormat timeTravelFormat, List<Feature> features,
StatisticsConfig statisticsConfig, ValidationType validationType,
scala.collection.Seq<Expectation> expectations, String onlineTopicName, String eventTime) {
StatisticsConfig statisticsConfig, String onlineTopicName, String eventTime) {
this.featureStore = featureStore;
this.name = name;
this.version = version;
Expand All @@ -112,12 +107,6 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
this.timeTravelFormat = timeTravelFormat != null ? timeTravelFormat : TimeTravelFormat.HUDI;
this.features = features;
this.statisticsConfig = statisticsConfig != null ? statisticsConfig : new StatisticsConfig();
this.validationType = validationType != null ? validationType : ValidationType.NONE;
if (expectations != null && !expectations.isEmpty()) {
this.expectationsNames = new ArrayList<>();
this.expectations = JavaConverters.seqAsJavaListConverter(expectations).asJava();
this.expectations.forEach(expectation -> this.expectationsNames.add(expectation.getName()));
}
this.onlineTopicName = onlineTopicName;
this.eventTime = eventTime;
}
Expand Down
72 changes: 9 additions & 63 deletions java/src/main/java/com/logicalclocks/hsfs/FeatureStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.logicalclocks.hsfs.engine.FeatureViewEngine;
import com.logicalclocks.hsfs.engine.SparkEngine;
import com.logicalclocks.hsfs.metadata.Expectation;
import com.logicalclocks.hsfs.metadata.ExpectationsApi;
import com.logicalclocks.hsfs.metadata.FeatureGroupApi;
import com.logicalclocks.hsfs.metadata.StorageConnectorApi;
import com.logicalclocks.hsfs.metadata.TrainingDatasetApi;
import com.logicalclocks.hsfs.metadata.validation.ValidationType;
import lombok.Getter;
import lombok.NonNull;
import lombok.Setter;
Expand All @@ -33,7 +30,6 @@
import scala.collection.JavaConverters;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class FeatureStore {
Expand All @@ -55,7 +51,6 @@ public class FeatureStore {
private FeatureGroupApi featureGroupApi;
private TrainingDatasetApi trainingDatasetApi;
private StorageConnectorApi storageConnectorApi;
private ExpectationsApi expectationsApi;
private FeatureViewEngine featureViewEngine;

private static final Logger LOGGER = LoggerFactory.getLogger(FeatureStore.class);
Expand All @@ -66,7 +61,6 @@ public FeatureStore() {
featureGroupApi = new FeatureGroupApi();
trainingDatasetApi = new TrainingDatasetApi();
storageConnectorApi = new StorageConnectorApi();
expectationsApi = new ExpectationsApi();
featureViewEngine = new FeatureViewEngine();
}

Expand Down Expand Up @@ -260,16 +254,14 @@ public FeatureGroup.FeatureGroupBuilder createFeatureGroup() {

public FeatureGroup getOrCreateFeatureGroup(String name, Integer version) throws IOException, FeatureStoreException {
return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, null,
null, null, false, null, null, null,
null, null);
null, null, false, null, null, null);
}

public FeatureGroup getOrCreateFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime)
throws IOException, FeatureStoreException {
return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, primaryKeys,
null, null, onlineEnabled, null, null, null,
null, eventTime);
null, null, onlineEnabled, null, null, eventTime);
}

public FeatureGroup getOrCreateFeatureGroup(String name, Integer version,
Expand All @@ -279,21 +271,18 @@ public FeatureGroup getOrCreateFeatureGroup(String name, Integer version,
String eventTime) throws IOException, FeatureStoreException {

return featureGroupApi.getOrCreateFeatureGroup(this, name, version, null, primaryKeys,
partitionKeys, null, onlineEnabled, null, null, null,
null, eventTime);
partitionKeys, null, onlineEnabled, null, null, eventTime);
}

public FeatureGroup getOrCreateFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey,
boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
StatisticsConfig statisticsConfig, ValidationType validationType,
scala.collection.Seq<Expectation> expectations, String eventTime)
StatisticsConfig statisticsConfig, String eventTime)
throws IOException, FeatureStoreException {

return featureGroupApi.getOrCreateFeatureGroup(this, name, version, description, primaryKeys,
partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat, statisticsConfig, validationType,
expectations, eventTime);
partitionKeys, hudiPrecombineKey, onlineEnabled, timeTravelFormat, statisticsConfig, eventTime);
}

public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() {
Expand All @@ -304,14 +293,14 @@ public StreamFeatureGroup.StreamFeatureGroupBuilder createStreamFeatureGroup() {
public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version)
throws IOException, FeatureStoreException {
return featureGroupApi.getOrCreateStreamFeatureGroup(this, name, version, null,
null, null, null, false, null, null, null);
null, null, null, false, null, null);
}

public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
boolean onlineEnabled, String eventTime)
throws IOException, FeatureStoreException {
return featureGroupApi.getOrCreateStreamFeatureGroup(this, name, version, null,
primaryKeys, null, null, onlineEnabled, null, null, eventTime);
primaryKeys, null, null, onlineEnabled, null, eventTime);
}

public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, List<String> primaryKeys,
Expand All @@ -320,19 +309,18 @@ public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer ver


return featureGroupApi.getOrCreateStreamFeatureGroup(this, name, version, null,
primaryKeys, partitionKeys, null, onlineEnabled, null, null, eventTime);
primaryKeys, partitionKeys, null, onlineEnabled, null, eventTime);
}

public StreamFeatureGroup getOrCreateStreamFeatureGroup(String name, Integer version, String description,
List<String> primaryKeys, List<String> partitionKeys,
String hudiPrecombineKey, boolean onlineEnabled,
StatisticsConfig statisticsConfig,
scala.collection.Seq<Expectation> expectations,
String eventTime)
throws IOException, FeatureStoreException {

return featureGroupApi.getOrCreateStreamFeatureGroup(this, name, version, description,
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, expectations, eventTime);
primaryKeys, partitionKeys, hudiPrecombineKey, onlineEnabled, statisticsConfig, eventTime);
}

public ExternalFeatureGroup.ExternalFeatureGroupBuilder createExternalFeatureGroup() {
Expand Down Expand Up @@ -383,12 +371,6 @@ public TrainingDataset.TrainingDatasetBuilder createTrainingDataset() {
.featureStore(this);
}


/**
 * Starts building a new {@code Expectation} that is pre-populated with this feature store.
 *
 * @return an expectation builder whose {@code featureStore} has been set to this instance
 */
public Expectation.ExpectationBuilder createExpectation() {
  Expectation.ExpectationBuilder expectationBuilder = Expectation.builder();
  return expectationBuilder.featureStore(this);
}

/**
* Get a training dataset object from the selected feature store.
*
Expand Down Expand Up @@ -430,42 +412,6 @@ public scala.collection.Seq<TrainingDataset> getTrainingDatasets(@NonNull String
return JavaConverters.asScalaBufferConverter(trainingDatasetApi.get(this, name, null)).asScala().toSeq();
}

/**
 * Sends every expectation of the given Scala sequence to {@code expectationsApi.put} and collects
 * the objects that call returns.
 *
 * @param expectations Scala sequence of expectations to submit
 * @return Scala sequence with the expectations returned by the API, in iteration order of the input
 * @throws FeatureStoreException propagated from the expectations API call
 * @throws IOException propagated from the expectations API call
 */
public scala.collection.Seq<Expectation> createExpectations(scala.collection.Seq<Expectation> expectations)
    throws FeatureStoreException, IOException {
  // Convert to a Java list first so the sequence can be walked with a plain for-each loop.
  List<Expectation> requested =
      (List<Expectation>) JavaConverters.seqAsJavaListConverter(expectations).asJava();
  List<Expectation> stored = new ArrayList<>();
  for (Expectation candidate : requested) {
    stored.add(expectationsApi.put(this, candidate));
  }
  return JavaConverters.asScalaBufferConverter(stored).asScala().toSeq();
}

/**
 * Looks up a single expectation of this feature store by name through {@code expectationsApi}.
 *
 * @param name name of the expectation to fetch
 * @return the expectation returned by the expectations API
 * @throws FeatureStoreException propagated from the expectations API call
 * @throws IOException propagated from the expectations API call
 */
public Expectation getExpectation(String name)
    throws FeatureStoreException, IOException {
  Expectation found = expectationsApi.get(this, name);
  return found;
}

/**
 * Retrieves the expectations that {@code expectationsApi.get(this)} returns for this feature store
 * and exposes them as a Scala sequence.
 *
 * @return Scala sequence wrapping the list returned by the expectations API
 * @throws FeatureStoreException propagated from the expectations API call
 * @throws IOException propagated from the expectations API call
 */
public scala.collection.Seq<Expectation> getExpectations() throws FeatureStoreException, IOException {
  List<Expectation> fetched = expectationsApi.get(this);
  return JavaConverters.asScalaBufferConverter(fetched).asScala().toSeq();
}

/**
 * Deletes the given expectation by delegating to {@link #deleteExpectation(String)} with its name.
 *
 * @param expectation expectation whose name identifies what to delete
 * @throws FeatureStoreException propagated from the name-based overload
 * @throws IOException propagated from the name-based overload
 */
public void deleteExpectation(Expectation expectation) throws FeatureStoreException, IOException {
  String expectationName = expectation.getName();
  deleteExpectation(expectationName);
}

/**
 * Deletes the expectation with the given name by calling {@code expectationsApi.delete}
 * for this feature store.
 *
 * @param name name of the expectation to delete
 * @throws FeatureStoreException propagated from the expectations API call
 * @throws IOException propagated from the expectations API call
 */
public void deleteExpectation(String name)
    throws FeatureStoreException, IOException {
  this.expectationsApi.delete(this, name);
}

/**
 * Deletes each expectation of the given Scala sequence, one at a time, via
 * {@link #deleteExpectation(Expectation)}.
 *
 * @param expectations Scala sequence of expectations to delete
 * @throws FeatureStoreException propagated from the per-expectation delete
 * @throws IOException propagated from the per-expectation delete
 */
public void deleteExpectations(scala.collection.Seq<Expectation> expectations)
    throws FeatureStoreException, IOException {
  // Convert once up front, then iterate the resulting Java list.
  List<Expectation> toRemove =
      (List<Expectation>) JavaConverters.seqAsJavaListConverter(expectations).asJava();
  for (Expectation entry : toRemove) {
    deleteExpectation(entry);
  }
}

@Override
public String toString() {
return "FeatureStore{"
Expand Down
12 changes: 0 additions & 12 deletions java/src/main/java/com/logicalclocks/hsfs/HopsworksConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import com.logicalclocks.hsfs.metadata.FeatureStoreApi;
import com.logicalclocks.hsfs.metadata.HopsworksClient;
import com.logicalclocks.hsfs.metadata.ProjectApi;
import com.logicalclocks.hsfs.metadata.RuleDefinition;
import com.logicalclocks.hsfs.metadata.RulesApi;
import com.logicalclocks.hsfs.metadata.validation.RuleName;
import com.logicalclocks.hsfs.util.Constants;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -69,7 +66,6 @@ public class HopsworksConnection implements Closeable {

private FeatureStoreApi featureStoreApi = new FeatureStoreApi();
private ProjectApi projectApi = new ProjectApi();
private RulesApi rulesApi = new RulesApi();

private Project projectObj;

Expand Down Expand Up @@ -148,12 +144,4 @@ private String getProjectName(String project) {
}
return project;
}

/**
 * Returns the rule definitions obtained from {@code rulesApi.get()}.
 *
 * @return Scala sequence of rule definitions as returned by the rules API
 * @throws FeatureStoreException propagated from the rules API call
 * @throws IOException propagated from the rules API call
 */
public scala.collection.Seq<RuleDefinition> getRules()
    throws FeatureStoreException, IOException {
  scala.collection.Seq<RuleDefinition> definitions = rulesApi.get();
  return definitions;
}

/**
 * Returns the rule definition that {@code rulesApi.get(name)} yields for the given rule name.
 *
 * @param name rule name to look up
 * @return the rule definition returned by the rules API
 * @throws FeatureStoreException propagated from the rules API call
 * @throws IOException propagated from the rules API call
 */
public RuleDefinition getRule(RuleName name)
    throws FeatureStoreException, IOException {
  RuleDefinition definition = rulesApi.get(name);
  return definition;
}
}
Loading

0 comments on commit 9346fe2

Please sign in to comment.