diff --git a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
index fcf31ed29de6b..bffab39fa6fda 100644
--- a/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
+++ b/providers/amazon/src/airflow/providers/amazon/aws/executors/aws_lambda/lambda_executor.py
@@ -425,17 +425,26 @@ def process_queue(self, queue_url: str):
                     "Successful Lambda invocation for task %s received from SQS queue.", task_key
                 )
             else:
-                # In this case the Lambda likely started but failed at run time since we got a non-zero
-                # return code. We could consider retrying these tasks within the executor, because this _likely_
-                # means the Airflow task did not run to completion, however we can't be sure (maybe the
-                # lambda runtime code has a bug and is returning a non-zero when it actually passed?). So
-                # perhaps not retrying is the safest option.
                 self.fail(task_key)
-                self.log.error(
-                    "Lambda invocation for task: %s has failed to run with return code %s",
-                    task_key,
-                    return_code,
-                )
+                if queue_url == self.dlq_url and return_code is None:
+                    # DLQ failure: AWS Lambda service could not complete the invocation after retries.
+                    # This indicates a Lambda-level failure (timeout, memory limit, crash, etc.)
+                    # where the function was unable to successfully execute to return a result.
+                    self.log.error(
+                        "DLQ message received: Lambda invocation for task: %s was unable to successfully execute. This likely indicates a Lambda-level failure (timeout, memory limit, crash, etc.).",
+                        task_key,
+                    )
+                else:
+                    # In this case the Lambda likely started but failed at run time since we got a non-zero
+                    # return code. We could consider retrying these tasks within the executor, because this _likely_
+                    # means the Airflow task did not run to completion, however we can't be sure (maybe the
+                    # lambda runtime code has a bug and is returning a non-zero when it actually passed?). So
+                    # perhaps not retrying is the safest option.
+                    self.log.debug(
+                        "Lambda invocation for task: %s completed but the underlying Airflow task has returned a non-zero exit code %s",
+                        task_key,
+                        return_code,
+                    )
 
             # Remove the task from the tracking mapping.
             self.running_tasks.pop(ser_task_key)