diff --git a/providers/databricks/docs/operators/jobs_create.rst b/providers/databricks/docs/operators/jobs_create.rst
index 1f4b6b5d05f58..fd2d9a906ff13 100644
--- a/providers/databricks/docs/operators/jobs_create.rst
+++ b/providers/databricks/docs/operators/jobs_create.rst
@@ -31,12 +31,12 @@ Using the Operator
 ------------------
 
 There are three ways to instantiate this operator. In the first way, you can take the JSON payload that you typically use
-to call the ``api/2.1/jobs/create`` endpoint and pass it directly to our ``DatabricksCreateJobsOperator`` through the
+to call the ``api/2.2/jobs/create`` endpoint and pass it directly to our ``DatabricksCreateJobsOperator`` through the
 ``json`` parameter. With this approach you get full control over the underlying payload to Jobs REST API, including
 execution of Databricks jobs with multiple tasks, but it's harder to detect errors because of the lack of the type
 checking.
 
 The second way to accomplish the same thing is to use the named parameters of the ``DatabricksCreateJobsOperator`` directly. Note that there is exactly
-one named parameter for each top level parameter in the ``api/2.1/jobs/create`` endpoint.
+one named parameter for each top level parameter in the ``api/2.2/jobs/create`` endpoint.
 
 The third way is to use both the json parameter **AND** the named parameters. They will be merged
 together. If there are conflicts during the merge, the named parameters will take precedence and
diff --git a/providers/databricks/docs/operators/run_now.rst b/providers/databricks/docs/operators/run_now.rst
index 17259b1a1b462..1b51124e5f4c9 100644
--- a/providers/databricks/docs/operators/run_now.rst
+++ b/providers/databricks/docs/operators/run_now.rst
@@ -21,14 +21,14 @@ DatabricksRunNowOperator
 ========================
 
 Use the :class:`~airflow.providers.databricks.operators.DatabricksRunNowOperator` to trigger a run of an existing Databricks job
-via `api/2.1/jobs/run-now `_ API endpoint.
+via `api/2.2/jobs/run-now `_ API endpoint.
 
 Using the Operator
 ^^^^^^^^^^^^^^^^^^
 
 There are two ways to instantiate this operator.
 
 In the first way, you can take the JSON payload that you typically use
-to call the ``api/2.1/jobs/run-now`` endpoint and pass it directly to our ``DatabricksRunNowOperator`` through the ``json`` parameter.
+to call the ``api/2.2/jobs/run-now`` endpoint and pass it directly to our ``DatabricksRunNowOperator`` through the ``json`` parameter.
 
 Another way to accomplish the same thing is to use the named parameters of the ``DatabricksRunNowOperator`` directly. Note that there is exactly
 one named parameter for each top level parameter in the ``jobs/run-now`` endpoint.
diff --git a/providers/databricks/docs/operators/submit_run.rst b/providers/databricks/docs/operators/submit_run.rst
index 7dd7695ce3d95..522a051d2e7e3 100644
--- a/providers/databricks/docs/operators/submit_run.rst
+++ b/providers/databricks/docs/operators/submit_run.rst
@@ -24,14 +24,14 @@ DatabricksSubmitRunOperator
 ===========================
 
 Use the :class:`~airflow.providers.databricks.operators.DatabricksSubmitRunOperator` to submit
-a new Databricks job via Databricks `api/2.1/jobs/runs/submit `_ API endpoint.
+a new Databricks job via Databricks `api/2.2/jobs/runs/submit `_ API endpoint.
 
 Using the Operator
 ------------------
 
 There are three ways to instantiate this operator.
 
 In the first way, you can take the JSON payload that you typically use
-to call the ``api/2.1/jobs/runs/submit`` endpoint and pass it directly to our ``DatabricksSubmitRunOperator`` through the
+to call the ``api/2.2/jobs/runs/submit`` endpoint and pass it directly to our ``DatabricksSubmitRunOperator`` through the
 ``json`` parameter. With this approach you get full control over the underlying payload to Jobs REST API, including
 execution of Databricks jobs with multiple tasks, but it's harder to detect errors because of the lack of the type
 checking.
@@ -91,7 +91,7 @@ Currently the named parameters that ``DatabricksSubmitRunOperator`` supports are
         task_id="notebook_run", new_cluster=new_cluster, notebook_task=notebook_task
     )
 
-Another way to do is use the param tasks to pass array of objects to instantiate this operator. Here the value of tasks param that is used to invoke ``api/2.1/jobs/runs/submit`` endpoint is passed through the ``tasks`` param in ``DatabricksSubmitRunOperator``. Instead of invoking single task, you can pass array of task and submit a one-time run.
+Another way to instantiate this operator is to pass an array of task objects through the ``tasks`` parameter. Here the ``tasks`` value that you would otherwise send to the ``api/2.2/jobs/runs/submit`` endpoint is passed through the ``tasks`` parameter of ``DatabricksSubmitRunOperator``. Instead of a single task, you can pass an array of tasks and submit a one-time run.
 
 .. code-block:: python
diff --git a/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py b/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py
index 2ff01c1cbc26b..ca5f93b970642 100644
--- a/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py
@@ -20,9 +20,9 @@
 This hook enable the submitting and running of jobs to the Databricks platform.
 
 Internally the operators talk to the
-``api/2.1/jobs/run-now``
+``api/2.2/jobs/run-now``
 `endpoint _`
-or the ``api/2.1/jobs/runs/submit``
+or the ``api/2.2/jobs/runs/submit``
 `endpoint `_.
""" @@ -37,37 +37,37 @@ from airflow.providers.common.compat.sdk import AirflowException from airflow.providers.databricks.hooks.databricks_base import BaseDatabricksHook -GET_CLUSTER_ENDPOINT = ("GET", "2.0/clusters/get") -RESTART_CLUSTER_ENDPOINT = ("POST", "2.0/clusters/restart") -START_CLUSTER_ENDPOINT = ("POST", "2.0/clusters/start") -TERMINATE_CLUSTER_ENDPOINT = ("POST", "2.0/clusters/delete") +GET_CLUSTER_ENDPOINT = ("GET", "2.2/clusters/get") +RESTART_CLUSTER_ENDPOINT = ("POST", "2.2/clusters/restart") +START_CLUSTER_ENDPOINT = ("POST", "2.2/clusters/start") +TERMINATE_CLUSTER_ENDPOINT = ("POST", "2.2/clusters/delete") -CREATE_ENDPOINT = ("POST", "2.1/jobs/create") -RESET_ENDPOINT = ("POST", "2.1/jobs/reset") -UPDATE_ENDPOINT = ("POST", "2.1/jobs/update") -RUN_NOW_ENDPOINT = ("POST", "2.1/jobs/run-now") -SUBMIT_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/submit") -GET_RUN_ENDPOINT = ("GET", "2.1/jobs/runs/get") -CANCEL_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/cancel") -DELETE_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/delete") -REPAIR_RUN_ENDPOINT = ("POST", "2.1/jobs/runs/repair") -OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "2.1/jobs/runs/get-output") -CANCEL_ALL_RUNS_ENDPOINT = ("POST", "2.1/jobs/runs/cancel-all") +CREATE_ENDPOINT = ("POST", "2.2/jobs/create") +RESET_ENDPOINT = ("POST", "2.2/jobs/reset") +UPDATE_ENDPOINT = ("POST", "2.2/jobs/update") +RUN_NOW_ENDPOINT = ("POST", "2.2/jobs/run-now") +SUBMIT_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/submit") +GET_RUN_ENDPOINT = ("GET", "2.2/jobs/runs/get") +CANCEL_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/cancel") +DELETE_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/delete") +REPAIR_RUN_ENDPOINT = ("POST", "2.2/jobs/runs/repair") +OUTPUT_RUNS_JOB_ENDPOINT = ("GET", "2.2/jobs/runs/get-output") +CANCEL_ALL_RUNS_ENDPOINT = ("POST", "2.2/jobs/runs/cancel-all") -INSTALL_LIBS_ENDPOINT = ("POST", "2.0/libraries/install") -UNINSTALL_LIBS_ENDPOINT = ("POST", "2.0/libraries/uninstall") -UPDATE_REPO_ENDPOINT = ("PATCH", "2.0/repos/") -DELETE_REPO_ENDPOINT = ("DELETE", "2.0/repos/") -CREATE_REPO_ENDPOINT = ("POST", "2.0/repos") +INSTALL_LIBS_ENDPOINT = ("POST", "2.2/libraries/install") +UNINSTALL_LIBS_ENDPOINT = ("POST", "2.2/libraries/uninstall") +UPDATE_REPO_ENDPOINT = ("PATCH", "2.2/repos/") +DELETE_REPO_ENDPOINT = ("DELETE", "2.2/repos/") +CREATE_REPO_ENDPOINT = ("POST", "2.2/repos") -LIST_JOBS_ENDPOINT = ("GET", "2.1/jobs/list") -LIST_PIPELINES_ENDPOINT = ("GET", "2.0/pipelines") -LIST_SQL_ENDPOINTS_ENDPOINT = ("GET", "2.0/sql/endpoints") +LIST_JOBS_ENDPOINT = ("GET", "2.2/jobs/list") +LIST_PIPELINES_ENDPOINT = ("GET", "2.2/pipelines") +LIST_SQL_ENDPOINTS_ENDPOINT = ("GET", "2.2/sql/endpoints") -WORKSPACE_GET_STATUS_ENDPOINT = ("GET", "2.0/workspace/get-status") +WORKSPACE_GET_STATUS_ENDPOINT = ("GET", "2.2/workspace/get-status") -SPARK_VERSIONS_ENDPOINT = ("GET", "2.0/clusters/spark-versions") -SQL_STATEMENTS_ENDPOINT = "2.0/sql/statements" +SPARK_VERSIONS_ENDPOINT = ("GET", "2.2/clusters/spark-versions") +SQL_STATEMENTS_ENDPOINT = "2.2/sql/statements" class RunLifeCycleState(Enum): @@ -293,7 +293,7 @@ def __init__( def create_job(self, json: dict) -> int: """ - Call the ``api/2.1/jobs/create`` endpoint. + Call the ``api/2.2/jobs/create`` endpoint. :param json: The data used in the body of the request to the ``create`` endpoint. :return: the job_id as an int @@ -303,7 +303,7 @@ def create_job(self, json: dict) -> int: def reset_job(self, job_id: str, json: dict) -> None: """ - Call the ``api/2.1/jobs/reset`` endpoint. + Call the ``api/2.2/jobs/reset`` endpoint. 
         :param json: The data used in the new_settings of the request to the ``reset`` endpoint.
         """
@@ -321,7 +321,7 @@ def reset_job(self, job_id: str, json: dict) -> None:
 
     def update_job(self, job_id: str, json: dict) -> None:
         """
-        Call the ``api/2.1/jobs/update`` endpoint.
+        Call the ``api/2.2/jobs/update`` endpoint.
 
         :param job_id: The id of the job to update.
         :param json: The data used in the new_settings of the request to the ``update`` endpoint.
@@ -330,7 +330,7 @@ def update_job(self, job_id: str, json: dict) -> None:
 
     def run_now(self, json: dict) -> int:
         """
-        Call the ``api/2.1/jobs/run-now`` endpoint.
+        Call the ``api/2.2/jobs/run-now`` endpoint.
 
         :param json: The data used in the body of the request to the ``run-now`` endpoint.
         :return: the run_id as an int
@@ -340,7 +340,7 @@ def run_now(self, json: dict) -> int:
 
     def submit_run(self, json: dict) -> int:
         """
-        Call the ``api/2.1/jobs/runs/submit`` endpoint.
+        Call the ``api/2.2/jobs/runs/submit`` endpoint.
 
         :param json: The data used in the body of the request to the ``submit`` endpoint.
         :return: the run_id as an int
@@ -385,9 +385,9 @@ def list_jobs(
                 all_jobs += [j for j in jobs if j["settings"]["name"] == job_name]
             else:
                 all_jobs += jobs
-            has_more = response.get("has_more", False)
-            if has_more:
-                page_token = response.get("next_page_token", "")
+            # issue-59189: API v2.2 removes "has_more" field
+            page_token = response.get("next_page_token", "")
+            has_more = bool(page_token)
 
         return all_jobs
 
@@ -717,7 +717,7 @@ def install(self, json: dict) -> None:
         """
         Install libraries on the cluster.
 
-        Utility function to call the ``2.0/libraries/install`` endpoint.
+        Utility function to call the ``2.2/libraries/install`` endpoint.
 
         :param json: json dictionary containing cluster_id and an array of library
         """
@@ -727,7 +727,7 @@ def uninstall(self, json: dict) -> None:
         """
         Uninstall libraries on the cluster.
 
-        Utility function to call the ``2.0/libraries/uninstall`` endpoint.
+        Utility function to call the ``2.2/libraries/uninstall`` endpoint.
 
         :param json: json dictionary containing cluster_id and an array of library
         """
@@ -790,7 +790,7 @@ def update_job_permission(self, job_id: int, json: dict[str, Any]) -> dict:
         :param json: payload
         :return: json containing permission specification
         """
-        return self._do_api_call(("PATCH", f"2.0/permissions/jobs/{job_id}"), json)
+        return self._do_api_call(("PATCH", f"2.2/permissions/jobs/{job_id}"), json)
 
     def post_sql_statement(self, json: dict[str, Any]) -> str:
         """
diff --git a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
index 98737d459a66a..3eec01be47197 100644
--- a/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
+++ b/providers/databricks/src/airflow/providers/databricks/operators/databricks.py
@@ -260,7 +260,7 @@ class DatabricksCreateJobsOperator(BaseOperator):
         https://docs.databricks.com/api/workspace/jobs/reset
 
     :param json: A JSON object containing API parameters which will be passed
-        directly to the ``api/2.1/jobs/create`` endpoint. The other named parameters
+        directly to the ``api/2.2/jobs/create`` endpoint. The other named parameters
         (i.e. ``name``, ``tags``, ``tasks``, etc.) to this operator will
         be merged with this json dictionary if they are provided.
         If there are conflicts during the merge, the named parameters will
@@ -391,7 +391,7 @@ def execute(self, context: Context) -> int:
 
 
 class DatabricksSubmitRunOperator(BaseOperator):
     """
-    Submits a Spark job run to Databricks using the api/2.1/jobs/runs/submit API endpoint.
+    Submits a Spark job run to Databricks using the api/2.2/jobs/runs/submit API endpoint.
 
     See:
         https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
 
@@ -406,7 +406,7 @@ class DatabricksSubmitRunOperator(BaseOperator):
     .. seealso::
         https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunsSubmit
 
     :param json: A JSON object containing API parameters which will be passed
-        directly to the ``api/2.1/jobs/runs/submit`` endpoint. The other named parameters
+        directly to the ``api/2.2/jobs/runs/submit`` endpoint. The other named parameters
         (i.e. ``spark_jar_task``, ``notebook_task``..) to this operator will
         be merged with this json dictionary if they are provided.
         If there are conflicts during the merge, the named parameters will
@@ -644,14 +644,14 @@ def execute_complete(self, context: dict | None, event: dict):
 
 
 class DatabricksRunNowOperator(BaseOperator):
     """
-    Runs an existing Spark job run to Databricks using the api/2.1/jobs/run-now API endpoint.
+    Runs an existing Spark job run to Databricks using the api/2.2/jobs/run-now API endpoint.
 
     See:
         https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow
 
     There are two ways to instantiate this operator.
 
     In the first way, you can take the JSON payload that you typically use
-    to call the ``api/2.1/jobs/run-now`` endpoint and pass it directly
+    to call the ``api/2.2/jobs/run-now`` endpoint and pass it directly
     to our ``DatabricksRunNowOperator`` through the ``json`` parameter.
     For example ::
@@ -729,7 +729,7 @@ class DatabricksRunNowOperator(BaseOperator):
         https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks
 
     :param json: A JSON object containing API parameters which will be passed
-        directly to the ``api/2.1/jobs/run-now`` endpoint. The other named parameters
+        directly to the ``api/2.2/jobs/run-now`` endpoint. The other named parameters
         (i.e. ``notebook_params``, ``spark_submit_params``..) to this operator will
         be merged with this json dictionary if they are provided.
         If there are conflicts during the merge, the named parameters will
diff --git a/providers/databricks/tests/unit/databricks/hooks/test_databricks.py b/providers/databricks/tests/unit/databricks/hooks/test_databricks.py
index d657411b3d84b..dc7eee6af893a 100644
--- a/providers/databricks/tests/unit/databricks/hooks/test_databricks.py
+++ b/providers/databricks/tests/unit/databricks/hooks/test_databricks.py
@@ -57,7 +57,7 @@ DEFAULT_CONN_ID = "databricks_default"
 NOTEBOOK_TASK = {"notebook_path": "/test"}
 SPARK_PYTHON_TASK = {"python_file": "test.py", "parameters": ["--param", "123"]}
-NEW_CLUSTER = {"spark_version": "2.0.x-scala2.10", "node_type_id": "r3.xlarge", "num_workers": 1}
+NEW_CLUSTER = {"spark_version": "2.2.x-scala2.10", "node_type_id": "r3.xlarge", "num_workers": 1}
 CLUSTER_ID = "cluster_id"
 RUN_ID = 1
 JOB_ID = 42
@@ -139,145 +139,145 @@ def create_endpoint(host):
     """
     Utility function to generate the create endpoint given the host.
     """
-    return f"https://{host}/api/2.1/jobs/create"
+    return f"https://{host}/api/2.2/jobs/create"
 
 
 def reset_endpoint(host):
     """
     Utility function to generate the reset endpoint given the host.
""" - return f"https://{host}/api/2.1/jobs/reset" + return f"https://{host}/api/2.2/jobs/reset" def update_endpoint(host): """ Utility function to generate the update endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/update" + return f"https://{host}/api/2.2/jobs/update" def run_now_endpoint(host): """ Utility function to generate the run now endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/run-now" + return f"https://{host}/api/2.2/jobs/run-now" def submit_run_endpoint(host): """ Utility function to generate the submit run endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/runs/submit" + return f"https://{host}/api/2.2/jobs/runs/submit" def get_run_endpoint(host): """ Utility function to generate the get run endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/runs/get" + return f"https://{host}/api/2.2/jobs/runs/get" def get_run_output_endpoint(host): """ Utility function to generate the get run output endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/runs/get-output" + return f"https://{host}/api/2.2/jobs/runs/get-output" def cancel_run_endpoint(host): """ Utility function to generate the cancel run endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/runs/cancel" + return f"https://{host}/api/2.2/jobs/runs/cancel" def cancel_all_runs_endpoint(host): """ Utility function to generate the cancel all runs endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/runs/cancel-all" + return f"https://{host}/api/2.2/jobs/runs/cancel-all" def delete_run_endpoint(host): """ Utility function to generate delete run endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/runs/delete" + return f"https://{host}/api/2.2/jobs/runs/delete" def repair_run_endpoint(host): """ Utility function to generate delete run endpoint given the host. """ - return f"https://{host}/api/2.1/jobs/runs/repair" + return f"https://{host}/api/2.2/jobs/runs/repair" def get_cluster_endpoint(host): """ Utility function to generate the get run endpoint given the host. """ - return f"https://{host}/api/2.0/clusters/get" + return f"https://{host}/api/2.2/clusters/get" def start_cluster_endpoint(host): """ Utility function to generate the get run endpoint given the host. """ - return f"https://{host}/api/2.0/clusters/start" + return f"https://{host}/api/2.2/clusters/start" def restart_cluster_endpoint(host): """ Utility function to generate the get run endpoint given the host. """ - return f"https://{host}/api/2.0/clusters/restart" + return f"https://{host}/api/2.2/clusters/restart" def terminate_cluster_endpoint(host): """ Utility function to generate the get run endpoint given the host. """ - return f"https://{host}/api/2.0/clusters/delete" + return f"https://{host}/api/2.2/clusters/delete" def install_endpoint(host): """ Utility function to generate the install endpoint given the host. """ - return f"https://{host}/api/2.0/libraries/install" + return f"https://{host}/api/2.2/libraries/install" def uninstall_endpoint(host): """ Utility function to generate the uninstall endpoint given the host. 
""" - return f"https://{host}/api/2.0/libraries/uninstall" + return f"https://{host}/api/2.2/libraries/uninstall" def list_jobs_endpoint(host): """ Utility function to generate the list jobs endpoint given the host """ - return f"https://{host}/api/2.1/jobs/list" + return f"https://{host}/api/2.2/jobs/list" def list_pipelines_endpoint(host): """ Utility function to generate the list jobs endpoint given the host """ - return f"https://{host}/api/2.0/pipelines" + return f"https://{host}/api/2.2/pipelines" def list_spark_versions_endpoint(host): """Utility function to generate the list spark versions endpoint given the host""" - return f"https://{host}/api/2.0/clusters/spark-versions" + return f"https://{host}/api/2.2/clusters/spark-versions" def permissions_endpoint(host, job_id): """ Utility function to generate the permissions endpoint given the host """ - return f"https://{host}/api/2.0/permissions/jobs/{job_id}" + return f"https://{host}/api/2.2/permissions/jobs/{job_id}" def create_valid_response_mock(content): @@ -289,7 +289,7 @@ def create_valid_response_mock(content): def sql_statements_endpoint(host): """Utility function to generate the sql statements endpoint given the host.""" - return f"https://{host}/api/2.0/sql/statements" + return f"https://{host}/api/2.2/sql/statements" def create_successful_response_mock(content): @@ -446,7 +446,7 @@ def test_do_api_call_custom_retry(self): def test_do_api_call_patch(self, mock_requests): mock_requests.patch.return_value.json.return_value = {"cluster_name": "new_name"} data = {"cluster_name": "new_name"} - patched_cluster_name = self.hook._do_api_call(("PATCH", "2.1/jobs/runs/submit"), data) + patched_cluster_name = self.hook._do_api_call(("PATCH", "2.2/jobs/runs/submit"), data) assert patched_cluster_name["cluster_name"] == "new_name" mock_requests.patch.assert_called_once_with( @@ -1302,7 +1302,7 @@ def test_update_job_permission(self, mock_requests): self.hook.update_job_permission(1, ACCESS_CONTROL_DICT) mock_requests.patch.assert_called_once_with( - f"https://{HOST}/api/2.0/permissions/jobs/1", + f"https://{HOST}/api/2.2/permissions/jobs/1", json=utils.normalise_json_content(ACCESS_CONTROL_DICT), params=None, auth=HTTPBasicAuth(LOGIN, PASSWORD), @@ -1840,7 +1840,7 @@ async def test_do_api_call_patch(self, mock_patch): ) data = {"cluster_name": "new_name"} async with self.hook: - patched_cluster_name = await self.hook._a_do_api_call(("PATCH", "2.1/jobs/runs/submit"), data) + patched_cluster_name = await self.hook._a_do_api_call(("PATCH", "2.2/jobs/runs/submit"), data) assert patched_cluster_name["cluster_name"] == "new_name" mock_patch.assert_called_once_with( diff --git a/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py b/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py index 8e24c7775252f..89beb6a8e9cae 100644 --- a/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py +++ b/providers/databricks/tests/unit/databricks/hooks/test_databricks_base.py @@ -666,7 +666,7 @@ async def test_a_get_token_not_configured_raises(self, mock_conn): @pytest.mark.parametrize( ("schema", "port", "host", "endpoint", "expected_url"), [ - ("https", 443, "example.com", "api/2.0/jobs/list", "https://example.com:443/api/2.0/jobs/list"), + ("https", 443, "example.com", "api/2.2/jobs/list", "https://example.com:443/api/2.2/jobs/list"), ("http", 8080, "localhost", "status", "http://localhost:8080/status"), (None, None, "my.db.net", "api", "https://my.db.net/api"), ("https", None, "myhost", 
"v1/info", "https://myhost/v1/info"),