From 14077a95c55e3f285d94679c6fe45b3bcce23622 Mon Sep 17 00:00:00 2001 From: Thiyagu55 <64461612+Thiyagu55@users.noreply.github.com> Date: Thu, 4 Aug 2022 18:43:12 +0530 Subject: [PATCH 1/3] Adding sqlalchemy native tags in sqlalchemy commenter (#1206) --- CHANGELOG.md | 2 + .../instrumentation/sqlalchemy/__init__.py | 45 +++++++++++++++++++ .../instrumentation/sqlalchemy/engine.py | 23 ++++++++-- .../tests/test_sqlcommenter.py | 3 +- 4 files changed, 69 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f252ee5336..69e5f9c268 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1187](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1187)) - SQLCommenter semicolon bug fix ([#1200](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1200/files)) +- Adding sqlalchemy native tags in sqlalchemy commenter + ([#1206](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1206)) - Add psycopg2 native tags to sqlcommenter ([#1203](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1203)) diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py index bc438c609a..e56485ca77 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/__init__.py @@ -22,6 +22,49 @@ .. _sqlalchemy: https://pypi.org/project/sqlalchemy/ +SQLCOMMENTER +**************************************** +You can optionally configure SQLAlchemy instrumentation to enable sqlcommenter which enriches +the query with contextual information. + +Usage +----- + +.. code:: python + + from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor + + SQLAlchemyInstrumentor().instrument(enable_commenter=True, commenter_options={}) + + +For example, +:: + + Invoking engine.execute("select * from auth_users") will lead to sql query "select * from auth_users" but when SQLCommenter is enabled + the query will get appended with some configurable tags like "select * from auth_users /*tag=value*/;" + +SQLCommenter Configurations +*************************** +We can configure the tags to be appended to the sqlquery log by adding configuration inside commenter_options(default:{}) keyword + +db_driver = True(Default) or False + +For example, +:: +Enabling this flag will add any underlying driver like psycopg2 /*db_driver='psycopg2'*/ + +db_framework = True(Default) or False + +For example, +:: +Enabling this flag will add db_framework and it's version /*db_framework='sqlalchemy:0.41b0'*/ + +opentelemetry_values = True(Default) or False + +For example, +:: +Enabling this flag will add traceparent values /*traceparent='00-03afa25236b8cd948fa853d67038ac79-405ff022e8247c46-01'*/ + Usage ----- .. code:: python @@ -115,6 +158,7 @@ def _instrument(self, **kwargs): _get_tracer(tracer_provider), kwargs.get("engine"), kwargs.get("enable_commenter", False), + kwargs.get("commenter_options", {}), ) if kwargs.get("engines") is not None and isinstance( kwargs.get("engines"), Sequence @@ -124,6 +168,7 @@ def _instrument(self, **kwargs): _get_tracer(tracer_provider), engine, kwargs.get("enable_commenter", False), + kwargs.get("commenter_options", {}), ) for engine in kwargs.get("engines") ] diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py index 25e8791dc8..7441c0aa03 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/src/opentelemetry/instrumentation/sqlalchemy/engine.py @@ -94,11 +94,14 @@ def _wrap_connect_internal(func, module, args, kwargs): class EngineTracer: - def __init__(self, tracer, engine, enable_commenter=False): + def __init__( + self, tracer, engine, enable_commenter=True, commenter_options=None + ): self.tracer = tracer self.engine = engine self.vendor = _normalize_vendor(engine.name) self.enable_commenter = enable_commenter + self.commenter_options = commenter_options if commenter_options else {} listen( engine, "before_cursor_execute", self._before_cur_exec, retval=True @@ -141,8 +144,22 @@ def _before_cur_exec( for key, value in attrs.items(): span.set_attribute(key, value) if self.enable_commenter: - commenter_data = {} - commenter_data.update(_get_opentelemetry_values()) + commenter_data = dict( + db_driver=conn.engine.driver, + # Driver/framework centric information. + db_framework=f"sqlalchemy:{__version__}", + ) + + if self.commenter_options.get("opentelemetry_values", True): + commenter_data.update(**_get_opentelemetry_values()) + + # Filter down to just the requested attributes. + commenter_data = { + k: v + for k, v in commenter_data.items() + if self.commenter_options.get(k, True) + } + statement = _add_sql_comment(statement, **commenter_data) context._otel_span = span diff --git a/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlcommenter.py b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlcommenter.py index 8245d990f7..616388f5e5 100644 --- a/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlcommenter.py +++ b/instrumentation/opentelemetry-instrumentation-sqlalchemy/tests/test_sqlcommenter.py @@ -46,10 +46,11 @@ def test_sqlcommenter_enabled(self): engine=engine, tracer_provider=self.tracer_provider, enable_commenter=True, + commenter_options={"db_framework": False}, ) cnx = engine.connect() cnx.execute("SELECT 1;").fetchall() self.assertRegex( self.caplog.records[-2].getMessage(), - r"SELECT 1 /\*traceparent='\d{1,2}-[a-zA-Z0-9_]{32}-[a-zA-Z0-9_]{16}-\d{1,2}'\*/;", + r"SELECT 1 /\*db_driver='(.*)',traceparent='\d{1,2}-[a-zA-Z0-9_]{32}-[a-zA-Z0-9_]{16}-\d{1,2}'\*/;", ) From ebe6d1804bccff184edb379d7780469762c7b854 Mon Sep 17 00:00:00 2001 From: Anshul Asawa <35421635+TheAnshul756@users.noreply.github.com> Date: Fri, 5 Aug 2022 13:17:14 +0530 Subject: [PATCH 2/3] Metrics instrumentation flask (#1186) --- CHANGELOG.md | 2 + instrumentation/README.md | 2 +- .../instrumentation/flask/__init__.py | 76 ++++++++++- .../instrumentation/flask/package.py | 2 + .../tests/test_programmatic.py | 121 +++++++++++++++++- .../instrumentation/wsgi/__init__.py | 29 +++-- 6 files changed, 214 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 69e5f9c268..57565626d4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients ([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177)) - `opentelemetry-instrumentation-sqlalchemy` Added span for the connection phase ([#1133](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1133)) +- Add metric instumentation for flask + ([#1186](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1186)) ## [1.12.0rc2-0.32b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc2-0.32b0) - 2022-07-01 diff --git a/instrumentation/README.md b/instrumentation/README.md index 71d79ea258..269010b54f 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -17,7 +17,7 @@ | [opentelemetry-instrumentation-elasticsearch](./opentelemetry-instrumentation-elasticsearch) | elasticsearch >= 2.0 | No | [opentelemetry-instrumentation-falcon](./opentelemetry-instrumentation-falcon) | falcon >= 1.4.1, < 4.0.0 | No | [opentelemetry-instrumentation-fastapi](./opentelemetry-instrumentation-fastapi) | fastapi ~= 0.58 | No -| [opentelemetry-instrumentation-flask](./opentelemetry-instrumentation-flask) | flask >= 1.0, < 3.0 | No +| [opentelemetry-instrumentation-flask](./opentelemetry-instrumentation-flask) | flask >= 1.0, < 3.0 | Yes | [opentelemetry-instrumentation-grpc](./opentelemetry-instrumentation-grpc) | grpcio ~= 1.27 | No | [opentelemetry-instrumentation-httpx](./opentelemetry-instrumentation-httpx) | httpx >= 0.18.0 | No | [opentelemetry-instrumentation-jinja2](./opentelemetry-instrumentation-jinja2) | jinja2 >= 2.7, < 4.0 | No diff --git a/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py b/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py index cca8743556..db63c8c07c 100644 --- a/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/__init__.py @@ -141,6 +141,7 @@ def response_hook(span: Span, status: str, response_headers: List): """ from logging import getLogger +from timeit import default_timer from typing import Collection import flask @@ -154,6 +155,7 @@ def response_hook(span: Span, status: str, response_headers: List): get_global_response_propagator, ) from opentelemetry.instrumentation.utils import _start_internal_or_server_span +from opentelemetry.metrics import get_meter from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.util._time import _time_ns from opentelemetry.util.http import get_excluded_urls, parse_excluded_urls @@ -165,7 +167,6 @@ def response_hook(span: Span, status: str, response_headers: List): _ENVIRON_ACTIVATION_KEY = "opentelemetry-flask.activation_key" _ENVIRON_TOKEN = "opentelemetry-flask.token" - _excluded_urls_from_env = get_excluded_urls("FLASK") @@ -178,13 +179,26 @@ def get_default_span_name(): return span_name -def _rewrapped_app(wsgi_app, response_hook=None, excluded_urls=None): +def _rewrapped_app( + wsgi_app, + active_requests_counter, + duration_histogram, + response_hook=None, + excluded_urls=None, +): def _wrapped_app(wrapped_app_environ, start_response): # We want to measure the time for route matching, etc. # In theory, we could start the span here and use # update_name later but that API is "highly discouraged" so # we better avoid it. wrapped_app_environ[_ENVIRON_STARTTIME_KEY] = _time_ns() + start = default_timer() + attributes = otel_wsgi.collect_request_attributes(wrapped_app_environ) + active_requests_count_attrs = ( + otel_wsgi._parse_active_request_count_attrs(attributes) + ) + duration_attrs = otel_wsgi._parse_duration_attrs(attributes) + active_requests_counter.add(1, active_requests_count_attrs) def _start_response(status, response_headers, *args, **kwargs): if flask.request and ( @@ -204,6 +218,11 @@ def _start_response(status, response_headers, *args, **kwargs): otel_wsgi.add_response_attributes( span, status, response_headers ) + status_code = otel_wsgi._parse_status_code(status) + if status_code is not None: + duration_attrs[ + SpanAttributes.HTTP_STATUS_CODE + ] = status_code if ( span.is_recording() and span.kind == trace.SpanKind.SERVER @@ -223,13 +242,19 @@ def _start_response(status, response_headers, *args, **kwargs): response_hook(span, status, response_headers) return start_response(status, response_headers, *args, **kwargs) - return wsgi_app(wrapped_app_environ, _start_response) + result = wsgi_app(wrapped_app_environ, _start_response) + duration = max(round((default_timer() - start) * 1000), 0) + duration_histogram.record(duration, duration_attrs) + active_requests_counter.add(-1, active_requests_count_attrs) + return result return _wrapped_app def _wrapped_before_request( - request_hook=None, tracer=None, excluded_urls=None + request_hook=None, + tracer=None, + excluded_urls=None, ): def _before_request(): if excluded_urls and excluded_urls.url_disabled(flask.request.url): @@ -278,7 +303,9 @@ def _before_request(): return _before_request -def _wrapped_teardown_request(excluded_urls=None): +def _wrapped_teardown_request( + excluded_urls=None, +): def _teardown_request(exc): # pylint: disable=E1101 if excluded_urls and excluded_urls.url_disabled(flask.request.url): @@ -290,7 +317,6 @@ def _teardown_request(exc): # a way that doesn't run `before_request`, like when it is created # with `app.test_request_context`. return - if exc is None: activation.__exit__(None, None, None) else: @@ -310,6 +336,7 @@ class _InstrumentedFlask(flask.Flask): _tracer_provider = None _request_hook = None _response_hook = None + _meter_provider = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -317,8 +344,24 @@ def __init__(self, *args, **kwargs): self._original_wsgi_app = self.wsgi_app self._is_instrumented_by_opentelemetry = True + meter = get_meter( + __name__, __version__, _InstrumentedFlask._meter_provider + ) + duration_histogram = meter.create_histogram( + name="http.server.duration", + unit="ms", + description="measures the duration of the inbound HTTP request", + ) + active_requests_counter = meter.create_up_down_counter( + name="http.server.active_requests", + unit="requests", + description="measures the number of concurrent HTTP requests that are currently in-flight", + ) + self.wsgi_app = _rewrapped_app( self.wsgi_app, + active_requests_counter, + duration_histogram, _InstrumentedFlask._response_hook, excluded_urls=_InstrumentedFlask._excluded_urls, ) @@ -367,6 +410,8 @@ def _instrument(self, **kwargs): if excluded_urls is None else parse_excluded_urls(excluded_urls) ) + meter_provider = kwargs.get("meter_provider") + _InstrumentedFlask._meter_provider = meter_provider flask.Flask = _InstrumentedFlask def _uninstrument(self, **kwargs): @@ -379,6 +424,7 @@ def instrument_app( response_hook=None, tracer_provider=None, excluded_urls=None, + meter_provider=None, ): if not hasattr(app, "_is_instrumented_by_opentelemetry"): app._is_instrumented_by_opentelemetry = False @@ -389,9 +435,25 @@ def instrument_app( if excluded_urls is not None else _excluded_urls_from_env ) + meter = get_meter(__name__, __version__, meter_provider) + duration_histogram = meter.create_histogram( + name="http.server.duration", + unit="ms", + description="measures the duration of the inbound HTTP request", + ) + active_requests_counter = meter.create_up_down_counter( + name="http.server.active_requests", + unit="requests", + description="measures the number of concurrent HTTP requests that are currently in-flight", + ) + app._original_wsgi_app = app.wsgi_app app.wsgi_app = _rewrapped_app( - app.wsgi_app, response_hook, excluded_urls=excluded_urls + app.wsgi_app, + active_requests_counter, + duration_histogram, + response_hook, + excluded_urls=excluded_urls, ) tracer = trace.get_tracer(__name__, __version__, tracer_provider) diff --git a/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/package.py b/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/package.py index b081564202..33bfe4ccba 100644 --- a/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/package.py +++ b/instrumentation/opentelemetry-instrumentation-flask/src/opentelemetry/instrumentation/flask/package.py @@ -14,3 +14,5 @@ _instruments = ("flask >= 1.0, < 3.0",) + +_supports_metrics = True diff --git a/instrumentation/opentelemetry-instrumentation-flask/tests/test_programmatic.py b/instrumentation/opentelemetry-instrumentation-flask/tests/test_programmatic.py index 2bcb097c7b..a64ca48d55 100644 --- a/instrumentation/opentelemetry-instrumentation-flask/tests/test_programmatic.py +++ b/instrumentation/opentelemetry-instrumentation-flask/tests/test_programmatic.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from timeit import default_timer from unittest.mock import Mock, patch from flask import Flask, request @@ -23,7 +24,15 @@ get_global_response_propagator, set_global_response_propagator, ) -from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware +from opentelemetry.instrumentation.wsgi import ( + OpenTelemetryMiddleware, + _active_requests_count_attrs, + _duration_attrs, +) +from opentelemetry.sdk.metrics.export import ( + HistogramDataPoint, + NumberDataPoint, +) from opentelemetry.sdk.resources import Resource from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.wsgitestutil import WsgiTestBase @@ -49,6 +58,16 @@ def expected_attributes(override_attributes): return default_attributes +_expected_metric_names = [ + "http.server.active_requests", + "http.server.duration", +] +_recommended_attrs = { + "http.server.active_requests": _active_requests_count_attrs, + "http.server.duration": _duration_attrs, +} + + class TestProgrammatic(InstrumentationTest, WsgiTestBase): def setUp(self): super().setUp() @@ -250,6 +269,106 @@ def test_exclude_lists_from_explicit(self): span_list = self.memory_exporter.get_finished_spans() self.assertEqual(len(span_list), 1) + def test_flask_metrics(self): + start = default_timer() + self.client.get("/hello/123") + self.client.get("/hello/321") + self.client.get("/hello/756") + duration = max(round((default_timer() - start) * 1000), 0) + metrics_list = self.memory_metrics_reader.get_metrics_data() + number_data_point_seen = False + histogram_data_point_seen = False + self.assertTrue(len(metrics_list.resource_metrics) != 0) + for resource_metric in metrics_list.resource_metrics: + self.assertTrue(len(resource_metric.scope_metrics) != 0) + for scope_metric in resource_metric.scope_metrics: + self.assertTrue(len(scope_metric.metrics) != 0) + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + for point in data_points: + if isinstance(point, HistogramDataPoint): + self.assertEqual(point.count, 3) + self.assertAlmostEqual( + duration, point.sum, delta=10 + ) + histogram_data_point_seen = True + if isinstance(point, NumberDataPoint): + number_data_point_seen = True + for attr in point.attributes: + self.assertIn( + attr, _recommended_attrs[metric.name] + ) + self.assertTrue(number_data_point_seen and histogram_data_point_seen) + + def test_flask_metric_values(self): + start = default_timer() + self.client.post("/hello/756") + self.client.post("/hello/756") + self.client.post("/hello/756") + duration = max(round((default_timer() - start) * 1000), 0) + metrics_list = self.memory_metrics_reader.get_metrics_data() + for resource_metric in metrics_list.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for point in list(metric.data.data_points): + if isinstance(point, HistogramDataPoint): + self.assertEqual(point.count, 3) + self.assertAlmostEqual( + duration, point.sum, delta=10 + ) + if isinstance(point, NumberDataPoint): + self.assertEqual(point.value, 0) + + def test_basic_metric_success(self): + self.client.get("/hello/756") + expected_duration_attributes = { + "http.method": "GET", + "http.host": "localhost", + "http.scheme": "http", + "http.flavor": "1.1", + "http.server_name": "localhost", + "net.host.port": 80, + "http.status_code": 200, + } + expected_requests_count_attributes = { + "http.method": "GET", + "http.host": "localhost", + "http.scheme": "http", + "http.flavor": "1.1", + "http.server_name": "localhost", + } + metrics_list = self.memory_metrics_reader.get_metrics_data() + for resource_metric in metrics_list.resource_metrics: + for scope_metrics in resource_metric.scope_metrics: + for metric in scope_metrics.metrics: + for point in list(metric.data.data_points): + if isinstance(point, HistogramDataPoint): + self.assertDictEqual( + expected_duration_attributes, + dict(point.attributes), + ) + self.assertEqual(point.count, 1) + elif isinstance(point, NumberDataPoint): + self.assertDictEqual( + expected_requests_count_attributes, + dict(point.attributes), + ) + self.assertEqual(point.value, 0) + + def test_metric_uninstrument(self): + self.client.delete("/hello/756") + FlaskInstrumentor().uninstrument_app(self.app) + self.client.delete("/hello/756") + metrics_list = self.memory_metrics_reader.get_metrics_data() + for resource_metric in metrics_list.resource_metrics: + for scope_metric in resource_metric.scope_metrics: + for metric in scope_metric.metrics: + for point in list(metric.data.data_points): + if isinstance(point, HistogramDataPoint): + self.assertEqual(point.count, 1) + class TestProgrammaticHooks(InstrumentationTest, WsgiTestBase): def setUp(self): diff --git a/instrumentation/opentelemetry-instrumentation-wsgi/src/opentelemetry/instrumentation/wsgi/__init__.py b/instrumentation/opentelemetry-instrumentation-wsgi/src/opentelemetry/instrumentation/wsgi/__init__.py index 11c5acf643..1d82b33037 100644 --- a/instrumentation/opentelemetry-instrumentation-wsgi/src/opentelemetry/instrumentation/wsgi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-wsgi/src/opentelemetry/instrumentation/wsgi/__init__.py @@ -334,6 +334,22 @@ def _parse_status_code(resp_status): return None +def _parse_active_request_count_attrs(req_attrs): + active_requests_count_attrs = {} + for attr_key in _active_requests_count_attrs: + if req_attrs.get(attr_key) is not None: + active_requests_count_attrs[attr_key] = req_attrs[attr_key] + return active_requests_count_attrs + + +def _parse_duration_attrs(req_attrs): + duration_attrs = {} + for attr_key in _duration_attrs: + if req_attrs.get(attr_key) is not None: + duration_attrs[attr_key] = req_attrs[attr_key] + return duration_attrs + + def add_response_attributes( span, start_response_status, response_headers ): # pylint: disable=unused-argument @@ -436,15 +452,10 @@ def __call__(self, environ, start_response): start_response: The WSGI start_response callable. """ req_attrs = collect_request_attributes(environ) - active_requests_count_attrs = {} - for attr_key in _active_requests_count_attrs: - if req_attrs.get(attr_key) is not None: - active_requests_count_attrs[attr_key] = req_attrs[attr_key] - - duration_attrs = {} - for attr_key in _duration_attrs: - if req_attrs.get(attr_key) is not None: - duration_attrs[attr_key] = req_attrs[attr_key] + active_requests_count_attrs = _parse_active_request_count_attrs( + req_attrs + ) + duration_attrs = _parse_duration_attrs(req_attrs) span, token = _start_internal_or_server_span( tracer=self.tracer, From cbf005be6fb17f35b4cbaa4e76ac30bb64b3a258 Mon Sep 17 00:00:00 2001 From: Anshul Asawa <35421635+TheAnshul756@users.noreply.github.com> Date: Mon, 8 Aug 2022 23:27:16 +0530 Subject: [PATCH 3/3] Metric instrumentation asgi (#1197) --- CHANGELOG.md | 2 + .../instrumentation/asgi/__init__.py | 42 ++++++- .../tests/test_asgi_middleware.py | 111 ++++++++++++++++++ .../src/opentelemetry/util/http/__init__.py | 38 ++++++ 4 files changed, 189 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 57565626d4..8c5889f827 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `opentelemetry-instrumentation-redis` add support to instrument RedisCluster clients ([#1177](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1177)) - `opentelemetry-instrumentation-sqlalchemy` Added span for the connection phase ([#1133](https://github.com/open-telemetry/opentelemetry-python-contrib/issues/1133)) +- Add metric instrumentation in asgi + ([#1197](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1197)) - Add metric instumentation for flask ([#1186](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1186)) diff --git a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py index d2e42450a0..e3bbf16754 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/src/opentelemetry/instrumentation/asgi/__init__.py @@ -149,6 +149,7 @@ def client_response_hook(span: Span, message: dict): import typing import urllib from functools import wraps +from timeit import default_timer from typing import Tuple from asgiref.compatibility import guarantee_single_callable @@ -162,6 +163,7 @@ def client_response_hook(span: Span, message: dict): _start_internal_or_server_span, http_status_to_status_code, ) +from opentelemetry.metrics import get_meter from opentelemetry.propagators.textmap import Getter, Setter from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace import Span, set_span_in_context @@ -169,6 +171,8 @@ def client_response_hook(span: Span, message: dict): from opentelemetry.util.http import ( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST, OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE, + _parse_active_request_count_attrs, + _parse_duration_attrs, get_custom_headers, normalise_request_header_name, normalise_response_header_name, @@ -391,9 +395,21 @@ def __init__( client_request_hook: _ClientRequestHookT = None, client_response_hook: _ClientResponseHookT = None, tracer_provider=None, + meter_provider=None, ): self.app = guarantee_single_callable(app) self.tracer = trace.get_tracer(__name__, __version__, tracer_provider) + self.meter = get_meter(__name__, __version__, meter_provider) + self.duration_histogram = self.meter.create_histogram( + name="http.server.duration", + unit="ms", + description="measures the duration of the inbound HTTP request", + ) + self.active_requests_counter = self.meter.create_up_down_counter( + name="http.server.active_requests", + unit="requests", + description="measures the number of concurrent HTTP requests that are currently in-flight", + ) self.excluded_urls = excluded_urls self.default_span_details = ( default_span_details or get_default_span_details @@ -426,12 +442,17 @@ async def __call__(self, scope, receive, send): context_carrier=scope, context_getter=asgi_getter, ) - + attributes = collect_request_attributes(scope) + attributes.update(additional_attributes) + active_requests_count_attrs = _parse_active_request_count_attrs( + attributes + ) + duration_attrs = _parse_duration_attrs(attributes) + if scope["type"] == "http": + self.active_requests_counter.add(1, active_requests_count_attrs) try: with trace.use_span(span, end_on_exit=True) as current_span: if current_span.is_recording(): - attributes = collect_request_attributes(scope) - attributes.update(additional_attributes) for key, value in attributes.items(): current_span.set_attribute(key, value) @@ -454,10 +475,18 @@ async def __call__(self, scope, receive, send): span_name, scope, send, + duration_attrs, ) + start = default_timer() await self.app(scope, otel_receive, otel_send) finally: + if scope["type"] == "http": + duration = max(round((default_timer() - start) * 1000), 0) + self.duration_histogram.record(duration, duration_attrs) + self.active_requests_counter.add( + -1, active_requests_count_attrs + ) if token: context.detach(token) @@ -478,7 +507,9 @@ async def otel_receive(): return otel_receive - def _get_otel_send(self, server_span, server_span_name, scope, send): + def _get_otel_send( + self, server_span, server_span_name, scope, send, duration_attrs + ): @wraps(send) async def otel_send(message): with self.tracer.start_as_current_span( @@ -489,6 +520,9 @@ async def otel_send(message): if send_span.is_recording(): if message["type"] == "http.response.start": status_code = message["status"] + duration_attrs[ + SpanAttributes.HTTP_STATUS_CODE + ] = status_code set_status_code(server_span, status_code) set_status_code(send_span, status_code) elif message["type"] == "websocket.send": diff --git a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py index 3a1e8424a8..e6b75d7125 100644 --- a/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py +++ b/instrumentation/opentelemetry-instrumentation-asgi/tests/test_asgi_middleware.py @@ -14,6 +14,7 @@ import sys import unittest +from timeit import default_timer from unittest import mock import opentelemetry.instrumentation.asgi as otel_asgi @@ -24,6 +25,10 @@ set_global_response_propagator, ) from opentelemetry.sdk import resources +from opentelemetry.sdk.metrics.export import ( + HistogramDataPoint, + NumberDataPoint, +) from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.asgitestutil import ( AsgiTestBase, @@ -34,8 +39,19 @@ from opentelemetry.util.http import ( OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST, OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE, + _active_requests_count_attrs, + _duration_attrs, ) +_expected_metric_names = [ + "http.server.active_requests", + "http.server.duration", +] +_recommended_attrs = { + "http.server.active_requests": _active_requests_count_attrs, + "http.server.duration": _duration_attrs, +} + async def http_app(scope, receive, send): message = await receive() @@ -523,6 +539,101 @@ def update_expected_hook_results(expected): outputs, modifiers=[update_expected_hook_results] ) + def test_asgi_metrics(self): + app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) + self.seed_app(app) + self.send_default_request() + self.seed_app(app) + self.send_default_request() + self.seed_app(app) + self.send_default_request() + metrics_list = self.memory_metrics_reader.get_metrics_data() + number_data_point_seen = False + histogram_data_point_seen = False + self.assertTrue(len(metrics_list.resource_metrics) != 0) + for resource_metric in metrics_list.resource_metrics: + self.assertTrue(len(resource_metric.scope_metrics) != 0) + for scope_metric in resource_metric.scope_metrics: + self.assertTrue(len(scope_metric.metrics) != 0) + for metric in scope_metric.metrics: + self.assertIn(metric.name, _expected_metric_names) + data_points = list(metric.data.data_points) + self.assertEqual(len(data_points), 1) + for point in data_points: + if isinstance(point, HistogramDataPoint): + self.assertEqual(point.count, 3) + histogram_data_point_seen = True + if isinstance(point, NumberDataPoint): + number_data_point_seen = True + for attr in point.attributes: + self.assertIn( + attr, _recommended_attrs[metric.name] + ) + self.assertTrue(number_data_point_seen and histogram_data_point_seen) + + def test_basic_metric_success(self): + app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) + self.seed_app(app) + start = default_timer() + self.send_default_request() + duration = max(round((default_timer() - start) * 1000), 0) + expected_duration_attributes = { + "http.method": "GET", + "http.host": "127.0.0.1", + "http.scheme": "http", + "http.flavor": "1.0", + "net.host.port": 80, + "http.status_code": 200, + } + expected_requests_count_attributes = { + "http.method": "GET", + "http.host": "127.0.0.1", + "http.scheme": "http", + "http.flavor": "1.0", + } + metrics_list = self.memory_metrics_reader.get_metrics_data() + for resource_metric in metrics_list.resource_metrics: + for scope_metrics in resource_metric.scope_metrics: + for metric in scope_metrics.metrics: + for point in list(metric.data.data_points): + if isinstance(point, HistogramDataPoint): + self.assertDictEqual( + expected_duration_attributes, + dict(point.attributes), + ) + self.assertEqual(point.count, 1) + self.assertAlmostEqual( + duration, point.sum, delta=5 + ) + elif isinstance(point, NumberDataPoint): + self.assertDictEqual( + expected_requests_count_attributes, + dict(point.attributes), + ) + self.assertEqual(point.value, 0) + + def test_no_metric_for_websockets(self): + self.scope = { + "type": "websocket", + "http_version": "1.1", + "scheme": "ws", + "path": "/", + "query_string": b"", + "headers": [], + "client": ("127.0.0.1", 32767), + "server": ("127.0.0.1", 80), + } + app = otel_asgi.OpenTelemetryMiddleware(simple_asgi) + self.seed_app(app) + self.send_input({"type": "websocket.connect"}) + self.send_input({"type": "websocket.receive", "text": "ping"}) + self.send_input({"type": "websocket.disconnect"}) + self.get_all_output() + metrics_list = self.memory_metrics_reader.get_metrics_data() + self.assertEqual( + len(metrics_list.resource_metrics[0].scope_metrics), 0 + ) + class TestAsgiAttributes(unittest.TestCase): def setUp(self): diff --git a/util/opentelemetry-util-http/src/opentelemetry/util/http/__init__.py b/util/opentelemetry-util-http/src/opentelemetry/util/http/__init__.py index aa34fb439a..60eafdcd5b 100644 --- a/util/opentelemetry-util-http/src/opentelemetry/util/http/__init__.py +++ b/util/opentelemetry-util-http/src/opentelemetry/util/http/__init__.py @@ -18,6 +18,8 @@ from typing import Iterable, List from urllib.parse import urlparse, urlunparse +from opentelemetry.semconv.trace import SpanAttributes + OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST = ( "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_REQUEST" ) @@ -25,6 +27,26 @@ "OTEL_INSTRUMENTATION_HTTP_CAPTURE_HEADERS_SERVER_RESPONSE" ) +# List of recommended metrics attributes +_duration_attrs = [ + SpanAttributes.HTTP_METHOD, + SpanAttributes.HTTP_HOST, + SpanAttributes.HTTP_SCHEME, + SpanAttributes.HTTP_STATUS_CODE, + SpanAttributes.HTTP_FLAVOR, + SpanAttributes.HTTP_SERVER_NAME, + SpanAttributes.NET_HOST_NAME, + SpanAttributes.NET_HOST_PORT, +] + +_active_requests_count_attrs = [ + SpanAttributes.HTTP_METHOD, + SpanAttributes.HTTP_HOST, + SpanAttributes.HTTP_SCHEME, + SpanAttributes.HTTP_FLAVOR, + SpanAttributes.HTTP_SERVER_NAME, +] + class ExcludeList: """Class to exclude certain paths (given as a list of regexes) from tracing requests""" @@ -125,3 +147,19 @@ def get_custom_headers(env_var: str) -> List[str]: for custom_headers in custom_headers.split(",") ] return custom_headers + + +def _parse_active_request_count_attrs(req_attrs): + active_requests_count_attrs = {} + for attr_key in _active_requests_count_attrs: + if req_attrs.get(attr_key) is not None: + active_requests_count_attrs[attr_key] = req_attrs[attr_key] + return active_requests_count_attrs + + +def _parse_duration_attrs(req_attrs): + duration_attrs = {} + for attr_key in _duration_attrs: + if req_attrs.get(attr_key) is not None: + duration_attrs[attr_key] = req_attrs[attr_key] + return duration_attrs