From 9ccb612dbef50fa871448e9a4499dbd86e1a06d1 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Wed, 18 May 2022 11:54:36 +0300 Subject: [PATCH 01/21] Added aio-pika instrumentation --- .../README.rst | 22 +++++ .../setup.cfg | 58 ++++++++++++ .../setup.py | 89 +++++++++++++++++++ .../instrumentation/aio_pika/__init__.py | 60 +++++++++++++ .../aio_pika/aio_pika_getter.py | 29 ++++++ .../aio_pika/aio_pika_instrumentor.py | 43 +++++++++ .../aio_pika/instrumented_exchange.py | 46 ++++++++++ .../aio_pika/instrumented_queue.py | 72 +++++++++++++++ .../instrumentation/aio_pika/package.py | 16 ++++ .../instrumentation/aio_pika/span_builder.py | 80 +++++++++++++++++ .../instrumentation/aio_pika/version.py | 15 ++++ .../tests/__init__.py | 0 .../tests/consts.py | 18 ++++ .../tests/test_aio_pika_getter.py | 34 +++++++ .../tests/test_aio_pika_instrumentation.py | 35 ++++++++ .../tests/test_instrumented_exchange.py | 62 +++++++++++++ .../tests/test_instrumented_queue.py | 73 +++++++++++++++ .../tests/test_span_builder.py | 38 ++++++++ 18 files changed, 790 insertions(+) create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/README.rst create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/setup.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/__init__.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst new file mode 100644 index 0000000000..524b880da3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst @@ -0,0 +1,22 @@ +OpenTelemetry aio_pika Instrumentation +================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-aio-pika.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-aio-pika/ + +This library allows tracing requests made by the aio_pika library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-aio-pika + +References +---------- + +* `OpenTelemetry aio_pika/ Tracing `_ +* `OpenTelemetry Project `_ diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg new file mode 100644 index 0000000000..6a32f4f6ac --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg @@ -0,0 +1,58 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-instrumentation-aio-pika +description = OpenTelemetry aio_pika instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-aio-pika +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: + +install_requires = + opentelemetry-api ~= 1.5 + wrapt >= 1.0.0, < 2.0.0 + +[options.extras_require] +test = + pytest + wrapt >= 1.0.0, < 2.0.0 + opentelemetry-test-utils == 0.30b1 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + aio-pika = opentelemetry.instrumentation.aio_pika:AioPikaInstrumentor diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py new file mode 100644 index 0000000000..40caf10755 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py @@ -0,0 +1,89 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. +# RUN `python scripts/generate_setup.py` TO REGENERATE. + + +import distutils.cmd +import json +import os +from configparser import ConfigParser + +import setuptools + +config = ConfigParser() +config.read("setup.cfg") + +# We provide extras_require parameter to setuptools.setup later which +# overwrites the extras_require section from setup.cfg. To support extras_require +# section in setup.cfg, we load it here and merge it with the extras_require param. +extras_require = {} +if "options.extras_require" in config: + for key, value in config["options.extras_require"].items(): + extras_require[key] = [v for v in value.split("\n") if v.strip()] + +BASE_DIR = os.path.dirname(__file__) +PACKAGE_INFO = {} + +VERSION_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "instrumentation", "aio_pika", "version.py" +) +with open(VERSION_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +PACKAGE_FILENAME = os.path.join( + BASE_DIR, "src", "opentelemetry", "instrumentation", "aio_pika", "package.py" +) +with open(PACKAGE_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +# Mark any instruments/runtime dependencies as test dependencies as well. +extras_require["instruments"] = PACKAGE_INFO["_instruments"] +test_deps = extras_require.get("test", []) +for dep in extras_require["instruments"]: + test_deps.append(dep) + +extras_require["test"] = test_deps + + +class JSONMetadataCommand(distutils.cmd.Command): + + description = ( + "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ", + "auto-generate code in other places", + ) + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + metadata = { + "name": config["metadata"]["name"], + "version": PACKAGE_INFO["__version__"], + "instruments": PACKAGE_INFO["_instruments"], + } + print(json.dumps(metadata)) + + +setuptools.setup( + cmdclass={"meta": JSONMetadataCommand}, + version=PACKAGE_INFO["__version__"], + extras_require=extras_require, +) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/__init__.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/__init__.py new file mode 100644 index 0000000000..6e78db08c9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/__init__.py @@ -0,0 +1,60 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Instrument `aio_pika` to trace RabbitMQ applications. + +Usage +----- + +* Start broker backend + +.. code-block:: python + + docker run -p 5672:5672 rabbitmq + +* Run instrumented task + +.. code-block:: python + + import asyncio + + from aio_pika import Message, connect + from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor + + AioPikaInstrumentor().instrument() + + + async def main() -> None: + connection = await connect("amqp://guest:guest@localhost/") + async with connection: + channel = await connection.channel() + queue = await channel.declare_queue("hello") + await channel.default_exchange.publish( + Message(b"Hello World!"), + routing_key=queue.name, + ) + + + if __name__ == "__main__": + asyncio.run(main()) + +API +--- +""" +# pylint: disable=import-error + +from .aio_pika_instrumentor import AioPikaInstrumentor +from .version import __version__ + +__all__ = ["AioPikaInstrumentor", "__version__"] diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py new file mode 100644 index 0000000000..39524cf71c --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py @@ -0,0 +1,29 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional, List +from opentelemetry.propagators.textmap import Getter, CarrierT + + +class _AioPikaGetter(Getter): # type: ignore + def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: + value = carrier.get(key, None) + if value is None: + return None + return [value] + + def keys(self, carrier: CarrierT) -> List[str]: + return [] + + +aio_pika_getter = _AioPikaGetter() diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py new file mode 100644 index 0000000000..a8bd1f7903 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py @@ -0,0 +1,43 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Collection + +import aio_pika +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor + +from .package import _instruments +from .span_builder import SpanBuilder +from .instrumented_exchange import InstrumentedExchange, RobustInstrumentedExchange +from .instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue + + +class AioPikaInstrumentor(BaseInstrumentor): + + def _instrument(self, **kwargs): + tracer_provider = kwargs.get('tracer_provider', None) + SpanBuilder.TRACER_PROVIDER = tracer_provider + aio_pika.Channel.EXCHANGE_CLASS = InstrumentedExchange + aio_pika.Channel.QUEUE_CLASS = InstrumentedQueue + aio_pika.RobustChannel.EXCHANGE_CLASS = RobustInstrumentedExchange + aio_pika.RobustChannel.QUEUE_CLASS = RobustInstrumentedQueue + + def _uninstrument(self, **kwargs): + SpanBuilder.TRACER_PROVIDER = None + aio_pika.Channel.EXCHANGE_CLASS = aio_pika.Exchange + aio_pika.Channel.QUEUE_CLASS = aio_pika.Queue + aio_pika.RobustChannel.EXCHANGE_CLASS = aio_pika.RobustExchange + aio_pika.RobustChannel.QUEUE_CLASS = aio_pika.RobustQueue + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py new file mode 100644 index 0000000000..beb1786960 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py @@ -0,0 +1,46 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional + +import aiormq +from aio_pika import Exchange, RobustExchange +from aio_pika.abc import AbstractMessage +from opentelemetry import trace, propagate +from opentelemetry.trace import Span + +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder + + +class InstrumentedExchange(Exchange): + def _get_publish_span(self, message: AbstractMessage, routing_key: str) -> Optional[Span]: + builder = SpanBuilder() + builder.set_as_producer() + builder.set_destination(f'{self.name},{routing_key}') + builder.set_channel(self.channel) + builder.set_message(message) + return builder.build() + + async def publish(self, message: AbstractMessage, routing_key: str, **kwargs) -> Optional[aiormq.abc.ConfirmationFrameType]: + span = self._get_publish_span(message, routing_key) + if not span: + return await super().publish(message, routing_key, **kwargs) + with trace.use_span(span, end_on_exit=True): + if span.is_recording(): + propagate.inject(message.properties.headers) + return_value = await super().publish(message, routing_key, **kwargs) + return return_value + + +class RobustInstrumentedExchange(RobustExchange, InstrumentedExchange): + pass diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py new file mode 100644 index 0000000000..ff1bc1745e --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -0,0 +1,72 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Any, Callable, Optional + +from aio_pika import Queue, RobustQueue, Channel, connect +from aio_pika.abc import AbstractIncomingMessage +from aio_pika.queue import ConsumerTag +from opentelemetry import trace, propagate, context +from opentelemetry.trace import Span +from opentelemetry.semconv.trace import MessagingOperationValues + +from .aio_pika_getter import aio_pika_getter +from .span_builder import SpanBuilder + + +class InstrumentedQueue(Queue): + def _get_callback_span(self, message: AbstractIncomingMessage) -> Optional[Span]: + builder = SpanBuilder() + builder.set_as_consumer() + builder.set_operation(MessagingOperationValues.RECEIVE) + builder.set_destination(message.exchange or message.routing_key) + builder.set_channel(self.channel) + builder.set_message(message) + return builder.build() + + def _decorate_callback(self, callback: Callable[[AbstractIncomingMessage], Any]) -> Callable[[AbstractIncomingMessage], Any]: + async def decorated(message: AbstractIncomingMessage): + headers = message.headers or dict() + ctx = propagate.extract(headers, getter=aio_pika_getter) or context.get_current() + token = context.attach(ctx) + span = self._get_callback_span(message) + if not span: + return await callback(message) + try: + with trace.use_span(span, end_on_exit=True): + return_value = await callback(message) + finally: + context.detach(token) + return return_value + + return decorated + + async def consume(self, callback: Callable[[AbstractIncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, + arguments: dict = None, consumer_tag=None, timeout=None) -> ConsumerTag: + decorated_callback = self._decorate_callback(callback) + return await super().consume(decorated_callback, no_ack, exclusive, arguments, consumer_tag, timeout) + + +class RobustInstrumentedQueue(RobustQueue, InstrumentedQueue): + async def consume(self, callback: Callable[[AbstractIncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, + arguments: dict = None, consumer_tag=None, timeout=None, robust: bool = True) -> ConsumerTag: + await self.connection.connected.wait() + consumer_tag = await InstrumentedQueue.consume(self, callback, no_ack, exclusive, arguments, consumer_tag, timeout) + if robust: + self._consumers[consumer_tag] = dict( + callback=callback, + no_ack=no_ack, + exclusive=exclusive, + arguments=arguments + ) + return consumer_tag diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py new file mode 100644 index 0000000000..6c7ed74ea4 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Collection + +_instruments: Collection[str] = ("aio_pika ~= 7.2.0",) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py new file mode 100644 index 0000000000..e63db9fcc2 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -0,0 +1,80 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional + +from aio_pika.abc import AbstractChannel, AbstractMessage +from opentelemetry import context, trace +from opentelemetry.trace import Span, SpanKind +from opentelemetry.semconv.trace import MessagingOperationValues, SpanAttributes + +from .version import __version__ + + +class SpanBuilder: + _DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} + TRACER_PROVIDER = None + + def __init__(self): + self._attributes = self._DEFAULT_ATTRIBUTES.copy() + self._operation: MessagingOperationValues = None + self._kind: SpanKind = None + self._destination: str = None + + def set_as_producer(self): + self._kind = SpanKind.PRODUCER + + def set_as_consumer(self): + self._kind = SpanKind.CONSUMER + + def set_operation(self, operation: MessagingOperationValues): + self._operation = operation + + def set_destination(self, destination: str): + self._destination = destination + self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination + + def set_channel(self, channel: AbstractChannel): + url = channel.connection.connection.url + self._attributes.update({ + SpanAttributes.NET_PEER_NAME: url.host, + SpanAttributes.NET_PEER_PORT: url.port + }) + + def set_message(self, message: AbstractMessage): + properties = message.properties + if properties.message_id: + self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = properties.message_id + if properties.correlation_id: + self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id + + def build(self) -> Optional[Span]: + if context.get_value('suppress_instrumentation') or context.get_value(context._SUPPRESS_INSTRUMENTATION_KEY): + return None + assert self._kind, 'kind must be configured.' + assert self._destination, 'destination must be configured.' + tracer = trace.get_tracer(__name__, __version__, self.TRACER_PROVIDER) + span = tracer.start_span(self._generate_span_name(), kind=self._kind) + if not span.is_recording(): + return span + for attribute_name, attribute_value in self._attributes.items(): + span.set_attribute(attribute_name, attribute_value) + if self._operation: + span.set_attribute(SpanAttributes.MESSAGING_OPERATION, self._operation.value) + else: + span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + return span + + def _generate_span_name(self) -> str: + operation_value = self._operation.value if self._operation else 'send' + return f'{self._destination} {operation_value}' diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py new file mode 100644 index 0000000000..88015aae34 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.30b1" diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py new file mode 100644 index 0000000000..4553a4557b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -0,0 +1,18 @@ +from argparse import Namespace + +from yarl import URL + +MESSAGE_ID = 'meesage_id' +CORRELATION_ID = 'correlation_id' +MESSAGING_SYSTEM = 'rabbitmq' +EXCHANGE_NAME = 'exchange_name' +QUEUE_NAME = 'queue_name' +ROUTING_KEY = 'routing_key' +SERVER_HOST = 'localhost' +SERVER_PORT = 1234 +SERVER_USER = 'guest' +SERVER_PASS = 'guest' +SERVER_URL = URL(f'amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/') +CONNECTION = Namespace(connection=Namespace(url=SERVER_URL)) +CHANNEL = Namespace(connection=CONNECTION, loop=None) +MESSAGE = Namespace(properties=Namespace(message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers=dict()), exchange=EXCHANGE_NAME, headers=dict()) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py new file mode 100644 index 0000000000..aa1d3b822b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py @@ -0,0 +1,34 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + +from opentelemetry.instrumentation.aio_pika.aio_pika_getter import aio_pika_getter + + +class TestAioPikaGetter(TestCase): + def test_get_none(self) -> None: + carrier = {} + value = aio_pika_getter.get(carrier, "test") + self.assertIsNone(value) + + def test_get_value(self) -> None: + key = "test" + value = "value" + carrier = {key: value} + val = aio_pika_getter.get(carrier, key) + self.assertEqual(val, [value]) + + def test_keys(self): + keys = aio_pika_getter.keys({}) + self.assertEqual(keys, []) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py new file mode 100644 index 0000000000..f09fa5dd7e --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py @@ -0,0 +1,35 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import aio_pika +from unittest import TestCase + +from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor +from opentelemetry.instrumentation.aio_pika.instrumented_exchange import InstrumentedExchange, RobustInstrumentedExchange +from opentelemetry.instrumentation.aio_pika.instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue + + +class TestPika(TestCase): + def test_instrument_api(self) -> None: + instrumentation = AioPikaInstrumentor() + instrumentation.instrument() + self.assertTrue(aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange) + self.assertTrue(aio_pika.Channel.QUEUE_CLASS == InstrumentedQueue) + self.assertTrue(aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange) + self.assertTrue(aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue) + + instrumentation.uninstrument() + self.assertFalse(aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange) + self.assertFalse(aio_pika.Channel.QUEUE_CLASS == InstrumentedQueue) + self.assertFalse(aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange) + self.assertFalse(aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py new file mode 100644 index 0000000000..c18b8f6514 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py @@ -0,0 +1,62 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from typing import Type +from argparse import Namespace +from unittest import TestCase, mock + +from yarl import URL +from aio_pika import Exchange +from opentelemetry.trace import Span, NonRecordingSpan +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.instrumentation.aio_pika.instrumented_exchange import InstrumentedExchange, RobustInstrumentedExchange + +from .consts import MESSAGING_SYSTEM, SERVER_HOST, SERVER_PORT, MESSAGE_ID, CORRELATION_ID, EXCHANGE_NAME, ROUTING_KEY, MESSAGE, CONNECTION, CHANNEL + +class TestInstrumentedExchange(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: f'{EXCHANGE_NAME},{ROUTING_KEY}', + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_TEMP_DESTINATION: True + } + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_publish_span(self): + exchange = InstrumentedExchange(CONNECTION, CHANNEL, EXCHANGE_NAME) + with mock.patch.object(NonRecordingSpan, 'is_recording', return_value=True): + with mock.patch.object(NonRecordingSpan, 'set_attribute') as mock_set_attrubute: + exchange._get_publish_span(MESSAGE, ROUTING_KEY) + for name, value in self.EXPECTED_ATTRIBUTES.items(): + mock_set_attrubute.assert_any_call(name, value) + + def _test_publish(self, exchange_type: Type[InstrumentedExchange]): + exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) + with mock.patch.object(InstrumentedExchange, '_get_publish_span') as mock_get_publish_span: + with mock.patch.object(Exchange, 'publish', return_value=asyncio.sleep(0)) as mock_publish: + self.loop.run_until_complete(exchange.publish(MESSAGE, ROUTING_KEY)) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + + def test_publish(self): + self._test_publish(InstrumentedExchange) + + def test_robust_publish(self): + self._test_publish(RobustInstrumentedExchange) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py new file mode 100644 index 0000000000..9ed64d1675 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py @@ -0,0 +1,73 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from typing import Type +from argparse import Namespace +from unittest import TestCase, mock + +from yarl import URL +from aio_pika import Queue +from opentelemetry.trace import Span, NonRecordingSpan +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.instrumentation.aio_pika.instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue + +from .consts import MESSAGING_SYSTEM, SERVER_HOST, SERVER_PORT, MESSAGE_ID, CORRELATION_ID, QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, MESSAGE, CONNECTION, CHANNEL + +class TestInstrumentedQueue(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME, + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID + } + + def setUp(self): + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_callback_span(self): + queue = InstrumentedQueue(CHANNEL, QUEUE_NAME, False, False, False, None) + with mock.patch.object(NonRecordingSpan, 'is_recording', return_value=True): + with mock.patch.object(NonRecordingSpan, 'set_attribute') as mock_set_attrubute: + queue._get_callback_span(MESSAGE) + for name, value in self.EXPECTED_ATTRIBUTES.items(): + mock_set_attrubute.assert_any_call(name, value) + + def test_decorate_callback(self): + queue = InstrumentedQueue(CHANNEL, QUEUE_NAME, False, False, False, None) + callback = mock.MagicMock(return_value=asyncio.sleep(0)) + with mock.patch.object(InstrumentedQueue, '_get_callback_span') as mocked_get_callback_span: + decorated_callback = queue._decorate_callback(callback) + self.loop.run_until_complete(decorated_callback(MESSAGE)) + mocked_get_callback_span.assert_called_once() + callback.assert_called_once_with(MESSAGE) + + def _test_consume(self, queue_type: Type[InstrumentedQueue]): + queue = queue_type(CHANNEL, QUEUE_NAME, False, False, False, None) + callback = mock.MagicMock() + CONNECTION.connected = asyncio.Event() + CONNECTION.connected.set() + with mock.patch.object(InstrumentedQueue, '_decorate_callback') as mocked_decorate_callback: + with mock.patch.object(Queue, 'consume', return_value=asyncio.sleep(0)) as mocked_consume: + self.loop.run_until_complete(queue.consume(callback)) + mocked_decorate_callback.assert_called_once_with(callback) + mocked_consume.assert_called_once() + + def test_consume(self): + self._test_consume(InstrumentedQueue) + + def test_robust_consume(self): + self._test_consume(RobustInstrumentedQueue) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py new file mode 100644 index 0000000000..0d3c32a8e3 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py @@ -0,0 +1,38 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.trace import Span + + +class TestBuilder(TestCase): + def test_build(self): + builder = SpanBuilder() + builder.set_as_consumer() + builder.set_destination('destination') + span = builder.build() + self.assertTrue(isinstance(span, Span)) + + def test_no_destination(self): + builder = SpanBuilder() + builder.set_as_consumer() + with self.assertRaises(AssertionError): + builder.build() + + def test_no_kind(self): + builder = SpanBuilder() + builder.set_destination('destination') + with self.assertRaises(AssertionError): + builder.build() From 191d803bb97c3d3db4c4ce19d3741e04e21c32e3 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Wed, 18 May 2022 16:35:11 +0300 Subject: [PATCH 02/21] Ran lint --- .../setup.py | 14 +++- .../aio_pika/aio_pika_getter.py | 7 +- .../aio_pika/aio_pika_instrumentor.py | 15 ++-- .../aio_pika/instrumented_exchange.py | 20 ++++-- .../aio_pika/instrumented_queue.py | 72 +++++++++++++------ .../instrumentation/aio_pika/span_builder.py | 6 +- .../tests/consts.py | 30 +++++--- .../tests/test_aio_pika_getter.py | 4 +- .../tests/test_aio_pika_instrumentation.py | 37 +++++++--- .../tests/test_instrumented_exchange.py | 56 +++++++++++---- .../tests/test_instrumented_queue.py | 63 +++++++++++----- 11 files changed, 233 insertions(+), 91 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py index 40caf10755..bd74b8f7bc 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py @@ -39,13 +39,23 @@ PACKAGE_INFO = {} VERSION_FILENAME = os.path.join( - BASE_DIR, "src", "opentelemetry", "instrumentation", "aio_pika", "version.py" + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "aio_pika", + "version.py", ) with open(VERSION_FILENAME, encoding="utf-8") as f: exec(f.read(), PACKAGE_INFO) PACKAGE_FILENAME = os.path.join( - BASE_DIR, "src", "opentelemetry", "instrumentation", "aio_pika", "package.py" + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "aio_pika", + "package.py", ) with open(PACKAGE_FILENAME, encoding="utf-8") as f: exec(f.read(), PACKAGE_INFO) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py index 39524cf71c..c3d0ccc4c6 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py @@ -11,8 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Optional, List -from opentelemetry.propagators.textmap import Getter, CarrierT +from typing import List, Optional + +from opentelemetry.propagators.textmap import CarrierT, Getter class _AioPikaGetter(Getter): # type: ignore @@ -21,7 +22,7 @@ def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: if value is None: return None return [value] - + def keys(self, carrier: CarrierT) -> List[str]: return [] diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py index a8bd1f7903..a30e9af664 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py @@ -14,30 +14,33 @@ from typing import Collection import aio_pika + from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from .instrumented_exchange import ( + InstrumentedExchange, + RobustInstrumentedExchange, +) +from .instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue from .package import _instruments from .span_builder import SpanBuilder -from .instrumented_exchange import InstrumentedExchange, RobustInstrumentedExchange -from .instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue class AioPikaInstrumentor(BaseInstrumentor): - def _instrument(self, **kwargs): - tracer_provider = kwargs.get('tracer_provider', None) + tracer_provider = kwargs.get("tracer_provider", None) SpanBuilder.TRACER_PROVIDER = tracer_provider aio_pika.Channel.EXCHANGE_CLASS = InstrumentedExchange aio_pika.Channel.QUEUE_CLASS = InstrumentedQueue aio_pika.RobustChannel.EXCHANGE_CLASS = RobustInstrumentedExchange aio_pika.RobustChannel.QUEUE_CLASS = RobustInstrumentedQueue - + def _uninstrument(self, **kwargs): SpanBuilder.TRACER_PROVIDER = None aio_pika.Channel.EXCHANGE_CLASS = aio_pika.Exchange aio_pika.Channel.QUEUE_CLASS = aio_pika.Queue aio_pika.RobustChannel.EXCHANGE_CLASS = aio_pika.RobustExchange aio_pika.RobustChannel.QUEUE_CLASS = aio_pika.RobustQueue - + def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py index beb1786960..e8bf26fe79 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py @@ -16,29 +16,35 @@ import aiormq from aio_pika import Exchange, RobustExchange from aio_pika.abc import AbstractMessage -from opentelemetry import trace, propagate -from opentelemetry.trace import Span +from opentelemetry import propagate, trace from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.trace import Span class InstrumentedExchange(Exchange): - def _get_publish_span(self, message: AbstractMessage, routing_key: str) -> Optional[Span]: + def _get_publish_span( + self, message: AbstractMessage, routing_key: str + ) -> Optional[Span]: builder = SpanBuilder() builder.set_as_producer() - builder.set_destination(f'{self.name},{routing_key}') + builder.set_destination(f"{self.name},{routing_key}") builder.set_channel(self.channel) builder.set_message(message) return builder.build() - - async def publish(self, message: AbstractMessage, routing_key: str, **kwargs) -> Optional[aiormq.abc.ConfirmationFrameType]: + + async def publish( + self, message: AbstractMessage, routing_key: str, **kwargs + ) -> Optional[aiormq.abc.ConfirmationFrameType]: span = self._get_publish_span(message, routing_key) if not span: return await super().publish(message, routing_key, **kwargs) with trace.use_span(span, end_on_exit=True): if span.is_recording(): propagate.inject(message.properties.headers) - return_value = await super().publish(message, routing_key, **kwargs) + return_value = await super().publish( + message, routing_key, **kwargs + ) return return_value diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py index ff1bc1745e..5976a0e98c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -13,19 +13,22 @@ # limitations under the License. from typing import Any, Callable, Optional -from aio_pika import Queue, RobustQueue, Channel, connect +from aio_pika import Channel, Queue, RobustQueue, connect from aio_pika.abc import AbstractIncomingMessage from aio_pika.queue import ConsumerTag -from opentelemetry import trace, propagate, context -from opentelemetry.trace import Span + +from opentelemetry import context, propagate, trace from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.trace import Span from .aio_pika_getter import aio_pika_getter from .span_builder import SpanBuilder class InstrumentedQueue(Queue): - def _get_callback_span(self, message: AbstractIncomingMessage) -> Optional[Span]: + def _get_callback_span( + self, message: AbstractIncomingMessage + ) -> Optional[Span]: builder = SpanBuilder() builder.set_as_consumer() builder.set_operation(MessagingOperationValues.RECEIVE) @@ -33,11 +36,16 @@ def _get_callback_span(self, message: AbstractIncomingMessage) -> Optional[Span] builder.set_channel(self.channel) builder.set_message(message) return builder.build() - - def _decorate_callback(self, callback: Callable[[AbstractIncomingMessage], Any]) -> Callable[[AbstractIncomingMessage], Any]: + + def _decorate_callback( + self, callback: Callable[[AbstractIncomingMessage], Any] + ) -> Callable[[AbstractIncomingMessage], Any]: async def decorated(message: AbstractIncomingMessage): headers = message.headers or dict() - ctx = propagate.extract(headers, getter=aio_pika_getter) or context.get_current() + ctx = ( + propagate.extract(headers, getter=aio_pika_getter) + or context.get_current() + ) token = context.attach(ctx) span = self._get_callback_span(message) if not span: @@ -48,25 +56,49 @@ async def decorated(message: AbstractIncomingMessage): finally: context.detach(token) return return_value - + return decorated - - async def consume(self, callback: Callable[[AbstractIncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, - arguments: dict = None, consumer_tag=None, timeout=None) -> ConsumerTag: + + async def consume( + self, + callback: Callable[[AbstractIncomingMessage], Any], + no_ack: bool = False, + exclusive: bool = False, + arguments: dict = None, + consumer_tag=None, + timeout=None, + ) -> ConsumerTag: decorated_callback = self._decorate_callback(callback) - return await super().consume(decorated_callback, no_ack, exclusive, arguments, consumer_tag, timeout) + return await super().consume( + decorated_callback, + no_ack, + exclusive, + arguments, + consumer_tag, + timeout, + ) class RobustInstrumentedQueue(RobustQueue, InstrumentedQueue): - async def consume(self, callback: Callable[[AbstractIncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, - arguments: dict = None, consumer_tag=None, timeout=None, robust: bool = True) -> ConsumerTag: + async def consume( + self, + callback: Callable[[AbstractIncomingMessage], Any], + no_ack: bool = False, + exclusive: bool = False, + arguments: dict = None, + consumer_tag=None, + timeout=None, + robust: bool = True, + ) -> ConsumerTag: await self.connection.connected.wait() - consumer_tag = await InstrumentedQueue.consume(self, callback, no_ack, exclusive, arguments, consumer_tag, timeout) + consumer_tag = await InstrumentedQueue.consume( + self, callback, no_ack, exclusive, arguments, consumer_tag, timeout + ) if robust: self._consumers[consumer_tag] = dict( - callback=callback, - no_ack=no_ack, - exclusive=exclusive, - arguments=arguments - ) + callback=callback, + no_ack=no_ack, + exclusive=exclusive, + arguments=arguments, + ) return consumer_tag diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index e63db9fcc2..1506f5ebc8 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -14,9 +14,13 @@ from typing import Optional from aio_pika.abc import AbstractChannel, AbstractMessage + from opentelemetry import context, trace +from opentelemetry.semconv.trace import ( + MessagingOperationValues, + SpanAttributes, +) from opentelemetry.trace import Span, SpanKind -from opentelemetry.semconv.trace import MessagingOperationValues, SpanAttributes from .version import __version__ diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py index 4553a4557b..05c82001ac 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -2,17 +2,25 @@ from yarl import URL -MESSAGE_ID = 'meesage_id' -CORRELATION_ID = 'correlation_id' -MESSAGING_SYSTEM = 'rabbitmq' -EXCHANGE_NAME = 'exchange_name' -QUEUE_NAME = 'queue_name' -ROUTING_KEY = 'routing_key' -SERVER_HOST = 'localhost' +MESSAGE_ID = "meesage_id" +CORRELATION_ID = "correlation_id" +MESSAGING_SYSTEM = "rabbitmq" +EXCHANGE_NAME = "exchange_name" +QUEUE_NAME = "queue_name" +ROUTING_KEY = "routing_key" +SERVER_HOST = "localhost" SERVER_PORT = 1234 -SERVER_USER = 'guest' -SERVER_PASS = 'guest' -SERVER_URL = URL(f'amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/') +SERVER_USER = "guest" +SERVER_PASS = "guest" +SERVER_URL = URL( + f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/" +) CONNECTION = Namespace(connection=Namespace(url=SERVER_URL)) CHANNEL = Namespace(connection=CONNECTION, loop=None) -MESSAGE = Namespace(properties=Namespace(message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers=dict()), exchange=EXCHANGE_NAME, headers=dict()) +MESSAGE = Namespace( + properties=Namespace( + message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers=dict() + ), + exchange=EXCHANGE_NAME, + headers=dict(), +) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py index aa1d3b822b..dd25362883 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py @@ -13,7 +13,9 @@ # limitations under the License. from unittest import TestCase -from opentelemetry.instrumentation.aio_pika.aio_pika_getter import aio_pika_getter +from opentelemetry.instrumentation.aio_pika.aio_pika_getter import ( + aio_pika_getter, +) class TestAioPikaGetter(TestCase): diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py index f09fa5dd7e..26bca5c21d 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py @@ -11,25 +11,44 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import aio_pika from unittest import TestCase +import aio_pika + from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor -from opentelemetry.instrumentation.aio_pika.instrumented_exchange import InstrumentedExchange, RobustInstrumentedExchange -from opentelemetry.instrumentation.aio_pika.instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue +from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( + InstrumentedExchange, + RobustInstrumentedExchange, +) +from opentelemetry.instrumentation.aio_pika.instrumented_queue import ( + InstrumentedQueue, + RobustInstrumentedQueue, +) class TestPika(TestCase): def test_instrument_api(self) -> None: instrumentation = AioPikaInstrumentor() instrumentation.instrument() - self.assertTrue(aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange) + self.assertTrue( + aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange + ) self.assertTrue(aio_pika.Channel.QUEUE_CLASS == InstrumentedQueue) - self.assertTrue(aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange) - self.assertTrue(aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue) + self.assertTrue( + aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange + ) + self.assertTrue( + aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue + ) instrumentation.uninstrument() - self.assertFalse(aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange) + self.assertFalse( + aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange + ) self.assertFalse(aio_pika.Channel.QUEUE_CLASS == InstrumentedQueue) - self.assertFalse(aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange) - self.assertFalse(aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue) + self.assertFalse( + aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange + ) + self.assertFalse( + aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue + ) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py index c18b8f6514..e43454395b 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py @@ -12,49 +12,75 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from typing import Type from argparse import Namespace +from typing import Type from unittest import TestCase, mock -from yarl import URL from aio_pika import Exchange -from opentelemetry.trace import Span, NonRecordingSpan +from yarl import URL + +from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( + InstrumentedExchange, + RobustInstrumentedExchange, +) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.instrumentation.aio_pika.instrumented_exchange import InstrumentedExchange, RobustInstrumentedExchange +from opentelemetry.trace import NonRecordingSpan, Span + +from .consts import ( + CHANNEL, + CONNECTION, + CORRELATION_ID, + EXCHANGE_NAME, + MESSAGE, + MESSAGE_ID, + MESSAGING_SYSTEM, + ROUTING_KEY, + SERVER_HOST, + SERVER_PORT, +) -from .consts import MESSAGING_SYSTEM, SERVER_HOST, SERVER_PORT, MESSAGE_ID, CORRELATION_ID, EXCHANGE_NAME, ROUTING_KEY, MESSAGE, CONNECTION, CHANNEL class TestInstrumentedExchange(TestCase): EXPECTED_ATTRIBUTES = { SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, - SpanAttributes.MESSAGING_DESTINATION: f'{EXCHANGE_NAME},{ROUTING_KEY}', + SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", SpanAttributes.NET_PEER_NAME: SERVER_HOST, SpanAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, - SpanAttributes.MESSAGING_TEMP_DESTINATION: True + SpanAttributes.MESSAGING_TEMP_DESTINATION: True, } - + def setUp(self): self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) def test_get_publish_span(self): exchange = InstrumentedExchange(CONNECTION, CHANNEL, EXCHANGE_NAME) - with mock.patch.object(NonRecordingSpan, 'is_recording', return_value=True): - with mock.patch.object(NonRecordingSpan, 'set_attribute') as mock_set_attrubute: + with mock.patch.object( + NonRecordingSpan, "is_recording", return_value=True + ): + with mock.patch.object( + NonRecordingSpan, "set_attribute" + ) as mock_set_attrubute: exchange._get_publish_span(MESSAGE, ROUTING_KEY) for name, value in self.EXPECTED_ATTRIBUTES.items(): mock_set_attrubute.assert_any_call(name, value) - + def _test_publish(self, exchange_type: Type[InstrumentedExchange]): exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) - with mock.patch.object(InstrumentedExchange, '_get_publish_span') as mock_get_publish_span: - with mock.patch.object(Exchange, 'publish', return_value=asyncio.sleep(0)) as mock_publish: - self.loop.run_until_complete(exchange.publish(MESSAGE, ROUTING_KEY)) + with mock.patch.object( + InstrumentedExchange, "_get_publish_span" + ) as mock_get_publish_span: + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + self.loop.run_until_complete( + exchange.publish(MESSAGE, ROUTING_KEY) + ) mock_publish.assert_called_once() mock_get_publish_span.assert_called_once() - + def test_publish(self): self._test_publish(InstrumentedExchange) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py index 9ed64d1675..dd312c5384 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py @@ -12,17 +12,34 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from typing import Type from argparse import Namespace +from typing import Type from unittest import TestCase, mock -from yarl import URL from aio_pika import Queue -from opentelemetry.trace import Span, NonRecordingSpan +from yarl import URL + +from opentelemetry.instrumentation.aio_pika.instrumented_queue import ( + InstrumentedQueue, + RobustInstrumentedQueue, +) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.instrumentation.aio_pika.instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue +from opentelemetry.trace import NonRecordingSpan, Span + +from .consts import ( + CHANNEL, + CONNECTION, + CORRELATION_ID, + EXCHANGE_NAME, + MESSAGE, + MESSAGE_ID, + MESSAGING_SYSTEM, + QUEUE_NAME, + ROUTING_KEY, + SERVER_HOST, + SERVER_PORT, +) -from .consts import MESSAGING_SYSTEM, SERVER_HOST, SERVER_PORT, MESSAGE_ID, CORRELATION_ID, QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY, MESSAGE, CONNECTION, CHANNEL class TestInstrumentedQueue(TestCase): EXPECTED_ATTRIBUTES = { @@ -31,7 +48,7 @@ class TestInstrumentedQueue(TestCase): SpanAttributes.NET_PEER_NAME: SERVER_HOST, SpanAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, - SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, } def setUp(self): @@ -39,17 +56,27 @@ def setUp(self): asyncio.set_event_loop(self.loop) def test_get_callback_span(self): - queue = InstrumentedQueue(CHANNEL, QUEUE_NAME, False, False, False, None) - with mock.patch.object(NonRecordingSpan, 'is_recording', return_value=True): - with mock.patch.object(NonRecordingSpan, 'set_attribute') as mock_set_attrubute: + queue = InstrumentedQueue( + CHANNEL, QUEUE_NAME, False, False, False, None + ) + with mock.patch.object( + NonRecordingSpan, "is_recording", return_value=True + ): + with mock.patch.object( + NonRecordingSpan, "set_attribute" + ) as mock_set_attrubute: queue._get_callback_span(MESSAGE) for name, value in self.EXPECTED_ATTRIBUTES.items(): mock_set_attrubute.assert_any_call(name, value) - + def test_decorate_callback(self): - queue = InstrumentedQueue(CHANNEL, QUEUE_NAME, False, False, False, None) + queue = InstrumentedQueue( + CHANNEL, QUEUE_NAME, False, False, False, None + ) callback = mock.MagicMock(return_value=asyncio.sleep(0)) - with mock.patch.object(InstrumentedQueue, '_get_callback_span') as mocked_get_callback_span: + with mock.patch.object( + InstrumentedQueue, "_get_callback_span" + ) as mocked_get_callback_span: decorated_callback = queue._decorate_callback(callback) self.loop.run_until_complete(decorated_callback(MESSAGE)) mocked_get_callback_span.assert_called_once() @@ -60,14 +87,18 @@ def _test_consume(self, queue_type: Type[InstrumentedQueue]): callback = mock.MagicMock() CONNECTION.connected = asyncio.Event() CONNECTION.connected.set() - with mock.patch.object(InstrumentedQueue, '_decorate_callback') as mocked_decorate_callback: - with mock.patch.object(Queue, 'consume', return_value=asyncio.sleep(0)) as mocked_consume: + with mock.patch.object( + InstrumentedQueue, "_decorate_callback" + ) as mocked_decorate_callback: + with mock.patch.object( + Queue, "consume", return_value=asyncio.sleep(0) + ) as mocked_consume: self.loop.run_until_complete(queue.consume(callback)) mocked_decorate_callback.assert_called_once_with(callback) mocked_consume.assert_called_once() - + def test_consume(self): self._test_consume(InstrumentedQueue) - + def test_robust_consume(self): self._test_consume(RobustInstrumentedQueue) From ab4d65c030bd431a4a01476ec26bc0bc130a76b1 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Wed, 18 May 2022 17:19:30 +0300 Subject: [PATCH 03/21] Added aio-pika to changelog --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fa2badae2..2e5e51b360 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1065](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1065)) - `opentelemetry-instrumentation-redis` now instruments asynchronous Redis clients, if the installed redis-py includes async support (>=4.2.0). ([#1076](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1076)) +- `opentelemetry-instrumentation-aio-pika` added RabbitMQ aio-pika module instrumentation. + ([#1095](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1095)) ## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21 From b99668f9a4e2dd9d3f8ee2a80d81a9374825cce9 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Thu, 19 May 2022 13:12:49 +0300 Subject: [PATCH 04/21] Fixed according to flake8 --- .../instrumentation/aio_pika/instrumented_queue.py | 2 +- .../instrumentation/aio_pika/span_builder.py | 14 +++++++------- .../tests/test_instrumented_exchange.py | 4 +--- .../tests/test_instrumented_queue.py | 5 +---- .../tests/test_span_builder.py | 4 ++-- 5 files changed, 12 insertions(+), 17 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py index 5976a0e98c..172297fa00 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -13,7 +13,7 @@ # limitations under the License. from typing import Any, Callable, Optional -from aio_pika import Channel, Queue, RobustQueue, connect +from aio_pika import Queue, RobustQueue from aio_pika.abc import AbstractIncomingMessage from aio_pika.queue import ConsumerTag diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index 1506f5ebc8..62dff1ceb9 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -34,34 +34,34 @@ def __init__(self): self._operation: MessagingOperationValues = None self._kind: SpanKind = None self._destination: str = None - + def set_as_producer(self): self._kind = SpanKind.PRODUCER def set_as_consumer(self): self._kind = SpanKind.CONSUMER - + def set_operation(self, operation: MessagingOperationValues): self._operation = operation - + def set_destination(self, destination: str): self._destination = destination self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination - + def set_channel(self, channel: AbstractChannel): url = channel.connection.connection.url self._attributes.update({ SpanAttributes.NET_PEER_NAME: url.host, SpanAttributes.NET_PEER_PORT: url.port }) - + def set_message(self, message: AbstractMessage): properties = message.properties if properties.message_id: self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = properties.message_id if properties.correlation_id: self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id - + def build(self) -> Optional[Span]: if context.get_value('suppress_instrumentation') or context.get_value(context._SUPPRESS_INSTRUMENTATION_KEY): return None @@ -78,7 +78,7 @@ def build(self) -> Optional[Span]: else: span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) return span - + def _generate_span_name(self) -> str: operation_value = self._operation.value if self._operation else 'send' return f'{self._destination} {operation_value}' diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py index e43454395b..6fe68e6499 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py @@ -12,19 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from argparse import Namespace from typing import Type from unittest import TestCase, mock from aio_pika import Exchange -from yarl import URL from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( InstrumentedExchange, RobustInstrumentedExchange, ) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import NonRecordingSpan, Span +from opentelemetry.trace import NonRecordingSpan from .consts import ( CHANNEL, diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py index dd312c5384..269c06eea1 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py @@ -12,19 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from argparse import Namespace from typing import Type from unittest import TestCase, mock from aio_pika import Queue -from yarl import URL from opentelemetry.instrumentation.aio_pika.instrumented_queue import ( InstrumentedQueue, RobustInstrumentedQueue, ) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import NonRecordingSpan, Span +from opentelemetry.trace import NonRecordingSpan from .consts import ( CHANNEL, @@ -35,7 +33,6 @@ MESSAGE_ID, MESSAGING_SYSTEM, QUEUE_NAME, - ROUTING_KEY, SERVER_HOST, SERVER_PORT, ) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py index 0d3c32a8e3..0fc6fe5296 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py @@ -24,13 +24,13 @@ def test_build(self): builder.set_destination('destination') span = builder.build() self.assertTrue(isinstance(span, Span)) - + def test_no_destination(self): builder = SpanBuilder() builder.set_as_consumer() with self.assertRaises(AssertionError): builder.build() - + def test_no_kind(self): builder = SpanBuilder() builder.set_destination('destination') From 7a2e0f2dcf69f188fd3d0184a9f7f572d79a4845 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Thu, 19 May 2022 13:20:06 +0300 Subject: [PATCH 05/21] Ran tox -e generate --- instrumentation/README.md | 1 + opentelemetry-contrib-instrumentations/setup.cfg | 1 + .../src/opentelemetry/instrumentation/bootstrap_gen.py | 4 ++++ 3 files changed, 6 insertions(+) diff --git a/instrumentation/README.md b/instrumentation/README.md index 20caa144bb..cf6c8cbef2 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -1,6 +1,7 @@ | Instrumentation | Supported Packages | | --------------- | ------------------ | +| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | | [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | | [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 1.3.0 | | [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | diff --git a/opentelemetry-contrib-instrumentations/setup.cfg b/opentelemetry-contrib-instrumentations/setup.cfg index 8189f26086..362790e860 100644 --- a/opentelemetry-contrib-instrumentations/setup.cfg +++ b/opentelemetry-contrib-instrumentations/setup.cfg @@ -28,6 +28,7 @@ packages = find_namespace: zip_safe = False include_package_data = True install_requires = + opentelemetry-instrumentation-aio-pika==0.30b1 opentelemetry-instrumentation-aiohttp-client==0.31b0 opentelemetry-instrumentation-aiopg==0.31b0 opentelemetry-instrumentation-asgi==0.31b0 diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 6288cd879a..c905dd4ec4 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -16,6 +16,10 @@ # RUN `python scripts/generate_instrumentation_bootstrap.py` TO REGENERATE. libraries = { + "aio_pika": { + "library": "aio_pika ~= 7.2.0", + "instrumentation": "opentelemetry-instrumentation-aio-pika==0.30b1", + }, "aiohttp": { "library": "aiohttp ~= 3.0", "instrumentation": "opentelemetry-instrumentation-aiohttp-client==0.31b0", From 5f3865fb099d320ff8ed8ad9d8a2a17371fbfd9a Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 22 May 2022 13:28:47 +0300 Subject: [PATCH 06/21] Added ofek1weiss to component owners --- .github/component_owners.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/component_owners.yml b/.github/component_owners.yml index 669ce38751..2eddd7b01e 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -1,5 +1,8 @@ components: + instrumentation/opentelemetry-instrumentation-aio-pika: + - ofek1weiss + instrumentation/opentelemetry-instrumentation-kafka-python: - nozik From 0e8004f5087cd5937869fc281d204d6e096a67fd Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 22 May 2022 13:29:19 +0300 Subject: [PATCH 07/21] fixed readme according to cr --- .../opentelemetry-instrumentation-aio-pika/README.rst | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst index 524b880da3..b1dacdc168 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst @@ -1,4 +1,4 @@ -OpenTelemetry aio_pika Instrumentation +OpenTelemetry Aio-pika Instrumentation ================================== |pypi| @@ -6,7 +6,7 @@ OpenTelemetry aio_pika Instrumentation .. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-aio-pika.svg :target: https://pypi.org/project/opentelemetry-instrumentation-aio-pika/ -This library allows tracing requests made by the aio_pika library. +This library allows tracing requests made by the Aio-pika library. Installation ------------ @@ -18,5 +18,6 @@ Installation References ---------- -* `OpenTelemetry aio_pika/ Tracing `_ +* `OpenTelemetry Aio-pika instrumentation `_ * `OpenTelemetry Project `_ +* `OpenTelemetry Python Examples `_ From ee480d703915bf8cea8bcf5b021fac447eeed280 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 22 May 2022 13:30:02 +0300 Subject: [PATCH 08/21] Moved _DEFAULT_ATTRIBUTES to be top level --- .../opentelemetry/instrumentation/aio_pika/span_builder.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index 62dff1ceb9..242df1ef86 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -24,13 +24,14 @@ from .version import __version__ +_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} + class SpanBuilder: - _DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} TRACER_PROVIDER = None def __init__(self): - self._attributes = self._DEFAULT_ATTRIBUTES.copy() + self._attributes = _DEFAULT_ATTRIBUTES.copy() self._operation: MessagingOperationValues = None self._kind: SpanKind = None self._destination: str = None From 3d54438fba3155aaeb0cc7a70b114948244b14f8 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 22 May 2022 13:34:33 +0300 Subject: [PATCH 09/21] Changed to use default getter --- .../aio_pika/aio_pika_getter.py | 30 ---------------- .../aio_pika/instrumented_queue.py | 6 +--- .../tests/test_aio_pika_getter.py | 36 ------------------- 3 files changed, 1 insertion(+), 71 deletions(-) delete mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py delete mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py deleted file mode 100644 index c3d0ccc4c6..0000000000 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_getter.py +++ /dev/null @@ -1,30 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import List, Optional - -from opentelemetry.propagators.textmap import CarrierT, Getter - - -class _AioPikaGetter(Getter): # type: ignore - def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]: - value = carrier.get(key, None) - if value is None: - return None - return [value] - - def keys(self, carrier: CarrierT) -> List[str]: - return [] - - -aio_pika_getter = _AioPikaGetter() diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py index 172297fa00..b42179d7d6 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -21,7 +21,6 @@ from opentelemetry.semconv.trace import MessagingOperationValues from opentelemetry.trace import Span -from .aio_pika_getter import aio_pika_getter from .span_builder import SpanBuilder @@ -42,10 +41,7 @@ def _decorate_callback( ) -> Callable[[AbstractIncomingMessage], Any]: async def decorated(message: AbstractIncomingMessage): headers = message.headers or dict() - ctx = ( - propagate.extract(headers, getter=aio_pika_getter) - or context.get_current() - ) + ctx = propagate.extract(headers) or context.get_current() token = context.attach(ctx) span = self._get_callback_span(message) if not span: diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py deleted file mode 100644 index dd25362883..0000000000 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_getter.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from unittest import TestCase - -from opentelemetry.instrumentation.aio_pika.aio_pika_getter import ( - aio_pika_getter, -) - - -class TestAioPikaGetter(TestCase): - def test_get_none(self) -> None: - carrier = {} - value = aio_pika_getter.get(carrier, "test") - self.assertIsNone(value) - - def test_get_value(self) -> None: - key = "test" - value = "value" - carrier = {key: value} - val = aio_pika_getter.get(carrier, key) - self.assertEqual(val, [value]) - - def test_keys(self): - keys = aio_pika_getter.keys({}) - self.assertEqual(keys, []) From a30aafe1db4c0e3b3276688b7ecbee38e32f5398 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sun, 22 May 2022 13:35:10 +0300 Subject: [PATCH 10/21] Updated version --- .../opentelemetry-instrumentation-aio-pika/setup.cfg | 2 +- .../instrumentation/aio_pika/instrumented_exchange.py | 3 ++- .../src/opentelemetry/instrumentation/aio_pika/version.py | 2 +- opentelemetry-contrib-instrumentations/setup.cfg | 2 +- .../src/opentelemetry/instrumentation/bootstrap_gen.py | 2 +- 5 files changed, 6 insertions(+), 5 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg index 6a32f4f6ac..7d1b3b1e45 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg @@ -14,7 +14,7 @@ # [metadata] name = opentelemetry-instrumentation-aio-pika -description = OpenTelemetry aio_pika instrumentation +description = OpenTelemetry Aio-pika instrumentation long_description = file: README.rst long_description_content_type = text/x-rst author = OpenTelemetry Authors diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py index e8bf26fe79..a04a2d4cf6 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py @@ -18,9 +18,10 @@ from aio_pika.abc import AbstractMessage from opentelemetry import propagate, trace -from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder from opentelemetry.trace import Span +from .span_builder import SpanBuilder + class InstrumentedExchange(Exchange): def _get_publish_span( diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py index 88015aae34..a8a9d8f83c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.30b1" +__version__ = "0.31b1" diff --git a/opentelemetry-contrib-instrumentations/setup.cfg b/opentelemetry-contrib-instrumentations/setup.cfg index 362790e860..c641ee3079 100644 --- a/opentelemetry-contrib-instrumentations/setup.cfg +++ b/opentelemetry-contrib-instrumentations/setup.cfg @@ -28,7 +28,7 @@ packages = find_namespace: zip_safe = False include_package_data = True install_requires = - opentelemetry-instrumentation-aio-pika==0.30b1 + opentelemetry-instrumentation-aio-pika==0.31b1 opentelemetry-instrumentation-aiohttp-client==0.31b0 opentelemetry-instrumentation-aiopg==0.31b0 opentelemetry-instrumentation-asgi==0.31b0 diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index c905dd4ec4..a7890d2758 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -18,7 +18,7 @@ libraries = { "aio_pika": { "library": "aio_pika ~= 7.2.0", - "instrumentation": "opentelemetry-instrumentation-aio-pika==0.30b1", + "instrumentation": "opentelemetry-instrumentation-aio-pika==0.31b1", }, "aiohttp": { "library": "aiohttp ~= 3.0", From a30d48aacb88f52fb7a3b7d63c1580e782f2f6b0 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Tue, 31 May 2022 13:29:28 +0300 Subject: [PATCH 11/21] Updated version --- .../src/opentelemetry/instrumentation/aio_pika/version.py | 2 +- opentelemetry-contrib-instrumentations/setup.cfg | 2 +- .../src/opentelemetry/instrumentation/bootstrap_gen.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py index a8a9d8f83c..d8dc1e1ed7 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "0.31b1" +__version__ = "0.31b0" diff --git a/opentelemetry-contrib-instrumentations/setup.cfg b/opentelemetry-contrib-instrumentations/setup.cfg index c641ee3079..4570c763b1 100644 --- a/opentelemetry-contrib-instrumentations/setup.cfg +++ b/opentelemetry-contrib-instrumentations/setup.cfg @@ -28,7 +28,7 @@ packages = find_namespace: zip_safe = False include_package_data = True install_requires = - opentelemetry-instrumentation-aio-pika==0.31b1 + opentelemetry-instrumentation-aio-pika==0.31b0 opentelemetry-instrumentation-aiohttp-client==0.31b0 opentelemetry-instrumentation-aiopg==0.31b0 opentelemetry-instrumentation-asgi==0.31b0 diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index a7890d2758..85f59bbdec 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -18,7 +18,7 @@ libraries = { "aio_pika": { "library": "aio_pika ~= 7.2.0", - "instrumentation": "opentelemetry-instrumentation-aio-pika==0.31b1", + "instrumentation": "opentelemetry-instrumentation-aio-pika==0.31b0", }, "aiohttp": { "library": "aiohttp ~= 3.0", From 0d22a6435209b1f54d21dbd3fd2b68b9fdb293e3 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Tue, 31 May 2022 13:31:51 +0300 Subject: [PATCH 12/21] Moved changelog entry to unreleased --- CHANGELOG.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e5e51b360..292a8e2a16 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/open-telemetry/opentelemetry-python/compare/v1.12.0rc1-0.31b0...HEAD) +### Added +- `opentelemetry-instrumentation-aio-pika` added RabbitMQ aio-pika module instrumentation. + ([#1095](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1095)) + ## [1.12.0rc1-0.31b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.12.0rc1-0.31b0) - 2022-05-17 @@ -27,8 +31,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1065](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1065)) - `opentelemetry-instrumentation-redis` now instruments asynchronous Redis clients, if the installed redis-py includes async support (>=4.2.0). ([#1076](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1076)) -- `opentelemetry-instrumentation-aio-pika` added RabbitMQ aio-pika module instrumentation. - ([#1095](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1095)) ## [1.11.1-0.30b1](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.11.1-0.30b1) - 2022-04-21 From 43973fc940eb81894e75e550e67b9935f41bf0fb Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Tue, 31 May 2022 13:34:59 +0300 Subject: [PATCH 13/21] Changed imports to be absolute --- .../instrumentation/aio_pika/aio_pika_instrumentor.py | 8 ++++---- .../instrumentation/aio_pika/instrumented_exchange.py | 2 +- .../instrumentation/aio_pika/instrumented_queue.py | 2 +- .../instrumentation/aio_pika/span_builder.py | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py index a30e9af664..ebc6c9ac36 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py @@ -17,13 +17,13 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor -from .instrumented_exchange import ( +from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( InstrumentedExchange, RobustInstrumentedExchange, ) -from .instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue -from .package import _instruments -from .span_builder import SpanBuilder +from opentelemetry.instrumentation.aio_pika.instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue +from opentelemetry.instrumentation.aio_pika.package import _instruments +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder class AioPikaInstrumentor(BaseInstrumentor): diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py index a04a2d4cf6..3648f607a8 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py @@ -20,7 +20,7 @@ from opentelemetry import propagate, trace from opentelemetry.trace import Span -from .span_builder import SpanBuilder +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder class InstrumentedExchange(Exchange): diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py index b42179d7d6..6430a4dccc 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -21,7 +21,7 @@ from opentelemetry.semconv.trace import MessagingOperationValues from opentelemetry.trace import Span -from .span_builder import SpanBuilder +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder class InstrumentedQueue(Queue): diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index 242df1ef86..a411c46d3c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -22,7 +22,7 @@ ) from opentelemetry.trace import Span, SpanKind -from .version import __version__ +from opentelemetry.instrumentation.aio_pika.version import __version__ _DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} From 18ad3483487ce74cdc9b3ca4409563696835a4f0 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Tue, 31 May 2022 13:38:24 +0300 Subject: [PATCH 14/21] Small semantic changes --- .../instrumentation/aio_pika/instrumented_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py index 6430a4dccc..60bf9eb044 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -40,8 +40,8 @@ def _decorate_callback( self, callback: Callable[[AbstractIncomingMessage], Any] ) -> Callable[[AbstractIncomingMessage], Any]: async def decorated(message: AbstractIncomingMessage): - headers = message.headers or dict() - ctx = propagate.extract(headers) or context.get_current() + headers = message.headers or {} + ctx = propagate.extract(headers) token = context.attach(ctx) span = self._get_callback_span(message) if not span: From 5262a93591e546bc99e7c733a0aaf916e3fd4077 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Tue, 31 May 2022 13:58:04 +0300 Subject: [PATCH 15/21] fixed tracer usage --- .../aio_pika/instrumented_queue.py | 4 +++- .../instrumentation/aio_pika/span_builder.py | 18 +++++++----------- .../instrumentation/aio_pika/utils.py | 7 +++++++ 3 files changed, 17 insertions(+), 12 deletions(-) create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py index 60bf9eb044..b79c322164 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -22,7 +22,7 @@ from opentelemetry.trace import Span from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder - +from opentelemetry.instrumentation.aio_pika.utils import is_instrumentation_enabled class InstrumentedQueue(Queue): def _get_callback_span( @@ -40,6 +40,8 @@ def _decorate_callback( self, callback: Callable[[AbstractIncomingMessage], Any] ) -> Callable[[AbstractIncomingMessage], Any]: async def decorated(message: AbstractIncomingMessage): + if not is_instrumentation_enabled(): + return await callback(message) headers = message.headers or {} ctx = propagate.extract(headers) token = context.attach(ctx) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index a411c46d3c..694a45e859 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -22,8 +22,10 @@ ) from opentelemetry.trace import Span, SpanKind +from opentelemetry.instrumentation.aio_pika.utils import is_instrumentation_enabled from opentelemetry.instrumentation.aio_pika.version import __version__ +_TRACER_NAME = 'opentelemetry.instrumentation.aio-pika' _DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} @@ -64,20 +66,14 @@ def set_message(self, message: AbstractMessage): self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id def build(self) -> Optional[Span]: - if context.get_value('suppress_instrumentation') or context.get_value(context._SUPPRESS_INSTRUMENTATION_KEY): + if not is_instrumentation_enabled(): return None - assert self._kind, 'kind must be configured.' - assert self._destination, 'destination must be configured.' - tracer = trace.get_tracer(__name__, __version__, self.TRACER_PROVIDER) - span = tracer.start_span(self._generate_span_name(), kind=self._kind) - if not span.is_recording(): - return span - for attribute_name, attribute_value in self._attributes.items(): - span.set_attribute(attribute_name, attribute_value) if self._operation: - span.set_attribute(SpanAttributes.MESSAGING_OPERATION, self._operation.value) + self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value else: - span.set_attribute(SpanAttributes.MESSAGING_TEMP_DESTINATION, True) + self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True + tracer = trace.get_tracer(_TRACER_NAME, __version__, self.TRACER_PROVIDER) + span = tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes) return span def _generate_span_name(self) -> str: diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py new file mode 100644 index 0000000000..10ae328698 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py @@ -0,0 +1,7 @@ +from opentelemetry import context + + +def is_instrumentation_enabled() -> bool: + if context.get_value('suppress_instrumentation') or context.get_value(context._SUPPRESS_INSTRUMENTATION_KEY): + return False + return True From b58916391de55b0cd14253b8c18f0d0a33b4d36e Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Tue, 31 May 2022 14:16:33 +0300 Subject: [PATCH 16/21] Fixed tests --- .../aio_pika/instrumented_queue.py | 1 + .../instrumentation/aio_pika/span_builder.py | 2 +- .../tests/test_instrumented_exchange.py | 20 ++++++++++-------- .../tests/test_instrumented_queue.py | 21 +++++++++++-------- .../tests/test_span_builder.py | 12 ----------- 5 files changed, 25 insertions(+), 31 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py index b79c322164..fa0142939e 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py @@ -24,6 +24,7 @@ from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder from opentelemetry.instrumentation.aio_pika.utils import is_instrumentation_enabled + class InstrumentedQueue(Queue): def _get_callback_span( self, message: AbstractIncomingMessage diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index 694a45e859..c0a725dd1c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -15,7 +15,7 @@ from aio_pika.abc import AbstractChannel, AbstractMessage -from opentelemetry import context, trace +from opentelemetry import trace from opentelemetry.semconv.trace import ( MessagingOperationValues, SpanAttributes, diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py index 6fe68e6499..137dc95ae2 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py @@ -17,12 +17,12 @@ from aio_pika import Exchange +from opentelemetry.trace import SpanKind from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( InstrumentedExchange, RobustInstrumentedExchange, ) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import NonRecordingSpan from .consts import ( CHANNEL, @@ -55,15 +55,17 @@ def setUp(self): def test_get_publish_span(self): exchange = InstrumentedExchange(CONNECTION, CHANNEL, EXCHANGE_NAME) - with mock.patch.object( - NonRecordingSpan, "is_recording", return_value=True + tracer = mock.MagicMock() + with mock.patch( + 'opentelemetry.trace.get_tracer', + return_value=tracer ): - with mock.patch.object( - NonRecordingSpan, "set_attribute" - ) as mock_set_attrubute: - exchange._get_publish_span(MESSAGE, ROUTING_KEY) - for name, value in self.EXPECTED_ATTRIBUTES.items(): - mock_set_attrubute.assert_any_call(name, value) + exchange._get_publish_span(MESSAGE, ROUTING_KEY) + tracer.start_span.assert_called_once_with( + f'{EXCHANGE_NAME},{ROUTING_KEY} send', + kind=SpanKind.PRODUCER, + attributes=self.EXPECTED_ATTRIBUTES + ) def _test_publish(self, exchange_type: Type[InstrumentedExchange]): exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py index 269c06eea1..26eaaf7c75 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py @@ -17,12 +17,12 @@ from aio_pika import Queue +from opentelemetry.trace import SpanKind from opentelemetry.instrumentation.aio_pika.instrumented_queue import ( InstrumentedQueue, RobustInstrumentedQueue, ) from opentelemetry.semconv.trace import SpanAttributes -from opentelemetry.trace import NonRecordingSpan from .consts import ( CHANNEL, @@ -46,6 +46,7 @@ class TestInstrumentedQueue(TestCase): SpanAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_OPERATION: 'receive' } def setUp(self): @@ -56,15 +57,17 @@ def test_get_callback_span(self): queue = InstrumentedQueue( CHANNEL, QUEUE_NAME, False, False, False, None ) - with mock.patch.object( - NonRecordingSpan, "is_recording", return_value=True + tracer = mock.MagicMock() + with mock.patch( + 'opentelemetry.trace.get_tracer', + return_value=tracer ): - with mock.patch.object( - NonRecordingSpan, "set_attribute" - ) as mock_set_attrubute: - queue._get_callback_span(MESSAGE) - for name, value in self.EXPECTED_ATTRIBUTES.items(): - mock_set_attrubute.assert_any_call(name, value) + queue._get_callback_span(MESSAGE) + tracer.start_span.assert_called_once_with( + f'{EXCHANGE_NAME} receive', + kind=SpanKind.CONSUMER, + attributes=self.EXPECTED_ATTRIBUTES + ) def test_decorate_callback(self): queue = InstrumentedQueue( diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py index 0fc6fe5296..b55988f06b 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py @@ -24,15 +24,3 @@ def test_build(self): builder.set_destination('destination') span = builder.build() self.assertTrue(isinstance(span, Span)) - - def test_no_destination(self): - builder = SpanBuilder() - builder.set_as_consumer() - with self.assertRaises(AssertionError): - builder.build() - - def test_no_kind(self): - builder = SpanBuilder() - builder.set_destination('destination') - with self.assertRaises(AssertionError): - builder.build() From 68deeacdcaa2c90728308e0a233eebb60b6f7ec5 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Thu, 2 Jun 2022 17:06:30 +0300 Subject: [PATCH 17/21] Change to work using wrapt --- .../aio_pika/aio_pika_instrumentor.py | 73 ++++++++++++++----- ...umented_queue.py => callback_decorator.py} | 72 ++++-------------- .../aio_pika/instrumented_exchange.py | 53 -------------- .../aio_pika/publish_decorator.py | 53 ++++++++++++++ .../instrumentation/aio_pika/span_builder.py | 18 ++--- .../instrumentation/aio_pika/utils.py | 4 +- .../tests/test_aio_pika_instrumentation.py | 32 ++------ ...ed_queue.py => test_callback_decorator.py} | 56 ++++---------- ..._exchange.py => test_publish_decorator.py} | 37 +++++----- .../tests/test_span_builder.py | 4 +- 10 files changed, 172 insertions(+), 230 deletions(-) rename instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/{instrumented_queue.py => callback_decorator.py} (50%) delete mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py create mode 100644 instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py rename instrumentation/opentelemetry-instrumentation-aio-pika/tests/{test_instrumented_queue.py => test_callback_decorator.py} (53%) rename instrumentation/opentelemetry-instrumentation-aio-pika/tests/{test_instrumented_exchange.py => test_publish_decorator.py} (71%) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py index ebc6c9ac36..cb54387386 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py @@ -11,36 +11,71 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Collection +from typing import Any, Callable, Collection -import aio_pika +import wrapt +from aio_pika import Exchange, Queue +from aio_pika.abc import AbstractIncomingMessage -from opentelemetry.instrumentation.instrumentor import BaseInstrumentor - -from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( - InstrumentedExchange, - RobustInstrumentedExchange, +from opentelemetry import trace +from opentelemetry.instrumentation.aio_pika.callback_decorator import ( + CallbackDecorator, ) -from opentelemetry.instrumentation.aio_pika.instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue from opentelemetry.instrumentation.aio_pika.package import _instruments -from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.instrumentation.aio_pika.publish_decorator import ( + PublishDecorator, +) +from opentelemetry.instrumentation.aio_pika.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.trace import Tracer + +_INSTRUMENTATION_MODULE_NAME = "opentelemetry.instrumentation.aio_pika" class AioPikaInstrumentor(BaseInstrumentor): + def _instrument_queue(self, tracer: Tracer): + async def wrapper(wrapped, instance, args, kwargs): + async def consume( + callback: Callable[[AbstractIncomingMessage], Any], + *fargs, + **fkwargs + ): + decorated_callback = CallbackDecorator( + tracer, instance + ).decorate(callback) + return await wrapped(decorated_callback, *fargs, **fkwargs) + + return await consume(*args, **kwargs) + + wrapt.wrap_function_wrapper(Queue, "consume", wrapper) + + def _instrument_exchange(self, tracer: Tracer): + async def wrapper(wrapped, instance, args, kwargs): + decorated_publish = PublishDecorator(tracer, instance).decorate( + wrapped + ) + return await decorated_publish(*args, **kwargs) + + wrapt.wrap_function_wrapper(Exchange, "publish", wrapper) + def _instrument(self, **kwargs): tracer_provider = kwargs.get("tracer_provider", None) - SpanBuilder.TRACER_PROVIDER = tracer_provider - aio_pika.Channel.EXCHANGE_CLASS = InstrumentedExchange - aio_pika.Channel.QUEUE_CLASS = InstrumentedQueue - aio_pika.RobustChannel.EXCHANGE_CLASS = RobustInstrumentedExchange - aio_pika.RobustChannel.QUEUE_CLASS = RobustInstrumentedQueue + tracer = trace.get_tracer( + _INSTRUMENTATION_MODULE_NAME, __version__, tracer_provider + ) + self._instrument_queue(tracer) + self._instrument_exchange(tracer) + + def _uninstrument_queue(self): + unwrap(Queue, "consume") + + def _uninstrument_exchange(self): + unwrap(Exchange, "publish") def _uninstrument(self, **kwargs): - SpanBuilder.TRACER_PROVIDER = None - aio_pika.Channel.EXCHANGE_CLASS = aio_pika.Exchange - aio_pika.Channel.QUEUE_CLASS = aio_pika.Queue - aio_pika.RobustChannel.EXCHANGE_CLASS = aio_pika.RobustExchange - aio_pika.RobustChannel.QUEUE_CLASS = aio_pika.RobustQueue + self._uninstrument_queue() + self._uninstrument_exchange() def instrumentation_dependencies(self) -> Collection[str]: return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py similarity index 50% rename from instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py rename to instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py index fa0142939e..a2169b6d18 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py @@ -13,31 +13,33 @@ # limitations under the License. from typing import Any, Callable, Optional -from aio_pika import Queue, RobustQueue +from aio_pika import Queue from aio_pika.abc import AbstractIncomingMessage -from aio_pika.queue import ConsumerTag from opentelemetry import context, propagate, trace +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.instrumentation.aio_pika.utils import ( + is_instrumentation_enabled, +) from opentelemetry.semconv.trace import MessagingOperationValues -from opentelemetry.trace import Span +from opentelemetry.trace import Span, Tracer -from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder -from opentelemetry.instrumentation.aio_pika.utils import is_instrumentation_enabled +class CallbackDecorator: + def __init__(self, tracer: Tracer, queue: Queue): + self._tracer = tracer + self._queue = queue -class InstrumentedQueue(Queue): - def _get_callback_span( - self, message: AbstractIncomingMessage - ) -> Optional[Span]: - builder = SpanBuilder() + def _get_span(self, message: AbstractIncomingMessage) -> Optional[Span]: + builder = SpanBuilder(self._tracer) builder.set_as_consumer() builder.set_operation(MessagingOperationValues.RECEIVE) builder.set_destination(message.exchange or message.routing_key) - builder.set_channel(self.channel) + builder.set_channel(self._queue.channel) builder.set_message(message) return builder.build() - def _decorate_callback( + def decorate( self, callback: Callable[[AbstractIncomingMessage], Any] ) -> Callable[[AbstractIncomingMessage], Any]: async def decorated(message: AbstractIncomingMessage): @@ -46,7 +48,7 @@ async def decorated(message: AbstractIncomingMessage): headers = message.headers or {} ctx = propagate.extract(headers) token = context.attach(ctx) - span = self._get_callback_span(message) + span = self._get_span(message) if not span: return await callback(message) try: @@ -57,47 +59,3 @@ async def decorated(message: AbstractIncomingMessage): return return_value return decorated - - async def consume( - self, - callback: Callable[[AbstractIncomingMessage], Any], - no_ack: bool = False, - exclusive: bool = False, - arguments: dict = None, - consumer_tag=None, - timeout=None, - ) -> ConsumerTag: - decorated_callback = self._decorate_callback(callback) - return await super().consume( - decorated_callback, - no_ack, - exclusive, - arguments, - consumer_tag, - timeout, - ) - - -class RobustInstrumentedQueue(RobustQueue, InstrumentedQueue): - async def consume( - self, - callback: Callable[[AbstractIncomingMessage], Any], - no_ack: bool = False, - exclusive: bool = False, - arguments: dict = None, - consumer_tag=None, - timeout=None, - robust: bool = True, - ) -> ConsumerTag: - await self.connection.connected.wait() - consumer_tag = await InstrumentedQueue.consume( - self, callback, no_ack, exclusive, arguments, consumer_tag, timeout - ) - if robust: - self._consumers[consumer_tag] = dict( - callback=callback, - no_ack=no_ack, - exclusive=exclusive, - arguments=arguments, - ) - return consumer_tag diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py deleted file mode 100644 index 3648f607a8..0000000000 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/instrumented_exchange.py +++ /dev/null @@ -1,53 +0,0 @@ -# Copyright The OpenTelemetry Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Optional - -import aiormq -from aio_pika import Exchange, RobustExchange -from aio_pika.abc import AbstractMessage - -from opentelemetry import propagate, trace -from opentelemetry.trace import Span - -from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder - - -class InstrumentedExchange(Exchange): - def _get_publish_span( - self, message: AbstractMessage, routing_key: str - ) -> Optional[Span]: - builder = SpanBuilder() - builder.set_as_producer() - builder.set_destination(f"{self.name},{routing_key}") - builder.set_channel(self.channel) - builder.set_message(message) - return builder.build() - - async def publish( - self, message: AbstractMessage, routing_key: str, **kwargs - ) -> Optional[aiormq.abc.ConfirmationFrameType]: - span = self._get_publish_span(message, routing_key) - if not span: - return await super().publish(message, routing_key, **kwargs) - with trace.use_span(span, end_on_exit=True): - if span.is_recording(): - propagate.inject(message.properties.headers) - return_value = await super().publish( - message, routing_key, **kwargs - ) - return return_value - - -class RobustInstrumentedExchange(RobustExchange, InstrumentedExchange): - pass diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py new file mode 100644 index 0000000000..cae834a031 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py @@ -0,0 +1,53 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Callable, Optional + +import aiormq +from aio_pika import Exchange +from aio_pika.abc import AbstractMessage + +from opentelemetry import propagate, trace +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.trace import Span, Tracer + + +class PublishDecorator: + def __init__(self, tracer: Tracer, exchange: Exchange): + self._tracer = tracer + self._exchange = exchange + + def _get_publish_span( + self, message: AbstractMessage, routing_key: str + ) -> Optional[Span]: + builder = SpanBuilder(self._tracer) + builder.set_as_producer() + builder.set_destination(f"{self._exchange.name},{routing_key}") + builder.set_channel(self._exchange.channel) + builder.set_message(message) + return builder.build() + + def decorate(self, publish: Callable) -> Callable: + async def decorated_publish( + message: AbstractMessage, routing_key: str, **kwargs + ) -> Optional[aiormq.abc.ConfirmationFrameType]: + span = self._get_publish_span(message, routing_key) + if not span: + return await publish(message, routing_key, **kwargs) + with trace.use_span(span, end_on_exit=True): + if span.is_recording(): + propagate.inject(message.properties.headers) + return_value = await publish(message, routing_key, **kwargs) + return return_value + + return decorated_publish diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py index c0a725dd1c..a61209e0ce 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -15,24 +15,21 @@ from aio_pika.abc import AbstractChannel, AbstractMessage -from opentelemetry import trace +from opentelemetry.instrumentation.aio_pika.utils import ( + is_instrumentation_enabled, +) from opentelemetry.semconv.trace import ( MessagingOperationValues, SpanAttributes, ) -from opentelemetry.trace import Span, SpanKind - -from opentelemetry.instrumentation.aio_pika.utils import is_instrumentation_enabled -from opentelemetry.instrumentation.aio_pika.version import __version__ +from opentelemetry.trace import Span, SpanKind, Tracer -_TRACER_NAME = 'opentelemetry.instrumentation.aio-pika' _DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} class SpanBuilder: - TRACER_PROVIDER = None - - def __init__(self): + def __init__(self, tracer: Tracer): + self._tracer = tracer self._attributes = _DEFAULT_ATTRIBUTES.copy() self._operation: MessagingOperationValues = None self._kind: SpanKind = None @@ -72,8 +69,7 @@ def build(self) -> Optional[Span]: self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value else: self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True - tracer = trace.get_tracer(_TRACER_NAME, __version__, self.TRACER_PROVIDER) - span = tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes) + span = self._tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes) return span def _generate_span_name(self) -> str: diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py index 10ae328698..fb94ddf468 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py @@ -2,6 +2,8 @@ def is_instrumentation_enabled() -> bool: - if context.get_value('suppress_instrumentation') or context.get_value(context._SUPPRESS_INSTRUMENTATION_KEY): + if context.get_value("suppress_instrumentation") or context.get_value( + context._SUPPRESS_INSTRUMENTATION_KEY + ): return False return True diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py index 26bca5c21d..c2dc6e5144 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py @@ -13,42 +13,22 @@ # limitations under the License. from unittest import TestCase -import aio_pika +import wrapt +from aio_pika import Exchange, Queue from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor -from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( - InstrumentedExchange, - RobustInstrumentedExchange, -) -from opentelemetry.instrumentation.aio_pika.instrumented_queue import ( - InstrumentedQueue, - RobustInstrumentedQueue, -) class TestPika(TestCase): def test_instrument_api(self) -> None: instrumentation = AioPikaInstrumentor() instrumentation.instrument() + self.assertTrue(isinstance(Queue.consume, wrapt.BoundFunctionWrapper)) self.assertTrue( - aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange + isinstance(Exchange.publish, wrapt.BoundFunctionWrapper) ) - self.assertTrue(aio_pika.Channel.QUEUE_CLASS == InstrumentedQueue) - self.assertTrue( - aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange - ) - self.assertTrue( - aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue - ) - instrumentation.uninstrument() + self.assertFalse(isinstance(Queue.consume, wrapt.BoundFunctionWrapper)) self.assertFalse( - aio_pika.Channel.EXCHANGE_CLASS == InstrumentedExchange - ) - self.assertFalse(aio_pika.Channel.QUEUE_CLASS == InstrumentedQueue) - self.assertFalse( - aio_pika.RobustChannel.EXCHANGE_CLASS == RobustInstrumentedExchange - ) - self.assertFalse( - aio_pika.RobustChannel.QUEUE_CLASS == RobustInstrumentedQueue + isinstance(Exchange.publish, wrapt.BoundFunctionWrapper) ) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py similarity index 53% rename from instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py rename to instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py index 26eaaf7c75..70883c116c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_queue.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py @@ -12,21 +12,18 @@ # See the License for the specific language governing permissions and # limitations under the License. import asyncio -from typing import Type from unittest import TestCase, mock from aio_pika import Queue -from opentelemetry.trace import SpanKind -from opentelemetry.instrumentation.aio_pika.instrumented_queue import ( - InstrumentedQueue, - RobustInstrumentedQueue, +from opentelemetry.instrumentation.aio_pika.callback_decorator import ( + CallbackDecorator, ) from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind, get_tracer from .consts import ( CHANNEL, - CONNECTION, CORRELATION_ID, EXCHANGE_NAME, MESSAGE, @@ -46,59 +43,32 @@ class TestInstrumentedQueue(TestCase): SpanAttributes.NET_PEER_PORT: SERVER_PORT, SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, - SpanAttributes.MESSAGING_OPERATION: 'receive' + SpanAttributes.MESSAGING_OPERATION: "receive", } def setUp(self): + self.tracer = get_tracer(__name__) self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) def test_get_callback_span(self): - queue = InstrumentedQueue( - CHANNEL, QUEUE_NAME, False, False, False, None - ) + queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) tracer = mock.MagicMock() - with mock.patch( - 'opentelemetry.trace.get_tracer', - return_value=tracer - ): - queue._get_callback_span(MESSAGE) + CallbackDecorator(tracer, queue)._get_span(MESSAGE) tracer.start_span.assert_called_once_with( - f'{EXCHANGE_NAME} receive', + f"{EXCHANGE_NAME} receive", kind=SpanKind.CONSUMER, - attributes=self.EXPECTED_ATTRIBUTES + attributes=self.EXPECTED_ATTRIBUTES, ) def test_decorate_callback(self): - queue = InstrumentedQueue( - CHANNEL, QUEUE_NAME, False, False, False, None - ) + queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) callback = mock.MagicMock(return_value=asyncio.sleep(0)) with mock.patch.object( - InstrumentedQueue, "_get_callback_span" + CallbackDecorator, "_get_span" ) as mocked_get_callback_span: - decorated_callback = queue._decorate_callback(callback) + callback_decorator = CallbackDecorator(self.tracer, queue) + decorated_callback = callback_decorator.decorate(callback) self.loop.run_until_complete(decorated_callback(MESSAGE)) mocked_get_callback_span.assert_called_once() callback.assert_called_once_with(MESSAGE) - - def _test_consume(self, queue_type: Type[InstrumentedQueue]): - queue = queue_type(CHANNEL, QUEUE_NAME, False, False, False, None) - callback = mock.MagicMock() - CONNECTION.connected = asyncio.Event() - CONNECTION.connected.set() - with mock.patch.object( - InstrumentedQueue, "_decorate_callback" - ) as mocked_decorate_callback: - with mock.patch.object( - Queue, "consume", return_value=asyncio.sleep(0) - ) as mocked_consume: - self.loop.run_until_complete(queue.consume(callback)) - mocked_decorate_callback.assert_called_once_with(callback) - mocked_consume.assert_called_once() - - def test_consume(self): - self._test_consume(InstrumentedQueue) - - def test_robust_consume(self): - self._test_consume(RobustInstrumentedQueue) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py similarity index 71% rename from instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py rename to instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py index 137dc95ae2..80dfa3182b 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_instrumented_exchange.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -15,14 +15,13 @@ from typing import Type from unittest import TestCase, mock -from aio_pika import Exchange +from aio_pika import Exchange, RobustExchange -from opentelemetry.trace import SpanKind -from opentelemetry.instrumentation.aio_pika.instrumented_exchange import ( - InstrumentedExchange, - RobustInstrumentedExchange, +from opentelemetry.instrumentation.aio_pika.publish_decorator import ( + PublishDecorator, ) from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind, get_tracer from .consts import ( CHANNEL, @@ -50,39 +49,41 @@ class TestInstrumentedExchange(TestCase): } def setUp(self): + self.tracer = get_tracer(__name__) self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) def test_get_publish_span(self): - exchange = InstrumentedExchange(CONNECTION, CHANNEL, EXCHANGE_NAME) + exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME) tracer = mock.MagicMock() - with mock.patch( - 'opentelemetry.trace.get_tracer', - return_value=tracer - ): - exchange._get_publish_span(MESSAGE, ROUTING_KEY) + PublishDecorator(tracer, exchange)._get_publish_span( + MESSAGE, ROUTING_KEY + ) tracer.start_span.assert_called_once_with( - f'{EXCHANGE_NAME},{ROUTING_KEY} send', + f"{EXCHANGE_NAME},{ROUTING_KEY} send", kind=SpanKind.PRODUCER, - attributes=self.EXPECTED_ATTRIBUTES + attributes=self.EXPECTED_ATTRIBUTES, ) - def _test_publish(self, exchange_type: Type[InstrumentedExchange]): + def _test_publish(self, exchange_type: Type[Exchange]): exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) with mock.patch.object( - InstrumentedExchange, "_get_publish_span" + PublishDecorator, "_get_publish_span" ) as mock_get_publish_span: with mock.patch.object( Exchange, "publish", return_value=asyncio.sleep(0) ) as mock_publish: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) self.loop.run_until_complete( - exchange.publish(MESSAGE, ROUTING_KEY) + decorated_publish(MESSAGE, ROUTING_KEY) ) mock_publish.assert_called_once() mock_get_publish_span.assert_called_once() def test_publish(self): - self._test_publish(InstrumentedExchange) + self._test_publish(Exchange) def test_robust_publish(self): - self._test_publish(RobustInstrumentedExchange) + self._test_publish(RobustExchange) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py index b55988f06b..5f87d53846 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py @@ -14,12 +14,12 @@ from unittest import TestCase from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder -from opentelemetry.trace import Span +from opentelemetry.trace import Span, get_tracer class TestBuilder(TestCase): def test_build(self): - builder = SpanBuilder() + builder = SpanBuilder(get_tracer(__name__)) builder.set_as_consumer() builder.set_destination('destination') span = builder.build() From 1f765b8c4c880eb62add078c33d9b7156bf2a1ed Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Fri, 24 Jun 2022 11:27:05 +0300 Subject: [PATCH 18/21] Fixed linting --- .../aio_pika/aio_pika_instrumentor.py | 12 ++++++++---- .../tests/consts.py | 4 ++-- tox.ini | 1 + 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py index cb54387386..99420d0892 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py @@ -34,7 +34,8 @@ class AioPikaInstrumentor(BaseInstrumentor): - def _instrument_queue(self, tracer: Tracer): + @staticmethod + def _instrument_queue(tracer: Tracer): async def wrapper(wrapped, instance, args, kwargs): async def consume( callback: Callable[[AbstractIncomingMessage], Any], @@ -50,7 +51,8 @@ async def consume( wrapt.wrap_function_wrapper(Queue, "consume", wrapper) - def _instrument_exchange(self, tracer: Tracer): + @staticmethod + def _instrument_exchange(tracer: Tracer): async def wrapper(wrapped, instance, args, kwargs): decorated_publish = PublishDecorator(tracer, instance).decorate( wrapped @@ -67,10 +69,12 @@ def _instrument(self, **kwargs): self._instrument_queue(tracer) self._instrument_exchange(tracer) - def _uninstrument_queue(self): + @staticmethod + def _uninstrument_queue(): unwrap(Queue, "consume") - def _uninstrument_exchange(self): + @staticmethod + def _uninstrument_exchange(): unwrap(Exchange, "publish") def _uninstrument(self, **kwargs): diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py index 05c82001ac..ada7080192 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -19,8 +19,8 @@ CHANNEL = Namespace(connection=CONNECTION, loop=None) MESSAGE = Namespace( properties=Namespace( - message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers=dict() + message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={} ), exchange=EXCHANGE_NAME, - headers=dict(), + headers={}, ) diff --git a/tox.ini b/tox.ini index 90988e207c..f7a106442d 100644 --- a/tox.ini +++ b/tox.ini @@ -454,6 +454,7 @@ commands_pre = python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sqlalchemy[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sklearn[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-redis[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-remoulade[test] From 6fb045212f95267d6b084c782b12f46a1c163833 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Mon, 27 Jun 2022 18:41:37 +0300 Subject: [PATCH 19/21] Updated core repo sha --- .github/workflows/test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e1ee67d9eb..f808e9257f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,7 +6,7 @@ on: - 'release/*' pull_request: env: - CORE_REPO_SHA: d4d7c67663cc22615748d632e1c8c5799e8eacae + CORE_REPO_SHA: 25771ecdac685a5bf7ada1da21092d2061dbfc02 jobs: build: From fb9f15abf6213273b7b5c07aa6c1c5af1a8de3b3 Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Fri, 1 Jul 2022 17:55:40 +0300 Subject: [PATCH 20/21] Changed opentelemetry-test-utils requirement to be the newest --- .../opentelemetry-instrumentation-aio-pika/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg index 7d1b3b1e45..f861eaaaef 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg @@ -48,7 +48,7 @@ install_requires = test = pytest wrapt >= 1.0.0, < 2.0.0 - opentelemetry-test-utils == 0.30b1 + opentelemetry-test-utils == 0.31b0 [options.packages.find] where = src From 9bc4f2311922152f95ba4ff9cdd03126c78a15dd Mon Sep 17 00:00:00 2001 From: Ofek Weiss Date: Sat, 2 Jul 2022 11:33:26 +0300 Subject: [PATCH 21/21] Fixed readme format --- .../opentelemetry-instrumentation-aio-pika/README.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst index b1dacdc168..aa0f1a3f5c 100644 --- a/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst @@ -1,5 +1,5 @@ OpenTelemetry Aio-pika Instrumentation -================================== +====================================== |pypi|