Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adapt EH to OT 1.0.0 #17783

Merged
merged 10 commits into from
Apr 5, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
from .exceptions import _error_handler
from ._common import EventData
from ._client_base import ConsumerProducerMixin
from ._utils import create_properties, trace_link_message, event_position_selector
from ._utils import create_properties, get_event_links, event_position_selector
from ._constants import (
EPOCH_SYMBOL,
TIMEOUT_SYMBOL,
Expand Down Expand Up @@ -176,7 +176,6 @@ def _next_message_in_buffer(self):
# pylint:disable=protected-access
message = self._message_buffer.pop()
event_data = EventData._from_message(message)
trace_link_message(event_data)
self._last_received_event = event_data
return event_data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,9 @@
TYPE_CHECKING,
)

from azure.core.tracing import SpanKind
from azure.core.tracing import SpanKind, Link
from azure.core.settings import settings

from .._utils import trace_link_message

if TYPE_CHECKING:
# pylint: disable=ungrouped-imports
from azure.core.tracing import AbstractSpan
Expand Down Expand Up @@ -98,17 +96,14 @@ def create_consumer(
return consumer

@contextmanager
def _context(self, event):
# type: (Union[EventData, Iterable[EventData]]) -> Iterator[None]
def _context(self, links=None):
# type: (List[Link]) -> Iterator[None]
"""Tracing"""
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
if span_impl_type is None:
yield
else:
child = span_impl_type(name="Azure.EventHubs.process")
child = span_impl_type(name="Azure.EventHubs.process", kind=SpanKind.CONSUMER, links=links)
self._eventhub_client._add_span_request_attributes(child) # type: ignore # pylint: disable=protected-access
child.kind = SpanKind.CONSUMER

trace_link_message(event, child)
with child:
yield
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
)
from functools import partial

from .._utils import get_event_links
from .partition_context import PartitionContext
from .in_memory_checkpoint_store import InMemoryCheckpointStore
from .ownership_manager import OwnershipManager
Expand Down Expand Up @@ -221,7 +222,8 @@ def _on_event_received(self, partition_context, event):
partition_context._last_received_event = event[-1] # type: ignore #pylint:disable=protected-access
except TypeError:
partition_context._last_received_event = event # type: ignore #pylint:disable=protected-access
with self._context(event):
links = get_event_links(event)
with self._context(links=links):
self._event_handler(partition_context, event)
else:
self._event_handler(partition_context, event)
Expand Down
7 changes: 2 additions & 5 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
set_message_partition_key,
trace_message,
send_context_manager,
add_link_to_send,
)
from ._constants import TIMEOUT_SYMBOL

Expand All @@ -53,7 +52,6 @@ def _set_trace_message(event_datas, parent_span=None):
# type: (Iterable[EventData], Optional[AbstractSpan]) -> Iterable[EventData]
for ed in iter(event_datas):
trace_message(ed, parent_span)
add_link_to_send(ed, parent_span)
yield ed


Expand Down Expand Up @@ -195,7 +193,6 @@ def _wrap_eventdata(
set_message_partition_key(event_data.message, partition_key)
wrapper_event_data = event_data
trace_message(wrapper_event_data, span)
add_link_to_send(wrapper_event_data, span)
else:
if isinstance(
event_data, EventDataBatch
Expand All @@ -206,8 +203,8 @@ def _wrap_eventdata(
raise ValueError(
"The partition_key does not match the one of the EventDataBatch"
)
for message in event_data.message._body_gen: # pylint: disable=protected-access
add_link_to_send(message, span)
for event in event_data.message._body_gen:
trace_message(event, span)
wrapper_event_data = event_data # type:ignore
else:
if partition_key:
Expand Down
65 changes: 23 additions & 42 deletions sdk/eventhub/azure-eventhub/azure/eventhub/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from uamqp.message import MessageHeader

from azure.core.settings import settings
from azure.core.tracing import SpanKind
from azure.core.tracing import SpanKind, Link

from ._version import VERSION
from ._constants import (
Expand Down Expand Up @@ -132,25 +132,12 @@ def send_context_manager():
) # type: Type[AbstractSpan]

if span_impl_type is not None:
with span_impl_type(name="Azure.EventHubs.send") as child:
child.kind = SpanKind.CLIENT
with span_impl_type(name="Azure.EventHubs.send", kind=SpanKind.CLIENT) as child:
yield child
else:
yield None


def add_link_to_send(event, send_span):
"""Add Diagnostic-Id from event to span as link.
"""
try:
if send_span and event.properties:
traceparent = event.properties.get(b"Diagnostic-Id", "").decode("ascii")
if traceparent:
send_span.link(traceparent)
except Exception as exp: # pylint:disable=broad-except
_LOGGER.warning("add_link_to_send had an exception %r", exp)


def trace_message(event, parent_span=None):
# type: (EventData, Optional[AbstractSpan]) -> None
"""Add tracing information to this event.
Expand All @@ -164,8 +151,14 @@ def trace_message(event, parent_span=None):
current_span = parent_span or span_impl_type(
span_impl_type.get_current_span()
)
with current_span.span(name="Azure.EventHubs.message") as message_span:
message_span.kind = SpanKind.PRODUCER
link = Link({
'traceparent': current_span.get_trace_parent()
})
with current_span.span(
name="Azure.EventHubs.message",
kind=SpanKind.PRODUCER,
links=[link]
) as message_span:
message_span.add_attribute("az.namespace", "Microsoft.EventHub")
if not event.properties:
event.properties = dict()
Expand All @@ -175,32 +168,20 @@ def trace_message(event, parent_span=None):
except Exception as exp: # pylint:disable=broad-except
_LOGGER.warning("trace_message had an exception %r", exp)


def trace_link_message(events, parent_span=None):
# type: (Union[EventData, Iterable[EventData]], Optional[AbstractSpan]) -> None
"""Link the current event(s) to current span or provided parent span.

Will extract DiagnosticId if available.
"""
def get_event_links(events):
trace_events = events if isinstance(events, Iterable) else (events,) # pylint:disable=isinstance-second-argument-not-valid-type
try: # pylint:disable=too-many-nested-blocks
span_impl_type = settings.tracing_implementation() # type: Type[AbstractSpan]
if span_impl_type is not None:
current_span = parent_span or span_impl_type(
span_impl_type.get_current_span()
)
if current_span:
for event in trace_events: # type: ignore
if event.properties:
traceparent = event.properties.get(b"Diagnostic-Id", "").decode("ascii")
if traceparent:
current_span.link(
traceparent,
attributes={"enqueuedTime": event.message.annotations.get(PROP_TIMESTAMP)}
)
except Exception as exp: # pylint:disable=broad-except
_LOGGER.warning("trace_link_message had an exception %r", exp)

links = []
try:
for event in trace_events: # type: ignore
if event.properties:
traceparent = event.properties.get(b"Diagnostic-Id", "").decode("ascii")
if traceparent:
links.append(Link({'traceparent': traceparent},
attributes={"enqueuedTime": event.message.annotations.get(PROP_TIMESTAMP)}
))
except AttributeError:
pass
return links

def event_position_selector(value, inclusive=False):
# type: (Union[int, str, datetime.datetime], bool) -> bytes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def __init__(
super(ClientBaseAsync, self).__init__(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
credential=credential,
credential=self._credential,
**kwargs
)
self._conn_manager_async = get_connection_manager(loop=self._loop, **kwargs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from ._client_base_async import ConsumerProducerMixin
from .._common import EventData
from ..exceptions import _error_handler
from .._utils import create_properties, trace_link_message, event_position_selector
from .._utils import create_properties, get_event_links, event_position_selector
from .._constants import EPOCH_SYMBOL, TIMEOUT_SYMBOL, RECEIVER_RUNTIME_METRIC_SYMBOL

if TYPE_CHECKING:
Expand Down Expand Up @@ -165,7 +165,7 @@ def _next_message_in_buffer(self):
# pylint:disable=protected-access
message = self._message_buffer.pop()
event_data = EventData._from_message(message)
trace_link_message(event_data)
get_event_links(event_data)
self._last_received_event = event_data
return event_data

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from azure.eventhub import EventData
from ..._eventprocessor.common import CloseReason, LoadBalancingStrategy
from ..._eventprocessor._eventprocessor_mixin import EventProcessorMixin
from ..._utils import get_event_links
from .partition_context import PartitionContext
from .in_memory_checkpoint_store import InMemoryCheckpointStore
from .checkpoint_store import CheckpointStore
Expand Down Expand Up @@ -220,7 +221,8 @@ async def _on_event_received(
partition_context._last_received_event = event[-1] # type: ignore #pylint:disable=protected-access
except TypeError:
partition_context._last_received_event = event # type: ignore # pylint:disable=protected-access
with self._context(event):
links = get_event_links(event)
with self._context(links=links):
await self._event_handler(partition_context, event)
else:
await self._event_handler(partition_context, event)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
set_message_partition_key,
trace_message,
send_context_manager,
add_link_to_send,
)
from .._constants import TIMEOUT_SYMBOL
from ._client_base_async import ConsumerProducerMixin
Expand Down Expand Up @@ -182,7 +181,6 @@ def _wrap_eventdata(
set_message_partition_key(event_data.message, partition_key)
wrapper_event_data = event_data
trace_message(wrapper_event_data, span)
add_link_to_send(wrapper_event_data, span)
else:
if isinstance(
event_data, EventDataBatch
Expand All @@ -193,8 +191,8 @@ def _wrap_eventdata(
raise ValueError(
"The partition_key does not match the one of the EventDataBatch"
)
for message in event_data.message._body_gen: # pylint: disable=protected-access
add_link_to_send(message, span)
for event in event_data.message._body_gen:
trace_message(event, span)
wrapper_event_data = event_data # type:ignore
else:
if partition_key:
Expand Down