6868LOWER_64_BITS = "LOWER_64_BITS"
6969
7070
71- def _dsm_set_context_sqs_event (event ):
72- for record in event .get ("Records" , [])[1 :]:
73- if arn := record .get ("eventSourceARN" ):
74- try :
75- context = _extract_context_from_sqs_or_sns_record (record )
76- except Exception as e :
77- logger .debug (
78- f"DSM: Failed to extract context with error { e } , will still set checkpoint"
79- )
80- context = None
81- _dsm_set_checkpoint (context , "sqs" , arn )
82-
83-
84- def _dsm_set_context_kinesis_event (event ):
85- for record in event .get ("Records" , [])[1 :]:
86- if (arn := record .get ("eventSourceARN" )) and (kinesis := record .get ("kinesis" )):
87- try :
88- context = _extract_context_from_kinesis_record (kinesis )
89- except Exception as e :
90- logger .debug (
91- f"DSM: Failed to extract context with error { e } , will still set checkpoint"
92- )
93- context = None
94- _dsm_set_checkpoint (context , "kinesis" , arn )
95-
96-
9771def _dsm_set_checkpoint (context_json , event_type , arn ):
9872 if not config .data_streams_enabled :
9973 return
@@ -263,6 +237,7 @@ def extract_context_from_sqs_or_sns_event_or_context(
263237 Falls back to lambda context if no trace data is found in the SQS message attributes.
264238 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
265239 """
240+ source_arn = ""
266241 event_type = "sqs" if event_source .equals (EventTypes .SQS ) else "sns"
267242
268243 # EventBridge => SQS
@@ -273,106 +248,98 @@ def extract_context_from_sqs_or_sns_event_or_context(
273248 except Exception :
274249 logger .debug ("Failed extracting context as EventBridge to SQS." )
275250
276- try :
277- first_record = event .get ("Records" )[0 ]
278- source_arn = (
279- first_record .get ("Sns" , {}).get ("TopicArn" )
280- if event_type == "sns"
281- else first_record .get ("eventSourceARN" )
282- )
283- dd_data = _extract_context_from_sqs_or_sns_record (first_record )
284- if dd_data :
285- if is_step_function_event (dd_data ):
251+ context = None
252+ for idx , record in enumerate (event .get ("Records" , [])):
253+ try :
254+ source_arn = record .get ("eventSourceARN" , "" )
255+ dd_ctx = None
256+
257+ # logic to deal with SNS => SQS event
258+ if "body" in record :
259+ body_str = record .get ("body" )
286260 try :
287- return extract_context_from_step_functions (dd_data , None )
261+ body = json .loads (body_str )
262+ if body .get ("Type" , "" ) == "Notification" and "TopicArn" in body :
263+ logger .debug ("Found SNS message inside SQS event" )
264+ record = get_first_record (create_sns_event (body ))
288265 except Exception :
266+ pass
267+
268+ msg_attributes = record .get ("messageAttributes" )
269+ if msg_attributes is None :
270+ sns_record = record .get ("Sns" ) or {}
271+ # SNS->SQS event would extract SNS arn without this check
272+ if event_source .equals (EventTypes .SNS ):
273+ source_arn = sns_record .get ("TopicArn" , "" )
274+ msg_attributes = sns_record .get ("MessageAttributes" ) or {}
275+ dd_payload = msg_attributes .get ("_datadog" )
276+ if dd_payload :
277+ # SQS uses dataType and binaryValue/stringValue
278+ # SNS uses Type and Value
279+ # fmt: off
280+ dd_json_data = None
281+ dd_json_data_type = dd_payload .get ("Type" ) or dd_payload .get ("dataType" )
282+ if dd_json_data_type == "Binary" :
283+ import base64
284+
285+ dd_json_data = dd_payload .get ("binaryValue" ) or dd_payload .get ("Value" )
286+ if dd_json_data :
287+ dd_json_data = base64 .b64decode (dd_json_data )
288+ elif dd_json_data_type == "String" :
289+ dd_json_data = dd_payload .get ("stringValue" ) or dd_payload .get ("Value" )
290+ # fmt: on
291+ else :
289292 logger .debug (
290- "Failed to extract Step Functions context from SQS/SNS event."
293+ "Datadog Lambda Python only supports extracting trace"
294+ "context from String or Binary SQS/SNS message attributes"
291295 )
292- context = propagator .extract (dd_data )
293- _dsm_set_checkpoint (dd_data , event_type , source_arn )
294- # Batch Processing does not occur for SNS events
295- if config .data_streams_enabled and event_type == "sqs" :
296- _dsm_set_context_sqs_event (event )
297- return context
298- else :
299- # Handle case where trace context is injected into attributes.AWSTraceHeader
300- # example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
301- attrs = event .get ("Records" )[0 ].get ("attributes" )
302- if attrs :
303- x_ray_header = attrs .get ("AWSTraceHeader" )
304- if x_ray_header :
305- x_ray_context = parse_xray_header (x_ray_header )
306- trace_id_parts = x_ray_context .get ("trace_id" , "" ).split ("-" )
307- if len (trace_id_parts ) > 2 and trace_id_parts [2 ].startswith (
308- DD_TRACE_JAVA_TRACE_ID_PADDING
309- ):
310- # If it starts with eight 0's padding,
311- # then this AWSTraceHeader contains Datadog injected trace context
312- logger .debug (
313- "Found dd-trace injected trace context from AWSTraceHeader"
314- )
315- return Context (
316- trace_id = int (trace_id_parts [2 ][8 :], 16 ),
317- span_id = int (x_ray_context ["parent_id" ], 16 ),
318- sampling_priority = float (x_ray_context ["sampled" ]),
319- )
320- # Still want to set a DSM checkpoint even if DSM context not propagated
321- _dsm_set_checkpoint (None , event_type , source_arn )
322- # Batch Processing does not occur for SNS events
323- if config .data_streams_enabled and event_type == "sqs" :
324- _dsm_set_context_sqs_event (event )
325- return extract_context_from_lambda_context (lambda_context )
326- except Exception as e :
327- logger .debug ("The trace extractor returned with error %s" , e )
328- # Still want to set a DSM checkpoint even if DSM context not propagated
329- _dsm_set_checkpoint (None , event_type , source_arn )
330- # Batch Processing does not occur for SNS events
331- if config .data_streams_enabled and event_type == "sqs" :
332- _dsm_set_context_sqs_event (event )
333- return extract_context_from_lambda_context (lambda_context )
334296
297+ if dd_json_data :
298+ dd_data = json .loads (dd_json_data )
299+
300+ if idx == 0 :
301+ if is_step_function_event (dd_data ):
302+ try :
303+ return extract_context_from_step_functions (
304+ dd_data , None
305+ )
306+ except Exception :
307+ logger .debug (
308+ "Failed to extract Step Functions context from SQS/SNS event."
309+ )
310+ context = propagator .extract (dd_data )
311+ if not config .data_streams_enabled :
312+ break
313+ dd_ctx = dd_data
314+ elif idx == 0 :
315+ # Handle case where trace context is injected into attributes.AWSTraceHeader
316+ # example:Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
317+ attrs = event .get ("Records" )[0 ].get ("attributes" )
318+ if attrs :
319+ x_ray_header = attrs .get ("AWSTraceHeader" )
320+ if x_ray_header :
321+ x_ray_context = parse_xray_header (x_ray_header )
322+ trace_id_parts = x_ray_context .get ("trace_id" , "" ).split ("-" )
323+ if len (trace_id_parts ) > 2 and trace_id_parts [2 ].startswith (
324+ DD_TRACE_JAVA_TRACE_ID_PADDING
325+ ):
326+ # If it starts with eight 0's padding,
327+ # then this AWSTraceHeader contains Datadog injected trace context
328+ logger .debug (
329+ "Found dd-trace injected trace context from AWSTraceHeader"
330+ )
331+ return Context (
332+ trace_id = int (trace_id_parts [2 ][8 :], 16 ),
333+ span_id = int (x_ray_context ["parent_id" ], 16 ),
334+ sampling_priority = float (x_ray_context ["sampled" ]),
335+ )
336+ except Exception as e :
337+ logger .debug ("The trace extractor returned with error %s" , e )
335338
336- def _extract_context_from_sqs_or_sns_record (record ):
337- # logic to deal with SNS => SQS event
338- if "body" in record :
339- body_str = record .get ("body" )
340- try :
341- body = json .loads (body_str )
342- if body .get ("Type" , "" ) == "Notification" and "TopicArn" in body :
343- logger .debug ("Found SNS message inside SQS event" )
344- record = get_first_record (create_sns_event (body ))
345- except Exception :
346- pass
347-
348- msg_attributes = record .get ("messageAttributes" )
349- if msg_attributes is None :
350- sns_record = record .get ("Sns" ) or {}
351- msg_attributes = sns_record .get ("MessageAttributes" ) or {}
352- dd_payload = msg_attributes .get ("_datadog" )
353- if dd_payload :
354- # SQS uses dataType and binaryValue/stringValue
355- # SNS uses Type and Value
356- dd_json_data = None
357- dd_json_data_type = dd_payload .get ("Type" ) or dd_payload .get ("dataType" )
358- if dd_json_data_type == "Binary" :
359- import base64
360-
361- dd_json_data = dd_payload .get ("binaryValue" ) or dd_payload .get ("Value" )
362- if dd_json_data :
363- dd_json_data = base64 .b64decode (dd_json_data )
364- elif dd_json_data_type == "String" :
365- dd_json_data = dd_payload .get ("stringValue" ) or dd_payload .get ("Value" )
366- else :
367- logger .debug (
368- "Datadog Lambda Python only supports extracting trace"
369- "context from String or Binary SQS/SNS message attributes"
370- )
339+ # Set DSM checkpoint once per record
340+ _dsm_set_checkpoint (dd_ctx , event_type , source_arn )
371341
372- if dd_json_data :
373- dd_data = json .loads (dd_json_data )
374- return dd_data
375- return None
342+ return context if context else extract_context_from_lambda_context (lambda_context )
376343
377344
378345def _extract_context_from_eventbridge_sqs_event (event ):
@@ -433,40 +400,35 @@ def extract_context_from_kinesis_event(event, lambda_context):
433400 Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
434401 """
435402 source_arn = ""
436- try :
437- record = get_first_record (event )
438- source_arn = record .get ("eventSourceARN" , "" )
439- kinesis = record .get ("kinesis" )
440- if not kinesis :
441- return extract_context_from_lambda_context (lambda_context )
442- dd_ctx = _extract_context_from_kinesis_record (kinesis )
443- if dd_ctx :
444- context = propagator .extract (dd_ctx )
445- _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
446- if config .data_streams_enabled :
447- _dsm_set_context_kinesis_event (event )
448- return context
449- except Exception as e :
450- logger .debug ("The trace extractor returned with error %s" , e )
451- # Still want to set a DSM checkpoint even if DSM context not propagated
452- _dsm_set_checkpoint (None , "kinesis" , source_arn )
453- if config .data_streams_enabled :
454- _dsm_set_context_kinesis_event (event )
455- return extract_context_from_lambda_context (lambda_context )
456-
457403
458- def _extract_context_from_kinesis_record (record_kinesis_data ):
459- data = record_kinesis_data .get ("data" )
460- if data :
461- import base64
462-
463- b64_bytes = data .encode ("ascii" )
464- str_bytes = base64 .b64decode (b64_bytes )
465- data_str = str_bytes .decode ("ascii" )
466- data_obj = json .loads (data_str )
467- dd_ctx = data_obj .get ("_datadog" )
468- return dd_ctx
469- return None
404+ context = None
405+ for idx , record in enumerate (event .get ("Records" , [])):
406+ dd_ctx = None
407+ try :
408+ source_arn = record .get ("eventSourceARN" , "" )
409+ kinesis = record .get ("kinesis" )
410+ if not kinesis :
411+ if idx == 0 :
412+ return extract_context_from_lambda_context (lambda_context )
413+ continue
414+ data = kinesis .get ("data" )
415+ if data :
416+ import base64
417+
418+ b64_bytes = data .encode ("ascii" )
419+ str_bytes = base64 .b64decode (b64_bytes )
420+ data_str = str_bytes .decode ("ascii" )
421+ data_obj = json .loads (data_str )
422+ dd_ctx = data_obj .get ("_datadog" )
423+ if dd_ctx :
424+ if idx == 0 :
425+ context = propagator .extract (dd_ctx )
426+ if not config .data_streams_enabled :
427+ break
428+ except Exception as e :
429+ logger .debug ("The trace extractor returned with error %s" , e )
430+ _dsm_set_checkpoint (dd_ctx , "kinesis" , source_arn )
431+ return context if context else extract_context_from_lambda_context (lambda_context )
470432
471433
472434def _deterministic_sha256_hash (s : str , part : str ) -> int :
0 commit comments