Skip to content

Commit

Permalink
Merge pull request #197 from cliveseldon/config
Browse files Browse the repository at this point in the history
Allow Annotations to allow customizations
  • Loading branch information
ukclivecox authored Aug 28, 2018
2 parents 5592736 + 408590b commit 1e0cf90
Show file tree
Hide file tree
Showing 38 changed files with 1,757 additions and 142 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.seldon.apife.config;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
*
* @author clive
* Utility class to load annotations from kubernetes file for use by other components as config during startup
*
*/
@Component
public class AnnotationsConfig {
protected static Logger logger = LoggerFactory.getLogger(AnnotationsConfig.class.getName());
final String ANNOTATIONS_FILE = "/etc/podinfo/annotations";

final Map<String,String> annotations = new ConcurrentHashMap<>();

String readFile(String path, Charset encoding) throws IOException
{
byte[] encoded = Files.readAllBytes(Paths.get(path));
return new String(encoded, encoding);
}

public AnnotationsConfig() throws IOException
{
loadAnnotations();
}

private void processAnnotation(String line)
{
final String[] parts = line.split("=");
if (parts.length == 2)
{
final String value = parts[1].substring(1, parts[1].length()-1); //remove start and end quote
annotations.put(parts[0], value);
}
else
logger.warn("Failed to parse annotation {}",line);
}

private void loadAnnotations()
{
try
{
File f = new File(ANNOTATIONS_FILE);
if(f.exists() && !f.isDirectory())
{
try (BufferedReader r = Files.newBufferedReader(Paths.get(ANNOTATIONS_FILE), StandardCharsets.UTF_8))
{
r.lines().forEach(this::processAnnotation);
}
}
} catch (IOException e) {
logger.error("Failed to load annotations file "+ANNOTATIONS_FILE,e);
}
logger.info("Annotations {}",annotations);
}

public boolean has(String key)
{
return annotations.containsKey(key);
}

public String get(String key)
{
return annotations.get(key);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,23 @@
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptors;
import io.grpc.netty.NettyServerBuilder;
import io.seldon.apife.AppProperties;
import io.seldon.apife.api.oauth.InMemoryClientDetailsService;
import io.seldon.apife.config.AnnotationsConfig;
import io.seldon.apife.deployments.DeploymentStore;
import io.seldon.apife.deployments.DeploymentsHandler;
import io.seldon.apife.deployments.DeploymentsListener;
import io.seldon.apife.exception.SeldonAPIException;
import io.seldon.apife.k8s.DeploymentWatcher;
import io.seldon.protos.DeploymentProtos.DeploymentSpec;
import io.seldon.protos.DeploymentProtos.Endpoint;
import io.seldon.protos.DeploymentProtos.SeldonDeployment;

@Component
public class SeldonGrpcServer {
protected static Logger logger = LoggerFactory.getLogger(SeldonGrpcServer.class.getName());

public static final int SERVER_PORT = 5000;
private final String ANNOTATION_MAX_MESSAGE_SIZE = "seldon.io/grpc-max-message-size";

private final int port;
private final Server server;
Expand All @@ -60,19 +61,21 @@ public class SeldonGrpcServer {
private final grpcDeploymentsListener grpcDeploymentsListener;
private final DeploymentsHandler deploymentsHandler;

private int maxMessageSize = io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;

@Autowired
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,AnnotationsConfig annotations)
{
this(appProperties,deploymentStore,tokenStore,deploymentsHandler,SERVER_PORT);
this(appProperties,deploymentStore,tokenStore,deploymentsHandler,annotations,SERVER_PORT);
}

public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,int port)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,DeploymentsHandler deploymentsHandler,AnnotationsConfig annotations,int port)
{
this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), deploymentsHandler, port);
this(appProperties,deploymentStore,tokenStore,ServerBuilder.forPort(port), deploymentsHandler, annotations, port);
}


public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder<?> serverBuilder,DeploymentsHandler deploymentsHandler, int port)
public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentStore,TokenStore tokenStore,ServerBuilder<?> serverBuilder,DeploymentsHandler deploymentsHandler, AnnotationsConfig annotations, int port)
{
this.appProperties = appProperties;
this.deploymentStore = deploymentStore;
Expand All @@ -81,9 +84,23 @@ public SeldonGrpcServer(AppProperties appProperties,DeploymentStore deploymentSt
this.deploymentsHandler = deploymentsHandler;
deploymentsHandler.addListener(this.grpcDeploymentsListener);
this.port = port;
server = serverBuilder
.addService(ServerInterceptors.intercept(new SeldonService(this), new HeaderServerInterceptor(this)))
.build();
NettyServerBuilder builder = NettyServerBuilder
.forPort(port)
.addService(ServerInterceptors.intercept(new SeldonService(this), new HeaderServerInterceptor(this)));
if (annotations != null && annotations.has(ANNOTATION_MAX_MESSAGE_SIZE))
{
try
{
maxMessageSize =Integer.parseInt(annotations.get(ANNOTATION_MAX_MESSAGE_SIZE));
logger.info("Setting max message to {}",maxMessageSize);
builder.maxMessageSize(maxMessageSize);
}
catch(NumberFormatException e)
{
logger.warn("Failed to parse {} with value {}",ANNOTATION_MAX_MESSAGE_SIZE,annotations.get(ANNOTATION_MAX_MESSAGE_SIZE),e);
}
}
server = builder.build();
}

@PostConstruct
Expand Down Expand Up @@ -190,7 +207,7 @@ public static void main(String[] args) throws Exception {
AppProperties appProperties = new AppProperties();
appProperties.setEngineGrpcContainerPort(5000);
store.deploymentAdded(dep);
SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,SERVER_PORT);
SeldonGrpcServer server = new SeldonGrpcServer(appProperties,store,null,null,null,SERVER_PORT);
server.start();
server.blockUntilShutdown();
}
Expand All @@ -203,5 +220,11 @@ public void deploymentAdded(SeldonDeployment resource) {
public void deploymentRemoved(SeldonDeployment resource) {
channelStore.remove(resource.getSpec().getOauthKey());
}

public int getMaxMessageSize() {
return maxMessageSize;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,25 @@
import io.kubernetes.client.proto.Resource.Quantity;
import io.kubernetes.client.proto.V1;
import io.kubernetes.client.proto.V1.ContainerPort;
import io.kubernetes.client.proto.V1.DownwardAPIVolumeFile;
import io.kubernetes.client.proto.V1.DownwardAPIVolumeSource;
import io.kubernetes.client.proto.V1.EnvVar;
import io.kubernetes.client.proto.V1.ExecAction;
import io.kubernetes.client.proto.V1.HTTPGetAction;
import io.kubernetes.client.proto.V1.Handler;
import io.kubernetes.client.proto.V1.Lifecycle;
import io.kubernetes.client.proto.V1.ObjectFieldSelector;
import io.kubernetes.client.proto.V1.PodSecurityContext;
import io.kubernetes.client.proto.V1.PodTemplateSpec;
import io.kubernetes.client.proto.V1.Probe;
import io.kubernetes.client.proto.V1.SecurityContext;
import io.kubernetes.client.proto.V1.Service;
import io.kubernetes.client.proto.V1.ServicePort;
import io.kubernetes.client.proto.V1.ServiceSpec;
import io.kubernetes.client.proto.V1.TCPSocketAction;
import io.kubernetes.client.proto.V1.Volume;
import io.kubernetes.client.proto.V1.VolumeMount;
import io.kubernetes.client.proto.V1.VolumeSource;
import io.kubernetes.client.proto.V1beta1Extensions;
import io.kubernetes.client.proto.V1beta1Extensions.Deployment;
import io.kubernetes.client.proto.V1beta1Extensions.DeploymentSpec;
Expand All @@ -75,6 +82,8 @@ public class SeldonDeploymentOperatorImpl implements SeldonDeploymentOperator {
public static final String LABEL_SELDON_APP = "seldon-app";
public static final String LABEL_SELDON_TYPE_KEY = "seldon-type";
public static final String LABEL_SELDON_TYPE_VAL = "deployment";
public static final String PODINFO_VOLUME_NAME = "podinfo";
public static final String PODINFO_VOLUME_PATH = "/etc/podinfo";

@Autowired
public SeldonDeploymentOperatorImpl(ClusterManagerProperites clusterManagerProperites) {
Expand Down Expand Up @@ -102,6 +111,7 @@ private V1.Container createEngineContainer(SeldonDeployment dep,PredictorSpec pr
cBuilder
.setName("seldon-container-engine")
.setImage(clusterManagerProperites.getEngineContainerImageAndVersion())
.addVolumeMounts(VolumeMount.newBuilder().setName(PODINFO_VOLUME_NAME).setMountPath(PODINFO_VOLUME_PATH).setReadOnly(true))
.addEnv(EnvVar.newBuilder().setName("ENGINE_PREDICTOR").setValue(getEngineEnvVarJson(predictorDef)))
.addEnv(EnvVar.newBuilder().setName("ENGINE_SELDON_DEPLOYMENT").setValue(getEngineEnvVarJson(dep)))
.addEnv(EnvVar.newBuilder().setName("ENGINE_SERVER_PORT").setValue(""+clusterManagerProperites.getEngineContainerPort()))
Expand Down Expand Up @@ -196,6 +206,9 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int portNu
{
V1.Container.Builder c2Builder = V1.Container.newBuilder(c);

//Add volume to get at pod annotations
c2Builder.addVolumeMounts(VolumeMount.newBuilder().setName(PODINFO_VOLUME_NAME).setMountPath(PODINFO_VOLUME_PATH).setReadOnly(true));

Integer containerPort = getPort(c.getPortsList());
// Add container port and liveness and readiness probes if no container ports are specified
if (containerPort == null)
Expand Down Expand Up @@ -533,12 +546,20 @@ public DeploymentResources createResources(SeldonDeployment mlDep) throws Seldon
PodTemplateSpec.Builder podSpecBuilder = PodTemplateSpec.newBuilder();
podSpecBuilder.getSpecBuilder()
.addContainers(createEngineContainer(mlDep,p))
.setTerminationGracePeriodSeconds(20);
.setSecurityContext(PodSecurityContext.newBuilder().setRunAsUser(8888).build())
.setTerminationGracePeriodSeconds(20)
.addVolumes(Volume.newBuilder() // Add downwardAPI volume for annotations
.setName(PODINFO_VOLUME_NAME)
.setVolumeSource(VolumeSource.newBuilder().setDownwardAPI(DownwardAPIVolumeSource.newBuilder()
.addItems(DownwardAPIVolumeFile.newBuilder().setPath("annotations")
.setFieldRef(ObjectFieldSelector.newBuilder().setFieldPath("metadata.annotations"))))));

String depName = getSeldonServiceName(mlDep,p,"svc-orch");
podSpecBuilder.getMetadataBuilder()
.putLabels(LABEL_SELDON_APP, mlDep.getSpec().getName())
.putLabels("app", depName)
.putAllAnnotations(mlDep.getSpec().getAnnotationsMap()) // Add all spec annotations first
.putAllAnnotations(p.getAnnotationsMap()) // ...then add those for predictor overwriting any from spec above
.putAnnotations("prometheus.io/path", "/prometheus")
.putAnnotations("prometheus.io/port",""+clusterManagerProperites.getEngineContainerPort())
.putAnnotations("prometheus.io/scrape", "true");
Expand Down Expand Up @@ -585,15 +606,21 @@ public DeploymentResources createResources(SeldonDeployment mlDep) throws Seldon
// Add default version number then overwrite with any labels
podSpecBuilder.getMetadataBuilder().putLabels("version", "v1");
depMetaBuilder.putAllLabels(spec.getMetadata().getLabelsMap());
podSpecBuilder.getMetadataBuilder().putAllLabels(spec.getMetadata().getLabelsMap());

podSpecBuilder.getMetadataBuilder()
.putAllLabels(spec.getMetadata().getLabelsMap())
.putAllAnnotations(mlDep.getSpec().getAnnotationsMap()) // Add all spec annotations first
.putAllAnnotations(p.getAnnotationsMap()); // ...then add those for predictor overwriting any from spec above
podSpecBuilder.getSpecBuilder()
.setTerminationGracePeriodSeconds(20)
.addVolumes(Volume.newBuilder() // Add downwardAPI volume for annotations
.setName(PODINFO_VOLUME_NAME)
.setVolumeSource(VolumeSource.newBuilder().setDownwardAPI(DownwardAPIVolumeSource.newBuilder()
.addItems(DownwardAPIVolumeFile.newBuilder().setPath("annotations")
.setFieldRef(ObjectFieldSelector.newBuilder().setFieldPath("metadata.annotations"))))));
for(V1.Container c : spec.getSpec().getContainersList())
{
String containerServiceKey = getPredictorServiceNameKey(c.getName());
String containerServiceValue = getSeldonServiceName(mlDep, p, c.getName());

podSpecBuilder.getSpecBuilder()
.setTerminationGracePeriodSeconds(20);

if (!createdServices.contains(containerServiceValue))
{
Expand Down
9 changes: 9 additions & 0 deletions docs/annotations.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Annotation Based Configuration

You can configure aspects of Seldon Core via annotations in the SeldonDeployment resource. Please create an issue if you would like some configuration added.

# Available Annotations

* ```seldon.io/grpc-max-message-size``` : Maximum gRPC message size
* Location : SeldonDeployment.spec.annotations
* [Example](../notebooks/resources/model_grpc_size.json)
85 changes: 85 additions & 0 deletions docs/grpc_max_message_size.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# GRPC Max Message Size

GRPC has a default max message size of 4MB. If you need to run models whose input features or output predictions are larger than this you can congifure Seldon Core to run with gRPC server/clients that handle a larger mesage size with annotations.

Add the annotation ```seldon.io/grpc-max-message-size``` with the number of bytes of the largest expected message. For example the SeldonDeployment resource below sets this to 10MB:

```
{
"apiVersion": "machinelearning.seldon.io/v1alpha2",
"kind": "SeldonDeployment",
"metadata": {
"labels": {
"app": "seldon"
},
"name": "seldon-deployment-example"
},
"spec": {
"annotations": {
"project_name": "FX Market Prediction",
"deployment_version": "v1",
"seldon.io/grpc-max-message-size":"10485760"
},
"name": "test-deployment",
"oauth_key": "oauth-key",
"oauth_secret": "oauth-secret",
"predictors": [
{
"componentSpecs": [{
"spec": {
"containers": [
{
"image": "seldonio/mock_classifier_grpc:1.0",
"imagePullPolicy": "IfNotPresent",
"name": "classifier",
"resources": {
"requests": {
"memory": "1Mi"
}
}
}
],
"terminationGracePeriodSeconds": 20
}
}],
"graph": {
"children": [],
"name": "classifier",
"endpoint": {
"type" : "GRPC"
},
"type": "MODEL"
},
"name": "fx-market-predictor",
"replicas": 1,
"annotations": {
"predictor_version" : "v1"
}
}
]
}
}
```

## API OAUTH Gateway

If you are using the default API OAUTH Gateway you will also need to update your Helm or Ksonnet install:

For Helm add to your Helm values, for example:

```
apife:
annotations:
seldon.io/grpc-max-message-size: "10485760"
```

For Ksonnet set the parameters grpcMaxMessageSize:

```
ks param set seldon-core grpcMaxMessageSize '10485760' --as-string
```

## Example

To see a worked example, run the Jupyter notebook [here](../notebooks/max_grpc_msg_size.ipynb).
Loading

0 comments on commit 1e0cf90

Please sign in to comment.