diff --git a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java index 4b744d0fe6..ca017c1f75 100644 --- a/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java +++ b/ingestion/src/main/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFn.java @@ -18,12 +18,15 @@ import feast.core.FeatureSetProto.EntitySpec; import feast.core.FeatureSetProto.FeatureSet; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.FeatureSetProto.FeatureSpec; import feast.storage.RedisProto.RedisKey; import feast.storage.RedisProto.RedisKey.Builder; import feast.store.serving.redis.RedisCustomIO.Method; import feast.store.serving.redis.RedisCustomIO.RedisMutation; import feast.types.FeatureRowProto.FeatureRow; import feast.types.FieldProto.Field; +import feast.types.ValueProto; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -64,14 +67,45 @@ private RedisKey getKey(FeatureRow featureRow) { return redisKeyBuilder.build(); } + private byte[] getValue(FeatureRow featureRow) { + FeatureSetSpec spec = featureSets.get(featureRow.getFeatureSet()).getSpec(); + + List featureNames = + spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList()); + Map fieldValueOnlyMap = + featureRow.getFieldsList().stream() + .filter(field -> featureNames.contains(field.getName())) + .distinct() + .collect( + Collectors.toMap( + Field::getName, + field -> Field.newBuilder().setValue(field.getValue()).build())); + + List values = + featureNames.stream() + .sorted() + .map( + featureName -> + fieldValueOnlyMap.getOrDefault( + featureName, + Field.newBuilder().setValue(ValueProto.Value.getDefaultInstance()).build())) + .collect(Collectors.toList()); + + return FeatureRow.newBuilder() + .setEventTimestamp(featureRow.getEventTimestamp()) + .addAllFields(values) + .build() + .toByteArray(); + } + 
/** Output a redis mutation object for every feature in the feature row. */ @ProcessElement public void processElement(ProcessContext context) { FeatureRow featureRow = context.element(); try { - RedisKey key = getKey(featureRow); - RedisMutation redisMutation = - new RedisMutation(Method.SET, key.toByteArray(), featureRow.toByteArray(), null, null); + byte[] key = getKey(featureRow).toByteArray(); + byte[] value = getValue(featureRow); + RedisMutation redisMutation = new RedisMutation(Method.SET, key, value, null, null); context.output(redisMutation); } catch (Exception e) { log.error(e.getMessage(), e); diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 7546d7e36e..0b000df0f5 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -37,6 +37,7 @@ import feast.test.TestUtil.LocalKafka; import feast.test.TestUtil.LocalRedis; import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto; import feast.types.ValueProto.ValueType.Enum; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisURI; @@ -50,6 +51,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; @@ -189,6 +191,23 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() FeatureRow randomRow = TestUtil.createRandomFeatureRow(featureSet); RedisKey redisKey = TestUtil.createRedisKey(featureSet, randomRow); input.add(randomRow); + List fields = + randomRow.getFieldsList().stream() + .filter( + field -> + spec.getFeaturesList().stream() + .map(FeatureSpec::getName) + .collect(Collectors.toList()) + .contains(field.getName())) + .map(field -> field.toBuilder().clearName().build()) + 
.collect(Collectors.toList()); + randomRow = + randomRow + .toBuilder() + .clearFields() + .addAllFields(fields) + .clearFeatureSet() + .build(); expected.put(redisKey, randomRow); }); diff --git a/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java index 92bb6e41c3..86b4feae05 100644 --- a/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java +++ b/ingestion/src/test/java/feast/store/serving/redis/FeatureRowToRedisMutationDoFnTest.java @@ -29,10 +29,7 @@ import feast.types.FieldProto.Field; import feast.types.ValueProto.Value; import feast.types.ValueProto.ValueType.Enum; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; +import java.util.*; import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; @@ -96,6 +93,14 @@ public void shouldConvertRowWithDuplicateEntitiesToValidKey() { Field.newBuilder() .setName("entity_id_secondary") .setValue(Value.newBuilder().setStringVal("a"))) + .addFields( + Field.newBuilder() + .setName("feature_1") + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName("feature_2") + .setValue(Value.newBuilder().setInt64Val(1001))) .build(); PCollection output = @@ -116,6 +121,13 @@ public void shouldConvertRowWithDuplicateEntitiesToValidKey() { .setValue(Value.newBuilder().setStringVal("a"))) .build(); + FeatureRow expectedValue = + FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) + .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .build(); + PAssert.that(output) .satisfies( (SerializableFunction, Void>) @@ -123,7 +135,7 @@ public 
void shouldConvertRowWithDuplicateEntitiesToValidKey() { input.forEach( rm -> { assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray())); - assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray())); + assert (Arrays.equals(rm.getValue(), expectedValue.toByteArray())); }); return null; }); @@ -131,7 +143,7 @@ public void shouldConvertRowWithDuplicateEntitiesToValidKey() { } @Test - public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() { + public void shouldConvertRowWithOutOfOrderFieldsToValidKey() { Map featureSets = new HashMap<>(); featureSets.put("feature_set", fs); @@ -147,6 +159,14 @@ public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() { Field.newBuilder() .setName("entity_id_primary") .setValue(Value.newBuilder().setInt32Val(1))) + .addFields( + Field.newBuilder() + .setName("feature_2") + .setValue(Value.newBuilder().setInt64Val(1001))) + .addFields( + Field.newBuilder() + .setName("feature_1") + .setValue(Value.newBuilder().setStringVal("strValue1"))) .build(); PCollection output = @@ -167,6 +187,148 @@ public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() { .setValue(Value.newBuilder().setStringVal("a"))) .build(); + List expectedFields = + Arrays.asList( + Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1")).build(), + Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001)).build()); + FeatureRow expectedValue = + FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) + .addAllFields(expectedFields) + .build(); + + PAssert.that(output) + .satisfies( + (SerializableFunction, Void>) + input -> { + input.forEach( + rm -> { + assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray())); + assert (Arrays.equals(rm.getValue(), expectedValue.toByteArray())); + }); + return null; + }); + p.run(); + } + + @Test + public void shouldMergeDuplicateFeatureFields() { + Map featureSets = new HashMap<>(); + featureSets.put("feature_set", fs); + + FeatureRow 
featureRowWithDuplicatedFeatureFields = + FeatureRow.newBuilder() + .setFeatureSet("feature_set") + .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) + .addFields( + Field.newBuilder() + .setName("entity_id_primary") + .setValue(Value.newBuilder().setInt32Val(1))) + .addFields( + Field.newBuilder() + .setName("entity_id_secondary") + .setValue(Value.newBuilder().setStringVal("a"))) + .addFields( + Field.newBuilder() + .setName("feature_1") + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName("feature_1") + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields( + Field.newBuilder() + .setName("feature_2") + .setValue(Value.newBuilder().setInt64Val(1001))) + .build(); + + PCollection output = + p.apply(Create.of(Collections.singletonList(featureRowWithDuplicatedFeatureFields))) + .setCoder(ProtoCoder.of(FeatureRow.class)) + .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSets))); + + RedisKey expectedKey = + RedisKey.newBuilder() + .setFeatureSet("feature_set") + .addEntities( + Field.newBuilder() + .setName("entity_id_primary") + .setValue(Value.newBuilder().setInt32Val(1))) + .addEntities( + Field.newBuilder() + .setName("entity_id_secondary") + .setValue(Value.newBuilder().setStringVal("a"))) + .build(); + + FeatureRow expectedValue = + FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) + .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt64Val(1001))) + .build(); + + PAssert.that(output) + .satisfies( + (SerializableFunction, Void>) + input -> { + input.forEach( + rm -> { + assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray())); + assert (Arrays.equals(rm.getValue(), expectedValue.toByteArray())); + }); + return null; + }); + p.run(); + } + + @Test + public void shouldPopulateMissingFeatureValuesWithDefaultInstance() { + Map featureSets = 
new HashMap<>(); + featureSets.put("feature_set", fs); + + FeatureRow featureRowWithDuplicatedFeatureFields = + FeatureRow.newBuilder() + .setFeatureSet("feature_set") + .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) + .addFields( + Field.newBuilder() + .setName("entity_id_primary") + .setValue(Value.newBuilder().setInt32Val(1))) + .addFields( + Field.newBuilder() + .setName("entity_id_secondary") + .setValue(Value.newBuilder().setStringVal("a"))) + .addFields( + Field.newBuilder() + .setName("feature_1") + .setValue(Value.newBuilder().setStringVal("strValue1"))) + .build(); + + PCollection output = + p.apply(Create.of(Collections.singletonList(featureRowWithDuplicatedFeatureFields))) + .setCoder(ProtoCoder.of(FeatureRow.class)) + .apply(ParDo.of(new FeatureRowToRedisMutationDoFn(featureSets))); + + RedisKey expectedKey = + RedisKey.newBuilder() + .setFeatureSet("feature_set") + .addEntities( + Field.newBuilder() + .setName("entity_id_primary") + .setValue(Value.newBuilder().setInt32Val(1))) + .addEntities( + Field.newBuilder() + .setName("entity_id_secondary") + .setValue(Value.newBuilder().setStringVal("a"))) + .build(); + + FeatureRow expectedValue = + FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setSeconds(10)) + .addFields(Field.newBuilder().setValue(Value.newBuilder().setStringVal("strValue1"))) + .addFields(Field.newBuilder().setValue(Value.getDefaultInstance())) + .build(); + PAssert.that(output) .satisfies( (SerializableFunction, Void>) @@ -174,7 +336,7 @@ public void shouldConvertRowWithOutOfOrderEntitiesToValidKey() { input.forEach( rm -> { assert (Arrays.equals(rm.getKey(), expectedKey.toByteArray())); - assert (Arrays.equals(rm.getValue(), offendingRow.toByteArray())); + assert (Arrays.equals(rm.getValue(), expectedValue.toByteArray())); }); return null; }); diff --git a/serving/src/main/java/feast/serving/encoding/FeatureRowDecoder.java b/serving/src/main/java/feast/serving/encoding/FeatureRowDecoder.java new file 
mode 100644 index 0000000000..e70695d8c6 --- /dev/null +++ b/serving/src/main/java/feast/serving/encoding/FeatureRowDecoder.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.serving.encoding; + +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.core.FeatureSetProto.FeatureSpec; +import feast.types.FeatureRowProto.FeatureRow; +import feast.types.FieldProto.Field; +import java.util.Comparator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class FeatureRowDecoder { + + private final String featureSetRef; + private final FeatureSetSpec spec; + + public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) { + this.featureSetRef = featureSetRef; + this.spec = spec; + } + + /** + * A feature row is considered encoded if the feature set and field names are not set. This method + * is required for backward compatibility purposes, to allow Feast serving to continue serving non + * encoded Feature Row ingested by an older version of Feast. 
+ * + * @param featureRow Feature row + * @return boolean + */ + public Boolean isEncoded(FeatureRow featureRow) { + return featureRow.getFeatureSet().isEmpty() + && featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty()); + } + + /** + * Validates if an encoded feature row can be decoded without exception. + * + * @param featureRow Feature row + * @return boolean + */ + public Boolean isEncodingValid(FeatureRow featureRow) { + return featureRow.getFieldsList().size() == spec.getFeaturesList().size(); + } + + /** + * Decodes a feature row by repopulating the field names based on the corresponding feature set + * spec. + * + * @param encodedFeatureRow Feature row + * @return the decoded feature row with field names populated + */ + public FeatureRow decode(FeatureRow encodedFeatureRow) { + final List fieldsWithoutName = encodedFeatureRow.getFieldsList(); + + List featureNames = + spec.getFeaturesList().stream() + .sorted(Comparator.comparing(FeatureSpec::getName)) + .map(FeatureSpec::getName) + .collect(Collectors.toList()); + List fields = + IntStream.range(0, featureNames.size()) + .mapToObj( + featureNameIndex -> { + String featureName = featureNames.get(featureNameIndex); + return fieldsWithoutName + .get(featureNameIndex) + .toBuilder() + .setName(featureName) + .build(); + }) + .collect(Collectors.toList()); + return encodedFeatureRow + .toBuilder() + .clearFields() + .setFeatureSet(featureSetRef) + .addAllFields(fields) + .build(); + } +} diff --git a/serving/src/main/java/feast/serving/service/RedisServingService.java b/serving/src/main/java/feast/serving/service/RedisServingService.java index fad7f5d8cf..56ee1e80ec 100644 --- a/serving/src/main/java/feast/serving/service/RedisServingService.java +++ b/serving/src/main/java/feast/serving/service/RedisServingService.java @@ -16,6 +16,7 @@ */ package feast.serving.service; +import static feast.serving.util.Metrics.invalidEncodingCount; import static feast.serving.util.Metrics.missingKeyCount; import static 
feast.serving.util.Metrics.requestCount; import static feast.serving.util.Metrics.requestLatency; @@ -41,6 +42,7 @@ import feast.serving.ServingAPIProto.GetOnlineFeaturesRequest.EntityRow; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse; import feast.serving.ServingAPIProto.GetOnlineFeaturesResponse.FieldValues; +import feast.serving.encoding.FeatureRowDecoder; import feast.serving.specs.CachedSpecService; import feast.serving.specs.FeatureSetRequest; import feast.serving.util.RefUtil; @@ -55,6 +57,7 @@ import io.opentracing.Tracer; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -108,7 +111,7 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequest requ try { sendAndProcessMultiGet(redisKeys, entityRows, featureValuesMap, featureSetRequest); - } catch (InvalidProtocolBufferException e) { + } catch (InvalidProtocolBufferException | ExecutionException e) { throw Status.INTERNAL .withDescription("Unable to parse protobuf while retrieving feature") .withCause(e) @@ -191,7 +194,7 @@ private void sendAndProcessMultiGet( List entityRows, Map> featureValuesMap, FeatureSetRequest featureSetRequest) - throws InvalidProtocolBufferException { + throws InvalidProtocolBufferException, ExecutionException { List values = sendMultiGet(redisKeys); long startTime = System.currentTimeMillis(); @@ -226,6 +229,27 @@ private void sendAndProcessMultiGet( } FeatureRow featureRow = FeatureRow.parseFrom(value); + String featureSetRef = redisKeys.get(i).getFeatureSet(); + FeatureRowDecoder decoder = + new FeatureRowDecoder(featureSetRef, specService.getFeatureSetSpec(featureSetRef)); + if (decoder.isEncoded(featureRow)) { + if (decoder.isEncodingValid(featureRow)) { + featureRow = decoder.decode(featureRow); + } else { + featureSetRequest + .getFeatureReferences() + .parallelStream() + .forEach( + request -> + invalidEncodingCount + .labels( + 
spec.getProject(), + String.format("%s:%d", request.getName(), request.getVersion())) + .inc()); + featureValues.putAll(nullValues); + continue; + } + } boolean stale = isStale(featureSetRequest, entityRow, featureRow); if (stale) { diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java index 35119589b2..12a8242da1 100644 --- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java +++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java @@ -90,6 +90,7 @@ public CachedSpecService(CoreSpecService coreService, Path configPath) { featureSetCacheLoader = CacheLoader.from(featureSets::get); featureSetCache = CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureSetCacheLoader); + featureSetCache.putAll(featureSets); } /** @@ -101,6 +102,10 @@ public Store getStore() { return this.store; } + public FeatureSetSpec getFeatureSetSpec(String featureSetRef) throws ExecutionException { + return featureSetCache.get(featureSetRef); + } + /** * Get FeatureSetSpecs for the given features. 
* diff --git a/serving/src/main/java/feast/serving/util/Metrics.java b/serving/src/main/java/feast/serving/util/Metrics.java index a502bb1559..fa66f79a80 100644 --- a/serving/src/main/java/feast/serving/util/Metrics.java +++ b/serving/src/main/java/feast/serving/util/Metrics.java @@ -46,6 +46,14 @@ public class Metrics { .labelNames("project", "feature_name") .register(); + public static final Counter invalidEncodingCount = + Counter.build() + .name("invalid_encoding_feature_count") + .subsystem("feast_serving") + .help("number requested feature rows that were stored with the wrong encoding") + .labelNames("project", "feature_name") + .register(); + public static final Counter staleKeyCount = Counter.build() .name("stale_feature_count") diff --git a/serving/src/test/java/feast/serving/encoding/FeatureRowDecoderTest.java b/serving/src/test/java/feast/serving/encoding/FeatureRowDecoderTest.java new file mode 100644 index 0000000000..8f6c79ad66 --- /dev/null +++ b/serving/src/test/java/feast/serving/encoding/FeatureRowDecoderTest.java @@ -0,0 +1,110 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.serving.encoding; + +import static org.junit.Assert.*; + +import com.google.protobuf.Timestamp; +import feast.core.FeatureSetProto; +import feast.core.FeatureSetProto.FeatureSetSpec; +import feast.types.FeatureRowProto; +import feast.types.FieldProto.Field; +import feast.types.ValueProto.Value; +import feast.types.ValueProto.ValueType; +import java.util.Collections; +import org.junit.Test; + +public class FeatureRowDecoderTest { + + private FeatureSetProto.EntitySpec entity = + FeatureSetProto.EntitySpec.newBuilder().setName("entity1").build(); + + private FeatureSetSpec spec = + FeatureSetSpec.newBuilder() + .addAllEntities(Collections.singletonList(entity)) + .addFeatures( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(ValueType.Enum.FLOAT)) + .addFeatures( + FeatureSetProto.FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(ValueType.Enum.INT32)) + .setName("feature_set_name") + .build(); + + @Test + public void featureRowWithFieldNamesIsNotConsideredAsEncoded() { + + FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); + FeatureRowProto.FeatureRow nonEncodedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setFeatureSet("feature_set_ref") + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + .addFields( + Field.newBuilder().setName("feature1").setValue(Value.newBuilder().setInt32Val(2))) + .addFields( + Field.newBuilder() + .setName("feature2") + .setValue(Value.newBuilder().setFloatVal(1.0f))) + .build(); + assertFalse(decoder.isEncoded(nonEncodedFeatureRow)); + } + + @Test + public void encodingIsInvalidIfNumberOfFeaturesInSpecDiffersFromFeatureRow() { + + FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); + + FeatureRowProto.FeatureRow encodedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + 
.addFields(Field.newBuilder().setValue(Value.newBuilder().setInt32Val(2))) + .build(); + + assertFalse(decoder.isEncodingValid(encodedFeatureRow)); + } + + @Test + public void shouldDecodeValidEncodedFeatureRow() { + + FeatureRowDecoder decoder = new FeatureRowDecoder("feature_set_ref", spec); + + FeatureRowProto.FeatureRow encodedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + .addFields(Field.newBuilder().setValue(Value.newBuilder().setInt32Val(2))) + .addFields(Field.newBuilder().setValue(Value.newBuilder().setFloatVal(1.0f))) + .build(); + + FeatureRowProto.FeatureRow expectedFeatureRow = + FeatureRowProto.FeatureRow.newBuilder() + .setFeatureSet("feature_set_ref") + .setEventTimestamp(Timestamp.newBuilder().setNanos(1000)) + .addFields( + Field.newBuilder().setName("feature1").setValue(Value.newBuilder().setInt32Val(2))) + .addFields( + Field.newBuilder() + .setName("feature2") + .setValue(Value.newBuilder().setFloatVal(1.0f))) + .build(); + + assertTrue(decoder.isEncoded(encodedFeatureRow)); + assertTrue(decoder.isEncodingValid(encodedFeatureRow)); + assertEquals(expectedFeatureRow, decoder.decode(encodedFeatureRow)); + } +}