Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Annotations to allow customizations #197

Merged
merged 10 commits into from
Aug 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.seldon.apife.config;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
*
* @author clive
* Utility class to load annotations from kubernetes file for use by other components as config during startup
*
*/
@Component
public class AnnotationsConfig {
protected static Logger logger = LoggerFactory.getLogger(AnnotationsConfig.class.getName());
final String ANNOTATIONS_FILE = "/etc/podinfo/annotations";

final Map<String,String> annotations = new ConcurrentHashMap<>();

String readFile(String path, Charset encoding) throws IOException
{
byte[] encoded = Files.readAllBytes(Paths.get(path));
return new String(encoded, encoding);
}

public AnnotationsConfig() throws IOException
{
loadAnnotations();
}

private void processAnnotation(String line)
{
final String[] parts = line.split("=");
if (parts.length == 2)
{
final String value = parts[1].substring(1, parts[1].length()-1); //remove start and end quote
annotations.put(parts[0], value);
}
else
logger.warn("Failed to parse annotation {}",line);
}

private void loadAnnotations()
{
try
{
File f = new File(ANNOTATIONS_FILE);
if(f.exists() && !f.isDirectory())
{
try (BufferedReader r = Files.newBufferedReader(Paths.get(ANNOTATIONS_FILE), StandardCharsets.UTF_8))
{
r.lines().forEach(this::processAnnotation);
}
}
} catch (IOException e) {
logger.error("Failed to load annotations file "+ANNOTATIONS_FILE,e);
}
logger.info("Annotations {}",annotations);
}

public boolean has(String key)
{
return annotations.containsKey(key);
}

public String get(String key)
{
return annotations.get(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,23 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.netty.NettyServerBuilder;
import io.seldon.apife.AppProperties;
import io.seldon.apife.api.oauth.InMemoryClientDetailsService;
import io.seldon.apife.config.AnnotationsConfig;
import io.seldon.apife.deployments.DeploymentStore;
import io.seldon.apife.deployments.DeploymentsHandler;
import io.seldon.apife.deployments.DeploymentsListener;
import io.seldon.apife.exception.SeldonAPIException;
import io.seldon.apife.k8s.DeploymentWatcher;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.protos.DeploymentProtos.Endpoint;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

@Component
public class SeldonGrpcServer {
protected static Logger logger = LoggerFactory.getLogger(SeldonGrpcServer.class.getName());

public static final int SERVER_PORT = 5000;
private final String ANNOTATION_MAX_MESSAGE_SIZE = "seldon.io/grpc-max-message-size";

private final int port;
private final Server server;
Expand All @@ -60,19 +61,21 @@ public class SeldonGrpcServer {
private final grpcDeploymentsListener grpcDeploymentsListener;
private final DeploymentsHandler deploymentsHandler;

private int maxMessageSize = io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;

@Autowired
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,AnnotationsConfig annotations)
{
this(appProperties,deploymentStore,tokenStore,deploymentsHandler,SERVER_PORT);
this(appProperties,deploymentStore,tokenStore,deploymentsHandler,annotations,SERVER_PORT);
}

public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,int port)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,AnnotationsConfig annotations,int port)
{
this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), deploymentsHandler, port);
this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), deploymentsHandler, annotations, port);
}


public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder<?> serverBuilder,DeploymentsHandler deploymentsHandler, int port)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder<?> serverBuilder,DeploymentsHandler deploymentsHandler, AnnotationsConfig annotations, int port)
{
this.appProperties = appProperties;
this.deploymentStore = deploymentStore;
Expand All @@ -81,9 +84,23 @@ public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentSt
this.deploymentsHandler = deploymentsHandler;
deploymentsHandler.addListener(this.grpcDeploymentsListener);
this.port = port;
server = serverBuilder
.addService(ServerInterceptors.intercept(new SeldonService(this), new HeaderServerInterceptor(this)))
.build();
NettyServerBuilder builder = NettyServerBuilder
.forPort(port)
.addService(ServerInterceptors.intercept(new SeldonService(this), new HeaderServerInterceptor(this)));
if (annotations != null && annotations.has(ANNOTATION_MAX_MESSAGE_SIZE))
{
try
{
maxMessageSize =Integer.parseInt(annotations.get(ANNOTATION_MAX_MESSAGE_SIZE));
logger.info("Setting max message to {}",maxMessageSize);
builder.maxMessageSize(maxMessageSize);
}
catch(NumberFormatException e)
{
logger.warn("Failed to parse {} with value {}",ANNOTATION_MAX_MESSAGE_SIZE,annotations.get(ANNOTATION_MAX_MESSAGE_SIZE),e);
}
}
server = builder.build();
}

@PostConstruct
Expand Down Expand Up @@ -190,7 +207,7 @@ public static void main(String[] args) throws Exception {
AppProperties appProperties = new AppProperties();
appProperties.setEngineGrpcContainerPort(5000);
store.deploymentAdded(dep);
SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,SERVER_PORT);
SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,null,SERVER_PORT);
server.start();
server.blockUntilShutdown();
}
Expand All @@ -203,5 +220,11 @@ public void deploymentAdded(SeldonDeployment resource) {
public void deploymentRemoved(SeldonDeployment resource) {
channelStore.remove(resource.getSpec().getOauthKey());
}

public int getMaxMessageSize() {
return maxMessageSize;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,25 @@
import io.kubernetes.client.proto.Resource.Quantity;
import io.kubernetes.client.proto.V1;
import io.kubernetes.client.proto.V1.ContainerPort;
import io.kubernetes.client.proto.V1.DownwardAPIVolumeFile;
import io.kubernetes.client.proto.V1.DownwardAPIVolumeSource;
import io.kubernetes.client.proto.V1.EnvVar;
import io.kubernetes.client.proto.V1.ExecAction;
import io.kubernetes.client.proto.V1.HTTPGetAction;
import io.kubernetes.client.proto.V1.Handler;
import io.kubernetes.client.proto.V1.Lifecycle;
import io.kubernetes.client.proto.V1.ObjectFieldSelector;
import io.kubernetes.client.proto.V1.PodSecurityContext;
import io.kubernetes.client.proto.V1.PodTemplateSpec;
import io.kubernetes.client.proto.V1.Probe;
import io.kubernetes.client.proto.V1.SecurityContext;
import io.kubernetes.client.proto.V1.Service;
import io.kubernetes.client.proto.V1.ServicePort;
import io.kubernetes.client.proto.V1.ServiceSpec;
import io.kubernetes.client.proto.V1.TCPSocketAction;
import io.kubernetes.client.proto.V1.Volume;
import io.kubernetes.client.proto.V1.VolumeMount;
import io.kubernetes.client.proto.V1.VolumeSource;
import io.kubernetes.client.proto.V1beta1Extensions;
import io.kubernetes.client.proto.V1beta1Extensions.Deployment;
import io.kubernetes.client.proto.V1beta1Extensions.DeploymentSpec;
Expand All @@ -75,6 +82,8 @@ public class SeldonDeploymentOperatorImpl implements SeldonDeploymentOperator {
public static final String LABEL_SELDON_APP = "seldon-app";
public static final String LABEL_SELDON_TYPE_KEY = "seldon-type";
public static final String LABEL_SELDON_TYPE_VAL = "deployment";
public static final String PODINFO_VOLUME_NAME = "podinfo";
public static final String PODINFO_VOLUME_PATH = "/etc/podinfo";

@Autowired
public SeldonDeploymentOperatorImpl(ClusterManagerProperites clusterManagerProperites) {
Expand Down Expand Up @@ -102,6 +111,7 @@ private V1.Container createEngineContainer(SeldonDeployment dep,PredictorSpec pr
cBuilder
.setName("seldon-container-engine")
.setImage(clusterManagerProperites.getEngineContainerImageAndVersion())
.addVolumeMounts(VolumeMount.newBuilder().setName(PODINFO_VOLUME_NAME).setMountPath(PODINFO_VOLUME_PATH).setReadOnly(true))
.addEnv(EnvVar.newBuilder().setName("ENGINE_PREDICTOR").setValue(getEngineEnvVarJson(predictorDef)))
.addEnv(EnvVar.newBuilder().setName("ENGINE_SELDON_DEPLOYMENT").setValue(getEngineEnvVarJson(dep)))
.addEnv(EnvVar.newBuilder().setName("ENGINE_SERVER_PORT").setValue(""+clusterManagerProperites.getEngineContainerPort()))
Expand Down Expand Up @@ -196,6 +206,9 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int portNu
{
V1.Container.Builder c2Builder = V1.Container.newBuilder(c);

//Add volume to get at pod annotations
c2Builder.addVolumeMounts(VolumeMount.newBuilder().setName(PODINFO_VOLUME_NAME).setMountPath(PODINFO_VOLUME_PATH).setReadOnly(true));

Integer containerPort = getPort(c.getPortsList());
// Add container port and liveness and readiness probes if no container ports are specified
if (containerPort == null)
Expand Down Expand Up @@ -533,12 +546,20 @@ public DeploymentResources createResources(SeldonDeployment mlDep) throws Seldon
PodTemplateSpec.Builder podSpecBuilder = PodTemplateSpec.newBuilder();
podSpecBuilder.getSpecBuilder()
.addContainers(createEngineContainer(mlDep,p))
.setTerminationGracePeriodSeconds(20);
.setSecurityContext(PodSecurityContext.newBuilder().setRunAsUser(8888).build())
.setTerminationGracePeriodSeconds(20)
.addVolumes(Volume.newBuilder() // Add downwardAPI volume for annotations
.setName(PODINFO_VOLUME_NAME)
.setVolumeSource(VolumeSource.newBuilder().setDownwardAPI(DownwardAPIVolumeSource.newBuilder()
.addItems(DownwardAPIVolumeFile.newBuilder().setPath("annotations")
.setFieldRef(ObjectFieldSelector.newBuilder().setFieldPath("metadata.annotations"))))));

String depName = getSeldonServiceName(mlDep,p,"svc-orch");
podSpecBuilder.getMetadataBuilder()
.putLabels(LABEL_SELDON_APP, mlDep.getSpec().getName())
.putLabels("app", depName)
.putAllAnnotations(mlDep.getSpec().getAnnotationsMap()) // Add all spec annotations first
.putAllAnnotations(p.getAnnotationsMap()) // ...then add those for predictor overwriting any from spec above
.putAnnotations("prometheus.io/path", "/prometheus")
.putAnnotations("prometheus.io/port",""+clusterManagerProperites.getEngineContainerPort())
.putAnnotations("prometheus.io/scrape", "true");
Expand Down Expand Up @@ -585,15 +606,21 @@ public DeploymentResources createResources(SeldonDeployment mlDep) throws Seldon
// Add default version number then overwrite with any labels
podSpecBuilder.getMetadataBuilder().putLabels("version", "v1");
depMetaBuilder.putAllLabels(spec.getMetadata().getLabelsMap());
podSpecBuilder.getMetadataBuilder().putAllLabels(spec.getMetadata().getLabelsMap());

podSpecBuilder.getMetadataBuilder()
.putAllLabels(spec.getMetadata().getLabelsMap())
.putAllAnnotations(mlDep.getSpec().getAnnotationsMap()) // Add all spec annotations first
.putAllAnnotations(p.getAnnotationsMap()); // ...then add those for predictor overwriting any from spec above
podSpecBuilder.getSpecBuilder()
.setTerminationGracePeriodSeconds(20)
.addVolumes(Volume.newBuilder() // Add downwardAPI volume for annotations
.setName(PODINFO_VOLUME_NAME)
.setVolumeSource(VolumeSource.newBuilder().setDownwardAPI(DownwardAPIVolumeSource.newBuilder()
.addItems(DownwardAPIVolumeFile.newBuilder().setPath("annotations")
.setFieldRef(ObjectFieldSelector.newBuilder().setFieldPath("metadata.annotations"))))));
for(V1.Container c : spec.getSpec().getContainersList())
{
String containerServiceKey = getPredictorServiceNameKey(c.getName());
String containerServiceValue = getSeldonServiceName(mlDep, p, c.getName());

podSpecBuilder.getSpecBuilder()
.setTerminationGracePeriodSeconds(20);

if (!createdServices.contains(containerServiceValue))
{
Expand Down
9 changes: 9 additions & 0 deletions docs/annotations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Annotation Based Configuration

You can configure aspects of Seldon Core via annotations in the SeldonDeployment resource. Please create an issue if you would like some configuration added.

# Available Annotations

* ```seldon.io/grpc-max-message-size``` : Maximum gRPC message size
* Location : SeldonDeployment.spec.annotations
* [Example](../notebooks/resources/model_grpc_size.json)
85 changes: 85 additions & 0 deletions docs/grpc_max_message_size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# GRPC Max Message Size

GRPC has a default max message size of 4MB. If you need to run models whose input features or output predictions are larger than this you can congifure Seldon Core to run with gRPC server/clients that handle a larger mesage size with annotations.

Add the annotation ```seldon.io/grpc-max-message-size``` with the number of bytes of the largest expected message. For example the SeldonDeployment resource below sets this to 10MB:

```
{
"apiVersion": "machinelearning.seldon.io/v1alpha2",
"kind": "SeldonDeployment",
"metadata": {
"labels": {
"app": "seldon"
},
"name": "seldon-deployment-example"
},
"spec": {
"annotations": {
"project_name": "FX Market Prediction",
"deployment_version": "v1",
"seldon.io/grpc-max-message-size":"10485760"
},
"name": "test-deployment",
"oauth_key": "oauth-key",
"oauth_secret": "oauth-secret",
"predictors": [
{
"componentSpecs": [{
"spec": {
"containers": [
{
"image": "seldonio/mock_classifier_grpc:1.0",
"imagePullPolicy": "IfNotPresent",
"name": "classifier",
"resources": {
"requests": {
"memory": "1Mi"
}
}
}
],
"terminationGracePeriodSeconds": 20
}
}],
"graph": {
"children": [],
"name": "classifier",
"endpoint": {
"type" : "GRPC"
},
"type": "MODEL"
},
"name": "fx-market-predictor",
"replicas": 1,
"annotations": {
"predictor_version" : "v1"
}
}
]
}
}

```

## API OAUTH Gateway

If you are using the default API OAUTH Gateway you will also need to update your Helm or Ksonnet install:

For Helm add to your Helm values, for example:

```
apife:
annotations:
seldon.io/grpc-max-message-size: "10485760"
```

For Ksonnet set the parameters grpcMaxMessageSize:

```
ks param set seldon-core grpcMaxMessageSize '10485760' --as-string
```

## Example

To see a worked example, run the Jupyter notebook [here](../notebooks/max_grpc_msg_size.ipynb).
Loading