diff --git a/tests/integrations/crossed_integrations/test_kinesis.py b/tests/integrations/crossed_integrations/test_kinesis.py index c550ffa6fe8..a6f5acc9893 100644 --- a/tests/integrations/crossed_integrations/test_kinesis.py +++ b/tests/integrations/crossed_integrations/test_kinesis.py @@ -2,11 +2,9 @@ import json from utils.buddies import python_buddy -from utils import interfaces, scenarios, weblog, missing_feature, features, context, irrelevant +from utils import interfaces, scenarios, weblog, missing_feature, features, context from utils.tools import logger -from tests.integrations.utils import delete_kinesis_stream - class _Test_Kinesis: """Test Kinesis compatibility with inputted datadog tracer""" @@ -76,22 +74,19 @@ def setup_produce(self): send request A to weblog : this request will produce a Kinesis message send request B to library buddy, this request will consume Kinesis message """ - try: - message = ( - "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " - f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" - ) - - self.production_response = weblog.get( - "/kinesis/produce", params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message}, timeout=120 - ) - self.consume_response = self.buddy.get( - "/kinesis/consume", - params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message, "timeout": 60}, - timeout=61, - ) - finally: - delete_kinesis_stream(self.WEBLOG_TO_BUDDY_STREAM) + message = ( + "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " + f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" + ) + + self.production_response = weblog.get( + "/kinesis/produce", params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message}, timeout=120 + ) + self.consume_response = self.buddy.get( + "/kinesis/consume", + params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message, "timeout": 60}, + timeout=61, + ) def test_produce(self): """Check that a message produced to Kinesis is correctly ingested by a Datadog tracer""" @@ -139,22 +134,19 @@ def setup_consume(self): request A: GET /library_buddy/produce_kinesis_message request B: GET /weblog/consume_kinesis_message """ - try: - message = ( - "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " - f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" - ) - - self.production_response = self.buddy.get( - "/kinesis/produce", params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message}, timeout=500 - ) - self.consume_response = weblog.get( - "/kinesis/consume", - params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message, "timeout": 60}, - timeout=61, - ) - finally: - delete_kinesis_stream(self.BUDDY_TO_WEBLOG_STREAM) + message = ( + "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " + f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" + ) + + self.production_response = self.buddy.get( + "/kinesis/produce", params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message}, timeout=500 + ) + self.consume_response = weblog.get( + "/kinesis/consume", + params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message, "timeout": 60}, + timeout=61, + ) def test_consume(self): """Check that a message by an app instrumented by a Datadog tracer is correctly ingested""" @@ -215,7 +207,6 @@ def validate_kinesis_spans(self, producer_interface, 
consumer_interface, stream) @scenarios.crossed_tracing_libraries -@irrelevant(True, reason="AWS Tests are not currently stable.") @features.aws_kinesis_span_creationcontext_propagation_via_message_attributes_with_dd_trace class Test_Kinesis_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_Kinesis): buddy_interface = interfaces.python_buddy diff --git a/tests/integrations/crossed_integrations/test_sns_to_sqs.py b/tests/integrations/crossed_integrations/test_sns_to_sqs.py index 2c45facb106..61cd1ef470d 100644 --- a/tests/integrations/crossed_integrations/test_sns_to_sqs.py +++ b/tests/integrations/crossed_integrations/test_sns_to_sqs.py @@ -2,11 +2,9 @@ import json from utils.buddies import python_buddy -from utils import interfaces, scenarios, weblog, missing_feature, features, context, irrelevant +from utils import interfaces, scenarios, weblog, missing_feature, features, context from utils.tools import logger -from tests.integrations.utils import delete_sns_topic, delete_sqs_queue - class _Test_SNS: """Test sns compatibility with inputted datadog tracer""" @@ -103,25 +101,21 @@ def setup_produce(self): send request A to weblog : this request will produce a sns message send request B to library buddy, this request will consume sns message """ - try: - message = ( - "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " - f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" - ) - - self.production_response = weblog.get( - "/sns/produce", - params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "topic": self.WEBLOG_TO_BUDDY_TOPIC, "message": message}, - timeout=60, - ) - self.consume_response = self.buddy.get( - "/sns/consume", - params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sns_topic(self.WEBLOG_TO_BUDDY_TOPIC) - delete_sqs_queue(self.WEBLOG_TO_BUDDY_QUEUE) + message = ( + "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " + f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" + ) + + self.production_response = weblog.get( + "/sns/produce", + params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "topic": self.WEBLOG_TO_BUDDY_TOPIC, "message": message}, + timeout=60, + ) + self.consume_response = self.buddy.get( + "/sns/consume", + params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_produce(self): """Check that a message produced to sns is correctly ingested by a Datadog tracer""" @@ -168,25 +162,21 @@ def setup_consume(self): request A: GET /library_buddy/produce_sns_message request B: GET /weblog/consume_sns_message """ - try: - message = ( - "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " - f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" - ) - - self.production_response = self.buddy.get( - "/sns/produce", - params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "topic": self.BUDDY_TO_WEBLOG_TOPIC, "message": message}, - timeout=60, - ) - self.consume_response = weblog.get( - "/sns/consume", - params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sns_topic(self.BUDDY_TO_WEBLOG_TOPIC) - delete_sqs_queue(self.BUDDY_TO_WEBLOG_QUEUE) + message = ( + "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " + f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" + ) + + self.production_response = 
self.buddy.get( + "/sns/produce", + params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "topic": self.BUDDY_TO_WEBLOG_TOPIC, "message": message}, + timeout=60, + ) + self.consume_response = weblog.get( + "/sns/consume", + params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_consume(self): """Check that a message by an app instrumented by a Datadog tracer is correctly ingested""" @@ -255,7 +245,6 @@ def validate_sns_spans(self, producer_interface, consumer_interface, queue, topi @scenarios.crossed_tracing_libraries @features.aws_sns_span_creationcontext_propagation_via_message_attributes_with_dd_trace -@irrelevant(True, reason="AWS Tests are not currently stable.") class Test_SNS_Propagation(_Test_SNS): buddy_interface = interfaces.python_buddy buddy = python_buddy diff --git a/tests/integrations/crossed_integrations/test_sqs.py b/tests/integrations/crossed_integrations/test_sqs.py index 65feee20d34..ed54122662a 100644 --- a/tests/integrations/crossed_integrations/test_sqs.py +++ b/tests/integrations/crossed_integrations/test_sqs.py @@ -5,8 +5,6 @@ from utils import interfaces, scenarios, weblog, missing_feature, features, context, irrelevant from utils.tools import logger -from tests.integrations.utils import delete_sqs_queue - class _Test_SQS: """Test sqs compatibility with inputted datadog tracer""" @@ -88,22 +86,19 @@ def setup_produce(self): send request A to weblog : this request will produce a sqs message send request B to library buddy, this request will consume sqs message """ - try: - message = ( - "[crossed_integrations/sqs.py][SQS] Hello from SQS " - f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce: {self.unique_id}" - ) - - self.production_response = weblog.get( - "/sqs/produce", params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "message": message}, timeout=60 - ) - self.consume_response = self.buddy.get( - "/sqs/consume", - params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sqs_queue(self.WEBLOG_TO_BUDDY_QUEUE) + message = ( + "[crossed_integrations/sqs.py][SQS] Hello from SQS " + f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce: {self.unique_id}" + ) + + self.production_response = weblog.get( + "/sqs/produce", params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "message": message}, timeout=60 + ) + self.consume_response = self.buddy.get( + "/sqs/consume", + params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_produce(self): """Check that a message produced to sqs is correctly ingested by a Datadog tracer""" @@ -155,22 +150,19 @@ def setup_consume(self): request A: GET /library_buddy/produce_sqs_message request B: GET /weblog/consume_sqs_message """ - try: - message = ( - "[crossed_integrations/test_sqs.py][SQS] Hello from SQS " - f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume: {self.unique_id}" - ) - - self.production_response = self.buddy.get( - "/sqs/produce", params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "message": message}, timeout=60 - ) - self.consume_response = weblog.get( - "/sqs/consume", - params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sqs_queue(self.BUDDY_TO_WEBLOG_QUEUE) + message = ( + "[crossed_integrations/test_sqs.py][SQS] Hello from SQS " + f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume: {self.unique_id}" + ) + 
+ self.production_response = self.buddy.get( + "/sqs/produce", params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "message": message}, timeout=60 + ) + self.consume_response = weblog.get( + "/sqs/consume", + params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_consume(self): """Check that a message by an app instrumented by a Datadog tracer is correctly ingested""" @@ -232,7 +224,6 @@ def validate_sqs_spans(self, producer_interface, consumer_interface, queue): @scenarios.crossed_tracing_libraries -@irrelevant(True, reason="AWS Tests are not currently stable.") @features.aws_sqs_span_creationcontext_propagation_via_message_attributes_with_dd_trace class Test_SQS_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_SQS): buddy_interface = interfaces.python_buddy @@ -245,8 +236,8 @@ class Test_SQS_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_SQS): @scenarios.crossed_tracing_libraries -@irrelevant(True, reason="AWS Tests are not currently stable.") @features.aws_sqs_span_creationcontext_propagation_via_xray_header_with_dd_trace +@irrelevant(True, reason="Localstack SQS does not support AWS X-Ray header parsing.") class Test_SQS_PROPAGATION_VIA_AWS_XRAY_HEADERS(_Test_SQS): buddy_interface = interfaces.java_buddy buddy = java_buddy diff --git a/tests/integrations/test_dsm.py b/tests/integrations/test_dsm.py index 7692ded6671..48e362675b9 100644 --- a/tests/integrations/test_dsm.py +++ b/tests/integrations/test_dsm.py @@ -4,13 +4,9 @@ import base64 import json +import os -from tests.integrations.utils import ( - compute_dsm_hash, - delete_sqs_queue, - delete_kinesis_stream, - delete_sns_topic, -) +from tests.integrations.utils import compute_dsm_hash from utils import weblog, interfaces, scenarios, irrelevant, context, bug, features, missing_feature, flaky from utils.tools import logger @@ -23,6 +19,16 @@ DSM_EXCHANGE = "dsm-system-tests-exchange" DSM_ROUTING_KEY = "dsm-system-tests-routing-key" +# AWS Specific +AWS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "") + +# TODO: check the runner env for a system-tests AWS variable indicating whether AWS tests run locally or remotely + +# If the AWS host points to Localstack we are mocking AWS locally, else assume the real account +LOCAL_AWS_ACCT = "000000000000" +AWS_ACCT = LOCAL_AWS_ACCT if "localstack" in AWS_HOST else "601427279990" +AWS_TESTING = "local" if AWS_ACCT == LOCAL_AWS_ACCT else "remote" + # AWS Kinesis Specific DSM_STREAM = "dsm-system-tests-stream" @@ -263,28 +269,23 @@ def test_dsm_rabbitmq(self): @features.datastreams_monitoring_support_for_sqs -@irrelevant(True, reason="AWS Tests are not currently stable.") @scenarios.integrations_aws class Test_DsmSQS: """Verify DSM stats points for AWS Sqs Service""" def setup_dsm_sqs(self): - try: - message = get_message("Test_DsmSQS", "sqs") - - # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, - # which changes for each run with the time stamp added - if context.library.library != "nodejs": - self.queue = f"{DSM_QUEUE}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" - else: - self.queue = f"{DSM_QUEUE}_{context.library.library}" - - self.r = weblog.get( - f"/dsm?integration=sqs&timeout=60&queue={self.queue}&message={message}", timeout=DSM_REQUEST_TIMEOUT - ) - finally: - if context.library.library != "nodejs": - delete_sqs_queue(self.queue) + message = get_message("Test_DsmSQS", "sqs") + + # we can't add the time hash to node since we can't 
replicate the hashing algo in python and compute a hash, + # which changes for each run with the time stamp added + if context.library.library != "nodejs": + self.queue = f"{DSM_QUEUE}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" + else: + self.queue = f"{DSM_QUEUE}_{context.library.library}" + + self.r = weblog.get( + f"/dsm?integration=sqs&timeout=60&queue={self.queue}&message={message}", timeout=DSM_REQUEST_TIMEOUT + ) def test_dsm_sqs(self): assert self.r.text == "ok" @@ -317,37 +318,31 @@ def test_dsm_sqs(self): @features.datastreams_monitoring_support_for_sns -@irrelevant(True, reason="AWS Tests are not currently stable.") @scenarios.integrations_aws class Test_DsmSNS: """Verify DSM stats points for AWS SNS Service""" def setup_dsm_sns(self): - try: - message = get_message("Test_DsmSNS", "sns") - - # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, - # which changes for each run with the time stamp added - if context.library.library != "nodejs": - self.topic = f"{DSM_TOPIC}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" - self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" - else: - self.topic = f"{DSM_TOPIC}_{context.library.library}_raw" - self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_raw" - - self.r = weblog.get( - f"/dsm?integration=sns&timeout=60&queue={self.queue}&topic={self.topic}&message={message}", - timeout=DSM_REQUEST_TIMEOUT, - ) - finally: - if context.library.library != "nodejs": - delete_sns_topic(self.topic) - delete_sqs_queue(self.queue) + message = get_message("Test_DsmSNS", "sns") + + # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, + # which changes for each run with the time stamp added + if context.library.library != "nodejs": + self.topic = f"{DSM_TOPIC}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" + self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" + else: + self.topic = f"{DSM_TOPIC}_{context.library.library}_raw" + self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_raw" + + self.r = weblog.get( + f"/dsm?integration=sns&timeout=60&queue={self.queue}&topic={self.topic}&message={message}", + timeout=DSM_REQUEST_TIMEOUT, + ) def test_dsm_sns(self): assert self.r.text == "ok" - topic = self.topic if context.library.library == "java" else f"arn:aws:sns:us-east-1:601427279990:{self.topic}" + topic = self.topic if context.library.library == "java" else f"arn:aws:sns:us-east-1:{AWS_ACCT}:{self.topic}" hash_inputs = { "default": { @@ -355,8 +350,8 @@ def test_dsm_sns(self): "tags_in": ("direction:in", f"topic:{self.queue}", "type:sqs"), }, "nodejs": { - "producer": 15466202493380574985, - "consumer": 9372735371403270535, + "producer": 15466202493380574985 if AWS_TESTING == "remote" else 3703335291192845713, + "consumer": 9372735371403270535 if AWS_TESTING == "remote" else 797339341876345963, "tags_out": ("direction:out", f"topic:{topic}", "type:sns"), "tags_in": ("direction:in", f"topic:{self.queue}", "type:sqs"), }, @@ -377,35 +372,30 @@ def test_dsm_sns(self): @features.datastreams_monitoring_support_for_kinesis -@irrelevant(True, reason="AWS Tests are not currently stable.") @scenarios.integrations_aws class Test_DsmKinesis: 
"""Verify DSM stats points for AWS Kinesis Service""" def setup_dsm_kinesis(self): - try: - message = get_message("Test_DsmKinesis", "kinesis") - - # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, - # which changes for each run with the time stamp added - if context.library.library != "nodejs": - self.stream = f"{DSM_STREAM}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" - else: - self.stream = f"{DSM_STREAM}_{context.library.library}" - - self.r = weblog.get( - f"/dsm?integration=kinesis&timeout=60&stream={self.stream}&message={message}", - timeout=DSM_REQUEST_TIMEOUT, - ) - finally: - if context.library.library != "nodejs": - delete_kinesis_stream(self.stream) + message = get_message("Test_DsmKinesis", "kinesis") + + # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, + # which changes for each run with the time stamp added + if context.library.library != "nodejs": + self.stream = f"{DSM_STREAM}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" + else: + self.stream = f"{DSM_STREAM}_{context.library.library}" + + self.r = weblog.get( + f"/dsm?integration=kinesis&timeout=60&stream={self.stream}&message={message}", + timeout=DSM_REQUEST_TIMEOUT, + ) @missing_feature(library="java", reason="DSM is not implemented for Java AWS Kinesis.") def test_dsm_kinesis(self): assert self.r.text == "ok" - stream_arn = f"arn:aws:kinesis:us-east-1:601427279990:stream/{self.stream}" + stream_arn = f"arn:aws:kinesis:us-east-1:{AWS_ACCT}:stream/{self.stream}" hash_inputs = { "default": { diff --git a/tests/integrations/utils.py b/tests/integrations/utils.py index 72b0cdac90d..c9a49638dc3 100644 --- a/tests/integrations/utils.py +++ b/tests/integrations/utils.py @@ -206,10 +206,15 @@ def delete_aws_resource( raise +SQS_URL = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") +SNS_URL = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sns.us-east-1.amazonaws.com/601427279990") +KINESIS_URL = os.getenv("SYSTEM_TESTS_AWS_URL", "https://kinesis.us-east-1.amazonaws.com/601427279990") + + def delete_sqs_queue(queue_name): try: - queue_url = f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue_name}" - sqs_client = _get_aws_session().client("sqs") + queue_url = f"{SQS_URL}/{queue_name}" + sqs_client = _get_aws_session().client("sqs", endpoint_url=SQS_URL) delete_callable = lambda url: sqs_client.delete_queue(QueueUrl=url) get_callable = lambda url: sqs_client.get_queue_attributes(QueueUrl=url) delete_aws_resource( @@ -231,7 +236,7 @@ def delete_sqs_queue(queue_name): def delete_sns_topic(topic_name): try: topic_arn = f"arn:aws:sns:us-east-1:601427279990:{topic_name}" - sns_client = _get_aws_session().client("sns") + sns_client = _get_aws_session().client("sns", endpoint_url=SNS_URL) get_callable = lambda arn: sns_client.get_topic_attributes(TopicArn=arn) delete_callable = lambda arn: sns_client.delete_topic(TopicArn=arn) delete_aws_resource(delete_callable, topic_arn, "SNS Topic", "NotFound", get_callable=get_callable) @@ -246,7 +251,7 @@ def delete_sns_topic(topic_name): def delete_kinesis_stream(stream_name): try: - kinesis_client = _get_aws_session().client("kinesis") + kinesis_client = _get_aws_session().client("kinesis", endpoint_url=KINESIS_URL) delete_callable = lambda name: kinesis_client.delete_stream(StreamName=name, EnforceConsumerDeletion=True) 
delete_aws_resource(delete_callable, stream_name, "Kinesis Stream", "ResourceNotFoundException") except botocore.exceptions.ClientError as e: diff --git a/utils/_context/_scenarios/endtoend.py b/utils/_context/_scenarios/endtoend.py index 067f322167c..41d04743c1f 100644 --- a/utils/_context/_scenarios/endtoend.py +++ b/utils/_context/_scenarios/endtoend.py @@ -27,6 +27,8 @@ MsSqlServerContainer, BuddyContainer, TestedContainer, + LocalstackContainer, + ElasticMQContainer, _get_client as get_docker_client, ) @@ -67,6 +69,8 @@ def __init__( include_rabbitmq=False, include_mysql_db=False, include_sqlserver=False, + include_localstack=False, + include_elasticmq=False, ) -> None: super().__init__(name, doc=doc, github_workflow=github_workflow, scenario_groups=scenario_groups) @@ -114,6 +118,12 @@ def __init__( if include_sqlserver: self._supporting_containers.append(MsSqlServerContainer(host_log_folder=self.host_log_folder)) + if include_localstack: + self._supporting_containers.append(LocalstackContainer(host_log_folder=self.host_log_folder)) + + if include_elasticmq: + self._supporting_containers.append(ElasticMQContainer(host_log_folder=self.host_log_folder)) + self._required_containers.extend(self._supporting_containers) def get_image_list(self, library: str, weblog: str) -> list[str]: @@ -256,6 +266,8 @@ def __init__( include_rabbitmq=False, include_mysql_db=False, include_sqlserver=False, + include_localstack=False, + include_elasticmq=False, include_otel_drop_in=False, include_buddies=False, require_api_key=False, @@ -281,6 +293,8 @@ def __init__( include_rabbitmq=include_rabbitmq, include_mysql_db=include_mysql_db, include_sqlserver=include_sqlserver, + include_localstack=include_localstack, + include_elasticmq=include_elasticmq, ) self._use_proxy_for_agent = use_proxy_for_agent diff --git a/utils/_context/_scenarios/integrations.py b/utils/_context/_scenarios/integrations.py index 962f6d647b5..a31b7cd745e 100644 --- a/utils/_context/_scenarios/integrations.py +++ b/utils/_context/_scenarios/integrations.py @@ -36,6 +36,7 @@ def __init__(self) -> None: "AWS_ACCESS_KEY_ID": "my-access-key", "AWS_SECRET_ACCESS_KEY": "my-access-key", "DD_TRACE_INFERRED_PROXY_SERVICES_ENABLED": "true", + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", "DD_IAST_CONTEXT_MODE": "GLOBAL", }, include_postgres_db=True, @@ -45,6 +46,8 @@ def __init__(self) -> None: include_rabbitmq=True, include_mysql_db=True, include_sqlserver=True, + include_localstack=True, + include_elasticmq=True, include_otel_drop_in=True, doc=( "Spawns tracer, agent, and a full set of database. 
" @@ -93,6 +96,8 @@ def __init__( include_kafka=False, include_rabbitmq=False, include_buddies=False, + include_localstack=True, + include_elasticmq=True, ) -> None: super().__init__( name, @@ -100,11 +105,14 @@ def __init__( "DD_TRACE_API_VERSION": "v0.4", "AWS_ACCESS_KEY_ID": "my-access-key", "AWS_SECRET_ACCESS_KEY": "my-access-key", + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", }, doc=doc, include_kafka=include_kafka, include_rabbitmq=include_rabbitmq, include_buddies=include_buddies, + include_localstack=include_localstack, + include_elasticmq=include_elasticmq, scenario_groups=[ScenarioGroup.INTEGRATIONS, ScenarioGroup.ESSENTIALS], ) # Since we are using real AWS queues / topics, we need a unique message to ensure we aren't consuming messages @@ -114,8 +122,6 @@ def __init__( def configure(self, config): super().configure(config) - if not self.replay: - self._check_aws_variables() self.unique_id = _get_unique_id(self.host_log_folder, replay=self.replay) def _check_aws_variables(self): @@ -133,7 +139,13 @@ def __init__(self) -> None: include_kafka=True, include_buddies=True, include_rabbitmq=True, + include_localstack=True, + include_elasticmq=True, doc="Spawns a buddy for each supported language of APM, requires AWS authentication.", + weblog_env={ + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", + "SYSTEM_TESTS_AWS_SQS_URL": "http://elasticmq:9324", + }, ) self.unique_id = None diff --git a/utils/_context/containers.py b/utils/_context/containers.py index be07dfe1bf4..f5720b224d4 100644 --- a/utils/_context/containers.py +++ b/utils/_context/containers.py @@ -607,6 +607,7 @@ def __init__(self, name, image_name, host_log_folder, host_port, trace_agent_por # "DD_TRACE_DEBUG": "true", "DD_AGENT_HOST": "proxy", "DD_TRACE_AGENT_PORT": trace_agent_port, + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", }, ) @@ -997,6 +998,41 @@ def __init__(self, host_log_folder) -> None: ) +class ElasticMQContainer(TestedContainer): + def __init__(self, host_log_folder) -> None: + super().__init__( + image_name="softwaremill/elasticmq-native:1.6.11", + name="elasticmq", + host_log_folder=host_log_folder, + environment={"ELASTICMQ_OPTS": "-Dnode-address.hostname=0.0.0.0"}, + ports={9324: 9324}, + volumes={"/var/run/docker.sock": {"bind": "/var/run/docker.sock", "mode": "rw"}}, + allow_old_container=True, + ) + + +class LocalstackContainer(TestedContainer): + def __init__(self, host_log_folder) -> None: + super().__init__( + image_name="localstack/localstack:4.1", + name="localstack-main", + environment={ + "LOCALSTACK_SERVICES": "kinesis,sqs,sns,xray", + "EXTRA_CORS_ALLOWED_HEADERS": "x-amz-request-id,x-amzn-requestid,x-amzn-trace-id", + "EXTRA_CORS_EXPOSE_HEADERS": "x-amz-request-id,x-amzn-requestid,x-amzn-trace-id", + "AWS_DEFAULT_REGION": "us-east-1", + "FORCE_NONINTERACTIVE": "true", + "START_WEB": "0", + "DEBUG": "1", + "SQS_PROVIDER": "elasticmq", + "DOCKER_HOST": "unix:///var/run/docker.sock", + }, + host_log_folder=host_log_folder, + ports={"4566": ("127.0.0.1", 4566)}, + volumes={"/var/run/docker.sock": {"bind": "/var/run/docker.sock", "mode": "rw"}}, + ) + + class MySqlContainer(SqlDbTestedContainer): def __init__(self, host_log_folder) -> None: super().__init__( diff --git a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs index 56b64c00405..e1eb9120c18 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs @@ -170,7 
+170,19 @@ class SqsProducer { public static async Task DoWork(string queue, string message) { - var sqsClient = new AmazonSQSClient(); + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + IAmazonSQS sqsClient; + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + sqsClient = new AmazonSQSClient(); + } // create queue Console.WriteLine($"[SQS] Produce: Creating queue {queue}"); CreateQueueResponse responseCreate = await sqsClient.CreateQueueAsync(queue); @@ -187,7 +199,19 @@ class SqsConsumer { public static async Task DoWork(string queue, string message) { - var sqsClient = new AmazonSQSClient(); + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + IAmazonSQS sqsClient; + if (!string.IsNullOrEmpty(awsUrl)) + { + // If awsUrl is set, use it for ServiceURL + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If awsUrl is not set, create a default client + sqsClient = new AmazonSQSClient(); + } // Create queue Console.WriteLine($"[SQS] Consume: Creating queue {queue}"); CreateQueueResponse responseCreate = await sqsClient.CreateQueueAsync(queue); diff --git a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs index cd5b5598c23..6befb11e531 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs @@ -133,7 +133,19 @@ private static bool RabbitConsume(string queue, TimeSpan timeout) private static async Task SqsProduce(string queue, string message) { - var sqsClient = new AmazonSQSClient(); + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + IAmazonSQS sqsClient; + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + sqsClient = new AmazonSQSClient(); + } var responseCreate = await sqsClient.CreateQueueAsync(queue); var qUrl = responseCreate.QueueUrl; await sqsClient.SendMessageAsync(qUrl, message); @@ -143,7 +155,20 @@ private static async Task SqsProduce(string queue, string message) private static async Task SqsConsume(string queue, TimeSpan timeout, string message) { Console.WriteLine($"consuming one message from SQS queue {queue} in max {(int)timeout.TotalSeconds} seconds"); - var sqsClient = new AmazonSQSClient(); + + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + IAmazonSQS sqsClient; + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + sqsClient = new AmazonSQSClient(); + } var responseCreate = await sqsClient.CreateQueueAsync(queue); var qUrl = responseCreate.QueueUrl; diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java index 4bbabf87ff4..99cf8b5f5ff 100644 --- 
a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java @@ -426,7 +426,9 @@ ResponseEntity sqsProduce( @RequestParam(required = true) String queue, @RequestParam(required = true) String message ) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { sqs.produceMessageWithoutNewThread(message); } catch (Exception e) { @@ -443,7 +445,9 @@ ResponseEntity sqsConsume( @RequestParam(required = false) Integer timeout, @RequestParam(required = true) String message ) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); if (timeout == null) timeout = 60; boolean consumed = false; try { @@ -462,8 +466,10 @@ ResponseEntity snsProduce( @RequestParam(required = true) String topic, @RequestParam(required = true) String message ) { + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + SnsConnector sns = new SnsConnector(topic); - SqsConnector sqs = new SqsConnector(queue); + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { sns.produceMessageWithoutNewThread(message, sqs); } catch (Exception e) { @@ -480,7 +486,9 @@ ResponseEntity snsConsume( @RequestParam(required = false) Integer timeout, @RequestParam(required = true) String message ) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); if (timeout == null) timeout = 60; boolean consumed = false; try { @@ -645,7 +653,9 @@ String publishToKafka( return "failed to start consuming message"; } } else if ("sqs".equals(integration)) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { Thread produceThread = sqs.startProducingMessage(message); produceThread.join(this.PRODUCE_CONSUME_THREAD_TIMEOUT); @@ -663,8 +673,10 @@ String publishToKafka( return "[SQS] failed to start consuming message"; } } else if ("sns".equals(integration)) { + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + SnsConnector sns = new SnsConnector(topic); - SqsConnector sqs = new SqsConnector(queue); + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { Thread produceThread = sns.startProducingMessage(message, sqs); produceThread.join(this.PRODUCE_CONSUME_THREAD_TIMEOUT); diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java index bde535725a6..c09e6d6405e 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java @@ -7,6 +7,7 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import 
software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; @@ -35,10 +36,19 @@ public KinesisConnector(String stream){ } public KinesisClient createKinesisClient() { - KinesisClient kinesisClient = KinesisClient.builder() + KinesisClientBuilder builder = KinesisClient.builder() .region(this.region) - .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) - .build(); + .credentialsProvider(EnvironmentVariableCredentialsProvider.create()); + + // Read the SYSTEM_TESTS_AWS_URL environment variable + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + // Only override endpoint if SYSTEM_TESTS_AWS_URL is set + if (systemTestsAwsUrl != null && !systemTestsAwsUrl.isEmpty()) { + builder.endpointOverride(URI.create(systemTestsAwsUrl)); + } + + KinesisClient kinesisClient = builder.build(); return kinesisClient; } diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java index d4c0d1a478b..d24b3424a01 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java @@ -7,6 +7,7 @@ import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.SnsClientBuilder; import software.amazon.awssdk.services.sns.model.CreateTopicRequest; import software.amazon.awssdk.services.sns.model.CreateTopicResponse; import software.amazon.awssdk.services.sns.model.PublishRequest; @@ -30,10 +31,19 @@ public SnsConnector(String topic){ } private static SnsClient createSnsClient() { - SnsClient snsClient = SnsClient.builder() + SnsClientBuilder builder = SnsClient.builder() .region(Region.US_EAST_1) - .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) - .build(); + .credentialsProvider(EnvironmentVariableCredentialsProvider.create()); + + // Read the SYSTEM_TESTS_AWS_URL environment variable + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + // Only override endpoint if SYSTEM_TESTS_AWS_URL is set + if (systemTestsAwsUrl != null && !systemTestsAwsUrl.isEmpty()) { + builder.endpointOverride(URI.create(systemTestsAwsUrl)); + } + + SnsClient snsClient = builder.build(); return snsClient; } diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js index 388b3046b2c..384b8a4b9b6 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js @@ -1,10 +1,13 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') +const { AWS_HOST } = require('./shared') + const kinesisProduce = (stream, message, partitionKey = '1', timeout = 60000) => { // Create a Kinesis client const kinesis = new AWS.Kinesis({ - region: 'us-east-1' + region: 'us-east-1', + endpoint: AWS_HOST }) message = JSON.stringify({ message }) @@ -66,7 +69,10 @@ const kinesisProduce = (stream, message, 
partitionKey = '1', timeout = 60000) => const kinesisConsume = (stream, timeout = 60000, message, sequenceNumber) => { // Create a Kinesis client - const kinesis = new AWS.Kinesis() + const kinesis = new AWS.Kinesis({ + region: 'us-east-1', + endpoint: AWS_HOST + }) console.log(`[Kinesis] Looking for the following message for stream: ${stream}: ${message}`) diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js new file mode 100644 index 00000000000..c9c25199c04 --- /dev/null +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js @@ -0,0 +1,4 @@ +const AWS_HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://sns.us-east-1.amazonaws.com' +const AWS_ACCT = process.env.SYSTEM_TESTS_AWS_URL ? '000000000000' : '601427279990' + +module.exports = { AWS_HOST, AWS_ACCT } diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js index 14988d5197a..2baf09462c2 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js @@ -1,13 +1,21 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') +const { AWS_HOST, AWS_ACCT } = require('./shared') + let TopicArn let QueueUrl const snsPublish = (queue, topic, message) => { // Create an SQS client - const sns = new AWS.SNS() - const sqs = new AWS.SQS() + const sns = new AWS.SNS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) const messageToSend = message ?? 'Hello from SNS JavaScript injection' @@ -20,13 +28,15 @@ const snsPublish = (queue, topic, message) => { TopicArn = data.TopicArn - sqs.createQueue({ QueueName: queue }, (err) => { + sqs.createQueue({ QueueName: queue }, (err, data) => { if (err) { console.log(err) reject(err) } - QueueUrl = `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}` + console.log(data) + + QueueUrl = `${AWS_HOST}/${AWS_ACCT}/${queue}` sqs.getQueueAttributes({ QueueUrl, AttributeNames: ['All'] }, (err, data) => { if (err) { @@ -34,6 +44,8 @@ const snsPublish = (queue, topic, message) => { reject(err) } + console.log('sns data') + console.log(data) const QueueArn = data.Attributes.QueueArn const policy = { @@ -102,9 +114,12 @@ const snsPublish = (queue, topic, message) => { const snsConsume = async (queue, timeout, expectedMessage) => { // Create an SQS client - const sqs = new AWS.SQS() + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) - const queueUrl = `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}` + const queueUrl = `${AWS_HOST}/${AWS_ACCT}/${queue}` return new Promise((resolve, reject) => { let messageFound = false diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js index bcfc38ccc45..264d0c687a6 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js @@ -1,9 +1,14 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') +const { AWS_HOST, AWS_ACCT } = require('./shared') + const sqsProduce = (queue, message) => { // Create an SQS client - const sqs = new AWS.SQS() + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) const messageToSend = message ?? 
'Hello from SQS JavaScript injection' @@ -18,7 +23,7 @@ const sqsProduce = (queue, message) => { // Send messages to the queue const produce = () => { sqs.sendMessage({ - QueueUrl: `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}`, + QueueUrl: `${AWS_HOST}/${AWS_ACCT}/${queue}`, MessageBody: messageToSend }, (err, data) => { if (err) { @@ -41,10 +46,15 @@ const sqsProduce = (queue, message) => { const sqsConsume = async (queue, timeout, expectedMessage) => { // Create an SQS client - const sqs = new AWS.SQS() + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) + + const queueUrl = `${AWS_HOST}/${AWS_ACCT}/${queue}` - const queueUrl = `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}` console.log(`[SQS] Looking for message: ${expectedMessage} in queue: ${queue}`) + return new Promise((resolve, reject) => { let messageFound = false diff --git a/utils/build/docker/python/flask-poc.Dockerfile b/utils/build/docker/python/flask-poc.Dockerfile index a9d506151f0..8cf32f09d49 100644 --- a/utils/build/docker/python/flask-poc.Dockerfile +++ b/utils/build/docker/python/flask-poc.Dockerfile @@ -16,6 +16,7 @@ ENV _DD_APPSEC_DEDUPLICATION_ENABLED=false # Cross Tracer Integration Testing for Trace Context Propagation ENV DD_BOTOCORE_PROPAGATION_ENABLED=true ENV DD_KAFKA_PROPAGATION_ENABLED=true +ENV DD_TRACE_API_VERSION='v0.4' ENV LOG_LEVEL='DEBUG' diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py b/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py index 1f516ed2f3b..597a2ccb053 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py @@ -1,17 +1,21 @@ import json import logging import time +import os import boto3 +HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://kinesis.us-east-1.amazonaws.com/601427279990") + + def kinesis_produce(stream, message, partition_key, timeout=60): """ The goal of this function is to trigger kinesis producer calls """ # Create an SQS client - kinesis = boto3.client("kinesis", region_name="us-east-1") + kinesis = boto3.client("kinesis", region_name="us-east-1", endpoint_url=HOST) # we only allow injection into JSON messages encoded as a string message = json.dumps({"message": message}) @@ -60,7 +64,7 @@ def kinesis_consume(stream, expectedMessage, timeout=60): The goal of this function is to trigger kinesis consumer calls """ # Create a Kinesis client - kinesis = boto3.client("kinesis", region_name="us-east-1") + kinesis = boto3.client("kinesis", region_name="us-east-1", endpoint_url=HOST) consumed_message = None shard_iterator = None diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py index 5f31edfb5ae..99f5526cbcd 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py @@ -1,17 +1,23 @@ import json import logging +import os import time import boto3 +SNS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sns.us-east-1.amazonaws.com/601427279990") +SQS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") +AWS_ACCT = "000000000000" if "localstack" in SQS_HOST else "601427279990" + + def sns_produce(queue, topic, message): """ The goal of this function is to trigger sqs producer calls """ # Create an SQS client - sqs = boto3.client("sqs", 
region_name="us-east-1") - sns = boto3.client("sns", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=SQS_HOST) + sns = boto3.client("sns", region_name="us-east-1", endpoint_url=SNS_HOST) try: topic = sns.create_topic(Name=topic) @@ -40,17 +46,17 @@ def sns_produce(queue, topic, message): sqs.set_queue_attributes(QueueUrl=sqs_url, Attributes={"Policy": json.dumps(policy)}) sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_arn, Attributes={"RawMessageDelivery": "true"}) - print(f"[SNS->SQS] Created SNS Topic: {topic} and SQS Queue: {queue}") + logging.info(f"[SNS->SQS] Created SNS Topic: {topic} and SQS Queue: {queue}") except Exception as e: - print(f"[SNS->SQS] Error during Python SNS create topic or SQS create queue: {str(e)}") + logging.error(f"[SNS->SQS] Error during Python SNS create topic or SQS create queue: {str(e)}") try: # Send the message to the SNS topic sns.publish(TopicArn=topic_arn, Message=message) - print("[SNS->SQS] Python SNS messaged published successfully") + logging.info("[SNS->SQS] Python SNS message published successfully") return "SNS Produce ok" except Exception as e: - print(f"[SNS->SQS] Error during Python SNS publish message: {str(e)}") + logging.error(f"[SNS->SQS] Error during Python SNS publish message: {str(e)}") return {"error": f"[SNS->SQS] Error during Python SNS publish message: {str(e)}"} @@ -60,18 +66,34 @@ def sns_consume(queue, expectedMessage, timeout=60): """ # Create an SQS client - sqs = boto3.client("sqs", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=SQS_HOST) + + start = time.time() + queue_found = False + queue_url = None + + while not queue_found and time.time() < start + timeout: + try: + data = sqs.get_queue_url(QueueName=queue) + queue_found = True + logging.info(f"Found SQS Queue details with name: {queue}") + logging.info(data) + logging.info(data.get("QueueUrl")) + queue_url = data.get("QueueUrl") + except Exception as e: + logging.info(f"Error during Python SQS get queue details: {str(e)}") + time.sleep(1) consumed_message = None start_time = time.time() while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue}") + response = sqs.receive_message(QueueUrl=queue_url) if response and "Messages" in response: for message in response["Messages"]: - print("[SNS->SQS] Consumed: ") - print(message) + logging.info("[SNS->SQS] Consumed: ") + logging.info(message) if message["Body"] == expectedMessage: consumed_message = message["Body"] logging.info("[SNS->SQS] Success. Found the following message: " + consumed_message) @@ -79,15 +101,15 @@ else: # entire message may be json within the body try: - print("[SNS->SQS] Trying to decode raw message: ") - print(message.get("Body", "")) + logging.info("[SNS->SQS] Trying to decode raw message: ") + logging.info(message.get("Body", "")) message_json = json.loads(message["Body"]) if message_json.get("Message", "") == expectedMessage: consumed_message = message_json["Message"] - print("[SNS->SQS] Success. Found the following message: " + consumed_message) + logging.info("[SNS->SQS] Success. 
Found the following message: " + consumed_message) break except Exception as e: - print(e) + logging.error(e) pass except Exception as e: diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py index d8806ad7f07..2728fb1edd8 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py @@ -1,26 +1,35 @@ import logging +import os import time import boto3 +HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") +AWS_ACCT = "000000000000" if "localstack" in HOST else "601427279990" + + def sqs_produce(queue, message, timeout=60): """ The goal of this function is to trigger sqs producer calls """ # Create an SQS client - sqs = boto3.client("sqs", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=HOST) start = time.time() queue_created = False exc = None + queue_url = None while not queue_created and time.time() < start + timeout: try: - sqs.create_queue(QueueName=queue) + data = sqs.create_queue(QueueName=queue) queue_created = True logging.info(f"Created SQS Queue with name: {queue}") + logging.info(data) + logging.info(data.get("QueueUrl")) + queue_url = data.get("QueueUrl") except Exception as e: exc = e logging.info(f"Error during Python SQS create queue: {str(e)}") @@ -30,7 +39,7 @@ def sqs_produce(queue, message, timeout=60): while not message_sent and time.time() < start + timeout: try: # Send the message to the SQS queue - sqs.send_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue}", MessageBody=message) + sqs.send_message(QueueUrl=queue_url, MessageBody=message) message_sent = True except Exception as e: exc = e @@ -49,14 +58,30 @@ def sqs_consume(queue, expectedMessage, timeout=60): The goal of this function is to trigger sqs consumer calls """ # Create an SQS client - sqs = boto3.client("sqs", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=HOST) + + start = time.time() + queue_found = False + queue_url = None + + while not queue_found and time.time() < start + timeout: + try: + data = sqs.get_queue_url(QueueName=queue) + queue_found = True + logging.info(f"Found SQS Queue details with name: {queue}") + logging.info(data) + logging.info(data.get("QueueUrl")) + queue_url = data.get("QueueUrl") + except Exception as e: + logging.info(f"Error during Python SQS get queue details: {str(e)}") + time.sleep(1) consumed_message = None start_time = time.time() while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue}") + response = sqs.receive_message(QueueUrl=queue_url) if response and "Messages" in response: for message in response["Messages"]: if message["Body"] == expectedMessage: