Skip to content

Commit

Permalink
Maximum timeout for python functions (vmware-archive#433)
Browse files Browse the repository at this point in the history
* Maximum timeout for python functions

* Fix image reference

* Fix unit test

* Fix pubsub34 test and fix it

* Use threads instead of processes

* Adapt unit test

* Revert "Adapt unit test"

This reverts commit b2434c9.

* Revert "Use threads instead of processes"

This reverts commit 045dcdd.

* Increase timeout to 180s

* Fix unit test
  • Loading branch information
andresmgot authored Nov 23, 2017
1 parent 15ca69b commit c022790
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 15 deletions.
17 changes: 16 additions & 1 deletion docker/runtime/python-2.7/event-trigger/kubeless.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
import imp
import json

from multiprocessing import Process, Queue
from kafka import KafkaConsumer
import prometheus_client as prom

mod_name = os.getenv('MOD_NAME')
func_handler = os.getenv('FUNC_HANDLER')
topic_name = os.getenv('TOPIC_NAME')
timeout = float(os.getenv('FUNC_TIMEOUT', 180))

group = mod_name + func_handler

Expand Down Expand Up @@ -40,6 +42,9 @@
'Number of exceptions in user function',
['topic'])

def funcWrap(q, payload):
q.put(func(payload))

def json_safe_loads(msg):
try:
data = json.loads(msg)
Expand All @@ -56,7 +61,17 @@ def handle(msg):
func_calls.labels(topic_name).inc()
with func_errors.labels(topic_name).count_exceptions():
with func_hist.labels(topic_name).time():
return func(msg.value['payload'])
q = Queue()
p = Process(target=funcWrap, args=(q,msg.value['payload'],))
p.start()
p.join(timeout)
# If thread is still active
if p.is_alive():
p.terminate()
p.join()
raise Exception('Timeout while processing the function')
else:
return q.get()

if __name__ == '__main__':
prom.start_http_server(8080)
Expand Down
23 changes: 21 additions & 2 deletions docker/runtime/python-2.7/http-trigger/kubeless.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@
import os
import imp

from multiprocessing import Process, Queue
import bottle
import prometheus_client as prom

mod = imp.load_source('function',
'/kubeless/%s.py' % os.getenv('MOD_NAME'))
func = getattr(mod, os.getenv('FUNC_HANDLER'))

timeout = float(os.getenv('FUNC_TIMEOUT', 180))

app = application = bottle.app()

func_hist = prom.Histogram('function_duration_seconds',
Expand All @@ -22,17 +25,33 @@
'Number of exceptions in user function',
['method'])

def funcWrap(q, req):
if req is None:
q.put(func())
else:
q.put(func(req))

@app.route('/', method=['GET', 'POST'])
def handler():
req = bottle.request
method = req.method
func_calls.labels(method).inc()
with func_errors.labels(method).count_exceptions():
with func_hist.labels(method).time():
q = Queue()
if method == 'GET':
return func()
p = Process(target=funcWrap, args=(q,None,))
else:
p = Process(target=funcWrap, args=(q,bottle.request,))
p.start()
p.join(timeout)
# If thread is still active
if p.is_alive():
p.terminate()
p.join()
return bottle.HTTPError(408, "Timeout while processing the function")
else:
return func(bottle.request)
return q.get()

@app.get('/healthz')
def healthz():
Expand Down
23 changes: 19 additions & 4 deletions docker/runtime/python-3.4/event-trigger/kubeless.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,16 @@
import imp
import json

from multiprocessing import Process, Queue
from kafka import KafkaConsumer
import prometheus_client as prom

mod_name = os.getenv('MOD_NAME')
func_handler = os.getenv('FUNC_HANDLER')
topic_name = os.getenv('TOPIC_NAME')
timeout = float(os.getenv('FUNC_TIMEOUT', 180))

group = mod_name + func_handler

if "KUBELESS_KAFKA_SVC" in os.environ:
kafka_svc = os.getenv('KUBELESS_KAFKA_SVC')
Expand All @@ -21,12 +25,10 @@
if "KUBELESS_KAFKA_NAMESPACE" in os.environ:
kafka_namespace = os.getenv('KUBELESS_KAFKA_NAMESPACE')
else:
kafka_namespace = 'kubeless'
kafka_namespace = 'kubeless'

kafka_server = '%s.%s:9092' % (kafka_svc, kafka_namespace)

group = mod_name + func_handler

mod = imp.load_source('function', '/kubeless/%s.py' % mod_name)
func = getattr(mod, func_handler)

Expand All @@ -40,6 +42,9 @@
'Number of exceptions in user function',
['topic'])

def funcWrap(q, payload):
q.put(func(payload))

def json_safe_loads(msg):
try:
data = json.loads(msg)
Expand All @@ -56,7 +61,17 @@ def handle(msg):
func_calls.labels(topic_name).inc()
with func_errors.labels(topic_name).count_exceptions():
with func_hist.labels(topic_name).time():
return func(msg.value['payload'])
q = Queue()
p = Process(target=funcWrap, args=(q,msg.value['payload'],))
p.start()
p.join(timeout)
# If thread is still active
if p.is_alive():
p.terminate()
p.join()
raise Exception('Timeout while processing the function')
else:
return q.get()

if __name__ == '__main__':
prom.start_http_server(8080)
Expand Down
25 changes: 22 additions & 3 deletions docker/runtime/python-3.4/http-trigger/kubeless.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
#!/usr/bin/env python3
#!/usr/bin/env python

import os
import imp

from multiprocessing import Process, Queue
import bottle
import prometheus_client as prom

mod = imp.load_source('function',
'/kubeless/%s.py' % os.getenv('MOD_NAME'))
func = getattr(mod, os.getenv('FUNC_HANDLER'))

timeout = float(os.getenv('FUNC_TIMEOUT', 180))

app = application = bottle.app()

func_hist = prom.Histogram('function_duration_seconds',
Expand All @@ -22,17 +25,33 @@
'Number of exceptions in user function',
['method'])

def funcWrap(q, req):
if req is None:
q.put(func())
else:
q.put(func(req))

@app.route('/', method=['GET', 'POST'])
def handler():
req = bottle.request
method = req.method
func_calls.labels(method).inc()
with func_errors.labels(method).count_exceptions():
with func_hist.labels(method).time():
q = Queue()
if method == 'GET':
return func()
p = Process(target=funcWrap, args=(q,None,))
else:
p = Process(target=funcWrap, args=(q,bottle.request,))
p.start()
p.join(timeout)
# If thread is still active
if p.is_alive():
p.terminate()
p.join()
return bottle.HTTPError(408, "Timeout while processing the function")
else:
return func(bottle.request)
return q.get()

@app.get('/healthz')
def healthz():
Expand Down
10 changes: 10 additions & 0 deletions examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ get-python-34:
get-python-34-verify:
kubeless function call get-python |egrep hello.world

timeout-python:
$(eval TMPDIR := $(shell mktemp -d))
printf 'def foo():\n%4swhile 1: pass\n%4sreturn "hello world"\n' > $(TMPDIR)/hello-loop.py
kubeless function deploy timeout-python --trigger-http --runtime python2.7 --handler helloget.foo --from-file $(TMPDIR)/hello-loop.py --timeout 3
rm -rf $(TMPDIR)

timeout-python-verify:
$(eval MSG := $(shell kubeless function call timeout-python 2>&1 || true))
echo $(MSG) | egrep Request.timeout.exceeded

get-nodejs:
kubeless function deploy get-nodejs --trigger-http --runtime nodejs6 --handler helloget.foo --from-file nodejs/helloget.js
echo "curl localhost:8080/api/v1/proxy/namespaces/default/services/get-nodejs/"
Expand Down
8 changes: 4 additions & 4 deletions pkg/langruntime/langruntime.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
)

const (
python27Http = "bitnami/kubeless-python@sha256:6789266df0c97333f76e23efd58cf9c7efe24fa3e83b5fc826fd5cc317699b55"
python27Pubsub = "bitnami/kubeless-event-consumer@sha256:5ce469529811acf49c4d20bcd8a675be7aa029b43cf5252a8c9375b170859d83"
python27Http = "kubeless/python@sha256:ba948a6783b93d75037b7b1806a3925d441401ae6fba18282f712a1b1a786899"
python27Pubsub = "kubeless/python-event-consumer@sha256:1aeb6cef151222201abed6406694081db26fa2235d7ac128113dcebd8d73a6cb"
python27Init = "tuna/python-pillow:2.7.11-alpine" // TODO: Migrate the image for python 2.7 to an official source (not alpine-based)
python34Http = "bitnami/kubeless-python:test@sha256:686cd28cda5fe7bc6db60fa3e8a9a2c57a5eff6a58e66a60179cc1d3fcf1035b"
python34Pubsub = "bitnami/kubeless-python-event-consumer@sha256:8f92397258836e9c39948814aa5324c29d96ff3624b66dd70fdbad1ce0a1615e"
python34Http = "kubeless/python@sha256:631b406ab9681fe0da9c281949a885a95b7d8c9cea4a48d7dfd0fa2c0576e23e"
python34Pubsub = "kubeless/python-event-consumer@sha256:d963e4cd58229d662188d618cd87503b3c749b126b359ce724a19a375e4b3040"
python34Init = "python:3.4"
node6Http = "kubeless/nodejs@sha256:2e63bc5e0b3f3fe951ef73e51dde18c8daae17d5f1880e472cb3dfc3f92cdc0b"
node6Pubsub = "kubeless/nodejs-event-consumer@sha256:07ab6ca66f5119e610471ccfec5e7995e5c21dbaf3dc622427acf07bc1a63147"
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/k8sutil_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestEnsureDeployment(t *testing.T) {
}
expectedContainer := v1.Container{
Name: f1Name,
Image: "bitnami/kubeless-python@sha256:6789266df0c97333f76e23efd58cf9c7efe24fa3e83b5fc826fd5cc317699b55",
Image: "kubeless/python@sha256:ba948a6783b93d75037b7b1806a3925d441401ae6fba18282f712a1b1a786899",
Ports: []v1.ContainerPort{
{
ContainerPort: 8080,
Expand Down
3 changes: 3 additions & 0 deletions tests/integration-tests.bats
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ load ../script/libtest
@test "Test function: pubsub-python" {
test_kubeless_function pubsub-python
}
@test "Test function: pubsub-python34" {
test_kubeless_function pubsub-python34
}
@test "Test function update: pubsub-python" {
test_kubeless_function_update pubsub-python
}
Expand Down

0 comments on commit c022790

Please sign in to comment.