Labels
area:providers, good first issue, kind:bug (This is clearly a bug), provider:google (Google (including GCP) related issues)
Description
Apache Airflow version
3.0.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
There is an issue in airflow/providers/google/cloud/hooks/datafusion.py concerning how it interacts with CDAP's Lifecycle Microservices for starting programs.
- Problem: The Airflow hook currently uses the POST /v3/namespaces/<namespace-id>/start endpoint (documented here: https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs) to start a single program. This endpoint returns an overall HTTP 200 OK status code even if some individual programs fail, but the Airflow code (specifically around self._check_response_status_and_data) does not inspect the statusCode field within the response body.
- Impact: When there is a problem, for example in the runtime args, the response body contains an error statusCode and no runId. This leads to a failure on the next line, where the code tries to extract the run ID.
Full Error
[2025-05-05T12:19:53.950+0000] {taskinstance.py:2907} ERROR - Task failed with exception
Traceback (most recent call last):
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 465, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
return execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/datafusion.py", line 825, in execute
pipeline_id = hook.start_pipeline(
^^^^^^^^^^^^^^^^^^^^
File "/opt/python3.11/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/datafusion.py", line 500, in start_pipeline
return response_json[0]["runId"]
~~~~~~~~~~~~~~~~^^^^^^^^^
KeyError: 'runId'
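For illustration, the snippet below reproduces the KeyError with a hypothetical response body (its shape is assumed from the CDAP multi-program start documentation; the field values are made up):

```python
# Hypothetical CDAP multi-program /start response (shape assumed from the
# CDAP Lifecycle Microservices docs): the overall HTTP status is 200, but
# the per-program entry carries an error statusCode and no runId.
response_json = [
    {
        "appId": "my_pipeline",
        "programType": "workflow",
        "programId": "DataPipelineWorkflow",
        "statusCode": 400,
        "error": "Invalid runtime arguments",
    }
]

# This mirrors the failing line in datafusion.py and raises the same error:
try:
    run_id = response_json[0]["runId"]
except KeyError as exc:
    print(f"KeyError: {exc}")  # → KeyError: 'runId'
```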
What you think should happen instead?
There are two issues in airflow/providers/google/cloud/hooks/datafusion.py concerning how it interacts with CDAP's Lifecycle Microservices for starting programs.
1. Insufficient Status Code Checking
- Problem: The Airflow hook currently uses the POST /v3/namespaces/<namespace-id>/start endpoint (documented here: https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-Multiple-Programs) to start a single program. This endpoint returns an overall HTTP 200 OK status code even if some individual programs fail, but the Airflow code (specifically around self._check_response_status_and_data) does not inspect the statusCode field within the response body.
- Proposed Fix: The hook should be updated to parse the JSON response body and check the statusCode for each program entry, to ensure all intended programs started successfully.
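A minimal sketch of that check (the helper name is illustrative, not the hook's actual code; the response shape is assumed from the CDAP docs):

```python
# Sketch of the proposed fix: after calling the multi-program /start
# endpoint, validate every entry's statusCode before extracting runId.
def extract_run_id(response_json: list[dict]) -> str:
    for entry in response_json:
        status = entry.get("statusCode")
        if status != 200:
            raise RuntimeError(
                f"Starting program {entry.get('programId')} failed with "
                f"statusCode={status}: {entry.get('error')}"
            )
    # Only reached when every program started successfully.
    return response_json[0]["runId"]
```

With this check, a bad-runtime-args response would surface as an explicit error message instead of an opaque KeyError: 'runId'.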
2. [Preferred Approach] Suboptimal API Usage for Single Program Starts
- Problem: When starting a single program, the Airflow hook (in start_pipeline) uses the POST /v3/namespaces/<namespace-id>/start endpoint, which is designed for starting multiple programs.
- Recommendation: It is highly recommended to use the dedicated POST /v3/namespaces/<namespace-id>/apps/<app-id>/<program-type>/<program-id>/start endpoint for starting a single program (documented here: https://cdap.atlassian.net/wiki/spaces/DOCS/pages/477560983/Lifecycle+Microservices#Start-a-Program).
- Benefit: The single-program endpoint provides clearer and more direct error handling for individual program starts, and removes the complexity of parsing a multi-program response for a single operation.
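To make the recommendation concrete, here is a sketch of the single-program URL construction (the path comes from the CDAP docs; the function name and parameters are illustrative, not the hook's actual API):

```python
# Builds the dedicated single-program start URL. The hook would POST to
# this URL; a non-2xx HTTP status then refers unambiguously to this one
# program, unlike the multi-program endpoint's 200-with-errors body.
def single_program_start_url(
    instance_url: str,
    namespace: str,
    app_id: str,
    program_type: str,
    program_id: str,
) -> str:
    return (
        f"{instance_url}/v3/namespaces/{namespace}/apps/{app_id}"
        f"/{program_type}/{program_id}/start"
    )

print(single_program_start_url(
    "https://example-cdf-instance", "default",
    "my_pipeline", "workflows", "DataPipelineWorkflow",
))
# → https://example-cdf-instance/v3/namespaces/default/apps/my_pipeline/workflows/DataPipelineWorkflow/start
```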
How to reproduce
Trigger the Cloud Data Fusion (CDF) start pipeline with wrong runtime args or an incorrect program.
Operating System
I am not sure how that is related to the issue. N/A
Versions of Apache Airflow Providers
No response
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
