Skip to content

Commit d5c82a0

Browse files
chore(azure): add typing, call _finish_span, and use ddtrace.auto (#14922)
## Description For `azure_eventhubs`, `azure_functions`, `azure_servicebus` integrations: - Add typing to util functions - call _finish_span directly instead of using the ctx.span context manager - recommend import ddtrace.auto instead of patch ## Testing Uses existing test coverage, no additional testing needed ## Risks <!-- Note any risks associated with this change, or "None" if no risks --> ## Additional Notes Follows comments made in #14636 when adding `azure_eventhubs` integration
1 parent 8c118b8 commit d5c82a0

File tree

9 files changed

+83
-43
lines changed

9 files changed

+83
-43
lines changed

ddtrace/_trace/trace_handlers.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1284,6 +1284,13 @@ def listen():
12841284
"molten.trace_func",
12851285
"redis.execute_pipeline",
12861286
"redis.command",
1287+
"azure.functions.patched_event_hubs",
1288+
"azure.functions.patched_route_request",
1289+
"azure.functions.patched_service_bus",
1290+
"azure.functions.patched_timer",
1291+
"azure.servicebus.patched_producer_batch",
1292+
"azure.servicebus.patched_producer_schedule",
1293+
"azure.servicebus.patched_producer_send",
12871294
"psycopg.patched_connect",
12881295
"azure.eventhubs.patched_producer_batch",
12891296
"azure.eventhubs.patched_producer_send",

ddtrace/contrib/internal/azure_eventhubs/utils.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
from ddtrace.internal import core
1919
from ddtrace.internal.utils import get_argument_value
2020
from ddtrace.propagation.http import HTTPPropagator
21+
from ddtrace.trace import Context
2122

2223

2324
def create_context(
@@ -62,7 +63,7 @@ def handle_event_hubs_event_data_context(
6263
span.link_span(parent_context)
6364

6465

65-
def extract_context(event_data: Union[EventData, AmqpAnnotatedMessage]):
66+
def extract_context(event_data: Union[EventData, AmqpAnnotatedMessage]) -> Context:
6667
msg = event_data if isinstance(event_data, AmqpAnnotatedMessage) else event_data._message
6768
return HTTPPropagator.extract(msg.application_properties)
6869

ddtrace/contrib/internal/azure_functions/__init__.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
Enabling
55
~~~~~~~~
66
7-
Use :func:`patch()<ddtrace.patch>` to manually enable the integration::
8-
9-
from ddtrace import patch
10-
patch(azure_functions=True)
7+
The azure_functions integration is enabled by default when using :ref:`import ddtrace.auto<ddtraceauto>`.
118
129
1310
Global Configuration

ddtrace/contrib/internal/azure_functions/utils.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
import functools
22
import inspect
3+
from typing import Any
4+
from typing import Callable
5+
from typing import Coroutine
6+
from typing import Optional
7+
from typing import Tuple
8+
from typing import Union
39

410
from ddtrace import config
11+
from ddtrace._trace.pin import Pin
512
from ddtrace.contrib.internal.trace_utils import int_service
613
from ddtrace.ext import SpanTypes
714
from ddtrace.internal import core
815
from ddtrace.internal.schema import schematize_cloud_faas_operation
916

1017

11-
def create_context(context_name, pin, resource=None, headers=None):
18+
def create_context(
19+
context_name: str, pin: Pin, resource: Optional[str] = None, headers: Optional[dict] = None
20+
) -> core.ExecutionContext:
1221
operation_name = schematize_cloud_faas_operation(
1322
"azure.functions.invoke", cloud_provider="azure", cloud_service="functions"
1423
)
@@ -25,12 +34,17 @@ def create_context(context_name, pin, resource=None, headers=None):
2534
)
2635

2736

28-
def wrap_function_with_tracing(func, context_factory, pre_dispatch=None, post_dispatch=None):
37+
def wrap_function_with_tracing(
38+
func: Callable[..., Any],
39+
context_factory: Callable[[Any], core.ExecutionContext],
40+
pre_dispatch: Optional[Callable[[core.ExecutionContext, Any], Tuple[str, Tuple[Any, ...]]]] = None,
41+
post_dispatch: Optional[Callable[[core.ExecutionContext, Any], Tuple[str, Tuple[Any, ...]]]] = None,
42+
) -> Union[Callable[..., Any], Callable[..., Coroutine[Any, Any, Any]]]:
2943
if inspect.iscoroutinefunction(func):
3044

3145
@functools.wraps(func)
32-
async def async_wrapper(*args, **kwargs):
33-
with context_factory(kwargs) as ctx, ctx.span:
46+
async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
47+
with context_factory(kwargs) as ctx:
3448
if pre_dispatch:
3549
core.dispatch(*pre_dispatch(ctx, kwargs))
3650

@@ -45,8 +59,8 @@ async def async_wrapper(*args, **kwargs):
4559
return async_wrapper
4660

4761
@functools.wraps(func)
48-
def wrapper(*args, **kwargs):
49-
with context_factory(kwargs) as ctx, ctx.span:
62+
def wrapper(*args: Any, **kwargs: Any) -> Any:
63+
with context_factory(kwargs) as ctx:
5064
if pre_dispatch:
5165
core.dispatch(*pre_dispatch(ctx, kwargs))
5266

ddtrace/contrib/internal/azure_servicebus/__init__.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,7 @@
44
Enabling
55
~~~~~~~~
66
7-
Use :func:`patch()<ddtrace.patch>` to manually enable the integration::
8-
9-
from ddtrace import patch
10-
patch(azure_servicebus=True)
7+
The azure_servicebus integration is enabled by default when using :ref:`import ddtrace.auto<ddtraceauto>`.
118
129
1310
Global Configuration

ddtrace/contrib/internal/azure_servicebus/patch.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def _patched_add_message(wrapped, instance, args, kwargs):
9797
fully_qualified_namespace = instance._dd_fully_qualified_namespace
9898
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.CREATE}"
9999

100-
with create_context("azure.servicebus.patched_producer_batch", pin, operation_name, resource_name) as ctx, ctx.span:
100+
with create_context("azure.servicebus.patched_producer_batch", pin, operation_name, resource_name) as ctx:
101101
dispatch_message_modifier(
102102
ctx, args, kwargs, azure_servicebusx.CREATE, resource_name, fully_qualified_namespace, "message"
103103
)
@@ -113,7 +113,7 @@ def _patched_send_messages(wrapped, instance, args, kwargs):
113113
fully_qualified_namespace = instance.fully_qualified_namespace
114114
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"
115115

116-
with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx, ctx.span:
116+
with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx:
117117
dispatch_message_modifier(
118118
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "message"
119119
)
@@ -129,7 +129,7 @@ async def _patched_send_messages_async(wrapped, instance, args, kwargs):
129129
fully_qualified_namespace = instance.fully_qualified_namespace
130130
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"
131131

132-
with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx, ctx.span:
132+
with create_context("azure.servicebus.patched_producer_send", pin, operation_name, resource_name) as ctx:
133133
dispatch_message_modifier(
134134
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "message"
135135
)
@@ -145,9 +145,7 @@ def _patched_schedule_messages(wrapped, instance, args, kwargs):
145145
fully_qualified_namespace = instance.fully_qualified_namespace
146146
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"
147147

148-
with create_context(
149-
"azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name
150-
) as ctx, ctx.span:
148+
with create_context("azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name) as ctx:
151149
dispatch_message_modifier(
152150
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "messages"
153151
)
@@ -163,9 +161,7 @@ async def _patched_schedule_messages_async(wrapped, instance, args, kwargs):
163161
fully_qualified_namespace = instance.fully_qualified_namespace
164162
operation_name = f"{azure_servicebusx.CLOUD}.{azure_servicebusx.SERVICE}.{azure_servicebusx.SEND}"
165163

166-
with create_context(
167-
"azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name
168-
) as ctx, ctx.span:
164+
with create_context("azure.servicebus.patched_producer_schedule", pin, operation_name, resource_name) as ctx:
169165
dispatch_message_modifier(
170166
ctx, args, kwargs, azure_servicebusx.SEND, resource_name, fully_qualified_namespace, "messages"
171167
)

ddtrace/contrib/internal/azure_servicebus/utils.py

Lines changed: 45 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,19 @@
1+
from typing import Any
2+
from typing import List
3+
from typing import Optional
4+
from typing import Tuple
15
from typing import Union
26
from uuid import UUID
37

48
import azure.servicebus as azure_servicebus
9+
from azure.servicebus import ServiceBusMessage
10+
from azure.servicebus import ServiceBusMessageBatch
511
import azure.servicebus.amqp as azure_servicebus_amqp
12+
from azure.servicebus.amqp import AmqpAnnotatedMessage
613

714
from ddtrace import config
15+
from ddtrace._trace.pin import Pin
16+
from ddtrace._trace.span import Span
817
from ddtrace.contrib.trace_utils import ext_service
918
from ddtrace.ext import SpanTypes
1019
from ddtrace.ext import azure_servicebus as azure_servicebusx
@@ -13,7 +22,12 @@
1322
from ddtrace.propagation.http import HTTPPropagator
1423

1524

16-
def create_context(context_name, pin, operation_name, resource=None):
25+
def create_context(
26+
context_name: str,
27+
pin: Pin,
28+
operation_name: str,
29+
resource: Optional[str] = None,
30+
) -> core.ExecutionContext:
1731
return core.context_with_data(
1832
context_name,
1933
span_name=operation_name,
@@ -24,15 +38,21 @@ def create_context(context_name, pin, operation_name, resource=None):
2438
)
2539

2640

27-
def handle_service_bus_message_context(span, message_arg_value):
28-
if isinstance(message_arg_value, (azure_servicebus.ServiceBusMessage, azure_servicebus_amqp.AmqpAnnotatedMessage)):
41+
def handle_service_bus_message_context(
42+
span: Span,
43+
message_arg_value: Union[
44+
ServiceBusMessage,
45+
AmqpAnnotatedMessage,
46+
List[Union[ServiceBusMessage, AmqpAnnotatedMessage]],
47+
ServiceBusMessageBatch,
48+
],
49+
):
50+
if isinstance(message_arg_value, (ServiceBusMessage, AmqpAnnotatedMessage)):
2951
inject_context(span, message_arg_value)
3052
elif (
3153
isinstance(message_arg_value, list)
3254
and message_arg_value
33-
and isinstance(
34-
message_arg_value[0], (azure_servicebus.ServiceBusMessage, azure_servicebus_amqp.AmqpAnnotatedMessage)
35-
)
55+
and isinstance(message_arg_value[0], (ServiceBusMessage, AmqpAnnotatedMessage))
3656
):
3757
for message in message_arg_value:
3858
inject_context(span, message)
@@ -43,7 +63,7 @@ def handle_service_bus_message_context(span, message_arg_value):
4363
span.link_span(parent_context)
4464

4565

46-
def inject_context(span, message):
66+
def inject_context(span: Span, message: Union[ServiceBusMessage, AmqpAnnotatedMessage]):
4767
"""
4868
ServiceBusMessage.application_properties is of type Dict[str | bytes, PrimitiveTypes] | None
4969
AmqpAnnotatedMessage.application_properties is of type Dict[str | bytes, Any] | None
@@ -62,7 +82,14 @@ def inject_context(span, message):
6282
message.application_properties.update(inject_carrier)
6383

6484

65-
def handle_service_bus_message_attributes(message_arg_value):
85+
def handle_service_bus_message_attributes(
86+
message_arg_value: Union[
87+
ServiceBusMessage,
88+
AmqpAnnotatedMessage,
89+
List[Union[ServiceBusMessage, AmqpAnnotatedMessage]],
90+
ServiceBusMessageBatch,
91+
],
92+
) -> Tuple[Union[str, None], Union[str, None]]:
6693
if isinstance(message_arg_value, azure_servicebus.ServiceBusMessage):
6794
batch_count = None
6895
message_id = message_arg_value.message_id
@@ -81,16 +108,22 @@ def handle_service_bus_message_attributes(message_arg_value):
81108
elif isinstance(message_arg_value, list):
82109
batch_count = str(len(message_arg_value))
83110
message_id = None
84-
else:
85-
message_id = None
86-
batch_count = None
87111
return message_id, batch_count
88112

89113

90114
def dispatch_message_modifier(
91-
ctx, args, kwargs, message_operation, resource_name, fully_qualified_namespace, message_arg
115+
ctx: core.ExecutionContext,
116+
args: Any,
117+
kwargs: Any,
118+
message_operation: str,
119+
resource_name: str,
120+
fully_qualified_namespace: str,
121+
message_arg: str,
92122
):
93123
message_arg_value = get_argument_value(args, kwargs, 0, message_arg, True)
124+
if message_arg_value is None:
125+
return
126+
94127
message_id, batch_count = handle_service_bus_message_attributes(message_arg_value)
95128

96129
if config.azure_servicebus.distributed_tracing:

tests/contrib/azure_functions/azure_function_app/function_app.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,7 @@
33
import azure.functions as func
44
import requests
55

6-
from ddtrace import patch
7-
8-
9-
patch(azure_functions=True, azure_servicebus=True, requests=True)
6+
import ddtrace.auto # noqa: F401
107

118

129
app = func.FunctionApp()

tests/contrib/azure_functions_servicebus/azure_function_app/function_app.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,9 @@
33
import azure.functions as func
44
import azure.servicebus as azure_servicebus
55

6-
from ddtrace import patch
6+
import ddtrace.auto # noqa: F401
77

88

9-
patch(azure_functions=True, azure_servicebus=True, requests=True)
10-
119
app = func.FunctionApp()
1210

1311

0 commit comments

Comments
 (0)