[Pyfunc] Add ability to push metrics to Prometheus push gateway (#311)
<!--  Thanks for sending a pull request!  Here are some tips for you:

1. Run unit tests and ensure that they are passing
2. If your change introduces any API changes, make sure to update the
e2e tests
3. Make sure documentation is updated for your PR!

-->

**What this PR does / why we need it**:
<!-- Explain here the context and why you're making the change. What is
the problem you're trying to solve. --->
Due to a limitation in knative (knative/serving#8471), it is currently not
possible to expose the Prometheus metrics endpoint on a port separate from the
gRPC port. As a result, custom metrics from the pyfunc server have to be pushed
via a Prometheus push gateway.

This PR adds 3 new configuration options to support that; a minimal sketch of the push mechanism is shown after the table below.

| Environment Variable | Description |
|----------------------|-------------|
| PUSHGATEWAY_ENABLED | Enable pushing metrics to the Prometheus push gateway; only available when `CARAML_PROTOCOL` is set to `UPI_V1` (default = false) |
| PUSHGATEWAY_URL | URL of the Prometheus push gateway (default = localhost:9091) |
| PUSHGATEWAY_PUSH_INTERVAL_SEC | Interval in seconds for pushing metrics to the Prometheus push gateway (default = 30) |
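
As a rough illustration of the mechanism (a minimal sketch, not the code in this PR; the gateway address, metric name, and job name are assumptions), pushing metrics to a Prometheus push gateway from Python looks like this:

```python
from prometheus_client import CollectorRegistry, Counter, push_to_gateway

# Use a dedicated registry so only the metrics registered here are pushed.
registry = CollectorRegistry()
requests_total = Counter("sample_requests_total", "Number of handled requests", registry=registry)
requests_total.inc()

# Push the registry's current state to the gateway (assumed to run on localhost:9091).
push_to_gateway("localhost:9091", job="sample_pyfunc_job", registry=registry)
```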
  

**Which issue(s) this PR fixes**:
<!--
*Automatically closes linked issue when PR is merged.
Usage: `Fixes #<issue number>`, or `Fixes (paste link of issue)`.
-->


Fixes #

**Does this PR introduce a user-facing change?**:
<!--
If no, just write "NONE" in the release-note block below.
If yes, a release note is required. Enter your extended release note in
the block below.
If the PR requires additional action from users switching to the new
release, include the string "action required".

For more information about release notes, see kubernetes' guide here:
http://git.k8s.io/community/contributors/guide/release-notes.md
-->

```release-note

```

**Checklist**

- [X] Tested locally
- [X] Updated documentation
aria authored Feb 6, 2023
1 parent f37de15 commit 72616dd
Showing 10 changed files with 129 additions and 64 deletions.
5 changes: 4 additions & 1 deletion python/pyfunc-server/README.md
@@ -93,7 +93,10 @@ Pyfunc server can be configured via following environment variables
| WORKERS | Number of Python processes that will be created to allow multi processing (default = 1) |
| LOG_LEVEL | Log level, valid values are `INFO`, `ERROR`, `DEBUG`, `WARN`, `CRITICAL` (default='INFO') |
| GRPC_OPTIONS | GRPC options to configure UPI server as json string. The possible options can be found in [grpc_types.h](https://github.com/grpc/grpc/blob/v1.46.x/include/grpc/impl/codegen/grpc_types.h). Example: '{"grpc.max_concurrent_streams":100}' |
| GRPC_CONCURRENCY | Size of grpc handler threadpool per worker (default = 10) |
| GRPC_CONCURRENCY | Size of grpc handler threadpool per worker (default = 10) |
| PUSHGATEWAY_ENABLED | Enable pushing metrics to the Prometheus push gateway; only available when `CARAML_PROTOCOL` is set to `UPI_V1` (default = false) |
| PUSHGATEWAY_URL | URL of the Prometheus push gateway (default = localhost:9091) |
| PUSHGATEWAY_PUSH_INTERVAL_SEC | Interval in seconds for pushing metrics to the Prometheus push gateway (default = 30) |

## Directory Structure

2 changes: 0 additions & 2 deletions python/pyfunc-server/pyfuncserver/__main__.py
@@ -24,8 +24,6 @@
from pyfuncserver.server import PyFuncServer
from pyfuncserver.utils.contants import ERR_DRY_RUN

DEFAULT_MODEL_NAME = "model"

parser = argparse.ArgumentParser()
parser.add_argument('--model_dir', required=True,
                    help='A URI pointer to the model binary')
76 changes: 44 additions & 32 deletions python/pyfunc-server/pyfuncserver/config.py
@@ -2,30 +2,24 @@
import logging
import os

# Following environment variables are expected to be populated by Merlin
from merlin.protocol import Protocol

HTTP_PORT = "CARAML_HTTP_PORT"
MODEL_NAME = "CARAML_MODEL_NAME"
MODEL_VERSION = "CARAML_MODEL_VERSION"
MODEL_FULL_NAME = "CARAML_MODEL_FULL_NAME"
PROTOCOL = "CARAML_PROTOCOL"
WORKERS = "WORKERS"
GRPC_PORT = "CARAML_GRPC_PORT"
LOG_LEVEL = "LOG_LEVEL"
GRPC_OPTIONS = "GRPC_OPTIONS"
GRPC_CONCURRENCY = "GRPC_CONCURRENCY"

DEFAULT_HTTP_PORT = 8080
DEFAULT_GRPC_PORT = 9000
DEFAULT_MODEL_NAME = "model"
DEFAULT_MODEL_VERSION = "1"
DEFAULT_FULL_NAME = f"{DEFAULT_MODEL_NAME}-{DEFAULT_MODEL_VERSION}"
DEFAULT_LOG_LEVEL = "INFO"
DEFAULT_PROTOCOL = "HTTP_JSON"
DEFAULT_GRPC_OPTIONS = "{}"
DEFAULT_GRPC_CONCURRENCY = "10"
# Following environment variables are expected to be populated by Merlin
HTTP_PORT = ("CARAML_HTTP_PORT", 8080)
MODEL_NAME = ("CARAML_MODEL_NAME", "model")
MODEL_VERSION = ("CARAML_MODEL_VERSION", "1")
MODEL_FULL_NAME = ("CARAML_MODEL_FULL_NAME", "model-1")
PROTOCOL = ("CARAML_PROTOCOL", "HTTP_JSON")

WORKERS = ("WORKERS", 1)
GRPC_PORT = ("CARAML_GRPC_PORT", 9000)
LOG_LEVEL = ("LOG_LEVEL", "INFO")
GRPC_OPTIONS = ("GRPC_OPTIONS", "{}")
GRPC_CONCURRENCY = ("GRPC_CONCURRENCY", 10)

PUSHGATEWAY_ENABLED = ("PUSHGATEWAY_ENABLED", "false")
PUSHGATEWAY_URL = ("PUSHGATEWAY_URL", "localhost:9091")
PUSHGATEWAY_PUSH_INTERVAL_SEC = ("PUSHGATEWAY_PUSH_INTERVAL_SEC", 30)

class ModelManifest:
    """
@@ -39,39 +33,57 @@ def __init__(self, model_name: str, model_version: str, model_full_name: str, mo
        self.model_dir = model_dir


class PushGateway:
    def __init__(self, enabled, url, push_interval_sec):
        self.url = url
        self.enabled = enabled
        self.push_interval_sec = push_interval_sec


class Config:
    """
    Server Configuration
    """

    def __init__(self, model_dir: str):
        self.protocol = Protocol(os.getenv(PROTOCOL, DEFAULT_PROTOCOL))
        self.http_port = int(os.getenv(HTTP_PORT, DEFAULT_HTTP_PORT))
        self.grpc_port = int(os.getenv(GRPC_PORT, DEFAULT_GRPC_PORT))
        self.protocol = Protocol(os.getenv(*PROTOCOL))
        self.http_port = int(os.getenv(*HTTP_PORT))
        self.grpc_port = int(os.getenv(*GRPC_PORT))

        # Model manifest
        model_name = os.getenv(MODEL_NAME, DEFAULT_MODEL_NAME)
        model_version = os.getenv(MODEL_VERSION, DEFAULT_MODEL_VERSION)
        model_full_name = os.getenv(MODEL_FULL_NAME, DEFAULT_FULL_NAME)
        model_name = os.getenv(*MODEL_NAME)
        model_version = os.getenv(*MODEL_VERSION)
        model_full_name = os.getenv(*MODEL_FULL_NAME)
        self.model_manifest = ModelManifest(model_name, model_version, model_full_name, model_dir)

        self.workers = int(os.getenv(WORKERS, 1))
        self.workers = int(os.getenv(*WORKERS))
        self.log_level = self._log_level()

        self.grpc_options = self._to_grpc_options(os.getenv(GRPC_OPTIONS, DEFAULT_GRPC_OPTIONS))
        self.grpc_concurrency = int(os.getenv(GRPC_CONCURRENCY, DEFAULT_GRPC_CONCURRENCY))
        self.grpc_options = self._grpc_options()
        self.grpc_concurrency = int(os.getenv(*GRPC_CONCURRENCY))

        push_enabled = str_to_bool(os.getenv(*PUSHGATEWAY_ENABLED))
        push_url = os.getenv(*PUSHGATEWAY_URL)
        push_interval = int(os.getenv(*PUSHGATEWAY_PUSH_INTERVAL_SEC))
        self.push_gateway = PushGateway(push_enabled,
                                        push_url,
                                        push_interval)

    def _log_level(self):
        log_level = os.getenv(LOG_LEVEL, DEFAULT_LOG_LEVEL)
        log_level = os.getenv(*LOG_LEVEL)
        numeric_level = getattr(logging, log_level.upper(), None)
        if not isinstance(numeric_level, int):
            logging.warning(f"invalid log level {log_level}")
            return logging.INFO
        return numeric_level

    def _to_grpc_options(self, raw_options: str):
    def _grpc_options(self):
        raw_options = os.getenv(*GRPC_OPTIONS)
        options = json.loads(raw_options)
        grpc_options = []
        for k, v in options.items():
            grpc_options.append((k, v))
        return grpc_options


def str_to_bool(str: str) -> bool:
    return str.lower() in ("true", "1")
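
Each constant in the new config is an `(env var name, default)` tuple consumed via argument unpacking into `os.getenv`. A minimal sketch of the same pattern, with a hypothetical setting name that is not part of this PR:

```python
import os

# An (environment variable name, default value) pair, mirroring the constants above.
SAMPLE_TIMEOUT_SEC = ("SAMPLE_TIMEOUT_SEC", 30)

# os.getenv(*SAMPLE_TIMEOUT_SEC) expands to os.getenv("SAMPLE_TIMEOUT_SEC", 30).
# The default is returned as-is when the variable is unset, while a value read from
# the environment is always a str, so numeric settings still need an explicit cast.
timeout_sec = int(os.getenv(*SAMPLE_TIMEOUT_SEC))
print(timeout_sec)
```

The tests further down index the same tuples with `[0]` (for example `env[HTTP_PORT[0]]`) when only the variable name is needed.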
39 changes: 39 additions & 0 deletions python/pyfunc-server/pyfuncserver/metrics/pusher.py
@@ -0,0 +1,39 @@
import logging
import socket
import time
from threading import Thread

from prometheus_client import push_to_gateway

from pyfuncserver.config import Config


def start_metrics_pusher(push_gateway, registry, target_info, interval_sec):
    """
    Start periodic job to push metrics to prometheus push gateway
    """
    logging.info(f"starting metrics pusher, url: {push_gateway} with interval {interval_sec} s")
    daemon = Thread(target=push_metrics, args=(push_gateway, registry, interval_sec, target_info),
                    daemon=True, name='metrics_push')
    daemon.start()


def push_metrics(gateway_url, registry, interval_sec, grouping_keys):
    """
    Push metrics to prometheus push gateway every interval_sec.
    Should be called in a separate thread.
    """
    while True:
        push_to_gateway(gateway_url, "merlin_pyfunc_upi", registry=registry, grouping_key=grouping_keys)
        time.sleep(interval_sec)


def labels(config: Config):
    """
    Labels to be added to all metrics
    """
    return {
        "merlin_model_name": config.model_manifest.model_name,
        "merlin_model_version": config.model_manifest.model_version,
        "host": socket.getfqdn()
    }
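
For context, wiring these helpers together could look roughly like the sketch below; the registry setup and the model directory are assumptions for illustration, not code from this commit:

```python
from prometheus_client import CollectorRegistry

from pyfuncserver.config import Config
from pyfuncserver.metrics.pusher import labels, start_metrics_pusher

# Hypothetical wiring: push this process's metrics on the configured interval.
config = Config(model_dir="/tmp/model")  # assumes the CARAML_*/PUSHGATEWAY_* env vars are set
registry = CollectorRegistry()
start_metrics_pusher(
    push_gateway=config.push_gateway.url,    # e.g. "localhost:9091"
    registry=registry,
    target_info=labels(config),              # grouping keys: model name, version, host
    interval_sec=config.push_gateway.push_interval_sec,
)
```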
16 changes: 14 additions & 2 deletions python/pyfunc-server/pyfuncserver/server.py
@@ -11,12 +11,14 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

import prometheus_client
from merlin.protocol import Protocol
from prometheus_client import CollectorRegistry, multiprocess

from pyfuncserver.config import Config
from pyfuncserver.metrics.pusher import labels, start_metrics_pusher
from pyfuncserver.model.model import PyFuncModel
from pyfuncserver.protocol.rest.server import HTTPServer
from pyfuncserver.protocol.upi.server import UPIServer
@@ -38,8 +40,18 @@ def start(self, model: PyFuncModel):
            http_server = HTTPServer(model=model, config=self._config, metrics_registry=registry)
            http_server.start()
        elif self._config.protocol == Protocol.UPI_V1:
            # start prometheus metrics server and listen at http port
            prometheus_client.start_http_server(self._config.http_port, registry=registry)
            # Due to https://github.com/knative/serving/issues/8471, we have to resort to pushing metrics to
            # prometheus push gateway.
            if self._config.push_gateway.enabled:
                target_info = labels(self._config)
                start_metrics_pusher(self._config.push_gateway.url,
                                     registry,
                                     target_info,
                                     self._config.push_gateway.push_interval_sec)
            else:
                # start prometheus metrics server and listen at http port
                logging.info(f"starting metrics server at {self._config.http_port}")
                prometheus_client.start_http_server(self._config.http_port, registry=registry)

            # start grpc/upi server and listen at grpc port
            upi_server = UPIServer(model=model, config=self._config)
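
The `registry` used above comes from prometheus_client's multiprocess support (see the `CollectorRegistry, multiprocess` import earlier in this file). A minimal sketch of how such a registry is typically assembled, assuming `PROMETHEUS_MULTIPROC_DIR` points at a writable directory; this is generic prometheus_client usage, not code from this diff:

```python
import os

from prometheus_client import CollectorRegistry, multiprocess

# prometheus_client's multiprocess mode aggregates metrics that worker processes
# write as files under PROMETHEUS_MULTIPROC_DIR.
os.environ.setdefault("PROMETHEUS_MULTIPROC_DIR", "/tmp/prometheus")
os.makedirs(os.environ["PROMETHEUS_MULTIPROC_DIR"], exist_ok=True)

registry = CollectorRegistry()
multiprocess.MultiProcessCollector(registry)
# The aggregated registry can then be served over HTTP or pushed to a push gateway.
```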
12 changes: 6 additions & 6 deletions python/pyfunc-server/test/test_backward_compatibility.py
@@ -230,8 +230,8 @@ def test_model_int(model):

    try:
        env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
        env[HTTP_PORT] = "8081"
        env[WORKERS] = "1"
        env[HTTP_PORT[0]] = "8081"
        env[WORKERS[0]] = "1"
        c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

        # wait till the server is up
@@ -265,8 +265,8 @@ def test_model_headers(model):

    try:
        env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
        env[HTTP_PORT] = "8081"
        env[WORKERS] = "1"
        env[HTTP_PORT[0]] = "8081"
        env[WORKERS[0]] = "1"
        c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

        # wait till the server is up
@@ -303,8 +303,8 @@ def test_error_model_int(error_core, message, model):

    try:
        env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
        env[HTTP_PORT] = "8081"
        env[WORKERS] = "1"
        env[HTTP_PORT[0]] = "8081"
        env[WORKERS[0]] = "1"
        c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

        # wait till the server is up
20 changes: 10 additions & 10 deletions python/pyfunc-server/test/test_http.py
@@ -42,11 +42,11 @@ def test_http_protocol():
port = "8081"

try:
env[HTTP_PORT] = port
env[MODEL_NAME] = model_name
env[MODEL_VERSION] = model_version
env[MODEL_FULL_NAME] = model_full_name
env[WORKERS] = "1"
env[HTTP_PORT[0]] = port
env[MODEL_NAME[0]] = model_name
env[MODEL_VERSION[0]] = model_version
env[MODEL_FULL_NAME[0]] = model_full_name
env[WORKERS[0]] = "1"
env["PROMETHEUS_MULTIPROC_DIR"] = "prometheus"
c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env)

@@ -93,11 +93,11 @@ def test_metrics():
    try:
        pathlib.Path(metrics_path).mkdir(exist_ok=True)

        env[HTTP_PORT] = port
        env[MODEL_NAME] = model_name
        env[MODEL_VERSION] = model_version
        env[MODEL_FULL_NAME] = model_full_name
        env[WORKERS] = "4"
        env[HTTP_PORT[0]] = port
        env[MODEL_NAME[0]] = model_name
        env[MODEL_VERSION[0]] = model_version
        env[MODEL_FULL_NAME[0]] = model_full_name
        env[WORKERS[0]] = "4"
        env["PROMETHEUS_MULTIPROC_DIR"] = metrics_path
        c = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env,
                             start_new_session=True)
15 changes: 8 additions & 7 deletions python/pyfunc-server/test/test_upi.py
@@ -1,3 +1,4 @@
import logging
import os
import pathlib
import re
@@ -184,13 +185,13 @@ def start_upi_server(model_name="my-model", model_version="1", http_port=8080, g
    mlflow.end_run()
    pathlib.Path(metrics_path).mkdir(exist_ok=True)

    env[PROTOCOL] = Protocol.UPI_V1.value
    env[HTTP_PORT] = str(http_port)
    env[GRPC_PORT] = str(grpc_port)
    env[MODEL_NAME] = model_name
    env[MODEL_VERSION] = model_version
    env[MODEL_FULL_NAME] = model_full_name
    env[WORKERS] = str(workers)
    env[PROTOCOL[0]] = Protocol.UPI_V1.value
    env[HTTP_PORT[0]] = str(http_port)
    env[GRPC_PORT[0]] = str(grpc_port)
    env[MODEL_NAME[0]] = model_name
    env[MODEL_VERSION[0]] = model_version
    env[MODEL_FULL_NAME[0]] = model_full_name
    env[WORKERS[0]] = str(workers)
    env["PROMETHEUS_MULTIPROC_DIR"] = metrics_path
    pid = subprocess.Popen(["python", "-m", "pyfuncserver", "--model_dir", model_path], env=env, start_new_session=True)

4 changes: 2 additions & 2 deletions scripts/e2e/config/kserve/overlay.yaml
@@ -14,7 +14,7 @@ spec:
        cpu: 1
        memory: 300Mi
      requests:
        cpu: 80m
        cpu: 100m
        memory: 64Mi

---
@@ -206,7 +206,7 @@ data:
"image" : "ghcr.io/ariefrahmansyah/kfserving-storage-init:latest",
"memoryRequest": "50Mi",
"memoryLimit": "1Gi",
"cpuRequest": "20m",
"cpuRequest": "10m",
"cpuLimit": "1"
}
transformers: |-
4 changes: 2 additions & 2 deletions scripts/e2e/deploy-merlin.sh
@@ -29,9 +29,9 @@ install_mlp() {
    --set ingress.host=mlp.mlp.${INGRESS_HOST} \
    --wait --timeout=${TIMEOUT}

  kubectl apply -f config/mock/message-dumper.yaml

  kubectl rollout status deployment/mlp -n mlp -w --timeout=${TIMEOUT}

  kubectl apply -f config/mock/message-dumper.yaml
}

install_merlin() {
