Skip to content

Commit

Permalink
Return Databricks Query History API id as part of AdapterResponse (#376)
Browse files Browse the repository at this point in the history
query_id is now written to run_results.json

Signed-off-by: Jesse Whitehouse <jesse.whitehouse@databricks.com>
  • Loading branch information
Jesse authored Jun 29, 2023
1 parent e3bc534 commit af2435d
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Added support for model contracts ([#336](https://github.com/databricks/dbt-databricks/pull/336))
- Include log events from databricks-sql-connector in dbt logging output.
- Adapter now populates the `query_id` field in `run_results.json` with Query History API query ID.

## dbt-databricks 1.5.1 (May 9, 2023)

Expand Down
30 changes: 29 additions & 1 deletion dbt/adapters/databricks/connections.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import uuid
import logging
import warnings
from contextlib import contextmanager
Expand Down Expand Up @@ -480,6 +481,17 @@ def execute(self, sql: str, bindings: Optional[Sequence[Any]] = None) -> None:
bindings = [self._fix_binding(binding) for binding in bindings]
self._cursor.execute(sql, bindings)

@property
def hex_query_id(self) -> str:
"""Return the hex GUID for this query
This UUID can be tied back to the Databricks query history API
"""

_as_hex = uuid.UUID(bytes=self._cursor.active_result_set.command_id.operationId.guid)

return str(_as_hex)

@classmethod
def _fix_binding(cls, value: Any) -> Any:
"""Convert complex datatypes to primitives that can be loaded by
Expand Down Expand Up @@ -552,6 +564,11 @@ def _get_comment_macro(self) -> Optional[str]:
return self.config.query_comment.comment


@dataclass
class DatabricksAdapterResponse(AdapterResponse):
query_id: str = ""


class DatabricksConnectionManager(SparkConnectionManager):
TYPE: str = "databricks"
credentials_provider: CredentialsProvider = None
Expand Down Expand Up @@ -636,7 +653,7 @@ def add_query(

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, Table]:
) -> Tuple[DatabricksAdapterResponse, Table]:
sql = self._add_query_comment(sql)
_, cursor = self.add_query(sql, auto_begin)
try:
Expand Down Expand Up @@ -751,6 +768,17 @@ def exponential_backoff(attempt: int) -> int:
retry_timeout=(timeout if timeout is not None else exponential_backoff),
)

@classmethod
def get_response(cls, cursor: DatabricksSQLCursorWrapper) -> DatabricksAdapterResponse:
_query_id = getattr(cursor, "hex_query_id", None)
if cursor is None:
logger.debug("No cursor was provided. Query ID not available.")
query_id = "N/A"
else:
query_id = _query_id
message = "OK"
return DatabricksAdapterResponse(_message=message, query_id=query_id) # type: ignore


def _log_dbsql_errors(exc: Exception) -> None:
if isinstance(exc, Error):
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/run_results_json/models/models.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
version: 2
models:
- name: view_model
columns:
- name: id
tests:
- unique
- not_null
5 changes: 5 additions & 0 deletions tests/integration/run_results_json/models/simple_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
select 1 as id
union all
select 1 as id
union all
select null as id
49 changes: 49 additions & 0 deletions tests/integration/run_results_json/test_run_results_json.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
from tests.integration.base import DBTIntegrationTest, use_profile
import os
import json
import tempfile


class TestRunResultsJson(DBTIntegrationTest):
def setUp(self):
self.tempdir: tempfile.TemporaryDirectory = tempfile.TemporaryDirectory()
return super().setUp()

def tearDown(self):
self.tempdir.cleanup()
return super().tearDown()

@property
def project_config(self):
return {
"config-version": 2,
"models": {"materialized": "table"},
"target-path": self.tempdir.name,
}

@property
def models(self):
return "models"

@property
def schema(self):
return "test_results_json"

def run_and_check_for_query_id(self):
self.run_dbt(["run"])

_fhpath = os.path.join(self.tempdir.name, "run_results.json")
with open(_fhpath, "r") as results_json_raw:
results_json = json.load(results_json_raw)
self.assertIsNotNone(
results_json["results"][0]["adapter_response"].get("query_id"),
"Query ID column was not written to run_results.json",
)

@use_profile("databricks_sql_endpoint")
def test_run_results_json_databricks_sql_endpoint(self):
self.run_and_check_for_query_id()

@use_profile("databricks_uc_sql_endpoint")
def test_run_results_json_databricks_uc_sql_endpoint(self):
self.run_and_check_for_query_id()

0 comments on commit af2435d

Please sign in to comment.