Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply grpc tracing interceptor on online serving #1242

Merged
merged 1 commit into from
Dec 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions serving/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -155,21 +155,26 @@
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
<!--compile 'io.jaegertracing:jaeger-client:0.31.0'-->
<!--compile 'io.jaegertracing:jaeger-client:1.3.2'-->
<dependency>
<groupId>io.jaegertracing</groupId>
<artifactId>jaeger-client</artifactId>
<version>0.31.0</version>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-api</artifactId>
<version>0.31.0</version>
<version>0.33.0</version>
</dependency>
<dependency>
<groupId>io.opentracing</groupId>
<artifactId>opentracing-noop</artifactId>
<version>0.31.0</version>
<version>0.33.0</version>
</dependency>
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-grpc</artifactId>
<version>0.2.3</version>
</dependency>

<!-- The client -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package feast.serving.config;

import io.opentracing.Tracer;
import io.opentracing.contrib.grpc.TracingServerInterceptor;
import io.opentracing.noop.NoopTracerFactory;
import io.prometheus.client.exporter.MetricsServlet;
import io.prometheus.client.hotspot.DefaultExports;
Expand Down Expand Up @@ -54,4 +55,9 @@ public Tracer tracer() {
return io.jaegertracing.Configuration.fromEnv(feastProperties.getTracing().getServiceName())
.getTracer();
}

@Bean
public TracingServerInterceptor tracingInterceptor(Tracer tracer) {
return TracingServerInterceptor.newBuilder().withTracer(tracer).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,21 @@
import feast.serving.util.RequestHelper;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.contrib.grpc.TracingServerInterceptor;
import net.devh.boot.grpc.server.service.GrpcService;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.access.AccessDeniedException;
import org.springframework.security.core.context.SecurityContextHolder;

@GrpcService(interceptors = {GrpcMessageInterceptor.class, GrpcMonitoringInterceptor.class})
@GrpcService(
interceptors = {
TracingServerInterceptor.class,
GrpcMessageInterceptor.class,
GrpcMonitoringInterceptor.class
})
public class ServingServiceGRpcController extends ServingServiceImplBase {

private static final Logger log =
Expand Down Expand Up @@ -75,16 +80,19 @@ public void getFeastServingInfo(
public void getOnlineFeaturesV2(
ServingAPIProto.GetOnlineFeaturesRequestV2 request,
StreamObserver<GetOnlineFeaturesResponse> responseObserver) {
Span span = tracer.buildSpan("getOnlineFeaturesV2").start();
try (Scope scope = tracer.scopeManager().activate(span, false)) {
try {
// authorize for the project in request object.
if (request.getProject() != null && !request.getProject().isEmpty()) {
// project set at root level overrides the project set at feature table level
this.authorizationService.authorizeRequest(
SecurityContextHolder.getContext(), request.getProject());
}
RequestHelper.validateOnlineRequest(request);
Span span = tracer.buildSpan("getOnlineFeaturesV2").start();
GetOnlineFeaturesResponse onlineFeatures = servingServiceV2.getOnlineFeatures(request);
if (span != null) {
span.finish();
}
responseObserver.onNext(onlineFeatures);
responseObserver.onCompleted();
} catch (SpecRetrievalException e) {
Expand All @@ -102,6 +110,5 @@ public void getOnlineFeaturesV2(
log.warn("Failed to get Online Features", e);
responseObserver.onError(e);
}
span.finish();
}
}
194 changes: 100 additions & 94 deletions serving/src/main/java/feast/serving/service/OnlineServingServiceV2.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import feast.storage.api.retriever.Feature;
import feast.storage.api.retriever.OnlineRetrieverV2;
import io.grpc.Status;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import java.util.*;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -71,105 +71,111 @@ public GetOnlineFeaturesResponse getOnlineFeatures(GetOnlineFeaturesRequestV2 re
projectName = "default";
}

try (Scope scope = tracer.buildSpan("getOnlineFeaturesV2").startActive(true)) {
List<GetOnlineFeaturesRequestV2.EntityRow> entityRows = request.getEntityRowsList();
// Collect the feature/entity value for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, ValueProto.Value>> entityValuesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));
// Collect the feature/entity status metadata for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, GetOnlineFeaturesResponse.FieldStatus>>
entityStatusesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));

entityRows.forEach(
entityRow -> {
Map<String, ValueProto.Value> valueMap = entityRow.getFieldsMap();
entityValuesMap.get(entityRow).putAll(valueMap);
entityStatusesMap.get(entityRow).putAll(getMetadataMap(valueMap, false, false));
});

List<List<Optional<Feature>>> entityRowsFeatures =
retriever.getOnlineFeatures(projectName, entityRows, featureReferences);

if (entityRowsFeatures.size() != entityRows.size()) {
throw Status.INTERNAL
.withDescription(
"The no. of FeatureRow obtained from OnlineRetriever"
+ "does not match no. of entityRow passed.")
.asRuntimeException();
}
List<GetOnlineFeaturesRequestV2.EntityRow> entityRows = request.getEntityRowsList();
// Collect the feature/entity value for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, ValueProto.Value>> entityValuesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));
// Collect the feature/entity status metadata for each entity row in entityValueMap
Map<GetOnlineFeaturesRequestV2.EntityRow, Map<String, GetOnlineFeaturesResponse.FieldStatus>>
entityStatusesMap =
entityRows.stream().collect(Collectors.toMap(row -> row, row -> new HashMap<>()));

entityRows.forEach(
entityRow -> {
Map<String, ValueProto.Value> valueMap = entityRow.getFieldsMap();
entityValuesMap.get(entityRow).putAll(valueMap);
entityStatusesMap.get(entityRow).putAll(getMetadataMap(valueMap, false, false));
});

for (int i = 0; i < entityRows.size(); i++) {
GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i);
List<Optional<Feature>> curEntityRowFeatures = entityRowsFeatures.get(i);

Map<FeatureReferenceV2, Optional<Feature>> featureReferenceFeatureMap =
getFeatureRefFeatureMap(curEntityRowFeatures);

Map<String, ValueProto.Value> allValueMaps = new HashMap<>();
Map<String, GetOnlineFeaturesResponse.FieldStatus> allStatusMaps = new HashMap<>();

for (FeatureReferenceV2 featureReference : featureReferences) {
if (featureReferenceFeatureMap.containsKey(featureReference)) {
Optional<Feature> feature = featureReferenceFeatureMap.get(featureReference);

FeatureTableSpec featureTableSpec =
specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference());
FeatureProto.FeatureSpecV2 featureSpec =
specService.getFeatureSpec(projectName, feature.get().getFeatureReference());
ValueProto.ValueType.Enum valueTypeEnum = featureSpec.getValueType();
ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase();
boolean isMatchingFeatureSpec = checkSameFeatureSpec(valueTypeEnum, valueCase);

boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature);
Map<String, ValueProto.Value> valueMap =
unpackValueMap(feature, isOutsideMaxAge, isMatchingFeatureSpec);
allValueMaps.putAll(valueMap);

// Generate metadata for feature values and merge into entityFieldsMap
Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, !isMatchingFeatureSpec, isOutsideMaxAge);
allStatusMaps.putAll(statusMap);

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
} else {
Map<String, ValueProto.Value> valueMap =
new HashMap<>() {
{
put(
FeatureV2.getFeatureStringRef(featureReference),
ValueProto.Value.newBuilder().build());
}
};
allValueMaps.putAll(valueMap);
Span onlineRetrievalSpan = tracer.buildSpan("onlineRetrieval").start();
if (onlineRetrievalSpan != null) {
onlineRetrievalSpan.setTag("entities", entityRows.size());
onlineRetrievalSpan.setTag("features", featureReferences.size());
}
List<List<Optional<Feature>>> entityRowsFeatures =
retriever.getOnlineFeatures(projectName, entityRows, featureReferences);
if (onlineRetrievalSpan != null) {
onlineRetrievalSpan.finish();
}

Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, true, false);
allStatusMaps.putAll(statusMap);
if (entityRowsFeatures.size() != entityRows.size()) {
throw Status.INTERNAL
.withDescription(
"The no. of FeatureRow obtained from OnlineRetriever"
+ "does not match no. of entityRow passed.")
.asRuntimeException();
}

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
}
for (int i = 0; i < entityRows.size(); i++) {
GetOnlineFeaturesRequestV2.EntityRow entityRow = entityRows.get(i);
List<Optional<Feature>> curEntityRowFeatures = entityRowsFeatures.get(i);

Map<FeatureReferenceV2, Optional<Feature>> featureReferenceFeatureMap =
getFeatureRefFeatureMap(curEntityRowFeatures);

Map<String, ValueProto.Value> allValueMaps = new HashMap<>();
Map<String, GetOnlineFeaturesResponse.FieldStatus> allStatusMaps = new HashMap<>();

for (FeatureReferenceV2 featureReference : featureReferences) {
if (featureReferenceFeatureMap.containsKey(featureReference)) {
Optional<Feature> feature = featureReferenceFeatureMap.get(featureReference);

FeatureTableSpec featureTableSpec =
specService.getFeatureTableSpec(projectName, feature.get().getFeatureReference());
FeatureProto.FeatureSpecV2 featureSpec =
specService.getFeatureSpec(projectName, feature.get().getFeatureReference());
ValueProto.ValueType.Enum valueTypeEnum = featureSpec.getValueType();
ValueProto.Value.ValCase valueCase = feature.get().getFeatureValue().getValCase();
boolean isMatchingFeatureSpec = checkSameFeatureSpec(valueTypeEnum, valueCase);

boolean isOutsideMaxAge = checkOutsideMaxAge(featureTableSpec, entityRow, feature);
Map<String, ValueProto.Value> valueMap =
unpackValueMap(feature, isOutsideMaxAge, isMatchingFeatureSpec);
allValueMaps.putAll(valueMap);

// Generate metadata for feature values and merge into entityFieldsMap
Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, !isMatchingFeatureSpec, isOutsideMaxAge);
allStatusMaps.putAll(statusMap);

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
} else {
Map<String, ValueProto.Value> valueMap =
new HashMap<>() {
{
put(
FeatureV2.getFeatureStringRef(featureReference),
ValueProto.Value.newBuilder().build());
}
};
allValueMaps.putAll(valueMap);

Map<String, GetOnlineFeaturesResponse.FieldStatus> statusMap =
getMetadataMap(valueMap, true, false);
allStatusMaps.putAll(statusMap);

// Populate metrics/log request
populateCountMetrics(statusMap, projectName);
}
entityValuesMap.get(entityRow).putAll(allValueMaps);
entityStatusesMap.get(entityRow).putAll(allStatusMaps);
}

// Build response field values from entityValuesMap and entityStatusesMap
// Response field values should be in the same order as the entityRows provided by the user.
List<GetOnlineFeaturesResponse.FieldValues> fieldValuesList =
entityRows.stream()
.map(
entityRow -> {
return GetOnlineFeaturesResponse.FieldValues.newBuilder()
.putAllFields(entityValuesMap.get(entityRow))
.putAllStatuses(entityStatusesMap.get(entityRow))
.build();
})
.collect(Collectors.toList());
return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build();
entityValuesMap.get(entityRow).putAll(allValueMaps);
entityStatusesMap.get(entityRow).putAll(allStatusMaps);
}

// Build response field values from entityValuesMap and entityStatusesMap
// Response field values should be in the same order as the entityRows provided by the user.
List<GetOnlineFeaturesResponse.FieldValues> fieldValuesList =
entityRows.stream()
.map(
entityRow -> {
return GetOnlineFeaturesResponse.FieldValues.newBuilder()
.putAllFields(entityValuesMap.get(entityRow))
.putAllStatuses(entityStatusesMap.get(entityRow))
.build();
})
.collect(Collectors.toList());
return GetOnlineFeaturesResponse.newBuilder().addAllFieldValues(fieldValuesList).build();
}

private boolean checkSameFeatureSpec(
Expand Down