DatabricksRunNowOperator failing as named parameters Jinja templating not getting resolved #40788

Closed
1 of 2 tasks
vatsrahul1001 opened this issue Jul 15, 2024 · 10 comments · Fixed by #40864

Comments

@vatsrahul1001
Collaborator

vatsrahul1001 commented Jul 15, 2024

Apache Airflow version

main (development)

If "Other Airflow 2 version" selected, which one?

No response

What happened?

DatabricksRunNowOperator started failing after upgrading to provider version 6.7.0 with the error below:


[2024-07-15, 05:29:05 UTC] {taskinstance.py:2905} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 563, in _do_api_call
    for attempt in self._get_retry_object():
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 435, in __iter__
    do = self.iter(retry_state=retry_state)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 368, in iter
    result = action(retry_state)
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/tenacity/__init__.py", line 390, in <lambda>
    self._add_action_func(lambda rs: rs.outcome.result())
                                     ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 573, in _do_api_call
    response.raise_for_status()
  File "/usr/local/lib/python3.11/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 400 Client Error: Bad Request for url: https://adb-2703548196728655.15.azuredatabricks.net/api/2.1/jobs/run-now
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 460, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 432, in _execute_callable
    return execute_callable(context=context, **execute_callable_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 401, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/operators/databricks.py", line 862, in execute
    self.run_id = hook.run_now(self.json)
                  ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks.py", line 243, in run_now
    response = self._do_api_call(RUN_NOW_ENDPOINT, json)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 580, in _do_api_call
    raise AirflowException(msg)
airflow.exceptions.AirflowException: Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Job 0 does not exist."}, Status Code: 400

I have verified that it works with version 6.6.0.

What you think should happen instead?

No response

How to reproduce

  1. Run the DAG below with Databricks provider 6.7.0:

import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime

from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")
# Notebook task (notebook path) as a JSON object
notebook_task = '{"notebook_path": "/Users/x/quick_start"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}


with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
        deferrable=True
    )
    # [END howto_operator_databricks_submit_run_async]

    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True
    )
    # [END howto_operator_databricks_run_now_async]

opr_submit_run >> opr_run_now

Operating System

Linux

Versions of Apache Airflow Providers

databricks 6.7.0

Deployment

Astronomer

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

vatsrahul1001 added the kind:bug, area:core, needs-triage and provider:databricks labels and removed the needs-triage label on Jul 15, 2024
@vatsrahul1001
Collaborator Author

Looks like it's related to #40471.

@eladkal
Contributor

eladkal commented Jul 15, 2024

cc @boraberke

@boraberke
Contributor

Hi @vatsrahul1001,

The job_id field of DatabricksRunNowOperator is not a templated field, which might be the cause of the issue. Before #40471, the constructor added job_id to the json parameter before the templated fields were rendered, so job_id was effectively supported as a templated field.

A workaround could be to set the json parameter as

json={"job_id": "{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}"},

instead of specifying the job_id parameter explicitly:

job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",

I do not have a Databricks test environment to verify my assumption. Let me know if this fixes the problem.

@wolfier
Contributor

wolfier commented Jul 16, 2024

Before 6.7.0, even though the named parameters were not themselves template fields, they were merged into the templated json field in __init__. By the time execute is called, json has been rendered, so any templates inside the named parameters were resolved as well.

In 6.7.0, the named parameters are instead saved to a non-templated attribute, overridden_json_params, which is only used later in execute via a call to _setup_and_validate_json. As a result, templates in the named parameters are no longer resolved.
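The mechanism described above can be reproduced without Airflow or Databricks. The following is a toy model under stated assumptions, not the provider's code: `render` is a regex stand-in for Jinja, and `ToyOperator`, `PreSixSevenRunNow`, `SixSevenRunNow` and `run_task` are hypothetical simplifications. Only the names `json`, `template_fields` and `overridden_json_params` come from the discussion. It shows a templated named `job_id` being rendered in the pre-6.7.0 style, left unrendered in the 6.7.0 style, and the `json=` workaround still being rendered.

```python
import re
from typing import Any

# Hypothetical stand-in for Jinja: replaces "{{ name }}" with context[name].
_PATTERN = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: dict) -> Any:
    """Recursively render strings inside nested dicts/lists."""
    if isinstance(value, str):
        return _PATTERN.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, context) for v in value]
    return value


class ToyOperator:
    template_fields: tuple = ("json",)

    def render_template_fields(self, context: dict) -> None:
        # Only attributes listed in template_fields are rendered.
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))


class PreSixSevenRunNow(ToyOperator):
    """Simplified pre-6.7.0 style: named params merged into json in __init__."""

    def __init__(self, job_id=None, json=None):
        self.json = dict(json or {})
        if job_id is not None:
            self.json["job_id"] = job_id

    def execute(self, context: dict) -> dict:
        return self.json


class SixSevenRunNow(ToyOperator):
    """Simplified 6.7.0 style: named params kept in a non-templated
    attribute and merged into json only inside execute()."""

    def __init__(self, job_id=None, json=None):
        self.json = dict(json or {})
        self.overridden_json_params = {} if job_id is None else {"job_id": job_id}

    def execute(self, context: dict) -> dict:
        return {**self.json, **self.overridden_json_params}


def run_task(op: ToyOperator, context: dict) -> dict:
    op.render_template_fields(context)  # rendering happens before execute()
    return op.execute(context)


ctx = {"job_id": 42}
print(run_task(PreSixSevenRunNow(job_id="{{ job_id }}"), ctx))  # {'job_id': '42'}
print(run_task(SixSevenRunNow(job_id="{{ job_id }}"), ctx))  # {'job_id': '{{ job_id }}'}
print(run_task(SixSevenRunNow(json={"job_id": "{{ job_id }}"}), ctx))  # {'job_id': '42'}
```

In the 6.7.0-style model the literal template string reaches the request payload unchanged, which is consistent with the 400 response in the traceback, although the toy model does not explain the exact "Job 0" value.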

@boraberke Was this an intended change?

@vatsrahul1001 vatsrahul1001 changed the title DatabricksRunNowOperator failing with Response: {"error_code":"INVALID_PARAMETER_VALUE","message":"Job 0 does not exist."} DatabricksRunNowOperator failing as named parameters Jinja templating not getting resolved Jul 16, 2024
@vatsrahul1001
Collaborator Author

vatsrahul1001 commented Jul 16, 2024

@boraberke


I tried templating the json parameter and it works as expected. However, existing DAGs, such as the example DAG, that use templated named parameters should not be broken by this change. As mentioned here, using only named parameters instead of json is very common.

@boraberke
Contributor


#40471 was intended to solve issue #35433, i.e. to make the json parameter templated as it should be. However, as @wolfier stated, named parameters that were implicitly templated (i.e. not listed in template_fields but merged into json) are no longer resolved correctly.

This affects all of the operators below:

  • DatabricksCreateJobsOperator
  • DatabricksSubmitRunOperator
  • DatabricksRunNowOperator


I agree, @vatsrahul1001. Apparently the docs list some of these parameters, including job_id, as templated here, but I had not noticed that before.

Adding the necessary named parameters to template_fields may be one way to fix it. WDYT @wolfier @vatsrahul1001?
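A minimal sketch of this proposal, again as a toy model rather than the provider's code: `render` is a regex stand-in for Jinja, and `FixedRunNowSketch` is a hypothetical class. It assumes, as described for 6.7.0 above, that the named parameters are merged into the payload inside `execute()`. Because they are listed in `template_fields`, they are rendered together with `json` before that merge happens.

```python
import re
from typing import Any

# Hypothetical stand-in for Jinja: replaces "{{ name }}" with context[name].
_PATTERN = re.compile(r"\{\{\s*(\w+)\s*\}\}")


def render(value: Any, context: dict) -> Any:
    """Recursively render strings inside nested dicts/lists."""
    if isinstance(value, str):
        return _PATTERN.sub(lambda m: str(context[m.group(1)]), value)
    if isinstance(value, dict):
        return {k: render(v, context) for k, v in value.items()}
    if isinstance(value, list):
        return [render(v, context) for v in value]
    return value


class FixedRunNowSketch:
    """Hypothetical: named params are listed in template_fields, so they are
    rendered before execute() merges them over json."""

    template_fields = ("json", "job_id", "notebook_params")

    def __init__(self, job_id=None, notebook_params=None, json=None):
        self.json = dict(json or {})
        self.job_id = job_id
        self.notebook_params = notebook_params

    def render_template_fields(self, context: dict) -> None:
        for name in self.template_fields:
            setattr(self, name, render(getattr(self, name), context))

    def execute(self, context: dict) -> dict:
        # Merge the already-rendered named parameters over json.
        merged = dict(self.json)
        if self.job_id is not None:
            merged["job_id"] = self.job_id
        if self.notebook_params is not None:
            merged["notebook_params"] = self.notebook_params
        return merged


op = FixedRunNowSketch(job_id="{{ job_id }}", notebook_params={"run_date": "{{ ds }}"})
op.render_template_fields({"job_id": 42, "ds": "2024-07-15"})  # hypothetical context
payload = op.execute({})
print(payload)  # {'job_id': '42', 'notebook_params': {'run_date': '2024-07-15'}}
```

Whether this ordering carries over to the real operator depends on where the merge actually happens; the sketch only illustrates the idea under that assumption.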

Additionally, @potiuk should we revert #40471 or add a new commit that fixes this issue?

@vatsrahul1001
Collaborator Author

vatsrahul1001 commented Jul 16, 2024

@boraberke According to the documentation, template substitution occurs just before the pre_execute function of your operator is called. I don't think adding the named parameters to template_fields will resolve this.

@potiuk
Member

potiuk commented Jul 16, 2024

Additionally, @potiuk should we revert #40471 or add a new commit that fixes this issue?

A fix would be best.

@Stormhand

Hello,
these changes also broke my code, where I'm using Jinja templating in notebook_params for DatabricksRunNowOperator.

@vatsrahul1001
Collaborator Author

@boraberke are you working on a fix?

romsharon98 pushed a commit to romsharon98/airflow that referenced this issue Jul 26, 2024
This PR fixes the many named parameters that were templated and were broken by apache#40471.

The following operators are affected:

DatabricksCreateJobsOperator
DatabricksSubmitRunOperator
DatabricksRunNowOperator

closes: apache#40788
pankajkoti added a commit that referenced this issue Jul 27, 2024
* Revert "Fix named parameters templating in Databricks operators (#40864)"

This reverts commit cfe1d53.

* Revert "Make Databricks operators' json parameter compatible with XComs, Jinja expression values (#40471)"

This reverts commit 4fb2140.

This reverts PR #40864 and PR #40471.

Previously, PR #40471 was contributed to address issue #35433.
However, that contribution gave rise to another issue, #40788.
An attempt was then made to resolve #40788 in PR #40864.
However, with the second PR, it appears that the older
issue #35433 has [resurfaced](#40864 (comment)). So, at the moment,
we have 2 PRs on top of the existing implementation that
eventually have nil effect, and the previous issues persist.
I believe it is better to revert those 2 PRs, reopen the earlier
issue #35433 and peacefully address it by taking the needed time.
molcay pushed a commit to VladaZakharova/airflow that referenced this issue Aug 19, 2024
…e#41050)
