Skip to content

Commit

Permalink
Behavior: Get column info from information_schema Part I (#808)
Browse files Browse the repository at this point in the history
  • Loading branch information
benc-db authored Sep 27, 2024
1 parent 60b487d commit 41c164e
Show file tree
Hide file tree
Showing 14 changed files with 271 additions and 59 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- custom aliases for source and target tables can be specified and used in condition clauses;
- `matched` and `not matched` steps can now be skipped;
- Allow for the use of custom constraints, using the `custom` constraint type with an `expression` as the constraint (thanks @roydobbe). ([792](https://github.com/databricks/dbt-databricks/pull/792))
- Add "use_info_schema_for_columns" behavior flag to turn on use of information_schema to get column info where possible. This may have more latency but will not truncate complex data types the way that 'describe' can. ([808](https://github.com/databricks/dbt-databricks/pull/808))

### Under the Hood

Expand Down
71 changes: 71 additions & 0 deletions dbt/adapters/databricks/behaviors/columns.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from abc import ABC, abstractmethod
from typing import List
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.databricks.column import DatabricksColumn
from dbt.adapters.databricks.relation import DatabricksRelation
from dbt.adapters.databricks.utils import handle_missing_objects
from dbt_common.utils.dict import AttrDict

GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"


class GetColumnsBehavior(ABC):
@classmethod
@abstractmethod
def get_columns_in_relation(
cls, adapter: SQLAdapter, relation: DatabricksRelation
) -> List[DatabricksColumn]:
pass

@staticmethod
def _get_columns_with_comments(
adapter: SQLAdapter, relation: DatabricksRelation, macro_name: str
) -> List[AttrDict]:
return list(
handle_missing_objects(
lambda: adapter.execute_macro(macro_name, kwargs={"relation": relation}),
AttrDict(),
)
)


class GetColumnsByDescribe(GetColumnsBehavior):
@classmethod
def get_columns_in_relation(
cls, adapter: SQLAdapter, relation: DatabricksRelation
) -> List[DatabricksColumn]:
rows = cls._get_columns_with_comments(adapter, relation, "get_columns_comments")
return cls._parse_columns(rows)

@classmethod
def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
columns = []

for row in rows:
if row["col_name"].startswith("#"):
break
columns.append(
DatabricksColumn(
column=row["col_name"], dtype=row["data_type"], comment=row["comment"]
)
)

return columns


class GetColumnsByInformationSchema(GetColumnsByDescribe):
@classmethod
def get_columns_in_relation(
cls, adapter: SQLAdapter, relation: DatabricksRelation
) -> List[DatabricksColumn]:
if relation.is_hive_metastore() or relation.type == DatabricksRelation.View:
return super().get_columns_in_relation(adapter, relation)

rows = cls._get_columns_with_comments(
adapter, relation, "get_columns_comments_via_information_schema"
)
return cls._parse_columns(rows)

@classmethod
def _parse_columns(cls, rows: List[AttrDict]) -> List[DatabricksColumn]:
return [DatabricksColumn(column=row[0], dtype=row[1], comment=row[2]) for row in rows]
76 changes: 31 additions & 45 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from multiprocessing.context import SpawnContext
import os
import re
from abc import ABC
Expand All @@ -7,7 +8,6 @@
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Any
from typing import Callable
from typing import cast
from typing import ClassVar
from typing import Dict
Expand All @@ -21,7 +21,6 @@
from typing import Tuple
from typing import Type
from typing import TYPE_CHECKING
from typing import TypeVar
from typing import Union

from dbt.adapters.base import AdapterConfig
Expand All @@ -37,6 +36,11 @@
from dbt.adapters.contracts.connection import Connection
from dbt.adapters.contracts.relation import RelationConfig
from dbt.adapters.contracts.relation import RelationType
from dbt.adapters.databricks.behaviors.columns import (
GetColumnsBehavior,
GetColumnsByDescribe,
GetColumnsByInformationSchema,
)
from dbt.adapters.databricks.column import DatabricksColumn
from dbt.adapters.databricks.connections import DatabricksConnectionManager
from dbt.adapters.databricks.connections import DatabricksDBTConnection
Expand All @@ -63,7 +67,7 @@
StreamingTableConfig,
)
from dbt.adapters.databricks.relation_configs.tblproperties import TblPropertiesConfig
from dbt.adapters.databricks.utils import get_first_row
from dbt.adapters.databricks.utils import get_first_row, handle_missing_objects
from dbt.adapters.databricks.utils import redact_credentials
from dbt.adapters.databricks.utils import undefined_proof
from dbt.adapters.relation_configs import RelationResults
Expand All @@ -73,8 +77,7 @@
from dbt.adapters.spark.impl import KEY_TABLE_STATISTICS
from dbt.adapters.spark.impl import LIST_SCHEMAS_MACRO_NAME
from dbt.adapters.spark.impl import SparkAdapter
from dbt.adapters.spark.impl import TABLE_OR_VIEW_NOT_FOUND_MESSAGES
from dbt_common.exceptions import DbtRuntimeError
from dbt_common.behavior_flags import BehaviorFlag
from dbt_common.utils import executor
from dbt_common.utils.dict import AttrDict

Expand All @@ -90,6 +93,15 @@
SHOW_VIEWS_MACRO_NAME = "show_views"
GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"

USE_INFO_SCHEMA_FOR_COLUMNS = BehaviorFlag(
name="use_info_schema_for_columns",
default=False,
description=(
"Use info schema to gather column information to ensure complex types are not truncated."
" Incurs some overhead, so disabled by default."
),
) # type: ignore[typeddict-item]


@dataclass
class DatabricksConfig(AdapterConfig):
Expand All @@ -116,26 +128,6 @@ class DatabricksConfig(AdapterConfig):
merge_with_schema_evolution: Optional[bool] = None


def check_not_found_error(errmsg: str) -> bool:
new_error = "[SCHEMA_NOT_FOUND]" in errmsg
old_error = re.match(r".*(Database).*(not found).*", errmsg, re.DOTALL)
found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES)
return new_error or old_error is not None or any(found_msgs)


T = TypeVar("T")


def handle_missing_objects(exec: Callable[[], T], default: T) -> T:
try:
return exec()
except DbtRuntimeError as e:
errmsg = getattr(e, "msg", "")
if check_not_found_error(errmsg):
return default
raise e


def get_identifier_list_string(table_names: Set[str]) -> str:
"""Returns `"|".join(table_names)` by default.
Expand Down Expand Up @@ -175,6 +167,19 @@ class DatabricksAdapter(SparkAdapter):
}
)

get_column_behavior: GetColumnsBehavior

def __init__(self, config: Any, mp_context: SpawnContext) -> None:
super().__init__(config, mp_context)
if self.behavior.use_info_schema_for_columns.no_warn: # type: ignore[attr-defined]
self.get_column_behavior = GetColumnsByInformationSchema()
else:
self.get_column_behavior = GetColumnsByDescribe()

@property
def _behavior_flags(self) -> List[BehaviorFlag]:
return [USE_INFO_SCHEMA_FOR_COLUMNS]

# override/overload
def acquire_connection(
self, name: Optional[str] = None, query_header_context: Any = None
Expand Down Expand Up @@ -388,26 +393,7 @@ def parse_describe_extended( # type: ignore[override]
def get_columns_in_relation( # type: ignore[override]
self, relation: DatabricksRelation
) -> List[DatabricksColumn]:
rows = list(
handle_missing_objects(
lambda: self.execute_macro(
GET_COLUMNS_COMMENTS_MACRO_NAME, kwargs={"relation": relation}
),
AttrDict(),
)
)

columns = []
for row in rows:
if row["col_name"].startswith("#"):
break
columns.append(
DatabricksColumn(
column=row["col_name"], dtype=row["data_type"], comment=row["comment"]
)
)

return columns
return self.get_column_behavior.get_columns_in_relation(self, relation)

def _get_updated_relation(
self, relation: DatabricksRelation
Expand Down
22 changes: 22 additions & 0 deletions dbt/adapters/databricks/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from typing import TypeVar

from dbt.adapters.base import BaseAdapter
from dbt.adapters.spark.impl import TABLE_OR_VIEW_NOT_FOUND_MESSAGES
from dbt_common.exceptions import DbtRuntimeError
from jinja2 import Undefined

if TYPE_CHECKING:
Expand Down Expand Up @@ -92,3 +94,23 @@ def get_first_row(results: "Table") -> "Row":

return Row(values=set())
return results.rows[0]


def check_not_found_error(errmsg: str) -> bool:
new_error = "[SCHEMA_NOT_FOUND]" in errmsg
old_error = re.match(r".*(Database).*(not found).*", errmsg, re.DOTALL)
found_msgs = (msg in errmsg for msg in TABLE_OR_VIEW_NOT_FOUND_MESSAGES)
return new_error or old_error is not None or any(found_msgs)


T = TypeVar("T")


def handle_missing_objects(exec: Callable[[], T], default: T) -> T:
try:
return exec()
except DbtRuntimeError as e:
errmsg = getattr(e, "msg", "")
if check_not_found_error(errmsg):
return default
raise e
19 changes: 19 additions & 0 deletions dbt/include/databricks/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,25 @@
{% do return(load_result('get_columns_comments').table) %}
{% endmacro %}

{% macro get_columns_comments_via_information_schema(relation) -%}
{% call statement('repair_table', fetch_result=False) -%}
REPAIR TABLE {{ relation|lower }} SYNC METADATA
{% endcall %}
{% call statement('get_columns_comments_via_information_schema', fetch_result=True) -%}
select
column_name,
full_data_type,
comment
from `system`.`information_schema`.`columns`
where
table_catalog = '{{ relation.database|lower }}' and
table_schema = '{{ relation.schema|lower }}' and
table_name = '{{ relation.identifier|lower }}'
{% endcall %}

{% do return(load_result('get_columns_comments_via_information_schema').table) %}
{% endmacro %}

{% macro databricks__persist_docs(relation, model, for_relation, for_columns) -%}
{%- if for_relation and config.persist_relation_docs() and model.description %}
{% do alter_table_comment(relation, model) %}
Expand Down
2 changes: 1 addition & 1 deletion dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ types-requests
types-mock
pre-commit

dbt-tests-adapter~=1.8.0
dbt-tests-adapter>=1.8.0, <2.0
5 changes: 3 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
databricks-sql-connector>=3.4.0, <3.5.0
dbt-spark~=1.8.0
dbt-core>=1.8.0, <2.0
dbt-adapters>=1.3.0, <2.0
dbt-core>=1.8.7, <2.0
dbt-common>=1.10.0, <2.0
dbt-adapters>=1.7.0, <2.0
databricks-sdk==0.17.0
keyring>=23.13.0
protobuf<5.0.0
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ def _get_plugin_version() -> str:
include_package_data=True,
install_requires=[
"dbt-spark>=1.8.0, <2.0",
"dbt-core>=1.8.0, <2.0",
"dbt-adapters>=1.3.0, <2.0",
"databricks-sql-connector>=3.2.0, <3.3.0",
"dbt-core>=1.8.7, <2.0",
"dbt-adapters>=1.7.0, <2.0",
"dbt-common>=1.10.0, <2.0",
"databricks-sql-connector>=3.4.0, <3.5.0",
"databricks-sdk==0.17.0",
"keyring>=23.13.0",
"pandas<2.2.0",
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@


def pytest_addoption(parser):
parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str)
parser.addoption("--profile", action="store", default="databricks_uc_cluster", type=str)


# Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type'
Expand Down
25 changes: 25 additions & 0 deletions tests/functional/adapter/columns/fixtures.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
base_model = """
select struct('a', 1, 'b', 'b', 'c', ARRAY(1,2,3)) as struct_col, 'hello' as str_col
"""

schema = """
version: 2
models:
- name: base_model
config:
materialized: table
columns:
- name: struct_col
- name: str_col
"""

view_schema = """
version: 2
models:
- name: base_model
config:
materialized: view
columns:
- name: struct_col
- name: str_col
"""
Loading

0 comments on commit 41c164e

Please sign in to comment.