Add support for feature set updates and remove versions (#676)
* BigQuery connector migrates schema on feature set changes

* Update docs, remove versions from protos

* Remove versions from core; ApplyFeatureSet updates existing feature sets instead of advancing the version

* Remove versions from storage API and connectors

* Remove versions from ingestion

* Remove versions from serving

* Remove versions from sdk, examples and end-to-end tests

* Ignore versions if provided for backward compatibility

* Regenerate golang protos, apply black

* Rebase on master

* Remove LAST_VERSION

* Clean up job update logic

* Use set comparison for entities during feature set updates, change FeatureSet.status to use enum directly

* Fix tests and error messages

* Move version stripping to a transform before the feature row validation step

* Update documentation, clean up toProto method

* Remove redundant status setting, correct misleading comments

* Add end-to-end test for feature set updates, squash some bugs

* Use count() instead of returning all rows
Chen Zhiling authored May 13, 2020
1 parent 2c5130d commit dfc81b9
Showing 98 changed files with 2,511 additions and 2,734 deletions.
25 changes: 8 additions & 17 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
@@ -25,25 +25,16 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>

long count();

// Find single feature set by project, name, and version
FeatureSet findFeatureSetByNameAndProject_NameAndVersion(
String name, String project, Integer version);
// Find single feature set by project and name
FeatureSet findFeatureSetByNameAndProject_Name(String name, String project);

// Find single latest version of a feature set by project and name (LIKE)
FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(
String name, String project);
// find all feature sets and order by name
List<FeatureSet> findAllByOrderByNameAsc();

// find all feature sets and order by name and version
List<FeatureSet> findAllByOrderByNameAscVersionAsc();
// find all feature sets matching the given name pattern with a specific project.
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAsc(String name, String project_name);

// find all feature sets within a project and order by name and version
List<FeatureSet> findAllByProject_NameOrderByNameAscVersionAsc(String project_name);

// find all versions of feature sets matching the given name pattern with a specific project.
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(
String name, String project_name);

// find all versions of feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(
// find all feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAsc(
String name, String project_name);
}
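
With versions gone, a (project, name) pair now identifies a feature set uniquely. The sketch below shows how a caller might resolve a feature set through the repository after this change. It is illustrative only: the wrapper class and its method are hypothetical, the FeatureSet model is assumed to live in feast.core.model alongside Entity and Feature, and only the repository method itself comes from the diff above.

import feast.core.dao.FeatureSetRepository;
import feast.core.model.FeatureSet;

// Hypothetical caller, not part of this commit: resolves a feature set by
// project and name now that version numbers no longer exist.
public class FeatureSetLookupExample {

  private final FeatureSetRepository featureSetRepository;

  public FeatureSetLookupExample(FeatureSetRepository featureSetRepository) {
    this.featureSetRepository = featureSetRepository;
  }

  public FeatureSet getFeatureSet(String project, String name) {
    // Before #676 callers had to pick a version explicitly
    // (findFeatureSetByNameAndProject_NameAndVersion); the (project, name)
    // pair is now the full identity of a feature set.
    return featureSetRepository.findFeatureSetByNameAndProject_Name(name, project);
  }
}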
20 changes: 14 additions & 6 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
@@ -17,6 +17,7 @@
package feast.core.job;

import com.google.common.collect.Sets;
import feast.core.FeatureSetProto.FeatureSetStatus;
import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
@@ -28,7 +29,6 @@
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -84,7 +84,7 @@ public Job call() {
} else {
Job job = currentJob.get();

if (featureSetsChangedFor(job)) {
if (requiresUpdate(job)) {
submittedJob = executorService.submit(() -> updateJob(job));
} else {
return updateStatus(job);
@@ -101,11 +101,19 @@ public Job call() {
}
}

boolean featureSetsChangedFor(Job job) {
Set<FeatureSet> existingFeatureSetsPopulatedByJob = Sets.newHashSet(job.getFeatureSets());
Set<FeatureSet> newFeatureSetsPopulatedByJob = Sets.newHashSet(featureSets);
boolean requiresUpdate(Job job) {
// If set of feature sets has changed
if (!Sets.newHashSet(featureSets).equals(Sets.newHashSet(job.getFeatureSets()))) {
return true;
}

return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
// If any existing feature set populated by the job has its status as pending
for (FeatureSet featureSet : job.getFeatureSets()) {
if (featureSet.getStatus().equals(FeatureSetStatus.STATUS_PENDING)) {
return true;
}
}
return false;
}

private Job createJob() {
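
The update check now has two triggers instead of one: the set of feature sets assigned to the job changed, or any feature set the running job populates is still pending (meaning its spec changed and the job must pick it up). Below is a minimal, self-contained sketch of that rule using plain strings and an inline status enum rather than Feast's Job and FeatureSet entities; it models the logic of requiresUpdate above and is not code from this commit.

import java.util.Map;
import java.util.Set;

public class JobUpdateRule {

  enum Status { STATUS_PENDING, STATUS_READY }

  // Simplified stand-in for JobUpdateTask.requiresUpdate: feature sets are
  // identified by name, and the running job's feature sets carry a status.
  static boolean requiresUpdate(
      Set<String> desiredFeatureSets,          // feature sets the job should populate
      Map<String, Status> populatedByJob) {    // feature sets the running job populates
    if (!desiredFeatureSets.equals(populatedByJob.keySet())) {
      return true; // membership changed, so the job must be updated
    }
    // A pending feature set means its spec was updated and needs redeploying.
    return populatedByJob.containsValue(Status.STATUS_PENDING);
  }

  public static void main(String[] args) {
    System.out.println(requiresUpdate(
        Set.of("driver_features"),
        Map.of("driver_features", Status.STATUS_PENDING))); // true
    System.out.println(requiresUpdate(
        Set.of("driver_features"),
        Map.of("driver_features", Status.STATUS_READY)));   // false
  }
}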
@@ -79,7 +79,7 @@ public Job startJob(Job job) {
featureSetProtos.add(featureSet.toProto());
}
ImportOptions pipelineOptions =
getPipelineOptions(featureSetProtos, job.getStore().toProto());
getPipelineOptions(job.getId(), featureSetProtos, job.getStore().toProto());
PipelineResult pipelineResult = runPipeline(pipelineOptions);
DirectJob directJob = new DirectJob(job.getId(), pipelineResult);
jobs.add(directJob);
@@ -93,14 +93,16 @@ public Job startJob(Job job) {
}

private ImportOptions getPipelineOptions(
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setJobName(jobName);
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setRunner(DirectRunner.class);
pipelineOptions.setProject(""); // set to default value to satisfy validation
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/model/Entity.java
@@ -54,6 +54,10 @@ public static Entity fromProto(EntitySpec entitySpec) {
return entity;
}

public EntitySpec toProto() {
return EntitySpec.newBuilder().setName(name).setValueType(ValueType.Enum.valueOf(type)).build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
122 changes: 104 additions & 18 deletions core/src/main/java/feast/core/model/Feature.java
@@ -16,7 +16,9 @@
*/
package feast.core.model;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.FeatureSetProto.FeatureSpec.Builder;
import feast.core.util.TypeConversion;
import feast.types.ValueProto.ValueType;
import java.util.Arrays;
@@ -26,6 +28,7 @@
import javax.persistence.Entity;
import lombok.Getter;
import lombok.Setter;
import org.tensorflow.metadata.v0.*;

/**
* Feature belonging to a featureset. Contains name, type as well as domain metadata about the
@@ -79,6 +82,9 @@ public class Feature {
private byte[] timeOfDayDomain;

public Feature() {}
// Whether this feature has been archived. An archived feature cannot be
// retrieved from or written to.
private boolean archived = false;

private Feature(String name, ValueType.Enum type) {
this.setName(name);
@@ -88,70 +94,148 @@ private Feature(String name, ValueType.Enum type) {
public static Feature fromProto(FeatureSpec featureSpec) {
Feature feature = new Feature(featureSpec.getName(), featureSpec.getValueType());
feature.labels = TypeConversion.convertMapToJsonString(featureSpec.getLabelsMap());
feature.updateSchema(featureSpec);
return feature;
}

public FeatureSpec toProto() throws InvalidProtocolBufferException {
Builder featureSpecBuilder =
FeatureSpec.newBuilder().setName(getName()).setValueType(ValueType.Enum.valueOf(getType()));

if (getPresence() != null) {
featureSpecBuilder.setPresence(FeaturePresence.parseFrom(getPresence()));
} else if (getGroupPresence() != null) {
featureSpecBuilder.setGroupPresence(FeaturePresenceWithinGroup.parseFrom(getGroupPresence()));
}

if (getShape() != null) {
featureSpecBuilder.setShape(FixedShape.parseFrom(getShape()));
} else if (getValueCount() != null) {
featureSpecBuilder.setValueCount(ValueCount.parseFrom(getValueCount()));
}

if (getDomain() != null) {
featureSpecBuilder.setDomain(getDomain());
} else if (getIntDomain() != null) {
featureSpecBuilder.setIntDomain(IntDomain.parseFrom(getIntDomain()));
} else if (getFloatDomain() != null) {
featureSpecBuilder.setFloatDomain(FloatDomain.parseFrom(getFloatDomain()));
} else if (getStringDomain() != null) {
featureSpecBuilder.setStringDomain(StringDomain.parseFrom(getStringDomain()));
} else if (getBoolDomain() != null) {
featureSpecBuilder.setBoolDomain(BoolDomain.parseFrom(getBoolDomain()));
} else if (getStructDomain() != null) {
featureSpecBuilder.setStructDomain(StructDomain.parseFrom(getStructDomain()));
} else if (getNaturalLanguageDomain() != null) {
featureSpecBuilder.setNaturalLanguageDomain(
NaturalLanguageDomain.parseFrom(getNaturalLanguageDomain()));
} else if (getImageDomain() != null) {
featureSpecBuilder.setImageDomain(ImageDomain.parseFrom(getImageDomain()));
} else if (getMidDomain() != null) {
featureSpecBuilder.setMidDomain(MIDDomain.parseFrom(getMidDomain()));
} else if (getUrlDomain() != null) {
featureSpecBuilder.setUrlDomain(URLDomain.parseFrom(getUrlDomain()));
} else if (getTimeDomain() != null) {
featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(getTimeDomain()));
} else if (getTimeOfDayDomain() != null) {
featureSpecBuilder.setTimeOfDayDomain(TimeOfDayDomain.parseFrom(getTimeOfDayDomain()));
}

if (getLabels() != null) {
featureSpecBuilder.putAllLabels(getLabels());
}
return featureSpecBuilder.build();
}

private void updateSchema(FeatureSpec featureSpec) {
switch (featureSpec.getPresenceConstraintsCase()) {
case PRESENCE:
feature.setPresence(featureSpec.getPresence().toByteArray());
setPresence(featureSpec.getPresence().toByteArray());
break;
case GROUP_PRESENCE:
feature.setGroupPresence(featureSpec.getGroupPresence().toByteArray());
setGroupPresence(featureSpec.getGroupPresence().toByteArray());
break;
case PRESENCECONSTRAINTS_NOT_SET:
break;
}

switch (featureSpec.getShapeTypeCase()) {
case SHAPE:
feature.setShape(featureSpec.getShape().toByteArray());
setShape(featureSpec.getShape().toByteArray());
break;
case VALUE_COUNT:
feature.setValueCount(featureSpec.getValueCount().toByteArray());
setValueCount(featureSpec.getValueCount().toByteArray());
break;
case SHAPETYPE_NOT_SET:
break;
}

switch (featureSpec.getDomainInfoCase()) {
case DOMAIN:
feature.setDomain(featureSpec.getDomain());
setDomain(featureSpec.getDomain());
break;
case INT_DOMAIN:
feature.setIntDomain(featureSpec.getIntDomain().toByteArray());
setIntDomain(featureSpec.getIntDomain().toByteArray());
break;
case FLOAT_DOMAIN:
feature.setFloatDomain(featureSpec.getFloatDomain().toByteArray());
setFloatDomain(featureSpec.getFloatDomain().toByteArray());
break;
case STRING_DOMAIN:
feature.setStringDomain(featureSpec.getStringDomain().toByteArray());
setStringDomain(featureSpec.getStringDomain().toByteArray());
break;
case BOOL_DOMAIN:
feature.setBoolDomain(featureSpec.getBoolDomain().toByteArray());
setBoolDomain(featureSpec.getBoolDomain().toByteArray());
break;
case STRUCT_DOMAIN:
feature.setStructDomain(featureSpec.getStructDomain().toByteArray());
setStructDomain(featureSpec.getStructDomain().toByteArray());
break;
case NATURAL_LANGUAGE_DOMAIN:
feature.setNaturalLanguageDomain(featureSpec.getNaturalLanguageDomain().toByteArray());
setNaturalLanguageDomain(featureSpec.getNaturalLanguageDomain().toByteArray());
break;
case IMAGE_DOMAIN:
feature.setImageDomain(featureSpec.getImageDomain().toByteArray());
setImageDomain(featureSpec.getImageDomain().toByteArray());
break;
case MID_DOMAIN:
feature.setMidDomain(featureSpec.getMidDomain().toByteArray());
setMidDomain(featureSpec.getMidDomain().toByteArray());
break;
case URL_DOMAIN:
feature.setUrlDomain(featureSpec.getUrlDomain().toByteArray());
setUrlDomain(featureSpec.getUrlDomain().toByteArray());
break;
case TIME_DOMAIN:
feature.setTimeDomain(featureSpec.getTimeDomain().toByteArray());
setTimeDomain(featureSpec.getTimeDomain().toByteArray());
break;
case TIME_OF_DAY_DOMAIN:
feature.setTimeOfDayDomain(featureSpec.getTimeOfDayDomain().toByteArray());
setTimeOfDayDomain(featureSpec.getTimeOfDayDomain().toByteArray());
break;
case DOMAININFO_NOT_SET:
break;
}
return feature;
}

/** Archive this feature. */
public void archive() {
this.archived = true;
}

/**
* Update the feature object with a valid feature spec. Only schema changes are allowed.
*
* @param featureSpec {@link FeatureSpec} containing schema changes.
*/
public void updateFromProto(FeatureSpec featureSpec) {
if (isArchived()) {
throw new IllegalArgumentException(
String.format(
"You are attempting to create a feature %s that was previously archived. This isn't allowed. Please create a new feature with a different name.",
featureSpec.getName()));
}
if (ValueType.Enum.valueOf(type) != featureSpec.getValueType()) {
throw new IllegalArgumentException(
String.format(
"You are attempting to change the type of feature %s from %s to %s. This isn't allowed. Please create a new feature.",
featureSpec.getName(), type, featureSpec.getValueType()));
}
updateSchema(featureSpec);
}

public Map<String, String> getLabels() {
Expand All @@ -167,7 +251,9 @@ public boolean equals(Object o) {
return false;
}
Feature feature = (Feature) o;
return Objects.equals(getName(), feature.getName())
return getName().equals(feature.getName())
&& getType().equals(feature.getType())
&& isArchived() == (feature.isArchived())
&& Objects.equals(getLabels(), feature.getLabels())
&& Arrays.equals(getPresence(), feature.getPresence())
&& Arrays.equals(getGroupPresence(), feature.getGroupPresence())
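
The new updateFromProto guard means a feature's value type is fixed once created: an update may change schema constraints (presence, shape, domains), but never the type, and archived features cannot be recreated. A short hedged sketch of that behaviour follows; the feature name and the INT64/STRING enum values are assumptions for illustration, while FeatureSpec, Feature.fromProto and updateFromProto come from the diff above.

import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.model.Feature;
import feast.types.ValueProto.ValueType;

public class FeatureUpdateExample {
  public static void main(String[] args) {
    // Build a feature from a spec, then attempt an update that changes only
    // the value type; the guard in updateFromProto is expected to reject it.
    FeatureSpec original =
        FeatureSpec.newBuilder().setName("total_rides").setValueType(ValueType.Enum.INT64).build();
    Feature feature = Feature.fromProto(original);

    FeatureSpec typeChanged =
        FeatureSpec.newBuilder().setName("total_rides").setValueType(ValueType.Enum.STRING).build();
    try {
      feature.updateFromProto(typeChanged); // only schema changes are allowed
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // explains that the type of a feature cannot change
    }
  }
}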