Closed
Labels
area:providers, kind:bug, needs-triage, provider:google
Description
Apache Airflow Provider(s)
Versions of Apache Airflow Providers
apache-airflow-providers-google==18.0.0
Apache Airflow version
3.0.6
Operating System
Debian GNU/Linux 12 (bookworm)
Deployment
Official Apache Airflow Helm Chart
Deployment details
No response
What happened
When using the CloudRunExecuteJobOperator with deferrable=True, the following error appears in the logs:
ERROR - Top level error: source="task"
ValueError: dictionary update sequence element #0 has length 1; 2 is required
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1353 in main
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py", line 1291 in finalize
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py", line 112 in get_link
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/google/cloud/links/base.py", line 92 in get_config
This error seems to come from the operator writing log_uri as a plain string, while BaseGoogleLink.get_config appears to expect a dictionary. When monkey-patching get_config like this
from airflow.providers.google.cloud.links.base import BaseGoogleLink


def _patched(self, operator, ti_key):
    conf = {}
    conf.update(getattr(operator, "extra_links_params", {}))
    from airflow.sdk.execution_time.xcom import XCom

    x = XCom.get_value(key=self.key, ti_key=ti_key) or {}
    # The operator may have stored a bare string; wrap it so update() accepts it.
    if isinstance(x, str):
        x = {self.key: x}
    conf.update(x)
    # if the config did not define, return None to stop URL formatting
    if not conf:
        return None
    # Add a default value for the 'namespace' parameter for backward compatibility.
    # This is for datafusion
    conf.setdefault("namespace", "default")
    return conf


BaseGoogleLink.get_config = _patched
the error disappears. I am not sure whether this is actually the root cause or the best way to handle it.
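The exception text itself can be reproduced without Airflow, which may help confirm the diagnosis. The sketch below is only an illustration: `merge_link_config` is a hypothetical helper, not part of the provider, and the URL is a placeholder. It shows that calling `dict.update` with a plain string raises the same ValueError, and that wrapping the string in a dictionary first (as the patch above does) avoids it.

```python
def merge_link_config(base: dict, xcom_value, key: str) -> dict:
    """Hypothetical helper: merge an XCom value into a link config.

    Mirrors the normalization in the monkey patch: a bare string is
    wrapped as {key: value} before being merged.
    """
    conf = dict(base)
    value = xcom_value or {}
    if isinstance(value, str):
        value = {key: value}
    conf.update(value)
    return conf


# Placeholder value standing in for whatever string the operator stored.
log_uri = "https://example.invalid/logs"

# Passing a string to dict.update makes Python iterate over its characters
# and try to unpack each one as a (key, value) pair, which fails.
try:
    {}.update(log_uri)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

# With normalization, the same string merges cleanly.
merged = merge_link_config({"project_id": "my-project"}, log_uri, key="log_uri")
```

Whether the better fix is to normalize on read, as here, or to have the operator write a dictionary in the first place is left open, as in the report.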
What you think should happen instead
No error (whether that means writing a dictionary to xcom, or handling the get_config differently, I am not sure).
How to reproduce
Simple example DAG:
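from airflow.providers.google.cloud.operators.cloud_run import CloudRunExecuteJobOperator
from airflow.sdk import dag


@dag(dag_id="test")
def get_dag() -> None:
    op = CloudRunExecuteJobOperator(
        task_id="test-job",
        project_id="my-project",
        region="my-region",
        job_name="my-job-name",
        deferrable=True,
    )
    op


get_dag()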
Anything else
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct