Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EFK with transform for tabular data #616

Merged
merged 47 commits into from
Jun 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
54aeda2
initial EFK setup
ryandawsonuk May 31, 2019
6674e9c
index pattern has underscore in different place and kibana env var na…
ryandawsonuk May 31, 2019
2159b9c
initial engine request/reponse logging
ryandawsonuk May 31, 2019
d960481
use newer elasticsearch
ryandawsonuk Jun 3, 2019
c763b2e
try logging as plain json
ryandawsonuk Jun 3, 2019
840f086
show filtering
ryandawsonuk Jun 3, 2019
a9c51fb
now have puid in request and response
ryandawsonuk Jun 4, 2019
9069b5b
add option to transform data
ryandawsonuk Jun 4, 2019
8d9e97d
now not dependent on double - still assumes rowmajor
ryandawsonuk Jun 5, 2019
43083a9
only log pods labeled fluentd true
ryandawsonuk Jun 8, 2019
67d5587
note for next steps
ryandawsonuk Jun 8, 2019
dd1b991
point temporarily to images in personal repo
ryandawsonuk Jun 10, 2019
5a68f4f
accessing elastic backend externally
ryandawsonuk Jun 10, 2019
7d433b2
take out nodeport elastic pending issue on upstream chart
ryandawsonuk Jun 10, 2019
33e96d3
start adding labels options into helm charts
ryandawsonuk Jun 10, 2019
8749541
setting labels from helm charts
ryandawsonuk Jun 10, 2019
40c479c
point to personal docker repo again - didn't mean to remove that
ryandawsonuk Jun 10, 2019
6c26a09
initial knative exploration
ryandawsonuk Jun 10, 2019
f27daf2
more knative notes
ryandawsonuk Jun 10, 2019
4c8f558
note on broker-trigger and getting metadata
ryandawsonuk Jun 11, 2019
0e43e58
start to separate out request-logging
ryandawsonuk Jun 11, 2019
607b950
tidy up notes
ryandawsonuk Jun 11, 2019
4adec61
trying to get kn eventing broker trigger working
ryandawsonuk Jun 13, 2019
5355040
python app that can receive an event
ryandawsonuk Jun 14, 2019
4803c23
basic trigger and log message
ryandawsonuk Jun 14, 2019
3013c02
start engine changes for external message logging
ryandawsonuk Jun 14, 2019
c089838
engine to broker setup
ryandawsonuk Jun 17, 2019
312ce45
use python impl for logger
ryandawsonuk Jun 17, 2019
33e39c7
single-node elastic
ryandawsonuk Jun 17, 2019
8a4ffae
format dict as json
ryandawsonuk Jun 17, 2019
cc9f5d6
do need to wait on knative install
ryandawsonuk Jun 17, 2019
04bdf73
reduce istio resources and note on kafka and cloud
ryandawsonuk Jun 18, 2019
68cb6ce
extra clarificatory notes
ryandawsonuk Jun 18, 2019
cde596a
note on custom fields
ryandawsonuk Jun 18, 2019
eeeb6bd
fix bug in logger and note on how to debug
ryandawsonuk Jun 18, 2019
6b5d8fb
notes on limitations of logger
ryandawsonuk Jun 18, 2019
48269eb
turn off request logging by default and add option to helm charts
ryandawsonuk Jun 18, 2019
62a1f09
Merge branch 'master' into 545-efk-transform
ryandawsonuk Jun 18, 2019
9de5d09
merge in master and fix labels on mab chart
ryandawsonuk Jun 18, 2019
8787767
add option to set engine env vars from mab chart
ryandawsonuk Jun 18, 2019
f3754d6
comments in helm charts to explain env vars
ryandawsonuk Jun 18, 2019
83e6861
minor cleanup of unit test
ryandawsonuk Jun 18, 2019
8c242b7
clean up request logger naming and check names not empty
ryandawsonuk Jun 18, 2019
1e92c03
full setup script
ryandawsonuk Jun 18, 2019
55d3eb2
option to set engine env vars from annotations
ryandawsonuk Jun 19, 2019
fa63f98
expand annotations list
ryandawsonuk Jun 19, 2019
ff03b20
further small update to docs
ryandawsonuk Jun 19, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 30 additions & 3 deletions doc/source/graph/annotations.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,43 @@ You can configure aspects of Seldon Core via annotations in the SeldonDeployment

### Service Orchestrator

* ```seldon.io/engine-java-opts``` : Java Opts for Service Orchestrator
* Locations : SeldonDeployment.spec.predictors.annotations
* [Java Opts example](model_engine_java_opts.md)
* ```seldon.io/engine-separate-pod``` : Use a separate pod for the service orchestrator
* Locations : SeldonDeployment.spec.annotations
* [Separate svc-orc pod example](model_svcorch_sep.md)
* ```seldon.io/headless-svc``` : Run main endpoint as headless kubernetes service. This is required for gRPC load balancing via Ambassador.
* Locations : SeldonDeployment.spec.annotations
* [gRPC headless example](grpc_load_balancing_ambassador.md)

Otherwise any annotations starting with `seldon.io/engine-` will be interpreted as specifying environment variables for the engine container. These include:

* ```seldon.io/engine-java-opts``` : Java Opts for Service Orchestrator
* Locations : SeldonDeployment.spec.predictors.annotations
* [Java Opts example](model_engine_java_opts.md)
* Translates to the environment variable JAVA_OPTS
* ```seldon.io/engine-log-requests``` : Whether to log raw requests from engine
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_REQUESTS
* ```seldon.io/engine-log-responses``` : Whether to log raw responses from engine
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_RESPONSES
* ```seldon.io/engine-log-messages-externally``` : Option to turn on logging of requests via a logging service
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_MESSAGES_EXTERNALLY
* ```seldon.io/engine-log-message-type``` : Option to override type set on messages when sending to logging service
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable LOG_MESSAGE_TYPE
* ```seldon.io/engine-message-logging-service``` : Option to override url to broker that sends to logging service
* Locations : SeldonDeployment.spec.predictors.annotations
* Translates to the environment variable MESSAGE_LOGGING_SERVICE

More details on logging-related variables can be seen in the [request-logging example](https://github.com/SeldonIO/seldon-core/tree/master/examples/centralised-logging/README.md).

Environment variables for the engine can also be set in the `svcOrchSpec` section of the SeldonDeployment, alongside engine resources. For examples see the helm charts or the [distributed tracing example](./distributed-tracing.md).

If both annotations and `svcOrchSpec` environment variables are used to set an environment variable for the engine container then `svcOrchSpec` environment variables take priority.

The above are the key engine env vars. For a full listing of engine env vars see the application.properties file of the engine source code.

## API OAuth Gateway Annotations
The API OAuth Gateway, if used, can also have the following annotations:

Expand Down
2 changes: 1 addition & 1 deletion doc/source/graph/distributed-tracing.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Add an environment variable: TRACING with value 1 to activate tracing.

You can utilize the default configuration by simply providing the name of the Jaeger agent service by providing JAEGER_AGENT_HOST environment variable. Override default Jaeger agent port `5775` by setting JAEGER_AGENT_PORT environment variable.

To provide a custom configuration following the Jarger Python configuration yaml defined [here](https://github.com/jaegertracing/jaeger-client-python) you can provide a configmap and the path to the YAML file in JAEGER_CONFIG_PATH environment variable.
To provide a custom configuration following the Jaeger Python configuration yaml defined [here](https://github.com/jaegertracing/jaeger-client-python) you can provide a configmap and the path to the YAML file in JAEGER_CONFIG_PATH environment variable.

An example is show below:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
public class RestClientController {

private static Logger logger = LoggerFactory.getLogger(RestClientController.class.getName());



@Autowired
private PredictionService predictionService;

Expand Down Expand Up @@ -150,8 +151,8 @@ public ResponseEntity<String> predictions(RequestEntity<String> requestEntity)
try
{
SeldonMessage response = predictionService.predict(request);
String json = ProtoBufUtils.toJson(response);
return new ResponseEntity<String>(json,HttpStatus.OK);
String responseJson = ProtoBufUtils.toJson(response);
return new ResponseEntity<String>(responseJson,HttpStatus.OK);
}
catch (InterruptedException e) {
throw new APIException(ApiExceptionType.ENGINE_INTERRUPTED,e.getMessage());
Expand Down
125 changes: 122 additions & 3 deletions engine/src/main/java/io/seldon/engine/service/PredictionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,23 @@
*******************************************************************************/
package io.seldon.engine.service;

import java.io.IOException;
import java.math.BigInteger;
import java.security.SecureRandom;
import java.time.ZonedDateTime;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.Message;
import io.seldon.engine.pb.ProtoBufUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.HttpEntity;
import org.springframework.stereotype.Service;

import com.google.protobuf.InvalidProtocolBufferException;
Expand All @@ -35,6 +42,9 @@
import io.seldon.protos.PredictionProtos.Feedback;
import io.seldon.protos.PredictionProtos.SeldonMessage;
import io.seldon.protos.PredictionProtos.Meta;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.web.client.RestTemplate;

@Service
public class PredictionService {
Expand All @@ -49,6 +59,21 @@ public class PredictionService {

PuidGenerator puidGenerator = new PuidGenerator();

@Value("${log.requests}")
private boolean logRequests;

@Value("${log.responses}")
private boolean logResponses;

@Value("${log.feedback.requests}")
private boolean logFeedbackRequests;

@Value("${log.messages.externally}")
private boolean logMessagesExternally;

@Value("${message.logging.service}")
private String messageLoggingService;

public final class PuidGenerator {
private SecureRandom random = new SecureRandom();

Expand All @@ -62,13 +87,19 @@ public void sendFeedback(Feedback feedback) throws InterruptedException, Executi
PredictorState predictorState = predictorBean.predictorStateFromPredictorSpec(enginePredictor.getPredictorSpec());

predictorBean.sendFeedback(feedback, predictorState);

if(logFeedbackRequests) {
logMessageAsJson(feedback);
}

return;
}

public SeldonMessage predict(SeldonMessage request) throws InterruptedException, ExecutionException, InvalidProtocolBufferException
{

ZonedDateTime requestTime = ZonedDateTime.now();

if (!request.hasMeta())
{
request = request.toBuilder().setMeta(Meta.newBuilder().setPuid(puidGenerator.nextPuidId()).build()).build();
Expand All @@ -85,7 +116,95 @@ else if (StringUtils.isEmpty(request.getMeta().getPuid()))

SeldonMessage.Builder builder = SeldonMessage.newBuilder(predictorReturn).setMeta(Meta.newBuilder(predictorReturn.getMeta()).setPuid(puid));

return builder.build();
SeldonMessage response = builder.build();

//raw logging in engine, if enabled
if(logRequests){
//log json now we've added puid
logMessageAsJson(request);
}
if(logResponses){
logMessageAsJson(response);
}

//enriched logging outside engine, if enabled
if(logMessagesExternally){
ZonedDateTime responseTime = ZonedDateTime.now();
sendMessagePairAsJson(request,response,requestTime,responseTime);
}

return response;

}

private JsonNode combineRequestResponse(String request, String response, ZonedDateTime requestTime, ZonedDateTime responseTime) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode requestNode = mapper.readTree(request);
JsonNode responseNode = mapper.readTree(response);
ObjectNode combined = mapper.createObjectNode();
combined.set("request",requestNode);
combined.set("response",responseNode);
((ObjectNode)combined.get("request")).set("date",mapper.readTree(mapper.writeValueAsString(requestTime.toString())));
((ObjectNode)combined.get("response")).set("date",mapper.readTree(mapper.writeValueAsString(responseTime.toString())));
String depName = System.getenv().get("DEPLOYMENT_NAME");
if(depName!=null){
combined.set("sdepName",mapper.readTree(mapper.writeValueAsString(depName)));
}

String depNamespace = System.getenv().get("DEPLOYMENT_NAMESPACE");
if(depNamespace!=null && depNamespace!=""){
combined.set("namespace",mapper.readTree(mapper.writeValueAsString(depNamespace)));
}

return combined;
}

private void sendMessagePairAsJson(SeldonMessage request, SeldonMessage response, ZonedDateTime requestTime, ZonedDateTime responseTime){
try {
String requestJson = ProtoBufUtils.toJson(request);
String responseJson = ProtoBufUtils.toJson(response);
JsonNode pair = combineRequestResponse(requestJson,responseJson,requestTime,responseTime);

MultiValueMap<String, String> headers = new LinkedMultiValueMap<String, String>();
headers.add("Content-Type", "application/json");
headers.add("X-B3-Flags", "1");
headers.add("CE-SpecVersion", "0.2");
headers.add("CE-Type", "seldon.message.pair");
headers.add("CE-Time", requestTime.toString());
headers.add("CE-EventID", request.getMeta().getPuid());

String depName = System.getenv().get("DEPLOYMENT_NAME");
if(depName!=null){
headers.add("CE-Source", "application/json");
} else{
headers.add("CE-Source", "seldon");
}

HttpEntity<?> requestBody = new HttpEntity<Object>(pair.toString(), headers);
RestTemplate restTemplate = new RestTemplate();

restTemplate.postForEntity(messageLoggingService,requestBody,String.class);

}catch (Exception ex){
logger.error("Unable to parse message",ex);
}
}

private void logMessageAsJson(Feedback message){
try {
String json = ProtoBufUtils.toJson(message);
System.out.println(json);
}catch (Exception ex){
logger.error("Unable to parse message",ex);
}
}

private void logMessageAsJson(Message message){
try {
String json = ProtoBufUtils.toJson(message);
System.out.println(json);
}catch (Exception ex){
logger.error("Unable to parse message",ex);
}
}
}
14 changes: 13 additions & 1 deletion engine/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,16 @@ spring.jmx.enabled = false

logging.file=
logging.level.root=WARN
logging.level.io.seldon=${SELDON_LOG_LEVEL:INFO}
logging.level.io.seldon=${SELDON_LOG_LEVEL:INFO}

#logging of raw requests in-engine
log.requests=${SELDON_LOG_REQUESTS:false}
log.responses=${SELDON_LOG_RESPONSES:false}
log.feedback.requests=${SELDON_LOG_FEEDBACK_REQUESTS:false}

#namespace in which deployed
deployment.namespace=${DEPLOYMENT_NAMESPACE:default}
#send request-response pairs to be processed and logged outside engine
message.logging.service=${SELDON_MESSAGE_LOGGING_SERVICE:http://default-broker.${deployment.namespace}.svc.cluster.local}
log.messages.externally=${SELDON_LOG_MESSAGES_EXTERNALLY:false}
log.message.type=${SELDON_LOG_MESSAGE_TYPE:seldon.message.pair}
37 changes: 37 additions & 0 deletions engine/src/test/java/io/seldon/engine/pb/TestJsonParse.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
package io.seldon.engine.pb;

import java.io.IOException;
import java.time.ZonedDateTime;
import java.util.ArrayList;

import com.fasterxml.jackson.databind.node.ArrayNode;
import org.junit.Assert;
import org.junit.Test;

import com.fasterxml.jackson.core.JsonFactory;
Expand All @@ -27,8 +31,12 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.primitives.Doubles;

import static java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME;

public class TestJsonParse {

String rawRequest = "{ \"meta\": { \"puid\": \"avodt6jrk9nbgomnco7nhrvpo0\", \"tags\": { }, \"routing\": { }, \"requestPath\": { }, \"metrics\": [] }, \"data\": { \"names\": [\"f0\", \"f1\"], \"ndarray\": [[0.15, 0.74]] }}";
String rawResponse = "{ \"meta\": { \"puid\": \"avodt6jrk9nbgomnco7nhrvpo0\", \"tags\": { }, \"routing\": { }, \"requestPath\": { \"classifier\": \"seldonio/mock_classifier:1.0\" }, \"metrics\": [] }, \"data\": { \"names\": [\"proba\"], \"ndarray\": [[0.07786847593954888]] }}";

@Test
public void multiDimTest() throws JsonProcessingException, IOException
Expand All @@ -49,4 +57,33 @@ public void multiDimTest() throws JsonProcessingException, IOException
((ObjectNode) j.get("request")).set("shape",mapper.valueToTree(shape));
System.out.println(j.toString());
}


private JsonNode combineRequestResponse(String request, String response, ZonedDateTime requestTime, ZonedDateTime responseTime) throws IOException {
ObjectMapper mapper = new ObjectMapper();
JsonNode requestNode = mapper.readTree(request);
JsonNode responseNode = mapper.readTree(response);
ObjectNode combined = mapper.createObjectNode();
combined.set("request",requestNode);
combined.set("response",responseNode);
((ObjectNode)combined.get("request")).set("date",mapper.readTree(mapper.writeValueAsString(requestTime.toString())));
((ObjectNode)combined.get("response")).set("date",mapper.readTree(mapper.writeValueAsString(responseTime.toString())));
String depName = System.getenv().get("DEPLOYMENT_NAME");
if(depName!=null){
combined.set("sdepName",mapper.readTree(depName));
}

return combined;
}


@Test
public void combineRequestResponse() throws JsonProcessingException, IOException
{

ZonedDateTime time = ZonedDateTime.parse("2018-04-26T14:48:09.769Z", ISO_ZONED_DATE_TIME);
JsonNode j = combineRequestResponse(rawRequest,rawResponse,time,time);
Assert.assertEquals(j.toString(),"{\"request\":{\"meta\":{\"puid\":\"avodt6jrk9nbgomnco7nhrvpo0\",\"tags\":{},\"routing\":{},\"requestPath\":{},\"metrics\":[]},\"data\":{\"names\":[\"f0\",\"f1\"],\"ndarray\":[[0.15,0.74]]},\"date\":\"2018-04-26T14:48:09.769Z\"},\"response\":{\"meta\":{\"puid\":\"avodt6jrk9nbgomnco7nhrvpo0\",\"tags\":{},\"routing\":{},\"requestPath\":{\"classifier\":\"seldonio/mock_classifier:1.0\"},\"metrics\":[]},\"data\":{\"names\":[\"proba\"],\"ndarray\":[[0.07786847593954888]]},\"date\":\"2018-04-26T14:48:09.769Z\"}}");
}

}
7 changes: 7 additions & 0 deletions engine/src/test/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,9 @@
server.port = 8081

log.requests=${SELDON_LOG_REQUESTS:true}
log.responses=${SELDON_LOG_RESPONSES:true}
log.feedback.requests=${SELDON_LOG_FEEDBACK_REQUESTS:true}

#send request-response pairs to be processed and logged outside engine
message.logging.service=${SELDON_MESSAGE_LOGGING_SERVICE:http://default-broker.default.svc.cluster.local}
log.messages.externally=${SELDON_LOG_MESSAGES_EXTERNALLY:false}
Loading