Commit
Update protos with TensorFlow Data Validation schema (#438)
* Add presence, shape, and domain fields to EntitySpec and FeatureSpec
  These are copied from the definitions in Schema.proto in TensorFlow Data Validation.

* Add presence, shape, and domain fields to the Field class
davidheryanto authored and feast-ci-bot committed Jan 20, 2020
1 parent bb960a7 commit ba1c828
Showing 14 changed files with 1,310 additions and 88 deletions.
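
The new presence, shape, and domain fields mirror the corresponding messages in TensorFlow Data Validation's schema. As a rough illustration only (the feature name and constraint values below are hypothetical; the builder setters are the ones exercised by FeatureSet.toProto() in this diff), a FeatureSpec carrying these fields could be assembled like this:

import feast.core.FeatureSetProto.FeatureSpec;
import feast.types.ValueProto.ValueType;
import org.tensorflow.metadata.v0.FeaturePresence;
import org.tensorflow.metadata.v0.FixedShape;
import org.tensorflow.metadata.v0.IntDomain;

public class FeatureSpecSchemaSketch {
  public static void main(String[] args) {
    FeatureSpec spec =
        FeatureSpec.newBuilder()
            .setName("total_purchases") // hypothetical feature name
            .setValueType(ValueType.Enum.INT64)
            // Presence: require the feature to appear in every example.
            .setPresence(FeaturePresence.newBuilder().setMinFraction(1.0))
            // Shape: a scalar feature, i.e. a single dimension of size 1.
            .setShape(FixedShape.newBuilder().addDim(FixedShape.Dim.newBuilder().setSize(1)))
            // Domain: valid values lie in [0, 10000].
            .setIntDomain(IntDomain.newBuilder().setMin(0).setMax(10000))
            .build();
    System.out.println(spec);
  }
}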
5 changes: 3 additions & 2 deletions core/src/main/java/feast/core/grpc/CoreServiceImpl.java
@@ -16,6 +16,7 @@
*/
package feast.core.grpc;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
@@ -77,7 +78,7 @@ public void getFeatureSet(
GetFeatureSetResponse response = specService.getFeatureSet(request);
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | StatusRuntimeException e) {
} catch (RetrievalException | StatusRuntimeException | InvalidProtocolBufferException e) {
log.error("Exception has occurred in GetFeatureSet method: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
@@ -91,7 +92,7 @@ public void listFeatureSets(
ListFeatureSetsResponse response = specService.listFeatureSets(request.getFilter());
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (RetrievalException | IllegalArgumentException e) {
} catch (RetrievalException | IllegalArgumentException | InvalidProtocolBufferException e) {
log.error("Exception has occurred in ListFeatureSet method: ", e);
responseObserver.onError(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
1 change: 1 addition & 0 deletions core/src/main/java/feast/core/job/JobUpdateTask.java
@@ -173,6 +173,7 @@ private Job startJob(

return job;
} catch (Exception e) {
log.error(e.getMessage());
AuditLogger.log(
Resource.JOB,
jobId,
31 changes: 21 additions & 10 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -78,17 +78,24 @@ public Runner getRunnerType() {

@Override
public Job startJob(Job job) {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
try {
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}
return submitDataflowJob(
job.getId(),
featureSetProtos,
job.getSource().toProto(),
job.getStore().toProto(),
false);

} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format("DataflowJobManager failed to START job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

@@ -101,14 +108,18 @@ public Job startJob(Job job) {
@Override
public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());

return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true);

List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}
return submitDataflowJob(job.getId(), featureSetProtos, job.getSource().toProto(),
job.getStore().toProto(), true);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to update job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format("DataflowJobManager failed to UPDATE job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

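
The stream pipeline in startJob and updateJob above is replaced with an explicit loop because FeatureSet.toProto() now declares the checked InvalidProtocolBufferException, which a method reference passed to Stream.map cannot throw; the conversion therefore has to live inside the try block as a plain loop. A minimal sketch of that constraint (the Convertible interface and toProtos helper below are hypothetical stand-ins, not Feast types):

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.ArrayList;
import java.util.List;

public class CheckedExceptionVsStreams {

  // Simplified stand-in for feast.core.model.FeatureSet after this change.
  interface Convertible {
    String toProto() throws InvalidProtocolBufferException;
  }

  // Would not compile: java.util.function.Function#apply declares no checked exceptions, so
  // items.stream().map(Convertible::toProto).collect(Collectors.toList()) is rejected by javac.

  // Compiles: the explicit loop lets the checked exception propagate to the caller's catch block.
  static List<String> toProtos(List<Convertible> items) throws InvalidProtocolBufferException {
    List<String> protos = new ArrayList<>();
    for (Convertible item : items) {
      protos.add(item.toProto());
    }
    return protos;
  }
}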
@@ -21,7 +21,6 @@
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.StoreProto;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
@@ -38,7 +37,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;
@@ -75,8 +73,10 @@ public Runner getRunnerType() {
@Override
public Job startJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());
List<FeatureSetProto.FeatureSet> featureSetProtos = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}
ImportOptions pipelineOptions =
getPipelineOptions(featureSetProtos, job.getStore().toProto());
PipelineResult pipelineResult = runPipeline(pipelineOptions);
@@ -131,10 +131,6 @@ public Job updateJob(Job job) {
String jobId = job.getExtId();
abortJob(jobId);
try {
List<FeatureSetSpec> featureSetSpecs = new ArrayList<>();
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetSpecs.add(featureSet.toProto().getSpec());
}
return startJob(job);
} catch (JobExecutionException e) {
throw new JobExecutionException(String.format("Error running ingestion job: %s", e), e);
158 changes: 136 additions & 22 deletions core/src/main/java/feast/core/model/FeatureSet.java
@@ -17,14 +17,15 @@
package feast.core.model;

import com.google.protobuf.Duration;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Timestamp;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.EntitySpec;
import feast.core.FeatureSetProto.FeatureSetMeta;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.FeatureSetProto.FeatureSetStatus;
import feast.core.FeatureSetProto.FeatureSpec;
import feast.types.ValueProto.ValueType;
import feast.types.ValueProto.ValueType.Enum;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -47,6 +48,20 @@
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.hibernate.annotations.Fetch;
import org.hibernate.annotations.FetchMode;
import org.tensorflow.metadata.v0.BoolDomain;
import org.tensorflow.metadata.v0.FeaturePresence;
import org.tensorflow.metadata.v0.FeaturePresenceWithinGroup;
import org.tensorflow.metadata.v0.FixedShape;
import org.tensorflow.metadata.v0.FloatDomain;
import org.tensorflow.metadata.v0.ImageDomain;
import org.tensorflow.metadata.v0.IntDomain;
import org.tensorflow.metadata.v0.MIDDomain;
import org.tensorflow.metadata.v0.NaturalLanguageDomain;
import org.tensorflow.metadata.v0.StringDomain;
import org.tensorflow.metadata.v0.StructDomain;
import org.tensorflow.metadata.v0.TimeDomain;
import org.tensorflow.metadata.v0.TimeOfDayDomain;
import org.tensorflow.metadata.v0.URLDomain;
import org.tensorflow.metadata.v0.ValueCount;

@Getter
@Setter
@@ -157,23 +172,23 @@ public static FeatureSet fromProto(FeatureSetProto.FeatureSet featureSetProto) {
FeatureSetSpec featureSetSpec = featureSetProto.getSpec();
Source source = Source.fromProto(featureSetSpec.getSource());

List<Field> features = new ArrayList<>();
for (FeatureSpec feature : featureSetSpec.getFeaturesList()) {
features.add(new Field(feature.getName(), feature.getValueType()));
List<Field> featureSpecs = new ArrayList<>();
for (FeatureSpec featureSpec : featureSetSpec.getFeaturesList()) {
featureSpecs.add(new Field(featureSpec));
}

List<Field> entities = new ArrayList<>();
for (EntitySpec entity : featureSetSpec.getEntitiesList()) {
entities.add(new Field(entity.getName(), entity.getValueType()));
List<Field> entitySpecs = new ArrayList<>();
for (EntitySpec entitySpec : featureSetSpec.getEntitiesList()) {
entitySpecs.add(new Field(entitySpec));
}

return new FeatureSet(
featureSetProto.getSpec().getName(),
featureSetProto.getSpec().getProject(),
featureSetProto.getSpec().getVersion(),
featureSetSpec.getMaxAge().getSeconds(),
entities,
features,
entitySpecs,
featureSpecs,
source,
featureSetProto.getMeta().getStatus());
}
@@ -202,24 +217,21 @@ public void addFeature(Field field) {
features.add(field);
}

public FeatureSetProto.FeatureSet toProto() {
public FeatureSetProto.FeatureSet toProto() throws InvalidProtocolBufferException {
List<EntitySpec> entitySpecs = new ArrayList<>();
for (Field entity : entities) {
entitySpecs.add(
EntitySpec.newBuilder()
.setName(entity.getName())
.setValueType(ValueType.Enum.valueOf(entity.getType()))
.build());
for (Field entityField : entities) {
EntitySpec.Builder entitySpecBuilder = EntitySpec.newBuilder();
setEntitySpecFields(entitySpecBuilder, entityField);
entitySpecs.add(entitySpecBuilder.build());
}

List<FeatureSpec> featureSpecs = new ArrayList<>();
for (Field feature : features) {
featureSpecs.add(
FeatureSpec.newBuilder()
.setName(feature.getName())
.setValueType(ValueType.Enum.valueOf(feature.getType()))
.build());
for (Field featureField : features) {
FeatureSpec.Builder featureSpecBuilder = FeatureSpec.newBuilder();
setFeatureSpecFields(featureSpecBuilder, featureField);
featureSpecs.add(featureSpecBuilder.build());
}

FeatureSetMeta.Builder meta =
FeatureSetMeta.newBuilder()
.setCreatedTimestamp(
@@ -239,6 +251,108 @@ public FeatureSetProto.FeatureSet toProto() {
return FeatureSetProto.FeatureSet.newBuilder().setMeta(meta).setSpec(spec).build();
}

// setEntitySpecFields and setFeatureSpecFields contain duplicated code because Feast
// internally treats both EntitySpec and FeatureSpec as the Field class. However, the proto
// message builders for EntitySpec and FeatureSpec are of different classes.
@SuppressWarnings("DuplicatedCode")
private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field entityField)
throws InvalidProtocolBufferException {
entitySpecBuilder
.setName(entityField.getName())
.setValueType(Enum.valueOf(entityField.getType()));

if (entityField.getPresence() != null) {
entitySpecBuilder.setPresence(FeaturePresence.parseFrom(entityField.getPresence()));
} else if (entityField.getGroupPresence() != null) {
entitySpecBuilder
.setGroupPresence(FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence()));
}

if (entityField.getShape() != null) {
entitySpecBuilder.setShape(FixedShape.parseFrom(entityField.getShape()));
} else if (entityField.getValueCount() != null) {
entitySpecBuilder.setValueCount(ValueCount.parseFrom(entityField.getValueCount()));
}

if (entityField.getDomain() != null) {
entitySpecBuilder.setDomain(entityField.getDomain());
} else if (entityField.getIntDomain() != null) {
entitySpecBuilder.setIntDomain(IntDomain.parseFrom(entityField.getIntDomain()));
} else if (entityField.getFloatDomain() != null) {
entitySpecBuilder.setFloatDomain(FloatDomain.parseFrom(entityField.getFloatDomain()));
} else if (entityField.getStringDomain() != null) {
entitySpecBuilder.setStringDomain(StringDomain.parseFrom(entityField.getStringDomain()));
} else if (entityField.getBoolDomain() != null) {
entitySpecBuilder.setBoolDomain(BoolDomain.parseFrom(entityField.getBoolDomain()));
} else if (entityField.getStructDomain() != null) {
entitySpecBuilder.setStructDomain(StructDomain.parseFrom(entityField.getStructDomain()));
} else if (entityField.getNaturalLanguageDomain() != null) {
entitySpecBuilder.setNaturalLanguageDomain(
NaturalLanguageDomain.parseFrom(entityField.getNaturalLanguageDomain()));
} else if (entityField.getImageDomain() != null) {
entitySpecBuilder.setImageDomain(ImageDomain.parseFrom(entityField.getImageDomain()));
} else if (entityField.getMidDomain() != null) {
      entitySpecBuilder.setMidDomain(MIDDomain.parseFrom(entityField.getMidDomain()));
} else if (entityField.getUrlDomain() != null) {
entitySpecBuilder.setUrlDomain(URLDomain.parseFrom(entityField.getUrlDomain()));
} else if (entityField.getTimeDomain() != null) {
entitySpecBuilder.setTimeDomain(TimeDomain.parseFrom(entityField.getTimeDomain()));
} else if (entityField.getTimeOfDayDomain() != null) {
entitySpecBuilder
.setTimeOfDayDomain(TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain()));
}
}

// Refer to the setEntitySpecFields method for why this code is duplicated.
@SuppressWarnings("DuplicatedCode")
private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field featureField)
throws InvalidProtocolBufferException {
featureSpecBuilder
.setName(featureField.getName())
.setValueType(Enum.valueOf(featureField.getType()));

if (featureField.getPresence() != null) {
featureSpecBuilder.setPresence(FeaturePresence.parseFrom(featureField.getPresence()));
} else if (featureField.getGroupPresence() != null) {
featureSpecBuilder
.setGroupPresence(FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence()));
}

if (featureField.getShape() != null) {
featureSpecBuilder.setShape(FixedShape.parseFrom(featureField.getShape()));
} else if (featureField.getValueCount() != null) {
featureSpecBuilder.setValueCount(ValueCount.parseFrom(featureField.getValueCount()));
}

if (featureField.getDomain() != null) {
featureSpecBuilder.setDomain(featureField.getDomain());
} else if (featureField.getIntDomain() != null) {
featureSpecBuilder.setIntDomain(IntDomain.parseFrom(featureField.getIntDomain()));
} else if (featureField.getFloatDomain() != null) {
featureSpecBuilder.setFloatDomain(FloatDomain.parseFrom(featureField.getFloatDomain()));
} else if (featureField.getStringDomain() != null) {
featureSpecBuilder.setStringDomain(StringDomain.parseFrom(featureField.getStringDomain()));
} else if (featureField.getBoolDomain() != null) {
featureSpecBuilder.setBoolDomain(BoolDomain.parseFrom(featureField.getBoolDomain()));
} else if (featureField.getStructDomain() != null) {
featureSpecBuilder.setStructDomain(StructDomain.parseFrom(featureField.getStructDomain()));
} else if (featureField.getNaturalLanguageDomain() != null) {
featureSpecBuilder.setNaturalLanguageDomain(
NaturalLanguageDomain.parseFrom(featureField.getNaturalLanguageDomain()));
} else if (featureField.getImageDomain() != null) {
featureSpecBuilder.setImageDomain(ImageDomain.parseFrom(featureField.getImageDomain()));
} else if (featureField.getMidDomain() != null) {
      featureSpecBuilder.setMidDomain(MIDDomain.parseFrom(featureField.getMidDomain()));
} else if (featureField.getUrlDomain() != null) {
featureSpecBuilder.setUrlDomain(URLDomain.parseFrom(featureField.getUrlDomain()));
} else if (featureField.getTimeDomain() != null) {
featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(featureField.getTimeDomain()));
} else if (featureField.getTimeOfDayDomain() != null) {
featureSpecBuilder
.setTimeOfDayDomain(TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain()));
}
}

/**
   * Checks if the given featureSet's schema and source is different from this one.
*
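
The parseFrom calls in toProto() suggest that the Field entity now stores the presence, shape, and domain constraints as serialized protobuf bytes, which is also why toProto() declares InvalidProtocolBufferException. A minimal round-trip sketch under that assumption (the local byte[] below merely stands in for a Field column):

import com.google.protobuf.InvalidProtocolBufferException;
import org.tensorflow.metadata.v0.FeaturePresence;

public class PresenceRoundTripSketch {
  public static void main(String[] args) throws InvalidProtocolBufferException {
    // Serialize a presence constraint, roughly how a Field column might persist it (assumption).
    byte[] storedPresence =
        FeaturePresence.newBuilder().setMinFraction(1.0).setMinCount(1).build().toByteArray();

    // Reconstruct it the way toProto() does; parseFrom can fail, hence the checked exception.
    FeaturePresence presence = FeaturePresence.parseFrom(storedPresence);
    System.out.println(presence.getMinFraction()); // prints 1.0
  }
}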
