From b005886a205ea26a11ce5b9eb74b48700e4bf8b9 Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Tue, 29 Jul 2025 10:20:13 -0700 Subject: [PATCH 1/8] Create lambda executor dlq system test file --- .../aws/tests/test_lambda_executor_dlq.py | 121 ++++++++++++++++++ 1 file changed, 121 insertions(+) create mode 100644 providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py diff --git a/providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py b/providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py new file mode 100644 index 0000000000000..ae04b74bec9a3 --- /dev/null +++ b/providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py @@ -0,0 +1,121 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import os +import time +from datetime import datetime, timedelta, timezone +from urllib.parse import urlparse + +import boto3 + +from airflow.utils.trigger_rule import TriggerRule + +from tests_common.test_utils.version_compat import AIRFLOW_V_3_0_PLUS + +if AIRFLOW_V_3_0_PLUS: + from airflow.sdk import DAG, chain, task +else: + from airflow.decorators import task # type: ignore[attr-defined,no-redef] + from airflow.models.baseoperator import chain # type: ignore[attr-defined,no-redef] + from airflow.models.dag import DAG # type: ignore[attr-defined,no-redef,assignment] + +from system.amazon.aws.utils import SystemTestContextBuilder + +sys_test_context_task = SystemTestContextBuilder().build() + +DAG_ID = "test_lambda_executor_dlq" + + +@task +def verify_test_setup(): + """Verify Lambda executor DLQ configuration is available""" + dlq_url = os.environ.get("AIRFLOW__AWS_LAMBDA_EXECUTOR__DEAD_LETTER_QUEUE_URL") + print(f"Lambda executor DLQ URL: {dlq_url}") + + parsed_url = urlparse(dlq_url) + dlq_queue_name = parsed_url.path.split("/")[-1] + return dlq_queue_name + + +@task(executor_config={"poison_pill": True}) +def cause_lambda_failure(): + return "This task is designed to fail at Lambda service level, injected within the function handler" + + +@task(trigger_rule=TriggerRule.ALL_DONE) +def verify_dlq_activity(dlq_queue_name: str): + """Verify DLQ processing occurred by checking CloudWatch metrics""" + + cloudwatch = boto3.client("cloudwatch") + # Try for up to 10 attempts (5 minutes total) + for attempt in range(10): + end_time = datetime.now(timezone.utc) + start_time = end_time - timedelta(minutes=5) + try: + received_response = cloudwatch.get_metric_statistics( + Namespace="AWS/SQS", + MetricName="NumberOfMessagesDeleted", + Dimensions=[{"Name": "QueueName", "Value": dlq_queue_name}], + StartTime=start_time, + EndTime=end_time, + Period=300, + Statistics=["Sum"], + ) + + total_deleted = sum(point["Sum"] for point in received_response["Datapoints"]) + print(f"Messages deleted from DLQ: {total_deleted}") + + if total_deleted > 0: + return "DLQ Processing Confirmed" + if attempt < 9: + time.sleep(30) + except Exception as e: + print(f"Error checking DLQ metrics: {e}") + raise AssertionError("FAIL: No DLQ activity detected after 5 minutes of polling") + + +default_args = { + "execution_timeout": timedelta(minutes=15), +} + +with DAG( + dag_id=DAG_ID, + default_args=default_args, + schedule="@once", + start_date=datetime(2021, 1, 1), + tags=["test"], + catchup=False, +) as dag: + test_context = sys_test_context_task() + dlq_name = verify_test_setup() + failed_task = cause_lambda_failure() + verify_task = verify_dlq_activity(dlq_name) + + chain( + test_context, + dlq_name, + failed_task, + verify_task, + ) + from tests_common.test_utils.watcher import watcher + + verify_task >> watcher() + +from tests_common.test_utils.system_tests import get_test_run + +test_run = get_test_run(dag) From 1817a746e3e29e92a89c8b7251448bd682bf358d Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Tue, 5 Aug 2025 09:38:34 -0700 Subject: [PATCH 2/8] Pre-commit changes for CI tests --- docs/spelling_wordlist.txt | 1 + .../aws/tests/test_lambda_executor_dlq.py | 43 ++++++++----------- 2 files changed, 19 insertions(+), 25 deletions(-) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 08cf8545c7020..e14633f0500fc 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -529,6 +529,7 @@ Dlp dlp DlpJob DlpServiceClient +dlq dms DNs dns diff --git a/providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py b/providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py index ae04b74bec9a3..ac37a5e35ea50 100644 --- a/providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py +++ b/providers/amazon/tests/system/amazon/aws/tests/test_lambda_executor_dlq.py @@ -52,6 +52,7 @@ def verify_test_setup(): return dlq_queue_name +# Configuration to search for this poison pill has been done within the Lambda app.py handler code @task(executor_config={"poison_pill": True}) def cause_lambda_failure(): return "This task is designed to fail at Lambda service level, injected within the function handler" @@ -66,36 +67,28 @@ def verify_dlq_activity(dlq_queue_name: str): for attempt in range(10): end_time = datetime.now(timezone.utc) start_time = end_time - timedelta(minutes=5) - try: - received_response = cloudwatch.get_metric_statistics( - Namespace="AWS/SQS", - MetricName="NumberOfMessagesDeleted", - Dimensions=[{"Name": "QueueName", "Value": dlq_queue_name}], - StartTime=start_time, - EndTime=end_time, - Period=300, - Statistics=["Sum"], - ) - - total_deleted = sum(point["Sum"] for point in received_response["Datapoints"]) + received_response = cloudwatch.get_metric_statistics( + Namespace="AWS/SQS", + MetricName="NumberOfMessagesDeleted", + Dimensions=[{"Name": "QueueName", "Value": dlq_queue_name}], + StartTime=start_time, + EndTime=end_time, + Period=300, + Statistics=["Sum"], + ) + + total_deleted = sum(point["Sum"] for point in received_response["Datapoints"]) + if total_deleted > 0: print(f"Messages deleted from DLQ: {total_deleted}") - - if total_deleted > 0: - return "DLQ Processing Confirmed" - if attempt < 9: - time.sleep(30) - except Exception as e: - print(f"Error checking DLQ metrics: {e}") + return "DLQ Processing Confirmed" + print("No messages detected in DLQ yet") + if attempt < 9: + time.sleep(30) raise AssertionError("FAIL: No DLQ activity detected after 5 minutes of polling") -default_args = { - "execution_timeout": timedelta(minutes=15), -} - with DAG( dag_id=DAG_ID, - default_args=default_args, schedule="@once", start_date=datetime(2021, 1, 1), tags=["test"], @@ -116,6 +109,6 @@ def verify_dlq_activity(dlq_queue_name: str): verify_task >> watcher() -from tests_common.test_utils.system_tests import get_test_run +from tests_common.test_utils.system_tests import get_test_run # noqa: E402 test_run = get_test_run(dag) From c4c619108613964bde255e9d9584aaa1b0ae6e36 Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Tue, 5 Aug 2025 10:17:36 -0700 Subject: [PATCH 3/8] Add DLQ to spellcheck --- docs/spelling_wordlist.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index e14633f0500fc..cd549dabcf72e 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -530,6 +530,8 @@ dlp DlpJob DlpServiceClient dlq +DLQ +Dlq dms DNs dns From 19b7cbd3188b478278897ed739748736bcfde5fe Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Tue, 5 Aug 2025 10:46:22 -0700 Subject: [PATCH 4/8] Adjusted spellling wordlist --- docs/spelling_wordlist.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index cd549dabcf72e..bbfe1e1de39cf 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -529,9 +529,9 @@ Dlp dlp DlpJob DlpServiceClient -dlq DLQ Dlq +dlq dms DNs dns From bf07a60fa22916b262de3e1918dada3f8c732400 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Tue, 5 Aug 2025 14:59:59 -0400 Subject: [PATCH 5/8] Ignore system test related generated doc files --- devel-common/src/docs/utils/conf_constants.py | 1 - providers/amazon/docs/conf.py | 2 ++ providers/amazon/docs/index.rst | 1 - 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/devel-common/src/docs/utils/conf_constants.py b/devel-common/src/docs/utils/conf_constants.py index d483adae6eba4..577a8fb6c1172 100644 --- a/devel-common/src/docs/utils/conf_constants.py +++ b/devel-common/src/docs/utils/conf_constants.py @@ -337,7 +337,6 @@ def get_google_intersphinx_mapping() -> dict[str, tuple[str, tuple[str]]]: "*/tests/__init__.py", "*/tests/system/__init__.py", "*/tests/system/example_empty.py", - "*/test_aws_auth_manager.py", "*/check_translations_completeness.py", ] diff --git a/providers/amazon/docs/conf.py b/providers/amazon/docs/conf.py index f4d1d5f8d2975..3b0397c52997e 100644 --- a/providers/amazon/docs/conf.py +++ b/providers/amazon/docs/conf.py @@ -25,3 +25,5 @@ os.environ["AIRFLOW_PACKAGE_NAME"] = "apache-airflow-providers-amazon" from docs.provider_conf import * # noqa: F403 + +exclude_patterns.append("_api/tests/system/amazon/*") diff --git a/providers/amazon/docs/index.rst b/providers/amazon/docs/index.rst index 05497db2812c0..33035c9aca10d 100644 --- a/providers/amazon/docs/index.rst +++ b/providers/amazon/docs/index.rst @@ -58,7 +58,6 @@ :maxdepth: 1 :caption: System tests - System Tests <_api/tests/system/amazon/index> System Tests Dashboard .. toctree:: From 1ba1e665ec43685c659456dccbc8aa5491587b9e Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Tue, 5 Aug 2025 12:53:30 -0700 Subject: [PATCH 6/8] Fix static checks --- providers/amazon/docs/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/docs/conf.py b/providers/amazon/docs/conf.py index 3b0397c52997e..e65bf9bd7b072 100644 --- a/providers/amazon/docs/conf.py +++ b/providers/amazon/docs/conf.py @@ -26,4 +26,4 @@ from docs.provider_conf import * # noqa: F403 -exclude_patterns.append("_api/tests/system/amazon/*") +exclude_patterns.append("_api/tests/system/amazon/*") # noqa: F405 \ No newline at end of file From 3f584a6558361b890fa70fd067c177e993d33e3e Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Tue, 5 Aug 2025 12:54:45 -0700 Subject: [PATCH 7/8] Fix static checks --- providers/amazon/docs/conf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/docs/conf.py b/providers/amazon/docs/conf.py index e65bf9bd7b072..8af188a0e5fb8 100644 --- a/providers/amazon/docs/conf.py +++ b/providers/amazon/docs/conf.py @@ -26,4 +26,4 @@ from docs.provider_conf import * # noqa: F403 -exclude_patterns.append("_api/tests/system/amazon/*") # noqa: F405 \ No newline at end of file +exclude_patterns.append("_api/tests/system/amazon/*") # noqa: F405 From 1e93b4f086f8183199feafbcf2b3e62baa76a1f9 Mon Sep 17 00:00:00 2001 From: vincbeck Date: Tue, 5 Aug 2025 17:03:24 -0400 Subject: [PATCH 8/8] Fix doc building --- devel-common/src/docs/utils/conf_constants.py | 1 + providers/amazon/docs/conf.py | 2 -- providers/amazon/docs/index.rst | 1 + 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/devel-common/src/docs/utils/conf_constants.py b/devel-common/src/docs/utils/conf_constants.py index 577a8fb6c1172..89bd07b61e11f 100644 --- a/devel-common/src/docs/utils/conf_constants.py +++ b/devel-common/src/docs/utils/conf_constants.py @@ -336,6 +336,7 @@ def get_google_intersphinx_mapping() -> dict[str, tuple[str, tuple[str]]]: "*/conftest.py", "*/tests/__init__.py", "*/tests/system/__init__.py", + "*/tests/system/*/tests/*", "*/tests/system/example_empty.py", "*/check_translations_completeness.py", ] diff --git a/providers/amazon/docs/conf.py b/providers/amazon/docs/conf.py index 8af188a0e5fb8..f4d1d5f8d2975 100644 --- a/providers/amazon/docs/conf.py +++ b/providers/amazon/docs/conf.py @@ -25,5 +25,3 @@ os.environ["AIRFLOW_PACKAGE_NAME"] = "apache-airflow-providers-amazon" from docs.provider_conf import * # noqa: F403 - -exclude_patterns.append("_api/tests/system/amazon/*") # noqa: F405 diff --git a/providers/amazon/docs/index.rst b/providers/amazon/docs/index.rst index 33035c9aca10d..05497db2812c0 100644 --- a/providers/amazon/docs/index.rst +++ b/providers/amazon/docs/index.rst @@ -58,6 +58,7 @@ :maxdepth: 1 :caption: System tests + System Tests <_api/tests/system/amazon/index> System Tests Dashboard .. toctree::