Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for feature set updates and remove versions #676

Merged
merged 19 commits into from
May 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 8 additions & 17 deletions core/src/main/java/feast/core/dao/FeatureSetRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,16 @@ public interface FeatureSetRepository extends JpaRepository<FeatureSet, String>

long count();

// Find single feature set by project, name, and version
FeatureSet findFeatureSetByNameAndProject_NameAndVersion(
String name, String project, Integer version);
// Find single feature set by project and name
FeatureSet findFeatureSetByNameAndProject_Name(String name, String project);

// Find single latest version of a feature set by project and name (LIKE)
FeatureSet findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc(
String name, String project);
// find all feature sets and order by name
List<FeatureSet> findAllByOrderByNameAsc();

// find all feature sets and order by name and version
List<FeatureSet> findAllByOrderByNameAscVersionAsc();
// find all feature sets matching the given name pattern with a specific project.
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAsc(String name, String project_name);

// find all feature sets within a project and order by name and version
List<FeatureSet> findAllByProject_NameOrderByNameAscVersionAsc(String project_name);

// find all versions of feature sets matching the given name pattern with a specific project.
List<FeatureSet> findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc(
String name, String project_name);

// find all versions of feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc(
// find all feature sets matching the given name pattern and project pattern
List<FeatureSet> findAllByNameLikeAndProject_NameLikeOrderByNameAsc(
String name, String project_name);
}
20 changes: 14 additions & 6 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.core.job;

import com.google.common.collect.Sets;
import feast.core.FeatureSetProto.FeatureSetStatus;
import feast.core.log.Action;
import feast.core.log.AuditLogger;
import feast.core.log.Resource;
Expand All @@ -28,7 +29,6 @@
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -84,7 +84,7 @@ public Job call() {
} else {
Job job = currentJob.get();

if (featureSetsChangedFor(job)) {
if (requiresUpdate(job)) {
submittedJob = executorService.submit(() -> updateJob(job));
} else {
return updateStatus(job);
Expand All @@ -101,11 +101,19 @@ public Job call() {
}
}

boolean featureSetsChangedFor(Job job) {
Set<FeatureSet> existingFeatureSetsPopulatedByJob = Sets.newHashSet(job.getFeatureSets());
Set<FeatureSet> newFeatureSetsPopulatedByJob = Sets.newHashSet(featureSets);
boolean requiresUpdate(Job job) {
// If set of feature sets has changed
if (!Sets.newHashSet(featureSets).equals(Sets.newHashSet(job.getFeatureSets()))) {
return true;
}

return !newFeatureSetsPopulatedByJob.equals(existingFeatureSetsPopulatedByJob);
// If any existing feature set populated by the job has its status as pending
ches marked this conversation as resolved.
Show resolved Hide resolved
for (FeatureSet featureSet : job.getFeatureSets()) {
if (featureSet.getStatus().equals(FeatureSetStatus.STATUS_PENDING)) {
return true;
}
}
return false;
}

private Job createJob() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public Job startJob(Job job) {
featureSetProtos.add(featureSet.toProto());
}
ImportOptions pipelineOptions =
getPipelineOptions(featureSetProtos, job.getStore().toProto());
getPipelineOptions(job.getId(), featureSetProtos, job.getStore().toProto());
PipelineResult pipelineResult = runPipeline(pipelineOptions);
DirectJob directJob = new DirectJob(job.getId(), pipelineResult);
jobs.add(directJob);
Expand All @@ -93,14 +93,16 @@ public Job startJob(Job job) {
}

private ImportOptions getPipelineOptions(
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
String jobName, List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setJobName(jobName);
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setRunner(DirectRunner.class);
pipelineOptions.setProject(""); // set to default value to satisfy validation
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/feast/core/model/Entity.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ public static Entity fromProto(EntitySpec entitySpec) {
return entity;
}

public EntitySpec toProto() {
return EntitySpec.newBuilder().setName(name).setValueType(ValueType.Enum.valueOf(type)).build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
122 changes: 104 additions & 18 deletions core/src/main/java/feast/core/model/Feature.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
*/
package feast.core.model;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.core.FeatureSetProto.FeatureSpec.Builder;
import feast.core.util.TypeConversion;
import feast.types.ValueProto.ValueType;
import java.util.Arrays;
Expand All @@ -26,6 +28,7 @@
import javax.persistence.Entity;
import lombok.Getter;
import lombok.Setter;
import org.tensorflow.metadata.v0.*;

/**
* Feature belonging to a featureset. Contains name, type as well as domain metadata about the
Expand Down Expand Up @@ -79,6 +82,9 @@ public class Feature {
private byte[] timeOfDayDomain;

public Feature() {}
// Whether this feature has been archived. A archived feature cannot be
// retrieved from or written to.
private boolean archived = false;

private Feature(String name, ValueType.Enum type) {
this.setName(name);
Expand All @@ -88,70 +94,148 @@ private Feature(String name, ValueType.Enum type) {
public static Feature fromProto(FeatureSpec featureSpec) {
Feature feature = new Feature(featureSpec.getName(), featureSpec.getValueType());
feature.labels = TypeConversion.convertMapToJsonString(featureSpec.getLabelsMap());
feature.updateSchema(featureSpec);
return feature;
}

public FeatureSpec toProto() throws InvalidProtocolBufferException {
Builder featureSpecBuilder =
FeatureSpec.newBuilder().setName(getName()).setValueType(ValueType.Enum.valueOf(getType()));

if (getPresence() != null) {
featureSpecBuilder.setPresence(FeaturePresence.parseFrom(getPresence()));
} else if (getGroupPresence() != null) {
featureSpecBuilder.setGroupPresence(FeaturePresenceWithinGroup.parseFrom(getGroupPresence()));
}

if (getShape() != null) {
featureSpecBuilder.setShape(FixedShape.parseFrom(getShape()));
} else if (getValueCount() != null) {
featureSpecBuilder.setValueCount(ValueCount.parseFrom(getValueCount()));
}

if (getDomain() != null) {
featureSpecBuilder.setDomain(getDomain());
} else if (getIntDomain() != null) {
featureSpecBuilder.setIntDomain(IntDomain.parseFrom(getIntDomain()));
} else if (getFloatDomain() != null) {
featureSpecBuilder.setFloatDomain(FloatDomain.parseFrom(getFloatDomain()));
} else if (getStringDomain() != null) {
featureSpecBuilder.setStringDomain(StringDomain.parseFrom(getStringDomain()));
} else if (getBoolDomain() != null) {
featureSpecBuilder.setBoolDomain(BoolDomain.parseFrom(getBoolDomain()));
} else if (getStructDomain() != null) {
featureSpecBuilder.setStructDomain(StructDomain.parseFrom(getStructDomain()));
} else if (getNaturalLanguageDomain() != null) {
featureSpecBuilder.setNaturalLanguageDomain(
NaturalLanguageDomain.parseFrom(getNaturalLanguageDomain()));
} else if (getImageDomain() != null) {
featureSpecBuilder.setImageDomain(ImageDomain.parseFrom(getImageDomain()));
} else if (getMidDomain() != null) {
featureSpecBuilder.setMidDomain(MIDDomain.parseFrom(getMidDomain()));
} else if (getUrlDomain() != null) {
featureSpecBuilder.setUrlDomain(URLDomain.parseFrom(getUrlDomain()));
} else if (getTimeDomain() != null) {
featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(getTimeDomain()));
} else if (getTimeOfDayDomain() != null) {
featureSpecBuilder.setTimeOfDayDomain(TimeOfDayDomain.parseFrom(getTimeOfDayDomain()));
}

if (getLabels() != null) {
featureSpecBuilder.putAllLabels(getLabels());
}
return featureSpecBuilder.build();
}

private void updateSchema(FeatureSpec featureSpec) {
switch (featureSpec.getPresenceConstraintsCase()) {
case PRESENCE:
feature.setPresence(featureSpec.getPresence().toByteArray());
setPresence(featureSpec.getPresence().toByteArray());
break;
case GROUP_PRESENCE:
feature.setGroupPresence(featureSpec.getGroupPresence().toByteArray());
setGroupPresence(featureSpec.getGroupPresence().toByteArray());
break;
case PRESENCECONSTRAINTS_NOT_SET:
break;
}

switch (featureSpec.getShapeTypeCase()) {
case SHAPE:
feature.setShape(featureSpec.getShape().toByteArray());
setShape(featureSpec.getShape().toByteArray());
break;
case VALUE_COUNT:
feature.setValueCount(featureSpec.getValueCount().toByteArray());
setValueCount(featureSpec.getValueCount().toByteArray());
break;
case SHAPETYPE_NOT_SET:
break;
}

switch (featureSpec.getDomainInfoCase()) {
case DOMAIN:
feature.setDomain(featureSpec.getDomain());
setDomain(featureSpec.getDomain());
break;
case INT_DOMAIN:
feature.setIntDomain(featureSpec.getIntDomain().toByteArray());
setIntDomain(featureSpec.getIntDomain().toByteArray());
break;
case FLOAT_DOMAIN:
feature.setFloatDomain(featureSpec.getFloatDomain().toByteArray());
setFloatDomain(featureSpec.getFloatDomain().toByteArray());
break;
case STRING_DOMAIN:
feature.setStringDomain(featureSpec.getStringDomain().toByteArray());
setStringDomain(featureSpec.getStringDomain().toByteArray());
break;
case BOOL_DOMAIN:
feature.setBoolDomain(featureSpec.getBoolDomain().toByteArray());
setBoolDomain(featureSpec.getBoolDomain().toByteArray());
break;
case STRUCT_DOMAIN:
feature.setStructDomain(featureSpec.getStructDomain().toByteArray());
setStructDomain(featureSpec.getStructDomain().toByteArray());
break;
case NATURAL_LANGUAGE_DOMAIN:
feature.setNaturalLanguageDomain(featureSpec.getNaturalLanguageDomain().toByteArray());
setNaturalLanguageDomain(featureSpec.getNaturalLanguageDomain().toByteArray());
break;
case IMAGE_DOMAIN:
feature.setImageDomain(featureSpec.getImageDomain().toByteArray());
setImageDomain(featureSpec.getImageDomain().toByteArray());
break;
case MID_DOMAIN:
feature.setMidDomain(featureSpec.getMidDomain().toByteArray());
setMidDomain(featureSpec.getMidDomain().toByteArray());
break;
case URL_DOMAIN:
feature.setUrlDomain(featureSpec.getUrlDomain().toByteArray());
setUrlDomain(featureSpec.getUrlDomain().toByteArray());
break;
case TIME_DOMAIN:
feature.setTimeDomain(featureSpec.getTimeDomain().toByteArray());
setTimeDomain(featureSpec.getTimeDomain().toByteArray());
break;
case TIME_OF_DAY_DOMAIN:
feature.setTimeOfDayDomain(featureSpec.getTimeOfDayDomain().toByteArray());
setTimeOfDayDomain(featureSpec.getTimeOfDayDomain().toByteArray());
break;
case DOMAININFO_NOT_SET:
break;
}
return feature;
}

/** Archive this feature. */
public void archive() {
this.archived = true;
}

/**
* Update the feature object with a valid feature spec. Only schema changes are allowed.
*
* @param featureSpec {@link FeatureSpec} containing schema changes.
*/
public void updateFromProto(FeatureSpec featureSpec) {
if (isArchived()) {
throw new IllegalArgumentException(
String.format(
"You are attempting to create a feature %s that was previously archived. This isn't allowed. Please create a new feature with a different name.",
featureSpec.getName()));
}
if (ValueType.Enum.valueOf(type) != featureSpec.getValueType()) {
throw new IllegalArgumentException(
String.format(
"You are attempting to change the type of feature %s from %s to %s. This isn't allowed. Please create a new feature.",
featureSpec.getName(), type, featureSpec.getValueType()));
}
updateSchema(featureSpec);
}

public Map<String, String> getLabels() {
Expand All @@ -167,7 +251,9 @@ public boolean equals(Object o) {
return false;
}
Feature feature = (Feature) o;
return Objects.equals(getName(), feature.getName())
return getName().equals(feature.getName())
&& getType().equals(feature.getType())
&& isArchived() == (feature.isArchived())
&& Objects.equals(getLabels(), feature.getLabels())
&& Arrays.equals(getPresence(), feature.getPresence())
&& Arrays.equals(getGroupPresence(), feature.getGroupPresence())
Expand Down
Loading