[CT-1114] remove Cache call from get_columns_in_relation #451

Merged — 6 commits, Sep 15, 2022
8 changes: 8 additions & 0 deletions .changes/unreleased/Fixes-20220914-010520.yaml
@@ -0,0 +1,8 @@
kind: Fixes
body: change get_columns_in_relation to fix cache inconsistencies that caused
incremental models to fail on on_schema_change
time: 2022-09-14T01:05:20.312981-05:00
custom:
Author: McKnight-42
Issue: "447"
PR: "451"
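The failure mode behind this changelog entry can be illustrated with a minimal, hypothetical sketch (the `SchemaCache` class, `live_describe` helper, and column lists below are illustrative stand-ins, not dbt-spark internals): when column metadata is served from a relation cache that is not refreshed after a schema change, the cached column list diverges from what a live `describe extended` would return.

```python
# Hypothetical illustration of cache staleness; none of these names
# are real dbt-spark APIs.

class SchemaCache:
    """A toy cache mapping relation names to column lists."""

    def __init__(self):
        self._columns = {}

    def put(self, relation, columns):
        # Store a snapshot of the columns at cache-population time.
        self._columns[relation] = list(columns)

    def get(self, relation):
        return self._columns.get(relation)


def live_describe(tables, relation):
    # Stands in for running `describe extended` against the warehouse.
    return list(tables[relation])


cache = SchemaCache()
tables = {"my_model": ["id", "name"]}
cache.put("my_model", tables["my_model"])

# The table gains a column (the on_schema_change scenario),
# but the cache still holds the old snapshot.
tables["my_model"].append("created_at")

stale = cache.get("my_model")            # ['id', 'name']
live = live_describe(tables, "my_model")  # ['id', 'name', 'created_at']
```

Any comparison driven by `stale` misses `created_at`, which is why this PR bypasses the cache and always queries the warehouse directly.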
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
@@ -5,7 +5,7 @@ exclude: '^tests/.*'

# Force all unspecified python hooks to run python 3.8
default_language_version:
python: python3.8
python: python3

repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
42 changes: 13 additions & 29 deletions dbt/adapters/spark/impl.py
@@ -207,36 +207,20 @@ def find_table_information_separator(rows: List[dict]) -> int:
return pos

def get_columns_in_relation(self, relation: Relation) -> List[SparkColumn]:
cached_relations = self.cache.get_relations(relation.database, relation.schema)
cached_relation = next(
(
cached_relation
for cached_relation in cached_relations
if str(cached_relation) == str(relation)
),
None,
)
columns = []
if cached_relation and cached_relation.information:
columns = self.parse_columns_from_information(cached_relation)
if not columns:
# in open source delta, the 'show table extended' query output doesn't
# return the relation's schema. if columns are empty from the cache,
# use the get_columns_in_relation spark macro,
# which executes a 'describe extended tablename' query
try:
rows: List[agate.Row] = self.execute_macro(
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
)
columns = self.parse_describe_extended(relation, rows)
except dbt.exceptions.RuntimeException as e:
# spark throws an error when the table doesn't exist, whereas other
# CDWs just return an empty list; normalize the behavior here
errmsg = getattr(e, "msg", "")
if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
pass
else:
raise e
try:
rows: List[agate.Row] = self.execute_macro(
GET_COLUMNS_IN_RELATION_RAW_MACRO_NAME, kwargs={"relation": relation}
)
columns = self.parse_describe_extended(relation, rows)
except dbt.exceptions.RuntimeException as e:
# spark throws an error when the table doesn't exist, whereas other
# CDWs just return an empty list; normalize the behavior here
errmsg = getattr(e, "msg", "")
if "Table or view not found" in errmsg or "NoSuchTableException" in errmsg:
pass
else:
raise e

# strip hudi metadata columns.
columns = [x for x in columns if x.name not in self.HUDI_METADATA_COLUMNS]
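After this change, the method's control flow reduces to: run the `describe extended` macro, parse the rows into columns, swallow table-not-found errors, and strip metadata columns. A self-contained sketch of that flow, with stand-in stubs for the macro execution and row parsing (the catalog dict, `describe_extended`, and the abbreviated Hudi column list are assumptions, not the real dbt-spark implementations):

```python
from typing import List

# Abbreviated stand-in for the adapter's HUDI_METADATA_COLUMNS list.
HUDI_METADATA_COLUMNS = ["_hoodie_commit_time"]


class RuntimeException(Exception):
    """Stand-in for dbt.exceptions.RuntimeException."""

    def __init__(self, msg):
        super().__init__(msg)
        self.msg = msg


def describe_extended(relation: str) -> List[dict]:
    # Stands in for executing the get_columns_in_relation_raw macro,
    # i.e. a 'describe extended <relation>' query against Spark.
    catalog = {
        "my_model": [{"name": "id"}, {"name": "_hoodie_commit_time"}],
    }
    if relation not in catalog:
        raise RuntimeException("Table or view not found: %s" % relation)
    return catalog[relation]


def get_columns_in_relation(relation: str) -> List[str]:
    columns: List[str] = []
    try:
        rows = describe_extended(relation)
        columns = [r["name"] for r in rows]
    except RuntimeException as e:
        # Spark raises when the table is missing; other warehouses return
        # an empty list, so normalize to [] here (the real method also
        # checks for "NoSuchTableException" in the message).
        if "Table or view not found" not in getattr(e, "msg", ""):
            raise
    # Strip Hudi metadata columns, mirroring the tail of the real method.
    return [c for c in columns if c not in HUDI_METADATA_COLUMNS]


print(get_columns_in_relation("my_model"))  # ['id']
print(get_columns_in_relation("missing"))   # []
```

Always hitting the warehouse costs an extra query per call, but it removes the possibility of serving a stale schema, which is the trade-off this PR makes.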