diff --git a/.github/component_owners.yml b/.github/component_owners.yml index 81a0f9da79..083219c353 100644 --- a/.github/component_owners.yml +++ b/.github/component_owners.yml @@ -1,5 +1,8 @@ components: + instrumentation/opentelemetry-instrumentation-aio-pika: + - ofek1weiss + instrumentation/opentelemetry-instrumentation-boto3sqs: - oxeye-nikolay - nikosokolik diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e1ee67d9eb..f808e9257f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -6,7 +6,7 @@ on: - 'release/*' pull_request: env: - CORE_REPO_SHA: d4d7c67663cc22615748d632e1c8c5799e8eacae + CORE_REPO_SHA: 25771ecdac685a5bf7ada1da21092d2061dbfc02 jobs: build: diff --git a/CHANGELOG.md b/CHANGELOG.md index c872762280..d72cedcae0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1127](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1127)) - Add metric instrumentation for WSGI ([#1128](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1128)) +- `opentelemetry-instrumentation-aio-pika` added RabbitMQ aio-pika module instrumentation. + ([#1095](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1095)) - `opentelemetry-instrumentation-requests` Restoring metrics in requests ([#1110](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1110)) - Integrated sqlcommenter plugin into opentelemetry-instrumentation-django diff --git a/instrumentation/README.md b/instrumentation/README.md index e51c9d42a1..71d79ea258 100644 --- a/instrumentation/README.md +++ b/instrumentation/README.md @@ -1,6 +1,7 @@ | Instrumentation | Supported Packages | Metrics support | | --------------- | ------------------ | --------------- | +| [opentelemetry-instrumentation-aio-pika](./opentelemetry-instrumentation-aio-pika) | aio_pika ~= 7.2.0 | No | [opentelemetry-instrumentation-aiohttp-client](./opentelemetry-instrumentation-aiohttp-client) | aiohttp ~= 3.0 | No | [opentelemetry-instrumentation-aiopg](./opentelemetry-instrumentation-aiopg) | aiopg >= 0.13.0, < 1.3.0 | No | [opentelemetry-instrumentation-asgi](./opentelemetry-instrumentation-asgi) | asgiref ~= 3.0 | No diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst new file mode 100644 index 0000000000..aa0f1a3f5c --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/README.rst @@ -0,0 +1,23 @@ +OpenTelemetry Aio-pika Instrumentation +====================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-aio-pika.svg + :target: https://pypi.org/project/opentelemetry-instrumentation-aio-pika/ + +This library allows tracing requests made by the Aio-pika library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-aio-pika + +References +---------- + +* `OpenTelemetry Aio-pika instrumentation `_ +* `OpenTelemetry Project `_ +* `OpenTelemetry Python Examples `_ diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg new file mode 100644 index 0000000000..f861eaaaef --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg @@ -0,0 +1,58 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +[metadata] +name = opentelemetry-instrumentation-aio-pika +description = OpenTelemetry Aio-pika instrumentation +long_description = file: README.rst +long_description_content_type = text/x-rst +author = OpenTelemetry Authors +author_email = cncf-opentelemetry-contributors@lists.cncf.io +url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-aio-pika +platforms = any +license = Apache-2.0 +classifiers = + Development Status :: 4 - Beta + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Programming Language :: Python + Programming Language :: Python :: 3 + Programming Language :: Python :: 3.6 + Programming Language :: Python :: 3.7 + Programming Language :: Python :: 3.8 + Programming Language :: Python :: 3.9 + Programming Language :: Python :: 3.10 + +[options] +python_requires = >=3.6 +package_dir= + =src +packages=find_namespace: + +install_requires = + opentelemetry-api ~= 1.5 + wrapt >= 1.0.0, < 2.0.0 + +[options.extras_require] +test = + pytest + wrapt >= 1.0.0, < 2.0.0 + opentelemetry-test-utils == 0.31b0 + +[options.packages.find] +where = src + +[options.entry_points] +opentelemetry_instrumentor = + aio-pika = opentelemetry.instrumentation.aio_pika:AioPikaInstrumentor diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py new file mode 100644 index 0000000000..bd74b8f7bc --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/setup.py @@ -0,0 +1,99 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt. +# RUN `python scripts/generate_setup.py` TO REGENERATE. + + +import distutils.cmd +import json +import os +from configparser import ConfigParser + +import setuptools + +config = ConfigParser() +config.read("setup.cfg") + +# We provide extras_require parameter to setuptools.setup later which +# overwrites the extras_require section from setup.cfg. To support extras_require +# section in setup.cfg, we load it here and merge it with the extras_require param. +extras_require = {} +if "options.extras_require" in config: + for key, value in config["options.extras_require"].items(): + extras_require[key] = [v for v in value.split("\n") if v.strip()] + +BASE_DIR = os.path.dirname(__file__) +PACKAGE_INFO = {} + +VERSION_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "aio_pika", + "version.py", +) +with open(VERSION_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +PACKAGE_FILENAME = os.path.join( + BASE_DIR, + "src", + "opentelemetry", + "instrumentation", + "aio_pika", + "package.py", +) +with open(PACKAGE_FILENAME, encoding="utf-8") as f: + exec(f.read(), PACKAGE_INFO) + +# Mark any instruments/runtime dependencies as test dependencies as well. +extras_require["instruments"] = PACKAGE_INFO["_instruments"] +test_deps = extras_require.get("test", []) +for dep in extras_require["instruments"]: + test_deps.append(dep) + +extras_require["test"] = test_deps + + +class JSONMetadataCommand(distutils.cmd.Command): + + description = ( + "print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ", + "auto-generate code in other places", + ) + user_options = [] + + def initialize_options(self): + pass + + def finalize_options(self): + pass + + def run(self): + metadata = { + "name": config["metadata"]["name"], + "version": PACKAGE_INFO["__version__"], + "instruments": PACKAGE_INFO["_instruments"], + } + print(json.dumps(metadata)) + + +setuptools.setup( + cmdclass={"meta": JSONMetadataCommand}, + version=PACKAGE_INFO["__version__"], + extras_require=extras_require, +) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/__init__.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/__init__.py new file mode 100644 index 0000000000..6e78db08c9 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/__init__.py @@ -0,0 +1,60 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Instrument `aio_pika` to trace RabbitMQ applications. + +Usage +----- + +* Start broker backend + +.. code-block:: python + + docker run -p 5672:5672 rabbitmq + +* Run instrumented task + +.. code-block:: python + + import asyncio + + from aio_pika import Message, connect + from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor + + AioPikaInstrumentor().instrument() + + + async def main() -> None: + connection = await connect("amqp://guest:guest@localhost/") + async with connection: + channel = await connection.channel() + queue = await channel.declare_queue("hello") + await channel.default_exchange.publish( + Message(b"Hello World!"), + routing_key=queue.name, + ) + + + if __name__ == "__main__": + asyncio.run(main()) + +API +--- +""" +# pylint: disable=import-error + +from .aio_pika_instrumentor import AioPikaInstrumentor +from .version import __version__ + +__all__ = ["AioPikaInstrumentor", "__version__"] diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py new file mode 100644 index 0000000000..99420d0892 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/aio_pika_instrumentor.py @@ -0,0 +1,85 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Any, Callable, Collection + +import wrapt +from aio_pika import Exchange, Queue +from aio_pika.abc import AbstractIncomingMessage + +from opentelemetry import trace +from opentelemetry.instrumentation.aio_pika.callback_decorator import ( + CallbackDecorator, +) +from opentelemetry.instrumentation.aio_pika.package import _instruments +from opentelemetry.instrumentation.aio_pika.publish_decorator import ( + PublishDecorator, +) +from opentelemetry.instrumentation.aio_pika.version import __version__ +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.trace import Tracer + +_INSTRUMENTATION_MODULE_NAME = "opentelemetry.instrumentation.aio_pika" + + +class AioPikaInstrumentor(BaseInstrumentor): + @staticmethod + def _instrument_queue(tracer: Tracer): + async def wrapper(wrapped, instance, args, kwargs): + async def consume( + callback: Callable[[AbstractIncomingMessage], Any], + *fargs, + **fkwargs + ): + decorated_callback = CallbackDecorator( + tracer, instance + ).decorate(callback) + return await wrapped(decorated_callback, *fargs, **fkwargs) + + return await consume(*args, **kwargs) + + wrapt.wrap_function_wrapper(Queue, "consume", wrapper) + + @staticmethod + def _instrument_exchange(tracer: Tracer): + async def wrapper(wrapped, instance, args, kwargs): + decorated_publish = PublishDecorator(tracer, instance).decorate( + wrapped + ) + return await decorated_publish(*args, **kwargs) + + wrapt.wrap_function_wrapper(Exchange, "publish", wrapper) + + def _instrument(self, **kwargs): + tracer_provider = kwargs.get("tracer_provider", None) + tracer = trace.get_tracer( + _INSTRUMENTATION_MODULE_NAME, __version__, tracer_provider + ) + self._instrument_queue(tracer) + self._instrument_exchange(tracer) + + @staticmethod + def _uninstrument_queue(): + unwrap(Queue, "consume") + + @staticmethod + def _uninstrument_exchange(): + unwrap(Exchange, "publish") + + def _uninstrument(self, **kwargs): + self._uninstrument_queue() + self._uninstrument_exchange() + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py new file mode 100644 index 0000000000..a2169b6d18 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/callback_decorator.py @@ -0,0 +1,61 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Any, Callable, Optional + +from aio_pika import Queue +from aio_pika.abc import AbstractIncomingMessage + +from opentelemetry import context, propagate, trace +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.instrumentation.aio_pika.utils import ( + is_instrumentation_enabled, +) +from opentelemetry.semconv.trace import MessagingOperationValues +from opentelemetry.trace import Span, Tracer + + +class CallbackDecorator: + def __init__(self, tracer: Tracer, queue: Queue): + self._tracer = tracer + self._queue = queue + + def _get_span(self, message: AbstractIncomingMessage) -> Optional[Span]: + builder = SpanBuilder(self._tracer) + builder.set_as_consumer() + builder.set_operation(MessagingOperationValues.RECEIVE) + builder.set_destination(message.exchange or message.routing_key) + builder.set_channel(self._queue.channel) + builder.set_message(message) + return builder.build() + + def decorate( + self, callback: Callable[[AbstractIncomingMessage], Any] + ) -> Callable[[AbstractIncomingMessage], Any]: + async def decorated(message: AbstractIncomingMessage): + if not is_instrumentation_enabled(): + return await callback(message) + headers = message.headers or {} + ctx = propagate.extract(headers) + token = context.attach(ctx) + span = self._get_span(message) + if not span: + return await callback(message) + try: + with trace.use_span(span, end_on_exit=True): + return_value = await callback(message) + finally: + context.detach(token) + return return_value + + return decorated diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py new file mode 100644 index 0000000000..6c7ed74ea4 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/package.py @@ -0,0 +1,16 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Collection + +_instruments: Collection[str] = ("aio_pika ~= 7.2.0",) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py new file mode 100644 index 0000000000..cae834a031 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/publish_decorator.py @@ -0,0 +1,53 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Callable, Optional + +import aiormq +from aio_pika import Exchange +from aio_pika.abc import AbstractMessage + +from opentelemetry import propagate, trace +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.trace import Span, Tracer + + +class PublishDecorator: + def __init__(self, tracer: Tracer, exchange: Exchange): + self._tracer = tracer + self._exchange = exchange + + def _get_publish_span( + self, message: AbstractMessage, routing_key: str + ) -> Optional[Span]: + builder = SpanBuilder(self._tracer) + builder.set_as_producer() + builder.set_destination(f"{self._exchange.name},{routing_key}") + builder.set_channel(self._exchange.channel) + builder.set_message(message) + return builder.build() + + def decorate(self, publish: Callable) -> Callable: + async def decorated_publish( + message: AbstractMessage, routing_key: str, **kwargs + ) -> Optional[aiormq.abc.ConfirmationFrameType]: + span = self._get_publish_span(message, routing_key) + if not span: + return await publish(message, routing_key, **kwargs) + with trace.use_span(span, end_on_exit=True): + if span.is_recording(): + propagate.inject(message.properties.headers) + return_value = await publish(message, routing_key, **kwargs) + return return_value + + return decorated_publish diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py new file mode 100644 index 0000000000..a61209e0ce --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/span_builder.py @@ -0,0 +1,77 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import Optional + +from aio_pika.abc import AbstractChannel, AbstractMessage + +from opentelemetry.instrumentation.aio_pika.utils import ( + is_instrumentation_enabled, +) +from opentelemetry.semconv.trace import ( + MessagingOperationValues, + SpanAttributes, +) +from opentelemetry.trace import Span, SpanKind, Tracer + +_DEFAULT_ATTRIBUTES = {SpanAttributes.MESSAGING_SYSTEM: 'rabbitmq'} + + +class SpanBuilder: + def __init__(self, tracer: Tracer): + self._tracer = tracer + self._attributes = _DEFAULT_ATTRIBUTES.copy() + self._operation: MessagingOperationValues = None + self._kind: SpanKind = None + self._destination: str = None + + def set_as_producer(self): + self._kind = SpanKind.PRODUCER + + def set_as_consumer(self): + self._kind = SpanKind.CONSUMER + + def set_operation(self, operation: MessagingOperationValues): + self._operation = operation + + def set_destination(self, destination: str): + self._destination = destination + self._attributes[SpanAttributes.MESSAGING_DESTINATION] = destination + + def set_channel(self, channel: AbstractChannel): + url = channel.connection.connection.url + self._attributes.update({ + SpanAttributes.NET_PEER_NAME: url.host, + SpanAttributes.NET_PEER_PORT: url.port + }) + + def set_message(self, message: AbstractMessage): + properties = message.properties + if properties.message_id: + self._attributes[SpanAttributes.MESSAGING_MESSAGE_ID] = properties.message_id + if properties.correlation_id: + self._attributes[SpanAttributes.MESSAGING_CONVERSATION_ID] = properties.correlation_id + + def build(self) -> Optional[Span]: + if not is_instrumentation_enabled(): + return None + if self._operation: + self._attributes[SpanAttributes.MESSAGING_OPERATION] = self._operation.value + else: + self._attributes[SpanAttributes.MESSAGING_TEMP_DESTINATION] = True + span = self._tracer.start_span(self._generate_span_name(), kind=self._kind, attributes=self._attributes) + return span + + def _generate_span_name(self) -> str: + operation_value = self._operation.value if self._operation else 'send' + return f'{self._destination} {operation_value}' diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py new file mode 100644 index 0000000000..fb94ddf468 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/utils.py @@ -0,0 +1,9 @@ +from opentelemetry import context + + +def is_instrumentation_enabled() -> bool: + if context.get_value("suppress_instrumentation") or context.get_value( + context._SUPPRESS_INSTRUMENTATION_KEY + ): + return False + return True diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py new file mode 100644 index 0000000000..d8dc1e1ed7 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/src/opentelemetry/instrumentation/aio_pika/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +__version__ = "0.31b0" diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/__init__.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py new file mode 100644 index 0000000000..ada7080192 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/consts.py @@ -0,0 +1,26 @@ +from argparse import Namespace + +from yarl import URL + +MESSAGE_ID = "meesage_id" +CORRELATION_ID = "correlation_id" +MESSAGING_SYSTEM = "rabbitmq" +EXCHANGE_NAME = "exchange_name" +QUEUE_NAME = "queue_name" +ROUTING_KEY = "routing_key" +SERVER_HOST = "localhost" +SERVER_PORT = 1234 +SERVER_USER = "guest" +SERVER_PASS = "guest" +SERVER_URL = URL( + f"amqp://{SERVER_USER}:{SERVER_PASS}@{SERVER_HOST}:{SERVER_PORT}/" +) +CONNECTION = Namespace(connection=Namespace(url=SERVER_URL)) +CHANNEL = Namespace(connection=CONNECTION, loop=None) +MESSAGE = Namespace( + properties=Namespace( + message_id=MESSAGE_ID, correlation_id=CORRELATION_ID, headers={} + ), + exchange=EXCHANGE_NAME, + headers={}, +) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py new file mode 100644 index 0000000000..c2dc6e5144 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_aio_pika_instrumentation.py @@ -0,0 +1,34 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + +import wrapt +from aio_pika import Exchange, Queue + +from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor + + +class TestPika(TestCase): + def test_instrument_api(self) -> None: + instrumentation = AioPikaInstrumentor() + instrumentation.instrument() + self.assertTrue(isinstance(Queue.consume, wrapt.BoundFunctionWrapper)) + self.assertTrue( + isinstance(Exchange.publish, wrapt.BoundFunctionWrapper) + ) + instrumentation.uninstrument() + self.assertFalse(isinstance(Queue.consume, wrapt.BoundFunctionWrapper)) + self.assertFalse( + isinstance(Exchange.publish, wrapt.BoundFunctionWrapper) + ) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py new file mode 100644 index 0000000000..70883c116c --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_callback_decorator.py @@ -0,0 +1,74 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from unittest import TestCase, mock + +from aio_pika import Queue + +from opentelemetry.instrumentation.aio_pika.callback_decorator import ( + CallbackDecorator, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind, get_tracer + +from .consts import ( + CHANNEL, + CORRELATION_ID, + EXCHANGE_NAME, + MESSAGE, + MESSAGE_ID, + MESSAGING_SYSTEM, + QUEUE_NAME, + SERVER_HOST, + SERVER_PORT, +) + + +class TestInstrumentedQueue(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: EXCHANGE_NAME, + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_OPERATION: "receive", + } + + def setUp(self): + self.tracer = get_tracer(__name__) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_callback_span(self): + queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) + tracer = mock.MagicMock() + CallbackDecorator(tracer, queue)._get_span(MESSAGE) + tracer.start_span.assert_called_once_with( + f"{EXCHANGE_NAME} receive", + kind=SpanKind.CONSUMER, + attributes=self.EXPECTED_ATTRIBUTES, + ) + + def test_decorate_callback(self): + queue = Queue(CHANNEL, QUEUE_NAME, False, False, False, None) + callback = mock.MagicMock(return_value=asyncio.sleep(0)) + with mock.patch.object( + CallbackDecorator, "_get_span" + ) as mocked_get_callback_span: + callback_decorator = CallbackDecorator(self.tracer, queue) + decorated_callback = callback_decorator.decorate(callback) + self.loop.run_until_complete(decorated_callback(MESSAGE)) + mocked_get_callback_span.assert_called_once() + callback.assert_called_once_with(MESSAGE) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py new file mode 100644 index 0000000000..80dfa3182b --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_publish_decorator.py @@ -0,0 +1,89 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import asyncio +from typing import Type +from unittest import TestCase, mock + +from aio_pika import Exchange, RobustExchange + +from opentelemetry.instrumentation.aio_pika.publish_decorator import ( + PublishDecorator, +) +from opentelemetry.semconv.trace import SpanAttributes +from opentelemetry.trace import SpanKind, get_tracer + +from .consts import ( + CHANNEL, + CONNECTION, + CORRELATION_ID, + EXCHANGE_NAME, + MESSAGE, + MESSAGE_ID, + MESSAGING_SYSTEM, + ROUTING_KEY, + SERVER_HOST, + SERVER_PORT, +) + + +class TestInstrumentedExchange(TestCase): + EXPECTED_ATTRIBUTES = { + SpanAttributes.MESSAGING_SYSTEM: MESSAGING_SYSTEM, + SpanAttributes.MESSAGING_DESTINATION: f"{EXCHANGE_NAME},{ROUTING_KEY}", + SpanAttributes.NET_PEER_NAME: SERVER_HOST, + SpanAttributes.NET_PEER_PORT: SERVER_PORT, + SpanAttributes.MESSAGING_MESSAGE_ID: MESSAGE_ID, + SpanAttributes.MESSAGING_CONVERSATION_ID: CORRELATION_ID, + SpanAttributes.MESSAGING_TEMP_DESTINATION: True, + } + + def setUp(self): + self.tracer = get_tracer(__name__) + self.loop = asyncio.new_event_loop() + asyncio.set_event_loop(self.loop) + + def test_get_publish_span(self): + exchange = Exchange(CONNECTION, CHANNEL, EXCHANGE_NAME) + tracer = mock.MagicMock() + PublishDecorator(tracer, exchange)._get_publish_span( + MESSAGE, ROUTING_KEY + ) + tracer.start_span.assert_called_once_with( + f"{EXCHANGE_NAME},{ROUTING_KEY} send", + kind=SpanKind.PRODUCER, + attributes=self.EXPECTED_ATTRIBUTES, + ) + + def _test_publish(self, exchange_type: Type[Exchange]): + exchange = exchange_type(CONNECTION, CHANNEL, EXCHANGE_NAME) + with mock.patch.object( + PublishDecorator, "_get_publish_span" + ) as mock_get_publish_span: + with mock.patch.object( + Exchange, "publish", return_value=asyncio.sleep(0) + ) as mock_publish: + decorated_publish = PublishDecorator( + self.tracer, exchange + ).decorate(mock_publish) + self.loop.run_until_complete( + decorated_publish(MESSAGE, ROUTING_KEY) + ) + mock_publish.assert_called_once() + mock_get_publish_span.assert_called_once() + + def test_publish(self): + self._test_publish(Exchange) + + def test_robust_publish(self): + self._test_publish(RobustExchange) diff --git a/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py new file mode 100644 index 0000000000..5f87d53846 --- /dev/null +++ b/instrumentation/opentelemetry-instrumentation-aio-pika/tests/test_span_builder.py @@ -0,0 +1,26 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from unittest import TestCase + +from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder +from opentelemetry.trace import Span, get_tracer + + +class TestBuilder(TestCase): + def test_build(self): + builder = SpanBuilder(get_tracer(__name__)) + builder.set_as_consumer() + builder.set_destination('destination') + span = builder.build() + self.assertTrue(isinstance(span, Span)) diff --git a/opentelemetry-contrib-instrumentations/setup.cfg b/opentelemetry-contrib-instrumentations/setup.cfg index fa5aa4c853..4a22e81067 100644 --- a/opentelemetry-contrib-instrumentations/setup.cfg +++ b/opentelemetry-contrib-instrumentations/setup.cfg @@ -28,6 +28,7 @@ packages = find_namespace: zip_safe = False include_package_data = True install_requires = + opentelemetry-instrumentation-aio-pika==0.31b0 opentelemetry-instrumentation-aiohttp-client==0.31b0 opentelemetry-instrumentation-aiopg==0.31b0 opentelemetry-instrumentation-asgi==0.31b0 diff --git a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py index 6796b18c08..afd6ee2624 100644 --- a/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py +++ b/opentelemetry-instrumentation/src/opentelemetry/instrumentation/bootstrap_gen.py @@ -16,6 +16,10 @@ # RUN `python scripts/generate_instrumentation_bootstrap.py` TO REGENERATE. libraries = { + "aio_pika": { + "library": "aio_pika ~= 7.2.0", + "instrumentation": "opentelemetry-instrumentation-aio-pika==0.31b0", + }, "aiohttp": { "library": "aiohttp ~= 3.0", "instrumentation": "opentelemetry-instrumentation-aiohttp-client==0.31b0", diff --git a/tox.ini b/tox.ini index 90988e207c..f7a106442d 100644 --- a/tox.ini +++ b/tox.ini @@ -454,6 +454,7 @@ commands_pre = python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sqlalchemy[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-celery[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-pika[test] + python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-aio-pika[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-sklearn[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-redis[test] python -m pip install -e {toxinidir}/instrumentation/opentelemetry-instrumentation-remoulade[test]