Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix/set default headers for properties in pika #740

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `opentelemetry-util-http` no longer contains an instrumentation entrypoint and will not be loaded
automatically by the auto instrumentor.
([#745](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/745))
- `opentelemetry-instrumentation-pika` Bugfix use properties.headers. It will prevent the header injection from raising.
([#740](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/740))

## [1.6.0-0.25b0](https://github.com/open-telemetry/opentelemetry-python/releases/tag/v1.6.0-0.25b0) - 2021-10-13
### Added
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Tracer
from opentelemetry.trace import SpanKind, Tracer
from opentelemetry.trace.span import Span


Expand Down Expand Up @@ -40,16 +40,15 @@ def decorated_callback(
body: bytes,
) -> Any:
if not properties:
properties = BasicProperties()
if properties.headers is None:
properties.headers = {}
properties = BasicProperties(headers={})
ctx = propagate.extract(properties.headers, getter=_pika_getter)
if not ctx:
ctx = context.get_current()
span = _get_span(
tracer,
channel,
properties,
span_kind=SpanKind.CONSUMER,
task_name=task_name,
ctx=ctx,
operation=MessagingOperationValues.RECEIVE,
Expand All @@ -74,12 +73,13 @@ def decorated_function(
mandatory: bool = False,
) -> Any:
if not properties:
properties = BasicProperties()
properties = BasicProperties(headers={})
ctx = context.get_current()
span = _get_span(
tracer,
channel,
properties,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
ctx=ctx,
operation=None,
Expand All @@ -104,6 +104,7 @@ def _get_span(
channel: Channel,
properties: BasicProperties,
task_name: str,
span_kind: SpanKind,
ctx: context.Context,
operation: Optional[MessagingOperationValues] = None,
) -> Optional[Span]:
Expand All @@ -113,7 +114,9 @@ def _get_span(
return None
task_name = properties.type if properties.type else task_name
span = tracer.start_span(
context=ctx, name=_generate_span_name(task_name, operation)
context=ctx,
name=_generate_span_name(task_name, operation),
kind=span_kind,
)
if span.is_recording():
_enrich_span(span, channel, properties, task_name, operation)
Expand Down
136 changes: 131 additions & 5 deletions instrumentation/opentelemetry-instrumentation-pika/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@
# limitations under the License.
from unittest import TestCase, mock

from pika.channel import Channel
from pika.spec import Basic, BasicProperties

from opentelemetry.instrumentation.pika import utils
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import Span, Tracer
from opentelemetry.semconv.trace import (
MessagingOperationValues,
SpanAttributes,
)
from opentelemetry.trace import Span, SpanKind, Tracer


class TestUtils(TestCase):
Expand All @@ -32,12 +38,15 @@ def test_get_span(
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
span_kind = mock.MagicMock(spec=SpanKind)
get_value.return_value = None
ctx = mock.MagicMock()
_ = utils._get_span(tracer, channel, properties, task_name, ctx)
_ = utils._get_span(
tracer, channel, properties, task_name, span_kind, ctx
)
generate_span_name.assert_called_once()
tracer.start_span.assert_called_once_with(
context=ctx, name=generate_span_name.return_value
context=ctx, name=generate_span_name.return_value, kind=span_kind
)
enrich_span.assert_called_once()

Expand All @@ -54,9 +63,12 @@ def test_get_span_suppressed(
channel = mock.MagicMock()
properties = mock.MagicMock()
task_name = "test.test"
span_kind = mock.MagicMock(spec=SpanKind)
get_value.return_value = True
ctx = mock.MagicMock()
span = utils._get_span(tracer, channel, properties, task_name, ctx)
span = utils._get_span(
tracer, channel, properties, task_name, span_kind, ctx
)
self.assertEqual(span, None)
generate_span_name.assert_not_called()
enrich_span.assert_not_called()
Expand Down Expand Up @@ -158,3 +170,117 @@ def test_enrich_span_unique_connection() -> None:
),
],
)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.extract")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_callback(
self,
use_span: mock.MagicMock,
extract: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
mock_task_name = "mock_task_name"
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_callback = utils._decorate_callback(
callback, tracer, mock_task_name
)
retval = decorated_callback(channel, method, properties, mock_body)
extract.assert_called_once_with(
properties.headers, getter=utils._pika_getter
)
get_span.assert_called_once_with(
tracer,
channel,
properties,
span_kind=SpanKind.CONSUMER,
task_name=mock_task_name,
ctx=extract.return_value,
operation=MessagingOperationValues.RECEIVE,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
callback.assert_called_once_with(
channel, method, properties, mock_body
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
@mock.patch("opentelemetry.trace.use_span")
def test_decorate_basic_publish(
self,
use_span: mock.MagicMock,
get_current: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
properties = mock.MagicMock()
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer
)
retval = decorated_basic_publish(
channel, method, mock_body, properties
)
get_current.assert_called_once()
get_span.assert_called_once_with(
tracer,
channel,
properties,
span_kind=SpanKind.PRODUCER,
task_name="(temporary)",
ctx=get_current.return_value,
operation=None,
)
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(properties.headers)
callback.assert_called_once_with(
channel, method, mock_body, properties, False
)
self.assertEqual(retval, callback.return_value)

@mock.patch("opentelemetry.instrumentation.pika.utils._get_span")
@mock.patch("opentelemetry.propagate.inject")
@mock.patch("opentelemetry.context.get_current")
@mock.patch("opentelemetry.trace.use_span")
@mock.patch("pika.spec.BasicProperties.__new__")
def test_decorate_basic_publish_no_properties(
self,
basic_properties: mock.MagicMock,
use_span: mock.MagicMock,
get_current: mock.MagicMock,
inject: mock.MagicMock,
get_span: mock.MagicMock,
) -> None:
callback = mock.MagicMock()
tracer = mock.MagicMock()
channel = mock.MagicMock(spec=Channel)
method = mock.MagicMock(spec=Basic.Deliver)
mock_body = b"mock_body"
decorated_basic_publish = utils._decorate_basic_publish(
callback, channel, tracer
)
retval = decorated_basic_publish(channel, method, body=mock_body)
basic_properties.assert_called_once_with(BasicProperties, headers={})
get_current.assert_called_once()
use_span.assert_called_once_with(
get_span.return_value, end_on_exit=True
)
get_span.return_value.is_recording.assert_called_once()
inject.assert_called_once_with(basic_properties.return_value.headers)
self.assertEqual(retval, callback.return_value)