
Make Databricks operators' json parameter compatible with XComs, Jinja expression values #35433

Open
1 task done
josh-fell opened this issue Nov 4, 2023 · 6 comments · Fixed by #40471

Comments

@josh-fell
Contributor

josh-fell commented Nov 4, 2023

Body

Several Databricks operators have a json parameter that accepts a JSON object containing any number of API parameters. In the constructors of these operators, other parameters that can be passed to the operator, like name, tags, etc., are merged into the json value.

Since json is a templated field, attempting to modify it in this way fails, or does not work as expected, if the input arg is a Jinja string (e.g. "{{ var.json....}}") or an XComArg (i.e. the output of a previous task). Template fields are not rendered until just before the execute method is called.

To illustrate the point, let's use this example DAG where we define the json arg in a previous task and use its output:

from __future__ import annotations

from pendulum import datetime
from typing import TYPE_CHECKING, Sequence

from airflow.decorators import dag, task
from airflow.models.baseoperator import BaseOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context


class DatabricksCreateJobsOperator(BaseOperator):
    template_fields: Sequence[str] = ("json", "databricks_conn_id")

    def __init__(
        self,
        *,
        json: dict | None = None,
        name: str | None = None,
        tags: dict[str, str] | None = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        self.json = json or {}
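        # Note: the mutations below run at DAG-parse time, before template
        # rendering, so json may still be an XComArg or a Jinja string here.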
        if name is not None:
            self.json["name"] = name
        if tags is not None:
            self.json["tags"] = tags

    def execute(self, context: Context) -> None:
        pass

@dag(start_date=datetime(2023, 1, 1), schedule=None)
def derived_template_fields():
    @task
    def push_json() -> dict[str, str]:
        return {"key1": "val1", "key2": "val2"}

    json = push_json()

    DatabricksCreateJobsOperator(
        task_id="create_job_w_json", json=json, name="some_name", tags={"key3": "value3"}
    )


derived_template_fields()

DAG parsing fails with:

Running: airflow dags reserialize
[2023-08-31T14:29:57.796+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:29:57.816+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:29:58.533+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:29:58.615+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 50, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 41, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'PlainXComArg' object does not support item assignment

Even if we change the json arg assignment to use the classic XCom Jinja template approach (i.e. json = "{{ ti.xcom_pull(task_ids='push_json') }}"), the DAG fails to parse:
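For reference, only the operator instantiation in the example DAG above changes in that variant:

DatabricksCreateJobsOperator(
    task_id="create_job_w_json",
    json="{{ ti.xcom_pull(task_ids='push_json') }}",
    name="some_name",
    tags={"key3": "value3"},
)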

Running: airflow dags reserialize
[2023-08-31T14:32:01.553+0000] {utils.py:430} WARNING - No module named 'paramiko'
[2023-08-31T14:32:01.574+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2023-08-31T14:32:02.341+0000] {dagbag.py:539} INFO - Filling up the DagBag from /usr/local/airflow/dags
[2023-08-31T14:32:02.415+0000] {dagbag.py:347} ERROR - Failed to import: /usr/local/airflow/dags/derived_template_fields.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dagbag.py", line 343, in parse
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 940, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/usr/local/airflow/dags/derived_template_fields.py", line 51, in <module>
    derived_template_fields()
  File "/usr/local/lib/python3.11/site-packages/airflow/models/dag.py", line 3798, in factory
    f(**f_kwargs)
  File "/usr/local/airflow/dags/derived_template_fields.py", line 42, in derived_template_fields
    DatabricksCreateJobsOperator(
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 436, in apply_defaults
    result = func(self, **kwargs, default_args=default_args)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/dags/derived_template_fields.py", line 27, in __init__
    self.json["name"] = name
    ~~~~~~~~~^^^^^^^^
TypeError: 'str' object does not support item assignment

It would be best to move the modification of json (and, more generally, of any templated field) into the execute method instead, after template rendering has taken place.
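As a minimal sketch of that approach (reusing the imports from the simplified operator above; the underscore-prefixed attributes and the trimmed template_fields are illustrative, not the provider's actual implementation), the merge could be deferred like this:

class DatabricksCreateJobsOperator(BaseOperator):
    template_fields: Sequence[str] = ("json",)

    def __init__(
        self,
        *,
        json: dict | None = None,
        name: str | None = None,
        tags: dict[str, str] | None = None,
        **kwargs
    ):
        super().__init__(**kwargs)
        # Keep the raw values; json may still be an XComArg or an unrendered
        # Jinja string at parse time, so it must not be mutated here.
        self.json = json or {}
        self._name = name
        self._tags = tags

    def execute(self, context: Context) -> None:
        # Template fields have been rendered by now; for an XComArg input,
        # self.json is the dict pushed by the upstream task and can safely
        # be modified before calling the Databricks API.
        if self._name is not None:
            self.json["name"] = self._name
        if self._tags is not None:
            self.json["tags"] = self._tags

Note that in the Jinja-string case the rendered json would be a str rather than a dict, so an implementation would also need to handle deserializing that value.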

Currently this impacts the following operators:

  • DatabricksCreateJobsOperator
  • DatabricksSubmitRunOperator
  • DatabricksRunNowOperator

Committer

  • I acknowledge that I am a maintainer/committer of the Apache Airflow project.
@josh-fell josh-fell added kind:meta High-level information important to the community provider:databricks labels Nov 4, 2023
@eladkal eladkal added area:providers kind:feature Feature Requests and removed kind:meta High-level information important to the community labels Nov 4, 2023
@thcidale0808

@josh-fell I'm happy to contribute to this one. Can this be assigned to me?

@josh-fell
Contributor Author

@thcidale0808 Absolutely! All yours.

@LRancez

LRancez commented Feb 27, 2024

I can confirm that this is still happening in version 2.8.2. I tried to work around the issue using Docker images with previous versions, going back at least as far as 2.3.0, but the error persists.
Thanks for the support; I hope you can fix this soon.

@josh-fell
Contributor Author

@thcidale0808 Are you still working on this one?

@boraberke
Contributor

I can take this over if @thcidale0808 is not working on it anymore. Could you please assign this to me @josh-fell?

@josh-fell josh-fell assigned boraberke and unassigned thcidale0808 Jun 25, 2024
@josh-fell
Contributor Author

All yours @boraberke!

@pankajkoti pankajkoti reopened this Jul 26, 2024
pankajkoti added a commit that referenced this issue Jul 27, 2024
* Revert "Fix named parameters templating in Databricks operators (#40864)"

This reverts commit cfe1d53.

* Revert "Make Databricks operators' json parameter compatible with XComs, Jinja expression values (#40471)"

This reverts commit 4fb2140.

This reverts PR #40864 and PR #40471.

Previously, PR #40471 was contributed to address issue #35433.
However, that contribution gave rise to another issue, #40788,
which PR #40864 then attempted to resolve. With the second PR,
it appears that the original issue #35433 has [resurfaced](#40864 (comment)).
So, at the moment, we have 2 PRs on top of the existing implementation
that ultimately have no net effect, and the previous issue persists.
I believe it is better to revert those 2 PRs, reopen the earlier
issue #35433, and address it properly, taking the time needed.
kosteev pushed a commit to GoogleCloudPlatform/composer-airflow that referenced this issue Nov 9, 2024
* Revert "Fix named parameters templating in Databricks operators (#40864)"

This reverts commit cfe1d53ed041ea903292e3789e1a5238db5b5031.

* Revert "Make Databricks operators' json parameter compatible with XComs, Jinja expression values (#40471)"

This reverts commit 4fb2140f393b6332903fb833151c2ce8a9c66fe2.

This reverts PR #40864 and PR #40471.

Previously, PR apache/airflow#40471 was contributed to address issue apache/airflow#35433.
However, that contribution gave rise to another issue apache/airflow#40788.
Next apache/airflow#40788 was being attempted to be resolved in PR #40864.
However, with the second PR, it appears that the previous old
issue #35433 has [resurfaced](apache/airflow#40864 (comment)). So, at the moment, the case is
that we have 2 PRs on top of the existing implementation
eventually having nil effect and the previous issues persists.
I believe it is better to revert those 2 PRs, reopen the earlier
issue #35433 and peacefully address it by taking the needed time.

GitOrigin-RevId: 4535e08b862e2b7ff4f2a76de7124983d4efe9db