From 529b3c53ddd157a3a86e727411e49366d5ed1ef8 Mon Sep 17 00:00:00 2001 From: William Conti Date: Thu, 19 Dec 2024 14:54:07 -0500 Subject: [PATCH 01/19] initial commit --- .../crossed_integrations/test_kinesis.py | 2 +- tests/integrations/utils.py | 13 ++++++--- utils/_context/_scenarios/endtoend.py | 1 + utils/_context/_scenarios/integrations.py | 13 +++++++-- utils/_context/containers.py | 20 ++++++++++++++ .../dotnet/weblog/Endpoints/DsmEndpoint.cs | 13 ++++++++- .../weblog/Endpoints/MessagingEndpoints.cs | 27 +++++++++++++++++-- .../system_tests/springboot/App.java | 24 ++++++++++++----- .../springboot/aws/KinesisConnector.java | 15 ++++++++--- .../springboot/aws/SnsConnector.java | 15 ++++++++--- .../integrations/messaging/aws/kinesis.js | 10 +++++-- .../express/integrations/messaging/aws/sns.js | 6 +++-- .../express/integrations/messaging/aws/sqs.js | 6 +++-- .../integrations/messaging/aws/kinesis.py | 8 ++++-- .../flask/integrations/messaging/aws/sns.py | 11 +++++--- .../flask/integrations/messaging/aws/sqs.py | 8 ++++-- 16 files changed, 157 insertions(+), 35 deletions(-) diff --git a/tests/integrations/crossed_integrations/test_kinesis.py b/tests/integrations/crossed_integrations/test_kinesis.py index c550ffa6fe8..36d3eb5cd84 100644 --- a/tests/integrations/crossed_integrations/test_kinesis.py +++ b/tests/integrations/crossed_integrations/test_kinesis.py @@ -215,7 +215,7 @@ def validate_kinesis_spans(self, producer_interface, consumer_interface, stream) @scenarios.crossed_tracing_libraries -@irrelevant(True, reason="AWS Tests are not currently stable.") +# @irrelevant(True, reason="AWS Tests are not currently stable.") @features.aws_kinesis_span_creationcontext_propagation_via_message_attributes_with_dd_trace class Test_Kinesis_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_Kinesis): buddy_interface = interfaces.python_buddy diff --git a/tests/integrations/utils.py b/tests/integrations/utils.py index 5bd770692f1..9bae04c3120 100644 --- a/tests/integrations/utils.py +++ b/tests/integrations/utils.py @@ -206,10 +206,15 @@ def delete_aws_resource( raise +SQS_URL = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") +SNS_URL = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sns.us-east-1.amazonaws.com/601427279990") +KINESIS_URL = os.getenv("SYSTEM_TESTS_AWS_URL", "https://kinesis.us-east-1.amazonaws.com/601427279990") + + def delete_sqs_queue(queue_name): try: - queue_url = f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue_name}" - sqs_client = _get_aws_session().client("sqs") + queue_url = f"{SQS_URL}/{queue_name}" + sqs_client = _get_aws_session().client("sqs", endpoint_url=SQS_URL) delete_callable = lambda url: sqs_client.delete_queue(QueueUrl=url) get_callable = lambda url: sqs_client.get_queue_attributes(QueueUrl=url) delete_aws_resource( @@ -231,7 +236,7 @@ def delete_sqs_queue(queue_name): def delete_sns_topic(topic_name): try: topic_arn = f"arn:aws:sns:us-east-1:601427279990:{topic_name}" - sns_client = _get_aws_session().client("sns") + sns_client = _get_aws_session().client("sns", endpoint_url=SNS_URL) get_callable = lambda arn: sns_client.get_topic_attributes(TopicArn=arn) delete_callable = lambda arn: sns_client.delete_topic(TopicArn=arn) delete_aws_resource(delete_callable, topic_arn, "SNS Topic", "NotFound", get_callable=get_callable) @@ -246,7 +251,7 @@ def delete_sns_topic(topic_name): def delete_kinesis_stream(stream_name): try: - kinesis_client = _get_aws_session().client("kinesis") + kinesis_client = _get_aws_session().client("kinesis", endpoint_url=KINESIS_URL) delete_callable = lambda name: kinesis_client.delete_stream(StreamName=name, EnforceConsumerDeletion=True) delete_aws_resource(delete_callable, stream_name, "Kinesis Stream", "ResourceNotFoundException") except botocore.exceptions.ClientError as e: diff --git a/utils/_context/_scenarios/endtoend.py b/utils/_context/_scenarios/endtoend.py index 63213f02095..fe30e01beeb 100644 --- a/utils/_context/_scenarios/endtoend.py +++ b/utils/_context/_scenarios/endtoend.py @@ -216,6 +216,7 @@ def __init__( include_rabbitmq=False, include_mysql_db=False, include_sqlserver=False, + include_localstack=False, include_otel_drop_in=False, include_buddies=False, require_api_key=False, diff --git a/utils/_context/_scenarios/integrations.py b/utils/_context/_scenarios/integrations.py index ab15547b849..561b5b12c4e 100644 --- a/utils/_context/_scenarios/integrations.py +++ b/utils/_context/_scenarios/integrations.py @@ -36,6 +36,7 @@ def __init__(self) -> None: "AWS_ACCESS_KEY_ID": "my-access-key", "AWS_SECRET_ACCESS_KEY": "my-access-key", "DD_TRACE_INFERRED_PROXY_SERVICES_ENABLED": "true", + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" }, include_postgres_db=True, include_cassandra_db=True, @@ -44,6 +45,7 @@ def __init__(self) -> None: include_rabbitmq=True, include_mysql_db=True, include_sqlserver=True, + include_localstack=True include_otel_drop_in=True, doc=( "Spawns tracer, agent, and a full set of database. " @@ -92,6 +94,7 @@ def __init__( include_kafka=False, include_rabbitmq=False, include_buddies=False, + include_localstack=True ) -> None: super().__init__( name, @@ -99,11 +102,13 @@ def __init__( "DD_TRACE_API_VERSION": "v0.4", "AWS_ACCESS_KEY_ID": "my-access-key", "AWS_SECRET_ACCESS_KEY": "my-access-key", + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" }, doc=doc, include_kafka=include_kafka, include_rabbitmq=include_rabbitmq, include_buddies=include_buddies, + include_localstack=include_localstack, scenario_groups=[ScenarioGroup.INTEGRATIONS, ScenarioGroup.ESSENTIALS], ) # Since we are using real AWS queues / topics, we need a unique message to ensure we aren't consuming messages @@ -113,8 +118,8 @@ def __init__( def configure(self, config): super().configure(config) - if not self.replay: - self._check_aws_variables() + # if not self.replay: + # self._check_aws_variables() self.unique_id = _get_unique_id(self.host_log_folder, replay=self.replay) def _check_aws_variables(self): @@ -132,7 +137,11 @@ def __init__(self) -> None: include_kafka=True, include_buddies=True, include_rabbitmq=True, + include_localstack=True doc="Spawns a buddy for each supported language of APM, requires AWS authentication.", + weblog_env={ + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" + }, ) self.unique_id = None diff --git a/utils/_context/containers.py b/utils/_context/containers.py index 6a933146a34..58f8b11e23d 100644 --- a/utils/_context/containers.py +++ b/utils/_context/containers.py @@ -976,6 +976,26 @@ def __init__(self, host_log_folder) -> None: ) +class LocalstackContainer(TestedContainer): + def __init__(self, host_log_folder) -> None: + super().__init__( + image_name="localstack/localstack:latest", + name="localstack-main", + environment={ + "LOCALSTACK_SERVICES": "kinesis,sqs,sns,xray", + "EXTRA_CORS_ALLOWED_HEADERS": "x-amz-request-id,x-amzn-requestid", + "EXTRA_CORS_EXPOSE_HEADERS": "x-amz-request-id,x-amzn-requestid", + "AWS_DEFAULT_REGION": "us-east-1", + "FORCE_NONINTERACTIVE": "true", + "START_WEB": "0", + "DOCKER_HOST": "unix:///var/run/docker.sock", + }, + host_log_folder=host_log_folder, + ports={"4566": ("127.0.0.1", 4566)}, + volumes={"/var/run/docker.sock": {"bind": "/var/run/docker.sock", "mode": "rw"}}, + ) + + class MySqlContainer(SqlDbTestedContainer): def __init__(self, host_log_folder) -> None: super().__init__( diff --git a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs index 56b64c00405..e064a46ae94 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs @@ -187,7 +187,18 @@ class SqsConsumer { public static async Task DoWork(string queue, string message) { - var sqsClient = new AmazonSQSClient(); + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + return new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + return new AmazonSQSClient(); + } // Create queue Console.WriteLine($"[SQS] Consume: Creating queue {queue}"); CreateQueueResponse responseCreate = await sqsClient.CreateQueueAsync(queue); diff --git a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs index cd5b5598c23..767866c2be6 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs @@ -133,7 +133,18 @@ private static bool RabbitConsume(string queue, TimeSpan timeout) private static async Task SqsProduce(string queue, string message) { - var sqsClient = new AmazonSQSClient(); + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + return new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + return new AmazonSQSClient(); + } var responseCreate = await sqsClient.CreateQueueAsync(queue); var qUrl = responseCreate.QueueUrl; await sqsClient.SendMessageAsync(qUrl, message); @@ -143,7 +154,19 @@ private static async Task SqsProduce(string queue, string message) private static async Task SqsConsume(string queue, TimeSpan timeout, string message) { Console.WriteLine($"consuming one message from SQS queue {queue} in max {(int)timeout.TotalSeconds} seconds"); - var sqsClient = new AmazonSQSClient(); + + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + return new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + return new AmazonSQSClient(); + } var responseCreate = await sqsClient.CreateQueueAsync(queue); var qUrl = responseCreate.QueueUrl; diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java index 0767b37c0d2..43183c59316 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java @@ -420,7 +420,9 @@ ResponseEntity sqsProduce( @RequestParam(required = true) String queue, @RequestParam(required = true) String message ) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { sqs.produceMessageWithoutNewThread(message); } catch (Exception e) { @@ -437,7 +439,9 @@ ResponseEntity sqsConsume( @RequestParam(required = false) Integer timeout, @RequestParam(required = true) String message ) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); if (timeout == null) timeout = 60; boolean consumed = false; try { @@ -456,8 +460,10 @@ ResponseEntity snsProduce( @RequestParam(required = true) String topic, @RequestParam(required = true) String message ) { + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + SnsConnector sns = new SnsConnector(topic); - SqsConnector sqs = new SqsConnector(queue); + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { sns.produceMessageWithoutNewThread(message, sqs); } catch (Exception e) { @@ -474,7 +480,9 @@ ResponseEntity snsConsume( @RequestParam(required = false) Integer timeout, @RequestParam(required = true) String message ) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); if (timeout == null) timeout = 60; boolean consumed = false; try { @@ -639,7 +647,9 @@ String publishToKafka( return "failed to start consuming message"; } } else if ("sqs".equals(integration)) { - SqsConnector sqs = new SqsConnector(queue); + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { Thread produceThread = sqs.startProducingMessage(message); produceThread.join(this.PRODUCE_CONSUME_THREAD_TIMEOUT); @@ -657,8 +667,10 @@ String publishToKafka( return "[SQS] failed to start consuming message"; } } else if ("sns".equals(integration)) { + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + SnsConnector sns = new SnsConnector(topic); - SqsConnector sqs = new SqsConnector(queue); + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { Thread produceThread = sns.startProducingMessage(message, sqs); produceThread.join(this.PRODUCE_CONSUME_THREAD_TIMEOUT); diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java index bde535725a6..67ca8ca6d67 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java @@ -35,10 +35,19 @@ public KinesisConnector(String stream){ } public KinesisClient createKinesisClient() { - KinesisClient kinesisClient = KinesisClient.builder() + KinesisClient.Builder builder = KinesisClient.builder() .region(this.region) - .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) - .build(); + .credentialsProvider(EnvironmentVariableCredentialsProvider.create()); + + // Read the SYSTEM_TESTS_AWS_URL environment variable + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + // Only override endpoint if SYSTEM_TESTS_AWS_URL is set + if (systemTestsAwsUrl != null && !systemTestsAwsUrl.isEmpty()) { + builder.endpointOverride(URI.create(systemTestsAwsUrl)); + } + + KinesisClient kinesisClient = builder.build(); return kinesisClient; } diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java index d4c0d1a478b..a193c13fea2 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java @@ -30,10 +30,19 @@ public SnsConnector(String topic){ } private static SnsClient createSnsClient() { - SnsClient snsClient = SnsClient.builder() + SnsClient.Builder builder = SnsClient.builder() .region(Region.US_EAST_1) - .credentialsProvider(EnvironmentVariableCredentialsProvider.create()) - .build(); + .credentialsProvider(EnvironmentVariableCredentialsProvider.create()); + + // Read the SYSTEM_TESTS_AWS_URL environment variable + String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); + + // Only override endpoint if SYSTEM_TESTS_AWS_URL is set + if (systemTestsAwsUrl != null && !systemTestsAwsUrl.isEmpty()) { + builder.endpointOverride(URI.create(systemTestsAwsUrl)); + } + + SnsClient snsclient = builder.build(); return snsClient; } diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js index 388b3046b2c..94d213afbc3 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js @@ -1,10 +1,13 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') +const HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://kinesis.us-east-1.amazonaws.com/601427279990' + const kinesisProduce = (stream, message, partitionKey = '1', timeout = 60000) => { // Create a Kinesis client const kinesis = new AWS.Kinesis({ - region: 'us-east-1' + region: 'us-east-1', + endpoint: HOST, }) message = JSON.stringify({ message }) @@ -66,7 +69,10 @@ const kinesisProduce = (stream, message, partitionKey = '1', timeout = 60000) => const kinesisConsume = (stream, timeout = 60000, message, sequenceNumber) => { // Create a Kinesis client - const kinesis = new AWS.Kinesis() + const kinesis = new AWS.Kinesis({ + region: 'us-east-1', + endpoint: HOST, + }) console.log(`[Kinesis] Looking for the following message for stream: ${stream}: ${message}`) diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js index 14988d5197a..38a0e5d324c 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js @@ -4,6 +4,8 @@ const tracer = require('dd-trace') let TopicArn let QueueUrl +const HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://sns.us-east-1.amazonaws.com/601427279990' + const snsPublish = (queue, topic, message) => { // Create an SQS client const sns = new AWS.SNS() @@ -26,7 +28,7 @@ const snsPublish = (queue, topic, message) => { reject(err) } - QueueUrl = `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}` + QueueUrl = `${HOST}/${queue}` sqs.getQueueAttributes({ QueueUrl, AttributeNames: ['All'] }, (err, data) => { if (err) { @@ -104,7 +106,7 @@ const snsConsume = async (queue, timeout, expectedMessage) => { // Create an SQS client const sqs = new AWS.SQS() - const queueUrl = `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}` + const queueUrl = `${HOST}/${queue}` return new Promise((resolve, reject) => { let messageFound = false diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js index bcfc38ccc45..b67d87aa6b9 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js @@ -1,6 +1,8 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') +const HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://sqs.us-east-1.amazonaws.com/601427279990' + const sqsProduce = (queue, message) => { // Create an SQS client const sqs = new AWS.SQS() @@ -18,7 +20,7 @@ const sqsProduce = (queue, message) => { // Send messages to the queue const produce = () => { sqs.sendMessage({ - QueueUrl: `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}`, + QueueUrl: `${HOST}/${queue}`, MessageBody: messageToSend }, (err, data) => { if (err) { @@ -43,7 +45,7 @@ const sqsConsume = async (queue, timeout, expectedMessage) => { // Create an SQS client const sqs = new AWS.SQS() - const queueUrl = `https://sqs.us-east-1.amazonaws.com/601427279990/${queue}` + const queueUrl = `${HOST}/${queue}` console.log(`[SQS] Looking for message: ${expectedMessage} in queue: ${queue}`) return new Promise((resolve, reject) => { let messageFound = false diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py b/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py index 1f516ed2f3b..597a2ccb053 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/kinesis.py @@ -1,17 +1,21 @@ import json import logging import time +import os import boto3 +HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://kinesis.us-east-1.amazonaws.com/601427279990") + + def kinesis_produce(stream, message, partition_key, timeout=60): """ The goal of this function is to trigger kinesis producer calls """ # Create an SQS client - kinesis = boto3.client("kinesis", region_name="us-east-1") + kinesis = boto3.client("kinesis", region_name="us-east-1", endpoint_url=HOST) # we only allow injection into JSON messages encoded as a string message = json.dumps({"message": message}) @@ -60,7 +64,7 @@ def kinesis_consume(stream, expectedMessage, timeout=60): The goal of this function is to trigger kinesis consumer calls """ # Create a Kinesis client - kinesis = boto3.client("kinesis", region_name="us-east-1") + kinesis = boto3.client("kinesis", region_name="us-east-1", endpoint_url=HOST) consumed_message = None shard_iterator = None diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py index 5f31edfb5ae..beffa283337 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py @@ -1,17 +1,22 @@ import json import logging +import os import time import boto3 +SNS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sns.us-east-1.amazonaws.com/601427279990") +SQS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") + + def sns_produce(queue, topic, message): """ The goal of this function is to trigger sqs producer calls """ # Create an SQS client - sqs = boto3.client("sqs", region_name="us-east-1") - sns = boto3.client("sns", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=SNS_HOST) + sns = boto3.client("sns", region_name="us-east-1", endpoint_url=SQS_HOST) try: topic = sns.create_topic(Name=topic) @@ -60,7 +65,7 @@ def sns_consume(queue, expectedMessage, timeout=60): """ # Create an SQS client - sqs = boto3.client("sqs", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=SQS_HOST) consumed_message = None start_time = time.time() diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py index d8806ad7f07..51b9031bc1d 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py @@ -1,16 +1,20 @@ import logging +import os import time import boto3 +HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") + + def sqs_produce(queue, message, timeout=60): """ The goal of this function is to trigger sqs producer calls """ # Create an SQS client - sqs = boto3.client("sqs", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=HOST) start = time.time() queue_created = False @@ -49,7 +53,7 @@ def sqs_consume(queue, expectedMessage, timeout=60): The goal of this function is to trigger sqs consumer calls """ # Create an SQS client - sqs = boto3.client("sqs", region_name="us-east-1") + sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=HOST) consumed_message = None start_time = time.time() From 856c9bd249d5690b99d4601a24fe69338ab34f1a Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 10:42:46 -0500 Subject: [PATCH 02/19] fix lint --- utils/_context/_scenarios/integrations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/_context/_scenarios/integrations.py b/utils/_context/_scenarios/integrations.py index 561b5b12c4e..4ccaaa75db9 100644 --- a/utils/_context/_scenarios/integrations.py +++ b/utils/_context/_scenarios/integrations.py @@ -45,7 +45,7 @@ def __init__(self) -> None: include_rabbitmq=True, include_mysql_db=True, include_sqlserver=True, - include_localstack=True + include_localstack=True, include_otel_drop_in=True, doc=( "Spawns tracer, agent, and a full set of database. " From f52551faa25ff5a2081ff252e01759a926d36ffa Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 10:45:08 -0500 Subject: [PATCH 03/19] fix lint again --- utils/_context/_scenarios/integrations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/_context/_scenarios/integrations.py b/utils/_context/_scenarios/integrations.py index 4ccaaa75db9..1d160d0ea31 100644 --- a/utils/_context/_scenarios/integrations.py +++ b/utils/_context/_scenarios/integrations.py @@ -137,7 +137,7 @@ def __init__(self) -> None: include_kafka=True, include_buddies=True, include_rabbitmq=True, - include_localstack=True + include_localstack=True, doc="Spawns a buddy for each supported language of APM, requires AWS authentication.", weblog_env={ "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" From 8a4f9a43835b744cf47d5e1207f2e8212e32edf6 Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 11:12:15 -0500 Subject: [PATCH 04/19] fix format --- utils/_context/_scenarios/endtoend.py | 1 + utils/_context/_scenarios/integrations.py | 8 ++++---- .../docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs | 2 +- .../java/com/datadoghq/system_tests/springboot/App.java | 2 +- .../system_tests/springboot/aws/KinesisConnector.java | 4 ++-- .../system_tests/springboot/aws/SnsConnector.java | 2 +- 6 files changed, 10 insertions(+), 9 deletions(-) diff --git a/utils/_context/_scenarios/endtoend.py b/utils/_context/_scenarios/endtoend.py index fe30e01beeb..6b3616aa80b 100644 --- a/utils/_context/_scenarios/endtoend.py +++ b/utils/_context/_scenarios/endtoend.py @@ -241,6 +241,7 @@ def __init__( include_rabbitmq=include_rabbitmq, include_mysql_db=include_mysql_db, include_sqlserver=include_sqlserver, + include_localstack=include_localstack, ) self._require_api_key = require_api_key diff --git a/utils/_context/_scenarios/integrations.py b/utils/_context/_scenarios/integrations.py index 1d160d0ea31..909d7e27359 100644 --- a/utils/_context/_scenarios/integrations.py +++ b/utils/_context/_scenarios/integrations.py @@ -36,7 +36,7 @@ def __init__(self) -> None: "AWS_ACCESS_KEY_ID": "my-access-key", "AWS_SECRET_ACCESS_KEY": "my-access-key", "DD_TRACE_INFERRED_PROXY_SERVICES_ENABLED": "true", - "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", }, include_postgres_db=True, include_cassandra_db=True, @@ -94,7 +94,7 @@ def __init__( include_kafka=False, include_rabbitmq=False, include_buddies=False, - include_localstack=True + include_localstack=True, ) -> None: super().__init__( name, @@ -102,7 +102,7 @@ def __init__( "DD_TRACE_API_VERSION": "v0.4", "AWS_ACCESS_KEY_ID": "my-access-key", "AWS_SECRET_ACCESS_KEY": "my-access-key", - "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", }, doc=doc, include_kafka=include_kafka, @@ -140,7 +140,7 @@ def __init__(self) -> None: include_localstack=True, doc="Spawns a buddy for each supported language of APM, requires AWS authentication.", weblog_env={ - "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", }, ) self.unique_id = None diff --git a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs index 767866c2be6..0c436282fdd 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs @@ -154,7 +154,7 @@ private static async Task SqsProduce(string queue, string message) private static async Task SqsConsume(string queue, TimeSpan timeout, string message) { Console.WriteLine($"consuming one message from SQS queue {queue} in max {(int)timeout.TotalSeconds} seconds"); - + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); if (!string.IsNullOrEmpty(awsUrl)) diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java index 43183c59316..e54ffd89209 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/App.java @@ -421,7 +421,7 @@ ResponseEntity sqsProduce( @RequestParam(required = true) String message ) { String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); - + SqsConnector sqs = new SqsConnector(queue, systemTestsAwsUrl); try { sqs.produceMessageWithoutNewThread(message); diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java index 67ca8ca6d67..526d666a424 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java @@ -41,12 +41,12 @@ public KinesisClient createKinesisClient() { // Read the SYSTEM_TESTS_AWS_URL environment variable String systemTestsAwsUrl = System.getenv("SYSTEM_TESTS_AWS_URL"); - + // Only override endpoint if SYSTEM_TESTS_AWS_URL is set if (systemTestsAwsUrl != null && !systemTestsAwsUrl.isEmpty()) { builder.endpointOverride(URI.create(systemTestsAwsUrl)); } - + KinesisClient kinesisClient = builder.build(); return kinesisClient; } diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java index a193c13fea2..50ae1b34d5b 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java @@ -41,7 +41,7 @@ private static SnsClient createSnsClient() { if (systemTestsAwsUrl != null && !systemTestsAwsUrl.isEmpty()) { builder.endpointOverride(URI.create(systemTestsAwsUrl)); } - + SnsClient snsclient = builder.build(); return snsClient; } From 9f16bc88d4c682e881b48608cc44918b0c816e4b Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 11:15:53 -0500 Subject: [PATCH 05/19] fix js format --- .../nodejs/express/integrations/messaging/aws/kinesis.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js index 94d213afbc3..b22856dffbb 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js @@ -7,7 +7,7 @@ const kinesisProduce = (stream, message, partitionKey = '1', timeout = 60000) => // Create a Kinesis client const kinesis = new AWS.Kinesis({ region: 'us-east-1', - endpoint: HOST, + endpoint: HOST }) message = JSON.stringify({ message }) @@ -71,7 +71,7 @@ const kinesisConsume = (stream, timeout = 60000, message, sequenceNumber) => { // Create a Kinesis client const kinesis = new AWS.Kinesis({ region: 'us-east-1', - endpoint: HOST, + endpoint: HOST }) console.log(`[Kinesis] Looking for the following message for stream: ${stream}: ${message}`) From 03f2f5f8102b66622441ae3a9d885bf51e8965a9 Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 11:19:05 -0500 Subject: [PATCH 06/19] fix container --- utils/_context/_scenarios/endtoend.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/utils/_context/_scenarios/endtoend.py b/utils/_context/_scenarios/endtoend.py index 6b3616aa80b..3ad8ed619b9 100644 --- a/utils/_context/_scenarios/endtoend.py +++ b/utils/_context/_scenarios/endtoend.py @@ -22,6 +22,7 @@ MsSqlServerContainer, BuddyContainer, TestedContainer, + LocalstackContainer, _get_client as get_docker_client, ) @@ -53,6 +54,7 @@ def __init__( include_rabbitmq=False, include_mysql_db=False, include_sqlserver=False, + include_localstack=False, ) -> None: super().__init__(name, doc=doc, github_workflow=github_workflow, scenario_groups=scenario_groups) @@ -98,6 +100,9 @@ def __init__( if include_sqlserver: self._supporting_containers.append(MsSqlServerContainer(host_log_folder=self.host_log_folder)) + if include_localstack: + self._supporting_containers.append(LocalstackContainer(host_log_folder=self.host_log_folder)) + self._required_containers.extend(self._supporting_containers) def get_image_list(self, library: str, weblog: str) -> list[str]: From 11a54c64e1fcbd7b541707196a31fa026530e05e Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 13:48:51 -0500 Subject: [PATCH 07/19] fix builds --- .../dotnet/weblog/Endpoints/DsmEndpoint.cs | 23 +++++++++++++++---- .../weblog/Endpoints/MessagingEndpoints.cs | 10 ++++---- .../springboot/aws/KinesisConnector.java | 3 ++- .../springboot/aws/SnsConnector.java | 5 ++-- 4 files changed, 29 insertions(+), 12 deletions(-) diff --git a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs index e064a46ae94..e1eb9120c18 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/DsmEndpoint.cs @@ -170,7 +170,19 @@ class SqsProducer { public static async Task DoWork(string queue, string message) { - var sqsClient = new AmazonSQSClient(); + string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + + IAmazonSQS sqsClient; + if (!string.IsNullOrEmpty(awsUrl)) + { + // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + } + else + { + // If SYSTEM_TESTS_AWS_URL is not set, create a default client + sqsClient = new AmazonSQSClient(); + } // create queue Console.WriteLine($"[SQS] Produce: Creating queue {queue}"); CreateQueueResponse responseCreate = await sqsClient.CreateQueueAsync(queue); @@ -189,15 +201,16 @@ public static async Task DoWork(string queue, string message) { string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + IAmazonSQS sqsClient; if (!string.IsNullOrEmpty(awsUrl)) { - // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL - return new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + // If awsUrl is set, use it for ServiceURL + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); } else { - // If SYSTEM_TESTS_AWS_URL is not set, create a default client - return new AmazonSQSClient(); + // If awsUrl is not set, create a default client + sqsClient = new AmazonSQSClient(); } // Create queue Console.WriteLine($"[SQS] Consume: Creating queue {queue}"); diff --git a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs index 0c436282fdd..6befb11e531 100644 --- a/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs +++ b/utils/build/docker/dotnet/weblog/Endpoints/MessagingEndpoints.cs @@ -135,15 +135,16 @@ private static async Task SqsProduce(string queue, string message) { string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + IAmazonSQS sqsClient; if (!string.IsNullOrEmpty(awsUrl)) { // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL - return new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); } else { // If SYSTEM_TESTS_AWS_URL is not set, create a default client - return new AmazonSQSClient(); + sqsClient = new AmazonSQSClient(); } var responseCreate = await sqsClient.CreateQueueAsync(queue); var qUrl = responseCreate.QueueUrl; @@ -157,15 +158,16 @@ private static async Task SqsConsume(string queue, TimeSpan timeout, strin string awsUrl = Environment.GetEnvironmentVariable("SYSTEM_TESTS_AWS_URL"); + IAmazonSQS sqsClient; if (!string.IsNullOrEmpty(awsUrl)) { // If SYSTEM_TESTS_AWS_URL is set, use it for ServiceURL - return new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); + sqsClient = new AmazonSQSClient(new AmazonSQSConfig { ServiceURL = awsUrl }); } else { // If SYSTEM_TESTS_AWS_URL is not set, create a default client - return new AmazonSQSClient(); + sqsClient = new AmazonSQSClient(); } var responseCreate = await sqsClient.CreateQueueAsync(queue); var qUrl = responseCreate.QueueUrl; diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java index 526d666a424..c09e6d6405e 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/KinesisConnector.java @@ -7,6 +7,7 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; +import software.amazon.awssdk.services.kinesis.KinesisClientBuilder; import software.amazon.awssdk.services.kinesis.model.CreateStreamRequest; import software.amazon.awssdk.services.kinesis.model.CreateStreamResponse; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; @@ -35,7 +36,7 @@ public KinesisConnector(String stream){ } public KinesisClient createKinesisClient() { - KinesisClient.Builder builder = KinesisClient.builder() + KinesisClientBuilder builder = KinesisClient.builder() .region(this.region) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()); diff --git a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java index 50ae1b34d5b..d24b3424a01 100644 --- a/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java +++ b/utils/build/docker/java/spring-boot/src/main/java/com/datadoghq/system_tests/springboot/aws/SnsConnector.java @@ -7,6 +7,7 @@ import software.amazon.awssdk.services.sqs.model.QueueAttributeName; import software.amazon.awssdk.services.sqs.model.SetQueueAttributesRequest; import software.amazon.awssdk.services.sns.SnsClient; +import software.amazon.awssdk.services.sns.SnsClientBuilder; import software.amazon.awssdk.services.sns.model.CreateTopicRequest; import software.amazon.awssdk.services.sns.model.CreateTopicResponse; import software.amazon.awssdk.services.sns.model.PublishRequest; @@ -30,7 +31,7 @@ public SnsConnector(String topic){ } private static SnsClient createSnsClient() { - SnsClient.Builder builder = SnsClient.builder() + SnsClientBuilder builder = SnsClient.builder() .region(Region.US_EAST_1) .credentialsProvider(EnvironmentVariableCredentialsProvider.create()); @@ -42,7 +43,7 @@ private static SnsClient createSnsClient() { builder.endpointOverride(URI.create(systemTestsAwsUrl)); } - SnsClient snsclient = builder.build(); + SnsClient snsClient = builder.build(); return snsClient; } From 4a0d6dfd79f85d23a289fb3aade46c452dbb4dda Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 15:26:03 -0500 Subject: [PATCH 08/19] enable all tests --- tests/integrations/crossed_integrations/test_kinesis.py | 1 - tests/integrations/crossed_integrations/test_sns_to_sqs.py | 1 - tests/integrations/crossed_integrations/test_sqs.py | 2 -- tests/integrations/test_dsm.py | 3 --- 4 files changed, 7 deletions(-) diff --git a/tests/integrations/crossed_integrations/test_kinesis.py b/tests/integrations/crossed_integrations/test_kinesis.py index 36d3eb5cd84..aed36063b48 100644 --- a/tests/integrations/crossed_integrations/test_kinesis.py +++ b/tests/integrations/crossed_integrations/test_kinesis.py @@ -215,7 +215,6 @@ def validate_kinesis_spans(self, producer_interface, consumer_interface, stream) @scenarios.crossed_tracing_libraries -# @irrelevant(True, reason="AWS Tests are not currently stable.") @features.aws_kinesis_span_creationcontext_propagation_via_message_attributes_with_dd_trace class Test_Kinesis_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_Kinesis): buddy_interface = interfaces.python_buddy diff --git a/tests/integrations/crossed_integrations/test_sns_to_sqs.py b/tests/integrations/crossed_integrations/test_sns_to_sqs.py index e2c0adc25fd..4427e604207 100644 --- a/tests/integrations/crossed_integrations/test_sns_to_sqs.py +++ b/tests/integrations/crossed_integrations/test_sns_to_sqs.py @@ -258,7 +258,6 @@ def validate_sns_spans(self, producer_interface, consumer_interface, queue, topi @scenarios.crossed_tracing_libraries @features.aws_sns_span_creationcontext_propagation_via_message_attributes_with_dd_trace -@irrelevant(True, reason="AWS Tests are not currently stable.") class Test_SNS_Propagation(_Test_SNS): buddy_interface = interfaces.python_buddy buddy = python_buddy diff --git a/tests/integrations/crossed_integrations/test_sqs.py b/tests/integrations/crossed_integrations/test_sqs.py index 3d73af37067..86996f9b2b0 100644 --- a/tests/integrations/crossed_integrations/test_sqs.py +++ b/tests/integrations/crossed_integrations/test_sqs.py @@ -235,7 +235,6 @@ def validate_sqs_spans(self, producer_interface, consumer_interface, queue): @scenarios.crossed_tracing_libraries -@irrelevant(True, reason="AWS Tests are not currently stable.") @features.aws_sqs_span_creationcontext_propagation_via_message_attributes_with_dd_trace class Test_SQS_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_SQS): buddy_interface = interfaces.python_buddy @@ -248,7 +247,6 @@ class Test_SQS_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_SQS): @scenarios.crossed_tracing_libraries -@irrelevant(True, reason="AWS Tests are not currently stable.") @features.aws_sqs_span_creationcontext_propagation_via_xray_header_with_dd_trace class Test_SQS_PROPAGATION_VIA_AWS_XRAY_HEADERS(_Test_SQS): buddy_interface = interfaces.java_buddy diff --git a/tests/integrations/test_dsm.py b/tests/integrations/test_dsm.py index 02fa67e96bf..141053b5d22 100644 --- a/tests/integrations/test_dsm.py +++ b/tests/integrations/test_dsm.py @@ -262,7 +262,6 @@ def test_dsm_rabbitmq(self): @features.datastreams_monitoring_support_for_sqs -@irrelevant(True, reason="AWS Tests are not currently stable.") @scenarios.integrations_aws class Test_DsmSQS: """Verify DSM stats points for AWS Sqs Service""" @@ -316,7 +315,6 @@ def test_dsm_sqs(self): @features.datastreams_monitoring_support_for_sns -@irrelevant(True, reason="AWS Tests are not currently stable.") @scenarios.integrations_aws class Test_DsmSNS: """Verify DSM stats points for AWS SNS Service""" @@ -376,7 +374,6 @@ def test_dsm_sns(self): @features.datastreams_monitoring_support_for_kinesis -@irrelevant(True, reason="AWS Tests are not currently stable.") @scenarios.integrations_aws class Test_DsmKinesis: """Verify DSM stats points for AWS Kinesis Service""" From f500235f6b0bca653ea5b335dd3b6cf0ace651e8 Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 20 Dec 2024 15:51:04 -0500 Subject: [PATCH 09/19] remove delete resources calls --- .../crossed_integrations/test_kinesis.py | 58 +++++++--------- .../crossed_integrations/test_sns_to_sqs.py | 68 ++++++++----------- .../crossed_integrations/test_sqs.py | 58 +++++++--------- 3 files changed, 82 insertions(+), 102 deletions(-) diff --git a/tests/integrations/crossed_integrations/test_kinesis.py b/tests/integrations/crossed_integrations/test_kinesis.py index aed36063b48..867123ca82c 100644 --- a/tests/integrations/crossed_integrations/test_kinesis.py +++ b/tests/integrations/crossed_integrations/test_kinesis.py @@ -76,22 +76,19 @@ def setup_produce(self): send request A to weblog : this request will produce a Kinesis message send request B to library buddy, this request will consume Kinesis message """ - try: - message = ( - "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " - f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" - ) - - self.production_response = weblog.get( - "/kinesis/produce", params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message}, timeout=120 - ) - self.consume_response = self.buddy.get( - "/kinesis/consume", - params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message, "timeout": 60}, - timeout=61, - ) - finally: - delete_kinesis_stream(self.WEBLOG_TO_BUDDY_STREAM) + message = ( + "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " + f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" + ) + + self.production_response = weblog.get( + "/kinesis/produce", params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message}, timeout=120 + ) + self.consume_response = self.buddy.get( + "/kinesis/consume", + params={"stream": self.WEBLOG_TO_BUDDY_STREAM, "message": message, "timeout": 60}, + timeout=61, + ) def test_produce(self): """Check that a message produced to Kinesis is correctly ingested by a Datadog tracer""" @@ -139,22 +136,19 @@ def setup_consume(self): request A: GET /library_buddy/produce_kinesis_message request B: GET /weblog/consume_kinesis_message """ - try: - message = ( - "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " - f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" - ) - - self.production_response = self.buddy.get( - "/kinesis/produce", params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message}, timeout=500 - ) - self.consume_response = weblog.get( - "/kinesis/consume", - params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message, "timeout": 60}, - timeout=61, - ) - finally: - delete_kinesis_stream(self.BUDDY_TO_WEBLOG_STREAM) + message = ( + "[crossed_integrations/test_kinesis.py][Kinesis] Hello from Kinesis " + f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" + ) + + self.production_response = self.buddy.get( + "/kinesis/produce", params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message}, timeout=500 + ) + self.consume_response = weblog.get( + "/kinesis/consume", + params={"stream": self.BUDDY_TO_WEBLOG_STREAM, "message": message, "timeout": 60}, + timeout=61, + ) def test_consume(self): """Check that a message by an app instrumented by a Datadog tracer is correctly ingested""" diff --git a/tests/integrations/crossed_integrations/test_sns_to_sqs.py b/tests/integrations/crossed_integrations/test_sns_to_sqs.py index 4427e604207..0fb4e196ea7 100644 --- a/tests/integrations/crossed_integrations/test_sns_to_sqs.py +++ b/tests/integrations/crossed_integrations/test_sns_to_sqs.py @@ -106,25 +106,21 @@ def setup_produce(self): send request A to weblog : this request will produce a sns message send request B to library buddy, this request will consume sns message """ - try: - message = ( - "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " - f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" - ) - - self.production_response = weblog.get( - "/sns/produce", - params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "topic": self.WEBLOG_TO_BUDDY_TOPIC, "message": message}, - timeout=60, - ) - self.consume_response = self.buddy.get( - "/sns/consume", - params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sns_topic(self.WEBLOG_TO_BUDDY_TOPIC) - delete_sqs_queue(self.WEBLOG_TO_BUDDY_QUEUE) + message = ( + "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " + f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce at {self.unique_id}" + ) + + self.production_response = weblog.get( + "/sns/produce", + params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "topic": self.WEBLOG_TO_BUDDY_TOPIC, "message": message}, + timeout=60, + ) + self.consume_response = self.buddy.get( + "/sns/consume", + params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_produce(self): """Check that a message produced to sns is correctly ingested by a Datadog tracer""" @@ -171,25 +167,21 @@ def setup_consume(self): request A: GET /library_buddy/produce_sns_message request B: GET /weblog/consume_sns_message """ - try: - message = ( - "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " - f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" - ) - - self.production_response = self.buddy.get( - "/sns/produce", - params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "topic": self.BUDDY_TO_WEBLOG_TOPIC, "message": message}, - timeout=60, - ) - self.consume_response = weblog.get( - "/sns/consume", - params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sns_topic(self.BUDDY_TO_WEBLOG_TOPIC) - delete_sqs_queue(self.BUDDY_TO_WEBLOG_QUEUE) + message = ( + "[crossed_integrations/test_sns_to_sqs.py][SNS] Hello from SNS " + f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume at {self.unique_id}" + ) + + self.production_response = self.buddy.get( + "/sns/produce", + params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "topic": self.BUDDY_TO_WEBLOG_TOPIC, "message": message}, + timeout=60, + ) + self.consume_response = weblog.get( + "/sns/consume", + params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_consume(self): """Check that a message by an app instrumented by a Datadog tracer is correctly ingested""" diff --git a/tests/integrations/crossed_integrations/test_sqs.py b/tests/integrations/crossed_integrations/test_sqs.py index 86996f9b2b0..bc400e679c3 100644 --- a/tests/integrations/crossed_integrations/test_sqs.py +++ b/tests/integrations/crossed_integrations/test_sqs.py @@ -91,22 +91,19 @@ def setup_produce(self): send request A to weblog : this request will produce a sqs message send request B to library buddy, this request will consume sqs message """ - try: - message = ( - "[crossed_integrations/sqs.py][SQS] Hello from SQS " - f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce: {self.unique_id}" - ) - - self.production_response = weblog.get( - "/sqs/produce", params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "message": message}, timeout=60 - ) - self.consume_response = self.buddy.get( - "/sqs/consume", - params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sqs_queue(self.WEBLOG_TO_BUDDY_QUEUE) + message = ( + "[crossed_integrations/sqs.py][SQS] Hello from SQS " + f"[{context.library.library} weblog->{self.buddy_interface.name}] test produce: {self.unique_id}" + ) + + self.production_response = weblog.get( + "/sqs/produce", params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "message": message}, timeout=60 + ) + self.consume_response = self.buddy.get( + "/sqs/consume", + params={"queue": self.WEBLOG_TO_BUDDY_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_produce(self): """Check that a message produced to sqs is correctly ingested by a Datadog tracer""" @@ -158,22 +155,19 @@ def setup_consume(self): request A: GET /library_buddy/produce_sqs_message request B: GET /weblog/consume_sqs_message """ - try: - message = ( - "[crossed_integrations/test_sqs.py][SQS] Hello from SQS " - f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume: {self.unique_id}" - ) - - self.production_response = self.buddy.get( - "/sqs/produce", params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "message": message}, timeout=60 - ) - self.consume_response = weblog.get( - "/sqs/consume", - params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, - timeout=61, - ) - finally: - delete_sqs_queue(self.BUDDY_TO_WEBLOG_QUEUE) + message = ( + "[crossed_integrations/test_sqs.py][SQS] Hello from SQS " + f"[{self.buddy_interface.name}->{context.library.library} weblog] test consume: {self.unique_id}" + ) + + self.production_response = self.buddy.get( + "/sqs/produce", params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "message": message}, timeout=60 + ) + self.consume_response = weblog.get( + "/sqs/consume", + params={"queue": self.BUDDY_TO_WEBLOG_QUEUE, "timeout": 60, "message": message}, + timeout=61, + ) def test_consume(self): """Check that a message by an app instrumented by a Datadog tracer is correctly ingested""" From 32f327ac5d4d58234e940503fe11dd6b0a93cb93 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 23 Dec 2024 09:59:31 -0500 Subject: [PATCH 10/19] fix python and nodejs aws tests --- tests/integrations/test_dsm.py | 19 +++++++++--- .../integrations/messaging/aws/kinesis.js | 6 ++-- .../integrations/messaging/aws/shared.js | 4 +++ .../express/integrations/messaging/aws/sns.js | 29 ++++++++++++++----- .../express/integrations/messaging/aws/sqs.js | 18 ++++++++---- .../flask/integrations/messaging/aws/sns.py | 3 +- .../flask/integrations/messaging/aws/sqs.py | 9 +++--- 7 files changed, 63 insertions(+), 25 deletions(-) create mode 100644 utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js diff --git a/tests/integrations/test_dsm.py b/tests/integrations/test_dsm.py index 141053b5d22..9ecfe28b542 100644 --- a/tests/integrations/test_dsm.py +++ b/tests/integrations/test_dsm.py @@ -4,6 +4,7 @@ import base64 import json +import os from tests.integrations.utils import ( compute_dsm_hash, @@ -23,6 +24,16 @@ DSM_EXCHANGE = "dsm-system-tests-exchange" DSM_ROUTING_KEY = "dsm-system-tests-routing-key" +# AWS Specific +AWS_HOST = os.getenv('SYSTEM_TESTS_AWS_URL', "") + +# TO DO CHECK RUNNER ENV FOR A SYSTEM TESTS AWS ENV INDICATING IF AWS TESTS IS LOCAL OR REMOTE + +# If the AWS host points to localstack, we are using local AWS mocking, else assume the real account +LOCAL_AWS_ACCT = '000000000000' #if 'localstack' in AWS_HOST else "601427279990" +AWS_ACCT = LOCAL_AWS_ACCT #if 'localstack' in AWS_HOST else "601427279990" +AWS_TESTING = 'local' if LOCAL_AWS_ACCT == AWS_ACCT else 'remote' + # AWS Kinesis Specific DSM_STREAM = "dsm-system-tests-stream" @@ -344,7 +355,7 @@ def setup_dsm_sns(self): def test_dsm_sns(self): assert self.r.text == "ok" - topic = self.topic if context.library.library == "java" else f"arn:aws:sns:us-east-1:601427279990:{self.topic}" + topic = self.topic if context.library.library == "java" else f"arn:aws:sns:us-east-1:{AWS_ACCT}:{self.topic}" hash_inputs = { "default": { @@ -352,8 +363,8 @@ def test_dsm_sns(self): "tags_in": ("direction:in", f"topic:{self.queue}", "type:sqs"), }, "nodejs": { - "producer": 15466202493380574985, - "consumer": 9372735371403270535, + "producer": 15466202493380574985 if AWS_TESTING == 'remote' else 3703335291192845713, + "consumer": 9372735371403270535 if AWS_TESTING == 'remote' else 797339341876345963, "tags_out": ("direction:out", f"topic:{topic}", "type:sns"), "tags_in": ("direction:in", f"topic:{self.queue}", "type:sqs"), }, @@ -401,7 +412,7 @@ def setup_dsm_kinesis(self): def test_dsm_kinesis(self): assert self.r.text == "ok" - stream_arn = f"arn:aws:kinesis:us-east-1:601427279990:stream/{self.stream}" + stream_arn = f"arn:aws:kinesis:us-east-1:{AWS_ACCT}:stream/{self.stream}" hash_inputs = { "default": { diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js index b22856dffbb..384b8a4b9b6 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/kinesis.js @@ -1,13 +1,13 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') -const HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://kinesis.us-east-1.amazonaws.com/601427279990' +const { AWS_HOST } = require('./shared') const kinesisProduce = (stream, message, partitionKey = '1', timeout = 60000) => { // Create a Kinesis client const kinesis = new AWS.Kinesis({ region: 'us-east-1', - endpoint: HOST + endpoint: AWS_HOST }) message = JSON.stringify({ message }) @@ -71,7 +71,7 @@ const kinesisConsume = (stream, timeout = 60000, message, sequenceNumber) => { // Create a Kinesis client const kinesis = new AWS.Kinesis({ region: 'us-east-1', - endpoint: HOST + endpoint: AWS_HOST }) console.log(`[Kinesis] Looking for the following message for stream: ${stream}: ${message}`) diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js new file mode 100644 index 00000000000..a480f853e5e --- /dev/null +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js @@ -0,0 +1,4 @@ +const AWS_HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://sns.us-east-1.amazonaws.com' +const AWS_ACCT = process.env.SYSTEM_TESTS_AWS_URL ? '000000000000' : '601427279990' + +module.exports = { AWS_HOST, AWS_ACCT } \ No newline at end of file diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js index 38a0e5d324c..2baf09462c2 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/sns.js @@ -1,15 +1,21 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') +const { AWS_HOST, AWS_ACCT } = require('./shared') + let TopicArn let QueueUrl -const HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://sns.us-east-1.amazonaws.com/601427279990' - const snsPublish = (queue, topic, message) => { // Create an SQS client - const sns = new AWS.SNS() - const sqs = new AWS.SQS() + const sns = new AWS.SNS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) const messageToSend = message ?? 'Hello from SNS JavaScript injection' @@ -22,13 +28,15 @@ const snsPublish = (queue, topic, message) => { TopicArn = data.TopicArn - sqs.createQueue({ QueueName: queue }, (err) => { + sqs.createQueue({ QueueName: queue }, (err, data) => { if (err) { console.log(err) reject(err) } - QueueUrl = `${HOST}/${queue}` + console.log(data) + + QueueUrl = `${AWS_HOST}/${AWS_ACCT}/${queue}` sqs.getQueueAttributes({ QueueUrl, AttributeNames: ['All'] }, (err, data) => { if (err) { @@ -36,6 +44,8 @@ const snsPublish = (queue, topic, message) => { reject(err) } + console.log('sns data') + console.log(data) const QueueArn = data.Attributes.QueueArn const policy = { @@ -104,9 +114,12 @@ const snsPublish = (queue, topic, message) => { const snsConsume = async (queue, timeout, expectedMessage) => { // Create an SQS client - const sqs = new AWS.SQS() + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) - const queueUrl = `${HOST}/${queue}` + const queueUrl = `${AWS_HOST}/${AWS_ACCT}/${queue}` return new Promise((resolve, reject) => { let messageFound = false diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js index b67d87aa6b9..264d0c687a6 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/sqs.js @@ -1,11 +1,14 @@ const AWS = require('aws-sdk') const tracer = require('dd-trace') -const HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://sqs.us-east-1.amazonaws.com/601427279990' +const { AWS_HOST, AWS_ACCT } = require('./shared') const sqsProduce = (queue, message) => { // Create an SQS client - const sqs = new AWS.SQS() + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) const messageToSend = message ?? 'Hello from SQS JavaScript injection' @@ -20,7 +23,7 @@ const sqsProduce = (queue, message) => { // Send messages to the queue const produce = () => { sqs.sendMessage({ - QueueUrl: `${HOST}/${queue}`, + QueueUrl: `${AWS_HOST}/${AWS_ACCT}/${queue}`, MessageBody: messageToSend }, (err, data) => { if (err) { @@ -43,10 +46,15 @@ const sqsProduce = (queue, message) => { const sqsConsume = async (queue, timeout, expectedMessage) => { // Create an SQS client - const sqs = new AWS.SQS() + const sqs = new AWS.SQS({ + region: 'us-east-1', + endpoint: AWS_HOST + }) + + const queueUrl = `${AWS_HOST}/${AWS_ACCT}/${queue}` - const queueUrl = `${HOST}/${queue}` console.log(`[SQS] Looking for message: ${expectedMessage} in queue: ${queue}`) + return new Promise((resolve, reject) => { let messageFound = false diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py index beffa283337..6cd850072b2 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py @@ -8,6 +8,7 @@ SNS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sns.us-east-1.amazonaws.com/601427279990") SQS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") +AWS_ACCT = '000000000000' if 'localstack' in SQS_HOST else '601427279990' def sns_produce(queue, topic, message): @@ -72,7 +73,7 @@ def sns_consume(queue, expectedMessage, timeout=60): while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue}") + response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/{AWS_ACCT}/{queue}") if response and "Messages" in response: for message in response["Messages"]: print("[SNS->SQS] Consumed: ") diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py index 51b9031bc1d..5aa5000e549 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py @@ -6,7 +6,7 @@ HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") - +AWS_ACCT = '000000000000' if 'localstack' in HOST else '601427279990' def sqs_produce(queue, message, timeout=60): """ @@ -22,8 +22,9 @@ def sqs_produce(queue, message, timeout=60): while not queue_created and time.time() < start + timeout: try: - sqs.create_queue(QueueName=queue) + data = sqs.create_queue(QueueName=queue) queue_created = True + logging.info(data) logging.info(f"Created SQS Queue with name: {queue}") except Exception as e: exc = e @@ -34,7 +35,7 @@ def sqs_produce(queue, message, timeout=60): while not message_sent and time.time() < start + timeout: try: # Send the message to the SQS queue - sqs.send_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue}", MessageBody=message) + sqs.send_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/{AWS_ACCT}/{queue}", MessageBody=message) message_sent = True except Exception as e: exc = e @@ -60,7 +61,7 @@ def sqs_consume(queue, expectedMessage, timeout=60): while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/601427279990/{queue}") + response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/{AWS_ACCT}/{queue}") if response and "Messages" in response: for message in response["Messages"]: if message["Body"] == expectedMessage: From cca3707200a4ce0267fc89fd1718c47508f2c2d9 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 23 Dec 2024 10:17:23 -0500 Subject: [PATCH 11/19] fix lint --- tests/integrations/test_dsm.py | 12 ++++++------ .../python/flask/integrations/messaging/aws/sns.py | 2 +- .../python/flask/integrations/messaging/aws/sqs.py | 3 ++- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/tests/integrations/test_dsm.py b/tests/integrations/test_dsm.py index 9ecfe28b542..b6054ec436d 100644 --- a/tests/integrations/test_dsm.py +++ b/tests/integrations/test_dsm.py @@ -25,14 +25,14 @@ DSM_ROUTING_KEY = "dsm-system-tests-routing-key" # AWS Specific -AWS_HOST = os.getenv('SYSTEM_TESTS_AWS_URL', "") +AWS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "") # TO DO CHECK RUNNER ENV FOR A SYSTEM TESTS AWS ENV INDICATING IF AWS TESTS IS LOCAL OR REMOTE # If the AWS host points to localstack, we are using local AWS mocking, else assume the real account -LOCAL_AWS_ACCT = '000000000000' #if 'localstack' in AWS_HOST else "601427279990" -AWS_ACCT = LOCAL_AWS_ACCT #if 'localstack' in AWS_HOST else "601427279990" -AWS_TESTING = 'local' if LOCAL_AWS_ACCT == AWS_ACCT else 'remote' +LOCAL_AWS_ACCT = "000000000000" # if 'localstack' in AWS_HOST else "601427279990" +AWS_ACCT = LOCAL_AWS_ACCT # if 'localstack' in AWS_HOST else "601427279990" +AWS_TESTING = "local" if LOCAL_AWS_ACCT == AWS_ACCT else "remote" # AWS Kinesis Specific DSM_STREAM = "dsm-system-tests-stream" @@ -363,8 +363,8 @@ def test_dsm_sns(self): "tags_in": ("direction:in", f"topic:{self.queue}", "type:sqs"), }, "nodejs": { - "producer": 15466202493380574985 if AWS_TESTING == 'remote' else 3703335291192845713, - "consumer": 9372735371403270535 if AWS_TESTING == 'remote' else 797339341876345963, + "producer": 15466202493380574985 if AWS_TESTING == "remote" else 3703335291192845713, + "consumer": 9372735371403270535 if AWS_TESTING == "remote" else 797339341876345963, "tags_out": ("direction:out", f"topic:{topic}", "type:sns"), "tags_in": ("direction:in", f"topic:{self.queue}", "type:sqs"), }, diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py index 6cd850072b2..f39c8a2754b 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py @@ -8,7 +8,7 @@ SNS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sns.us-east-1.amazonaws.com/601427279990") SQS_HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") -AWS_ACCT = '000000000000' if 'localstack' in SQS_HOST else '601427279990' +AWS_ACCT = "000000000000" if "localstack" in SQS_HOST else "601427279990" def sns_produce(queue, topic, message): diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py index 5aa5000e549..3696d33b438 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py @@ -6,7 +6,8 @@ HOST = os.getenv("SYSTEM_TESTS_AWS_URL", "https://sqs.us-east-1.amazonaws.com/601427279990") -AWS_ACCT = '000000000000' if 'localstack' in HOST else '601427279990' +AWS_ACCT = "000000000000" if "localstack" in HOST else "601427279990" + def sqs_produce(queue, message, timeout=60): """ From 1609e2bd3c3a92c6640cf08eb1071eb9d887e848 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 23 Dec 2024 10:22:52 -0500 Subject: [PATCH 12/19] fix lint --- .../docker/nodejs/express/integrations/messaging/aws/shared.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js b/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js index a480f853e5e..c9c25199c04 100644 --- a/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js +++ b/utils/build/docker/nodejs/express/integrations/messaging/aws/shared.js @@ -1,4 +1,4 @@ const AWS_HOST = process.env.SYSTEM_TESTS_AWS_URL ?? 'https://sns.us-east-1.amazonaws.com' const AWS_ACCT = process.env.SYSTEM_TESTS_AWS_URL ? '000000000000' : '601427279990' -module.exports = { AWS_HOST, AWS_ACCT } \ No newline at end of file +module.exports = { AWS_HOST, AWS_ACCT } From 185d6943955ab2932b34866a97cd648baaec24dc Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 23 Dec 2024 10:52:09 -0500 Subject: [PATCH 13/19] add env for buddy --- utils/_context/containers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/utils/_context/containers.py b/utils/_context/containers.py index 58f8b11e23d..053f2a5091b 100644 --- a/utils/_context/containers.py +++ b/utils/_context/containers.py @@ -611,6 +611,7 @@ def __init__(self, name, image_name, host_log_folder, host_port, trace_agent_por # "DD_TRACE_DEBUG": "true", "DD_AGENT_HOST": "proxy", "DD_TRACE_AGENT_PORT": trace_agent_port, + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" }, ) From 2c5e23c8c608ed3f76d58362aa39612211165178 Mon Sep 17 00:00:00 2001 From: William Conti Date: Thu, 26 Dec 2024 11:27:01 -0500 Subject: [PATCH 14/19] fix python buddy --- .../build/docker/python/flask-poc.Dockerfile | 1 + .../flask/integrations/messaging/aws/sns.py | 23 ++++++++++--------- .../flask/integrations/messaging/aws/sqs.py | 4 +++- 3 files changed, 16 insertions(+), 12 deletions(-) diff --git a/utils/build/docker/python/flask-poc.Dockerfile b/utils/build/docker/python/flask-poc.Dockerfile index 3fc8d89d686..0e6e9d14f57 100644 --- a/utils/build/docker/python/flask-poc.Dockerfile +++ b/utils/build/docker/python/flask-poc.Dockerfile @@ -16,6 +16,7 @@ ENV _DD_APPSEC_DEDUPLICATION_ENABLED=false # Cross Tracer Integration Testing for Trace Context Propagation ENV DD_BOTOCORE_PROPAGATION_ENABLED=true ENV DD_KAFKA_PROPAGATION_ENABLED=true +ENV DD_TRACE_API_VERSION='v0.4' ENV LOG_LEVEL='DEBUG' diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py index f39c8a2754b..967c5b560f3 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py @@ -46,17 +46,17 @@ def sns_produce(queue, topic, message): sqs.set_queue_attributes(QueueUrl=sqs_url, Attributes={"Policy": json.dumps(policy)}) sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=sqs_arn, Attributes={"RawMessageDelivery": "true"}) - print(f"[SNS->SQS] Created SNS Topic: {topic} and SQS Queue: {queue}") + logging.info(f"[SNS->SQS] Created SNS Topic: {topic} and SQS Queue: {queue}") except Exception as e: - print(f"[SNS->SQS] Error during Python SNS create topic or SQS create queue: {str(e)}") + logging.error(f"[SNS->SQS] Error during Python SNS create topic or SQS create queue: {str(e)}") try: # Send the message to the SNS topic sns.publish(TopicArn=topic_arn, Message=message) - print("[SNS->SQS] Python SNS messaged published successfully") + logging.info("[SNS->SQS] Python SNS messaged published successfully") return "SNS Produce ok" except Exception as e: - print(f"[SNS->SQS] Error during Python SNS publish message: {str(e)}") + logging.error(f"[SNS->SQS] Error during Python SNS publish message: {str(e)}") return {"error": f"[SNS->SQS] Error during Python SNS publish message: {str(e)}"} @@ -67,17 +67,18 @@ def sns_consume(queue, expectedMessage, timeout=60): # Create an SQS client sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=SQS_HOST) + response = sqs.get_queue_url(QueueName=queue) consumed_message = None start_time = time.time() while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/{AWS_ACCT}/{queue}") + response = sqs.receive_message(QueueUrl=response.get("QueueUrl")) if response and "Messages" in response: for message in response["Messages"]: - print("[SNS->SQS] Consumed: ") - print(message) + logging.info("[SNS->SQS] Consumed: ") + logging.info(message) if message["Body"] == expectedMessage: consumed_message = message["Body"] logging.info("[SNS->SQS] Success. Found the following message: " + consumed_message) @@ -85,15 +86,15 @@ def sns_consume(queue, expectedMessage, timeout=60): else: # entire message may be json within the body try: - print("[SNS->SQS] Trying to decode raw message: ") - print(message.get("Body", "")) + logging.info("[SNS->SQS] Trying to decode raw message: ") + logging.info(message.get("Body", "")) message_json = json.loads(message["Body"]) if message_json.get("Message", "") == expectedMessage: consumed_message = message_json["Message"] - print("[SNS->SQS] Success. Found the following message: " + consumed_message) + logging.info("[SNS->SQS] Success. Found the following message: " + consumed_message) break except Exception as e: - print(e) + logging.error(e) pass except Exception as e: diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py index 3696d33b438..36e4c302e97 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py @@ -26,6 +26,7 @@ def sqs_produce(queue, message, timeout=60): data = sqs.create_queue(QueueName=queue) queue_created = True logging.info(data) + logging.info(data.get("QueueUrl")) logging.info(f"Created SQS Queue with name: {queue}") except Exception as e: exc = e @@ -56,13 +57,14 @@ def sqs_consume(queue, expectedMessage, timeout=60): """ # Create an SQS client sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=HOST) + response = sqs.get_queue_url(QueueName=queue) consumed_message = None start_time = time.time() while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/{AWS_ACCT}/{queue}") + response = sqs.receive_message(QueueUrl=response.get("QueueUrl")) if response and "Messages" in response: for message in response["Messages"]: if message["Body"] == expectedMessage: From a15f77822d02b185c6358c5d45df9292218205dd Mon Sep 17 00:00:00 2001 From: William Conti Date: Thu, 26 Dec 2024 11:38:11 -0500 Subject: [PATCH 15/19] fix formatting --- utils/_context/containers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/utils/_context/containers.py b/utils/_context/containers.py index 053f2a5091b..e87ca850c78 100644 --- a/utils/_context/containers.py +++ b/utils/_context/containers.py @@ -611,7 +611,7 @@ def __init__(self, name, image_name, host_log_folder, host_port, trace_agent_por # "DD_TRACE_DEBUG": "true", "DD_AGENT_HOST": "proxy", "DD_TRACE_AGENT_PORT": trace_agent_port, - "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566" + "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", }, ) From a0238d466e0947941802f628dd2069c9067d25ea Mon Sep 17 00:00:00 2001 From: William Conti Date: Fri, 3 Jan 2025 11:18:24 -0500 Subject: [PATCH 16/19] disable sqs xray test --- .../crossed_integrations/test_sqs.py | 1 + utils/_context/_scenarios/endtoend.py | 7 +++++++ utils/_context/_scenarios/integrations.py | 5 +++++ utils/_context/containers.py | 19 +++++++++++++++++-- 4 files changed, 30 insertions(+), 2 deletions(-) diff --git a/tests/integrations/crossed_integrations/test_sqs.py b/tests/integrations/crossed_integrations/test_sqs.py index bc400e679c3..0b02739b23c 100644 --- a/tests/integrations/crossed_integrations/test_sqs.py +++ b/tests/integrations/crossed_integrations/test_sqs.py @@ -242,6 +242,7 @@ class Test_SQS_PROPAGATION_VIA_MESSAGE_ATTRIBUTES(_Test_SQS): @scenarios.crossed_tracing_libraries @features.aws_sqs_span_creationcontext_propagation_via_xray_header_with_dd_trace +@irrelevant("Localstack SQS does not support AWS Xray Header parsing") class Test_SQS_PROPAGATION_VIA_AWS_XRAY_HEADERS(_Test_SQS): buddy_interface = interfaces.java_buddy buddy = java_buddy diff --git a/utils/_context/_scenarios/endtoend.py b/utils/_context/_scenarios/endtoend.py index 3ad8ed619b9..7ff96ac59c0 100644 --- a/utils/_context/_scenarios/endtoend.py +++ b/utils/_context/_scenarios/endtoend.py @@ -23,6 +23,7 @@ BuddyContainer, TestedContainer, LocalstackContainer, + ElasticMQContainer, _get_client as get_docker_client, ) @@ -55,6 +56,7 @@ def __init__( include_mysql_db=False, include_sqlserver=False, include_localstack=False, + include_elasticmq=False, ) -> None: super().__init__(name, doc=doc, github_workflow=github_workflow, scenario_groups=scenario_groups) @@ -103,6 +105,9 @@ def __init__( if include_localstack: self._supporting_containers.append(LocalstackContainer(host_log_folder=self.host_log_folder)) + if include_elasticmq: + self._supporting_containers.append(ElasticMQContainer(host_log_folder=self.host_log_folder)) + self._required_containers.extend(self._supporting_containers) def get_image_list(self, library: str, weblog: str) -> list[str]: @@ -222,6 +227,7 @@ def __init__( include_mysql_db=False, include_sqlserver=False, include_localstack=False, + include_elasticmq=False, include_otel_drop_in=False, include_buddies=False, require_api_key=False, @@ -247,6 +253,7 @@ def __init__( include_mysql_db=include_mysql_db, include_sqlserver=include_sqlserver, include_localstack=include_localstack, + include_elasticmq=include_elasticmq, ) self._require_api_key = require_api_key diff --git a/utils/_context/_scenarios/integrations.py b/utils/_context/_scenarios/integrations.py index 909d7e27359..d7d4b86fafc 100644 --- a/utils/_context/_scenarios/integrations.py +++ b/utils/_context/_scenarios/integrations.py @@ -46,6 +46,7 @@ def __init__(self) -> None: include_mysql_db=True, include_sqlserver=True, include_localstack=True, + include_elasticmq=True, include_otel_drop_in=True, doc=( "Spawns tracer, agent, and a full set of database. " @@ -95,6 +96,7 @@ def __init__( include_rabbitmq=False, include_buddies=False, include_localstack=True, + include_elasticmq=True, ) -> None: super().__init__( name, @@ -109,6 +111,7 @@ def __init__( include_rabbitmq=include_rabbitmq, include_buddies=include_buddies, include_localstack=include_localstack, + include_elasticmq=include_elasticmq, scenario_groups=[ScenarioGroup.INTEGRATIONS, ScenarioGroup.ESSENTIALS], ) # Since we are using real AWS queues / topics, we need a unique message to ensure we aren't consuming messages @@ -138,9 +141,11 @@ def __init__(self) -> None: include_buddies=True, include_rabbitmq=True, include_localstack=True, + include_elasticmq=True, doc="Spawns a buddy for each supported language of APM, requires AWS authentication.", weblog_env={ "SYSTEM_TESTS_AWS_URL": "http://localstack-main:4566", + "SYSTEM_TESTS_AWS_SQS_URL": "http://elasticmq:9324", }, ) self.unique_id = None diff --git a/utils/_context/containers.py b/utils/_context/containers.py index e87ca850c78..444371a6f61 100644 --- a/utils/_context/containers.py +++ b/utils/_context/containers.py @@ -977,6 +977,19 @@ def __init__(self, host_log_folder) -> None: ) +class ElasticMQContainer(TestedContainer): + def __init__(self, host_log_folder) -> None: + super().__init__( + image_name="softwaremill/elasticmq-native:latest", + name="elasticmq", + host_log_folder=host_log_folder, + environment={"ELASTICMQ_OPTS": "-Dnode-address.hostname=0.0.0.0"}, + ports={9324: 9324}, + volumes={"/var/run/docker.sock": {"bind": "/var/run/docker.sock", "mode": "rw"}}, + allow_old_container=True, + ) + + class LocalstackContainer(TestedContainer): def __init__(self, host_log_folder) -> None: super().__init__( @@ -984,11 +997,13 @@ def __init__(self, host_log_folder) -> None: name="localstack-main", environment={ "LOCALSTACK_SERVICES": "kinesis,sqs,sns,xray", - "EXTRA_CORS_ALLOWED_HEADERS": "x-amz-request-id,x-amzn-requestid", - "EXTRA_CORS_EXPOSE_HEADERS": "x-amz-request-id,x-amzn-requestid", + "EXTRA_CORS_ALLOWED_HEADERS": "x-amz-request-id,x-amzn-requestid,x-amzn-trace-id", + "EXTRA_CORS_EXPOSE_HEADERS": "x-amz-request-id,x-amzn-requestid,x-amzn-trace-id", "AWS_DEFAULT_REGION": "us-east-1", "FORCE_NONINTERACTIVE": "true", "START_WEB": "0", + "DEBUG": "1", + "SQS_PROVIDER": "elasticmq", "DOCKER_HOST": "unix:///var/run/docker.sock", }, host_log_folder=host_log_folder, From 3ed39715c3c3784308f838fddff42aa488cce4b9 Mon Sep 17 00:00:00 2001 From: William Conti Date: Mon, 6 Jan 2025 10:42:08 -0500 Subject: [PATCH 17/19] fix failing python tests --- .../flask/integrations/messaging/aws/sns.py | 19 ++++++++++++-- .../flask/integrations/messaging/aws/sqs.py | 25 ++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py index 967c5b560f3..99f5526cbcd 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sns.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sns.py @@ -67,14 +67,29 @@ def sns_consume(queue, expectedMessage, timeout=60): # Create an SQS client sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=SQS_HOST) - response = sqs.get_queue_url(QueueName=queue) + + start = time.time() + queue_found = False + queue_url = None + + while not queue_found and time.time() < start + timeout: + try: + data = sqs.get_queue_url(QueueName=queue) + queue_found = True + logging.info(f"Found SQS Queue details with name: {queue}") + logging.info(data) + logging.info(data.get("QueueUrl")) + queue_url = data.get("QueueUrl") + except Exception as e: + logging.info(f"Error during Python SQS get queue details: {str(e)}") + time.sleep(1) consumed_message = None start_time = time.time() while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=response.get("QueueUrl")) + response = sqs.receive_message(QueueUrl=queue_url) if response and "Messages" in response: for message in response["Messages"]: logging.info("[SNS->SQS] Consumed: ") diff --git a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py index 36e4c302e97..2728fb1edd8 100644 --- a/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py +++ b/utils/build/docker/python/flask/integrations/messaging/aws/sqs.py @@ -20,14 +20,16 @@ def sqs_produce(queue, message, timeout=60): start = time.time() queue_created = False exc = None + queue_url = None while not queue_created and time.time() < start + timeout: try: data = sqs.create_queue(QueueName=queue) queue_created = True + logging.info(f"Created SQS Queue with name: {queue}") logging.info(data) logging.info(data.get("QueueUrl")) - logging.info(f"Created SQS Queue with name: {queue}") + queue_url = data.get("QueueUrl") except Exception as e: exc = e logging.info(f"Error during Python SQS create queue: {str(e)}") @@ -37,7 +39,7 @@ def sqs_produce(queue, message, timeout=60): while not message_sent and time.time() < start + timeout: try: # Send the message to the SQS queue - sqs.send_message(QueueUrl=f"https://sqs.us-east-1.amazonaws.com/{AWS_ACCT}/{queue}", MessageBody=message) + sqs.send_message(QueueUrl=queue_url, MessageBody=message) message_sent = True except Exception as e: exc = e @@ -57,14 +59,29 @@ def sqs_consume(queue, expectedMessage, timeout=60): """ # Create an SQS client sqs = boto3.client("sqs", region_name="us-east-1", endpoint_url=HOST) - response = sqs.get_queue_url(QueueName=queue) + + start = time.time() + queue_found = False + queue_url = None + + while not queue_found and time.time() < start + timeout: + try: + data = sqs.get_queue_url(QueueName=queue) + queue_found = True + logging.info(f"Found SQS Queue details with name: {queue}") + logging.info(data) + logging.info(data.get("QueueUrl")) + queue_url = data.get("QueueUrl") + except Exception as e: + logging.info(f"Error during Python SQS get queue details: {str(e)}") + time.sleep(1) consumed_message = None start_time = time.time() while not consumed_message and time.time() - start_time < timeout: try: - response = sqs.receive_message(QueueUrl=response.get("QueueUrl")) + response = sqs.receive_message(QueueUrl=queue_url) if response and "Messages" in response: for message in response["Messages"]: if message["Body"] == expectedMessage: From d59b757549c1a44a492451c482ef52552618f5c7 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 4 Feb 2025 12:31:40 -0500 Subject: [PATCH 18/19] address reviewer comments --- utils/_context/_scenarios/integrations.py | 2 -- utils/_context/containers.py | 4 ++-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/utils/_context/_scenarios/integrations.py b/utils/_context/_scenarios/integrations.py index cf0eebf0171..a31b7cd745e 100644 --- a/utils/_context/_scenarios/integrations.py +++ b/utils/_context/_scenarios/integrations.py @@ -122,8 +122,6 @@ def __init__( def configure(self, config): super().configure(config) - # if not self.replay: - # self._check_aws_variables() self.unique_id = _get_unique_id(self.host_log_folder, replay=self.replay) def _check_aws_variables(self): diff --git a/utils/_context/containers.py b/utils/_context/containers.py index 0d127967e1a..e4d2b988cc4 100644 --- a/utils/_context/containers.py +++ b/utils/_context/containers.py @@ -997,7 +997,7 @@ def __init__(self, host_log_folder) -> None: class ElasticMQContainer(TestedContainer): def __init__(self, host_log_folder) -> None: super().__init__( - image_name="softwaremill/elasticmq-native:latest", + image_name="softwaremill/elasticmq-native:1.6.11", name="elasticmq", host_log_folder=host_log_folder, environment={"ELASTICMQ_OPTS": "-Dnode-address.hostname=0.0.0.0"}, @@ -1010,7 +1010,7 @@ def __init__(self, host_log_folder) -> None: class LocalstackContainer(TestedContainer): def __init__(self, host_log_folder) -> None: super().__init__( - image_name="localstack/localstack:latest", + image_name="localstack/localstack:4.1", name="localstack-main", environment={ "LOCALSTACK_SERVICES": "kinesis,sqs,sns,xray", From d0f6efa86fbb281e285053ddc8e7ac29a3362f92 Mon Sep 17 00:00:00 2001 From: William Conti Date: Tue, 4 Feb 2025 16:08:50 -0500 Subject: [PATCH 19/19] fix --- .../crossed_integrations/test_kinesis.py | 4 +- .../crossed_integrations/test_sns_to_sqs.py | 4 +- .../crossed_integrations/test_sqs.py | 2 - tests/integrations/test_dsm.py | 100 +++++++----------- 4 files changed, 43 insertions(+), 67 deletions(-) diff --git a/tests/integrations/crossed_integrations/test_kinesis.py b/tests/integrations/crossed_integrations/test_kinesis.py index 867123ca82c..a6f5acc9893 100644 --- a/tests/integrations/crossed_integrations/test_kinesis.py +++ b/tests/integrations/crossed_integrations/test_kinesis.py @@ -2,11 +2,9 @@ import json from utils.buddies import python_buddy -from utils import interfaces, scenarios, weblog, missing_feature, features, context, irrelevant +from utils import interfaces, scenarios, weblog, missing_feature, features, context from utils.tools import logger -from tests.integrations.utils import delete_kinesis_stream - class _Test_Kinesis: """Test Kinesis compatibility with inputted datadog tracer""" diff --git a/tests/integrations/crossed_integrations/test_sns_to_sqs.py b/tests/integrations/crossed_integrations/test_sns_to_sqs.py index 3d249d858dd..61cd1ef470d 100644 --- a/tests/integrations/crossed_integrations/test_sns_to_sqs.py +++ b/tests/integrations/crossed_integrations/test_sns_to_sqs.py @@ -2,11 +2,9 @@ import json from utils.buddies import python_buddy -from utils import interfaces, scenarios, weblog, missing_feature, features, context, irrelevant +from utils import interfaces, scenarios, weblog, missing_feature, features, context from utils.tools import logger -from tests.integrations.utils import delete_sns_topic, delete_sqs_queue - class _Test_SNS: """Test sns compatibility with inputted datadog tracer""" diff --git a/tests/integrations/crossed_integrations/test_sqs.py b/tests/integrations/crossed_integrations/test_sqs.py index 3dd69141374..ed54122662a 100644 --- a/tests/integrations/crossed_integrations/test_sqs.py +++ b/tests/integrations/crossed_integrations/test_sqs.py @@ -5,8 +5,6 @@ from utils import interfaces, scenarios, weblog, missing_feature, features, context, irrelevant from utils.tools import logger -from tests.integrations.utils import delete_sqs_queue - class _Test_SQS: """Test sqs compatibility with inputted datadog tracer""" diff --git a/tests/integrations/test_dsm.py b/tests/integrations/test_dsm.py index bacf7df144f..48e362675b9 100644 --- a/tests/integrations/test_dsm.py +++ b/tests/integrations/test_dsm.py @@ -6,12 +6,7 @@ import json import os -from tests.integrations.utils import ( - compute_dsm_hash, - delete_sqs_queue, - delete_kinesis_stream, - delete_sns_topic, -) +from tests.integrations.utils import compute_dsm_hash from utils import weblog, interfaces, scenarios, irrelevant, context, bug, features, missing_feature, flaky from utils.tools import logger @@ -279,22 +274,18 @@ class Test_DsmSQS: """Verify DSM stats points for AWS Sqs Service""" def setup_dsm_sqs(self): - try: - message = get_message("Test_DsmSQS", "sqs") - - # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, - # which changes for each run with the time stamp added - if context.library.library != "nodejs": - self.queue = f"{DSM_QUEUE}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" - else: - self.queue = f"{DSM_QUEUE}_{context.library.library}" - - self.r = weblog.get( - f"/dsm?integration=sqs&timeout=60&queue={self.queue}&message={message}", timeout=DSM_REQUEST_TIMEOUT - ) - finally: - if context.library.library != "nodejs": - delete_sqs_queue(self.queue) + message = get_message("Test_DsmSQS", "sqs") + + # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, + # which changes for each run with the time stamp added + if context.library.library != "nodejs": + self.queue = f"{DSM_QUEUE}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" + else: + self.queue = f"{DSM_QUEUE}_{context.library.library}" + + self.r = weblog.get( + f"/dsm?integration=sqs&timeout=60&queue={self.queue}&message={message}", timeout=DSM_REQUEST_TIMEOUT + ) def test_dsm_sqs(self): assert self.r.text == "ok" @@ -332,26 +323,21 @@ class Test_DsmSNS: """Verify DSM stats points for AWS SNS Service""" def setup_dsm_sns(self): - try: - message = get_message("Test_DsmSNS", "sns") - - # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, - # which changes for each run with the time stamp added - if context.library.library != "nodejs": - self.topic = f"{DSM_TOPIC}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" - self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" - else: - self.topic = f"{DSM_TOPIC}_{context.library.library}_raw" - self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_raw" - - self.r = weblog.get( - f"/dsm?integration=sns&timeout=60&queue={self.queue}&topic={self.topic}&message={message}", - timeout=DSM_REQUEST_TIMEOUT, - ) - finally: - if context.library.library != "nodejs": - delete_sns_topic(self.topic) - delete_sqs_queue(self.queue) + message = get_message("Test_DsmSNS", "sns") + + # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, + # which changes for each run with the time stamp added + if context.library.library != "nodejs": + self.topic = f"{DSM_TOPIC}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" + self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}_raw" + else: + self.topic = f"{DSM_TOPIC}_{context.library.library}_raw" + self.queue = f"{DSM_QUEUE_SNS}_{context.library.library}_raw" + + self.r = weblog.get( + f"/dsm?integration=sns&timeout=60&queue={self.queue}&topic={self.topic}&message={message}", + timeout=DSM_REQUEST_TIMEOUT, + ) def test_dsm_sns(self): assert self.r.text == "ok" @@ -391,23 +377,19 @@ class Test_DsmKinesis: """Verify DSM stats points for AWS Kinesis Service""" def setup_dsm_kinesis(self): - try: - message = get_message("Test_DsmKinesis", "kinesis") - - # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, - # which changes for each run with the time stamp added - if context.library.library != "nodejs": - self.stream = f"{DSM_STREAM}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" - else: - self.stream = f"{DSM_STREAM}_{context.library.library}" - - self.r = weblog.get( - f"/dsm?integration=kinesis&timeout=60&stream={self.stream}&message={message}", - timeout=DSM_REQUEST_TIMEOUT, - ) - finally: - if context.library.library != "nodejs": - delete_kinesis_stream(self.stream) + message = get_message("Test_DsmKinesis", "kinesis") + + # we can't add the time hash to node since we can't replicate the hashing algo in python and compute a hash, + # which changes for each run with the time stamp added + if context.library.library != "nodejs": + self.stream = f"{DSM_STREAM}_{context.library.library}_{WEBLOG_VARIANT_SANITIZED}_{scenarios.integrations_aws.unique_id}" + else: + self.stream = f"{DSM_STREAM}_{context.library.library}" + + self.r = weblog.get( + f"/dsm?integration=kinesis&timeout=60&stream={self.stream}&message={message}", + timeout=DSM_REQUEST_TIMEOUT, + ) @missing_feature(library="java", reason="DSM is not implemented for Java AWS Kinesis.") def test_dsm_kinesis(self):