Merged
4 changes: 2 additions & 2 deletions providers/databricks/docs/operators/jobs_create.rst
@@ -31,12 +31,12 @@ Using the Operator
------------------

There are three ways to instantiate this operator. In the first way, you can take the JSON payload that you typically use
to call the ``api/2.1/jobs/create`` endpoint and pass it directly to our ``DatabricksCreateJobsOperator`` through the
to call the ``api/2.2/jobs/create`` endpoint and pass it directly to our ``DatabricksCreateJobsOperator`` through the
``json`` parameter. With this approach you get full control over the underlying payload to the Jobs REST API, including
execution of Databricks jobs with multiple tasks, but it's harder to detect errors because of the lack of type checking.

The second way to accomplish the same thing is to use the named parameters of the ``DatabricksCreateJobsOperator`` directly. Note that there is exactly
one named parameter for each top level parameter in the ``api/2.1/jobs/create`` endpoint.
one named parameter for each top level parameter in the ``api/2.2/jobs/create`` endpoint.

The third way is to use both the json parameter **AND** the named parameters. They will be merged
together. If there are conflicts during the merge, the named parameters will take precedence and
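For orientation, a minimal sketch of the first and third styles described above (not taken from this PR); the job name, notebook path, and cluster values are illustrative, and the default ``databricks_default`` connection is assumed:

.. code-block:: python

    from airflow.providers.databricks.operators.databricks import DatabricksCreateJobsOperator

    # First style: pass the full ``api/2.2/jobs/create`` payload through ``json``.
    create_job_from_json = DatabricksCreateJobsOperator(
        task_id="create_job_from_json",
        json={
            "name": "example-job",  # illustrative
            "tasks": [
                {
                    "task_key": "notebook_task",
                    "notebook_task": {"notebook_path": "/Shared/example"},
                    "new_cluster": {
                        "spark_version": "13.3.x-scala2.12",
                        "node_type_id": "i3.xlarge",
                        "num_workers": 2,
                    },
                }
            ],
        },
    )

    # Third style: ``json`` and named parameters are merged; on conflict the named
    # parameter wins, so the created job is named "overridden-name".
    create_job_merged = DatabricksCreateJobsOperator(
        task_id="create_job_merged",
        json={"name": "example-job"},
        name="overridden-name",
    )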
4 changes: 2 additions & 2 deletions providers/databricks/docs/operators/run_now.rst
@@ -21,14 +21,14 @@ DatabricksRunNowOperator
========================

Use the :class:`~airflow.providers.databricks.operators.DatabricksRunNowOperator` to trigger a run of an existing Databricks job
via `api/2.1/jobs/run-now <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow>`_ API endpoint.
via `api/2.2/jobs/run-now <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow>`_ API endpoint.


Using the Operator
^^^^^^^^^^^^^^^^^^

There are two ways to instantiate this operator. In the first way, you can take the JSON payload that you typically use
to call the ``api/2.1/jobs/run-now`` endpoint and pass it directly to our ``DatabricksRunNowOperator`` through the ``json`` parameter.
to call the ``api/2.2/jobs/run-now`` endpoint and pass it directly to our ``DatabricksRunNowOperator`` through the ``json`` parameter.

Another way to accomplish the same thing is to use the named parameters of the ``DatabricksRunNowOperator`` directly.
Note that there is exactly one named parameter for each top level parameter in the ``jobs/run-now`` endpoint.
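For orientation, a minimal sketch of both styles (not taken from this PR), assuming an existing job with an illustrative ``job_id`` of 42 and the default ``databricks_default`` connection:

.. code-block:: python

    from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

    # First style: the full ``api/2.2/jobs/run-now`` payload goes through ``json``.
    run_now_from_json = DatabricksRunNowOperator(
        task_id="run_now_from_json",
        json={"job_id": 42, "notebook_params": {"dry-run": "true"}},
    )

    # Second style: named parameters, one per top-level field of the endpoint.
    run_now_named = DatabricksRunNowOperator(
        task_id="run_now_named",
        job_id=42,
        notebook_params={"dry-run": "true"},
    )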
6 changes: 3 additions & 3 deletions providers/databricks/docs/operators/submit_run.rst
@@ -24,14 +24,14 @@ DatabricksSubmitRunOperator
===========================

Use the :class:`~airflow.providers.databricks.operators.DatabricksSubmitRunOperator` to submit
a new Databricks job via Databricks `api/2.1/jobs/runs/submit <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit>`_ API endpoint.
a new Databricks job via Databricks `api/2.2/jobs/runs/submit <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit>`_ API endpoint.


Using the Operator
------------------

There are three ways to instantiate this operator. In the first way, you can take the JSON payload that you typically use
to call the ``api/2.1/jobs/runs/submit`` endpoint and pass it directly to our ``DatabricksSubmitRunOperator`` through the
to call the ``api/2.2/jobs/runs/submit`` endpoint and pass it directly to our ``DatabricksSubmitRunOperator`` through the
``json`` parameter. With this approach you get full control over the underlying payload to the Jobs REST API, including
execution of Databricks jobs with multiple tasks, but it's harder to detect errors because of the lack of type checking.

@@ -91,7 +91,7 @@ Currently the named parameters that ``DatabricksSubmitRunOperator`` supports are
task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
)

Another way to do is use the param tasks to pass array of objects to instantiate this operator. Here the value of tasks param that is used to invoke ``api/2.1/jobs/runs/submit`` endpoint is passed through the ``tasks`` param in ``DatabricksSubmitRunOperator``. Instead of invoking single task, you can pass array of task and submit a one-time run.
Another way is to use the ``tasks`` parameter to pass an array of objects when instantiating this operator. Here the value of the ``tasks`` parameter that is used to invoke the ``api/2.2/jobs/runs/submit`` endpoint is passed through the ``tasks`` parameter in ``DatabricksSubmitRunOperator``. Instead of invoking a single task, you can pass an array of tasks and submit a one-time run.

.. code-block:: python

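The code block above is cut off by the diff view; as a hedged sketch of the tasks-array form (values are illustrative, not the exact snippet from the docs), it looks roughly like this:

.. code-block:: python

    from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator

    # A one-time run with an array of task specs passed through ``tasks``.
    tasks = [
        {
            "task_key": "notebook_task",
            "notebook_task": {"notebook_path": "/Shared/example"},
            "new_cluster": {
                "spark_version": "13.3.x-scala2.12",
                "node_type_id": "i3.xlarge",
                "num_workers": 2,
            },
        }
    ]

    submit_run = DatabricksSubmitRunOperator(task_id="submit_run", tasks=tasks)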
@@ -20,9 +20,9 @@

This hook enables submitting and running jobs on the Databricks platform. Internally the
operators talk to the
``api/2.1/jobs/run-now``
``api/2.2/jobs/run-now``
`endpoint <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow>`_
or the ``api/2.1/jobs/runs/submit``
or the ``api/2.2/jobs/runs/submit``
`endpoint <https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit>`_.
"""

@@ -37,37 +37,37 @@
from airflow.providers.common.compat.sdk import AirflowException
from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook

GET_CLUSTER_ENDPOINT = ("GET", "2.0/clusters/get")
RESTART_CLUSTER_ENDPOINT = ("POST", "2.0/clusters/restart")
START_CLUSTER_ENDPOINT = ("POST", "2.0/clusters/start")
TERMINATE_CLUSTER_ENDPOINT = ("POST", "2.0/clusters/delete")
GET_CLUSTER_ENDPOINT = ("GET", "2.2/clusters/get")
RESTART_CLUSTER_ENDPOINT = ("POST", "2.2/clusters/restart")
START_CLUSTER_ENDPOINT = ("POST", "2.2/clusters/start")
TERMINATE_CLUSTER_ENDPOINT = ("POST", "2.2/clusters/delete")

CREATE_ENDPOINT = ("POST", "2.1/jobs/create")
RESET_ENDPOINT = ("POST", "2.1/jobs/reset")
UPDATE_ENDPOINT = ("POST", "2.1/jobs/update")
RUN_NOW_ENDPOINT = ("POST", "2.1/jobs/run-now")
SUBMIT_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/submit")
GET_RUN_ENDPOINT = ("GET", "2.1/jobs/runs/get")
CANCEL_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/cancel")
DELETE_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/delete")
REPAIR_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/repair")
OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "2.1/jobs/runs/get-output")
CANCEL_ALL_RUNS_ENDPOINT = ("POST", "2.1/jobs/runs/cancel-all")
CREATE_ENDPOINT = ("POST", "2.2/jobs/create")
RESET_ENDPOINT = ("POST", "2.2/jobs/reset")
UPDATE_ENDPOINT = ("POST", "2.2/jobs/update")
RUN_NOW_ENDPOINT = ("POST", "2.2/jobs/run-now")
SUBMIT_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/submit")
GET_RUN_ENDPOINT = ("GET", "2.2/jobs/runs/get")
CANCEL_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/cancel")
DELETE_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/delete")
REPAIR_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/repair")
OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "2.2/jobs/runs/get-output")
CANCEL_ALL_RUNS_ENDPOINT = ("POST", "2.2/jobs/runs/cancel-all")

INSTALL_LIBS_ENDPOINT = ("POST", "2.0/libraries/install")
UNINSTALL_LIBS_ENDPOINT = ("POST", "2.0/libraries/uninstall")
UPDATE_REPO_ENDPOINT = ("PATCH", "2.0/repos/")
DELETE_REPO_ENDPOINT = ("DELETE", "2.0/repos/")
CREATE_REPO_ENDPOINT = ("POST", "2.0/repos")
INSTALL_LIBS_ENDPOINT = ("POST", "2.2/libraries/install")
UNINSTALL_LIBS_ENDPOINT = ("POST", "2.2/libraries/uninstall")
UPDATE_REPO_ENDPOINT = ("PATCH", "2.2/repos/")
DELETE_REPO_ENDPOINT = ("DELETE", "2.2/repos/")
CREATE_REPO_ENDPOINT = ("POST", "2.2/repos")

LIST_JOBS_ENDPOINT = ("GET", "2.1/jobs/list")
LIST_PIPELINES_ENDPOINT = ("GET", "2.0/pipelines")
LIST_SQL_ENDPOINTS_ENDPOINT = ("GET", "2.0/sql/endpoints")
LIST_JOBS_ENDPOINT = ("GET", "2.2/jobs/list")
LIST_PIPELINES_ENDPOINT = ("GET", "2.2/pipelines")
LIST_SQL_ENDPOINTS_ENDPOINT = ("GET", "2.2/sql/endpoints")
Contributor
This endpoint unfortunately does not exist, and I think it has been migrated to: https://docs.databricks.com/api/workspace/warehouses/list

[2026-01-13 03:32:41] ERROR - Task failed with exception source=task loc=task_runner.py:986
AirflowException: Response: {"error":"Bad Target: /api/2.2/sql/endpoints"}
, Status Code: 404

HTTPError: 404 Client Error: Not Found for url: https://dbc-instance-random.cloud.databricks.com/api/2.2/sql/endpoints

File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 704 in _do_api_call
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 445 in __iter__
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 378 in iter
File "/home/airflow/.local/lib/python3.12/site-packages/tenacity/__init__.py", line 400 in <lambda>
File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 449 in result
File "/usr/python/lib/python3.12/concurrent/futures/_base.py", line 401 in __get_result
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/providers/databricks/hooks/databricks_base.py", line 723 in _do_api_call
File "/home/airflow/.local/lib/python3.12/site-packages/requests/models.py", line 1026 in raise_for_status

The previous version, api/2.0/sql/endpoints, still works even though it is not listed in the Databricks documentation.
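If that migration is right, a possible follow-up (not part of this PR) would be to point the constant at the Warehouses API instead; a hedged sketch, using the documented 2.0/sql/warehouses listing path and an illustrative constant name:

LIST_SQL_WAREHOUSES_ENDPOINT = ("GET", "2.0/sql/warehouses")  # hypothetical replacement, not in this PR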


WORKSPACE_GET_STATUS_ENDPOINT = ("GET", "2.0/workspace/get-status")
WORKSPACE_GET_STATUS_ENDPOINT = ("GET", "2.2/workspace/get-status")

SPARK_VERSIONS_ENDPOINT = ("GET", "2.0/clusters/spark-versions")
SQL_STATEMENTS_ENDPOINT = "2.0/sql/statements"
SPARK_VERSIONS_ENDPOINT = ("GET", "2.2/clusters/spark-versions")
SQL_STATEMENTS_ENDPOINT = "2.2/sql/statements"


class RunLifeCycleState(Enum):
@@ -293,7 +293,7 @@ def __init__(

def create_job(self, json: dict) -> int:
"""
Call the ``api/2.1/jobs/create`` endpoint.
Call the ``api/2.2/jobs/create`` endpoint.

:param json: The data used in the body of the request to the ``create`` endpoint.
:return: the job_id as an int
@@ -303,7 +303,7 @@ def create_job(self, json: dict) -> int:

def reset_job(self, job_id: str, json: dict) -> None:
"""
Call the ``api/2.1/jobs/reset`` endpoint.
Call the ``api/2.2/jobs/reset`` endpoint.

:param json: The data used in the new_settings of the request to the ``reset`` endpoint.
"""
@@ -321,7 +321,7 @@ def reset_job(self, job_id: str, json: dict) -> None:

def update_job(self, job_id: str, json: dict) -> None:
"""
Call the ``api/2.1/jobs/update`` endpoint.
Call the ``api/2.2/jobs/update`` endpoint.

:param job_id: The id of the job to update.
:param json: The data used in the new_settings of the request to the ``update`` endpoint.
@@ -330,7 +330,7 @@ def update_job(self, job_id: str, json: dict) -> None:

def run_now(self, json: dict) -> int:
"""
Call the ``api/2.1/jobs/run-now`` endpoint.
Call the ``api/2.2/jobs/run-now`` endpoint.

:param json: The data used in the body of the request to the ``run-now`` endpoint.
:return: the run_id as an int
@@ -340,7 +340,7 @@ def run_now(self, json: dict) -> int:

def submit_run(self, json: dict) -> int:
"""
Call the ``api/2.1/jobs/runs/submit`` endpoint.
Call the ``api/2.2/jobs/runs/submit`` endpoint.

:param json: The data used in the body of the request to the ``submit`` endpoint.
:return: the run_id as an int
@@ -385,9 +385,9 @@ def list_jobs(
all_jobs += [j for j in jobs if j["settings"]["name"] == job_name]
else:
all_jobs += jobs
has_more = response.get("has_more", False)
if has_more:
page_token = response.get("next_page_token", "")
# issue-59189: API v2.2 removes "has_more" field
page_token = response.get("next_page_token", "")
has_more = bool(page_token)

return all_jobs

@@ -717,7 +717,7 @@ def install(self, json: dict) -> None:
"""
Install libraries on the cluster.

Utility function to call the ``2.0/libraries/install`` endpoint.
Utility function to call the ``2.2/libraries/install`` endpoint.

:param json: json dictionary containing cluster_id and an array of libraries
"""
@@ -727,7 +727,7 @@ def uninstall(self, json: dict) -> None:
"""
Uninstall libraries on the cluster.

Utility function to call the ``2.0/libraries/uninstall`` endpoint.
Utility function to call the ``2.2/libraries/uninstall`` endpoint.

:param json: json dictionary containing cluster_id and an array of libraries
"""
@@ -790,7 +790,7 @@ def update_job_permission(self, job_id: int, json: dict[str, Any]) -> dict:
:param json: payload
:return: json containing permission specification
"""
return self._do_api_call(("PATCH", f"2.0/permissions/jobs/{job_id}"), json)
return self._do_api_call(("PATCH", f"2.2/permissions/jobs/{job_id}"), json)

def post_sql_statement(self, json: dict[str, Any]) -> str:
"""
@@ -260,7 +260,7 @@ class DatabricksCreateJobsOperator(BaseOperator):
https://docs.databricks.com/api/workspace/jobs/reset

:param json: A JSON object containing API parameters which will be passed
directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
directly to the ``api/2.2/jobs/create`` endpoint. The other named parameters
(i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
be merged with this json dictionary if they are provided.
If there are conflicts during the merge, the named parameters will
@@ -391,7 +391,7 @@ def execute(self, context: Context) -> int:

class DatabricksSubmitRunOperator(BaseOperator):
"""
Submits a Spark job run to Databricks using the api/2.1/jobs/runs/submit API endpoint.
Submits a Spark job run to Databricks using the api/2.2/jobs/runs/submit API endpoint.

See: https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit

@@ -406,7 +406,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
.. seealso::
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
:param json: A JSON object containing API parameters which will be passed
directly to the ``api/2.1/jobs/runs/submit`` endpoint. The other named parameters
directly to the ``api/2.2/jobs/runs/submit`` endpoint. The other named parameters
(i.e. ``spark_jar_task``, ``notebook_task``..) to this operator will
be merged with this json dictionary if they are provided.
If there are conflicts during the merge, the named parameters will
@@ -644,14 +644,14 @@ def execute_complete(self, context: dict | None, event: dict):

class DatabricksRunNowOperator(BaseOperator):
"""
Runs an existing Spark job run to Databricks using the api/2.1/jobs/run-now API endpoint.
Triggers a run of an existing Databricks job using the api/2.2/jobs/run-now API endpoint.

See: https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow

There are two ways to instantiate this operator.

In the first way, you can take the JSON payload that you typically use
to call the ``api/2.1/jobs/run-now`` endpoint and pass it directly
to call the ``api/2.2/jobs/run-now`` endpoint and pass it directly
to our ``DatabricksRunNowOperator`` through the ``json`` parameter.
For example ::

@@ -729,7 +729,7 @@ class DatabricksRunNowOperator(BaseOperator):
https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks

:param json: A JSON object containing API parameters which will be passed
directly to the ``api/2.1/jobs/run-now`` endpoint. The other named parameters
directly to the ``api/2.2/jobs/run-now`` endpoint. The other named parameters
(i.e. ``notebook_params``, ``spark_submit_params``..) to this operator will
be merged with this json dictionary if they are provided.
If there are conflicts during the merge, the named parameters will