From 56a8a533d094d7c64881ec510b96a167d9da5786 Mon Sep 17 00:00:00 2001 From: Emil 'Skeen' Madsen Date: Wed, 19 Feb 2020 15:37:05 +0100 Subject: [PATCH] Added ASGI application (#512) * Added ASGI application * Factor out common-functionality for asgi/wsgi * Convert twisted to use WSGIResource * Change default HTTP Server to WSGI Server Signed-off-by: Emil Madsen --- README.md | 13 +++ prometheus_client/__init__.py | 5 + prometheus_client/asgi.py | 34 +++++++ prometheus_client/exposition.py | 83 +++++++-------- prometheus_client/twisted/_exposition.py | 20 +--- tests/test_asgi.py | 123 +++++++++++++++++++++++ tests/test_wsgi.py | 69 +++++++++++++ tox.ini | 1 + 8 files changed, 293 insertions(+), 55 deletions(-) create mode 100644 prometheus_client/asgi.py create mode 100644 tests/test_asgi.py create mode 100644 tests/test_wsgi.py diff --git a/README.md b/README.md index c7161fd4..586f5315 100644 --- a/README.md +++ b/README.md @@ -306,6 +306,19 @@ from prometheus_client import start_wsgi_server start_wsgi_server(8000) ``` +#### ASGI + +To use Prometheus with [ASGI](http://asgi.readthedocs.org/en/latest/), there is +`make_asgi_app` which creates an ASGI application. + +```python +from prometheus_client import make_asgi_app + +app = make_asgi_app() +``` +Such an application can be useful when integrating Prometheus metrics with ASGI +apps. + #### Flask To use Prometheus with [Flask](http://flask.pocoo.org/) we need to serve metrics through a Prometheus WSGI application. This can be achieved using [Flask's application dispatching](http://flask.pocoo.org/docs/latest/patterns/appdispatch/). Below is a working example. diff --git a/prometheus_client/__init__.py b/prometheus_client/__init__.py index 67f493f8..f176ad09 100644 --- a/prometheus_client/__init__.py +++ b/prometheus_client/__init__.py @@ -24,6 +24,11 @@ generate_latest = exposition.generate_latest MetricsHandler = exposition.MetricsHandler make_wsgi_app = exposition.make_wsgi_app +try: + # Python >3.5 only + make_asgi_app = exposition.make_asgi_app +except: + pass start_http_server = exposition.start_http_server start_wsgi_server = exposition.start_wsgi_server write_to_textfile = exposition.write_to_textfile diff --git a/prometheus_client/asgi.py b/prometheus_client/asgi.py new file mode 100644 index 00000000..7d7e4c7f --- /dev/null +++ b/prometheus_client/asgi.py @@ -0,0 +1,34 @@ +from urllib.parse import parse_qs + +from .exposition import _bake_output +from .registry import REGISTRY + + +def make_asgi_app(registry=REGISTRY): + """Create a ASGI app which serves the metrics from a registry.""" + + async def prometheus_app(scope, receive, send): + assert scope.get("type") == "http" + # Prepare parameters + params = parse_qs(scope.get('query_string', b'')) + accept_header = "Accept: " + ",".join([ + value.decode("utf8") for (name, value) in scope.get('headers') + if name.decode("utf8") == 'accept' + ]) + # Bake output + status, header, output = _bake_output(registry, accept_header, params) + # Return output + payload = await receive() + if payload.get("type") == "http.request": + await send( + { + "type": "http.response.start", + "status": int(status.split(' ')[0]), + "headers": [ + tuple(x.encode('utf8') for x in header) + ] + } + ) + await send({"type": "http.response.body", "body": output}) + + return prometheus_app diff --git a/prometheus_client/exposition.py b/prometheus_client/exposition.py index 6911ba75..634066f7 100644 --- a/prometheus_client/exposition.py +++ b/prometheus_client/exposition.py @@ -6,7 +6,7 @@ import socket import sys import threading -from wsgiref.simple_server import make_server, WSGIRequestHandler +from wsgiref.simple_server import make_server, WSGIServer, WSGIRequestHandler from .openmetrics import exposition as openmetrics from .registry import REGISTRY @@ -31,20 +31,27 @@ PYTHON26_OR_OLDER = sys.version_info < (2, 7) PYTHON376_OR_NEWER = sys.version_info > (3, 7, 5) + +def _bake_output(registry, accept_header, params): + """Bake output for metrics output.""" + encoder, content_type = choose_encoder(accept_header) + if 'name[]' in params: + registry = registry.restricted_registry(params['name[]']) + output = encoder(registry) + return str('200 OK'), (str('Content-Type'), content_type), output + + def make_wsgi_app(registry=REGISTRY): """Create a WSGI app which serves the metrics from a registry.""" def prometheus_app(environ, start_response): + # Prepare parameters + accept_header = environ.get('HTTP_ACCEPT') params = parse_qs(environ.get('QUERY_STRING', '')) - r = registry - encoder, content_type = choose_encoder(environ.get('HTTP_ACCEPT')) - if 'name[]' in params: - r = r.restricted_registry(params['name[]']) - output = encoder(r) - - status = str('200 OK') - headers = [(str('Content-type'), content_type)] - start_response(status, headers) + # Bake output + status, header, output = _bake_output(registry, accept_header, params) + # Return output + start_response(status, [header]) return [output] return prometheus_app @@ -57,15 +64,26 @@ def log_message(self, format, *args): """Log nothing.""" +class ThreadingWSGIServer(ThreadingMixIn, WSGIServer): + """Thread per request HTTP server.""" + # Make worker threads "fire and forget". Beginning with Python 3.7 this + # prevents a memory leak because ``ThreadingMixIn`` starts to gather all + # non-daemon threads in a list in order to join on them at server close. + daemon_threads = True + + def start_wsgi_server(port, addr='', registry=REGISTRY): """Starts a WSGI server for prometheus metrics as a daemon thread.""" app = make_wsgi_app(registry) - httpd = make_server(addr, port, app, handler_class=_SilentHandler) + httpd = make_server(addr, port, app, ThreadingWSGIServer, handler_class=_SilentHandler) t = threading.Thread(target=httpd.serve_forever) t.daemon = True t.start() +start_http_server = start_wsgi_server + + def generate_latest(registry=REGISTRY): """Returns the metrics from the registry in latest text format as a string.""" @@ -143,18 +161,15 @@ class MetricsHandler(BaseHTTPRequestHandler): registry = REGISTRY def do_GET(self): + # Prepare parameters registry = self.registry + accept_header = self.headers.get('Accept') params = parse_qs(urlparse(self.path).query) - encoder, content_type = choose_encoder(self.headers.get('Accept')) - if 'name[]' in params: - registry = registry.restricted_registry(params['name[]']) - try: - output = encoder(registry) - except: - self.send_error(500, 'error generating metric output') - raise - self.send_response(200) - self.send_header('Content-Type', content_type) + # Bake output + status, header, output = _bake_output(registry, accept_header, params) + # Return output + self.send_response(int(status.split(' ')[0])) + self.send_header(*header) self.end_headers() self.wfile.write(output) @@ -177,25 +192,6 @@ def factory(cls, registry): return MyMetricsHandler -class _ThreadingSimpleServer(ThreadingMixIn, HTTPServer): - """Thread per request HTTP server.""" - # Make worker threads "fire and forget". Beginning with Python 3.7 this - # prevents a memory leak because ``ThreadingMixIn`` starts to gather all - # non-daemon threads in a list in order to join on them at server close. - # Enabling daemon threads virtually makes ``_ThreadingSimpleServer`` the - # same as Python 3.7's ``ThreadingHTTPServer``. - daemon_threads = True - - -def start_http_server(port, addr='', registry=REGISTRY): - """Starts an HTTP server for prometheus metrics as a daemon thread""" - CustomMetricsHandler = MetricsHandler.factory(registry) - httpd = _ThreadingSimpleServer((addr, port), CustomMetricsHandler) - t = threading.Thread(target=httpd.serve_forever) - t.daemon = True - t.start() - - def write_to_textfile(path, registry): """Write metrics to the given path. @@ -378,3 +374,10 @@ def instance_ip_grouping_key(): with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as s: s.connect(('localhost', 0)) return {'instance': s.getsockname()[0]} + + +try: + # Python >3.5 only + from .asgi import make_asgi_app +except: + pass diff --git a/prometheus_client/twisted/_exposition.py b/prometheus_client/twisted/_exposition.py index af3d0a6c..c032e254 100644 --- a/prometheus_client/twisted/_exposition.py +++ b/prometheus_client/twisted/_exposition.py @@ -1,20 +1,10 @@ from __future__ import absolute_import, unicode_literals -from twisted.web.resource import Resource +from twisted.web.wsgi import WSGIResource +from twisted.internet import reactor from .. import exposition, REGISTRY - -class MetricsResource(Resource): - """ - Twisted ``Resource`` that serves prometheus metrics. - """ - isLeaf = True - - def __init__(self, registry=REGISTRY): - self.registry = registry - - def render_GET(self, request): - encoder, content_type = exposition.choose_encoder(request.getHeader('Accept')) - request.setHeader(b'Content-Type', content_type.encode('ascii')) - return encoder(self.registry) +MetricsResource = lambda registry=REGISTRY: WSGIResource( + reactor, reactor.getThreadPool(), exposition.make_wsgi_app(registry) +) diff --git a/tests/test_asgi.py b/tests/test_asgi.py new file mode 100644 index 00000000..b2d9a70f --- /dev/null +++ b/tests/test_asgi.py @@ -0,0 +1,123 @@ +from __future__ import absolute_import, unicode_literals + +import sys +from unittest import TestCase + +from prometheus_client import CollectorRegistry, Counter, generate_latest +from prometheus_client.exposition import CONTENT_TYPE_LATEST + +if sys.version_info < (2, 7): + from unittest2 import skipUnless +else: + from unittest import skipUnless + +try: + # Python >3.5 only + from prometheus_client import make_asgi_app + import asyncio + from asgiref.testing import ApplicationCommunicator + HAVE_ASYNCIO_AND_ASGI = True +except ImportError: + HAVE_ASYNCIO_AND_ASGI = False + + +def setup_testing_defaults(scope): + scope.update( + { + "client": ("127.0.0.1", 32767), + "headers": [], + "http_version": "1.0", + "method": "GET", + "path": "/", + "query_string": b"", + "scheme": "http", + "server": ("127.0.0.1", 80), + "type": "http", + } + ) + + +class ASGITest(TestCase): + @skipUnless(HAVE_ASYNCIO_AND_ASGI, "Don't have asyncio/asgi installed.") + def setUp(self): + self.registry = CollectorRegistry() + self.captured_status = None + self.captured_headers = None + # Setup ASGI scope + self.scope = {} + setup_testing_defaults(self.scope) + self.communicator = None + + def tearDown(self): + if self.communicator: + asyncio.get_event_loop().run_until_complete( + self.communicator.wait() + ) + + def seed_app(self, app): + self.communicator = ApplicationCommunicator(app, self.scope) + + def send_input(self, payload): + asyncio.get_event_loop().run_until_complete( + self.communicator.send_input(payload) + ) + + def send_default_request(self): + self.send_input({"type": "http.request", "body": b""}) + + def get_output(self): + output = asyncio.get_event_loop().run_until_complete( + self.communicator.receive_output(0) + ) + return output + + def get_all_output(self): + outputs = [] + while True: + try: + outputs.append(self.get_output()) + except asyncio.TimeoutError: + break + return outputs + + def validate_metrics(self, metric_name, help_text, increments): + """ + ASGI app serves the metrics from the provided registry. + """ + c = Counter(metric_name, help_text, registry=self.registry) + for _ in range(increments): + c.inc() + # Create and run ASGI app + app = make_asgi_app(self.registry) + self.seed_app(app) + self.send_default_request() + # Assert outputs + outputs = self.get_all_output() + # Assert outputs + self.assertEqual(len(outputs), 2) + response_start = outputs[0] + self.assertEqual(response_start['type'], 'http.response.start') + response_body = outputs[1] + self.assertEqual(response_body['type'], 'http.response.body') + # Status code + self.assertEqual(response_start['status'], 200) + # Headers + self.assertEqual(len(response_start['headers']), 1) + self.assertEqual(response_start['headers'][0], (b"Content-Type", CONTENT_TYPE_LATEST.encode('utf8'))) + # Body + output = response_body['body'].decode('utf8') + self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output) + self.assertIn("# TYPE " + metric_name + "_total counter\n", output) + self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output) + + def test_report_metrics_1(self): + self.validate_metrics("counter", "A counter", 2) + + def test_report_metrics_2(self): + self.validate_metrics("counter", "Another counter", 3) + + def test_report_metrics_3(self): + self.validate_metrics("requests", "Number of requests", 5) + + def test_report_metrics_4(self): + self.validate_metrics("failed_requests", "Number of failed requests", 7) diff --git a/tests/test_wsgi.py b/tests/test_wsgi.py new file mode 100644 index 00000000..af251ac4 --- /dev/null +++ b/tests/test_wsgi.py @@ -0,0 +1,69 @@ +from __future__ import absolute_import, unicode_literals + +import sys +from unittest import TestCase +from wsgiref.util import setup_testing_defaults +from prometheus_client import make_wsgi_app + +from prometheus_client import CollectorRegistry, Counter, generate_latest +from prometheus_client.exposition import CONTENT_TYPE_LATEST + + +class WSGITest(TestCase): + def setUp(self): + self.registry = CollectorRegistry() + self.captured_status = None + self.captured_headers = None + # Setup WSGI environment + self.environ = {} + setup_testing_defaults(self.environ) + + def capture(self, status, header): + self.captured_status = status + self.captured_headers = header + + def assertIn(self, item, iterable): + try: + super().assertIn(item, iterable) + except: # Python < 2.7 + self.assertTrue( + item in iterable, + msg="{item} not found in {iterable}".format( + item=item, iterable=iterable + ) + ) + + def validate_metrics(self, metric_name, help_text, increments): + """ + WSGI app serves the metrics from the provided registry. + """ + c = Counter(metric_name, help_text, registry=self.registry) + for _ in range(increments): + c.inc() + # Create and run WSGI app + app = make_wsgi_app(self.registry) + outputs = app(self.environ, self.capture) + # Assert outputs + self.assertEqual(len(outputs), 1) + output = outputs[0].decode('utf8') + # Status code + self.assertEqual(self.captured_status, "200 OK") + # Headers + self.assertEqual(len(self.captured_headers), 1) + self.assertEqual(self.captured_headers[0], ("Content-Type", CONTENT_TYPE_LATEST)) + # Body + self.assertIn("# HELP " + metric_name + "_total " + help_text + "\n", output) + self.assertIn("# TYPE " + metric_name + "_total counter\n", output) + self.assertIn(metric_name + "_total " + str(increments) + ".0\n", output) + + def test_report_metrics_1(self): + self.validate_metrics("counter", "A counter", 2) + + def test_report_metrics_2(self): + self.validate_metrics("counter", "Another counter", 3) + + def test_report_metrics_3(self): + self.validate_metrics("requests", "Number of requests", 5) + + def test_report_metrics_4(self): + self.validate_metrics("failed_requests", "Number of failed requests", 7) diff --git a/tox.ini b/tox.ini index a849cd95..bcabd320 100644 --- a/tox.ini +++ b/tox.ini @@ -30,6 +30,7 @@ deps = deps = {[base]deps} {py27,py37,pypy,pypy3}: twisted + {py37,pypy3}: asgiref commands = coverage run --parallel -m pytest {posargs} ; Ensure test suite passes if no optional dependencies are present.