Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pyfunc] Add ability to push metrics to Prometheus push gateway #311

Merged
merged 8 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/pyfunc-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,10 @@ Pyfunc server can be configured via following environment variables
| WORKERS | Number of Python processes that will be created to allow multi processing (default = 1) |
| LOG_LEVEL | Log level, valid values are `INFO`, `ERROR`, `DEBUG`, `WARN`, `CRITICAL` (default='INFO') |
| GRPC_OPTIONS | GRPC options to configure UPI server as json string. The possible options can be found in [grpc_types.h](https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h). Example: '{"grpc.max_concurrent_streams":100}' |
| GRPC_CONCURRENCY | Size of grpc handler threadpool per worker (default = 10) |
| GRPC_CONCURRENCY | Size of grpc handler threadpool per worker (default = 10) |
| PUSHGATEWAY_ENABLED | Enable pushing metrics to prometheus push gateway, only available when `CARAML_PROTOCOL` is set to `UPI_V1` (default = false) |
| PUSHGATEWAY_URL | Url of the prometheus push gateway (default = localhost:9091) |
| PUSHGATEWAY_PUSH_INTERVAL_SEC | Interval in seconds for pushing metrics to prometheus push gateway (default = 30) |

## Directory Structure

Expand Down
2 changes: 0 additions & 2 deletions python/pyfunc-server/pyfuncserver/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
from pyfuncserver.server import PyFuncServer
from pyfuncserver.utils.contants import ERR_DRY_RUN

DEFAULT_MODEL_NAME = "model"

parser = argparse.ArgumentParser()
parser.add_argument('--model_dir', required=True,
help='A URI pointer to the model binary')
Expand Down
76 changes: 44 additions & 32 deletions python/pyfunc-server/pyfuncserver/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,24 @@
import logging
import os

# Following environment variables are expected to be populated by Merlin
from merlin.protocol import Protocol

HTTP_PORT = "CARAML_HTTP_PORT"
MODEL_NAME = "CARAML_MODEL_NAME"
MODEL_VERSION = "CARAML_MODEL_VERSION"
MODEL_FULL_NAME = "CARAML_MODEL_FULL_NAME"
PROTOCOL = "CARAML_PROTOCOL"
WORKERS = "WORKERS"
GRPC_PORT = "CARAML_GRPC_PORT"
LOG_LEVEL = "LOG_LEVEL"
GRPC_OPTIONS = "GRPC_OPTIONS"
GRPC_CONCURRENCY = "GRPC_CONCURRENCY"

DEFAULT_HTTP_PORT = 8080
DEFAULT_GRPC_PORT = 9000
DEFAULT_MODEL_NAME = "model"
DEFAULT_MODEL_VERSION = "1"
DEFAULT_FULL_NAME = f"{DEFAULT_MODEL_NAME}-{DEFAULT_MODEL_VERSION}"
DEFAULT_LOG_LEVEL = "INFO"
DEFAULT_PROTOCOL = "HTTP_JSON"
DEFAULT_GRPC_OPTIONS = "{}"
DEFAULT_GRPC_CONCURRENCY = "10"
# Following environment variables are expected to be populated by Merlin
HTTP_PORT = ("CARAML_HTTP_PORT", 8080)
MODEL_NAME = ("CARAML_MODEL_NAME", "model")
MODEL_VERSION = ("CARAML_MODEL_VERSION", "1")
MODEL_FULL_NAME = ("CARAML_MODEL_FULL_NAME", "model-1")
PROTOCOL = ("CARAML_PROTOCOL", "HTTP_JSON")

WORKERS = ("WORKERS", 1)
GRPC_PORT = ("CARAML_GRPC_PORT", 9000)
LOG_LEVEL = ("LOG_LEVEL", "INFO")
GRPC_OPTIONS = ("GRPC_OPTIONS", "{}")
GRPC_CONCURRENCY = ("GRPC_CONCURRENCY", 10)

PUSHGATEWAY_ENABLED = ("PUSHGATEWAY_ENABLED", "false")
PUSHGATEWAY_URL = ("PUSHGATEWAY_URL", "localhost:9091")
PUSHGATEWAY_PUSH_INTERVAL_SEC = ("PUSHGATEWAY_PUSH_INTERVAL_SEC", 30)

class ModelManifest:
"""
Expand All @@ -39,39 +33,57 @@ def __init__(self, model_name: str, model_version: str, model_full_name: str, mo
self.model_dir = model_dir


class PushGateway:
def __init__(self, enabled, url, push_interval_sec):
self.url = url
self.enabled = enabled
self.push_interval_sec = push_interval_sec


class Config:
"""
Server Configuration
"""

def __init__(self, model_dir: str):
self.protocol = Protocol(os.getenv(PROTOCOL, DEFAULT_PROTOCOL))
self.http_port = int(os.getenv(HTTP_PORT, DEFAULT_HTTP_PORT))
self.grpc_port = int(os.getenv(GRPC_PORT, DEFAULT_GRPC_PORT))
self.protocol = Protocol(os.getenv(*PROTOCOL))
self.http_port = int(os.getenv(*HTTP_PORT))
self.grpc_port = int(os.getenv(*GRPC_PORT))

# Model manifest
model_name = os.getenv(MODEL_NAME, DEFAULT_MODEL_NAME)
model_version = os.getenv(MODEL_VERSION, DEFAULT_MODEL_VERSION)
model_full_name = os.getenv(MODEL_FULL_NAME, DEFAULT_FULL_NAME)
model_name = os.getenv(*MODEL_NAME)
model_version = os.getenv(*MODEL_VERSION)
model_full_name = os.getenv(*MODEL_FULL_NAME)
self.model_manifest = ModelManifest(model_name, model_version, model_full_name, model_dir)

self.workers = int(os.getenv(WORKERS, 1))
self.workers = int(os.getenv(*WORKERS))
self.log_level = self._log_level()

self.grpc_options = self._to_grpc_options(os.getenv(GRPC_OPTIONS, DEFAULT_GRPC_OPTIONS))
self.grpc_concurrency = int(os.getenv(GRPC_CONCURRENCY, DEFAULT_GRPC_CONCURRENCY))
self.grpc_options = self._grpc_options()
self.grpc_concurrency = int(os.getenv(*GRPC_CONCURRENCY))

push_enabled = str_to_bool(os.getenv(*PUSHGATEWAY_ENABLED))
push_url = os.getenv(*PUSHGATEWAY_URL)
push_interval = os.getenv(*PUSHGATEWAY_PUSH_INTERVAL_SEC)
self.push_gateway = PushGateway(push_enabled,
push_url,
push_interval)

def _log_level(self):
log_level = os.getenv(LOG_LEVEL, DEFAULT_LOG_LEVEL)
log_level = os.getenv(*LOG_LEVEL)
numeric_level = getattr(logging, log_level.upper(), None)
if not isinstance(numeric_level, int):
logging.warning(f"invalid log level {log_level}")
return logging.INFO
return numeric_level

def _to_grpc_options(self, raw_options: str):
def _grpc_options(self):
raw_options = os.getenv(*GRPC_OPTIONS)
options = json.loads(raw_options)
grpc_options = []
for k, v in options.items():
grpc_options.append((k, v))
return grpc_options

def str_to_bool(str: str)->bool:
return str.lower() in ("true", "1")
39 changes: 39 additions & 0 deletions python/pyfunc-server/pyfuncserver/metrics/pusher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import logging
import socket
import time
from threading import Thread

from prometheus_client import push_to_gateway

from pyfuncserver.config import Config


def start_metrics_pusher(push_gateway, registry, target_info, interval_sec):
"""
Start periodic job to push metrics to prometheus push gatewat
"""
logging.info(f"starting metrics pusher, url: {push_gateway} with interval {interval_sec} s")
daemon = Thread(target=push_metrics, args=(push_gateway, registry, interval_sec, target_info),
daemon=True, name='metrics_push')
daemon.start()


def push_metrics(gateway_url, registry, interval_sec, grouping_keys):
"""
push metrics to prometheus push gateway every interval_sec
Should be called in separate thread
"""
while True:
push_to_gateway(gateway_url, "merlin_pyfunc_upi", registry=registry, grouping_key=grouping_keys)
time.sleep(interval_sec)


def labels(config: Config):
"""
Labels to be added to all metrics
"""
return {
"merlin_model_name": config.model_manifest.model_name,
"merlin_model_version": config.model_manifest.model_version,
"host": socket.getfqdn()
}
16 changes: 14 additions & 2 deletions python/pyfunc-server/pyfuncserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

import prometheus_client
from merlin.protocol import Protocol
from prometheus_client import CollectorRegistry, multiprocess

from pyfuncserver.config import Config
from pyfuncserver.metrics.pusher import labels, start_metrics_pusher
from pyfuncserver.model.model import PyFuncModel
from pyfuncserver.protocol.rest.server import HTTPServer
from pyfuncserver.protocol.upi.server import UPIServer
Expand All @@ -38,8 +40,18 @@ def start(self, model: PyFuncModel):
http_server = HTTPServer(model=model, config=self._config, metrics_registry=registry)
http_server.start()
elif self._config.protocol == Protocol.UPI_V1:
# start prometheus metrics server and listen at http port
prometheus_client.start_http_server(self._config.http_port, registry=registry)
# Due to https://github.com/knative/serving/issues/8471, we have to resort to pushing metrics to
# prometheus push gateway.
if (self._config.push_gateway.enabled):
target_info = labels(self._config)
start_metrics_pusher(self._config.push_gateway.url,
registry,
target_info,
self._config.push_gateway.push_interval_sec)
else:
# start prometheus metrics server and listen at http port
logging.info(f"starting metrics server at {self._config.http_port}")
prometheus_client.start_http_server(self._config.http_port, registry=registry)

# start grpc/upi server and listen at grpc port
upi_server = UPIServer(model=model, config=self._config)
Expand Down
12 changes: 6 additions & 6 deletions python/pyfunc-server/test/test_backward_compatibility.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ def test_model_int(model):

try:
env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
env[HTTP_PORT] = "8081"
env[WORKERS] = "1"
env[HTTP_PORT[0]] = "8081"
env[WORKERS[0]] = "1"
c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

# wait till the server is up
Expand Down Expand Up @@ -265,8 +265,8 @@ def test_model_headers(model):

try:
env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
env[HTTP_PORT] = "8081"
env[WORKERS] = "1"
env[HTTP_PORT[0]] = "8081"
env[WORKERS[0]] = "1"
c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

# wait till the server is up
Expand Down Expand Up @@ -303,8 +303,8 @@ def test_error_model_int(error_core, message, model):

try:
env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
env[HTTP_PORT] = "8081"
env[WORKERS] = "1"
env[HTTP_PORT[0]] = "8081"
env[WORKERS[0]] = "1"
c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

# wait till the server is up
Expand Down
20 changes: 10 additions & 10 deletions python/pyfunc-server/test/test_http.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ def test_http_protocol():
port = "8081"

try:
env[HTTP_PORT] = port
env[MODEL_NAME] = model_name
env[MODEL_VERSION] = model_version
env[MODEL_FULL_NAME] = model_full_name
env[WORKERS] = "1"
env[HTTP_PORT[0]] = port
env[MODEL_NAME[0]] = model_name
env[MODEL_VERSION[0]] = model_version
env[MODEL_FULL_NAME[0]] = model_full_name
env[WORKERS[0]] = "1"
env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

Expand Down Expand Up @@ -93,11 +93,11 @@ def test_metrics():
try:
pathlib.Path(metrics_path).mkdir(exist_ok=True)

env[HTTP_PORT] = port
env[MODEL_NAME] = model_name
env[MODEL_VERSION] = model_version
env[MODEL_FULL_NAME] = model_full_name
env[WORKERS] = "4"
env[HTTP_PORT[0]] = port
env[MODEL_NAME[0]] = model_name
env[MODEL_VERSION[0]] = model_version
env[MODEL_FULL_NAME[0]] = model_full_name
env[WORKERS[0]] = "4"
env["PROMETHEUS_MULTIPROC_DIR"] = metrics_path
c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env,
start_new_session=True)
Expand Down
15 changes: 8 additions & 7 deletions python/pyfunc-server/test/test_upi.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import os
import pathlib
import re
Expand Down Expand Up @@ -184,13 +185,13 @@ def start_upi_server(model_name="my-model", model_version="1", http_port=8080, g
mlflow.end_run()
pathlib.Path(metrics_path).mkdir(exist_ok=True)

env[PROTOCOL] = Protocol.UPI_V1.value
env[HTTP_PORT] = str(http_port)
env[GRPC_PORT] = str(grpc_port)
env[MODEL_NAME] = model_name
env[MODEL_VERSION] = model_version
env[MODEL_FULL_NAME] = model_full_name
env[WORKERS] = str(workers)
env[PROTOCOL[0]] = Protocol.UPI_V1.value
env[HTTP_PORT[0]] = str(http_port)
env[GRPC_PORT[0]] = str(grpc_port)
env[MODEL_NAME[0]] = model_name
env[MODEL_VERSION[0]] = model_version
env[MODEL_FULL_NAME[0]] = model_full_name
env[WORKERS[0]] = str(workers)
env["PROMETHEUS_MULTIPROC_DIR"] = metrics_path
pid = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env, start_new_session=True)

Expand Down
4 changes: 2 additions & 2 deletions scripts/e2e/config/kserve/overlay.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ spec:
cpu: 1
memory: 300Mi
requests:
cpu: 80m
cpu: 100m
memory: 64Mi

---
Expand Down Expand Up @@ -206,7 +206,7 @@ data:
"image" : "ghcr.io/ariefrahmansyah/kfserving-storage-init:latest",
"memoryRequest": "50Mi",
"memoryLimit": "1Gi",
"cpuRequest": "20m",
"cpuRequest": "10m",
"cpuLimit": "1"
}
transformers: |-
Expand Down
4 changes: 2 additions & 2 deletions scripts/e2e/deploy-merlin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ install_mlp() {
--set ingress.host=mlp.mlp.${INGRESS_HOST} \
--wait --timeout=${TIMEOUT}

kubectl apply -f config/mock/message-dumper.yaml

kubectl rollout status deployment/mlp -n mlp -w --timeout=${TIMEOUT}

kubectl apply -f config/mock/message-dumper.yaml
}

install_merlin() {
Expand Down