From b4f9c6d7369c53674ef395bb769d55a91bba9ea3 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Sat, 4 Jan 2020 11:24:11 +0800 Subject: [PATCH 1/2] Filter out duplicate fields, ignore extra fields --- .../transform/fn/ValidateFeatureRowDoFn.java | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java index dfbb48fc85..d8f7b376ed 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java @@ -23,7 +23,11 @@ import feast.types.FeatureRowProto.FeatureRow; import feast.types.FieldProto; import feast.types.ValueProto.Value.ValCase; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.TupleTag; @@ -57,15 +61,12 @@ public void processElement(ProcessContext context) { String error = null; FeatureRow featureRow = context.element(); FeatureSet featureSet = getFeatureSets().getOrDefault(featureRow.getFeatureSet(), null); + Set fields = new HashSet<>(); if (featureSet != null) { - for (FieldProto.Field field : featureRow.getFieldsList()) { Field fieldSpec = featureSet.getField(field.getName()); if (fieldSpec == null) { - error = - String.format( - "FeatureRow contains field '%s' which do not exists in FeatureSet '%s' version '%d'. Please check the FeatureRow data.", - field.getName(), featureSet.getReference()); + // skip break; } // If value is set in the FeatureRow, make sure the value type matches @@ -81,6 +82,7 @@ public void processElement(ProcessContext context) { break; } } + fields.add(field); } } else { error = @@ -107,6 +109,10 @@ public void processElement(ProcessContext context) { } context.output(getFailureTag(), failedElement.build()); } else { + featureRow = featureRow.toBuilder() + .clearFields() + .addAllFields(fields) + .build(); context.output(getSuccessTag(), featureRow); } } From 2eb613ae5bed9dd4aff445c063a470c419aa8b27 Mon Sep 17 00:00:00 2001 From: zhilingc Date: Mon, 6 Jan 2020 13:21:43 +0800 Subject: [PATCH 2/2] Add test for filtering out extra fields --- .../transform/fn/ValidateFeatureRowDoFn.java | 6 +- .../transform/ValidateFeatureRowsTest.java | 63 +++++++++++++++++++ 2 files changed, 67 insertions(+), 2 deletions(-) diff --git a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java index d8f7b376ed..7d61a62f3f 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java +++ b/ingestion/src/main/java/feast/ingestion/transform/fn/ValidateFeatureRowDoFn.java @@ -61,7 +61,7 @@ public void processElement(ProcessContext context) { String error = null; FeatureRow featureRow = context.element(); FeatureSet featureSet = getFeatureSets().getOrDefault(featureRow.getFeatureSet(), null); - Set fields = new HashSet<>(); + List fields = new ArrayList<>(); if (featureSet != null) { for (FieldProto.Field field : featureRow.getFieldsList()) { Field fieldSpec = featureSet.getField(field.getName()); @@ -82,7 +82,9 @@ public void processElement(ProcessContext context) { break; } } - fields.add(field); + if (!fields.contains(field)) { + fields.add(field); + } } } else { error = diff --git a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java index d129c15661..aca3956387 100644 --- a/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java +++ b/ingestion/src/test/java/feast/ingestion/transform/ValidateFeatureRowsTest.java @@ -25,6 +25,8 @@ import feast.ingestion.values.FailedElement; import feast.test.TestUtil; import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; import feast.types.ValueProto.ValueType.Enum; import java.util.ArrayList; import java.util.HashMap; @@ -138,4 +140,65 @@ public void shouldWriteSuccessAndFailureTagsCorrectly() { p.run(); } + + @Test + public void shouldExcludeUnregisteredFields() { + FeatureSet fs1 = + FeatureSet.newBuilder() + .setSpec( + FeatureSetSpec.newBuilder() + .setName("feature_set") + .setVersion(1) + .setProject("myproject") + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_primary") + .setValueType(Enum.INT32) + .build()) + .addEntities( + EntitySpec.newBuilder() + .setName("entity_id_secondary") + .setValueType(Enum.STRING) + .build()) + .addFeatures( + FeatureSpec.newBuilder() + .setName("feature_1") + .setValueType(Enum.STRING) + .build()) + .addFeatures( + FeatureSpec.newBuilder() + .setName("feature_2") + .setValueType(Enum.INT64) + .build())) + .build(); + + Map featureSets = new HashMap<>(); + featureSets.put("myproject/feature_set:1", fs1); + + List input = new ArrayList<>(); + List expected = new ArrayList<>(); + + FeatureRow randomRow = TestUtil.createRandomFeatureRow(fs1); + expected.add(randomRow); + input.add(randomRow.toBuilder() + .addFields(Field.newBuilder() + .setName("extra") + .setValue(Value.newBuilder().setStringVal("hello"))) + .build() + ); + + PCollectionTuple output = + p.apply(Create.of(input)) + .setCoder(ProtoCoder.of(FeatureRow.class)) + .apply( + ValidateFeatureRows.newBuilder() + .setFailureTag(FAILURE_TAG) + .setSuccessTag(SUCCESS_TAG) + .setFeatureSets(featureSets) + .build()); + + PAssert.that(output.get(SUCCESS_TAG)).containsInAnyOrder(expected); + + p.run(); + } }