Google Dataflow provider does not retry on service 503 errors #57359

@pmcquighan-camus

Description

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==18.0.0

Apache Airflow version

3.1.0

Operating System

Debian

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

What happened

The provider was polling for job status every 30 seconds for about 15 minutes, as expected. The Dataflow API then returned a 503 Service Unavailable error and the task failed. Because the task was run in deferrable mode, its retries saw that the trigger had already completed with an exception and immediately failed as well.

With our current workload, the Google Dataflow API seems to return 503s with some regularity (roughly weekly), resulting in a failed DAG run every few days. The run can be recovered by clearing the full task state, but that is wasteful: the Dataflow job itself has likely already succeeded, yet any needed XCom values are never output.

Initial try:

[2025-10-26 22:39:17] ERROR - Exception occurred while checking for job completion. source=airflow.providers.google.cloud.triggers.dataflow.TemplateJobStartTrigger loc=dataflow.py:149
ServiceUnavailable: 503 The service is currently unavailable.
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/triggers/dataflow.py", line 113 in run
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 1480 in get_job_status
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/hooks/dataflow.py", line 1457 in get_job
File "/home/airflow/.local/lib/python3.12/site-packages/google/cloud/dataflow_v1beta3/services/jobs_v1_beta3/async_client.py", line 478 in get_job
File "/home/airflow/.local/lib/python3.12/site-packages/google/api_core/grpc_helpers_async.py", line 88 in __await__
AioRpcError: <AioRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "The service is currently unavailable."
	debug_error_string = "UNKNOWN:Error received from peer ipv4:74.125.132.95:443 {created_time:"2025-10-27T05:39:17.82102088+00:00", grpc_status:14, grpc_message:"The service is currently unavailable."}"
>
File "/home/airflow/.local/lib/python3.12/site-packages/google/api_core/grpc_helpers_async.py", line 85 in __await__
File "/home/airflow/.local/lib/python3.12/site-packages/grpc/aio/_interceptor.py", line 472 in __await__
File "/home/airflow/.local/lib/python3.12/site-packages/grpc/aio/_call.py", line 327 in __await__

Task retry a few minutes later:

[2025-10-26 22:42:11] DEBUG - Result from 'on_task_instance_running': [] source=airflow.listeners.listener loc=listener.py:42
[2025-10-26 22:42:11] INFO - status: error, msg: 503 The service is currently unavailable. source=airflow.task.operators.airflow.providers.google.cloud.operators.dataflow.DataflowStartFlexTemplateOperator loc=dataflow.py:648
[2025-10-26 22:42:11] ERROR - Task failed with exception source=task loc=task_runner.py:972
AirflowException: 503 The service is currently unavailable.
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 920 in run
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1307 in _execute_task
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/bases/operator.py", line 1632 in resume_execution
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/operators/dataflow.py", line 649 in execute_complete
[2025-10-26 22:42:11] ERROR - Top level error source=task loc=task_runner.py:1457
ValueError: dictionary update sequence element #0 has length 1; 2 is required
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1452 in main
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1390 in finalize
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py", line 112 in get_link
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py", line 92 in get_config
[2025-10-26 22:42:11] WARNING - Process exited abnormally exit_code=1 source=task

What you think should happen instead

Since a 503 is a retryable error, the trigger should retry several times (or perhaps indefinitely) before failing, just as it would if the job were still running.
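The expected behavior is roughly the sketch below: a bounded exponential-backoff retry wrapped around the status poll. This is illustrative only, not the provider's actual code — `ServiceUnavailable` here is a self-contained stand-in for `google.api_core.exceptions.ServiceUnavailable`, and the function name and parameters are hypothetical.

```python
import time


class ServiceUnavailable(Exception):
    """Stand-in for google.api_core.exceptions.ServiceUnavailable (HTTP 503)."""


def get_job_status_with_retry(get_status, max_transient_failures=5,
                              base_delay=1.0, sleep=time.sleep):
    """Call get_status(), retrying transient 503s with exponential backoff.

    Gives up and re-raises only after max_transient_failures consecutive
    503 responses, instead of failing the trigger on the first one.
    """
    failures = 0
    while True:
        try:
            return get_status()
        except ServiceUnavailable:
            failures += 1
            if failures > max_transient_failures:
                raise
            sleep(base_delay * 2 ** (failures - 1))
```

In the real trigger this logic would live inside the async `run` loop around `hook.get_job_status`, so a brief API outage looks no different from the job simply still being in progress.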

How to reproduce

Run a Dataflow flex template via the DataflowStartFlexTemplateOperator in deferrable mode. If the Google Dataflow API returns a 503 while the trigger is polling for status, the error will recur.
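A minimal DAG for reproducing this might look like the following. This is a sketch, not a verified reproducer: the project, region, bucket path, and job name are placeholders, and the `body` layout follows the Flex Template launch request format.

```python
import pendulum

from airflow import DAG
from airflow.providers.google.cloud.operators.dataflow import (
    DataflowStartFlexTemplateOperator,
)

with DAG(
    dag_id="dataflow_503_repro",
    start_date=pendulum.datetime(2025, 10, 1, tz="UTC"),
    schedule=None,
) as dag:
    DataflowStartFlexTemplateOperator(
        task_id="start_flex_template",
        project_id="my-project",        # placeholder
        location="us-central1",         # placeholder
        deferrable=True,                # trigger polls job status asynchronously
        body={
            "launchParameter": {
                "jobName": "repro-job",  # placeholder
                # placeholder path to a Flex Template spec file
                "containerSpecGcsPath": "gs://my-bucket/templates/spec.json",
                "parameters": {},
            }
        },
    )
```

With `deferrable=True`, the operator defers to TemplateJobStartTrigger, which is where the 503 surfaces while polling; the failure is timing-dependent, so it only appears when the API happens to return a 503 during a poll.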

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
