Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -425,17 +425,26 @@ def process_queue(self, queue_url: str):
"Successful Lambda invocation for task %s received from SQS queue.", task_key
)
else:
# In this case the Lambda likely started but failed at run time since we got a non-zero
# return code. We could consider retrying these tasks within the executor, because this _likely_
# means the Airflow task did not run to completion, however we can't be sure (maybe the
# lambda runtime code has a bug and is returning a non-zero when it actually passed?). So
# perhaps not retrying is the safest option.
self.fail(task_key)
self.log.error(
"Lambda invocation for task: %s has failed to run with return code %s",
task_key,
return_code,
)
if queue_url == self.dlq_url and return_code is None:
# DLQ failure: AWS Lambda service could not complete the invocation after retries.
# This indicates a Lambda-level failure (timeout, memory limit, crash, etc.)
# where the function was unable to successfully execute to return a result.
self.log.error(
"DLQ message received: Lambda invocation for task: %s was unable to successfully execute. This likely indicates a Lambda-level failure (timeout, memory limit, crash, etc.).",
task_key,
)
else:
# In this case the Lambda likely started but failed at run time since we got a non-zero
# return code. We could consider retrying these tasks within the executor, because this _likely_
# means the Airflow task did not run to completion, however we can't be sure (maybe the
# lambda runtime code has a bug and is returning a non-zero when it actually passed?). So
# perhaps not retrying is the safest option.
self.log.debug(
"Lambda invocation for task: %s completed but the underlying Airflow task has returned a non-zero exit code %s",
task_key,
return_code,
)
# Remove the task from the tracking mapping.
self.running_tasks.pop(ser_task_key)

Expand Down