
Airflow Datafusion Hook: Regression in the work of start_pipeline method for DataFusionHook #60661

@MaksYermak

Description

Apache Airflow Provider(s)

google

Versions of Apache Airflow Providers

apache-airflow-providers-google==19.3.0

Apache Airflow version

2.11.0; 3.x.x

Operating System

Ubuntu

Deployment

Other

Deployment details

No response

What happened

After PR #58698 was merged into the main branch, the start_pipeline method started failing with this error:

[2026-01-15, 00:01:17 UTC] {taskinstance.py:3336} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/python/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 776, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 742, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 424, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/site-packages/airflow/providers/google/cloud/operators/datafusion.py", line 800, in execute
    self.pipeline_id = hook.start_pipeline(
                       ^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/site-packages/airflow/providers/google/cloud/hooks/datafusion.py", line 488, in start_pipeline
    response_json = json.loads(response.data)
                    ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/json/__init__.py", line 346, in loads
    return _default_decoder.decode(s)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/json/decoder.py", line 337, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/python/lib/python3.11/json/decoder.py", line 355, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)

What you think should happen instead

PR #58698 introduced a breaking change to the start_pipeline method of DataFusionHook. The PR was intended to improve error handling, but it broke start_pipeline and, with it, a number of DataFusion operators. In my opinion, we should revert this PR to restore the previous behavior, and then look for a different approach to error handling.

How to reproduce

Run this DAG https://github.com/apache/airflow/blob/main/providers/google/tests/system/google/cloud/datafusion/example_datafusion.py in a Breeze environment or any other Airflow environment. The start_pipeline_def task should fail with json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0).

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
