From ce8631ce7f834d39126ea1d6aeef3a3706bc3d48 Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Wed, 30 Jul 2025 15:54:32 -0700 Subject: [PATCH 1/4] Fix bug in lambda executor DLQ handling --- .../executors/aws_lambda/lambda_executor.py | 33 ++++++++++++------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py index fcf31ed29de6b..aba1c687bf49a 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py @@ -398,8 +398,9 @@ def process_queue(self, queue_url: str): self.log.warning("Deleting the message to avoid processing it again.") self.sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle) continue - return_code = body.get("return_code") ser_task_key = body.get("task_key") + return_code = body.get("return_code") + command = body.get("command") # Fetch the real task key from the running_tasks dict, using the serialized task key. try: task_key = self.running_tasks[ser_task_key] @@ -425,17 +426,27 @@ def process_queue(self, queue_url: str): "Successful Lambda invocation for task %s received from SQS queue.", task_key ) else: - # In this case the Lambda likely started but failed at run time since we got a non-zero - # return code. We could consider retrying these tasks within the executor, because this _likely_ - # means the Airflow task did not run to completion, however we can't be sure (maybe the - # lambda runtime code has a bug and is returning a non-zero when it actually passed?). So - # perhaps not retrying is the safest option. self.fail(task_key) - self.log.error( - "Lambda invocation for task: %s has failed to run with return code %s", - task_key, - return_code, - ) + if queue_url == self.dlq_url and return_code == None: + # DLQ failure: AWS Lambda service could not complete the invocation after retries. + # This indicates a Lambda-level failure (timeout, memory limit, crash, etc.) + # where the function was unable to successfully execute to return a result. + self.log.error( + "Lambda invocation for task: %s failed at the service level (from DLQ). Command: %s", + task_key, + command, + ) + else: + # In this case the Lambda likely started but failed at run time since we got a non-zero + # return code. We could consider retrying these tasks within the executor, because this _likely_ + # means the Airflow task did not run to completion, however we can't be sure (maybe the + # lambda runtime code has a bug and is returning a non-zero when it actually passed?). So + # perhaps not retrying is the safest option. + self.log.error( + "Lambda invocation for task: %s has failed to run with return code %s", + task_key, + return_code, + ) # Remove the task from the tracking mapping. self.running_tasks.pop(ser_task_key) From 5e90b1c08fc5e4ad2c71ea1ff9f6de7de094260d Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Thu, 31 Jul 2025 12:07:01 -0700 Subject: [PATCH 2/4] Fix static checks --- .../amazon/aws/executors/aws_lambda/lambda_executor.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py index aba1c687bf49a..4180749270131 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py @@ -399,8 +399,8 @@ def process_queue(self, queue_url: str): self.sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle) continue ser_task_key = body.get("task_key") - return_code = body.get("return_code") - command = body.get("command") + return_code = body.get("return_code") + command = body.get("command") # Fetch the real task key from the running_tasks dict, using the serialized task key. try: task_key = self.running_tasks[ser_task_key] @@ -429,7 +429,7 @@ def process_queue(self, queue_url: str): self.fail(task_key) if queue_url == self.dlq_url and return_code == None: # DLQ failure: AWS Lambda service could not complete the invocation after retries. - # This indicates a Lambda-level failure (timeout, memory limit, crash, etc.) + # This indicates a Lambda-level failure (timeout, memory limit, crash, etc.) # where the function was unable to successfully execute to return a result. self.log.error( "Lambda invocation for task: %s failed at the service level (from DLQ). Command: %s", From b7b2b06f185822bc96607148f939092de9e878c1 Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Thu, 31 Jul 2025 12:29:45 -0700 Subject: [PATCH 3/4] Adjust logging and error messages --- .../amazon/aws/executors/aws_lambda/lambda_executor.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py index 4180749270131..84a7e7583c6c6 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py @@ -400,7 +400,6 @@ def process_queue(self, queue_url: str): continue ser_task_key = body.get("task_key") return_code = body.get("return_code") - command = body.get("command") # Fetch the real task key from the running_tasks dict, using the serialized task key. try: task_key = self.running_tasks[ser_task_key] @@ -427,14 +426,13 @@ def process_queue(self, queue_url: str): ) else: self.fail(task_key) - if queue_url == self.dlq_url and return_code == None: + if queue_url == self.dlq_url and return_code is None: # DLQ failure: AWS Lambda service could not complete the invocation after retries. # This indicates a Lambda-level failure (timeout, memory limit, crash, etc.) # where the function was unable to successfully execute to return a result. self.log.error( - "Lambda invocation for task: %s failed at the service level (from DLQ). Command: %s", + "DLQ message received: Lambda invocation for task: %s was unable to successfully execute. This likely indicates a Lambda-level failure (timeout, memory limit, crash, etc.).", task_key, - command, ) else: # In this case the Lambda likely started but failed at run time since we got a non-zero @@ -442,8 +440,8 @@ def process_queue(self, queue_url: str): # means the Airflow task did not run to completion, however we can't be sure (maybe the # lambda runtime code has a bug and is returning a non-zero when it actually passed?). So # perhaps not retrying is the safest option. - self.log.error( - "Lambda invocation for task: %s has failed to run with return code %s", + self.log.debug( + "Lambda invocation for task: %s completed but the underlying Airflow task has returned a non-zero exit code %s", task_key, return_code, ) From e4fe6f73ecd434bfbe4d132806b260b4956fe50d Mon Sep 17 00:00:00 2001 From: Isaiah Iruoha Date: Thu, 31 Jul 2025 14:44:39 -0700 Subject: [PATCH 4/4] Adjustments to resolve comments --- .../amazon/aws/executors/aws_lambda/lambda_executor.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py index 84a7e7583c6c6..bffab39fa6fda 100644 --- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py +++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py @@ -398,8 +398,8 @@ def process_queue(self, queue_url: str): self.log.warning("Deleting the message to avoid processing it again.") self.sqs_client.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle) continue - ser_task_key = body.get("task_key") return_code = body.get("return_code") + ser_task_key = body.get("task_key") # Fetch the real task key from the running_tasks dict, using the serialized task key. try: task_key = self.running_tasks[ser_task_key]