Skip to content

Commit

Permalink
Merge pull request #19 from cliveseldon/health-checks-grpc
Browse files Browse the repository at this point in the history
Health checks grpc
  • Loading branch information
Maximophone authored Jan 10, 2018
2 parents 6b93941 + c81def3 commit 869563a
Show file tree
Hide file tree
Showing 5 changed files with 491 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.ApiException;
import io.kubernetes.client.apis.ExtensionsV1beta1Api;
import io.kubernetes.client.models.AppsV1beta1Deployment;
import io.kubernetes.client.models.ExtensionsV1beta1Deployment;
import io.kubernetes.client.models.V1OwnerReference;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
Expand Down Expand Up @@ -65,14 +65,14 @@ public int watchDeployments(int resourceVersion,int resourceVersionProcessed) th
ApiClient client = Config.defaultClient();
ExtensionsV1beta1Api api = new ExtensionsV1beta1Api(client);

Watch<AppsV1beta1Deployment> watch = Watch.createWatch(
Watch<ExtensionsV1beta1Deployment> watch = Watch.createWatch(
client,
api.listNamespacedDeploymentCall("default", null, null, null,false,SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_KEY+"="+SeldonDeploymentOperatorImpl.LABEL_SELDON_TYPE_VAL, null,rs, 10, true,null,null),
new TypeToken<Watch.Response<AppsV1beta1Deployment>>(){}.getType());
new TypeToken<Watch.Response<ExtensionsV1beta1Deployment>>(){}.getType());

try
{
for (Watch.Response<AppsV1beta1Deployment> item : watch) {
for (Watch.Response<ExtensionsV1beta1Deployment> item : watch) {
int resourceVersionNew = Integer.parseInt(item.object.getMetadata().getResourceVersion());
if (resourceVersionNew <= resourceVersionProcessed)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int idx)
V1.Container.Builder c2Builder = V1.Container.newBuilder(c);

Integer containerPort = getPort(c.getPortsList());
// Add container port and liveness and readiness probes if no container ports are specified
if (containerPort == null)
{
if (pu != null)
Expand All @@ -191,46 +192,66 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int idx)
{
c2Builder.addPorts(ContainerPort.newBuilder().setName("http").setContainerPort(clusterManagerProperites.getPuContainerPortBase() + idx));
containerPort = clusterManagerProperites.getPuContainerPortBase() + idx;

if (!c.hasLivenessProbe())
{
c2Builder.setLivenessProbe(Probe.newBuilder()
.setHandler(Handler.newBuilder().setTcpSocket(TCPSocketAction.newBuilder().setPort(io.kubernetes.client.proto.IntStr.IntOrString.newBuilder().setType(1).setStrVal("http"))))
.setInitialDelaySeconds(10)
.setPeriodSeconds(5)
);
}
if (!c.hasReadinessProbe())
{

c2Builder.setReadinessProbe(Probe.newBuilder()
.setHandler(Handler.newBuilder().setTcpSocket(TCPSocketAction.newBuilder().setPort(io.kubernetes.client.proto.IntStr.IntOrString.newBuilder().setType(1).setStrVal("http"))))
.setInitialDelaySeconds(10)
.setPeriodSeconds(5)
);
}
}
else
{
c2Builder.addPorts(ContainerPort.newBuilder().setName("grpc").setContainerPort(clusterManagerProperites.getPuContainerPortBase() + idx));
containerPort = clusterManagerProperites.getPuContainerPortBase() + idx;
containerPort = clusterManagerProperites.getPuContainerPortBase() + idx;

if (!c.hasLivenessProbe())
{
c2Builder.setLivenessProbe(Probe.newBuilder()
.setHandler(Handler.newBuilder().setTcpSocket(TCPSocketAction.newBuilder().setPort(io.kubernetes.client.proto.IntStr.IntOrString.newBuilder().setType(1).setStrVal("grpc"))))
.setInitialDelaySeconds(10)
.setPeriodSeconds(5)
);
}
if (!c.hasReadinessProbe())
{

c2Builder.setReadinessProbe(Probe.newBuilder()
.setHandler(Handler.newBuilder().setTcpSocket(TCPSocketAction.newBuilder().setPort(io.kubernetes.client.proto.IntStr.IntOrString.newBuilder().setType(1).setStrVal("grpc"))))
.setInitialDelaySeconds(10)
.setPeriodSeconds(5)
);

}
}
}
}
else
containerPort = c.getPorts(0).getContainerPort();

// Add environment variable for the port used in case the model needs to access it
final String ENV_PREDICTIVE_UNIT_SERVICE_PORT ="PREDICTIVE_UNIT_SERVICE_PORT";
Set<String> envNames = this.getEnvNamesProto(c.getEnvList());
if (!envNames.contains(ENV_PREDICTIVE_UNIT_SERVICE_PORT))
c2Builder.addEnv(EnvVar.newBuilder().setName(ENV_PREDICTIVE_UNIT_SERVICE_PORT).setValue(""+containerPort));

//Add environment variable for the parameters passed in case the model needs to access it
final String ENV_PREDICTIVE_UNIT_PARAMETERS = "PREDICTIVE_UNIT_PARAMETERS";
if (!envNames.contains(ENV_PREDICTIVE_UNIT_PARAMETERS))
c2Builder.addEnv(EnvVar.newBuilder().setName(ENV_PREDICTIVE_UNIT_PARAMETERS).setValue(extractPredictiveUnitParametersAsJson(pu)));

if (!c.hasLivenessProbe())
{
c2Builder.setLivenessProbe(Probe.newBuilder()
.setHandler(Handler.newBuilder().setTcpSocket(TCPSocketAction.newBuilder().setPort(io.kubernetes.client.proto.IntStr.IntOrString.newBuilder().setType(1).setStrVal("http"))))
.setInitialDelaySeconds(10)
.setPeriodSeconds(5)
);
}

if (!c.hasReadinessProbe())
{
c2Builder.setReadinessProbe(Probe.newBuilder()
.setHandler(Handler.newBuilder().setTcpSocket(TCPSocketAction.newBuilder().setPort(io.kubernetes.client.proto.IntStr.IntOrString.newBuilder().setType(1).setStrVal("http"))))
.setInitialDelaySeconds(10)
.setPeriodSeconds(5)
);

}



// Add a default lifecycle pre-stop if non exists
if (!c.hasLifecycle())
{
if (!c.getLifecycle().hasPreStop())
Expand All @@ -239,7 +260,6 @@ private V1.Container updateContainer(V1.Container c,PredictiveUnit pu,int idx)
ExecAction.newBuilder().addCommand("/bin/sh").addCommand("-c").addCommand("/bin/sleep 5"))));
}
}


return c2Builder.build();
}
Expand Down
Loading

0 comments on commit 869563a

Please sign in to comment.