Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions providers/databricks/docs/operators/sql_statements.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
.. Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

.. _howto/operator:DatabricksSQLStatementsOperator:


DatabricksSQLStatementsOperator
===============================

Use the :class:`~airflow.providers.databricks.operators.databricks.DatabricksSQLStatementsOperator` to submit a
Databricks SQL Statement to Databricks using the
`Databricks SQL Statement Execution API <https://docs.databricks.com/api/workspace/statementexecution>`_.


Using the Operator
------------------

The ``DatabricksSQLStatementsOperator`` submits SQL statements to Databricks using the
`/api/2.0/sql/statements/ <https://docs.databricks.com/api/workspace/statementexecution/executestatement>`_ endpoint.
It supports configurable execution parameters such as warehouse selection, catalog, schema, and parameterized queries.
The operator can either synchronously poll for query completion or run in a deferrable mode for improved efficiency.

The only required parameters for using the operator are:

* ``statement`` - The SQL statement to execute. The statement can optionally be parameterized; see the ``parameters`` argument.
* ``warehouse_id`` - The ID of the SQL warehouse on which to execute the statement.

All other parameters are optional and described in the documentation for ``DatabricksSQLStatementsOperator`` including
but not limited to:

* ``catalog``
* ``schema``
* ``parameters``

Examples
--------

An example usage of the ``DatabricksSQLStatementsOperator`` is as follows:

.. exampleinclude:: /../../databricks/tests/system/databricks/example_databricks.py
    :language: python
    :start-after: [START howto_operator_sql_statements]
    :end-before: [END howto_operator_sql_statements]
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
WORKSPACE_GET_STATUS_ENDPOINT = ("GET", "api/2.0/workspace/get-status")

SPARK_VERSIONS_ENDPOINT = ("GET", "api/2.0/clusters/spark-versions")
# Bare path without an HTTP method: the hook methods that use it pair it with GET or POST
# and append "/<statement_id>" or "/<statement_id>/cancel" as needed.
SQL_STATEMENTS_ENDPOINT = "api/2.0/sql/statements"


class RunLifeCycleState(Enum):
Expand Down Expand Up @@ -189,6 +190,67 @@ def from_json(cls, data: str) -> ClusterState:
return ClusterState(**json.loads(data))


class SQLStatementState:
    """
    Utility class for the SQL statement state concept of Databricks statements.

    :param state: Life cycle state of the statement; must be one of
        ``SQL_STATEMENT_LIFE_CYCLE_STATES``.
    :param error_code: Error code associated with the statement, if any.
    :param error_message: Error message associated with the statement, if any.
    :raises AirflowException: If ``state`` is not a known life cycle state.
    """

    SQL_STATEMENT_LIFE_CYCLE_STATES = [
        "PENDING",
        "RUNNING",
        "SUCCEEDED",
        "FAILED",
        "CANCELED",
        "CLOSED",
    ]

    def __init__(
        self, state: str = "", error_code: str = "", error_message: str = "", *args, **kwargs
    ) -> None:
        # Any extra positional or keyword arguments are accepted and ignored.
        if state not in self.SQL_STATEMENT_LIFE_CYCLE_STATES:
            raise AirflowException(
                f"Unexpected SQL statement life cycle state: {state}: If the state has "
                "been introduced recently, please check the Databricks user "
                "guide for troubleshooting information"
            )

        self.state = state
        self.error_code = error_code
        self.error_message = error_message

    @property
    def is_terminal(self) -> bool:
        """True if the current state is a terminal state."""
        return self.state in ("SUCCEEDED", "FAILED", "CANCELED", "CLOSED")

    @property
    def is_running(self) -> bool:
        """True if the current state is running."""
        return self.state in ("PENDING", "RUNNING")

    @property
    def is_successful(self) -> bool:
        """True if the state is SUCCEEDED."""
        return self.state == "SUCCEEDED"

    def __eq__(self, other: object) -> bool:
        # Returning NotImplemented lets Python fall back to the other operand's comparison.
        if not isinstance(other, SQLStatementState):
            return NotImplemented
        return (
            self.state == other.state
            and self.error_code == other.error_code
            and self.error_message == other.error_message
        )

    def __repr__(self) -> str:
        return str(self.__dict__)

    def to_json(self) -> str:
        """Serialize the instance attributes to a JSON string."""
        return json.dumps(self.__dict__)

    @classmethod
    def from_json(cls, data: str) -> SQLStatementState:
        """
        Build an instance from a JSON string such as the one produced by ``to_json``.

        :param data: JSON object whose keys are passed to the constructor as keyword arguments.
        :return: A new instance of the class this method is called on.
        """
        # Use ``cls`` rather than the hard-coded class name so that subclasses
        # deserialize to their own type.
        return cls(**json.loads(data))


class DatabricksHook(BaseDatabricksHook):
"""
Interact with Databricks.
Expand Down Expand Up @@ -709,6 +771,54 @@ def update_job_permission(self, job_id: int, json: dict[str, Any]) -> dict:
"""
return self._do_api_call(("PATCH", f"api/2.0/permissions/jobs/{job_id}"), json)

def post_sql_statement(self, json: dict[str, Any]) -> str:
    """
    Send a SQL statement to the Databricks SQL Statements endpoint.

    :param json: Request body to post to the SQL Statements endpoint.
    :return: The statement_id as a string.
    """
    submit_endpoint = ("POST", SQL_STATEMENTS_ENDPOINT)
    api_response = self._do_api_call(submit_endpoint, json)
    return api_response["statement_id"]

def get_sql_statement_state(self, statement_id: str) -> SQLStatementState:
    """
    Retrieve run state of the SQL statement.

    :param statement_id: ID of the SQL statement.
    :return: state of the SQL statement; the error code and message are empty strings
        when the response status carries no ``error`` object.
    """
    get_statement_endpoint = ("GET", f"{SQL_STATEMENTS_ENDPOINT}/{statement_id}")
    response = self._do_api_call(get_statement_endpoint)
    status = response["status"]
    # ``error`` may be absent or explicitly null; treat both as "no error" instead of
    # failing with AttributeError when calling ``.get`` on None.
    error = status.get("error") or {}
    return SQLStatementState(status["state"], error.get("error_code", ""), error.get("message", ""))

async def a_get_sql_statement_state(self, statement_id: str) -> SQLStatementState:
    """
    Async version of `get_sql_statement_state`.

    :param statement_id: ID of the SQL statement
    :return: state of the SQL statement; the error code and message are empty strings
        when the response status carries no ``error`` object.
    """
    get_sql_statement_endpoint = ("GET", f"{SQL_STATEMENTS_ENDPOINT}/{statement_id}")
    response = await self._a_do_api_call(get_sql_statement_endpoint)
    status = response["status"]
    # ``error`` may be absent or explicitly null; treat both as "no error" instead of
    # failing with AttributeError when calling ``.get`` on None.
    error = status.get("error") or {}
    return SQLStatementState(status["state"], error.get("error_code", ""), error.get("message", ""))

def cancel_sql_statement(self, statement_id: str) -> None:
    """
    Request cancellation of a SQL statement.

    :param statement_id: ID of the SQL statement to cancel.
    """
    self.log.info("Canceling SQL statement with ID: %s", statement_id)
    cancel_path = f"{SQL_STATEMENTS_ENDPOINT}/{statement_id}/cancel"
    self._do_api_call(("POST", cancel_path))

def test_connection(self) -> tuple[bool, str]:
"""Test the Databricks connectivity from UI."""
hook = DatabricksHook(databricks_conn_id=self.databricks_conn_id)
Expand Down
Loading