Skip to content

Commit c38a2f6

Browse files
committed
add buffered snapshot tests
1 parent c496134 commit c38a2f6

9 files changed

+1026
-10
lines changed

ddtrace/contrib/internal/azure_eventhubs/patch.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,16 @@ async def _patched_create_batch_async(wrapped, instance, args, kwargs):
107107

108108
def _patched_add(wrapped, instance, args, kwargs):
109109
pin = Pin.get_from(instance)
110-
if not pin or not pin.enabled() or not config.azure_eventhubs.batch_links:
110+
if (
111+
not pin
112+
or not pin.enabled()
113+
or not config.azure_eventhubs.batch_links
114+
# Skip patching when these attributes haven't been added.
115+
# A known case is when a producer client in buffered mode
116+
# instantiates a batch without using the create_batch method.
117+
or not getattr(instance, "_dd_eventhub_name", None)
118+
or not getattr(instance, "_dd_fully_qualified_namespace", None)
119+
):
111120
return wrapped(*args, **kwargs)
112121

113122
resource_name = instance._dd_eventhub_name

tests/contrib/azure_eventhubs/common.py

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from threading import Lock
55
from threading import Thread
66
from typing import List
7+
from typing import Optional
78
from typing import Union
89
from uuid import uuid4
910

@@ -62,6 +63,24 @@ def make_amqp_annotated_messages() -> List[Union[EventData, AmqpAnnotatedMessage
6263
]
6364

6465

66+
def on_success(events: List[Union[EventData, AmqpAnnotatedMessage]], partition_id: Optional[str]):
67+
pass
68+
69+
70+
def on_error(events: List[Union[EventData, AmqpAnnotatedMessage]], partition_id: Optional[str], error: Exception):
71+
raise error
72+
73+
74+
async def on_success_async(events: List[Union[EventData, AmqpAnnotatedMessage]], partition_id: Optional[str]):
75+
pass
76+
77+
78+
async def on_error_async(
79+
events: List[Union[EventData, AmqpAnnotatedMessage]], partition_id: Optional[str], error: Exception
80+
):
81+
raise error
82+
83+
6584
class EventHandler:
6685
def __init__(self):
6786
self.expected_event_count = 0
@@ -290,14 +309,19 @@ async def run_test_async(
290309
@pytest.mark.asyncio
291310
async def test_common():
292311
method = os.environ.get("METHOD")
312+
buffered_mode = os.environ.get("BUFFERED_MODE") == "True"
293313
is_async = os.environ.get("IS_ASYNC") == "True"
294314
message_payload_type = os.environ.get("MESSAGE_PAYLOAD_TYPE")
295315
distributed_tracing_enabled = os.environ.get("DD_AZURE_EVENTHUBS_DISTRIBUTED_TRACING", "True") == "True"
296316
batch_links_enabled = os.environ.get("DD_TRACE_AZURE_EVENTHUBS_BATCH_LINKS_ENABLED", "True") == "True"
297317

298318
if is_async:
299319
producer_client = EventHubProducerClientAsync.from_connection_string(
300-
conn_str=CONNECTION_STRING, eventhub_name=EVENTHUB_NAME
320+
conn_str=CONNECTION_STRING,
321+
eventhub_name=EVENTHUB_NAME,
322+
buffered_mode=buffered_mode,
323+
on_error=on_error_async,
324+
on_success=on_success_async,
301325
)
302326
(consumer_client, event_handler, receive_task) = await create_event_handler_async()
303327
try:
@@ -314,7 +338,11 @@ async def test_common():
314338
await close_event_handler_async(consumer_client, receive_task)
315339
else:
316340
producer_client = EventHubProducerClient.from_connection_string(
317-
conn_str=CONNECTION_STRING, eventhub_name=EVENTHUB_NAME
341+
conn_str=CONNECTION_STRING,
342+
eventhub_name=EVENTHUB_NAME,
343+
buffered_mode=buffered_mode,
344+
on_error=on_error,
345+
on_success=on_success,
318346
)
319347
(consumer_client, event_handler, thread) = create_event_handler()
320348
try:

tests/contrib/azure_eventhubs/test_azure_eventhubs_snapshot.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,37 +12,48 @@
1212
SNAPSHOT_IGNORES = ["meta.messaging.message_id", "meta._dd.span_links"]
1313

1414
METHODS = ["send_event", "send_batch"]
15+
BUFFERED_OPTIONS = [False, True]
1516
ASYNC_OPTIONS = [False, True]
1617
PAYLOAD_TYPES = ["single", "list", "batch"]
1718
DISTRIBUTED_TRACING_ENABLED_OPTIONS = [None, False]
1819
BATCH_LINKS_ENABLED_OPTIONS = [None, False]
1920

2021

21-
def is_invalid_test_combination(method, payload_type, batch_links_enabled):
22+
def is_invalid_test_combination(method, buffered_mode, payload_type, distributed_tracing_enabled, batch_links_enabled):
2223
return (
24+
# Payloads not valid for method
2325
(method == "send_event" and payload_type in {"list", "batch"})
2426
or (method == "send_batch" and payload_type == "single")
27+
# Only test batch_links config for batches
2528
or (payload_type != "batch" and batch_links_enabled is False)
29+
# Only test buffered mode with enabled configs
30+
or (buffered_mode and (distributed_tracing_enabled is False or batch_links_enabled is False))
2631
)
2732

2833

2934
params = [
3035
(
31-
f"{m}{'_async' if a else ''}_{p}"
36+
f"{m}{'_buffered' if b else ''}{'_async' if a else ''}_{p}"
3237
f"_distributed_tracing_{'enabled' if d is None else 'disabled'}"
33-
f"{'_batch_links_enabled' if p == 'batch' and b is None else '_batch_links_disabled' if p == 'batch' else ''}",
38+
f"{'_batch_links_enabled' if p == 'batch' and bl is None else '_batch_links_disabled' if p == 'batch' else ''}",
3439
{
3540
"METHOD": m,
41+
"BUFFERED_MODE": str(b),
3642
"IS_ASYNC": str(a),
3743
"MESSAGE_PAYLOAD_TYPE": p,
3844
**({"DD_AZURE_EVENTHUBS_DISTRIBUTED_TRACING": str(d)} if d is not None else {}),
39-
**({"DD_TRACE_AZURE_EVENTHUBS_BATCH_LINKS_ENABLED": str(b)} if b is not None else {}),
45+
**({"DD_TRACE_AZURE_EVENTHUBS_BATCH_LINKS_ENABLED": str(bl)} if bl is not None else {}),
4046
},
4147
)
42-
for m, a, p, d, b in itertools.product(
43-
METHODS, ASYNC_OPTIONS, PAYLOAD_TYPES, DISTRIBUTED_TRACING_ENABLED_OPTIONS, BATCH_LINKS_ENABLED_OPTIONS
48+
for m, b, a, p, d, bl in itertools.product(
49+
METHODS,
50+
BUFFERED_OPTIONS,
51+
ASYNC_OPTIONS,
52+
PAYLOAD_TYPES,
53+
DISTRIBUTED_TRACING_ENABLED_OPTIONS,
54+
BATCH_LINKS_ENABLED_OPTIONS,
4455
)
45-
if not is_invalid_test_combination(m, p, b)
56+
if not is_invalid_test_combination(m, b, p, d, bl)
4657
]
4758

4859
param_ids, param_values = zip(*params)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
[[
2+
{
3+
"name": "azure.eventhubs.create",
4+
"service": "azure_eventhubs",
5+
"resource": "eh1",
6+
"trace_id": 0,
7+
"span_id": 1,
8+
"parent_id": 0,
9+
"type": "worker",
10+
"error": 0,
11+
"meta": {
12+
"_dd.base_service": "ddtrace_subprocess_dir",
13+
"_dd.p.dm": "-0",
14+
"_dd.p.tid": "68d1afa600000000",
15+
"component": "azure_eventhubs",
16+
"language": "python",
17+
"messaging.destination.name": "eh1",
18+
"messaging.operation": "create",
19+
"messaging.system": "eventhubs",
20+
"network.destination.name": "localhost",
21+
"runtime-id": "865fcd1a1cf94252b4a4267453e1f07a",
22+
"span.kind": "producer"
23+
},
24+
"metrics": {
25+
"_dd.top_level": 1,
26+
"_dd.tracer_kr": 1.0,
27+
"_sampling_priority_v1": 1,
28+
"process_id": 17265
29+
},
30+
"duration": 5025875,
31+
"start": 1758572454351446055
32+
}],
33+
[
34+
{
35+
"name": "azure.eventhubs.create",
36+
"service": "azure_eventhubs",
37+
"resource": "eh1",
38+
"trace_id": 1,
39+
"span_id": 1,
40+
"parent_id": 0,
41+
"type": "worker",
42+
"error": 0,
43+
"meta": {
44+
"_dd.base_service": "ddtrace_subprocess_dir",
45+
"_dd.p.dm": "-0",
46+
"_dd.p.tid": "68d1afa600000000",
47+
"component": "azure_eventhubs",
48+
"language": "python",
49+
"messaging.destination.name": "eh1",
50+
"messaging.message_id": "3573e4e2-0803-4164-b84e-bf28d9906dbb",
51+
"messaging.operation": "create",
52+
"messaging.system": "eventhubs",
53+
"network.destination.name": "localhost",
54+
"runtime-id": "865fcd1a1cf94252b4a4267453e1f07a",
55+
"span.kind": "producer"
56+
},
57+
"metrics": {
58+
"_dd.top_level": 1,
59+
"_dd.tracer_kr": 1.0,
60+
"_sampling_priority_v1": 1,
61+
"process_id": 17265
62+
},
63+
"duration": 882666,
64+
"start": 1758572454365227430
65+
}],
66+
[
67+
{
68+
"name": "azure.eventhubs.send",
69+
"service": "azure_eventhubs",
70+
"resource": "eh1",
71+
"trace_id": 2,
72+
"span_id": 1,
73+
"parent_id": 0,
74+
"type": "worker",
75+
"error": 0,
76+
"meta": {
77+
"_dd.base_service": "ddtrace_subprocess_dir",
78+
"_dd.p.dm": "-0",
79+
"_dd.p.tid": "68d1afa600000000",
80+
"_dd.span_links": "[{\"trace_id\": \"68d1afa600000000ba6aabf9c1b3fd92\", \"span_id\": \"24ec6ec59147b4f7\", \"tracestate\": \"dd=s:1;t.dm:-0;t.tid:68d1afa600000000\", \"flags\": 1}, {\"trace_id\": \"68d1afa600000000b6abae0644b4c302\", \"span_id\": \"47203230632cfaf7\", \"tracestate\": \"dd=s:1;t.dm:-0;t.tid:68d1afa600000000\", \"flags\": 1}]",
81+
"component": "azure_eventhubs",
82+
"language": "python",
83+
"messaging.batch_count": "2",
84+
"messaging.destination.name": "eh1",
85+
"messaging.operation": "send",
86+
"messaging.system": "eventhubs",
87+
"network.destination.name": "localhost",
88+
"runtime-id": "865fcd1a1cf94252b4a4267453e1f07a",
89+
"span.kind": "producer"
90+
},
91+
"metrics": {
92+
"_dd.top_level": 1,
93+
"_dd.tracer_kr": 1.0,
94+
"_sampling_priority_v1": 1,
95+
"process_id": 17265
96+
},
97+
"duration": 487636250,
98+
"start": 1758572454366398763
99+
}],
100+
[
101+
{
102+
"name": "azure.eventhubs.create",
103+
"service": "azure_eventhubs",
104+
"resource": "eh1",
105+
"trace_id": 3,
106+
"span_id": 1,
107+
"parent_id": 0,
108+
"type": "worker",
109+
"error": 0,
110+
"meta": {
111+
"_dd.base_service": "ddtrace_subprocess_dir",
112+
"_dd.p.dm": "-0",
113+
"_dd.p.tid": "68d1afa600000000",
114+
"component": "azure_eventhubs",
115+
"language": "python",
116+
"messaging.destination.name": "eh1",
117+
"messaging.operation": "create",
118+
"messaging.system": "eventhubs",
119+
"network.destination.name": "localhost",
120+
"runtime-id": "865fcd1a1cf94252b4a4267453e1f07a",
121+
"span.kind": "producer"
122+
},
123+
"metrics": {
124+
"_dd.top_level": 1,
125+
"_dd.tracer_kr": 1.0,
126+
"_sampling_priority_v1": 1,
127+
"process_id": 17265
128+
},
129+
"duration": 332375,
130+
"start": 1758572454855474472
131+
}],
132+
[
133+
{
134+
"name": "azure.eventhubs.create",
135+
"service": "azure_eventhubs",
136+
"resource": "eh1",
137+
"trace_id": 4,
138+
"span_id": 1,
139+
"parent_id": 0,
140+
"type": "worker",
141+
"error": 0,
142+
"meta": {
143+
"_dd.base_service": "ddtrace_subprocess_dir",
144+
"_dd.p.dm": "-0",
145+
"_dd.p.tid": "68d1afa600000000",
146+
"component": "azure_eventhubs",
147+
"language": "python",
148+
"messaging.destination.name": "eh1",
149+
"messaging.message_id": "10b51c28-628d-4a42-a7e2-905ef2ab63b2",
150+
"messaging.operation": "create",
151+
"messaging.system": "eventhubs",
152+
"network.destination.name": "localhost",
153+
"runtime-id": "865fcd1a1cf94252b4a4267453e1f07a",
154+
"span.kind": "producer"
155+
},
156+
"metrics": {
157+
"_dd.top_level": 1,
158+
"_dd.tracer_kr": 1.0,
159+
"_sampling_priority_v1": 1,
160+
"process_id": 17265
161+
},
162+
"duration": 319333,
163+
"start": 1758572454855916930
164+
}],
165+
[
166+
{
167+
"name": "azure.eventhubs.send",
168+
"service": "azure_eventhubs",
169+
"resource": "eh1",
170+
"trace_id": 5,
171+
"span_id": 1,
172+
"parent_id": 0,
173+
"type": "worker",
174+
"error": 0,
175+
"meta": {
176+
"_dd.base_service": "ddtrace_subprocess_dir",
177+
"_dd.p.dm": "-0",
178+
"_dd.p.tid": "68d1afa600000000",
179+
"_dd.span_links": "[{\"trace_id\": \"68d1afa600000000c1b95d25d2cf5f60\", \"span_id\": \"47b73cd1892637ea\", \"tracestate\": \"dd=s:1;t.dm:-0;t.tid:68d1afa600000000\", \"flags\": 1}, {\"trace_id\": \"68d1afa6000000000160fd952293e847\", \"span_id\": \"079de0c29ec51a2f\", \"tracestate\": \"dd=s:1;t.dm:-0;t.tid:68d1afa600000000\", \"flags\": 1}]",
180+
"component": "azure_eventhubs",
181+
"language": "python",
182+
"messaging.batch_count": "2",
183+
"messaging.destination.name": "eh1",
184+
"messaging.operation": "send",
185+
"messaging.system": "eventhubs",
186+
"network.destination.name": "localhost",
187+
"runtime-id": "865fcd1a1cf94252b4a4267453e1f07a",
188+
"span.kind": "producer"
189+
},
190+
"metrics": {
191+
"_dd.top_level": 1,
192+
"_dd.tracer_kr": 1.0,
193+
"_sampling_priority_v1": 1,
194+
"process_id": 17265
195+
},
196+
"duration": 460459,
197+
"start": 1758572454856334638
198+
}]]

0 commit comments

Comments
 (0)