Description
Apache Airflow Provider(s)
amazon
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==9.0.0
Apache Airflow version
2.10.3
Operating System
Amazon Linux 2023
Deployment
Amazon (AWS) MWAA
Deployment details
MWAA in production and mwaa-local-runner for local testing. Bug exists on both.
What happened
When a deferred GlueJobOperator task fails with an internal error, the task is marked as failed, but the existing Glue job_run_id keeps running on the AWS side.
When the GlueJobOperator task retries, it automatically creates a new Glue job_run_id while the previous job_run_id is still running. This causes duplicate job runs on the AWS Glue side.
Example of internal errors:
[2025-12-05, 01:10:33 UTC] {baseoperator.py:1798} ERROR - Trigger failed:
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 558, in cleanup_finished_triggers
result = details["task"].result()
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/jobs/triggerer_job_runner.py", line 630, in run_trigger
async for event in trigger.run():
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/triggers/glue.py", line 73, in run
await hook.async_job_completion(self.job_name, self.run_id, self.verbose)
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 315, in async_job_completion
ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 334, in _handle_state
self.print_job_logs(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 278, in print_job_logs
continuation_tokens.output_stream_continuation = display_logs_from(
^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/hooks/glue.py", line 245, in display_logs_from
for response in paginator.paginate(
File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 269, in __iter__
response = self._make_request(current_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/paginate.py", line 357, in _make_request
return self._method(**current_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 569, in _api_call
return self._make_api_call(operation_name, kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/botocore/client.py", line 1023, in _make_api_call
raise error_class(parsed_response, operation_name)
botocore.errorfactory.ThrottlingException: An error occurred (ThrottlingException) when calling the FilterLogEvents operation (reached max retries: 4): Rate exceeded
[2025-12-05, 01:10:33 UTC] {taskinstance.py:3311} ERROR - Task failed with exception
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 1799, in resume_execution
raise TaskDeferralError(next_kwargs.get("error", "Unknown"))
airflow.exceptions.TaskDeferralError: Trigger failure
or
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/httpsession.py", line 222, in send
response = await session.request(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiohttp/client.py", line 667, in _request
raise ConnectionTimeoutError(
aiohttp.client_exceptions.ConnectionTimeoutError: Connection timeout to host http://169.254.170.2/v2/credentials/c9ab61ee-ea14-42a2-9a16-0b4aebdedad7
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/utils.py", line 722, in _get_response
response = await session.send(request.prepare())
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/airflow/.local/lib/python3.11/site-packages/aiobotocore/httpsession.py", line 259, in send
raise ConnectTimeoutError(endpoint_url=request.url, error=e)
botocore.exceptions.ConnectTimeoutError: Connect timeout on endpoint URL: "http://169.254.170.2/v2/credentials/c9ab61ee-ea14-42a2-9a16-0b4aebdedad7"
What you think should happen instead
I would propose adding logic to GlueJobOperator so that, on the next retry, it checks whether a previous Glue job_run_id exists and inspects that run's state:
If the Glue run state is in progress or completed, GlueJobOperator should not create a new Glue job_run_id.
If the Glue run state is failed, it should create a new Glue job_run_id.
How to reproduce
- Create a GlueJobOperator task.
- Set deferrable=True and verbose=False
- Run the task and watch the Airflow logs while it is in a deferred state.
Sample of the GlueJobOperator:
company = GlueJobOperator(
    task_id="company",
    job_name=var_glue_job_name,
    verbose=False,
    stop_job_run_on_kill=True,
    deferrable=True,
    pool=var_glue_pool,
    priority_weight=3,
    job_poll_interval=random.randint(30, 120),
)
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct