Fix Online Serving unable to retrieve feature data after Feature Set update. (#908)

* Update RedisCustomIO to write FeatureRows with each field's name set to the hash of the field name (see the hashing sketch below).

* Update FeatureRowDecoder to decode by name hash instead of order

* Bump pytest order numbers by 2 to make space for new tests

* Revert "Bump pytest order numbers by 2 to make space for new tests"

This reverts commit aecc9a6e9a70be3fd84d04f81442b518be01a4c6.

* Added e2e to check that feature rows with missing or extra fields can be retrieved

* Clarify docs about Feature Row v1 encoding and Feature Row v2 encoding

* Fix python lint

* Update FeatureRowDecoder's isEncodedV2 check to use anyMatch()

* Make missing field/extra field e2e tests independent of other tests.

* Split FeatureRowDecoder's if/else statement into 2 ifs

* Fix python and java lint

* Fix java unit test failures

* Fix ImportJobTest java unit test

* Sync github workflows with master

* Sync .github folder with master for fix

* Replace v1/v2 encoding with v1/v2 decoder in docs
mrzzy authored and Oleksii Moskalenko committed Aug 2, 2020
1 parent a466f64 commit ae29ba9
Showing 11 changed files with 404 additions and 96 deletions.
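For context: the fix stores each feature value under a Murmur3 hash of its field name instead of stripping the name entirely, which is what lets the decoder match fields by name rather than by position. A minimal sketch of the hashing scheme, assuming Guava is on the classpath (the class and field names here are illustrative, not part of the commit):

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;

public class FieldNameHashSketch {
  // Same hashing the commit uses: murmur3_32 over the UTF-8 field name.
  static String hash(String input) {
    return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString();
  }

  public static void main(String[] args) {
    // Prints a stable 8-hex-character hash for the example field name.
    System.out.println(hash("total_purchases"));
  }
}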
4 changes: 3 additions & 1 deletion ingestion/src/test/java/feast/ingestion/ImportJobTest.java
@@ -217,7 +217,9 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow()
.map(FeatureSpec::getName)
.collect(Collectors.toList())
.contains(field.getName()))
.map(field -> field.toBuilder().clearName().build())
.map(
field ->
field.toBuilder().setName(TestUtil.hash(field.getName())).build())
.collect(Collectors.toList());
randomRow =
randomRow
5 changes: 5 additions & 0 deletions ingestion/src/test/java/feast/test/TestUtil.java
@@ -19,6 +19,7 @@
import static feast.common.models.FeatureSet.getFeatureSetStringRef;

import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
import com.google.common.io.Files;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
@@ -517,4 +518,8 @@ public static void waitUntilAllElementsAreWrittenToStore(
}
}
}

public static String hash(String input) {
return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString();
}
}
@@ -16,6 +16,7 @@
*/
package feast.storage.common.testing;

import com.google.common.hash.Hashing;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
import feast.proto.core.FeatureSetProto.FeatureSet;
@@ -24,6 +25,7 @@
import feast.proto.types.FeatureRowProto.FeatureRow.Builder;
import feast.proto.types.FieldProto.Field;
import feast.proto.types.ValueProto.*;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.RandomStringUtils;
@@ -191,4 +193,8 @@ public static Field field(String name, Object value, ValueType.Enum valueType) {
throw new IllegalStateException("Unexpected valueType: " + value.getClass());
}
}

public static String hash(String input) {
return Hashing.murmur3_32().hashString(input, StandardCharsets.UTF_8).toString();
}
}
@@ -16,12 +16,17 @@
*/
package feast.storage.connectors.redis.retriever;

import com.google.common.hash.Hashing;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetProto.FeatureSpec;
import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.proto.types.FieldProto.Field;
import feast.proto.types.ValueProto.Value;
import feast.storage.connectors.redis.writer.RedisCustomIO;
import java.nio.charset.StandardCharsets;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

@@ -36,60 +41,102 @@ public FeatureRowDecoder(String featureSetRef, FeatureSetSpec spec) {
}

/**
* A feature row is considered encoded if the feature set and field names are not set. This method
* is required for backward compatibility purposes, to allow Feast serving to continue serving non
* encoded Feature Row ingested by an older version of Feast.
* Check if an encoded feature row can be decoded by the v1 Decoder. The v1 Decoder requires
* that the Feature Row have both its feature set reference and field names unset. The number
* of fields in the feature row should also match the number of fields in the Feature Set
* spec. NOTE: This method is deprecated and will be removed in Feast v0.7.
*
* @param featureRow Feature row
* @return boolean
*/
public boolean isEncoded(FeatureRow featureRow) {
@Deprecated
private boolean isEncodedV1(FeatureRow featureRow) {
return featureRow.getFeatureSet().isEmpty()
&& featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty());
&& featureRow.getFieldsList().stream().allMatch(field -> field.getName().isEmpty())
&& featureRow.getFieldsList().size() == spec.getFeaturesList().size();
}

/**
* Validates if an encoded feature row can be decoded without exception.
* Check if an encoded feature row can be decoded by the v2 Decoder. The v2 Decoder requires
* that every field in the Feature Row has its (hashed) name set.
*
* @param featureRow Feature row
* @return boolean
*/
public boolean isEncodingValid(FeatureRow featureRow) {
return featureRow.getFieldsList().size() == spec.getFeaturesList().size();
private boolean isEncodedV2(FeatureRow featureRow) {
return !featureRow.getFieldsList().stream().anyMatch(field -> field.getName().isEmpty());
}

/**
* Decoding feature row by repopulating the field names based on the corresponding feature set
* spec.
* Decode feature row encoded by {@link RedisCustomIO}. NOTE: The v1 Decoder will be removed in
* Feast 0.7
*
* @param encodedFeatureRow Feature row
* @return decoded feature row
* @throws IllegalArgumentException if unable to decode the given feature row
*/
public FeatureRow decode(FeatureRow encodedFeatureRow) {
final List<Field> fieldsWithoutName = encodedFeatureRow.getFieldsList();
if (isEncodedV1(encodedFeatureRow)) {
// TODO: remove v1 feature row decoder in Feast 0.7
// Decode Feature Rows using the v1 Decoder.
final List<Field> fieldsWithoutName = encodedFeatureRow.getFieldsList();
List<String> featureNames =
spec.getFeaturesList().stream()
.sorted(Comparator.comparing(FeatureSpec::getName))
.map(FeatureSpec::getName)
.collect(Collectors.toList());

List<String> featureNames =
spec.getFeaturesList().stream()
.sorted(Comparator.comparing(FeatureSpec::getName))
.map(FeatureSpec::getName)
.collect(Collectors.toList());
List<Field> fields =
IntStream.range(0, featureNames.size())
.mapToObj(
featureNameIndex -> {
String featureName = featureNames.get(featureNameIndex);
return fieldsWithoutName
.get(featureNameIndex)
.toBuilder()
.setName(featureName)
.build();
})
.collect(Collectors.toList());
return encodedFeatureRow
.toBuilder()
.clearFields()
.setFeatureSet(featureSetRef)
.addAllFields(fields)
.build();
List<Field> fields =
IntStream.range(0, featureNames.size())
.mapToObj(
featureNameIndex -> {
String featureName = featureNames.get(featureNameIndex);
return fieldsWithoutName
.get(featureNameIndex)
.toBuilder()
.setName(featureName)
.build();
})
.collect(Collectors.toList());

return encodedFeatureRow
.toBuilder()
.clearFields()
.setFeatureSet(featureSetRef)
.addAllFields(fields)
.build();
}
if (isEncodedV2(encodedFeatureRow)) {
// Decode Feature Rows using the v2 Decoder.
      // v2 Decoder input Feature Rows should use a hashed name as the field name and
      // should not have the feature set reference set.
      // Decoding reverts the field name to the unhashed string and sets the feature set reference.
Map<String, Value> nameHashValueMap =
encodedFeatureRow.getFieldsList().stream()
.collect(Collectors.toMap(field -> field.getName(), field -> field.getValue()));

List<String> featureNames =
spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList());

List<Field> fields =
featureNames.stream()
.map(
name -> {
String nameHash =
Hashing.murmur3_32().hashString(name, StandardCharsets.UTF_8).toString();
Value value =
nameHashValueMap.getOrDefault(nameHash, Value.newBuilder().build());
return Field.newBuilder().setName(name).setValue(value).build();
})
.collect(Collectors.toList());

return encodedFeatureRow
.toBuilder()
.clearFields()
.setFeatureSet(featureSetRef)
.addAllFields(fields)
.build();
}
    throw new IllegalArgumentException("Failed to decode FeatureRow: Possible data corruption");
}
}
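To make the v2 decode path above concrete, here is a hedged, self-contained sketch of the same idea in plain Java (String values stand in for Feast's Value protos; all names are illustrative and Guava is assumed on the classpath). Values come back from storage keyed by hashed field name; decoding walks the spec's feature names, hashes each one, and looks up the value, falling back to a default for missing fields and dropping fields the spec no longer declares. This is why rows written before or after a Feature Set update remain retrievable:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class V2DecodeSketch {
  static String hash(String name) {
    return Hashing.murmur3_32().hashString(name, StandardCharsets.UTF_8).toString();
  }

  public static void main(String[] args) {
    // "Encoded" row as read back from Redis: values keyed by hashed field name.
    Map<String, String> encoded = new LinkedHashMap<>();
    encoded.put(hash("age"), "32");
    encoded.put(hash("obsolete_field"), "ignored"); // extra field, absent from the current spec

    // The current spec's feature names drive the decode: extra fields are
    // dropped, and missing fields ("income") fall back to a default value.
    String[] specFeatures = {"age", "income"};
    Map<String, String> decoded = new LinkedHashMap<>();
    for (String feature : specFeatures) {
      decoded.put(feature, encoded.getOrDefault(hash(feature), "<default>"));
    }
    System.out.println(decoded); // {age=32, income=<default>}
  }
}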
@@ -158,17 +158,11 @@ private List<Optional<FeatureRow>> getFeaturesFromRedis(

// decode feature rows from data bytes using decoder.
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
if (decoder.isEncoded(featureRow)) {
if (decoder.isEncodingValid(featureRow)) {
featureRow = decoder.decode(featureRow);
} else {
// decoding feature row failed: data corruption could have occurred
throw Status.DATA_LOSS
.withDescription(
"Failed to decode FeatureRow from bytes retrieved from redis"
+ ": Possible data corruption")
.asRuntimeException();
}
try {
featureRow = decoder.decode(featureRow);
} catch (IllegalArgumentException e) {
// decoding feature row failed: data corruption could have occurred
throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException();
}
featureRows.add(Optional.of(featureRow));
}
@@ -151,15 +151,11 @@ private List<Optional<FeatureRow>> getFeaturesFromRedis(

// decode feature rows from data bytes using decoder.
FeatureRow featureRow = FeatureRow.parseFrom(featureRowBytes);
if (decoder.isEncoded(featureRow) && decoder.isEncodingValid(featureRow)) {
try {
featureRow = decoder.decode(featureRow);
} else {
} catch (IllegalArgumentException e) {
// decoding feature row failed: data corruption could have occurred
throw Status.DATA_LOSS
.withDescription(
"Failed to decode FeatureRow from bytes retrieved from redis"
+ ": Possible data corruption")
.asRuntimeException();
throw Status.DATA_LOSS.withCause(e).withDescription(e.getMessage()).asRuntimeException();
}
featureRows.add(Optional.of(featureRow));
}
@@ -18,6 +18,7 @@

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.hash.Hashing;
import feast.proto.core.FeatureSetProto.EntitySpec;
import feast.proto.core.FeatureSetProto.FeatureSetSpec;
import feast.proto.core.FeatureSetProto.FeatureSpec;
@@ -29,7 +30,9 @@
import feast.storage.api.writer.FailedElement;
import feast.storage.api.writer.WriteResult;
import feast.storage.common.retry.Retriable;
import feast.storage.connectors.redis.retriever.FeatureRowDecoder;
import io.lettuce.core.RedisException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -203,28 +206,45 @@ private byte[] getKey(FeatureRow featureRow, FeatureSetSpec spec) {
return redisKeyBuilder.build().toByteArray();
}

/**
 * Encode the Feature Row as bytes to store in Redis. To reduce storage space consumption in
 * redis, feature rows are "encoded" by hashing the field names and not setting the feature set
 * reference. {@link FeatureRowDecoder} is responsible for reversing this "encoding" step.
*/
private byte[] getValue(FeatureRow featureRow, FeatureSetSpec spec) {
List<String> featureNames =
spec.getFeaturesList().stream().map(FeatureSpec::getName).collect(Collectors.toList());
Map<String, Field> fieldValueOnlyMap =

Map<String, Field.Builder> fieldValueOnlyMap =
featureRow.getFieldsList().stream()
.filter(field -> featureNames.contains(field.getName()))
.distinct()
.collect(
Collectors.toMap(
Field::getName,
field -> Field.newBuilder().setValue(field.getValue()).build()));
Field::getName, field -> Field.newBuilder().setValue(field.getValue())));

List<Field> values =
featureNames.stream()
.sorted()
.map(
featureName ->
fieldValueOnlyMap.getOrDefault(
featureName,
Field.newBuilder()
.setValue(ValueProto.Value.getDefaultInstance())
.build()))
featureName -> {
Field.Builder field =
fieldValueOnlyMap.getOrDefault(
featureName,
Field.newBuilder().setValue(ValueProto.Value.getDefaultInstance()));

                    // Encode the field name as the hash of the field name.
                    // Use the hash of the name instead of the name itself to reduce redis
                    // storage consumption per feature row stored.
String nameHash =
Hashing.murmur3_32()
.hashString(featureName, StandardCharsets.UTF_8)
.toString();
field.setName(nameHash);

return field.build();
})
.collect(Collectors.toList());

return FeatureRow.newBuilder()
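The write path in getValue above can be summarized with a matching sketch (again plain Java with String values; names are illustrative): spec feature names are sorted so the encoded field order is deterministic, features absent from the row are filled with a default value, fields not declared in the spec are filtered out, and each field is stored under its name hash:

import com.google.common.hash.Hashing;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class V2EncodeSketch {
  static String hash(String name) {
    return Hashing.murmur3_32().hashString(name, StandardCharsets.UTF_8).toString();
  }

  public static void main(String[] args) {
    // Incoming row values keyed by feature name (illustrative data).
    Map<String, String> row = Map.of("age", "32", "obsolete_field", "ignored");

    // Spec feature names, sorted for a deterministic encoded field order.
    List<String> specFeatures = new ArrayList<>(List.of("income", "age"));
    specFeatures.sort(String::compareTo);

    // Encode one (hashed name, value) pair per spec feature; features absent
    // from the row get a default, and "obsolete_field" is dropped entirely.
    for (String feature : specFeatures) {
      String value = row.getOrDefault(feature, "<default>");
      System.out.println(hash(feature) + " -> " + value);
    }
  }
}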