Skip to content

Commit

Permalink
Added aio-pika instrumentation
Browse files Browse the repository at this point in the history
  • Loading branch information
ofek1weiss committed May 18, 2022
1 parent 80969a0 commit 9ccb612
Show file tree
Hide file tree
Showing 18 changed files with 790 additions and 0 deletions.
22 changes: 22 additions & 0 deletions instrumentation/opentelemetry-instrumentation-aio-pika/README.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
OpenTelemetry aio_pika Instrumentation
==================================

|pypi|

.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-aio-pika.svg
:target: https://pypi.org/project/opentelemetry-instrumentation-aio-pika/

This library allows tracing requests made by the aio_pika library.

Installation
------------

::

pip install opentelemetry-instrumentation-aio-pika

References
----------

* `OpenTelemetry aio_pika/ Tracing <https://opentelemetry-python-contrib.readthedocs.io/en/latest/instrumentation/aio-pika/aio-pika.html>`_
* `OpenTelemetry Project <https://opentelemetry.io/>`_
58 changes: 58 additions & 0 deletions instrumentation/opentelemetry-instrumentation-aio-pika/setup.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
[metadata]
name = opentelemetry-instrumentation-aio-pika
description = OpenTelemetry aio_pika instrumentation
long_description = file: README.rst
long_description_content_type = text/x-rst
author = OpenTelemetry Authors
author_email = cncf-opentelemetry-contributors@lists.cncf.io
url = https://github.com/open-telemetry/opentelemetry-python-contrib/instrumentation/opentelemetry-instrumentation-aio-pika
platforms = any
license = Apache-2.0
classifiers =
Development Status :: 4 - Beta
Intended Audience :: Developers
License :: OSI Approved :: Apache Software License
Programming Language :: Python
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
Programming Language :: Python :: 3.8
Programming Language :: Python :: 3.9
Programming Language :: Python :: 3.10

[options]
python_requires = >=3.6
package_dir=
=src
packages=find_namespace:

install_requires =
opentelemetry-api ~= 1.5
wrapt >= 1.0.0, < 2.0.0

[options.extras_require]
test =
pytest
wrapt >= 1.0.0, < 2.0.0
opentelemetry-test-utils == 0.30b1

[options.packages.find]
where = src

[options.entry_points]
opentelemetry_instrumentor =
aio-pika = opentelemetry.instrumentation.aio_pika:AioPikaInstrumentor
89 changes: 89 additions & 0 deletions instrumentation/opentelemetry-instrumentation-aio-pika/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# DO NOT EDIT. THIS FILE WAS AUTOGENERATED FROM templates/instrumentation_setup.py.txt.
# RUN `python scripts/generate_setup.py` TO REGENERATE.


import distutils.cmd
import json
import os
from configparser import ConfigParser

import setuptools

config = ConfigParser()
config.read("setup.cfg")

# We provide extras_require parameter to setuptools.setup later which
# overwrites the extras_require section from setup.cfg. To support extras_require
# section in setup.cfg, we load it here and merge it with the extras_require param.
extras_require = {}
if "options.extras_require" in config:
for key, value in config["options.extras_require"].items():
extras_require[key] = [v for v in value.split("\n") if v.strip()]

BASE_DIR = os.path.dirname(__file__)
PACKAGE_INFO = {}

VERSION_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "instrumentation", "aio_pika", "version.py"
)
with open(VERSION_FILENAME, encoding="utf-8") as f:
exec(f.read(), PACKAGE_INFO)

PACKAGE_FILENAME = os.path.join(
BASE_DIR, "src", "opentelemetry", "instrumentation", "aio_pika", "package.py"
)
with open(PACKAGE_FILENAME, encoding="utf-8") as f:
exec(f.read(), PACKAGE_INFO)

# Mark any instruments/runtime dependencies as test dependencies as well.
extras_require["instruments"] = PACKAGE_INFO["_instruments"]
test_deps = extras_require.get("test", [])
for dep in extras_require["instruments"]:
test_deps.append(dep)

extras_require["test"] = test_deps


class JSONMetadataCommand(distutils.cmd.Command):

description = (
"print out package metadata as JSON. This is used by OpenTelemetry dev scripts to ",
"auto-generate code in other places",
)
user_options = []

def initialize_options(self):
pass

def finalize_options(self):
pass

def run(self):
metadata = {
"name": config["metadata"]["name"],
"version": PACKAGE_INFO["__version__"],
"instruments": PACKAGE_INFO["_instruments"],
}
print(json.dumps(metadata))


setuptools.setup(
cmdclass={"meta": JSONMetadataCommand},
version=PACKAGE_INFO["__version__"],
extras_require=extras_require,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Instrument `aio_pika` to trace RabbitMQ applications.
Usage
-----
* Start broker backend
.. code-block:: python
docker run -p 5672:5672 rabbitmq
* Run instrumented task
.. code-block:: python
import asyncio
from aio_pika import Message, connect
from opentelemetry.instrumentation.aio_pika import AioPikaInstrumentor
AioPikaInstrumentor().instrument()
async def main() -> None:
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
queue = await channel.declare_queue("hello")
await channel.default_exchange.publish(
Message(b"Hello World!"),
routing_key=queue.name,
)
if __name__ == "__main__":
asyncio.run(main())
API
---
"""
# pylint: disable=import-error

from .aio_pika_instrumentor import AioPikaInstrumentor
from .version import __version__

__all__ = ["AioPikaInstrumentor", "__version__"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional, List
from opentelemetry.propagators.textmap import Getter, CarrierT


class _AioPikaGetter(Getter): # type: ignore
def get(self, carrier: CarrierT, key: str) -> Optional[List[str]]:
value = carrier.get(key, None)
if value is None:
return None
return [value]

def keys(self, carrier: CarrierT) -> List[str]:
return []


aio_pika_getter = _AioPikaGetter()
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Collection

import aio_pika
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor

from .package import _instruments
from .span_builder import SpanBuilder
from .instrumented_exchange import InstrumentedExchange, RobustInstrumentedExchange
from .instrumented_queue import InstrumentedQueue, RobustInstrumentedQueue


class AioPikaInstrumentor(BaseInstrumentor):

def _instrument(self, **kwargs):
tracer_provider = kwargs.get('tracer_provider', None)
SpanBuilder.TRACER_PROVIDER = tracer_provider
aio_pika.Channel.EXCHANGE_CLASS = InstrumentedExchange
aio_pika.Channel.QUEUE_CLASS = InstrumentedQueue
aio_pika.RobustChannel.EXCHANGE_CLASS = RobustInstrumentedExchange
aio_pika.RobustChannel.QUEUE_CLASS = RobustInstrumentedQueue

def _uninstrument(self, **kwargs):
SpanBuilder.TRACER_PROVIDER = None
aio_pika.Channel.EXCHANGE_CLASS = aio_pika.Exchange
aio_pika.Channel.QUEUE_CLASS = aio_pika.Queue
aio_pika.RobustChannel.EXCHANGE_CLASS = aio_pika.RobustExchange
aio_pika.RobustChannel.QUEUE_CLASS = aio_pika.RobustQueue

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Optional

import aiormq
from aio_pika import Exchange, RobustExchange
from aio_pika.abc import AbstractMessage
from opentelemetry import trace, propagate
from opentelemetry.trace import Span

from opentelemetry.instrumentation.aio_pika.span_builder import SpanBuilder


class InstrumentedExchange(Exchange):
def _get_publish_span(self, message: AbstractMessage, routing_key: str) -> Optional[Span]:
builder = SpanBuilder()
builder.set_as_producer()
builder.set_destination(f'{self.name},{routing_key}')
builder.set_channel(self.channel)
builder.set_message(message)
return builder.build()

async def publish(self, message: AbstractMessage, routing_key: str, **kwargs) -> Optional[aiormq.abc.ConfirmationFrameType]:
span = self._get_publish_span(message, routing_key)
if not span:
return await super().publish(message, routing_key, **kwargs)
with trace.use_span(span, end_on_exit=True):
if span.is_recording():
propagate.inject(message.properties.headers)
return_value = await super().publish(message, routing_key, **kwargs)
return return_value


class RobustInstrumentedExchange(RobustExchange, InstrumentedExchange):
pass
Loading

0 comments on commit 9ccb612

Please sign in to comment.