229 changes: 125 additions & 104 deletions datadog_lambda/tracing.py
@@ -248,91 +248,100 @@ def extract_context_from_sqs_or_sns_event_or_context(
except Exception:
logger.debug("Failed extracting context as EventBridge to SQS.")

try:

Review comment: if context is extracted from EventBridge, we don't set a checkpoint. Is that expected?

Author reply: The tracers never inject DSM context in the case of EventBridge or Step Functions. I'm not sure this is the PR to be adding that functionality for these event types.

first_record = event.get("Records")[0]
source_arn = first_record.get("eventSourceARN", "")

# logic to deal with SNS => SQS event
if "body" in first_record:
body_str = first_record.get("body")
try:
body = json.loads(body_str)
if body.get("Type", "") == "Notification" and "TopicArn" in body:
logger.debug("Found SNS message inside SQS event")
first_record = get_first_record(create_sns_event(body))
except Exception:
pass

msg_attributes = first_record.get("messageAttributes")
if msg_attributes is None:
sns_record = first_record.get("Sns") or {}
# SNS->SQS event would extract SNS arn without this check
if event_source.equals(EventTypes.SNS):
source_arn = sns_record.get("TopicArn", "")
msg_attributes = sns_record.get("MessageAttributes") or {}
dd_payload = msg_attributes.get("_datadog")
if dd_payload:
# SQS uses dataType and binaryValue/stringValue
# SNS uses Type and Value
dd_json_data = None
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
if dd_json_data_type == "Binary":
import base64
context = None
records = (

Review comment: Maybe let's have records always be event.get("Records", []); however, in the loop, we can break early if data streams is disabled.

Review comment:

if index == 0:
    do apm stuff
    if data streams is disabled:
        break
event.get("Records", [])
if config.data_streams_enabled
else [event.get("Records")[0]]
)
for idx, record in enumerate(records):
try:
source_arn = record.get("eventSourceARN", "")
dsm_data = None

Review comment: same as below, dsm_data is not specific to data streams here. I would name it the same as below: dd_ctx, or something like that.

# logic to deal with SNS => SQS event
if "body" in record:
body_str = record.get("body")
try:
body = json.loads(body_str)
if body.get("Type", "") == "Notification" and "TopicArn" in body:
logger.debug("Found SNS message inside SQS event")
record = get_first_record(create_sns_event(body))
except Exception:
pass

msg_attributes = record.get("messageAttributes")
if msg_attributes is None:
sns_record = record.get("Sns") or {}
# SNS->SQS event would extract SNS arn without this check
if event_source.equals(EventTypes.SNS):
source_arn = sns_record.get("TopicArn", "")
msg_attributes = sns_record.get("MessageAttributes") or {}
dd_payload = msg_attributes.get("_datadog")
if dd_payload:
# SQS uses dataType and binaryValue/stringValue
# SNS uses Type and Value
# fmt: off
dd_json_data = None
dd_json_data_type = dd_payload.get("Type") or dd_payload.get("dataType")
if dd_json_data_type == "Binary":
import base64

dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
if dd_json_data:
dd_json_data = base64.b64decode(dd_json_data)
elif dd_json_data_type == "String":
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
# fmt: on
else:
logger.debug(
"Datadog Lambda Python only supports extracting trace context from String or Binary SQS/SNS message attributes"
)

dd_json_data = dd_payload.get("binaryValue") or dd_payload.get("Value")
if dd_json_data:
dd_json_data = base64.b64decode(dd_json_data)
elif dd_json_data_type == "String":
dd_json_data = dd_payload.get("stringValue") or dd_payload.get("Value")
dd_data = json.loads(dd_json_data)

if is_step_function_event(dd_data):
try:
return extract_context_from_step_functions(dd_data, None)
except Exception:
logger.debug(
"Failed to extract Step Functions context from SQS/SNS event."
)
if idx == 0:
context = propagator.extract(dd_data)
dsm_data = dd_data
else:

Review comment: in this else, dsm_data is not set. Is that an issue?

Author reply (michael-zhao459, Aug 5, 2025): Not an issue. I checked in dd-trace-py and DSM never injects context into attributes.AWSTraceHeader; we can just set a checkpoint with None.
logger.debug(
"Datadog Lambda Python only supports extracting trace context from String or Binary SQS/SNS message attributes"
)
# Handle case where trace context is injected into attributes.AWSTraceHeader
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")

Review comment: here, it's accessing records[0] again. I would put this whole section in an else if idx == 0 (line 315).

Suggested change:
-    attrs = event.get("Records")[0].get("attributes")
+    attrs = record.get("attributes")

if attrs:
x_ray_header = attrs.get("AWSTraceHeader")

Review comment: maybe put this logic in the extract_context I suggested above. The extract_context can take an argument: extract_from_xray? (extract_context is probably not a great name; I'll let you find a better one.)

Author reply: Hmm, I might be misinterpreting, but I'm not sure we should have one function return both a Context() object and the return of a json.loads(). I ended up splitting the X-Ray extractor into another helper; let me know what you think.

if x_ray_header:
x_ray_context = parse_xray_header(x_ray_header)
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
DD_TRACE_JAVA_TRACE_ID_PADDING
):
# If it starts with eight 0's padding,
# then this AWSTraceHeader contains Datadog injected trace context
logger.debug(
"Found dd-trace injected trace context from AWSTraceHeader"
)
if idx == 0:
context = Context(
trace_id=int(trace_id_parts[2][8:], 16),
span_id=int(x_ray_context["parent_id"], 16),
sampling_priority=float(x_ray_context["sampled"]),
)
except Exception as e:
logger.debug("The trace extractor returned with error %s", e)

if dd_json_data:
dd_data = json.loads(dd_json_data)
# Set DSM checkpoint once per record
_dsm_set_checkpoint(dsm_data, event_type, source_arn)

if is_step_function_event(dd_data):
try:
return extract_context_from_step_functions(dd_data, None)
except Exception:
logger.debug(
"Failed to extract Step Functions context from SQS/SNS event."
)
context = propagator.extract(dd_data)
_dsm_set_checkpoint(dd_data, event_type, source_arn)
return context
else:
# Handle case where trace context is injected into attributes.AWSTraceHeader
# example: Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1
attrs = event.get("Records")[0].get("attributes")
if attrs:
x_ray_header = attrs.get("AWSTraceHeader")
if x_ray_header:
x_ray_context = parse_xray_header(x_ray_header)
trace_id_parts = x_ray_context.get("trace_id", "").split("-")
if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
DD_TRACE_JAVA_TRACE_ID_PADDING
):
# If it starts with eight 0's padding,
# then this AWSTraceHeader contains Datadog injected trace context
logger.debug(
"Found dd-trace injected trace context from AWSTraceHeader"
)
return Context(
trace_id=int(trace_id_parts[2][8:], 16),
span_id=int(x_ray_context["parent_id"], 16),
sampling_priority=float(x_ray_context["sampled"]),
)
# Still want to set a DSM checkpoint even if DSM context not propagated
_dsm_set_checkpoint(None, event_type, source_arn)
return extract_context_from_lambda_context(lambda_context)
except Exception as e:
logger.debug("The trace extractor returned with error %s", e)
# Still want to set a DSM checkpoint even if DSM context not propagated
_dsm_set_checkpoint(None, event_type, source_arn)
return extract_context_from_lambda_context(lambda_context)
return context if context else extract_context_from_lambda_context(lambda_context)
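The AWSTraceHeader fallback above depends on dd-trace padding its 64-bit trace ID into the 96-bit random portion of the X-Ray root ID with eight leading zeros. A minimal standalone sketch of that check (hypothetical helper names; the real library's `parse_xray_header` and `Context` are stood in for by plain functions and a dict):

```python
# Sketch of recovering a Datadog trace context from an X-Ray AWSTraceHeader.
# Illustrative only; mirrors the logic shown in the diff, not the library's API.
DD_TRACE_JAVA_TRACE_ID_PADDING = "00000000"  # assumed value of the constant

def parse_xray_header(header):
    # "Root=1-654321ab-000000001234567890abcdef;Parent=0123456789abcdef;Sampled=1"
    parts = dict(kv.split("=", 1) for kv in header.split(";"))
    return {
        "trace_id": parts.get("Root", ""),
        "parent_id": parts.get("Parent", ""),
        "sampled": parts.get("Sampled", "0"),
    }

def extract_dd_context_from_xray(header):
    ctx = parse_xray_header(header)
    trace_id_parts = ctx["trace_id"].split("-")
    # Only treat the header as Datadog-injected when the third segment
    # starts with eight zeros of padding.
    if len(trace_id_parts) > 2 and trace_id_parts[2].startswith(
        DD_TRACE_JAVA_TRACE_ID_PADDING
    ):
        return {
            "trace_id": int(trace_id_parts[2][8:], 16),
            "span_id": int(ctx["parent_id"], 16),
            "sampling_priority": float(ctx["sampled"]),
        }
    return None  # an ordinary X-Ray header, not Datadog-injected
```

A header whose root segment lacks the zero padding falls through to `None`, matching the diff's behavior of ignoring genuine X-Ray context.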


def _extract_context_from_eventbridge_sqs_event(event):
@@ -393,30 +402,42 @@ def extract_context_from_kinesis_event(event, lambda_context):
Set a DSM checkpoint if DSM is enabled and the method for context propagation is supported.
"""
source_arn = ""
try:
record = get_first_record(event)
source_arn = record.get("eventSourceARN", "")
kinesis = record.get("kinesis")
if not kinesis:
return extract_context_from_lambda_context(lambda_context)
data = kinesis.get("data")
if data:
import base64

b64_bytes = data.encode("ascii")
str_bytes = base64.b64decode(b64_bytes)
data_str = str_bytes.decode("ascii")
data_obj = json.loads(data_str)
dd_ctx = data_obj.get("_datadog")
if dd_ctx:
context = propagator.extract(dd_ctx)
_dsm_set_checkpoint(dd_ctx, "kinesis", source_arn)
return context
except Exception as e:
logger.debug("The trace extractor returned with error %s", e)
# Still want to set a DSM checkpoint even if DSM context not propagated
_dsm_set_checkpoint(None, "kinesis", source_arn)
return extract_context_from_lambda_context(lambda_context)
records = (
[get_first_record(event)]
if not config.data_streams_enabled
else event.get("Records")
)
context = None
for idx, record in enumerate(records):
dsm_data = None

Review comment: this is not specific to DSM; it's dd_ctx.
try:
source_arn = record.get("eventSourceARN", "")
kinesis = record.get("kinesis")
if not kinesis:
context = (
extract_context_from_lambda_context(lambda_context)
if idx == 0
else context
)
_dsm_set_checkpoint(None, "kinesis", source_arn)

Review comment: we are setting a "kinesis" checkpoint inside if not kinesis; I think the name kinesis is wrong, because this code is a bit confusing.

Author reply (michael-zhao459, Aug 5, 2025): Hmm. This is deep enough inside the extract function that we believe the event source is Kinesis from parsing beforehand. However, all AWS documentation says a Kinesis Lambda event should have this field. To my understanding, this check is for Lambda synchronous invocations with records that match Kinesis but don't actually come from a Kinesis stream. @DataDog/apm-serverless Can you help confirm why this check is here in the first place?
continue
data = kinesis.get("data")
if data:
import base64

b64_bytes = data.encode("ascii")
str_bytes = base64.b64decode(b64_bytes)
data_str = str_bytes.decode("ascii")
data_obj = json.loads(data_str)
dd_ctx = data_obj.get("_datadog")
if dd_ctx:
if idx == 0:
context = propagator.extract(dd_ctx)
dsm_data = dd_ctx
except Exception as e:
logger.debug("The trace extractor returned with error %s", e)
_dsm_set_checkpoint(dsm_data, "kinesis", source_arn)
return context if context else extract_context_from_lambda_context(lambda_context)
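The Kinesis decoding path above (base64-decode the record's data, parse it as JSON, read the `_datadog` key) can be sketched in isolation. The carrier shape is taken from the diff; the helper itself is illustrative, not the library's API:

```python
import base64
import json

def extract_dd_carrier_from_kinesis_record(record):
    # Illustrative sketch of the decoding path in the diff: a Kinesis record
    # carries base64-encoded JSON whose "_datadog" key holds the trace carrier.
    kinesis = record.get("kinesis")
    if not kinesis:
        return None
    data = kinesis.get("data")
    if not data:
        return None
    try:
        data_obj = json.loads(base64.b64decode(data).decode("ascii"))
    except Exception:
        return None  # malformed or non-JSON payload: no carrier
    return data_obj.get("_datadog")
```

In the diff, a non-`None` result would then be handed to `propagator.extract` (for the first record) and to the DSM checkpoint for every record.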


def _deterministic_sha256_hash(s: str, part: str) -> int:
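Taken together, the shape both rewritten functions converge on is: iterate the records, extract APM context from the first record only, and set a DSM checkpoint per record, even when no carrier was found. A condensed, hypothetical sketch of that pattern (all names are stand-ins, not the library's API):

```python
# Hypothetical condensed sketch of the per-record pattern in this PR.
# extract_carrier and set_checkpoint stand in for the diff's carrier
# extraction and _dsm_set_checkpoint respectively.

def process_records(event, data_streams_enabled, extract_carrier, set_checkpoint):
    records = event.get("Records", [])
    if not data_streams_enabled:
        records = records[:1]  # APM only needs the first record
    context = None
    for idx, record in enumerate(records):
        carrier = extract_carrier(record)  # may be None
        if idx == 0 and carrier is not None:
            context = carrier  # stand-in for propagator.extract(carrier)
        # Checkpoint is set per record, even when no carrier was found.
        set_checkpoint(carrier, record.get("eventSourceARN", ""))
    return context
```

This captures why `dsm_data` can legitimately stay `None` for some records: the checkpoint is still set, just without propagated context.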