diff --git a/common-test/src/main/java/feast/common/it/DataGenerator.java b/common-test/src/main/java/feast/common/it/DataGenerator.java
index 398d5b5abb..0606c75951 100644
--- a/common-test/src/main/java/feast/common/it/DataGenerator.java
+++ b/common-test/src/main/java/feast/common/it/DataGenerator.java
@@ -134,6 +134,19 @@ public static FeatureTableSpec createFeatureTableSpec(
                 .build())
             .collect(Collectors.toList()))
         .setMaxAge(Duration.newBuilder().setSeconds(3600).build())
+        .setBatchSource(
+            DataSource.newBuilder()
+                .setEventTimestampColumn("ts")
+                .setType(DataSource.SourceType.BATCH_FILE)
+                .setFileOptions(
+                    FileOptions.newBuilder()
+                        .setFileFormat(
+                            FileFormat.newBuilder()
+                                .setParquetFormat(ParquetFormat.newBuilder().build())
+                                .build())
+                        .setFileUrl("/dev/null")
+                        .build())
+                .build())
         .putAllLabels(labels)
         .build();
   }
diff --git a/core/src/test/java/feast/core/service/SpecServiceIT.java b/core/src/test/java/feast/core/service/SpecServiceIT.java
index e6dc068ef6..8851d875ae 100644
--- a/core/src/test/java/feast/core/service/SpecServiceIT.java
+++ b/core/src/test/java/feast/core/service/SpecServiceIT.java
@@ -659,6 +659,7 @@ public void shouldErrorOnMissingBatchSource() {
             3600,
             Map.of())
         .toBuilder()
+        .clearBatchSource()
         .build();

     StatusRuntimeException exc =
diff --git a/infra/charts/feast/charts/feast-serving/templates/deployment.yaml b/infra/charts/feast/charts/feast-serving/templates/deployment.yaml
index 3df799df1d..1d6e3aadc4 100644
--- a/infra/charts/feast/charts/feast-serving/templates/deployment.yaml
+++ b/infra/charts/feast/charts/feast-serving/templates/deployment.yaml
@@ -117,7 +117,11 @@ spec:
         {{- if .Values.livenessProbe.enabled }}
         livenessProbe:
           exec:
-            command: ["grpc-health-probe", "-addr=:{{ .Values.service.grpc.targetPort }}"]
+            command:
+            - "grpc-health-probe"
+            - "-addr=:{{ .Values.service.grpc.targetPort }}"
+            - "-connect-timeout={{ .Values.livenessProbe.timeoutSeconds }}s"
+            - "-rpc-timeout={{ .Values.livenessProbe.timeoutSeconds }}s"
           initialDelaySeconds: {{ .Values.livenessProbe.initialDelaySeconds }}
           periodSeconds: {{ .Values.livenessProbe.periodSeconds }}
           successThreshold: {{ .Values.livenessProbe.successThreshold }}
@@ -128,7 +132,11 @@ spec:
         {{- if .Values.readinessProbe.enabled }}
         readinessProbe:
           exec:
-            command: ["grpc-health-probe", "-addr=:{{ .Values.service.grpc.targetPort }}"]
+            command:
+            - "grpc-health-probe"
+            - "-addr=:{{ .Values.service.grpc.targetPort }}"
+            - "-connect-timeout={{ .Values.readinessProbe.timeoutSeconds }}s"
+            - "-rpc-timeout={{ .Values.readinessProbe.timeoutSeconds }}s"
           initialDelaySeconds: {{ .Values.readinessProbe.initialDelaySeconds }}
           periodSeconds: {{ .Values.readinessProbe.periodSeconds }}
           successThreshold: {{ .Values.readinessProbe.successThreshold }}
diff --git a/infra/docker/serving/Dockerfile b/infra/docker/serving/Dockerfile
index 72d6bc23c8..395b344c7c 100644
--- a/infra/docker/serving/Dockerfile
+++ b/infra/docker/serving/Dockerfile
@@ -42,12 +42,12 @@ RUN wget -q https://github.com/grpc-ecosystem/grpc-health-probe/releases/downloa
 # Build stage 2: Production
 # ============================================================

-FROM openjdk:11-jre-slim as production
+FROM amazoncorretto:11 as production
 ARG VERSION=dev
 COPY --from=builder /build/serving/target/feast-serving-$VERSION-exec.jar /opt/feast/feast-serving.jar
 COPY --from=builder /usr/bin/grpc-health-probe /usr/bin/grpc-health-probe
 CMD ["java",\
-     "-Xms1024m",\
-     "-Xmx1024m",\
+     "-Xms1g",\
+     "-Xmx4g",\
      "-jar",\
      "/opt/feast/feast-serving.jar"]
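Note on the test changes above: because DataGenerator now attaches a Parquet-on-/dev/null batch source to every generated FeatureTableSpec, a test that wants to exercise the missing-batch-source validation has to strip that default explicitly, which is what the new clearBatchSource() call in SpecServiceIT does. A minimal sketch of that pattern against the generated protobuf builders (the helper and variable names below are illustrative, not part of the patch):

    import feast.proto.core.FeatureTableProto.FeatureTableSpec;

    final class MissingBatchSourceSketch {
      // Given a spec produced by DataGenerator.createFeatureTableSpec(...), which now
      // carries a batch source by default, clear it to reach the "missing batch source"
      // validation path in core.
      static FeatureTableSpec withoutBatchSource(FeatureTableSpec generatedSpec) {
        return generatedSpec.toBuilder().clearBatchSource().build();
      }
    }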
diff --git a/serving/pom.xml b/serving/pom.xml
index f70a118b21..b8f675dd30 100644
--- a/serving/pom.xml
+++ b/serving/pom.xml
@@ -298,13 +298,13 @@
     <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>testcontainers</artifactId>
-      <version>1.14.3</version>
+      <version>1.15.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.testcontainers</groupId>
      <artifactId>junit-jupiter</artifactId>
-      <version>1.14.3</version>
+      <version>1.15.1</version>
      <scope>test</scope>
    </dependency>
diff --git a/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java b/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java
index 0de3fea82c..70dd6f7387 100644
--- a/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java
+++ b/serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java
@@ -20,8 +20,6 @@
 import com.google.protobuf.Duration;
 import feast.common.models.FeatureV2;
-import feast.proto.core.FeatureProto;
-import feast.proto.core.FeatureTableProto.FeatureTableSpec;
 import feast.proto.serving.ServingAPIProto.FeastServingType;
 import feast.proto.serving.ServingAPIProto.FeatureReferenceV2;
 import feast.proto.serving.ServingAPIProto.GetFeastServingInfoRequest;
@@ -29,6 +27,7 @@
 import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2;
 import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse;
 import feast.proto.types.ValueProto;
+import feast.serving.exception.SpecRetrievalException;
 import feast.serving.specs.CachedSpecService;
 import feast.serving.util.Metrics;
 import feast.storage.api.retriever.Feature;
@@ -37,7 +36,9 @@
 import io.opentracing.Span;
 import io.opentracing.Tracer;
 import java.util.*;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.slf4j.Logger;

 public class OnlineServingServiceV2 implements ServingServiceV2 {
@@ -47,6 +48,27 @@ public class OnlineServingServiceV2 implements ServingServiceV2 {
   private final Tracer tracer;
   private final OnlineRetrieverV2 retriever;

+  private static final HashMap<ValueProto.ValueType.Enum, ValueProto.Value.ValCase>
+      TYPE_TO_VAL_CASE =
+          new HashMap<>() {
+            {
+              put(ValueProto.ValueType.Enum.BYTES, ValueProto.Value.ValCase.BYTES_VAL);
+              put(ValueProto.ValueType.Enum.STRING, ValueProto.Value.ValCase.STRING_VAL);
+              put(ValueProto.ValueType.Enum.INT32, ValueProto.Value.ValCase.INT32_VAL);
+              put(ValueProto.ValueType.Enum.INT64, ValueProto.Value.ValCase.INT64_VAL);
+              put(ValueProto.ValueType.Enum.DOUBLE, ValueProto.Value.ValCase.DOUBLE_VAL);
+              put(ValueProto.ValueType.Enum.FLOAT, ValueProto.Value.ValCase.FLOAT_VAL);
+              put(ValueProto.ValueType.Enum.BOOL, ValueProto.Value.ValCase.BOOL_VAL);
+              put(ValueProto.ValueType.Enum.BYTES_LIST, ValueProto.Value.ValCase.BYTES_LIST_VAL);
+              put(ValueProto.ValueType.Enum.STRING_LIST, ValueProto.Value.ValCase.STRING_LIST_VAL);
+              put(ValueProto.ValueType.Enum.INT32_LIST, ValueProto.Value.ValCase.INT32_LIST_VAL);
+              put(ValueProto.ValueType.Enum.INT64_LIST, ValueProto.Value.ValCase.INT64_LIST_VAL);
+              put(ValueProto.ValueType.Enum.DOUBLE_LIST, ValueProto.Value.ValCase.DOUBLE_LIST_VAL);
+              put(ValueProto.ValueType.Enum.FLOAT_LIST, ValueProto.Value.ValCase.FLOAT_LIST_VAL);
+              put(ValueProto.ValueType.Enum.BOOL_LIST, ValueProto.Value.ValCase.BOOL_LIST_VAL);
+            }
+          };
+
   public OnlineServingServiceV2(
       OnlineRetrieverV2 retriever, CachedSpecService specService, Tracer tracer) {
     this.retriever = retriever;
@@ -74,30 +96,22 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
     }

     List<GetOnlineFeaturesRequestV2.EntityRow> entityRows = request.getEntityRowsList();
-    // Collect the feature/entity value for each entity row in entityValueMap
-    Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, ValueProto.Value>> entityValuesMap =
-        entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));
-    // Collect the feature/entity status metadata for each entity row in entityValueMap
-    Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, GetOnlineFeaturesResponse.FieldStatus>>
-        entityStatusesMap =
-            entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));
-
-    entityRows.forEach(
-        entityRow -> {
-          Map<String, ValueProto.Value> valueMap = entityRow.getFieldsMap();
-          entityValuesMap.get(entityRow).putAll(valueMap);
-          entityStatusesMap.get(entityRow).putAll(getMetadataMap(valueMap, false, false));
-        });
+    List<Map<String, ValueProto.Value>> values =
+        entityRows.stream().map(r -> new HashMap<>(r.getFieldsMap())).collect(Collectors.toList());
+    List<Map<String, GetOnlineFeaturesResponse.FieldStatus>> statuses =
+        entityRows.stream()
+            .map(r -> getMetadataMap(r.getFieldsMap(), false, false))
+            .collect(Collectors.toList());

-    Span onlineRetrievalSpan = tracer.buildSpan("onlineRetrieval").start();
-    if (onlineRetrievalSpan != null) {
-      onlineRetrievalSpan.setTag("entities", entityRows.size());
-      onlineRetrievalSpan.setTag("features", featureReferences.size());
+    Span storageRetrievalSpan = tracer.buildSpan("storageRetrieval").start();
+    if (storageRetrievalSpan != null) {
+      storageRetrievalSpan.setTag("entities", entityRows.size());
+      storageRetrievalSpan.setTag("features", featureReferences.size());
     }
-    List<List<Optional<Feature>>> entityRowsFeatures =
+    List<List<Feature>> entityRowsFeatures =
         retriever.getOnlineFeatures(projectName, entityRows, featureReferences);
-    if (onlineRetrievalSpan != null) {
-      onlineRetrievalSpan.finish();
+    if (storageRetrievalSpan != null) {
+      storageRetrievalSpan.finish();
     }

     if (entityRowsFeatures.size() != entityRows.size()) {
@@ -108,37 +122,61 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
           .asRuntimeException();
     }

+    String finalProjectName = projectName;
+    Map<FeatureReferenceV2, Duration> featureMaxAges =
+        featureReferences.stream()
+            .distinct()
+            .collect(
+                Collectors.toMap(
+                    Function.identity(),
+                    ref -> specService.getFeatureTableSpec(finalProjectName, ref).getMaxAge()));
+
+    Map<FeatureReferenceV2, ValueProto.ValueType.Enum> featureValueTypes =
+        featureReferences.stream()
+            .distinct()
+            .collect(
+                Collectors.toMap(
+                    Function.identity(),
+                    ref -> {
+                      try {
+                        return specService.getFeatureSpec(finalProjectName, ref).getValueType();
+                      } catch (SpecRetrievalException e) {
+                        return ValueProto.ValueType.Enum.INVALID;
+                      }
+                    }));
+
+    Span postProcessingSpan = tracer.buildSpan("postProcessing").start();
+
     for (int i = 0; i < entityRows.size(); i++) {
       GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i);
-      List<Optional<Feature>> curEntityRowFeatures = entityRowsFeatures.get(i);
+      List<Feature> curEntityRowFeatures = entityRowsFeatures.get(i);

-      Map<FeatureReferenceV2, Optional<Feature>> featureReferenceFeatureMap =
+      Map<FeatureReferenceV2, Feature> featureReferenceFeatureMap =
           getFeatureRefFeatureMap(curEntityRowFeatures);

-      Map<String, ValueProto.Value> allValueMaps = new HashMap<>();
-      Map<String, GetOnlineFeaturesResponse.FieldStatus> allStatusMaps = new HashMap<>();
+      Map<String, ValueProto.Value> rowValues = values.get(i);
+      Map<String, GetOnlineFeaturesResponse.FieldStatus> rowStatuses = statuses.get(i);

       for (FeatureReferenceV2 featureReference : featureReferences) {
         if (featureReferenceFeatureMap.containsKey(featureReference)) {
-          Optional<Feature> feature = featureReferenceFeatureMap.get(featureReference);
+          Feature feature = featureReferenceFeatureMap.get(featureReference);

-          FeatureTableSpec featureTableSpec =
-              specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference());
-          FeatureProto.FeatureSpecV2 featureSpec =
-              specService.getFeatureSpec(projectName, feature.get().getFeatureReference());
-          ValueProto.ValueType.Enum valueTypeEnum = featureSpec.getValueType();
-          ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase();
-          boolean isMatchingFeatureSpec = checkSameFeatureSpec(valueTypeEnum, valueCase);
+          ValueProto.Value.ValCase valueCase = feature.getFeatureValue().getValCase();
+
+          boolean isMatchingFeatureSpec =
+              checkSameFeatureSpec(featureValueTypes.get(feature.getFeatureReference()), valueCase);
+          boolean isOutsideMaxAge =
+              checkOutsideMaxAge(
+                  feature, entityRow, featureMaxAges.get(feature.getFeatureReference()));

-          boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature);
           Map<String, ValueProto.Value> valueMap =
               unpackValueMap(feature, isOutsideMaxAge, isMatchingFeatureSpec);
-          allValueMaps.putAll(valueMap);
+          rowValues.putAll(valueMap);

           // Generate metadata for feature values and merge into entityFieldsMap
           Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
               getMetadataMap(valueMap, !isMatchingFeatureSpec, isOutsideMaxAge);
-          allStatusMaps.putAll(statusMap);
+          rowStatuses.putAll(statusMap);

           // Populate metrics/log request
           populateCountMetrics(statusMap, projectName);
@@ -151,75 +189,55 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
                     ValueProto.Value.newBuilder().build());
               }
             };
-          allValueMaps.putAll(valueMap);
+          rowValues.putAll(valueMap);

           Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
               getMetadataMap(valueMap, true, false);
-          allStatusMaps.putAll(statusMap);
+          rowStatuses.putAll(statusMap);

           // Populate metrics/log request
           populateCountMetrics(statusMap, projectName);
         }
       }
-      entityValuesMap.get(entityRow).putAll(allValueMaps);
-      entityStatusesMap.get(entityRow).putAll(allStatusMaps);
     }
+
+    if (postProcessingSpan != null) {
+      postProcessingSpan.finish();
+    }
+
     populateHistogramMetrics(entityRows, featureReferences, projectName);
     populateFeatureCountMetrics(featureReferences, projectName);

     // Build response field values from entityValuesMap and entityStatusesMap
     // Response field values should be in the same order as the entityRows provided by the user.
     List<GetOnlineFeaturesResponse.FieldValues> fieldValuesList =
-        entityRows.stream()
-            .map(
-                entityRow -> {
-                  return GetOnlineFeaturesResponse.FieldValues.newBuilder()
-                      .putAllFields(entityValuesMap.get(entityRow))
-                      .putAllStatuses(entityStatusesMap.get(entityRow))
-                      .build();
-                })
+        IntStream.range(0, entityRows.size())
+            .mapToObj(
+                entityRowIdx ->
+                    GetOnlineFeaturesResponse.FieldValues.newBuilder()
+                        .putAllFields(values.get(entityRowIdx))
+                        .putAllStatuses(statuses.get(entityRowIdx))
+                        .build())
             .collect(Collectors.toList());
     return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build();
   }

   private boolean checkSameFeatureSpec(
       ValueProto.ValueType.Enum valueTypeEnum, ValueProto.Value.ValCase valueCase) {
-    HashMap<ValueProto.ValueType.Enum, ValueProto.Value.ValCase> typingMap =
-        new HashMap<>() {
-          {
-            put(ValueProto.ValueType.Enum.BYTES, ValueProto.Value.ValCase.BYTES_VAL);
-            put(ValueProto.ValueType.Enum.STRING, ValueProto.Value.ValCase.STRING_VAL);
-            put(ValueProto.ValueType.Enum.INT32, ValueProto.Value.ValCase.INT32_VAL);
-            put(ValueProto.ValueType.Enum.INT64, ValueProto.Value.ValCase.INT64_VAL);
-            put(ValueProto.ValueType.Enum.DOUBLE, ValueProto.Value.ValCase.DOUBLE_VAL);
-            put(ValueProto.ValueType.Enum.FLOAT, ValueProto.Value.ValCase.FLOAT_VAL);
-            put(ValueProto.ValueType.Enum.BOOL, ValueProto.Value.ValCase.BOOL_VAL);
-            put(ValueProto.ValueType.Enum.BYTES_LIST, ValueProto.Value.ValCase.BYTES_LIST_VAL);
-            put(ValueProto.ValueType.Enum.STRING_LIST, ValueProto.Value.ValCase.STRING_LIST_VAL);
-            put(ValueProto.ValueType.Enum.INT32_LIST, ValueProto.Value.ValCase.INT32_LIST_VAL);
-            put(ValueProto.ValueType.Enum.INT64_LIST, ValueProto.Value.ValCase.INT64_LIST_VAL);
-            put(ValueProto.ValueType.Enum.DOUBLE_LIST, ValueProto.Value.ValCase.DOUBLE_LIST_VAL);
-            put(ValueProto.ValueType.Enum.FLOAT_LIST, ValueProto.Value.ValCase.FLOAT_LIST_VAL);
-            put(ValueProto.ValueType.Enum.BOOL_LIST, ValueProto.Value.ValCase.BOOL_LIST_VAL);
-          }
-        };
+    if (valueTypeEnum.equals(ValueProto.ValueType.Enum.INVALID)) {
+      return false;
+    }
+
     if (valueCase.equals(ValueProto.Value.ValCase.VAL_NOT_SET)) {
       return true;
     }

-    return typingMap.get(valueTypeEnum).equals(valueCase);
+    return TYPE_TO_VAL_CASE.get(valueTypeEnum).equals(valueCase);
   }

-  private static Map<FeatureReferenceV2, Optional<Feature>> getFeatureRefFeatureMap(
-      List<Optional<Feature>> features) {
-    Map<FeatureReferenceV2, Optional<Feature>> featureReferenceFeatureMap = new HashMap<>();
-    features.forEach(
-        feature -> {
-          FeatureReferenceV2 featureReference = feature.get().getFeatureReference();
-          featureReferenceFeatureMap.put(featureReference, feature);
-        });
-
-    return featureReferenceFeatureMap;
+  private static Map<FeatureReferenceV2, Feature> getFeatureRefFeatureMap(List<Feature> features) {
+    return features.stream()
+        .collect(Collectors.toMap(Feature::getFeatureReference, Function.identity()));
   }

   /**
@@ -238,7 +256,7 @@ private static Map<String, GetOnlineFeaturesResponse.FieldStatus> getMetadataMap
     return valueMap.entrySet().stream()
         .collect(
             Collectors.toMap(
-                es -> es.getKey(),
+                Map.Entry::getKey,
                 es -> {
                   ValueProto.Value fieldValue = es.getValue();
                   if (isNotFound) {
@@ -253,20 +271,18 @@
   }

   private static Map<String, ValueProto.Value> unpackValueMap(
-      Optional<Feature> feature, boolean isOutsideMaxAge, boolean isMatchingFeatureSpec) {
+      Feature feature, boolean isOutsideMaxAge, boolean isMatchingFeatureSpec) {
     Map<String, ValueProto.Value> valueMap = new HashMap<>();

-    if (feature.isPresent()) {
-      if (!isOutsideMaxAge && isMatchingFeatureSpec) {
-        valueMap.put(
-            FeatureV2.getFeatureStringRef(feature.get().getFeatureReference()),
-            feature.get().getFeatureValue());
-      } else {
-        valueMap.put(
-            FeatureV2.getFeatureStringRef(feature.get().getFeatureReference()),
-            ValueProto.Value.newBuilder().build());
-      }
+    if (!isOutsideMaxAge && isMatchingFeatureSpec) {
+      valueMap.put(
+          FeatureV2.getFeatureStringRef(feature.getFeatureReference()), feature.getFeatureValue());
+    } else {
+      valueMap.put(
+          FeatureV2.getFeatureStringRef(feature.getFeatureReference()),
+          ValueProto.Value.newBuilder().build());
     }
+
     return valueMap;
   }

@@ -275,18 +291,13 @@ private static Map<String, ValueProto.Value> unpackValueMap(
    * maxAge to be when the difference ingestion time set in feature row and the retrieval time set
    * in entity row exceeds FeatureTable max age.
    *
-   * @param featureTableSpec contains the spec where feature's max age is extracted.
-   * @param entityRow contains the retrieval timing of when features are pulled.
    * @param feature contains the ingestion timing and feature data.
+   * @param entityRow contains the retrieval timing of when features are pulled.
+   * @param maxAge feature's max age.
    */
   private static boolean checkOutsideMaxAge(
-      FeatureTableSpec featureTableSpec,
-      GetOnlineFeaturesRequestV2.EntityRow entityRow,
-      Optional<Feature> feature) {
-    Duration maxAge = featureTableSpec.getMaxAge();
-    if (feature.isEmpty()) { // no data to consider
-      return false;
-    }
+      Feature feature, GetOnlineFeaturesRequestV2.EntityRow entityRow, Duration maxAge) {
+
     if (maxAge.equals(Duration.getDefaultInstance())) { // max age is not set
       return false;
     }
@@ -295,7 +306,7 @@ private static boolean checkOutsideMaxAge(
     if (givenTimestamp == 0) {
       givenTimestamp = System.currentTimeMillis() / 1000;
     }
-    long timeDifference = givenTimestamp - feature.get().getEventTimestamp().getSeconds();
+    long timeDifference = givenTimestamp - feature.getEventTimestamp().getSeconds();
     return timeDifference > maxAge.getSeconds();
   }

@@ -335,29 +346,23 @@ private void populateHistogramMetrics(
    */
   private void populateCountMetrics(
       Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap, String project) {
-    statusMap
-        .entrySet()
-        .forEach(
-            es -> {
-              String featureRefString = es.getKey();
-              GetOnlineFeaturesResponse.FieldStatus status = es.getValue();
-              if (status == GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND) {
-                Metrics.notFoundKeyCount.labels(project, featureRefString).inc();
-              }
-              if (status == GetOnlineFeaturesResponse.FieldStatus.OUTSIDE_MAX_AGE) {
-                Metrics.staleKeyCount.labels(project, featureRefString).inc();
-              }
-            });
+    statusMap.forEach(
+        (featureRefString, status) -> {
+          if (status == GetOnlineFeaturesResponse.FieldStatus.NOT_FOUND) {
+            Metrics.notFoundKeyCount.labels(project, featureRefString).inc();
+          }
+          if (status == GetOnlineFeaturesResponse.FieldStatus.OUTSIDE_MAX_AGE) {
+            Metrics.staleKeyCount.labels(project, featureRefString).inc();
+          }
+        });
   }

   private void populateFeatureCountMetrics(
       List<FeatureReferenceV2> featureReferences, String project) {
-    featureReferences
-        .parallelStream()
-        .forEach(
-            featureReference ->
-                Metrics.requestFeatureCount
-                    .labels(project, FeatureV2.getFeatureStringRef(featureReference))
-                    .inc());
+    featureReferences.forEach(
+        featureReference ->
+            Metrics.requestFeatureCount
+                .labels(project, FeatureV2.getFeatureStringRef(featureReference))
+                .inc());
   }
 }
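The serving changes above keep the existing staleness semantics but resolve each feature's max age and declared value type once per request (via the featureMaxAges and featureValueTypes maps) instead of once per entity row. The rule itself is unchanged; a compact, self-contained restatement of it using plain epoch seconds instead of the protobuf Duration/Timestamp types (all names below are illustrative, not Feast APIs):

    final class StalenessRuleSketch {
      // A value is OUTSIDE_MAX_AGE when the retrieval time from the entity row is more
      // than maxAgeSeconds ahead of the feature's event timestamp. A max age of zero
      // means the FeatureTable has no max age; a zero entity timestamp means "now".
      static boolean isOutsideMaxAge(
          long maxAgeSeconds, long entityTimestampSeconds, long eventTimestampSeconds) {
        if (maxAgeSeconds == 0) {
          return false;
        }
        long retrievalTime =
            entityTimestampSeconds == 0
                ? System.currentTimeMillis() / 1000
                : entityTimestampSeconds;
        return retrievalTime - eventTimestampSeconds > maxAgeSeconds;
      }
    }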
diff --git a/serving/src/main/java/feast/serving/specs/CachedSpecService.java b/serving/src/main/java/feast/serving/specs/CachedSpecService.java
index 85f0a1ebef..f54e08fc60 100644
--- a/serving/src/main/java/feast/serving/specs/CachedSpecService.java
+++ b/serving/src/main/java/feast/serving/specs/CachedSpecService.java
@@ -16,11 +16,10 @@
  */
 package feast.serving.specs;

-import static feast.common.models.FeatureTable.getFeatureTableStringRef;
-
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import feast.proto.core.CoreServiceProto;
 import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest;
 import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse;
 import feast.proto.core.CoreServiceProto.ListProjectsRequest;
@@ -57,7 +56,6 @@ public class CachedSpecService {
           .help("epoch time of the last time the cache was updated")
           .register();

-  private final LoadingCache<String, FeatureTableSpec> featureTableCache;
   private static Gauge featureTablesCount =
       Gauge.build()
           .name("feature_table_count")
@@ -65,27 +63,26 @@ public class CachedSpecService {
           .help("number of feature tables served by this instance")
           .register();

+  private final LoadingCache<ImmutablePair<String, String>, FeatureTableSpec> featureTableCache;
+
   private final LoadingCache<
-          String, Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>>
+          ImmutablePair<String, ServingAPIProto.FeatureReferenceV2>, FeatureProto.FeatureSpecV2>
       featureCache;

   public CachedSpecService(CoreSpecService coreService, StoreProto.Store store) {
     this.coreService = coreService;
     this.store = coreService.registerStore(store);

-    Map<String, FeatureTableSpec> featureTables = getFeatureTableMap().getLeft();
-    CacheLoader<String, FeatureTableSpec> featureTableCacheLoader =
-        CacheLoader.from(featureTables::get);
+    CacheLoader<ImmutablePair<String, String>, FeatureTableSpec> featureTableCacheLoader =
+        CacheLoader.from(k -> retrieveSingleFeatureTable(k.getLeft(), k.getRight()));
     featureTableCache =
         CacheBuilder.newBuilder().maximumSize(MAX_SPEC_COUNT).build(featureTableCacheLoader);
-    featureTableCache.putAll(featureTables);

-    Map<String, Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>> features =
-        getFeatureTableMap().getRight();
-    CacheLoader<String, Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>>
-        featureCacheLoader = CacheLoader.from(features::get);
+    CacheLoader<
+            ImmutablePair<String, ServingAPIProto.FeatureReferenceV2>, FeatureProto.FeatureSpecV2>
+        featureCacheLoader =
+            CacheLoader.from(k -> retrieveSingleFeature(k.getLeft(), k.getRight()));
     featureCache = CacheBuilder.newBuilder().build(featureCacheLoader);
-    featureCache.putAll(features);
   }

   /**
@@ -102,17 +99,20 @@ public Store getStore() {
    * from core to preload the cache.
    */
   public void populateCache() {
-    Map<String, FeatureTableSpec> featureTableMap = getFeatureTableMap().getLeft();
+    ImmutablePair<
+            HashMap<ImmutablePair<String, String>, FeatureTableSpec>,
+            HashMap<
+                ImmutablePair<String, ServingAPIProto.FeatureReferenceV2>,
+                FeatureProto.FeatureSpecV2>>
+        specs = getFeatureTableMap();

     featureTableCache.invalidateAll();
-    featureTableCache.putAll(featureTableMap);
+    featureTableCache.putAll(specs.getLeft());

     featureTablesCount.set(featureTableCache.size());

-    Map<String, Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>> featureMap =
-        getFeatureTableMap().getRight();
     featureCache.invalidateAll();
-    featureCache.putAll(featureMap);
+    featureCache.putAll(specs.getRight());

     cacheLastUpdated.set(System.currentTimeMillis());
   }
@@ -131,12 +131,14 @@ public void scheduledPopulateCache() {
    * @return Map in the format of
    */
   private ImmutablePair<
-          Map<String, FeatureTableSpec>,
-          Map<String, Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>>>
+          HashMap<ImmutablePair<String, String>, FeatureTableSpec>,
+          HashMap<
+              ImmutablePair<String, ServingAPIProto.FeatureReferenceV2>,
+              FeatureProto.FeatureSpecV2>>
       getFeatureTableMap() {
-    HashMap<String, FeatureTableSpec> featureTables = new HashMap<>();
-    HashMap<String, Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2>> features =
-        new HashMap<>();
+    HashMap<ImmutablePair<String, String>, FeatureTableSpec> featureTables = new HashMap<>();
+    HashMap<ImmutablePair<String, ServingAPIProto.FeatureReferenceV2>, FeatureProto.FeatureSpecV2>
+        features = new HashMap<>();

     List<String> projects =
         coreService.listProjects(ListProjectsRequest.newBuilder().build()).getProjectsList();
@@ -152,8 +154,7 @@ public void scheduledPopulateCache() {
             new HashMap<>();
         for (FeatureTable featureTable : featureTablesResponse.getTablesList()) {
           FeatureTableSpec spec = featureTable.getSpec();
-          // Key of Map is in the form of
-          featureTables.put(getFeatureTableStringRef(project, spec), spec);
+          featureTables.put(ImmutablePair.of(project, spec.getName()), spec);
           String featureTableName = spec.getName();
           List<FeatureProto.FeatureSpecV2> featureSpecs = spec.getFeaturesList();
@@ -163,10 +164,10 @@ public void scheduledPopulateCache() {
                     .setFeatureTable(featureTableName)
                     .setName(featureSpec.getName())
                     .build();
-            featureRefSpecMap.put(featureReference, featureSpec);
+            features.put(ImmutablePair.of(project, featureReference), featureSpec);
           }
         }
-        features.put(project, featureRefSpecMap);
+
       } catch (StatusRuntimeException e) {
         throw new RuntimeException(
             String.format("Unable to retrieve specs matching project %s", project), e);
@@ -175,28 +176,53 @@ public void scheduledPopulateCache() {
     return ImmutablePair.of(featureTables, features);
   }

+  private FeatureTableSpec retrieveSingleFeatureTable(String projectName, String tableName) {
+    FeatureTable table =
+        coreService
+            .getFeatureTable(
+                CoreServiceProto.GetFeatureTableRequest.newBuilder()
+                    .setProject(projectName)
+                    .setName(tableName)
+                    .build())
+            .getTable();
+    return table.getSpec();
+  }
+
+  private FeatureProto.FeatureSpecV2 retrieveSingleFeature(
+      String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) {
+    FeatureTableSpec featureTableSpec =
+        getFeatureTableSpec(projectName, featureReference); // don't stress core too much
+    if (featureTableSpec == null) {
+      return null;
+    }
+    return featureTableSpec.getFeaturesList().stream()
+        .filter(f -> f.getName().equals(featureReference.getName()))
+        .findFirst()
+        .orElse(null);
+  }
+
   public FeatureTableSpec getFeatureTableSpec(
-      String project, ServingAPIProto.FeatureReferenceV2 featureReference) {
-    String featureTableRefStr = getFeatureTableStringRef(project, featureReference);
+      String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) {
     FeatureTableSpec featureTableSpec;
     try {
-      featureTableSpec = featureTableCache.get(featureTableRefStr);
-    } catch (ExecutionException e) {
+      featureTableSpec =
+          featureTableCache.get(ImmutablePair.of(projectName, featureReference.getFeatureTable()));
+    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
       throw new SpecRetrievalException(
-          String.format("Unable to find FeatureTable with name: %s", featureTableRefStr), e);
+          String.format(
+              "Unable to find FeatureTable %s/%s", projectName, featureReference.getFeatureTable()),
+          e);
     }

     return featureTableSpec;
   }

   public FeatureProto.FeatureSpecV2 getFeatureSpec(
-      String project, ServingAPIProto.FeatureReferenceV2 featureReference) {
+      String projectName, ServingAPIProto.FeatureReferenceV2 featureReference) {
     FeatureProto.FeatureSpecV2 featureSpec;
     try {
-      Map<ServingAPIProto.FeatureReferenceV2, FeatureProto.FeatureSpecV2> featureRefSpecMap =
-          featureCache.get(project);
-      featureSpec = featureRefSpecMap.get(featureReference);
-    } catch (ExecutionException e) {
+      featureSpec = featureCache.get(ImmutablePair.of(projectName, featureReference));
+    } catch (ExecutionException | CacheLoader.InvalidCacheLoadException e) {
       throw new SpecRetrievalException(
           String.format("Unable to find Feature with name: %s", featureReference.getName()), e);
     }
diff --git a/serving/src/main/java/feast/serving/specs/CoreSpecService.java b/serving/src/main/java/feast/serving/specs/CoreSpecService.java
index 5d1209f41e..5429d22931 100644
--- a/serving/src/main/java/feast/serving/specs/CoreSpecService.java
+++ b/serving/src/main/java/feast/serving/specs/CoreSpecService.java
@@ -17,6 +17,7 @@
 package feast.serving.specs;

 import feast.proto.core.CoreServiceGrpc;
+import feast.proto.core.CoreServiceProto;
 import feast.proto.core.CoreServiceProto.ListFeatureTablesRequest;
 import feast.proto.core.CoreServiceProto.ListFeatureTablesResponse;
 import feast.proto.core.CoreServiceProto.ListProjectsRequest;
@@ -80,4 +81,9 @@ public ListFeatureTablesResponse listFeatureTables(
       ListFeatureTablesRequest listFeatureTablesRequest) {
     return blockingStub.listFeatureTables(listFeatureTablesRequest);
   }
+
+  public CoreServiceProto.GetFeatureTableResponse getFeatureTable(
+      CoreServiceProto.GetFeatureTableRequest getFeatureTableRequest) {
+    return blockingStub.getFeatureTable(getFeatureTableRequest);
+  }
 }
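The CachedSpecService rewrite above replaces the eagerly pre-populated, project-keyed maps with Guava LoadingCaches keyed by (project, name) pairs; on a cache miss the loader fetches a single FeatureTable from Core through the new CoreSpecService.getFeatureTable() call, while populateCache() still bulk-refreshes both caches on a schedule. A self-contained sketch of that lazy, pair-keyed caching pattern (fetchFromCore, the names, and the sizes below are stand-ins, not Feast APIs):

    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.CacheLoader;
    import com.google.common.cache.LoadingCache;
    import java.util.concurrent.ExecutionException;
    import org.apache.commons.lang3.tuple.ImmutablePair;

    final class LazySpecCacheSketch {
      // Stand-in for a GetFeatureTable round trip to Core.
      static String fetchFromCore(String project, String table) {
        return "spec-for-" + project + "/" + table;
      }

      public static void main(String[] args) throws ExecutionException {
        // Keys are (project, table) pairs; each value is loaded individually on a miss,
        // so serving no longer has to list every project and table up front.
        CacheLoader<ImmutablePair<String, String>, String> loader =
            CacheLoader.from(k -> fetchFromCore(k.getLeft(), k.getRight()));
        LoadingCache<ImmutablePair<String, String>, String> cache =
            CacheBuilder.newBuilder().maximumSize(1000).build(loader);

        // First access hits the loader; later accesses are served from the cache
        // until an invalidateAll()/putAll() refresh replaces the entries.
        System.out.println(cache.get(ImmutablePair.of("my_project", "driver_statistics")));
      }
    }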
diff --git a/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java b/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java
index 9bc5cbe044..64fe44b2dc 100644
--- a/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java
+++ b/serving/src/test/java/feast/serving/it/ServingServiceOauthAuthorizationIT.java
@@ -21,15 +21,18 @@
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.testcontainers.containers.wait.strategy.Wait.forHttp;

+import feast.common.it.DataGenerator;
 import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesRequestV2;
 import feast.proto.serving.ServingAPIProto.GetOnlineFeaturesResponse;
 import feast.proto.serving.ServingServiceGrpc.ServingServiceBlockingStub;
+import feast.proto.types.ValueProto;
 import feast.proto.types.ValueProto.Value;
 import io.grpc.ManagedChannel;
 import io.grpc.StatusRuntimeException;
 import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.junit.ClassRule;
@@ -44,6 +47,8 @@
 import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.junit.jupiter.Container;
 import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 import sh.ory.keto.ApiException;

 @ActiveProfiles("it")
@@ -126,6 +131,18 @@ static void globalSetup() throws IOException, InitializationError, InterruptedEx
     adminCredentials.put("grant_type", GRANT_TYPE);

     coreClient = AuthTestUtils.getSecureApiClientForCore(FEAST_CORE_PORT, adminCredentials);
+    coreClient.simpleApplyEntity(
+        PROJECT_NAME,
+        DataGenerator.createEntitySpecV2(
+            ENTITY_ID, "", ValueProto.ValueType.Enum.STRING, Collections.emptyMap()));
+    coreClient.simpleApplyFeatureTable(
+        PROJECT_NAME,
+        DataGenerator.createFeatureTableSpec(
+            FEATURE_TABLE_NAME,
+            ImmutableList.of(ENTITY_ID),
+            ImmutableMap.of(FEATURE_NAME, ValueProto.ValueType.Enum.STRING),
+            0,
+            Collections.emptyMap()));
   }

   @Test
diff --git a/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java b/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java
index 8a6dfdffe7..539ed398e1 100644
--- a/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java
+++ b/serving/src/test/java/feast/serving/service/OnlineServingServiceTest.java
@@ -40,7 +40,6 @@
 import io.opentracing.Tracer.SpanBuilder;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentMatchers;
@@ -155,14 +154,14 @@ public void shouldReturnResponseWithValuesAndMetadataIfKeysPresent() {
         List.of(featureReference1, featureReference2);
     GetOnlineFeaturesRequestV2 request =
         getOnlineFeaturesRequestV2(projectName, featureReferences);

-    List<Optional<Feature>> entityKeyList1 = new ArrayList<>();
-    List<Optional<Feature>> entityKeyList2 = new ArrayList<>();
-    entityKeyList1.add(Optional.of(mockedFeatureRows.get(0)));
-    entityKeyList1.add(Optional.of(mockedFeatureRows.get(1)));
-    entityKeyList2.add(Optional.of(mockedFeatureRows.get(2)));
-    entityKeyList2.add(Optional.of(mockedFeatureRows.get(3)));
+    List<Feature> entityKeyList1 = new ArrayList<>();
+    List<Feature> entityKeyList2 = new ArrayList<>();
+    entityKeyList1.add(mockedFeatureRows.get(0));
+    entityKeyList1.add(mockedFeatureRows.get(1));
+    entityKeyList2.add(mockedFeatureRows.get(2));
+    entityKeyList2.add(mockedFeatureRows.get(3));

-    List<List<Optional<Feature>>> featureRows = List.of(entityKeyList1, entityKeyList2);
+    List<List<Feature>> featureRows = List.of(entityKeyList1, entityKeyList2);
     when(retrieverV2.getOnlineFeatures(any(), any(), any())).thenReturn(featureRows);
     when(specService.getFeatureTableSpec(any(), any())).thenReturn(getFeatureTableSpec());
@@ -223,13 +222,13 @@ public void shouldReturnResponseWithUnsetValuesAndMetadataIfKeysNotPresent() {
         List.of(featureReference1, featureReference2);
     GetOnlineFeaturesRequestV2 request =
         getOnlineFeaturesRequestV2(projectName, featureReferences);

-    List<Optional<Feature>> entityKeyList1 = new ArrayList<>();
-    List<Optional<Feature>> entityKeyList2 = new ArrayList<>();
-    entityKeyList1.add(Optional.of(mockedFeatureRows.get(0)));
-    entityKeyList1.add(Optional.of(mockedFeatureRows.get(1)));
-    entityKeyList2.add(Optional.of(mockedFeatureRows.get(4)));
+    List<Feature> entityKeyList1 = new ArrayList<>();
+    List<Feature> entityKeyList2 = new ArrayList<>();
+    entityKeyList1.add(mockedFeatureRows.get(0));
+    entityKeyList1.add(mockedFeatureRows.get(1));
+    entityKeyList2.add(mockedFeatureRows.get(4));

-    List<List<Optional<Feature>>> featureRows = List.of(entityKeyList1, entityKeyList2);
+    List<List<Feature>> featureRows = List.of(entityKeyList1, entityKeyList2);
     when(retrieverV2.getOnlineFeatures(any(), any(), any())).thenReturn(featureRows);
     when(specService.getFeatureTableSpec(any(), any())).thenReturn(getFeatureTableSpec());
@@ -286,14 +285,14 @@ public void shouldReturnResponseWithUnsetValuesAndMetadataIfMaxAgeIsExceeded() {
         List.of(featureReference1, featureReference2);
     GetOnlineFeaturesRequestV2 request =
         getOnlineFeaturesRequestV2(projectName, featureReferences);

-    List<Optional<Feature>> entityKeyList1 = new ArrayList<>();
-    List<Optional<Feature>> entityKeyList2 = new ArrayList<>();
-    entityKeyList1.add(Optional.of(mockedFeatureRows.get(5)));
-    entityKeyList1.add(Optional.of(mockedFeatureRows.get(1)));
-    entityKeyList2.add(Optional.of(mockedFeatureRows.get(5)));
-    entityKeyList2.add(Optional.of(mockedFeatureRows.get(1)));
+    List<Feature> entityKeyList1 = new ArrayList<>();
+    List<Feature> entityKeyList2 = new ArrayList<>();
+    entityKeyList1.add(mockedFeatureRows.get(5));
+    entityKeyList1.add(mockedFeatureRows.get(1));
+    entityKeyList2.add(mockedFeatureRows.get(5));
+    entityKeyList2.add(mockedFeatureRows.get(1));

-    List<List<Optional<Feature>>> featureRows = List.of(entityKeyList1, entityKeyList2);
+    List<List<Feature>> featureRows = List.of(entityKeyList1, entityKeyList2);
     when(retrieverV2.getOnlineFeatures(any(), any(), any())).thenReturn(featureRows);
     when(specService.getFeatureTableSpec(any(), any()))
diff --git a/storage/api/src/main/java/feast/storage/api/retriever/OnlineRetrieverV2.java b/storage/api/src/main/java/feast/storage/api/retriever/OnlineRetrieverV2.java
index 224cf5fe44..9be66a7b1f 100644
--- a/storage/api/src/main/java/feast/storage/api/retriever/OnlineRetrieverV2.java
+++ b/storage/api/src/main/java/feast/storage/api/retriever/OnlineRetrieverV2.java
@@ -18,7 +18,6 @@
 import feast.proto.serving.ServingAPIProto;
 import java.util.List;
-import java.util.Optional;

 public interface OnlineRetrieverV2 {
   /**
@@ -37,7 +36,7 @@ public interface OnlineRetrieverV2 {
    * @return list of {@link Feature}s corresponding to data retrieved for each entity row from
    *     FeatureTable specified in FeatureTable request.
    */
-  List<List<Optional<Feature>>> getOnlineFeatures(
+  List<List<Feature>> getOnlineFeatures(
       String project,
       List<ServingAPIProto.GetOnlineFeaturesRequestV2.EntityRow> entityRows,
       List<ServingAPIProto.FeatureReferenceV2> featureReferences);
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java
index 3a64538942..44f74d3f56 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/common/RedisHashDecoder.java
@@ -36,13 +36,13 @@ public class RedisHashDecoder {
    * @return List of {@link Feature}
    * @throws InvalidProtocolBufferException
    */
-  public static List<Optional<Feature>> retrieveFeature(
+  public static List<Feature> retrieveFeature(
       List<Map<byte[], byte[]>> redisHashValues,
       Map<byte[], ServingAPIProto.FeatureReferenceV2> byteToFeatureReferenceMap,
       String timestampPrefix)
       throws InvalidProtocolBufferException {
-    List<Optional<Feature>> allFeatures = new ArrayList<>();
-    Map<ServingAPIProto.FeatureReferenceV2, Optional<Feature.Builder>> allFeaturesBuilderMap =
+    List<Feature> allFeatures = new ArrayList<>();
+    Map<ServingAPIProto.FeatureReferenceV2, Feature.Builder> allFeaturesBuilderMap =
         new HashMap<>();
     Map<String, Timestamp> featureTableTimestampMap = new HashMap<>();

@@ -62,19 +62,19 @@ public static List<Optional<Feature>> retrieveFeature(
           Feature.Builder featureBuilder =
               Feature.builder().setFeatureReference(featureReference).setFeatureValue(featureValue);

-          allFeaturesBuilderMap.put(featureReference, Optional.of(featureBuilder));
+          allFeaturesBuilderMap.put(featureReference, featureBuilder);
         }
       }
     }

     // Add timestamp to features
-    for (Map.Entry<ServingAPIProto.FeatureReferenceV2, Optional<Feature.Builder>> entry :
+    for (Map.Entry<ServingAPIProto.FeatureReferenceV2, Feature.Builder> entry :
         allFeaturesBuilderMap.entrySet()) {
       String timestampRedisHashKeyStr = timestampPrefix + ":" + entry.getKey().getFeatureTable();
       Timestamp curFeatureTimestamp = featureTableTimestampMap.get(timestampRedisHashKeyStr);

-      Feature curFeature = entry.getValue().get().setEventTimestamp(curFeatureTimestamp).build();
-      allFeatures.add(Optional.of(curFeature));
+      Feature curFeature = entry.getValue().setEventTimestamp(curFeatureTimestamp).build();
+      allFeatures.add(curFeature);
     }

     return allFeatures;
diff --git a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/OnlineRetriever.java b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/OnlineRetriever.java
index d58fabb9b9..79d00240a0 100644
--- a/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/OnlineRetriever.java
+++ b/storage/connectors/redis/src/main/java/feast/storage/connectors/redis/retriever/OnlineRetriever.java
@@ -41,21 +41,21 @@ public OnlineRetriever(RedisClientAdapter redisClientAdapter) {
   }

   @Override
-  public List<List<Optional<Feature>>> getOnlineFeatures(
+  public List<List<Feature>> getOnlineFeatures(
       String project,
       List<EntityRow> entityRows,
       List<FeatureReferenceV2> featureReferences) {

     List<RedisKeyV2> redisKeys = RedisKeyGenerator.buildRedisKeys(project, entityRows);
-    List<List<Optional<Feature>>> features = getFeaturesFromRedis(redisKeys, featureReferences);
+    List<List<Feature>> features = getFeaturesFromRedis(redisKeys, featureReferences);
     return features;
   }

-  private List<List<Optional<Feature>>> getFeaturesFromRedis(
+  private List<List<Feature>> getFeaturesFromRedis(
       List<RedisKeyV2> redisKeys, List<FeatureReferenceV2> featureReferences) {
-    List<List<Optional<Feature>>> features = new ArrayList<>();
+    List<List<Feature>> features = new ArrayList<>();
     // To decode bytes back to Feature Reference
     Map<byte[], FeatureReferenceV2> byteToFeatureReferenceMap = new HashMap<>();
@@ -96,7 +96,7 @@ private List<List<Optional<Feature>>> getFeaturesFromRedis(
         future -> {
           try {
             List<Map<byte[], byte[]>> redisValuesList = future.get();
-            List<Optional<Feature>> curRedisKeyFeatures =
+            List<Feature> curRedisKeyFeatures =
                 RedisHashDecoder.retrieveFeature(
                     redisValuesList, byteToFeatureReferenceMap, timestampPrefix);
             features.add(curRedisKeyFeatures);