From d7ce695bb8714315e154d9c353359c68f724dfb7 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 9 Jun 2025 14:49:12 -0400 Subject: [PATCH 01/26] move get dsm context logic into lambda layer code --- datadog_lambda/dsm.py | 109 ++++++++++++++++--- tests/test_dsm.py | 237 +++++++++++++++++++++++++++++++++++++++++- 2 files changed, 332 insertions(+), 14 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 427f5e47..4399b762 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -1,21 +1,33 @@ +import json +import base64 + +from ddtrace.internal.logger import get_logger from datadog_lambda import logger from datadog_lambda.trigger import EventTypes +log = get_logger(__name__) -def set_dsm_context(event, event_source): +def set_dsm_context(event, event_source): if event_source.equals(EventTypes.SQS): _dsm_set_sqs_context(event) -def _dsm_set_sqs_context(event): +def _dsm_set_context_helper( + event, service_type, arn_extractor, payload_size_calculator +): + """ + Common helper function for setting DSM context. + + Args: + event: The Lambda event containing records + service_type: The service type string (example: sqs', 'sns') + arn_extractor: Function to extract the ARN from the record + payload_size_calculator: Function to calculate payload size + """ from datadog_lambda.wrapper import format_err_with_traceback from ddtrace.internal.datastreams import data_streams_processor from ddtrace.internal.datastreams.processor import DsmPathwayCodec - from ddtrace.internal.datastreams.botocore import ( - get_datastreams_context, - calculate_sqs_payload_size, - ) records = event.get("Records") if records is None: @@ -24,15 +36,88 @@ def _dsm_set_sqs_context(event): for record in records: try: - queue_arn = record.get("eventSourceARN", "") - - contextjson = get_datastreams_context(record) - payload_size = calculate_sqs_payload_size(record) + arn = arn_extractor(record) + context_json = _get_dsm_context_from_lambda(record) + payload_size = payload_size_calculator(record, context_json) - ctx = DsmPathwayCodec.decode(contextjson, processor) + ctx = DsmPathwayCodec.decode(context_json, processor) ctx.set_checkpoint( - ["direction:in", f"topic:{queue_arn}", "type:sqs"], + ["direction:in", f"topic:{arn}", f"type:{service_type}"], payload_size=payload_size, ) except Exception as e: logger.error(format_err_with_traceback(e)) + + +def _dsm_set_sqs_context(event): + from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size + + def sqs_payload_calculator(record, context_json): + return calculate_sqs_payload_size(record) + + def sqs_arn_extractor(record): + return record.get("eventSourceARN", "") + + _dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator) + + +def _get_dsm_context_from_lambda(message): + """ + Lambda-specific message formats: + - message.messageAttributes._datadog.stringValue (SQS -> lambda) + - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda) + - message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw) + - message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda) + - message.kinesis.data.decode()._datadog (Kinesis -> lambda) + """ + context_json = None + message_body = message + + if "kinesis" in message: + try: + kinesis_data = json.loads( + base64.b64decode(message["kinesis"]["data"]).decode() + ) + return kinesis_data.get("_datadog") + except (ValueError, TypeError, KeyError): + log.debug("Unable to parse kinesis data for lambda message") + return None + elif "Sns" in message: + message_body = message["Sns"] + else: + try: + body = message.get("body") + if body: + message_body = json.loads(body) + except (ValueError, TypeError): + log.debug("Unable to parse lambda message body as JSON, treat as non-json") + + message_attributes = message_body.get("MessageAttributes") or message_body.get( + "messageAttributes" + ) + if not message_attributes: + log.debug("DataStreams skipped lambda message: %r", message) + return None + + if "_datadog" not in message_attributes: + log.debug("DataStreams skipped lambda message: %r", message) + return None + + datadog_attr = message_attributes["_datadog"] + + if message_body.get("Type") == "Notification": + # SNS -> lambda notification + if datadog_attr.get("Type") == "Binary": + context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode()) + elif "stringValue" in datadog_attr: + # SQS -> lambda + context_json = json.loads(datadog_attr["stringValue"]) + elif "binaryValue" in datadog_attr: + # SNS -> SQS -> lambda, raw message delivery + context_json = json.loads( + base64.b64decode(datadog_attr["binaryValue"]).decode() + ) + else: + log.debug("DataStreams did not handle lambda message: %r", message) + + return context_json diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 544212d8..d43104e7 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -1,11 +1,17 @@ import unittest +import json +import base64 from unittest.mock import patch, MagicMock -from datadog_lambda.dsm import set_dsm_context, _dsm_set_sqs_context +from datadog_lambda.dsm import ( + set_dsm_context, + _dsm_set_sqs_context, + _get_dsm_context_from_lambda, +) from datadog_lambda.trigger import EventTypes, _EventSource -class TestDsmSQSContext(unittest.TestCase): +class TestDSMContext(unittest.TestCase): def setUp(self): patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context") self.mock_dsm_set_sqs_context = patcher.start() @@ -110,3 +116,230 @@ def test_sqs_multiple_records_process_each_record(self): self.assertIn(f"topic:{expected_arns[i]}", tags) self.assertIn("type:sqs", tags) self.assertEqual(kwargs["payload_size"], 100) + + +class TestGetDSMContext(unittest.TestCase): + def test_sqs_to_lambda_string_value_format(self): + """Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)""" + trace_context = { + "x-datadog-trace-id": "789123456", + "x-datadog-parent-id": "321987654", + "dd-pathway-ctx": "test-pathway-ctx", + } + + lambda_record = { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185", + }, + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps(trace_context), + "stringListValues": [], + "binaryListValues": [], + "dataType": "String", + }, + "myAttribute": { + "stringValue": "myValue", + "stringListValues": [], + "binaryListValues": [], + "dataType": "String", + }, + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "789123456" + assert result["x-datadog-parent-id"] == "321987654" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_sns_to_lambda_format(self): + """Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)""" + trace_context = { + "x-datadog-trace-id": "111111111", + "x-datadog-parent-id": "222222222", + "dd-pathway-ctx": "test-pathway-ctx", + } + binary_data = base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8") + + sns_lambda_record = { + "EventSource": "aws:sns", + "EventSubscriptionArn": ( + "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012" + ), + "Sns": { + "Type": "Notification", + "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e", + "TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic", + "Subject": "Test Subject", + "Message": "Hello from SNS!", + "Timestamp": "2023-01-01T12:00:00.000Z", + "MessageAttributes": { + "_datadog": {"Type": "Binary", "Value": binary_data} + }, + }, + } + + result = _get_dsm_context_from_lambda(sns_lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "111111111" + assert result["x-datadog-parent-id"] == "222222222" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_sns_to_sqs_to_lambda_binary_value_format(self): + """Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)""" + trace_context = { + "x-datadog-trace-id": "777666555", + "x-datadog-parent-id": "444333222", + "dd-pathway-ctx": "test-pathway-ctx", + } + binary_data = base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8") + + lambda_record = { + "messageId": "test-message-id", + "receiptHandle": "test-receipt-handle", + "body": "Test message body", + "messageAttributes": { + "_datadog": {"binaryValue": binary_data, "dataType": "Binary"} + }, + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "777666555" + assert result["x-datadog-parent-id"] == "444333222" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_sns_to_sqs_to_lambda_body_format(self): + """Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)""" + trace_context = { + "x-datadog-trace-id": "123987456", + "x-datadog-parent-id": "654321987", + "x-datadog-sampling-priority": "1", + "dd-pathway-ctx": "test-pathway-ctx", + } + + message_body = { + "Type": "Notification", + "MessageId": "test-message-id", + "Message": "Test message from SNS", + "MessageAttributes": { + "_datadog": { + "Type": "Binary", + "Value": base64.b64encode( + json.dumps(trace_context).encode("utf-8") + ).decode("utf-8"), + } + }, + } + + lambda_record = { + "messageId": "lambda-message-id", + "body": json.dumps(message_body), + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue", + } + + result = _get_dsm_context_from_lambda(lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "123987456" + assert result["x-datadog-parent-id"] == "654321987" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_kinesis_to_lambda_format(self): + """Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)""" + trace_context = { + "x-datadog-trace-id": "555444333", + "x-datadog-parent-id": "888777666", + "dd-pathway-ctx": "test-pathway-ctx", + } + + # Create the kinesis data payload + kinesis_payload = { + "_datadog": trace_context, + "actualData": "some business data", + } + encoded_kinesis_data = base64.b64encode( + json.dumps(kinesis_payload).encode("utf-8") + ).decode("utf-8") + + kinesis_lambda_record = { + "eventSource": "aws:kinesis", + "eventSourceARN": ( + "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" + ), + "kinesis": { + "data": encoded_kinesis_data, + "partitionKey": "partition-key-1", + "sequenceNumber": ( + "49590338271490256608559692538361571095921575989136588898" + ), + }, + } + + result = _get_dsm_context_from_lambda(kinesis_lambda_record) + + assert result is not None + assert result == trace_context + assert result["x-datadog-trace-id"] == "555444333" + assert result["x-datadog-parent-id"] == "888777666" + assert result["dd-pathway-ctx"] == "test-pathway-ctx" + + def test_no_message_attributes(self): + """Test message without MessageAttributes returns None.""" + message = { + "messageId": "test-message-id", + "body": "Test message without attributes", + } + + result = _get_dsm_context_from_lambda(message) + + assert result is None + + def test_no_datadog_attribute(self): + """Test message with MessageAttributes but no _datadog attribute returns None.""" + message = { + "messageId": "test-message-id", + "body": "Test message", + "messageAttributes": { + "customAttribute": {"stringValue": "custom-value", "dataType": "String"} + }, + } + + result = _get_dsm_context_from_lambda(message) + assert result is None + + def test_empty_datadog_attribute(self): + """Test message with empty _datadog attribute returns None.""" + message = { + "messageId": "test-message-id", + "messageAttributes": {"_datadog": {}}, + } + + result = _get_dsm_context_from_lambda(message) + + assert result is None From 623f49e49aab68389b2656babb9d980189ec4263 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 9 Jun 2025 18:17:09 -0400 Subject: [PATCH 02/26] simplify PR --- datadog_lambda/dsm.py | 41 +----------- tests/test_dsm.py | 145 ------------------------------------------ 2 files changed, 2 insertions(+), 184 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 4399b762..6807899b 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -1,5 +1,4 @@ import json -import base64 from ddtrace.internal.logger import get_logger from datadog_lambda import logger @@ -65,36 +64,9 @@ def _get_dsm_context_from_lambda(message): """ Lambda-specific message formats: - message.messageAttributes._datadog.stringValue (SQS -> lambda) - - message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda) - - message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw) - - message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda) - - message.kinesis.data.decode()._datadog (Kinesis -> lambda) """ context_json = None - message_body = message - - if "kinesis" in message: - try: - kinesis_data = json.loads( - base64.b64decode(message["kinesis"]["data"]).decode() - ) - return kinesis_data.get("_datadog") - except (ValueError, TypeError, KeyError): - log.debug("Unable to parse kinesis data for lambda message") - return None - elif "Sns" in message: - message_body = message["Sns"] - else: - try: - body = message.get("body") - if body: - message_body = json.loads(body) - except (ValueError, TypeError): - log.debug("Unable to parse lambda message body as JSON, treat as non-json") - - message_attributes = message_body.get("MessageAttributes") or message_body.get( - "messageAttributes" - ) + message_attributes = message.get("messageAttributes") if not message_attributes: log.debug("DataStreams skipped lambda message: %r", message) return None @@ -105,18 +77,9 @@ def _get_dsm_context_from_lambda(message): datadog_attr = message_attributes["_datadog"] - if message_body.get("Type") == "Notification": - # SNS -> lambda notification - if datadog_attr.get("Type") == "Binary": - context_json = json.loads(base64.b64decode(datadog_attr["Value"]).decode()) - elif "stringValue" in datadog_attr: + if "stringValue" in datadog_attr: # SQS -> lambda context_json = json.loads(datadog_attr["stringValue"]) - elif "binaryValue" in datadog_attr: - # SNS -> SQS -> lambda, raw message delivery - context_json = json.loads( - base64.b64decode(datadog_attr["binaryValue"]).decode() - ) else: log.debug("DataStreams did not handle lambda message: %r", message) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index d43104e7..32103cab 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -1,6 +1,5 @@ import unittest import json -import base64 from unittest.mock import patch, MagicMock from datadog_lambda.dsm import ( @@ -165,150 +164,6 @@ def test_sqs_to_lambda_string_value_format(self): assert result["x-datadog-parent-id"] == "321987654" assert result["dd-pathway-ctx"] == "test-pathway-ctx" - def test_sns_to_lambda_format(self): - """Test format: message.Sns.MessageAttributes._datadog.Value.decode() (SNS -> lambda)""" - trace_context = { - "x-datadog-trace-id": "111111111", - "x-datadog-parent-id": "222222222", - "dd-pathway-ctx": "test-pathway-ctx", - } - binary_data = base64.b64encode( - json.dumps(trace_context).encode("utf-8") - ).decode("utf-8") - - sns_lambda_record = { - "EventSource": "aws:sns", - "EventSubscriptionArn": ( - "arn:aws:sns:us-east-1:123456789012:sns-topic:12345678-1234-1234-1234-123456789012" - ), - "Sns": { - "Type": "Notification", - "MessageId": "95df01b4-ee98-5cb9-9903-4c221d41eb5e", - "TopicArn": "arn:aws:sns:us-east-1:123456789012:sns-topic", - "Subject": "Test Subject", - "Message": "Hello from SNS!", - "Timestamp": "2023-01-01T12:00:00.000Z", - "MessageAttributes": { - "_datadog": {"Type": "Binary", "Value": binary_data} - }, - }, - } - - result = _get_dsm_context_from_lambda(sns_lambda_record) - - assert result is not None - assert result == trace_context - assert result["x-datadog-trace-id"] == "111111111" - assert result["x-datadog-parent-id"] == "222222222" - assert result["dd-pathway-ctx"] == "test-pathway-ctx" - - def test_sns_to_sqs_to_lambda_binary_value_format(self): - """Test format: message.messageAttributes._datadog.binaryValue.decode() (SNS -> SQS -> lambda, raw)""" - trace_context = { - "x-datadog-trace-id": "777666555", - "x-datadog-parent-id": "444333222", - "dd-pathway-ctx": "test-pathway-ctx", - } - binary_data = base64.b64encode( - json.dumps(trace_context).encode("utf-8") - ).decode("utf-8") - - lambda_record = { - "messageId": "test-message-id", - "receiptHandle": "test-receipt-handle", - "body": "Test message body", - "messageAttributes": { - "_datadog": {"binaryValue": binary_data, "dataType": "Binary"} - }, - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-west-2:123456789012:test-queue", - } - - result = _get_dsm_context_from_lambda(lambda_record) - - assert result is not None - assert result == trace_context - assert result["x-datadog-trace-id"] == "777666555" - assert result["x-datadog-parent-id"] == "444333222" - assert result["dd-pathway-ctx"] == "test-pathway-ctx" - - def test_sns_to_sqs_to_lambda_body_format(self): - """Test format: message.body.MessageAttributes._datadog.Value.decode() (SNS -> SQS -> lambda)""" - trace_context = { - "x-datadog-trace-id": "123987456", - "x-datadog-parent-id": "654321987", - "x-datadog-sampling-priority": "1", - "dd-pathway-ctx": "test-pathway-ctx", - } - - message_body = { - "Type": "Notification", - "MessageId": "test-message-id", - "Message": "Test message from SNS", - "MessageAttributes": { - "_datadog": { - "Type": "Binary", - "Value": base64.b64encode( - json.dumps(trace_context).encode("utf-8") - ).decode("utf-8"), - } - }, - } - - lambda_record = { - "messageId": "lambda-message-id", - "body": json.dumps(message_body), - "eventSource": "aws:sqs", - "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:sns-to-sqs-queue", - } - - result = _get_dsm_context_from_lambda(lambda_record) - - assert result is not None - assert result == trace_context - assert result["x-datadog-trace-id"] == "123987456" - assert result["x-datadog-parent-id"] == "654321987" - assert result["dd-pathway-ctx"] == "test-pathway-ctx" - - def test_kinesis_to_lambda_format(self): - """Test format: message.kinesis.data.decode()._datadog (Kinesis -> lambda)""" - trace_context = { - "x-datadog-trace-id": "555444333", - "x-datadog-parent-id": "888777666", - "dd-pathway-ctx": "test-pathway-ctx", - } - - # Create the kinesis data payload - kinesis_payload = { - "_datadog": trace_context, - "actualData": "some business data", - } - encoded_kinesis_data = base64.b64encode( - json.dumps(kinesis_payload).encode("utf-8") - ).decode("utf-8") - - kinesis_lambda_record = { - "eventSource": "aws:kinesis", - "eventSourceARN": ( - "arn:aws:kinesis:us-east-1:123456789012:stream/my-stream" - ), - "kinesis": { - "data": encoded_kinesis_data, - "partitionKey": "partition-key-1", - "sequenceNumber": ( - "49590338271490256608559692538361571095921575989136588898" - ), - }, - } - - result = _get_dsm_context_from_lambda(kinesis_lambda_record) - - assert result is not None - assert result == trace_context - assert result["x-datadog-trace-id"] == "555444333" - assert result["x-datadog-parent-id"] == "888777666" - assert result["dd-pathway-ctx"] == "test-pathway-ctx" - def test_no_message_attributes(self): """Test message without MessageAttributes returns None.""" message = { From 96e3d880ad67221af55759be5b098fb8dfb45736 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 10 Jun 2025 05:34:30 -0400 Subject: [PATCH 03/26] fixes --- datadog_lambda/dsm.py | 49 +++++++++++++++++++++---------------------- tests/test_dsm.py | 2 +- 2 files changed, 25 insertions(+), 26 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 6807899b..5976de0b 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -13,7 +13,7 @@ def set_dsm_context(event, event_source): def _dsm_set_context_helper( - event, service_type, arn_extractor, payload_size_calculator + record, service_type, arn, payload_size ): """ Common helper function for setting DSM context. @@ -21,43 +21,42 @@ def _dsm_set_context_helper( Args: event: The Lambda event containing records service_type: The service type string (example: sqs', 'sns') - arn_extractor: Function to extract the ARN from the record - payload_size_calculator: Function to calculate payload size + arn: ARN from the record + payload_size: payload size of the record """ from datadog_lambda.wrapper import format_err_with_traceback from ddtrace.internal.datastreams import data_streams_processor from ddtrace.internal.datastreams.processor import DsmPathwayCodec - records = event.get("Records") - if records is None: - return processor = data_streams_processor() - for record in records: - try: - arn = arn_extractor(record) - context_json = _get_dsm_context_from_lambda(record) - payload_size = payload_size_calculator(record, context_json) - - ctx = DsmPathwayCodec.decode(context_json, processor) - ctx.set_checkpoint( - ["direction:in", f"topic:{arn}", f"type:{service_type}"], - payload_size=payload_size, - ) - except Exception as e: - logger.error(format_err_with_traceback(e)) + try: + context_json = _get_dsm_context_from_lambda(record) + + ctx = DsmPathwayCodec.decode(context_json, processor) + ctx.set_checkpoint( + ["direction:in", f"topic:{arn}", f"type:{service_type}"], + payload_size=payload_size, + ) + except Exception as e: + logger.error(format_err_with_traceback(e)) def _dsm_set_sqs_context(event): from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size + from datadog_lambda.wrapper import format_err_with_traceback - def sqs_payload_calculator(record, context_json): - return calculate_sqs_payload_size(record) - - def sqs_arn_extractor(record): - return record.get("eventSourceARN", "") + records = event.get("Records") + if records is None: + return - _dsm_set_context_helper(event, "sqs", sqs_arn_extractor, sqs_payload_calculator) + for record in records: + try: + arn = record.get("eventSourceARN", "") + payload_size = calculate_sqs_payload_size(record) + _dsm_set_context_helper(record, "sqs", arn, payload_size) + except Exception as e: + logger.error(format_err_with_traceback(e)) def _get_dsm_context_from_lambda(message): diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 32103cab..01a2b78f 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -10,7 +10,7 @@ from datadog_lambda.trigger import EventTypes, _EventSource -class TestDSMContext(unittest.TestCase): +class TestSetDSMContext(unittest.TestCase): def setUp(self): patcher = patch("datadog_lambda.dsm._dsm_set_sqs_context") self.mock_dsm_set_sqs_context = patcher.start() From b5a711d0395159ad183ec41dfea706adf4309091 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 10 Jun 2025 07:29:31 -0400 Subject: [PATCH 04/26] fix lint --- datadog_lambda/dsm.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 5976de0b..5afe35d9 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -12,9 +12,7 @@ def set_dsm_context(event, event_source): _dsm_set_sqs_context(event) -def _dsm_set_context_helper( - record, service_type, arn, payload_size -): +def _dsm_set_context_helper(record, service_type, arn, payload_size): """ Common helper function for setting DSM context. From 7810ced68d80efccee0c5da4cf0fd2c0c54fa1bf Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 10 Jun 2025 07:33:58 -0400 Subject: [PATCH 05/26] remove redundant try catch --- datadog_lambda/dsm.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 5afe35d9..b8641395 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -42,19 +42,15 @@ def _dsm_set_context_helper(record, service_type, arn, payload_size): def _dsm_set_sqs_context(event): from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size - from datadog_lambda.wrapper import format_err_with_traceback records = event.get("Records") if records is None: return for record in records: - try: - arn = record.get("eventSourceARN", "") - payload_size = calculate_sqs_payload_size(record) - _dsm_set_context_helper(record, "sqs", arn, payload_size) - except Exception as e: - logger.error(format_err_with_traceback(e)) + arn = record.get("eventSourceARN", "") + payload_size = calculate_sqs_payload_size(record) + _dsm_set_context_helper(record, "sqs", arn, payload_size) def _get_dsm_context_from_lambda(message): From e94eb22fe08123b404aa45962f0eb8561aa1e641 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 10 Jun 2025 08:24:13 -0400 Subject: [PATCH 06/26] fixes --- datadog_lambda/dsm.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index b8641395..9567e9f5 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -12,15 +12,15 @@ def set_dsm_context(event, event_source): _dsm_set_sqs_context(event) -def _dsm_set_context_helper(record, service_type, arn, payload_size): +def _dsm_set_context_helper(service_type, arn, payload_size, context_json): """ Common helper function for setting DSM context. Args: - event: The Lambda event containing records service_type: The service type string (example: sqs', 'sns') arn: ARN from the record payload_size: payload size of the record + context_json: Datadog context for the record """ from datadog_lambda.wrapper import format_err_with_traceback from ddtrace.internal.datastreams import data_streams_processor @@ -29,8 +29,6 @@ def _dsm_set_context_helper(record, service_type, arn, payload_size): processor = data_streams_processor() try: - context_json = _get_dsm_context_from_lambda(record) - ctx = DsmPathwayCodec.decode(context_json, processor) ctx.set_checkpoint( ["direction:in", f"topic:{arn}", f"type:{service_type}"], @@ -49,8 +47,10 @@ def _dsm_set_sqs_context(event): for record in records: arn = record.get("eventSourceARN", "") - payload_size = calculate_sqs_payload_size(record) - _dsm_set_context_helper(record, "sqs", arn, payload_size) + context_json = _get_dsm_context_from_lambda(record) + payload_size = calculate_sqs_payload_size(record, context_json) + + _dsm_set_context_helper("sqs", arn, payload_size, context_json) def _get_dsm_context_from_lambda(message): From 54bedbf83a2596a6e2bb456dbe3d48b747ed8d18 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 10 Jun 2025 15:23:03 -0400 Subject: [PATCH 07/26] fixes --- datadog_lambda/dsm.py | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 9567e9f5..93fec020 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -1,10 +1,9 @@ import json +import logging -from ddtrace.internal.logger import get_logger -from datadog_lambda import logger from datadog_lambda.trigger import EventTypes -log = get_logger(__name__) +logger = logging.getLogger(__name__) def set_dsm_context(event, event_source): @@ -22,7 +21,6 @@ def _dsm_set_context_helper(service_type, arn, payload_size, context_json): payload_size: payload size of the record context_json: Datadog context for the record """ - from datadog_lambda.wrapper import format_err_with_traceback from ddtrace.internal.datastreams import data_streams_processor from ddtrace.internal.datastreams.processor import DsmPathwayCodec @@ -35,7 +33,7 @@ def _dsm_set_context_helper(service_type, arn, payload_size, context_json): payload_size=payload_size, ) except Exception as e: - logger.error(format_err_with_traceback(e)) + logger.error(f"Unable to set dsm context: {e}") def _dsm_set_sqs_context(event): @@ -61,11 +59,11 @@ def _get_dsm_context_from_lambda(message): context_json = None message_attributes = message.get("messageAttributes") if not message_attributes: - log.debug("DataStreams skipped lambda message: %r", message) + logger.debug("DataStreams skipped lambda message: %r", message) return None if "_datadog" not in message_attributes: - log.debug("DataStreams skipped lambda message: %r", message) + logger.debug("DataStreams skipped lambda message: %r", message) return None datadog_attr = message_attributes["_datadog"] @@ -74,6 +72,6 @@ def _get_dsm_context_from_lambda(message): # SQS -> lambda context_json = json.loads(datadog_attr["stringValue"]) else: - log.debug("DataStreams did not handle lambda message: %r", message) + logger.debug("DataStreams did not handle lambda message: %r", message) return context_json From 823a07f4e77ece3ad022aee969461edf50916dbb Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 11 Jun 2025 11:41:33 -0400 Subject: [PATCH 08/26] reworked to remove ddtrace dependencies --- datadog_lambda/dsm.py | 39 +++++++++------------------------------ 1 file changed, 9 insertions(+), 30 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 93fec020..6e9250f6 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -1,6 +1,5 @@ -import json import logging - +import json from datadog_lambda.trigger import EventTypes logger = logging.getLogger(__name__) @@ -11,33 +10,8 @@ def set_dsm_context(event, event_source): _dsm_set_sqs_context(event) -def _dsm_set_context_helper(service_type, arn, payload_size, context_json): - """ - Common helper function for setting DSM context. - - Args: - service_type: The service type string (example: sqs', 'sns') - arn: ARN from the record - payload_size: payload size of the record - context_json: Datadog context for the record - """ - from ddtrace.internal.datastreams import data_streams_processor - from ddtrace.internal.datastreams.processor import DsmPathwayCodec - - processor = data_streams_processor() - - try: - ctx = DsmPathwayCodec.decode(context_json, processor) - ctx.set_checkpoint( - ["direction:in", f"topic:{arn}", f"type:{service_type}"], - payload_size=payload_size, - ) - except Exception as e: - logger.error(f"Unable to set dsm context: {e}") - - def _dsm_set_sqs_context(event): - from ddtrace.internal.datastreams.botocore import calculate_sqs_payload_size + from ddtrace.data_streams import set_consume_checkpoint records = event.get("Records") if records is None: @@ -46,9 +20,14 @@ def _dsm_set_sqs_context(event): for record in records: arn = record.get("eventSourceARN", "") context_json = _get_dsm_context_from_lambda(record) - payload_size = calculate_sqs_payload_size(record, context_json) + if not context_json: + logger.debug("DataStreams skipped lambda message: %r", record) + return None + + def carrier_get(key): + return context_json.get(key) - _dsm_set_context_helper("sqs", arn, payload_size, context_json) + set_consume_checkpoint("sqs", arn, carrier_get) def _get_dsm_context_from_lambda(message): From 5356cc65acf555abf1731ccde754d382b489bf74 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 11 Jun 2025 13:56:46 -0400 Subject: [PATCH 09/26] fix --- datadog_lambda/dsm.py | 11 +++++-- tests/test_dsm.py | 67 +++++++++++++++++++++++++++++-------------- 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 6e9250f6..77504ef0 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -24,9 +24,7 @@ def _dsm_set_sqs_context(event): logger.debug("DataStreams skipped lambda message: %r", record) return None - def carrier_get(key): - return context_json.get(key) - + carrier_get = _create_carrier_get(context_json) set_consume_checkpoint("sqs", arn, carrier_get) @@ -54,3 +52,10 @@ def _get_dsm_context_from_lambda(message): logger.debug("DataStreams did not handle lambda message: %r", message) return context_json + + +def _create_carrier_get(context_json): + def carrier_get(key): + return context_json.get(key) + + return carrier_get diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 01a2b78f..9569d406 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -1,6 +1,6 @@ import unittest import json -from unittest.mock import patch, MagicMock +from unittest.mock import patch from datadog_lambda.dsm import ( set_dsm_context, @@ -16,24 +16,19 @@ def setUp(self): self.mock_dsm_set_sqs_context = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("ddtrace.internal.datastreams.data_streams_processor") - self.mock_data_streams_processor = patcher.start() - self.addCleanup(patcher.stop) - patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context") self.mock_get_datastreams_context = patcher.start() self.mock_get_datastreams_context.return_value = {} self.addCleanup(patcher.stop) - patcher = patch( - "ddtrace.internal.datastreams.botocore.calculate_sqs_payload_size" - ) - self.mock_calculate_sqs_payload_size = patcher.start() - self.mock_calculate_sqs_payload_size.return_value = 100 + # Patch set_consume_checkpoint for testing DSM functionality + patcher = patch("ddtrace.data_streams.set_consume_checkpoint") + self.mock_set_consume_checkpoint = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("ddtrace.internal.datastreams.processor.DsmPathwayCodec.decode") - self.mock_dsm_pathway_codec_decode = patcher.start() + # Patch _get_dsm_context_from_lambda for testing DSM context extraction + patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda") + self.mock_get_dsm_context_from_lambda = patcher.start() self.addCleanup(patcher.stop) def test_non_sqs_event_source_does_nothing(self): @@ -56,7 +51,8 @@ def test_sqs_event_with_no_records_does_nothing(self): for event in events_with_no_records: _dsm_set_sqs_context(event) - self.mock_data_streams_processor.assert_not_called() + # Should not call set_consume_checkpoint for events without records + self.mock_set_consume_checkpoint.assert_not_called() def test_sqs_event_triggers_dsm_sqs_context(self): """Test that SQS event sources trigger the SQS-specific DSM context function""" @@ -82,39 +78,66 @@ def test_sqs_multiple_records_process_each_record(self): { "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue1", "body": "Message 1", + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps({"dd-pathway-ctx-base64": "context1"}), + "dataType": "String", + } + }, }, { "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue2", "body": "Message 2", + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps({"dd-pathway-ctx-base64": "context2"}), + "dataType": "String", + } + }, }, { "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:queue3", "body": "Message 3", + "messageAttributes": { + "_datadog": { + "stringValue": json.dumps({"dd-pathway-ctx-base64": "context3"}), + "dataType": "String", + } + }, }, ] } - mock_context = MagicMock() - self.mock_dsm_pathway_codec_decode.return_value = mock_context + self.mock_get_dsm_context_from_lambda.side_effect = [ + {"dd-pathway-ctx-base64": "context1"}, + {"dd-pathway-ctx-base64": "context2"}, + {"dd-pathway-ctx-base64": "context3"}, + ] _dsm_set_sqs_context(multi_record_event) - self.assertEqual(mock_context.set_checkpoint.call_count, 3) + self.assertEqual(self.mock_set_consume_checkpoint.call_count, 3) - calls = mock_context.set_checkpoint.call_args_list + calls = self.mock_set_consume_checkpoint.call_args_list expected_arns = [ "arn:aws:sqs:us-east-1:123456789012:queue1", "arn:aws:sqs:us-east-1:123456789012:queue2", "arn:aws:sqs:us-east-1:123456789012:queue3", ] + expected_contexts = ["context1", "context2", "context3"] for i, call in enumerate(calls): args, kwargs = call - tags = args[0] - self.assertIn("direction:in", tags) - self.assertIn(f"topic:{expected_arns[i]}", tags) - self.assertIn("type:sqs", tags) - self.assertEqual(kwargs["payload_size"], 100) + service_type = args[0] + arn = args[1] + carrier_get_func = args[2] + + self.assertEqual(service_type, "sqs") + + self.assertEqual(arn, expected_arns[i]) + + pathway_ctx = carrier_get_func("dd-pathway-ctx-base64") + self.assertEqual(pathway_ctx, expected_contexts[i]) class TestGetDSMContext(unittest.TestCase): From 45ed35fab274002822e80c4a8b5d10b1275cd5d1 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 11 Jun 2025 14:06:47 -0400 Subject: [PATCH 10/26] fix --- tests/test_dsm.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 9569d406..cb346add 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -16,11 +16,6 @@ def setUp(self): self.mock_dsm_set_sqs_context = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("ddtrace.internal.datastreams.botocore.get_datastreams_context") - self.mock_get_datastreams_context = patcher.start() - self.mock_get_datastreams_context.return_value = {} - self.addCleanup(patcher.stop) - # Patch set_consume_checkpoint for testing DSM functionality patcher = patch("ddtrace.data_streams.set_consume_checkpoint") self.mock_set_consume_checkpoint = patcher.start() From f72964945f9370ed2d5d7925fc96755a5232d9b3 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 11 Jun 2025 14:38:01 -0400 Subject: [PATCH 11/26] fix --- tests/test_dsm.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index cb346add..720b191e 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -75,7 +75,9 @@ def test_sqs_multiple_records_process_each_record(self): "body": "Message 1", "messageAttributes": { "_datadog": { - "stringValue": json.dumps({"dd-pathway-ctx-base64": "context1"}), + "stringValue": json.dumps( + {"dd-pathway-ctx-base64": "context1"} + ), "dataType": "String", } }, @@ -85,7 +87,9 @@ def test_sqs_multiple_records_process_each_record(self): "body": "Message 2", "messageAttributes": { "_datadog": { - "stringValue": json.dumps({"dd-pathway-ctx-base64": "context2"}), + "stringValue": json.dumps( + {"dd-pathway-ctx-base64": "context2"} + ), "dataType": "String", } }, @@ -95,7 +99,9 @@ def test_sqs_multiple_records_process_each_record(self): "body": "Message 3", "messageAttributes": { "_datadog": { - "stringValue": json.dumps({"dd-pathway-ctx-base64": "context3"}), + "stringValue": json.dumps( + {"dd-pathway-ctx-base64": "context3"} + ), "dataType": "String", } }, From 24f6ed9f6df2ed52b643ce16998e1e4e3f3992de Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Wed, 11 Jun 2025 14:42:26 -0400 Subject: [PATCH 12/26] fix --- tests/test_dsm.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 720b191e..4d8fd2ba 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -16,12 +16,10 @@ def setUp(self): self.mock_dsm_set_sqs_context = patcher.start() self.addCleanup(patcher.stop) - # Patch set_consume_checkpoint for testing DSM functionality patcher = patch("ddtrace.data_streams.set_consume_checkpoint") self.mock_set_consume_checkpoint = patcher.start() self.addCleanup(patcher.stop) - # Patch _get_dsm_context_from_lambda for testing DSM context extraction patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda") self.mock_get_dsm_context_from_lambda = patcher.start() self.addCleanup(patcher.stop) @@ -29,11 +27,9 @@ def setUp(self): def test_non_sqs_event_source_does_nothing(self): """Test that non-SQS event sources don't trigger DSM context setting""" event = {} - # Use Unknown Event Source event_source = _EventSource(EventTypes.UNKNOWN) set_dsm_context(event, event_source) - # DSM context should not be set for non-SQS events self.mock_dsm_set_sqs_context.assert_not_called() def test_sqs_event_with_no_records_does_nothing(self): @@ -46,7 +42,6 @@ def test_sqs_event_with_no_records_does_nothing(self): for event in events_with_no_records: _dsm_set_sqs_context(event) - # Should not call set_consume_checkpoint for events without records self.mock_set_consume_checkpoint.assert_not_called() def test_sqs_event_triggers_dsm_sqs_context(self): From 5bc4b8fabd189300ca58fd8f7dd6c6063e2b73c6 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Fri, 13 Jun 2025 10:29:44 -0400 Subject: [PATCH 13/26] fixes --- datadog_lambda/dsm.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 77504ef0..b665e8fb 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -18,14 +18,17 @@ def _dsm_set_sqs_context(event): return for record in records: - arn = record.get("eventSourceARN", "") - context_json = _get_dsm_context_from_lambda(record) - if not context_json: - logger.debug("DataStreams skipped lambda message: %r", record) - return None - - carrier_get = _create_carrier_get(context_json) - set_consume_checkpoint("sqs", arn, carrier_get) + try: + arn = record.get("eventSourceARN", "") + context_json = _get_dsm_context_from_lambda(record) + if not context_json: + logger.debug("DataStreams skipped lambda message: %r", record) + return + + carrier_get = _create_carrier_get(context_json) + set_consume_checkpoint("sqs", arn, carrier_get) + except Exception as e: + logger.error(f"Unable to set dsm context: {e}") def _get_dsm_context_from_lambda(message): From 3fa3014a2a731fc83755942e9a2d4af555c5d7c9 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Fri, 13 Jun 2025 10:42:12 -0400 Subject: [PATCH 14/26] fixes --- datadog_lambda/dsm.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index b665e8fb..2dd98c1f 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -11,24 +11,28 @@ def set_dsm_context(event, event_source): def _dsm_set_sqs_context(event): - from ddtrace.data_streams import set_consume_checkpoint - records = event.get("Records") if records is None: return for record in records: - try: - arn = record.get("eventSourceARN", "") - context_json = _get_dsm_context_from_lambda(record) - if not context_json: - logger.debug("DataStreams skipped lambda message: %r", record) - return - - carrier_get = _create_carrier_get(context_json) - set_consume_checkpoint("sqs", arn, carrier_get) - except Exception as e: - logger.error(f"Unable to set dsm context: {e}") + arn = record.get("eventSourceARN", "") + _set_dsm_context_for_record(record, "sqs", arn) + + +def _set_dsm_context_for_record(record, type, arn): + from ddtrace.data_streams import set_consume_checkpoint + + try: + context_json = _get_dsm_context_from_lambda(record) + if not context_json: + logger.debug("DataStreams skipped lambda message: %r", record) + return + + carrier_get = _create_carrier_get(context_json) + set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False) + except Exception as e: + logger.error(f"Unable to set dsm context: {e}") def _get_dsm_context_from_lambda(message): From c066b8f083b49528ed0c685bf1ac0d4fdc2e35be Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 16 Jun 2025 16:07:38 -0400 Subject: [PATCH 15/26] fix --- tests/test_dsm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 4d8fd2ba..184e6924 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -205,6 +205,7 @@ def test_no_datadog_attribute(self): } result = _get_dsm_context_from_lambda(message) + assert result is None def test_empty_datadog_attribute(self): From 235659b143ecc3dbe53a5cfb37992083b774b2fc Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 16 Jun 2025 18:57:33 -0400 Subject: [PATCH 16/26] fixes --- datadog_lambda/dsm.py | 33 +++++------ tests/test_dsm.py | 125 ++++++++++++++++++++++++++++++++++++++---- 2 files changed, 132 insertions(+), 26 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 2dd98c1f..c807c8df 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -17,28 +17,27 @@ def _dsm_set_sqs_context(event): for record in records: arn = record.get("eventSourceARN", "") - _set_dsm_context_for_record(record, "sqs", arn) + try: + context_json = _get_dsm_context_from_sqs_lambda(record) + if not context_json: + return + _set_dsm_context_for_record(context_json, "sqs", arn) + except Exception as e: + logger.error(f"Unable to set dsm context: {e}") -def _set_dsm_context_for_record(record, type, arn): - from ddtrace.data_streams import set_consume_checkpoint - try: - context_json = _get_dsm_context_from_lambda(record) - if not context_json: - logger.debug("DataStreams skipped lambda message: %r", record) - return +def _set_dsm_context_for_record(context_json, type, arn): + from ddtrace.data_streams import set_consume_checkpoint - carrier_get = _create_carrier_get(context_json) - set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False) - except Exception as e: - logger.error(f"Unable to set dsm context: {e}") + carrier_get = _create_carrier_get(context_json) + set_consume_checkpoint(type, arn, carrier_get, manual_checkpoint=False) -def _get_dsm_context_from_lambda(message): +def _get_dsm_context_from_sqs_lambda(message): """ - Lambda-specific message formats: - - message.messageAttributes._datadog.stringValue (SQS -> lambda) + Lambda-specific message shape for SQS -> Lambda: + - message.messageAttributes._datadog.stringValue """ context_json = None message_attributes = message.get("messageAttributes") @@ -53,8 +52,10 @@ def _get_dsm_context_from_lambda(message): datadog_attr = message_attributes["_datadog"] if "stringValue" in datadog_attr: - # SQS -> lambda context_json = json.loads(datadog_attr["stringValue"]) + if not isinstance(context_json, dict): + logger.debug("DataStreams did not handle lambda message: %r", message) + return None else: logger.debug("DataStreams did not handle lambda message: %r", message) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 184e6924..96bb54f6 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -5,7 +5,8 @@ from datadog_lambda.dsm import ( set_dsm_context, _dsm_set_sqs_context, - _get_dsm_context_from_lambda, + _get_dsm_context_from_sqs_lambda, + _create_carrier_get, ) from datadog_lambda.trigger import EventTypes, _EventSource @@ -20,8 +21,12 @@ def setUp(self): self.mock_set_consume_checkpoint = patcher.start() self.addCleanup(patcher.stop) - patcher = patch("datadog_lambda.dsm._get_dsm_context_from_lambda") - self.mock_get_dsm_context_from_lambda = patcher.start() + patcher = patch("datadog_lambda.dsm._get_dsm_context_from_sqs_lambda") + self.mock_get_dsm_context_from_sqs_lambda = patcher.start() + self.addCleanup(patcher.stop) + + patcher = patch("datadog_lambda.dsm.logger") + self.mock_logger = patcher.start() self.addCleanup(patcher.stop) def test_non_sqs_event_source_does_nothing(self): @@ -104,7 +109,7 @@ def test_sqs_multiple_records_process_each_record(self): ] } - self.mock_get_dsm_context_from_lambda.side_effect = [ + self.mock_get_dsm_context_from_sqs_lambda.side_effect = [ {"dd-pathway-ctx-base64": "context1"}, {"dd-pathway-ctx-base64": "context2"}, {"dd-pathway-ctx-base64": "context3"}, @@ -129,14 +134,44 @@ def test_sqs_multiple_records_process_each_record(self): carrier_get_func = args[2] self.assertEqual(service_type, "sqs") - self.assertEqual(arn, expected_arns[i]) pathway_ctx = carrier_get_func("dd-pathway-ctx-base64") + self.assertEqual(pathway_ctx, expected_contexts[i]) + def test_set_context_exception_handled(self): + """Test that exceptions in _get_dsm_context_from_sqs_lambda are properly handled""" + # Make _get_dsm_context_from_sqs_lambda raise an exception + self.mock_get_dsm_context_from_sqs_lambda.side_effect = Exception( + "JSON parsing error" + ) -class TestGetDSMContext(unittest.TestCase): + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue", + "body": "Test message", + "messageAttributes": { + "_datadog": { + "stringValue": "invalid json", + "dataType": "String", + } + }, + } + ] + } + + _dsm_set_sqs_context(event) + + self.mock_logger.error.assert_called_once_with( + "Unable to set dsm context: JSON parsing error" + ) + + self.mock_set_consume_checkpoint.assert_not_called() + + +class TestGetDSMContextFromSQS(unittest.TestCase): def test_sqs_to_lambda_string_value_format(self): """Test format: message.messageAttributes._datadog.stringValue (SQS -> lambda)""" trace_context = { @@ -175,7 +210,7 @@ def test_sqs_to_lambda_string_value_format(self): "awsRegion": "us-east-2", } - result = _get_dsm_context_from_lambda(lambda_record) + result = _get_dsm_context_from_sqs_lambda(lambda_record) assert result is not None assert result == trace_context @@ -183,6 +218,37 @@ def test_sqs_to_lambda_string_value_format(self): assert result["x-datadog-parent-id"] == "321987654" assert result["dd-pathway-ctx"] == "test-pathway-ctx" + def test_sqs_record_context_not_dict(self): + """Test if that context is not a dict, get_dsm_context_from_sqs_lambda returns None""" + + message_string = { + "messageId": "test-message-id", + "messageAttributes": { + "_datadog": { + "stringValue": '"just a string"', + "dataType": "String", + } + }, + } + + result = _get_dsm_context_from_sqs_lambda(message_string) + + assert result is None + + message_array = { + "messageId": "test-message-id", + "messageAttributes": { + "_datadog": { + "stringValue": '["array", "values"]', + "dataType": "String", + } + }, + } + + result = _get_dsm_context_from_sqs_lambda(message_array) + + assert result is None + def test_no_message_attributes(self): """Test message without MessageAttributes returns None.""" message = { @@ -190,7 +256,7 @@ def test_no_message_attributes(self): "body": "Test message without attributes", } - result = _get_dsm_context_from_lambda(message) + result = _get_dsm_context_from_sqs_lambda(message) assert result is None @@ -204,7 +270,7 @@ def test_no_datadog_attribute(self): }, } - result = _get_dsm_context_from_lambda(message) + result = _get_dsm_context_from_sqs_lambda(message) assert result is None @@ -215,6 +281,45 @@ def test_empty_datadog_attribute(self): "messageAttributes": {"_datadog": {}}, } - result = _get_dsm_context_from_lambda(message) + result = _get_dsm_context_from_sqs_lambda(message) assert result is None + + +class TestCarrierGet(unittest.TestCase): + def test_carrier_get_returns_correct_values(self): + """Test that carrier_get function returns correct values from context_json""" + context_json = { + "x-datadog-trace-id": "789123456", + "x-datadog-parent-id": "321987654", + "dd-pathway-ctx": "test-pathway-ctx", + "custom-header": "custom-value", + } + + carrier_get = _create_carrier_get(context_json) + + assert carrier_get("x-datadog-trace-id") == "789123456" + assert carrier_get("x-datadog-parent-id") == "321987654" + assert carrier_get("dd-pathway-ctx") == "test-pathway-ctx" + assert carrier_get("custom-header") == "custom-value" + assert carrier_get("non-existent-key") is None + + def test_carrier_get_with_empty_context(self): + """Test carrier_get with empty context_json""" + context_json = {} + + carrier_get = _create_carrier_get(context_json) + + assert carrier_get("any-key") is None + assert carrier_get("x-datadog-trace-id") is None + + def test_carrier_get_function_closure(self): + """Test that each carrier_get function has its own closure""" + context_json_1 = {"key": "value1"} + context_json_2 = {"key": "value2"} + + carrier_get_1 = _create_carrier_get(context_json_1) + carrier_get_2 = _create_carrier_get(context_json_2) + + assert carrier_get_1("key") == "value1" + assert carrier_get_2("key") == "value2" From 34b9f451565b7e916e5f70b2cca77d860479ae06 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 16 Jun 2025 19:20:05 -0400 Subject: [PATCH 17/26] formatting --- tests/test_dsm.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 96bb54f6..240e285f 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -142,7 +142,6 @@ def test_sqs_multiple_records_process_each_record(self): def test_set_context_exception_handled(self): """Test that exceptions in _get_dsm_context_from_sqs_lambda are properly handled""" - # Make _get_dsm_context_from_sqs_lambda raise an exception self.mock_get_dsm_context_from_sqs_lambda.side_effect = Exception( "JSON parsing error" ) From ff6e3c488b0426baa6025f8c48ecc3abcd5fa473 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Mon, 16 Jun 2025 19:44:02 -0400 Subject: [PATCH 18/26] add test for try catch on checkpoint exception --- tests/test_dsm.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 240e285f..40ccf43a 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -141,6 +141,35 @@ def test_sqs_multiple_records_process_each_record(self): self.assertEqual(pathway_ctx, expected_contexts[i]) def test_set_context_exception_handled(self): + """Test that exceptions in set_consume_checkpoint are properly handled""" + self.mock_get_dsm_context_from_sqs_lambda.return_value = { + "dd-pathway-ctx": "test-context" + } + + self.mock_set_consume_checkpoint.side_effect = Exception("Checkpoint error") + + event = { + "Records": [ + { + "eventSourceARN": "arn:aws:sqs:us-east-1:123456789012:my-queue", + "body": "Test message", + "messageAttributes": { + "_datadog": { + "stringValue": '{"dd-pathway-ctx": "test-context"}', + "dataType": "String", + } + }, + } + ] + } + + _dsm_set_sqs_context(event) + + self.mock_logger.error.assert_called_once_with( + "Unable to set dsm context: Checkpoint error" + ) + + def test_get_context_exception_handled(self): """Test that exceptions in _get_dsm_context_from_sqs_lambda are properly handled""" self.mock_get_dsm_context_from_sqs_lambda.side_effect = Exception( "JSON parsing error" @@ -153,7 +182,7 @@ def test_set_context_exception_handled(self): "body": "Test message", "messageAttributes": { "_datadog": { - "stringValue": "invalid json", + "stringValue": "invalid json{", "dataType": "String", } }, @@ -167,8 +196,6 @@ def test_set_context_exception_handled(self): "Unable to set dsm context: JSON parsing error" ) - self.mock_set_consume_checkpoint.assert_not_called() - class TestGetDSMContextFromSQS(unittest.TestCase): def test_sqs_to_lambda_string_value_format(self): From 1a82f684e5bb88a7522a9591bb308ec9f8ead484 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 08:22:01 -0400 Subject: [PATCH 19/26] added tests for logging if ctx_json is none --- tests/test_dsm.py | 44 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 40ccf43a..f37bb8ae 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -196,6 +196,50 @@ def test_get_context_exception_handled(self): "Unable to set dsm context: JSON parsing error" ) + def test_debug_logging_for_skipped_messages(self): + """Test debug logging for various scenarios where messages are skipped""" + + message_no_attrs = {"messageId": "test-id", "body": "test body"} + + result = _get_dsm_context_from_sqs_lambda(message_no_attrs) + + assert result is None + self.mock_logger.debug.assert_called_with( + "DataStreams skipped lambda message: %r", message_no_attrs + ) + + self.mock_logger.reset_mock() + + message_no_datadog = { + "messageId": "test-id", + "messageAttributes": { + "other_attr": {"stringValue": "value", "dataType": "String"} + }, + } + + result = _get_dsm_context_from_sqs_lambda(message_no_datadog) + + assert result is None + self.mock_logger.debug.assert_called_with( + "DataStreams skipped lambda message: %r", message_no_datadog + ) + + self.mock_logger.reset_mock() + + message_not_dict = { + "messageId": "test-id", + "messageAttributes": { + "_datadog": {"stringValue": '"just a string"', "dataType": "String"} + }, + } + + result = _get_dsm_context_from_sqs_lambda(message_not_dict) + + assert result is None + self.mock_logger.debug.assert_called_with( + "DataStreams did not handle lambda message: %r", message_not_dict + ) + class TestGetDSMContextFromSQS(unittest.TestCase): def test_sqs_to_lambda_string_value_format(self): From 8bb20536d660dbdaf590ae758fac166f7f1be92a Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 14:14:53 -0400 Subject: [PATCH 20/26] move try outside entire block --- datadog_lambda/dsm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index c807c8df..264f7412 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -16,8 +16,8 @@ def _dsm_set_sqs_context(event): return for record in records: - arn = record.get("eventSourceARN", "") try: + arn = record.get("eventSourceARN", "") context_json = _get_dsm_context_from_sqs_lambda(record) if not context_json: return From 5f006de9dda91efbfe5b92f1e70c8445540cfe54 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 14:15:55 -0400 Subject: [PATCH 21/26] change to continue --- datadog_lambda/dsm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 264f7412..ab718ad6 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -20,7 +20,7 @@ def _dsm_set_sqs_context(event): arn = record.get("eventSourceARN", "") context_json = _get_dsm_context_from_sqs_lambda(record) if not context_json: - return + continue _set_dsm_context_for_record(context_json, "sqs", arn) except Exception as e: From e28bac44e43c517df69869f30523c7f39f1c01e9 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 14:17:58 -0400 Subject: [PATCH 22/26] add changes to logging --- datadog_lambda/dsm.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index ab718ad6..8392c0d7 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -42,11 +42,15 @@ def _get_dsm_context_from_sqs_lambda(message): context_json = None message_attributes = message.get("messageAttributes") if not message_attributes: - logger.debug("DataStreams skipped lambda message: %r", message) + logger.debug( + "DataStreams skipped lambda message: %r, no messageAttributes", message + ) return None if "_datadog" not in message_attributes: - logger.debug("DataStreams skipped lambda message: %r", message) + logger.debug( + "DataStreams skipped lambda message: %r, no datadog context", message + ) return None datadog_attr = message_attributes["_datadog"] @@ -54,10 +58,15 @@ def _get_dsm_context_from_sqs_lambda(message): if "stringValue" in datadog_attr: context_json = json.loads(datadog_attr["stringValue"]) if not isinstance(context_json, dict): - logger.debug("DataStreams did not handle lambda message: %r", message) + logger.debug( + "DataStreams did not handle lambda message: %r, context is not a dict", + message, + ) return None else: - logger.debug("DataStreams did not handle lambda message: %r", message) + logger.debug( + "DataStreams did not handle lambda message: %r, no dsm context", message + ) return context_json From 2509fcc7774fc0fd99f0f2b76830b5450a679d76 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 14:22:52 -0400 Subject: [PATCH 23/26] fixed tests --- tests/test_dsm.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index f37bb8ae..6ec8bb2d 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -205,7 +205,8 @@ def test_debug_logging_for_skipped_messages(self): assert result is None self.mock_logger.debug.assert_called_with( - "DataStreams skipped lambda message: %r", message_no_attrs + "DataStreams skipped lambda message: %r, no messageAttributes", + message_no_attrs, ) self.mock_logger.reset_mock() @@ -221,7 +222,8 @@ def test_debug_logging_for_skipped_messages(self): assert result is None self.mock_logger.debug.assert_called_with( - "DataStreams skipped lambda message: %r", message_no_datadog + "DataStreams skipped lambda message: %r, no datadog context", + message_no_datadog, ) self.mock_logger.reset_mock() @@ -237,7 +239,8 @@ def test_debug_logging_for_skipped_messages(self): assert result is None self.mock_logger.debug.assert_called_with( - "DataStreams did not handle lambda message: %r", message_not_dict + "DataStreams did not handle lambda message: %r, context is not a dict", + message_not_dict, ) From ab8c871d006cfcfaf9951df4a3d3c28053d60d93 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 14:39:08 -0400 Subject: [PATCH 24/26] moved %r to the end of message --- datadog_lambda/dsm.py | 11 +++++++---- tests/test_dsm.py | 6 +++--- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index 8392c0d7..eca48cde0 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -43,13 +43,15 @@ def _get_dsm_context_from_sqs_lambda(message): message_attributes = message.get("messageAttributes") if not message_attributes: logger.debug( - "DataStreams skipped lambda message: %r, no messageAttributes", message + "DataStreams skipped lambda message, no messageAttributes, message: %r", + message, ) return None if "_datadog" not in message_attributes: logger.debug( - "DataStreams skipped lambda message: %r, no datadog context", message + "DataStreams skipped lambda message, no datadog context, message: %r", + message, ) return None @@ -59,13 +61,14 @@ def _get_dsm_context_from_sqs_lambda(message): context_json = json.loads(datadog_attr["stringValue"]) if not isinstance(context_json, dict): logger.debug( - "DataStreams did not handle lambda message: %r, context is not a dict", + "DataStreams did not handle lambda message, context is not a dict, message: %r", message, ) return None else: logger.debug( - "DataStreams did not handle lambda message: %r, no dsm context", message + "DataStreams did not handle lambda message, no dsm context, message: %r", + message, ) return context_json diff --git a/tests/test_dsm.py b/tests/test_dsm.py index 6ec8bb2d..b0677e98 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -205,7 +205,7 @@ def test_debug_logging_for_skipped_messages(self): assert result is None self.mock_logger.debug.assert_called_with( - "DataStreams skipped lambda message: %r, no messageAttributes", + "DataStreams skipped lambda message, no messageAttributes, message: %r", message_no_attrs, ) @@ -222,7 +222,7 @@ def test_debug_logging_for_skipped_messages(self): assert result is None self.mock_logger.debug.assert_called_with( - "DataStreams skipped lambda message: %r, no datadog context", + "DataStreams skipped lambda message, no datadog context, message: %r", message_no_datadog, ) @@ -239,7 +239,7 @@ def test_debug_logging_for_skipped_messages(self): assert result is None self.mock_logger.debug.assert_called_with( - "DataStreams did not handle lambda message: %r, context is not a dict", + "DataStreams did not handle lambda message, context is not a dict, message: %r", message_not_dict, ) From 2089fa911cc2f5e37a59e72bd7f7eecdcc95ed13 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 14:42:38 -0400 Subject: [PATCH 25/26] check datadog_attr is dict --- datadog_lambda/dsm.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/datadog_lambda/dsm.py b/datadog_lambda/dsm.py index eca48cde0..2ffe17ed 100644 --- a/datadog_lambda/dsm.py +++ b/datadog_lambda/dsm.py @@ -56,12 +56,18 @@ def _get_dsm_context_from_sqs_lambda(message): return None datadog_attr = message_attributes["_datadog"] + if not isinstance(datadog_attr, dict): + logger.debug( + "DataStreams did not handle lambda message, datadog context is not a dict, message: %r", + message, + ) + return None if "stringValue" in datadog_attr: context_json = json.loads(datadog_attr["stringValue"]) if not isinstance(context_json, dict): logger.debug( - "DataStreams did not handle lambda message, context is not a dict, message: %r", + "DataStreams did not handle lambda message, dsm context is not a dict, message: %r", message, ) return None From 24851f3fff340b2be140f0786be91be46ded8c52 Mon Sep 17 00:00:00 2001 From: Michael Zhao Date: Tue, 17 Jun 2025 14:50:23 -0400 Subject: [PATCH 26/26] fix test --- tests/test_dsm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_dsm.py b/tests/test_dsm.py index b0677e98..5f0f0bbb 100644 --- a/tests/test_dsm.py +++ b/tests/test_dsm.py @@ -239,7 +239,7 @@ def test_debug_logging_for_skipped_messages(self): assert result is None self.mock_logger.debug.assert_called_with( - "DataStreams did not handle lambda message, context is not a dict, message: %r", + "DataStreams did not handle lambda message, dsm context is not a dict, message: %r", message_not_dict, )