From a3d5588ff41577c3890edbde29b9bd71715cb70f Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Thu, 3 Sep 2020 17:07:14 +0200 Subject: [PATCH 1/3] Add auto instrumentation for aiohttp * add an instrumentor for aiohttp client sessions * the instrumentor will wrap the ClientSession constructor and automatically a TraceConfig on construction --- .../setup.cfg | 7 +- .../aiohttp_client/__init__.py | 173 ++++++++++-- .../tests/test_aiohttp_client_integration.py | 264 +++++++++++++++--- 3 files changed, 375 insertions(+), 69 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiohttp-client/setup.cfg b/instrumentation/opentelemetry-instrumentation-aiohttp-client/setup.cfg index a222f323c06..9c4c81305d5 100644 --- a/instrumentation/opentelemetry-instrumentation-aiohttp-client/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-aiohttp-client/setup.cfg @@ -39,12 +39,17 @@ package_dir= =src packages=find_namespace: install_requires = - opentelemetry-api >= 0.12.dev0 + opentelemetry-api == 0.13dev0 opentelemetry-instrumentation == 0.13dev0 aiohttp ~= 3.0 + wrapt >= 1.0.0, < 2.0.0 [options.packages.find] where = src [options.extras_require] test = + +[options.entry_points] +opentelemetry_instrumentor = + aiohttp-client = opentelemetry.instrumentation.aiohttp_client:AioHttpClientInstrumentor \ No newline at end of file diff --git a/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py b/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py index 397d5dc80e9..231ec95428c 100644 --- a/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py @@ -18,44 +18,73 @@ Usage ----- +Explicitly instrumenting a single client session: - .. code:: python +.. code:: python - import aiohttp - from opentelemetry.instrumentation.aiohttp_client import ( - create_trace_config, - url_path_span_name - ) - import yarl + import aiohttp + from opentelemetry.instrumentation.aiohttp_client import ( + create_trace_config, + url_path_span_name + ) + import yarl - def strip_query_params(url: yarl.URL) -> str: - return str(url.with_query(None)) + def strip_query_params(url: yarl.URL) -> str: + return str(url.with_query(None)) - async with aiohttp.ClientSession(trace_configs=[create_trace_config( - # Remove all query params from the URL attribute on the span. - url_filter=strip_query_params, - # Use the URL's path as the span name. - span_name=url_path_span_name - )]) as session: - async with session.get(url) as response: - await response.text() + async with aiohttp.ClientSession(trace_configs=[create_trace_config( + # Remove all query params from the URL attribute on the span. + url_filter=strip_query_params, + # Use the URL's path as the span name. + span_name=url_path_span_name + )]) as session: + async with session.get(url) as response: + await response.text() + +Instrumenting all client sessions: + +.. code:: python + + import aiohttp + from opentelemetry.instrumentation.aiohttp_client import ( + AioHttpClientInstrumentor + ) + # Enable instrumentation + AioHttpClientInstrumentor().instrument() + + # Create a session and make an HTTP get request + async with aiohttp.ClientSession() as session: + async with session.get(url) as response: + await response.text() + +API +--- """ -import contextlib import socket import types import typing import aiohttp +import wrapt from opentelemetry import context as context_api from opentelemetry import propagators, trace from opentelemetry.instrumentation.aiohttp_client.version import __version__ -from opentelemetry.instrumentation.utils import http_status_to_canonical_code -from opentelemetry.trace import SpanKind +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import ( + http_status_to_canonical_code, + unwrap, +) +from opentelemetry.trace import SpanKind, TracerProvider, get_tracer from opentelemetry.trace.status import Status, StatusCanonicalCode +_UrlFilterT = typing.Optional[typing.Callable[[str], str]] +_SpanNameT = typing.Optional[ + typing.Union[typing.Callable[[aiohttp.TraceRequestStartParams], str], str] +] + def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str: """Extract a span name from the request URL path. @@ -73,12 +102,9 @@ def url_path_span_name(params: aiohttp.TraceRequestStartParams) -> str: def create_trace_config( - url_filter: typing.Optional[typing.Callable[[str], str]] = None, - span_name: typing.Optional[ - typing.Union[ - typing.Callable[[aiohttp.TraceRequestStartParams], str], str - ] - ] = None, + url_filter: _UrlFilterT = None, + span_name: _SpanNameT = None, + tracer_provider: TracerProvider = None, ) -> aiohttp.TraceConfig: """Create an aiohttp-compatible trace configuration. @@ -104,6 +130,7 @@ def create_trace_config( such as API keys or user personal information. :param str span_name: Override the default span name. + :param tracer_provider: optional TracerProvider from which to get a Tracer :return: An object suitable for use with :py:class:`aiohttp.ClientSession`. :rtype: :py:class:`aiohttp.TraceConfig` @@ -113,7 +140,7 @@ def create_trace_config( # Explicitly specify the type for the `span_name` param and rtype to work # around this issue. - tracer = trace.get_tracer_provider().get_tracer(__name__, __version__) + tracer = get_tracer(__name__, __version__, tracer_provider) def _end_trace(trace_config_ctx: types.SimpleNamespace): context_api.detach(trace_config_ctx.token) @@ -124,6 +151,10 @@ async def on_request_start( trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestStartParams, ): + if context_api.get_value("suppress_instrumentation"): + trace_config_ctx.span = None + return + http_method = params.method.upper() if trace_config_ctx.span_name is None: request_span_name = "HTTP {}".format(http_method) @@ -155,6 +186,9 @@ async def on_request_end( trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestEndParams, ): + if trace_config_ctx.span is None: + return + trace_config_ctx.span.set_status( Status(http_status_to_canonical_code(int(params.response.status))) ) @@ -171,6 +205,9 @@ async def on_request_exception( trace_config_ctx: types.SimpleNamespace, params: aiohttp.TraceRequestExceptionParams, ): + if trace_config_ctx.span is None: + return + if isinstance( params.exception, (aiohttp.ServerTimeoutError, aiohttp.TooManyRedirects), @@ -186,6 +223,7 @@ async def on_request_exception( status = StatusCanonicalCode.UNAVAILABLE trace_config_ctx.span.set_status(Status(status)) + trace_config_ctx.span.record_exception(params.exception) _end_trace(trace_config_ctx) def _trace_config_ctx_factory(**kwargs): @@ -203,3 +241,84 @@ def _trace_config_ctx_factory(**kwargs): trace_config.on_request_exception.append(on_request_exception) return trace_config + + +def _instrument( + tracer_provider: TracerProvider = None, + url_filter: _UrlFilterT = None, + span_name: _SpanNameT = None, +): + """Enables tracing of all ClientSessions + + When a ClientSession gets created a TraceConfig is automatically added to + the session's trace_configs. + """ + # pylint:disable=unused-argument + def instrumented_init(wrapped, instance, args, kwargs): + if context_api.get_value("suppress_instrumentation"): + return wrapped(*args, **kwargs) + + trace_configs = list(kwargs.get("trace_configs") or ()) + + trace_config = create_trace_config( + url_filter=url_filter, + span_name=span_name, + tracer_provider=tracer_provider, + ) + trace_config.opentelemetry_aiohttp_trace_config = True + trace_configs.append(trace_config) + + kwargs["trace_configs"] = trace_configs + return wrapped(*args, **kwargs) + + wrapt.wrap_function_wrapper( + aiohttp.ClientSession, "__init__", instrumented_init + ) + + +def _uninstrument(): + """Disables instrumenting for all newly created ClientSessions""" + unwrap(aiohttp.ClientSession, "__init__") + + +def _uninstrument_session(client_session: aiohttp.ClientSession): + """Disables instrumentation for the given ClientSession""" + # pylint: disable=protected-access + trace_configs = client_session._trace_configs + client_session._trace_configs = [ + trace_config + for trace_config in trace_configs + if not hasattr(trace_config, "opentelemetry_aiohttp_trace_config") + ] + + +class AioHttpClientInstrumentor(BaseInstrumentor): + """An instrumentor for aiohttp client sessions + + See `BaseInstrumentor` + """ + + def _instrument(self, **kwargs): + """Instruments aiohttp ClientSession + + Args: + **kwargs: Optional arguments + ``tracer_provider``: a TracerProvider, defaults to global + ``url_filter``: A callback to process the requested URL prior to adding + it as a span attribute. This can be useful to remove sensitive data + such as API keys or user personal information. + ``span_name``: Override the default span name. + """ + _instrument( + tracer_provider=kwargs.get("tracer_provider"), + url_filter=kwargs.get("url_filter"), + span_name=kwargs.get("span_name"), + ) + + def _uninstrument(self, **kwargs): + _uninstrument() + + @staticmethod + def uninstrument_session(client_session: aiohttp.ClientSession): + """Disables instrumentation for the given session""" + _uninstrument_session(client_session) diff --git a/instrumentation/opentelemetry-instrumentation-aiohttp-client/tests/test_aiohttp_client_integration.py b/instrumentation/opentelemetry-instrumentation-aiohttp-client/tests/test_aiohttp_client_integration.py index 4a48c38ff70..79b0af0ed11 100644 --- a/instrumentation/opentelemetry-instrumentation-aiohttp-client/tests/test_aiohttp_client_integration.py +++ b/instrumentation/opentelemetry-instrumentation-aiohttp-client/tests/test_aiohttp_client_integration.py @@ -15,21 +15,46 @@ import asyncio import contextlib import typing +import unittest import urllib.parse from http import HTTPStatus import aiohttp import aiohttp.test_utils import yarl +from pkg_resources import iter_entry_points -import opentelemetry.instrumentation.aiohttp_client +from opentelemetry import context +from opentelemetry.instrumentation import aiohttp_client +from opentelemetry.instrumentation.aiohttp_client import ( + AioHttpClientInstrumentor, +) from opentelemetry.test.test_base import TestBase from opentelemetry.trace.status import StatusCanonicalCode -class TestAioHttpIntegration(TestBase): - maxDiff = None +def run_with_test_server( + runnable: typing.Callable, url: str, handler: typing.Callable +) -> typing.Tuple[str, int]: + async def do_request(): + app = aiohttp.web.Application() + parsed_url = urllib.parse.urlparse(url) + app.add_routes([aiohttp.web.get(parsed_url.path, handler)]) + app.add_routes([aiohttp.web.post(parsed_url.path, handler)]) + app.add_routes([aiohttp.web.patch(parsed_url.path, handler)]) + + with contextlib.suppress(aiohttp.ClientError): + async with aiohttp.test_utils.TestServer(app) as server: + netloc = (server.host, server.port) + await server.start_server() + await runnable(server) + return netloc + + loop = asyncio.get_event_loop() + return loop.run_until_complete(do_request()) + +class TestAioHttpIntegration(TestBase): def assert_spans(self, spans): self.assertEqual( [ @@ -53,9 +78,7 @@ def test_url_path_span_name(self): ): with self.subTest(url=url): params = aiohttp.TraceRequestStartParams("METHOD", url, {}) - actual = opentelemetry.instrumentation.aiohttp_client.url_path_span_name( - params - ) + actual = aiohttp_client.url_path_span_name(params) self.assertEqual(actual, expected) self.assertIsInstance(actual, str) @@ -70,33 +93,20 @@ def _http_request( ) -> typing.Tuple[str, int]: """Helper to start an aiohttp test server and send an actual HTTP request to it.""" - async def do_request(): - async def default_handler(request): - assert "traceparent" in request.headers - return aiohttp.web.Response(status=int(status_code)) - - handler = request_handler or default_handler - - app = aiohttp.web.Application() - parsed_url = urllib.parse.urlparse(url) - app.add_routes([aiohttp.web.get(parsed_url.path, handler)]) - app.add_routes([aiohttp.web.post(parsed_url.path, handler)]) - app.add_routes([aiohttp.web.patch(parsed_url.path, handler)]) - - with contextlib.suppress(aiohttp.ClientError): - async with aiohttp.test_utils.TestServer(app) as server: - netloc = (server.host, server.port) - async with aiohttp.test_utils.TestClient( - server, trace_configs=[trace_config] - ) as client: - await client.start_server() - await client.request( - method, url, trace_request_ctx={}, **kwargs - ) - return netloc + async def default_handler(request): + assert "traceparent" in request.headers + return aiohttp.web.Response(status=int(status_code)) + + async def client_request(server: aiohttp.test_utils.TestServer): + async with aiohttp.test_utils.TestClient( + server, trace_configs=[trace_config] + ) as client: + await client.request( + method, url, trace_request_ctx={}, **kwargs + ) - loop = asyncio.get_event_loop() - return loop.run_until_complete(do_request()) + handler = request_handler or default_handler + return run_with_test_server(client_request, url, handler) def test_status_codes(self): for status_code, span_status in ( @@ -110,7 +120,7 @@ def test_status_codes(self): ): with self.subTest(status_code=status_code): host, port = self._http_request( - trace_config=opentelemetry.instrumentation.aiohttp_client.create_trace_config(), + trace_config=aiohttp_client.create_trace_config(), url="/test-path?query=param#foobar", status_code=status_code, ) @@ -149,7 +159,7 @@ def test_span_name_option(self): ): with self.subTest(span_name=span_name, method=method, path=path): host, port = self._http_request( - trace_config=opentelemetry.instrumentation.aiohttp_client.create_trace_config( + trace_config=aiohttp_client.create_trace_config( span_name=span_name ), method=method, @@ -182,7 +192,7 @@ def strip_query_params(url: yarl.URL) -> str: return str(url.with_query(None)) host, port = self._http_request( - trace_config=opentelemetry.instrumentation.aiohttp_client.create_trace_config( + trace_config=aiohttp_client.create_trace_config( url_filter=strip_query_params ), url="/some/path?query=param&other=param2", @@ -208,9 +218,7 @@ def strip_query_params(url: yarl.URL) -> str: ) def test_connection_errors(self): - trace_configs = [ - opentelemetry.instrumentation.aiohttp_client.create_trace_config() - ] + trace_configs = [aiohttp_client.create_trace_config()] for url, expected_status in ( ("http://this-is-unknown.local/", StatusCanonicalCode.UNKNOWN), @@ -220,7 +228,7 @@ def test_connection_errors(self): async def do_request(url): async with aiohttp.ClientSession( - trace_configs=trace_configs + trace_configs=trace_configs, ) as session: async with session.get(url): pass @@ -251,7 +259,7 @@ async def request_handler(request): return aiohttp.web.Response() host, port = self._http_request( - trace_config=opentelemetry.instrumentation.aiohttp_client.create_trace_config(), + trace_config=aiohttp_client.create_trace_config(), url="/test_timeout", request_handler=request_handler, timeout=aiohttp.ClientTimeout(sock_read=0.01), @@ -281,7 +289,7 @@ async def request_handler(request): raise aiohttp.web.HTTPFound(location=location) host, port = self._http_request( - trace_config=opentelemetry.instrumentation.aiohttp_client.create_trace_config(), + trace_config=aiohttp_client.create_trace_config(), url="/test_too_many_redirects", request_handler=request_handler, max_redirects=2, @@ -302,3 +310,177 @@ async def request_handler(request): ) ] ) + + +class TestAioHttpClientInstrumentor(TestBase): + URL = "/test-path" + + def setUp(self): + super().setUp() + AioHttpClientInstrumentor().instrument() + + def tearDown(self): + super().tearDown() + AioHttpClientInstrumentor().uninstrument() + + @staticmethod + # pylint:disable=unused-argument + async def default_handler(request): + return aiohttp.web.Response(status=int(200)) + + @staticmethod + def get_default_request(url: str = URL): + async def default_request(server: aiohttp.test_utils.TestServer): + async with aiohttp.test_utils.TestClient(server) as session: + await session.get(url) + + return default_request + + def assert_spans(self, num_spans: int): + finished_spans = self.memory_exporter.get_finished_spans() + self.assertEqual(num_spans, len(finished_spans)) + if num_spans == 0: + return None + if num_spans == 1: + return finished_spans[0] + return finished_spans + + def test_instrument(self): + host, port = run_with_test_server( + self.get_default_request(), self.URL, self.default_handler + ) + span = self.assert_spans(1) + self.assertEqual("http", span.attributes["component"]) + self.assertEqual("GET", span.attributes["http.method"]) + self.assertEqual( + "http://{}:{}/test-path".format(host, port), + span.attributes["http.url"], + ) + self.assertEqual(200, span.attributes["http.status_code"]) + self.assertEqual("OK", span.attributes["http.status_text"]) + + def test_instrument_with_existing_trace_config(self): + trace_config = aiohttp.TraceConfig() + + async def create_session(server: aiohttp.test_utils.TestServer): + async with aiohttp.test_utils.TestClient( + server, trace_configs=[trace_config] + ) as client: + # pylint:disable=protected-access + trace_configs = client.session._trace_configs + self.assertEqual(2, len(trace_configs)) + self.assertTrue(trace_config in trace_configs) + async with client as session: + await session.get(TestAioHttpClientInstrumentor.URL) + + run_with_test_server(create_session, self.URL, self.default_handler) + self.assert_spans(1) + + def test_uninstrument(self): + AioHttpClientInstrumentor().uninstrument() + run_with_test_server( + self.get_default_request(), self.URL, self.default_handler + ) + + self.assert_spans(0) + + AioHttpClientInstrumentor().instrument() + run_with_test_server( + self.get_default_request(), self.URL, self.default_handler + ) + self.assert_spans(1) + + def test_uninstrument_session(self): + async def uninstrument_request(server: aiohttp.test_utils.TestServer): + client = aiohttp.test_utils.TestClient(server) + AioHttpClientInstrumentor().uninstrument_session(client.session) + async with client as session: + await session.get(self.URL) + + run_with_test_server( + uninstrument_request, self.URL, self.default_handler + ) + self.assert_spans(0) + + run_with_test_server( + self.get_default_request(), self.URL, self.default_handler + ) + self.assert_spans(1) + + def test_suppress_instrumentation(self): + token = context.attach( + context.set_value("suppress_instrumentation", True) + ) + try: + run_with_test_server( + self.get_default_request(), self.URL, self.default_handler + ) + finally: + context.detach(token) + self.assert_spans(0) + + @staticmethod + async def suppressed_request(server: aiohttp.test_utils.TestServer): + async with aiohttp.test_utils.TestClient(server) as client: + token = context.attach( + context.set_value("suppress_instrumentation", True) + ) + await client.get(TestAioHttpClientInstrumentor.URL) + context.detach(token) + + def test_suppress_instrumentation_after_creation(self): + run_with_test_server( + self.suppressed_request, self.URL, self.default_handler + ) + self.assert_spans(0) + + def test_suppress_instrumentation_with_server_exception(self): + # pylint:disable=unused-argument + async def raising_handler(request): + raise aiohttp.web.HTTPFound(location=self.URL) + + run_with_test_server( + self.suppressed_request, self.URL, raising_handler + ) + self.assert_spans(0) + + def test_url_filter(self): + def strip_query_params(url: yarl.URL) -> str: + return str(url.with_query(None)) + + AioHttpClientInstrumentor().uninstrument() + AioHttpClientInstrumentor().instrument(url_filter=strip_query_params) + + url = "/test-path?query=params" + host, port = run_with_test_server( + self.get_default_request(url), url, self.default_handler + ) + span = self.assert_spans(1) + self.assertEqual( + "http://{}:{}/test-path".format(host, port), + span.attributes["http.url"], + ) + + def test_span_name(self): + def span_name_callback(params: aiohttp.TraceRequestStartParams) -> str: + return "{} - {}".format(params.method, params.url.path) + + AioHttpClientInstrumentor().uninstrument() + AioHttpClientInstrumentor().instrument(span_name=span_name_callback) + + url = "/test-path" + run_with_test_server( + self.get_default_request(url), url, self.default_handler + ) + span = self.assert_spans(1) + self.assertEqual("GET - /test-path", span.name) + + +class TestLoadingAioHttpInstrumentor(unittest.TestCase): + def test_loading_instrumentor(self): + entry_points = iter_entry_points( + "opentelemetry_instrumentor", "aiohttp-client" + ) + + instrumentor = next(entry_points).load()() + self.assertIsInstance(instrumentor, AioHttpClientInstrumentor) From 1f55e28614739819cb0aeee4ec7f0009ee9b34ac Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Fri, 4 Sep 2020 08:44:30 +0200 Subject: [PATCH 2/3] changelog --- .../opentelemetry-instrumentation-aiohttp-client/CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/instrumentation/opentelemetry-instrumentation-aiohttp-client/CHANGELOG.md b/instrumentation/opentelemetry-instrumentation-aiohttp-client/CHANGELOG.md index 78b989563f3..c648f49a1c0 100644 --- a/instrumentation/opentelemetry-instrumentation-aiohttp-client/CHANGELOG.md +++ b/instrumentation/opentelemetry-instrumentation-aiohttp-client/CHANGELOG.md @@ -4,6 +4,8 @@ - Updating span name to match semantic conventions ([#972](https://github.com/open-telemetry/opentelemetry-python/pull/972)) +- Add instrumentor and auto instrumentation support for aiohttp + ([#1075](https://github.com/open-telemetry/opentelemetry-python/pull/1075)) ## Version 0.12b0 From f0304098e802fc540483a0e838e9098efe438cc6 Mon Sep 17 00:00:00 2001 From: Mario Jonke Date: Fri, 9 Oct 2020 14:35:22 +0200 Subject: [PATCH 3/3] Change name of instrumentation indicator flag --- .../opentelemetry/instrumentation/aiohttp_client/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py b/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py index 2a7ec945eee..6606c483314 100644 --- a/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiohttp-client/src/opentelemetry/instrumentation/aiohttp_client/__init__.py @@ -272,7 +272,7 @@ def instrumented_init(wrapped, instance, args, kwargs): span_name=span_name, tracer_provider=tracer_provider, ) - trace_config.opentelemetry_aiohttp_trace_config = True + trace_config.opentelemetry_aiohttp_instrumented = True trace_configs.append(trace_config) kwargs["trace_configs"] = trace_configs @@ -295,7 +295,7 @@ def _uninstrument_session(client_session: aiohttp.ClientSession): client_session._trace_configs = [ trace_config for trace_config in trace_configs - if not hasattr(trace_config, "opentelemetry_aiohttp_trace_config") + if not hasattr(trace_config, "opentelemetry_aiohttp_instrumented") ]