Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add GRPC health check in suggestions #779

Merged
merged 2 commits into from
Sep 20, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions cmd/suggestion/hyperopt/v1alpha3/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
FROM python:3

ADD . /usr/src/app/github.com/kubeflow/katib
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/hyperopt/v1alpha3
RUN if [ "$(uname -m)" = "ppc64le" ]; then \
apt-get -y update && \
apt-get -y install gfortran libopenblas-dev liblapack-dev && \
pip install cython; \
fi
RUN GRPC_HEALTH_PROBE_VERSION=v0.3.0 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @gaocegege , I am wondering if you have tested the multi-arch binary file of the GRPC health probe, as your patch only wget the amd64 architecture.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I have not tried to run on other architecture.

But we could support it since the probe can be built other platforms.

https://github.com/grpc-ecosystem/grpc-health-probe/releases

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have some problem when you use it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I am working on the arm64 support of images, therefore I will propose a patch to fix this, thanks for noticing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

well I didn't test but I am working on the arm64 image support, by going through the Dockerfile I noticed this. I will then try to take care of it :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, thanks. It seems that we can support it with an arch check.

chmod +x /bin/grpc_health_probe

ADD . /usr/src/app/github.com/kubeflow/katib
WORKDIR /usr/src/app/github.com/kubeflow/katib/cmd/suggestion/hyperopt/v1alpha3
RUN pip install --no-cache-dir -r requirements.txt
ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python

ENV PYTHONPATH /usr/src/app/github.com/kubeflow/katib:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/v1alpha3/python:/usr/src/app/github.com/kubeflow/katib/pkg/apis/manager/health/python

ENTRYPOINT ["python", "main.py"]
5 changes: 4 additions & 1 deletion cmd/suggestion/hyperopt/v1alpha3/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import grpc
import time
from pkg.apis.manager.v1alpha3.python import api_pb2_grpc
from pkg.apis.manager.health.python import health_pb2_grpc
from pkg.suggestion.v1alpha3.hyperopt_service import HyperoptService
from concurrent import futures

Expand All @@ -9,7 +10,9 @@

def serve():
server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
api_pb2_grpc.add_SuggestionServicer_to_server(HyperoptService(), server)
service = HyperoptService()
api_pb2_grpc.add_SuggestionServicer_to_server(service, server)
health_pb2_grpc.add_HealthServicer_to_server(service, server)
server.add_insecure_port(DEFAULT_PORT)
print("Listening...")
server.start()
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller.v1alpha3/consts/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ const (

ContainerSuggestion = "suggestion"

DefaultSuggestionPort = 6789
DefaultSuggestionPort = 6789
DefaultSuggestionPortName = "katib-api"
DefaultGRPCService = "manager.v1alpha3.Suggestion"

// Default env name of katib namespace
DefaultKatibNamespaceEnvName = "KATIB_CORE_NAMESPACE"
Expand Down
39 changes: 38 additions & 1 deletion pkg/controller.v1alpha3/suggestion/composer/composer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand All @@ -19,6 +20,12 @@ import (
"github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
)

const (
defaultInitialDelaySeconds = 10
gaocegege marked this conversation as resolved.
Show resolved Hide resolved
// Ref https://github.com/grpc-ecosystem/grpc-health-probe/
defaultGRPCHealthCheckProbe = "/bin/grpc_health_probe"
)

var log = logf.Log.WithName("suggestion-composer")

type Composer interface {
Expand Down Expand Up @@ -75,7 +82,7 @@ func (g *General) DesiredDeployment(s *suggestionsv1alpha3.Suggestion) (*appsv1.
func (g *General) DesiredService(s *suggestionsv1alpha3.Suggestion) (*corev1.Service, error) {
ports := []corev1.ServicePort{
{
Name: "katib-api",
Name: consts.DefaultSuggestionPortName,
Port: consts.DefaultSuggestionPort,
},
}
Expand Down Expand Up @@ -109,6 +116,36 @@ func (g *General) desiredContainer(s *suggestionsv1alpha3.Suggestion) (*corev1.C
Name: consts.ContainerSuggestion,
}
c.Image = suggestionContainerImage
c.Ports = []corev1.ContainerPort{
{
Name: consts.DefaultSuggestionPortName,
ContainerPort: consts.DefaultSuggestionPort,
},
}
c.ReadinessProbe = &corev1.Probe{
Handler: corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{
defaultGRPCHealthCheckProbe,
fmt.Sprintf("-addr=:%d", consts.DefaultSuggestionPort),
fmt.Sprintf("-service=%s", consts.DefaultGRPCService),
},
},
},
InitialDelaySeconds: defaultInitialDelaySeconds,
}
c.LivenessProbe = &corev1.Probe{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will liveness probe has any hidden consequences? Say if service is not accessible for some time

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it will. But I think we should have it. Since we can mark the deployment unavailable then avoid timeout requests

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sound good

Handler: corev1.Handler{
Exec: &corev1.ExecAction{
Command: []string{
defaultGRPCHealthCheckProbe,
fmt.Sprintf("-addr=:%d", consts.DefaultSuggestionPort),
fmt.Sprintf("-service=%s", consts.DefaultGRPCService),
},
},
},
InitialDelaySeconds: defaultInitialDelaySeconds,
}
return c, nil
}

Expand Down
167 changes: 167 additions & 0 deletions pkg/suggestion/v1alpha3/base_health_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
# Copyright 2015 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Reference implementation for health checking in gRPC Python."""

import collections
import threading

import grpc

from pkg.apis.manager.health.python import health_pb2 as _health_pb2
from pkg.apis.manager.health.python import health_pb2_grpc as _health_pb2_grpc

SERVICE_NAME = _health_pb2.DESCRIPTOR.services_by_name['Health'].full_name


class _Watcher():

def __init__(self):
self._condition = threading.Condition()
self._responses = collections.deque()
self._open = True

def __iter__(self):
return self

def _next(self):
with self._condition:
while not self._responses and self._open:
self._condition.wait()
if self._responses:
return self._responses.popleft()
else:
raise StopIteration()

def next(self):
return self._next()

def __next__(self):
return self._next()

def add(self, response):
with self._condition:
self._responses.append(response)
self._condition.notify()

def close(self):
with self._condition:
self._open = False
self._condition.notify()


def _watcher_to_send_response_callback_adapter(watcher):

def send_response_callback(response):
if response is None:
watcher.close()
else:
watcher.add(response)

return send_response_callback


class HealthServicer(_health_pb2_grpc.HealthServicer):
"""Servicer handling RPCs for service statuses."""

def __init__(self,
experimental_non_blocking=True,
experimental_thread_pool=None):
self._lock = threading.RLock()
self._server_status = {}
self._send_response_callbacks = {}
self.Watch.__func__.experimental_non_blocking = experimental_non_blocking
self.Watch.__func__.experimental_thread_pool = experimental_thread_pool
self._gracefully_shutting_down = False
self.set("manager.v1alpha3.Suggestion", _health_pb2.HealthCheckResponse.SERVING)

def _on_close_callback(self, send_response_callback, service):

def callback():
with self._lock:
self._send_response_callbacks[service].remove(
send_response_callback)
send_response_callback(None)

return callback

def Check(self, request, context):
with self._lock:
status = self._server_status.get(request.service)
if status is None:
print(request.service)
context.set_code(grpc.StatusCode.NOT_FOUND)
return _health_pb2.HealthCheckResponse()
else:
return _health_pb2.HealthCheckResponse(status=status)

# pylint: disable=arguments-differ
def Watch(self, request, context, send_response_callback=None):
blocking_watcher = None
if send_response_callback is None:
# The server does not support the experimental_non_blocking
# parameter. For backwards compatibility, return a blocking response
# generator.
blocking_watcher = _Watcher()
send_response_callback = _watcher_to_send_response_callback_adapter(
blocking_watcher)
service = request.service
with self._lock:
status = self._server_status.get(service)
if status is None:
status = _health_pb2.HealthCheckResponse.SERVICE_UNKNOWN # pylint: disable=no-member
send_response_callback(
_health_pb2.HealthCheckResponse(status=status))
if service not in self._send_response_callbacks:
self._send_response_callbacks[service] = set()
self._send_response_callbacks[service].add(send_response_callback)
context.add_callback(
self._on_close_callback(send_response_callback, service))
return blocking_watcher

def set(self, service, status):
"""Sets the status of a service.

Args:
service: string, the name of the service. NOTE, '' must be set.
status: HealthCheckResponse.status enum value indicating the status of
the service
"""
with self._lock:
if self._gracefully_shutting_down:
return
else:
self._server_status[service] = status
if service in self._send_response_callbacks:
for send_response_callback in self._send_response_callbacks[
service]:
send_response_callback(
_health_pb2.HealthCheckResponse(status=status))

def enter_graceful_shutdown(self):
"""Permanently sets the status of all services to NOT_SERVING.

This should be invoked when the server is entering a graceful shutdown
period. After this method is invoked, future attempts to set the status
of a service will be ignored.

This is an EXPERIMENTAL API.
"""
with self._lock:
if self._gracefully_shutting_down:
return
else:
for service in self._server_status:
self.set(service,
_health_pb2.HealthCheckResponse.NOT_SERVING) # pylint: disable=no-member
self._gracefully_shutting_down = True
5 changes: 4 additions & 1 deletion pkg/suggestion/v1alpha3/hyperopt_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

from pkg.apis.manager.v1alpha3.python import api_pb2
from pkg.apis.manager.v1alpha3.python import api_pb2_grpc
from pkg.apis.manager.health.python import health_pb2

from pkg.suggestion.v1alpha3.internal.search_space import HyperParameter, HyperParameterSearchSpace
from pkg.suggestion.v1alpha3.internal.trial import Trial, Assignment
from pkg.suggestion.v1alpha3.hyperopt.base_hyperopt_service import BaseHyperoptService
from pkg.suggestion.v1alpha3.base_health_service import HealthServicer

logger = logging.getLogger("HyperoptRandomService")


class HyperoptService(
api_pb2_grpc.SuggestionServicer):
api_pb2_grpc.SuggestionServicer, HealthServicer):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change, it is mandatory for all suggestion service to use HealthServices. Right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we need to add health servicer to all algorithms

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good. We should add them in the suggestion doc separately. Also, Dockerfile instructions to have graph-health-check binary also has to be documented.

def GetSuggestions(self, request, context):
"""
Main function to provide suggestion.
Expand Down
6 changes: 3 additions & 3 deletions scripts/v1alpha3/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ SCRIPT_ROOT=$(dirname ${BASH_SOURCE})/../..

cd ${SCRIPT_ROOT}

echo "Building suggestion images..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/suggestion-hyperopt -f ${CMD_PREFIX}/suggestion/hyperopt/v1alpha3/Dockerfile .

echo "Building core image..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/katib-controller -f ${CMD_PREFIX}/katib-controller/v1alpha3/Dockerfile .
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/katib-manager -f ${CMD_PREFIX}/manager/v1alpha3/Dockerfile .
Expand All @@ -39,6 +42,3 @@ docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/file-metrics-collector -f ${CMD_P

echo "Building TF Event metrics collector image..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/tfevent-metrics-collector -f ${CMD_PREFIX}/metricscollector/v1alpha3/tfevent-metricscollector/Dockerfile .

echo "Building suggestion images..."
docker build -t ${REGISTRY}/${PREFIX}/v1alpha3/suggestion-hyperopt -f ${CMD_PREFIX}/suggestion/hyperopt/v1alpha3/Dockerfile .
2 changes: 1 addition & 1 deletion test/scripts/v1alpha3/python-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ set -o errexit
set -o nounset
set -o pipefail

export PYTHONPATH=$(pwd):$(pwd)/pkg/apis/manager/v1alpha3/python
export PYTHONPATH=$(pwd):$(pwd)/pkg/apis/manager/v1alpha3/python:$(pwd)/pkg/apis/manager/health/python
pip install -r test/suggestion/v1alpha3/test_requirements.txt
pip install -r cmd/suggestion/hyperopt/v1alpha3/requirements.txt
pytest -s ./test