From e1497de92043f4cbf7840c44e288665b297625c8 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Wed, 9 Mar 2022 16:20:08 -0500 Subject: [PATCH 01/58] Change DataType to binary and Value to BinaryValue --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index e604bb71374..5f1dd4c08c7 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -73,7 +73,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): entry["MessageAttributes"] = {} # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["MessageAttributes"]) < 10: - entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") From 70e0924d5566e39bbc07e405186d470f4bcc02e8 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Wed, 9 Mar 2022 17:53:11 -0500 Subject: [PATCH 02/58] inject trace based on event_source --- ddtrace/contrib/botocore/patch.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 5f1dd4c08c7..6069fe1b01f 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,13 +69,21 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ - if "MessageAttributes" not in entry: - entry["MessageAttributes"] = {} - # An Amazon SQS message can contain up to 10 metadata attributes. - if len(entry["MessageAttributes"]) < 10: + event_source = entry.get("eventSource", None) + if event_source == "aws:sqs": + entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: - log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") + log.warning("skipping trace injection, eventSource is not sns or sqs") + + # if "MessageAttributes" not in entry: + # entry["MessageAttributes"] = {} + # # An Amazon SQS message can contain up to 10 metadata attributes. + # if len(entry["MessageAttributes"]) < 10: + # entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} + # else: + # log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") def inject_trace_to_sqs_or_sns_batch_message(params, span): From 58cba0445b863a4560fb43ec038435462b5aa25b Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 10 Mar 2022 12:13:17 -0500 Subject: [PATCH 03/58] seperate sns and sqs encoding logic with if statement and modify test_sns_send_message_trace_injection_with_message_attributes to new b64 sns encryption --- ddtrace/contrib/botocore/patch.py | 6 ++++-- tests/contrib/botocore/test.py | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 6069fe1b01f..e8f03127e3c 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,9 +69,11 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ - event_source = entry.get("eventSource", None) + event_source = entry.get("eventSource", entry.get("EventSource", "")) if event_source == "aws:sqs": - entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + # An Amazon SQS message can contain up to 10 metadata attributes. + if len(entry["messageAttributes"]) < 10: + entry["essageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index fe1b158fa71..f5d19b8b044 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1068,7 +1068,11 @@ def test_sns_send_message_trace_injection_with_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None - headers = json.loads(msg_attr["_datadog"]["Value"]) + msg_attr_value_in_b64 = msg_attr["_datadog"]["Value"] + base64_bytes = msg_attr_value_in_b64.encode("ascii") + message_bytes = base64.b64decode(base64_bytes) + msg_attr_value_in_str = message_bytes.decode("ascii") + headers = json.loads(msg_attr_value_in_str) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From 2b527607c1cabe09a2102c3db263ad28e8ced562 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 10 Mar 2022 12:15:43 -0500 Subject: [PATCH 04/58] fix typo --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index e8f03127e3c..cf12c5b0075 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -73,7 +73,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: - entry["essageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: From 270f68ec335283b7aaed3b44205729b78648a960 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 10 Mar 2022 13:51:54 -0500 Subject: [PATCH 05/58] add reno release note --- .../encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml new file mode 100644 index 00000000000..ea5a121eade --- /dev/null +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + Change SNS DataType->Binary and StringValue->BinaryValue to bypass policy filter bug Datadog/serverless-plugin-datadog#232. However we leave SQS message attributes as strings. From bbed931d2dc1549cdfa80ff65eb89d2acab26c0f Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:29:45 -0500 Subject: [PATCH 06/58] add comment explaining eventSource vs EventSource --- ddtrace/contrib/botocore/patch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index cf12c5b0075..668b3a6be8d 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,6 +69,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ + # SQS uses "eventSource" while SNS uses "EventSource" event_source = entry.get("eventSource", entry.get("EventSource", "")) if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. From ea547657141daf80ada7b24278d598e9f4645c8a Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:33:26 -0500 Subject: [PATCH 07/58] readd skipping trace injection warning when max number of msg attributes exceeded --- ddtrace/contrib/botocore/patch.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 668b3a6be8d..87de9c28466 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -75,6 +75,8 @@ def inject_trace_data_to_message_attributes(trace_data, entry): # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + else: + log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: From 3ef7345250a53036f11ef8f9daac2f2f15c59d91 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:57:43 -0500 Subject: [PATCH 08/58] Add comments explaining why we use String vs Binary aswell as why we would skip trace injection of a sqs record. --- ddtrace/contrib/botocore/patch.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 87de9c28466..1989f138661 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -74,10 +74,14 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: + # Use String as changing this to Binary would be a breaking change as other tracers expect this to be a String. entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} else: + # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": + # Use Binary since SNS subscription filter policies fail silently with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 + # AWS will encode our value if it sees "BinaryValue" entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: log.warning("skipping trace injection, eventSource is not sns or sqs") From 6fb455f723023b637d65495f0816fd5087b0c864 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:59:05 -0500 Subject: [PATCH 09/58] remove commented out code --- ddtrace/contrib/botocore/patch.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 1989f138661..1095d9d652c 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -86,14 +86,6 @@ def inject_trace_data_to_message_attributes(trace_data, entry): else: log.warning("skipping trace injection, eventSource is not sns or sqs") - # if "MessageAttributes" not in entry: - # entry["MessageAttributes"] = {} - # # An Amazon SQS message can contain up to 10 metadata attributes. - # if len(entry["MessageAttributes"]) < 10: - # entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} - # else: - # log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") - def inject_trace_to_sqs_or_sns_batch_message(params, span): # type: (Any, Span) -> None From b36733855c4ee7dda2b07a4aaef0865de72565c0 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:20:18 -0500 Subject: [PATCH 10/58] make sure casing matches SNS and SQS lambda payloads --- ddtrace/contrib/botocore/patch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 1095d9d652c..4f0cac04cd8 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -75,16 +75,16 @@ def inject_trace_data_to_message_attributes(trace_data, entry): # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: # Use String as changing this to Binary would be a breaking change as other tracers expect this to be a String. - entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} else: # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": # Use Binary since SNS subscription filter policies fail silently with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 - # AWS will encode our value if it sees "BinaryValue" - entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} + # AWS will encode our value if it sees "Binary" + entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} else: - log.warning("skipping trace injection, eventSource is not sns or sqs") + log.warning("skipping trace injection, eventSource is not SNS or SQS") def inject_trace_to_sqs_or_sns_batch_message(params, span): From b012b1655a76963363dfe2b9af3a3b49ac6bc2dd Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:22:56 -0500 Subject: [PATCH 11/58] eventSource to event source in warning --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 4f0cac04cd8..3eb774288fb 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -84,7 +84,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} else: - log.warning("skipping trace injection, eventSource is not SNS or SQS") + log.warning("skipping trace injection, event source is not SNS or SQS") def inject_trace_to_sqs_or_sns_batch_message(params, span): From 93dccf06b76b1ae947e2af75c259a7ebee7ea56b Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:23:33 -0500 Subject: [PATCH 12/58] Update releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml upgrade->fixes Co-authored-by: Brett Langdon --- .../encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml index ea5a121eade..8b7c9b66615 100644 --- a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -1,4 +1,4 @@ --- -upgrade: +fixes: - | Change SNS DataType->Binary and StringValue->BinaryValue to bypass policy filter bug Datadog/serverless-plugin-datadog#232. However we leave SQS message attributes as strings. From b1908dac55a9931db80ead1a65d7818150dd22f5 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:24:19 -0500 Subject: [PATCH 13/58] Update releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml Co-authored-by: Brett Langdon --- .../encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml index 8b7c9b66615..735cf29704e 100644 --- a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -1,4 +1,4 @@ --- fixes: - | - Change SNS DataType->Binary and StringValue->BinaryValue to bypass policy filter bug Datadog/serverless-plugin-datadog#232. However we leave SQS message attributes as strings. + botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232`_ From d6114cbf65e2e96b81b6192c5c619814f51bc68c Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:50:59 -0500 Subject: [PATCH 14/58] testing --- tests/contrib/botocore/test.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index f5d19b8b044..16105b7bb11 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1010,7 +1010,7 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): @mock_sns @mock_sqs - def test_sns_send_message_trace_injection_with_message_attributes(self): + def test_sns_send_message_trace_injection_with_b64_message_attributes(self): sns = self.session.create_client("sns", region_name="us-east-1", endpoint_url="http://localhost:4566") sqs = self.session.create_client("sqs", region_name="us-east-1", endpoint_url="http://localhost:4566") @@ -1068,11 +1068,10 @@ def test_sns_send_message_trace_injection_with_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None - msg_attr_value_in_b64 = msg_attr["_datadog"]["Value"] - base64_bytes = msg_attr_value_in_b64.encode("ascii") - message_bytes = base64.b64decode(base64_bytes) - msg_attr_value_in_str = message_bytes.decode("ascii") - headers = json.loads(msg_attr_value_in_str) + msg_attr_val = msg_attr["_datadog"]["Value"] + print("type of msg attr val") + print(type(msg_attr_val)) + headers = json.loads(b"") assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From c14f1fb40b4e7108978138f6569baa601215ad7a Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:59:15 -0500 Subject: [PATCH 15/58] use b64decode and json.loads in test_sns_send_message_trace_injection_with_b64_message_attributes test --- tests/contrib/botocore/test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 16105b7bb11..199295dd896 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1069,9 +1069,8 @@ def test_sns_send_message_trace_injection_with_b64_message_attributes(self): msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None msg_attr_val = msg_attr["_datadog"]["Value"] - print("type of msg attr val") - print(type(msg_attr_val)) - headers = json.loads(b"") + msg_attr_val_b64decoded = msg_attr_val.b64decode(msg_attr_val) + headers = json.loads(msg_attr_val_b64decoded) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From c5048c526d96489333a22deb8d41ce373631ce6b Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 18:46:19 -0500 Subject: [PATCH 16/58] flake fix --- ddtrace/contrib/botocore/patch.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 3eb774288fb..0c2bbf5a1fe 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -74,13 +74,15 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: - # Use String as changing this to Binary would be a breaking change as other tracers expect this to be a String. + # Use String as changing this to Binary would be a breaking + # change as other tracers expect this to be a String. entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} else: # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": - # Use Binary since SNS subscription filter policies fail silently with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 + # Use Binary since SNS subscription filter policies fail silently + # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} else: From 814987f819aa0639c0626e97305b1a45755730d8 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 11 Mar 2022 10:17:37 -0500 Subject: [PATCH 17/58] riot run -s fmt --- ddtrace/contrib/botocore/patch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 0c2bbf5a1fe..70096a4edbb 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -74,14 +74,14 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: - # Use String as changing this to Binary would be a breaking + # Use String as changing this to Binary would be a breaking # change as other tracers expect this to be a String. entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} else: # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": - # Use Binary since SNS subscription filter policies fail silently + # Use Binary since SNS subscription filter policies fail silently # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} From 3703796675032c6008af46f5661cf86fcf651620 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 11 Mar 2022 16:31:25 -0500 Subject: [PATCH 18/58] reformat function logic after manual testing --- ddtrace/contrib/botocore/patch.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 70096a4edbb..fd23032c127 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,24 +69,22 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ - # SQS uses "eventSource" while SNS uses "EventSource" - event_source = entry.get("eventSource", entry.get("EventSource", "")) - if event_source == "aws:sqs": - # An Amazon SQS message can contain up to 10 metadata attributes. - if len(entry["messageAttributes"]) < 10: + if len(entry["MessageAttributes"]) < 10: + # We expect entry to either have QueueUrl or TopicArn which comes from a boto3 publish() or send_message() call + if entry.get("QueueUrl"): # Use String as changing this to Binary would be a breaking # change as other tracers expect this to be a String. - entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} + entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + elif entry.get("TopicArn"): + # Use Binary since SNS subscription filter policies fail silently + # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 + # AWS will encode our value if it sees "Binary" + entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: - # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute - log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") - elif event_source == "aws:sns": - # Use Binary since SNS subscription filter policies fail silently - # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 - # AWS will encode our value if it sees "Binary" - entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} + log.warning("skipping trace injection, event source is not SNS or SQS") else: - log.warning("skipping trace injection, event source is not SNS or SQS") + # In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute + log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") def inject_trace_to_sqs_or_sns_batch_message(params, span): From 4c615d3bc11d2c48545584e7bdf708ed137ef2df Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Tue, 15 Mar 2022 10:43:47 -0400 Subject: [PATCH 19/58] base64.b64decode --- tests/contrib/botocore/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 199295dd896..c904446e587 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1069,7 +1069,7 @@ def test_sns_send_message_trace_injection_with_b64_message_attributes(self): msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None msg_attr_val = msg_attr["_datadog"]["Value"] - msg_attr_val_b64decoded = msg_attr_val.b64decode(msg_attr_val) + msg_attr_val_b64decoded = base64.b64decode(msg_attr_val) headers = json.loads(msg_attr_val_b64decoded) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) From 1d5942e7a31dd6ef77c7afe1700fe348555c6a77 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Tue, 15 Mar 2022 16:25:31 -0400 Subject: [PATCH 20/58] distinguish between sqs and sns in joined functions by passing endpoint, fix failing test by decoding trace context, and bump localstack to fix failing tests --- ddtrace/contrib/botocore/patch.py | 38 +++++++++++++++++-------------- docker-compose.yml | 2 +- tests/contrib/botocore/test.py | 3 ++- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index fd23032c127..4367443db81 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -61,37 +61,40 @@ class TraceInjectionDecodingError(Exception): pass -def inject_trace_data_to_message_attributes(trace_data, entry): - # type: (Dict[str, str], Dict[str, Any]) -> None +def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): + # type: (Dict[str, str], Dict[str, Any], endpoint) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into the an SQS or SNS record's MessageAttributes """ + if "MessageAttributes" not in entry: + entry["MessageAttributes"] = {} if len(entry["MessageAttributes"]) < 10: - # We expect entry to either have QueueUrl or TopicArn which comes from a boto3 publish() or send_message() call - if entry.get("QueueUrl"): - # Use String as changing this to Binary would be a breaking + if endpoint == "sqs": + # Use String since changing this to Binary would be a breaking # change as other tracers expect this to be a String. entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} - elif entry.get("TopicArn"): + elif endpoint == "sns": # Use Binary since SNS subscription filter policies fail silently # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: - log.warning("skipping trace injection, event source is not SNS or SQS") + log.warning("skipping trace injection, endpoint is not SNS or SQS") else: # In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") -def inject_trace_to_sqs_or_sns_batch_message(params, span): - # type: (Any, Span) -> None +def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): + # type: (Any, Span, endpoint) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch """ @@ -103,21 +106,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span): # or PublishBatchRequestEntries (in case of PublishBatch). entries = params.get("Entries", params.get("PublishBatchRequestEntries", [])) for entry in entries: - inject_trace_data_to_message_attributes(trace_data, entry) + inject_trace_data_to_message_attributes(trace_data, entry, endpoint) -def inject_trace_to_sqs_or_sns_message(params, span): - # type: (Any, Span) -> None +def inject_trace_to_sqs_or_sns_message(params, span, endpoint): + # type: (Any, Span, endpoint) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into MessageAttributes for the SQS or SNS record """ trace_data = {} HTTPPropagator.inject(span.context, trace_data) - inject_trace_data_to_message_attributes(trace_data, params) + inject_trace_data_to_message_attributes(trace_data, params, endpoint) def inject_trace_to_eventbridge_detail(params, span): @@ -302,17 +306,17 @@ def patched_api_call(original_func, instance, args, kwargs): if endpoint_name == "lambda" and operation == "Invoke": inject_trace_to_client_context(params, span) if endpoint_name == "sqs" and operation == "SendMessage": - inject_trace_to_sqs_or_sns_message(params, span) + inject_trace_to_sqs_or_sns_message(params, span, "sqs") if endpoint_name == "sqs" and operation == "SendMessageBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span) + inject_trace_to_sqs_or_sns_batch_message(params, span, "sqs") if endpoint_name == "events" and operation == "PutEvents": inject_trace_to_eventbridge_detail(params, span) if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"): inject_trace_to_kinesis_stream(params, span) if endpoint_name == "sns" and operation == "Publish": - inject_trace_to_sqs_or_sns_message(params, span) + inject_trace_to_sqs_or_sns_message(params, span, "sns") if endpoint_name == "sns" and operation == "PublishBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span) + inject_trace_to_sqs_or_sns_batch_message(params, span, "sns") except Exception: log.warning("Unable to inject trace context", exc_info=True) diff --git a/docker-compose.yml b/docker-compose.yml index c097d40e88d..c17fff6bb02 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -132,7 +132,7 @@ services: - ./.ddriot:/root/project/.riot localstack: - image: localstack/localstack:0.13.1 + image: localstack/localstack:0.14.1 network_mode: bridge ports: - "127.0.0.1:4566:4566" diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index c904446e587..4142412f0a1 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1003,7 +1003,8 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None - headers = json.loads(msg_attr["_datadog"]["Value"]) + datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) + headers = json.loads(datadog_value_decoded) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From 94536d20646aac632cfaa9962616b7fef6e4b112 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Tue, 15 Mar 2022 17:15:07 -0400 Subject: [PATCH 21/58] delete duplicate test --- tests/contrib/botocore/test.py | 67 ---------------------------------- 1 file changed, 67 deletions(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 4142412f0a1..a464f2c6aef 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1009,73 +1009,6 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) - @mock_sns - @mock_sqs - def test_sns_send_message_trace_injection_with_b64_message_attributes(self): - sns = self.session.create_client("sns", region_name="us-east-1", endpoint_url="http://localhost:4566") - sqs = self.session.create_client("sqs", region_name="us-east-1", endpoint_url="http://localhost:4566") - - topic = sns.create_topic(Name="testTopic") - queue = sqs.create_queue(QueueName="test") - - topic_arn = topic["TopicArn"] - sqs_url = queue["QueueUrl"] - sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_url) - - Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sns) - - message_attributes = { - "one": {"DataType": "String", "StringValue": "one"}, - "two": {"DataType": "String", "StringValue": "two"}, - "three": {"DataType": "String", "StringValue": "three"}, - "four": {"DataType": "String", "StringValue": "four"}, - "five": {"DataType": "String", "StringValue": "five"}, - "six": {"DataType": "String", "StringValue": "six"}, - "seven": {"DataType": "String", "StringValue": "seven"}, - "eight": {"DataType": "String", "StringValue": "eight"}, - "nine": {"DataType": "String", "StringValue": "nine"}, - } - - sns.publish(TopicArn=topic_arn, Message="test", MessageAttributes=message_attributes) - spans = self.get_spans() - - # get SNS messages via SQS - response = sqs.receive_message(QueueUrl=queue["QueueUrl"], MessageAttributeNames=["_datadog"]) - - # clean up resources - sqs.delete_queue(QueueUrl=sqs_url) - sns.delete_topic(TopicArn=topic_arn) - - # check if the appropriate span was generated - assert spans - span = spans[0] - assert len(spans) == 2 - assert span.get_tag("aws.region") == "us-east-1" - assert span.get_tag("aws.operation") == "Publish" - assert span.get_tag("params.MessageBody") is None - assert_is_measured(span) - assert_span_http_status_code(span, 200) - assert span.service == "test-botocore-tracing.sns" - assert span.resource == "sns.publish" - trace_json = span.get_tag("params.MessageAttributes._datadog.StringValue") - assert trace_json is None - - # receive message using SQS and ensure headers are present - assert len(response["Messages"]) == 1 - msg = response["Messages"][0] - assert msg is not None - msg_body = json.loads(msg["Body"]) - msg_str = msg_body["Message"] - assert msg_str == "test" - msg_attr = msg_body["MessageAttributes"] - assert msg_attr.get("_datadog") is not None - msg_attr_val = msg_attr["_datadog"]["Value"] - msg_attr_val_b64decoded = base64.b64decode(msg_attr_val) - headers = json.loads(msg_attr_val_b64decoded) - assert headers is not None - assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) - assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) - @mock_sns @mock_sqs def test_sns_send_message_trace_injection_with_max_message_attributes(self): From 4235353e3384caf1a9a56074dd966c0f2f06ec71 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Wed, 16 Mar 2022 18:02:27 -0400 Subject: [PATCH 22/58] add msg attribute comment --- ddtrace/contrib/botocore/patch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 4367443db81..902102b2598 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -72,6 +72,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): """ if "MessageAttributes" not in entry: entry["MessageAttributes"] = {} + # Max of 10 message attributes. if len(entry["MessageAttributes"]) < 10: if endpoint == "sqs": # Use String since changing this to Binary would be a breaking From e5ec5227e22287cda532596af14bb0c303d4710d Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 17 Mar 2022 12:49:29 -0400 Subject: [PATCH 23/58] address Brett's comments, fix type annotations, bring back accidentally deleted test, and assert msg_attr[_datadog][Type] is Binary --- ddtrace/contrib/botocore/patch.py | 14 +++---- tests/contrib/botocore/test.py | 68 +++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 4367443db81..a2aa5e9c22e 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -62,7 +62,7 @@ class TraceInjectionDecodingError(Exception): def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): - # type: (Dict[str, str], Dict[str, Any], endpoint) -> None + # type: (Dict[str, str], Dict[str, Any], Union[Literal["sqs"], Literal["sns"]]) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record @@ -90,7 +90,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): - # type: (Any, Span, endpoint) -> None + # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated @@ -110,7 +110,7 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): def inject_trace_to_sqs_or_sns_message(params, span, endpoint): - # type: (Any, Span, endpoint) -> None + # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated @@ -306,17 +306,17 @@ def patched_api_call(original_func, instance, args, kwargs): if endpoint_name == "lambda" and operation == "Invoke": inject_trace_to_client_context(params, span) if endpoint_name == "sqs" and operation == "SendMessage": - inject_trace_to_sqs_or_sns_message(params, span, "sqs") + inject_trace_to_sqs_or_sns_message(params, span, endpoint_name) if endpoint_name == "sqs" and operation == "SendMessageBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span, "sqs") + inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name) if endpoint_name == "events" and operation == "PutEvents": inject_trace_to_eventbridge_detail(params, span) if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"): inject_trace_to_kinesis_stream(params, span) if endpoint_name == "sns" and operation == "Publish": - inject_trace_to_sqs_or_sns_message(params, span, "sns") + inject_trace_to_sqs_or_sns_message(params, span, endpoint_name) if endpoint_name == "sns" and operation == "PublishBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span, "sns") + inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name) except Exception: log.warning("Unable to inject trace context", exc_info=True) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index a464f2c6aef..873a7d6d48c 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1003,6 +1003,74 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None + assert msg_attr["_datadog"]["Type"] == "Binary" + datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) + headers = json.loads(datadog_value_decoded) + assert headers is not None + assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) + assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) + + @mock_sns + @mock_sqs + def test_sns_send_message_trace_injection_with_message_attributes(self): + sns = self.session.create_client("sns", region_name="us-east-1", endpoint_url="http://localhost:4566") + sqs = self.session.create_client("sqs", region_name="us-east-1", endpoint_url="http://localhost:4566") + + topic = sns.create_topic(Name="testTopic") + queue = sqs.create_queue(QueueName="test") + + topic_arn = topic["TopicArn"] + sqs_url = queue["QueueUrl"] + sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_url) + + Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sns) + + message_attributes = { + "one": {"DataType": "String", "StringValue": "one"}, + "two": {"DataType": "String", "StringValue": "two"}, + "three": {"DataType": "String", "StringValue": "three"}, + "four": {"DataType": "String", "StringValue": "four"}, + "five": {"DataType": "String", "StringValue": "five"}, + "six": {"DataType": "String", "StringValue": "six"}, + "seven": {"DataType": "String", "StringValue": "seven"}, + "eight": {"DataType": "String", "StringValue": "eight"}, + "nine": {"DataType": "String", "StringValue": "nine"}, + } + + sns.publish(TopicArn=topic_arn, Message="test", MessageAttributes=message_attributes) + spans = self.get_spans() + + # get SNS messages via SQS + response = sqs.receive_message(QueueUrl=queue["QueueUrl"], MessageAttributeNames=["_datadog"]) + + # clean up resources + sqs.delete_queue(QueueUrl=sqs_url) + sns.delete_topic(TopicArn=topic_arn) + + # check if the appropriate span was generated + assert spans + span = spans[0] + assert len(spans) == 2 + assert span.get_tag("aws.region") == "us-east-1" + assert span.get_tag("aws.operation") == "Publish" + assert span.get_tag("params.MessageBody") is None + assert_is_measured(span) + assert_span_http_status_code(span, 200) + assert span.service == "test-botocore-tracing.sns" + assert span.resource == "sns.publish" + trace_json = span.get_tag("params.MessageAttributes._datadog.StringValue") + assert trace_json is None + + # receive message using SQS and ensure headers are present + assert len(response["Messages"]) == 1 + msg = response["Messages"][0] + assert msg is not None + msg_body = json.loads(msg["Body"]) + msg_str = msg_body["Message"] + assert msg_str == "test" + msg_attr = msg_body["MessageAttributes"] + assert msg_attr.get("_datadog") is not None + assert msg_attr["_datadog"]["Type"] == "Binary" datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) headers = json.loads(datadog_value_decoded) assert headers is not None From 77afe63a59d892cd7751f183f242491e21207ced Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 17 Mar 2022 14:12:46 -0400 Subject: [PATCH 24/58] Give endpoint None default value and update type annotation --- ddtrace/contrib/botocore/patch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 0e72e21fe9d..60fde7e77a6 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -61,8 +61,8 @@ class TraceInjectionDecodingError(Exception): pass -def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): - # type: (Dict[str, str], Dict[str, Any], Union[Literal["sqs"], Literal["sns"]]) -> None +def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None): + # type: (Dict[str, str], Dict[str, Any], Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record From 15da13b1337113ba42de175bfc26d027be76bd1c Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 18 Mar 2022 09:55:29 -0400 Subject: [PATCH 25/58] make endpoint optional in all functions where endpoint is an arg. Update type annotation with optional. --- ddtrace/contrib/botocore/patch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 60fde7e77a6..2b2cfa705bd 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -90,8 +90,8 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None): log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") -def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): - # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None +def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None): + # type: (Any, Span, Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated @@ -110,8 +110,8 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): inject_trace_data_to_message_attributes(trace_data, entry, endpoint) -def inject_trace_to_sqs_or_sns_message(params, span, endpoint): - # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None +def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None): + # type: (Any, Span, Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated From 9a6ffc1bf4b4e23fd559d75903640a23245e88ef Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 18 Mar 2022 14:59:37 -0400 Subject: [PATCH 26/58] add typing Union and Literal imports --- ddtrace/contrib/botocore/patch.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 2b2cfa705bd..c0b5137a55e 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -9,6 +9,8 @@ from typing import Dict from typing import List from typing import Optional +from typing import Literal +from typing import Union import botocore.client From bd91052678d6549b5b90c16f50e7da1cb488bad6 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 18 Mar 2022 15:38:25 -0400 Subject: [PATCH 27/58] alphabetize imports --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index c0b5137a55e..c7e51d107b2 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -8,8 +8,8 @@ from typing import Any from typing import Dict from typing import List -from typing import Optional from typing import Literal +from typing import Optional from typing import Union import botocore.client From 204c5703631574d3e22534da97213fe31c30b052 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Wed, 9 Mar 2022 16:20:08 -0500 Subject: [PATCH 28/58] Change DataType to binary and Value to BinaryValue --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index e604bb71374..5f1dd4c08c7 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -73,7 +73,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): entry["MessageAttributes"] = {} # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["MessageAttributes"]) < 10: - entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") From a3e1f48240bbb9108b72dd2d9fec2326d2f589a5 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Wed, 9 Mar 2022 17:53:11 -0500 Subject: [PATCH 29/58] inject trace based on event_source --- ddtrace/contrib/botocore/patch.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 5f1dd4c08c7..6069fe1b01f 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,13 +69,21 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ - if "MessageAttributes" not in entry: - entry["MessageAttributes"] = {} - # An Amazon SQS message can contain up to 10 metadata attributes. - if len(entry["MessageAttributes"]) < 10: + event_source = entry.get("eventSource", None) + if event_source == "aws:sqs": + entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: - log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") + log.warning("skipping trace injection, eventSource is not sns or sqs") + + # if "MessageAttributes" not in entry: + # entry["MessageAttributes"] = {} + # # An Amazon SQS message can contain up to 10 metadata attributes. + # if len(entry["MessageAttributes"]) < 10: + # entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} + # else: + # log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") def inject_trace_to_sqs_or_sns_batch_message(params, span): From bf10e809b4a0e4affecd9c080ae43f44e72ef961 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 10 Mar 2022 12:13:17 -0500 Subject: [PATCH 30/58] seperate sns and sqs encoding logic with if statement and modify test_sns_send_message_trace_injection_with_message_attributes to new b64 sns encryption --- ddtrace/contrib/botocore/patch.py | 6 ++++-- tests/contrib/botocore/test.py | 6 +++++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 6069fe1b01f..e8f03127e3c 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,9 +69,11 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ - event_source = entry.get("eventSource", None) + event_source = entry.get("eventSource", entry.get("EventSource", "")) if event_source == "aws:sqs": - entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + # An Amazon SQS message can contain up to 10 metadata attributes. + if len(entry["messageAttributes"]) < 10: + entry["essageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index fe1b158fa71..f5d19b8b044 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1068,7 +1068,11 @@ def test_sns_send_message_trace_injection_with_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None - headers = json.loads(msg_attr["_datadog"]["Value"]) + msg_attr_value_in_b64 = msg_attr["_datadog"]["Value"] + base64_bytes = msg_attr_value_in_b64.encode("ascii") + message_bytes = base64.b64decode(base64_bytes) + msg_attr_value_in_str = message_bytes.decode("ascii") + headers = json.loads(msg_attr_value_in_str) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From 7040ecc2cd6fb8813303c8edda94e274e8b417a6 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 10 Mar 2022 12:15:43 -0500 Subject: [PATCH 31/58] fix typo --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index e8f03127e3c..cf12c5b0075 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -73,7 +73,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: - entry["essageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: From 31a5f5037ec4e19c67ee3dbc9e6a56ddf1614693 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 10 Mar 2022 13:51:54 -0500 Subject: [PATCH 32/58] add reno release note --- .../encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml new file mode 100644 index 00000000000..ea5a121eade --- /dev/null +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -0,0 +1,4 @@ +--- +upgrade: + - | + Change SNS DataType->Binary and StringValue->BinaryValue to bypass policy filter bug Datadog/serverless-plugin-datadog#232. However we leave SQS message attributes as strings. From 0b3193592b008638395f2ac16fdfc2edd208e536 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:29:45 -0500 Subject: [PATCH 33/58] add comment explaining eventSource vs EventSource --- ddtrace/contrib/botocore/patch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index cf12c5b0075..668b3a6be8d 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,6 +69,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ + # SQS uses "eventSource" while SNS uses "EventSource" event_source = entry.get("eventSource", entry.get("EventSource", "")) if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. From 081caf10a36f40329ad671083041267037893e1f Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:33:26 -0500 Subject: [PATCH 34/58] readd skipping trace injection warning when max number of msg attributes exceeded --- ddtrace/contrib/botocore/patch.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 668b3a6be8d..87de9c28466 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -75,6 +75,8 @@ def inject_trace_data_to_message_attributes(trace_data, entry): # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + else: + log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: From c7d957b5874adff696fe9681236b1dde7f8a8fc9 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:57:43 -0500 Subject: [PATCH 35/58] Add comments explaining why we use String vs Binary aswell as why we would skip trace injection of a sqs record. --- ddtrace/contrib/botocore/patch.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 87de9c28466..1989f138661 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -74,10 +74,14 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: + # Use String as changing this to Binary would be a breaking change as other tracers expect this to be a String. entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} else: + # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": + # Use Binary since SNS subscription filter policies fail silently with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 + # AWS will encode our value if it sees "BinaryValue" entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: log.warning("skipping trace injection, eventSource is not sns or sqs") From 34630b2ce587352dc5629034d1d83651ee2730c9 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 15:59:05 -0500 Subject: [PATCH 36/58] remove commented out code --- ddtrace/contrib/botocore/patch.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 1989f138661..1095d9d652c 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -86,14 +86,6 @@ def inject_trace_data_to_message_attributes(trace_data, entry): else: log.warning("skipping trace injection, eventSource is not sns or sqs") - # if "MessageAttributes" not in entry: - # entry["MessageAttributes"] = {} - # # An Amazon SQS message can contain up to 10 metadata attributes. - # if len(entry["MessageAttributes"]) < 10: - # entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} - # else: - # log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") - def inject_trace_to_sqs_or_sns_batch_message(params, span): # type: (Any, Span) -> None From 81c535abc4480cd6f825f385897e4a48cc53f8be Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:20:18 -0500 Subject: [PATCH 37/58] make sure casing matches SNS and SQS lambda payloads --- ddtrace/contrib/botocore/patch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 1095d9d652c..4f0cac04cd8 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -75,16 +75,16 @@ def inject_trace_data_to_message_attributes(trace_data, entry): # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: # Use String as changing this to Binary would be a breaking change as other tracers expect this to be a String. - entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} else: # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": # Use Binary since SNS subscription filter policies fail silently with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 - # AWS will encode our value if it sees "BinaryValue" - entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} + # AWS will encode our value if it sees "Binary" + entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} else: - log.warning("skipping trace injection, eventSource is not sns or sqs") + log.warning("skipping trace injection, eventSource is not SNS or SQS") def inject_trace_to_sqs_or_sns_batch_message(params, span): From bdba5de140dd3834e6476a94fffc502e288389a1 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:22:56 -0500 Subject: [PATCH 38/58] eventSource to event source in warning --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 4f0cac04cd8..3eb774288fb 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -84,7 +84,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry): # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} else: - log.warning("skipping trace injection, eventSource is not SNS or SQS") + log.warning("skipping trace injection, event source is not SNS or SQS") def inject_trace_to_sqs_or_sns_batch_message(params, span): From 13e9491e49faf3818266f2bdbcf5c5adb6c7d88b Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:23:33 -0500 Subject: [PATCH 39/58] Update releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml upgrade->fixes Co-authored-by: Brett Langdon --- .../encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml index ea5a121eade..8b7c9b66615 100644 --- a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -1,4 +1,4 @@ --- -upgrade: +fixes: - | Change SNS DataType->Binary and StringValue->BinaryValue to bypass policy filter bug Datadog/serverless-plugin-datadog#232. However we leave SQS message attributes as strings. From 753713adb45688cea61770f3a53fce7596ceae99 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:24:19 -0500 Subject: [PATCH 40/58] Update releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml Co-authored-by: Brett Langdon --- .../encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml index 8b7c9b66615..735cf29704e 100644 --- a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -1,4 +1,4 @@ --- fixes: - | - Change SNS DataType->Binary and StringValue->BinaryValue to bypass policy filter bug Datadog/serverless-plugin-datadog#232. However we leave SQS message attributes as strings. + botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232`_ From 36a4631a5287ebb10acd451fb1b64a0e3186f7db Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:50:59 -0500 Subject: [PATCH 41/58] testing --- tests/contrib/botocore/test.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index f5d19b8b044..16105b7bb11 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1010,7 +1010,7 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): @mock_sns @mock_sqs - def test_sns_send_message_trace_injection_with_message_attributes(self): + def test_sns_send_message_trace_injection_with_b64_message_attributes(self): sns = self.session.create_client("sns", region_name="us-east-1", endpoint_url="http://localhost:4566") sqs = self.session.create_client("sqs", region_name="us-east-1", endpoint_url="http://localhost:4566") @@ -1068,11 +1068,10 @@ def test_sns_send_message_trace_injection_with_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None - msg_attr_value_in_b64 = msg_attr["_datadog"]["Value"] - base64_bytes = msg_attr_value_in_b64.encode("ascii") - message_bytes = base64.b64decode(base64_bytes) - msg_attr_value_in_str = message_bytes.decode("ascii") - headers = json.loads(msg_attr_value_in_str) + msg_attr_val = msg_attr["_datadog"]["Value"] + print("type of msg attr val") + print(type(msg_attr_val)) + headers = json.loads(b"") assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From 0a3adcb2782e9c1a501ceadcdd74746a35a9ccf2 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 16:59:15 -0500 Subject: [PATCH 42/58] use b64decode and json.loads in test_sns_send_message_trace_injection_with_b64_message_attributes test --- tests/contrib/botocore/test.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 16105b7bb11..199295dd896 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1069,9 +1069,8 @@ def test_sns_send_message_trace_injection_with_b64_message_attributes(self): msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None msg_attr_val = msg_attr["_datadog"]["Value"] - print("type of msg attr val") - print(type(msg_attr_val)) - headers = json.loads(b"") + msg_attr_val_b64decoded = msg_attr_val.b64decode(msg_attr_val) + headers = json.loads(msg_attr_val_b64decoded) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From 48f5e772539b16d8d5e0b2789bca38989bad79de Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Thu, 10 Mar 2022 18:46:19 -0500 Subject: [PATCH 43/58] flake fix --- ddtrace/contrib/botocore/patch.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 3eb774288fb..0c2bbf5a1fe 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -74,13 +74,15 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: - # Use String as changing this to Binary would be a breaking change as other tracers expect this to be a String. + # Use String as changing this to Binary would be a breaking + # change as other tracers expect this to be a String. entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} else: # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": - # Use Binary since SNS subscription filter policies fail silently with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 + # Use Binary since SNS subscription filter policies fail silently + # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} else: From ab2e091b40998de4caad5f17c4805f25dbcc457d Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 11 Mar 2022 10:17:37 -0500 Subject: [PATCH 44/58] riot run -s fmt --- ddtrace/contrib/botocore/patch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 0c2bbf5a1fe..70096a4edbb 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -74,14 +74,14 @@ def inject_trace_data_to_message_attributes(trace_data, entry): if event_source == "aws:sqs": # An Amazon SQS message can contain up to 10 metadata attributes. if len(entry["messageAttributes"]) < 10: - # Use String as changing this to Binary would be a breaking + # Use String as changing this to Binary would be a breaking # change as other tracers expect this to be a String. entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} else: # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") elif event_source == "aws:sns": - # Use Binary since SNS subscription filter policies fail silently + # Use Binary since SNS subscription filter policies fail silently # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} From ee549291d703abbbb7a0a62f05971c2f19572f14 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 11 Mar 2022 16:31:25 -0500 Subject: [PATCH 45/58] reformat function logic after manual testing --- ddtrace/contrib/botocore/patch.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 70096a4edbb..fd23032c127 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -69,24 +69,22 @@ def inject_trace_data_to_message_attributes(trace_data, entry): Inject trace headers into the an SQS or SNS record's MessageAttributes """ - # SQS uses "eventSource" while SNS uses "EventSource" - event_source = entry.get("eventSource", entry.get("EventSource", "")) - if event_source == "aws:sqs": - # An Amazon SQS message can contain up to 10 metadata attributes. - if len(entry["messageAttributes"]) < 10: + if len(entry["MessageAttributes"]) < 10: + # We expect entry to either have QueueUrl or TopicArn which comes from a boto3 publish() or send_message() call + if entry.get("QueueUrl"): # Use String as changing this to Binary would be a breaking # change as other tracers expect this to be a String. - entry["messageAttributes"]["_datadog"] = {"dataType": "String", "stringValue": json.dumps(trace_data)} + entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} + elif entry.get("TopicArn"): + # Use Binary since SNS subscription filter policies fail silently + # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 + # AWS will encode our value if it sees "Binary" + entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: - # In the event an SQS record has 10 or more msg attributes we cannot add our _datadog msg attribute - log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") - elif event_source == "aws:sns": - # Use Binary since SNS subscription filter policies fail silently - # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 - # AWS will encode our value if it sees "Binary" - entry["MessageAttributes"]["_datadog"] = {"Type": "Binary", "Value": json.dumps(trace_data)} + log.warning("skipping trace injection, event source is not SNS or SQS") else: - log.warning("skipping trace injection, event source is not SNS or SQS") + # In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute + log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") def inject_trace_to_sqs_or_sns_batch_message(params, span): From 503237838be48db6504072a72f73724d32f55971 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Tue, 15 Mar 2022 10:43:47 -0400 Subject: [PATCH 46/58] base64.b64decode --- tests/contrib/botocore/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 199295dd896..c904446e587 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1069,7 +1069,7 @@ def test_sns_send_message_trace_injection_with_b64_message_attributes(self): msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None msg_attr_val = msg_attr["_datadog"]["Value"] - msg_attr_val_b64decoded = msg_attr_val.b64decode(msg_attr_val) + msg_attr_val_b64decoded = base64.b64decode(msg_attr_val) headers = json.loads(msg_attr_val_b64decoded) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) From 641408a9b6f890188849d6586dcad3c4acb97666 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Tue, 15 Mar 2022 16:25:31 -0400 Subject: [PATCH 47/58] distinguish between sqs and sns in joined functions by passing endpoint, fix failing test by decoding trace context, and bump localstack to fix failing tests --- ddtrace/contrib/botocore/patch.py | 38 +++++++++++++++++-------------- docker-compose.yml | 2 +- tests/contrib/botocore/test.py | 3 ++- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index fd23032c127..4367443db81 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -61,37 +61,40 @@ class TraceInjectionDecodingError(Exception): pass -def inject_trace_data_to_message_attributes(trace_data, entry): - # type: (Dict[str, str], Dict[str, Any]) -> None +def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): + # type: (Dict[str, str], Dict[str, Any], endpoint) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into the an SQS or SNS record's MessageAttributes """ + if "MessageAttributes" not in entry: + entry["MessageAttributes"] = {} if len(entry["MessageAttributes"]) < 10: - # We expect entry to either have QueueUrl or TopicArn which comes from a boto3 publish() or send_message() call - if entry.get("QueueUrl"): - # Use String as changing this to Binary would be a breaking + if endpoint == "sqs": + # Use String since changing this to Binary would be a breaking # change as other tracers expect this to be a String. entry["MessageAttributes"]["_datadog"] = {"DataType": "String", "StringValue": json.dumps(trace_data)} - elif entry.get("TopicArn"): + elif endpoint == "sns": # Use Binary since SNS subscription filter policies fail silently # with JSON strings https://github.com/DataDog/datadog-lambda-js/pull/269 # AWS will encode our value if it sees "Binary" entry["MessageAttributes"]["_datadog"] = {"DataType": "Binary", "BinaryValue": json.dumps(trace_data)} else: - log.warning("skipping trace injection, event source is not SNS or SQS") + log.warning("skipping trace injection, endpoint is not SNS or SQS") else: # In the event a record has 10 or more msg attributes we cannot add our _datadog msg attribute log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") -def inject_trace_to_sqs_or_sns_batch_message(params, span): - # type: (Any, Span) -> None +def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): + # type: (Any, Span, endpoint) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into MessageAttributes for all SQS or SNS records inside a batch """ @@ -103,21 +106,22 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span): # or PublishBatchRequestEntries (in case of PublishBatch). entries = params.get("Entries", params.get("PublishBatchRequestEntries", [])) for entry in entries: - inject_trace_data_to_message_attributes(trace_data, entry) + inject_trace_data_to_message_attributes(trace_data, entry, endpoint) -def inject_trace_to_sqs_or_sns_message(params, span): - # type: (Any, Span) -> None +def inject_trace_to_sqs_or_sns_message(params, span, endpoint): + # type: (Any, Span, endpoint) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated + :endpoint: endpoint of message, "sqs" or "sns" Inject trace headers into MessageAttributes for the SQS or SNS record """ trace_data = {} HTTPPropagator.inject(span.context, trace_data) - inject_trace_data_to_message_attributes(trace_data, params) + inject_trace_data_to_message_attributes(trace_data, params, endpoint) def inject_trace_to_eventbridge_detail(params, span): @@ -302,17 +306,17 @@ def patched_api_call(original_func, instance, args, kwargs): if endpoint_name == "lambda" and operation == "Invoke": inject_trace_to_client_context(params, span) if endpoint_name == "sqs" and operation == "SendMessage": - inject_trace_to_sqs_or_sns_message(params, span) + inject_trace_to_sqs_or_sns_message(params, span, "sqs") if endpoint_name == "sqs" and operation == "SendMessageBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span) + inject_trace_to_sqs_or_sns_batch_message(params, span, "sqs") if endpoint_name == "events" and operation == "PutEvents": inject_trace_to_eventbridge_detail(params, span) if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"): inject_trace_to_kinesis_stream(params, span) if endpoint_name == "sns" and operation == "Publish": - inject_trace_to_sqs_or_sns_message(params, span) + inject_trace_to_sqs_or_sns_message(params, span, "sns") if endpoint_name == "sns" and operation == "PublishBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span) + inject_trace_to_sqs_or_sns_batch_message(params, span, "sns") except Exception: log.warning("Unable to inject trace context", exc_info=True) diff --git a/docker-compose.yml b/docker-compose.yml index c097d40e88d..c17fff6bb02 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -132,7 +132,7 @@ services: - ./.ddriot:/root/project/.riot localstack: - image: localstack/localstack:0.13.1 + image: localstack/localstack:0.14.1 network_mode: bridge ports: - "127.0.0.1:4566:4566" diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index c904446e587..4142412f0a1 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1003,7 +1003,8 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None - headers = json.loads(msg_attr["_datadog"]["Value"]) + datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) + headers = json.loads(datadog_value_decoded) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) From f66d208502f1886e6ae37ca96f6c3e3f0db5409d Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Tue, 15 Mar 2022 17:15:07 -0400 Subject: [PATCH 48/58] delete duplicate test --- tests/contrib/botocore/test.py | 67 ---------------------------------- 1 file changed, 67 deletions(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 4142412f0a1..a464f2c6aef 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1009,73 +1009,6 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) - @mock_sns - @mock_sqs - def test_sns_send_message_trace_injection_with_b64_message_attributes(self): - sns = self.session.create_client("sns", region_name="us-east-1", endpoint_url="http://localhost:4566") - sqs = self.session.create_client("sqs", region_name="us-east-1", endpoint_url="http://localhost:4566") - - topic = sns.create_topic(Name="testTopic") - queue = sqs.create_queue(QueueName="test") - - topic_arn = topic["TopicArn"] - sqs_url = queue["QueueUrl"] - sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_url) - - Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sns) - - message_attributes = { - "one": {"DataType": "String", "StringValue": "one"}, - "two": {"DataType": "String", "StringValue": "two"}, - "three": {"DataType": "String", "StringValue": "three"}, - "four": {"DataType": "String", "StringValue": "four"}, - "five": {"DataType": "String", "StringValue": "five"}, - "six": {"DataType": "String", "StringValue": "six"}, - "seven": {"DataType": "String", "StringValue": "seven"}, - "eight": {"DataType": "String", "StringValue": "eight"}, - "nine": {"DataType": "String", "StringValue": "nine"}, - } - - sns.publish(TopicArn=topic_arn, Message="test", MessageAttributes=message_attributes) - spans = self.get_spans() - - # get SNS messages via SQS - response = sqs.receive_message(QueueUrl=queue["QueueUrl"], MessageAttributeNames=["_datadog"]) - - # clean up resources - sqs.delete_queue(QueueUrl=sqs_url) - sns.delete_topic(TopicArn=topic_arn) - - # check if the appropriate span was generated - assert spans - span = spans[0] - assert len(spans) == 2 - assert span.get_tag("aws.region") == "us-east-1" - assert span.get_tag("aws.operation") == "Publish" - assert span.get_tag("params.MessageBody") is None - assert_is_measured(span) - assert_span_http_status_code(span, 200) - assert span.service == "test-botocore-tracing.sns" - assert span.resource == "sns.publish" - trace_json = span.get_tag("params.MessageAttributes._datadog.StringValue") - assert trace_json is None - - # receive message using SQS and ensure headers are present - assert len(response["Messages"]) == 1 - msg = response["Messages"][0] - assert msg is not None - msg_body = json.loads(msg["Body"]) - msg_str = msg_body["Message"] - assert msg_str == "test" - msg_attr = msg_body["MessageAttributes"] - assert msg_attr.get("_datadog") is not None - msg_attr_val = msg_attr["_datadog"]["Value"] - msg_attr_val_b64decoded = base64.b64decode(msg_attr_val) - headers = json.loads(msg_attr_val_b64decoded) - assert headers is not None - assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) - assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) - @mock_sns @mock_sqs def test_sns_send_message_trace_injection_with_max_message_attributes(self): From ddc034c58c34dad58148deea7d4fcdcec7b2346e Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez <49878080+zARODz11z@users.noreply.github.com> Date: Wed, 16 Mar 2022 18:02:27 -0400 Subject: [PATCH 49/58] add msg attribute comment --- ddtrace/contrib/botocore/patch.py | 1 + 1 file changed, 1 insertion(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 4367443db81..902102b2598 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -72,6 +72,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): """ if "MessageAttributes" not in entry: entry["MessageAttributes"] = {} + # Max of 10 message attributes. if len(entry["MessageAttributes"]) < 10: if endpoint == "sqs": # Use String since changing this to Binary would be a breaking From 541d1b5b56c077d0583417be8643ea29e9b568e6 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 17 Mar 2022 12:49:29 -0400 Subject: [PATCH 50/58] address Brett's comments, fix type annotations, bring back accidentally deleted test, and assert msg_attr[_datadog][Type] is Binary --- ddtrace/contrib/botocore/patch.py | 14 +++---- tests/contrib/botocore/test.py | 68 +++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 902102b2598..0e72e21fe9d 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -62,7 +62,7 @@ class TraceInjectionDecodingError(Exception): def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): - # type: (Dict[str, str], Dict[str, Any], endpoint) -> None + # type: (Dict[str, str], Dict[str, Any], Union[Literal["sqs"], Literal["sns"]]) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record @@ -91,7 +91,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): - # type: (Any, Span, endpoint) -> None + # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated @@ -111,7 +111,7 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): def inject_trace_to_sqs_or_sns_message(params, span, endpoint): - # type: (Any, Span, endpoint) -> None + # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated @@ -307,17 +307,17 @@ def patched_api_call(original_func, instance, args, kwargs): if endpoint_name == "lambda" and operation == "Invoke": inject_trace_to_client_context(params, span) if endpoint_name == "sqs" and operation == "SendMessage": - inject_trace_to_sqs_or_sns_message(params, span, "sqs") + inject_trace_to_sqs_or_sns_message(params, span, endpoint_name) if endpoint_name == "sqs" and operation == "SendMessageBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span, "sqs") + inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name) if endpoint_name == "events" and operation == "PutEvents": inject_trace_to_eventbridge_detail(params, span) if endpoint_name == "kinesis" and (operation == "PutRecord" or operation == "PutRecords"): inject_trace_to_kinesis_stream(params, span) if endpoint_name == "sns" and operation == "Publish": - inject_trace_to_sqs_or_sns_message(params, span, "sns") + inject_trace_to_sqs_or_sns_message(params, span, endpoint_name) if endpoint_name == "sns" and operation == "PublishBatch": - inject_trace_to_sqs_or_sns_batch_message(params, span, "sns") + inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint_name) except Exception: log.warning("Unable to inject trace context", exc_info=True) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index a464f2c6aef..873a7d6d48c 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1003,6 +1003,74 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert msg_str == "test" msg_attr = msg_body["MessageAttributes"] assert msg_attr.get("_datadog") is not None + assert msg_attr["_datadog"]["Type"] == "Binary" + datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) + headers = json.loads(datadog_value_decoded) + assert headers is not None + assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) + assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) + + @mock_sns + @mock_sqs + def test_sns_send_message_trace_injection_with_message_attributes(self): + sns = self.session.create_client("sns", region_name="us-east-1", endpoint_url="http://localhost:4566") + sqs = self.session.create_client("sqs", region_name="us-east-1", endpoint_url="http://localhost:4566") + + topic = sns.create_topic(Name="testTopic") + queue = sqs.create_queue(QueueName="test") + + topic_arn = topic["TopicArn"] + sqs_url = queue["QueueUrl"] + sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_url) + + Pin(service=self.TEST_SERVICE, tracer=self.tracer).onto(sns) + + message_attributes = { + "one": {"DataType": "String", "StringValue": "one"}, + "two": {"DataType": "String", "StringValue": "two"}, + "three": {"DataType": "String", "StringValue": "three"}, + "four": {"DataType": "String", "StringValue": "four"}, + "five": {"DataType": "String", "StringValue": "five"}, + "six": {"DataType": "String", "StringValue": "six"}, + "seven": {"DataType": "String", "StringValue": "seven"}, + "eight": {"DataType": "String", "StringValue": "eight"}, + "nine": {"DataType": "String", "StringValue": "nine"}, + } + + sns.publish(TopicArn=topic_arn, Message="test", MessageAttributes=message_attributes) + spans = self.get_spans() + + # get SNS messages via SQS + response = sqs.receive_message(QueueUrl=queue["QueueUrl"], MessageAttributeNames=["_datadog"]) + + # clean up resources + sqs.delete_queue(QueueUrl=sqs_url) + sns.delete_topic(TopicArn=topic_arn) + + # check if the appropriate span was generated + assert spans + span = spans[0] + assert len(spans) == 2 + assert span.get_tag("aws.region") == "us-east-1" + assert span.get_tag("aws.operation") == "Publish" + assert span.get_tag("params.MessageBody") is None + assert_is_measured(span) + assert_span_http_status_code(span, 200) + assert span.service == "test-botocore-tracing.sns" + assert span.resource == "sns.publish" + trace_json = span.get_tag("params.MessageAttributes._datadog.StringValue") + assert trace_json is None + + # receive message using SQS and ensure headers are present + assert len(response["Messages"]) == 1 + msg = response["Messages"][0] + assert msg is not None + msg_body = json.loads(msg["Body"]) + msg_str = msg_body["Message"] + assert msg_str == "test" + msg_attr = msg_body["MessageAttributes"] + assert msg_attr.get("_datadog") is not None + assert msg_attr["_datadog"]["Type"] == "Binary" datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) headers = json.loads(datadog_value_decoded) assert headers is not None From eb845102504992bafa938ee8aa495ac814de007f Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Thu, 17 Mar 2022 14:12:46 -0400 Subject: [PATCH 51/58] Give endpoint None default value and update type annotation --- ddtrace/contrib/botocore/patch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 0e72e21fe9d..60fde7e77a6 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -61,8 +61,8 @@ class TraceInjectionDecodingError(Exception): pass -def inject_trace_data_to_message_attributes(trace_data, entry, endpoint): - # type: (Dict[str, str], Dict[str, Any], Union[Literal["sqs"], Literal["sns"]]) -> None +def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None): + # type: (Dict[str, str], Dict[str, Any], Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record From ff09fc163a6e623dcd6cbe38c652c037d3d7e3bd Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 18 Mar 2022 09:55:29 -0400 Subject: [PATCH 52/58] make endpoint optional in all functions where endpoint is an arg. Update type annotation with optional. --- ddtrace/contrib/botocore/patch.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 60fde7e77a6..2b2cfa705bd 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -90,8 +90,8 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None): log.warning("skipping trace injection, max number (10) of MessageAttributes exceeded") -def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): - # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None +def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None): + # type: (Any, Span, Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated @@ -110,8 +110,8 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint): inject_trace_data_to_message_attributes(trace_data, entry, endpoint) -def inject_trace_to_sqs_or_sns_message(params, span, endpoint): - # type: (Any, Span, Union[Literal["sqs"], Literal["sns"]]) -> None +def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None): + # type: (Any, Span, Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated From a24ebcc5004b6b93206e78044e976ad3d7ce7e97 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 18 Mar 2022 14:59:37 -0400 Subject: [PATCH 53/58] add typing Union and Literal imports --- ddtrace/contrib/botocore/patch.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 2b2cfa705bd..c0b5137a55e 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -9,6 +9,8 @@ from typing import Dict from typing import List from typing import Optional +from typing import Literal +from typing import Union import botocore.client From 1a901109797c2ac72c5913e14108096bbe2f2a6a Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Fri, 18 Mar 2022 15:38:25 -0400 Subject: [PATCH 54/58] alphabetize imports --- ddtrace/contrib/botocore/patch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index c0b5137a55e..c7e51d107b2 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -8,8 +8,8 @@ from typing import Any from typing import Dict from typing import List -from typing import Optional from typing import Literal +from typing import Optional from typing import Union import botocore.client From bffbef00a7e1cca051d1630d699f46638b6a676e Mon Sep 17 00:00:00 2001 From: Brett Langdon Date: Fri, 18 Mar 2022 16:25:45 -0400 Subject: [PATCH 55/58] Update releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml --- .../encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml index 735cf29704e..69e39e21c66 100644 --- a/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml +++ b/releasenotes/notes/encode-sns-msg-attributes-as-b64-7818aec10f533534.yaml @@ -1,4 +1,4 @@ --- fixes: - | - botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232`_ + botocore: fix incorrect context propagation message attribute types for SNS. This addresses `Datadog/serverless-plugin-datadog#232 `_ From d73572d4584118a3538d20236008a35f3b4d4346 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Mon, 21 Mar 2022 11:05:23 -0400 Subject: [PATCH 56/58] edit type hints to Optional[str] --- ddtrace/contrib/botocore/patch.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index c7e51d107b2..78d55859af9 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -8,7 +8,6 @@ from typing import Any from typing import Dict from typing import List -from typing import Literal from typing import Optional from typing import Union @@ -64,7 +63,7 @@ class TraceInjectionDecodingError(Exception): def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None): - # type: (Dict[str, str], Dict[str, Any], Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None + # type: (Dict[str, str], Dict[str, Any], Optional[str]) -> None """ :trace_data: trace headers to be stored in the entry's MessageAttributes :entry: an SQS or SNS record @@ -93,7 +92,7 @@ def inject_trace_data_to_message_attributes(trace_data, entry, endpoint=None): def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None): - # type: (Any, Span, Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None + # type: (Any, Span, Optional[str]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated @@ -113,7 +112,7 @@ def inject_trace_to_sqs_or_sns_batch_message(params, span, endpoint=None): def inject_trace_to_sqs_or_sns_message(params, span, endpoint=None): - # type: (Any, Span, Optional[Union[Literal["sqs"], Literal["sns"]]]) -> None + # type: (Any, Span, Optional[str]) -> None """ :params: contains the params for the current botocore action :span: the span which provides the trace context to be propagated From 8dc339b2d04fa266dda2a3ccee8acc56ccdea9eb Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Mon, 21 Mar 2022 11:11:05 -0400 Subject: [PATCH 57/58] remove uneeded union --- ddtrace/contrib/botocore/patch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ddtrace/contrib/botocore/patch.py b/ddtrace/contrib/botocore/patch.py index 78d55859af9..b3e9fe5ff4e 100644 --- a/ddtrace/contrib/botocore/patch.py +++ b/ddtrace/contrib/botocore/patch.py @@ -9,7 +9,6 @@ from typing import Dict from typing import List from typing import Optional -from typing import Union import botocore.client From 3154e104da2cbac93d7c1deb52ed98e56f290fd7 Mon Sep 17 00:00:00 2001 From: Andrew Rodriguez Date: Mon, 21 Mar 2022 15:02:21 -0400 Subject: [PATCH 58/58] add .decode() before json.loads to pass python3.5 unit tests --- tests/contrib/botocore/test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/contrib/botocore/test.py b/tests/contrib/botocore/test.py index 873a7d6d48c..5853b52762c 100644 --- a/tests/contrib/botocore/test.py +++ b/tests/contrib/botocore/test.py @@ -1005,7 +1005,7 @@ def test_sns_send_message_trace_injection_with_no_message_attributes(self): assert msg_attr.get("_datadog") is not None assert msg_attr["_datadog"]["Type"] == "Binary" datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) - headers = json.loads(datadog_value_decoded) + headers = json.loads(datadog_value_decoded.decode()) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id) @@ -1072,7 +1072,7 @@ def test_sns_send_message_trace_injection_with_message_attributes(self): assert msg_attr.get("_datadog") is not None assert msg_attr["_datadog"]["Type"] == "Binary" datadog_value_decoded = base64.b64decode(msg_attr["_datadog"]["Value"]) - headers = json.loads(datadog_value_decoded) + headers = json.loads(datadog_value_decoded.decode()) assert headers is not None assert headers[HTTP_HEADER_TRACE_ID] == str(span.trace_id) assert headers[HTTP_HEADER_PARENT_ID] == str(span.span_id)