Skip to content

Commit

Permalink
Merge branch 'main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
C123R authored Aug 4, 2023
2 parents 2cad1dc + 3c1c6c1 commit 4431eef
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 22 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
* Does **not** support the use of `unique_key`
* Supports [snapshots][snapshots]
* Does not support [Python models][python-models]
* Does not support [persist docs][persist-docs] for views

[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds
[incremental]: https://docs.getdbt.com/docs/build/incremental-models
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/athena/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ def open(cls, connection: Connection) -> Connection:
handle = AthenaConnection(
s3_staging_dir=creds.s3_staging_dir,
endpoint_url=creds.endpoint_url,
catalog_name=creds.database,
schema_name=creds.schema,
work_group=creds.work_group,
cursor_class=AthenaCursor,
Expand Down
78 changes: 60 additions & 18 deletions dbt/adapters/athena/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from mypy_boto3_glue.type_defs import (
ColumnTypeDef,
GetTableResponseTypeDef,
TableInputTypeDef,
TableTypeDef,
TableVersionTypeDef,
)
Expand Down Expand Up @@ -636,35 +637,66 @@ def persist_docs_to_glue(
model: Dict[str, Any],
persist_relation_docs: bool = False,
persist_column_docs: bool = False,
skip_archive_table_version: bool = False,
) -> None:
"""Save model/columns description to Glue Table metadata.
:param skip_archive_table_version: if True, current table version will not be archived before creating new one.
The purpose is to avoid creating redundant table version if it already was created during the same dbt run
after CREATE OR REPLACE VIEW or ALTER TABLE statements.
Every dbt run should create not more than one table version.
"""
conn = self.connections.get_thread_connection()
client = conn.handle

with boto3_client_lock:
glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())

table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name).get("Table")
updated_table = {
"Name": table["Name"],
"StorageDescriptor": table["StorageDescriptor"],
"PartitionKeys": table.get("PartitionKeys", []),
"TableType": table["TableType"],
"Parameters": table.get("Parameters", {}),
"Description": table.get("Description", ""),
}
# By default, there is no need to update Glue Table
need_udpate_table = False
# Get Table from Glue
table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name)["Table"]
# Prepare new version of Glue Table picking up significant fields
updated_table = self._get_table_input(table)
# Update table description
if persist_relation_docs:
table_comment = clean_sql_comment(model["description"])
updated_table["Description"] = table_comment
updated_table["Parameters"]["comment"] = table_comment

# Prepare dbt description
clean_table_description = clean_sql_comment(model["description"])
# Get current description from Glue
glue_table_description = table.get("Description", "")
# Get current description parameter from Glue
glue_table_comment = table["Parameters"].get("comment", "")
# Update description if it's different
if clean_table_description != glue_table_description or clean_table_description != glue_table_comment:
updated_table["Description"] = clean_table_description
updated_table_parameters: Dict[str, str] = dict(updated_table["Parameters"])
updated_table_parameters["comment"] = clean_table_description
updated_table["Parameters"] = updated_table_parameters
need_udpate_table = True

# Update column comments
if persist_column_docs:
# Process every column
for col_obj in updated_table["StorageDescriptor"]["Columns"]:
# Get column description from dbt
col_name = col_obj["Name"]
col_comment = model["columns"].get(col_name, {}).get("description")
if col_comment:
col_obj["Comment"] = clean_sql_comment(col_comment)

glue_client.update_table(DatabaseName=relation.schema, TableInput=updated_table)
if col_name in model["columns"]:
col_comment = model["columns"][col_name]["description"]
# Prepare column description from dbt
clean_col_comment = clean_sql_comment(col_comment)
# Get current column comment from Glue
glue_col_comment = col_obj.get("Comment", "")
# Update column description if it's different
if glue_col_comment != clean_col_comment:
col_obj["Comment"] = clean_col_comment
need_udpate_table = True

# Update Glue Table only if table/column description is modified.
# It prevents redundant schema version creating after incremental runs.
if need_udpate_table:
glue_client.update_table(
DatabaseName=relation.schema, TableInput=updated_table, SkipArchive=skip_archive_table_version
)

@available
def list_schemas(self, database: str) -> List[str]:
Expand Down Expand Up @@ -833,3 +865,13 @@ def is_list(self, value: Any) -> bool:
a list since this is complicated with purely Jinja syntax.
"""
return isinstance(value, list)

@staticmethod
def _get_table_input(table: TableTypeDef) -> TableInputTypeDef:
    """Project a Glue ``get_table`` response onto the fields accepted by ``update_table``.

    The dictionary returned by ``get_table`` carries read-only attributes that the
    ``update_table`` API rejects, so only the keys declared on ``TableInputTypeDef``
    are carried over into the ``TableInput`` argument.
    """
    # Keys of the TypedDict are exactly the fields update_table() will accept.
    writable_fields = TableInputTypeDef.__annotations__
    table_input = {}
    for field, value in table.items():
        if field in writable_fields:
            table_input[field] = value
    return table_input
6 changes: 4 additions & 2 deletions dbt/include/athena/macros/adapters/persist_docs.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
{% macro athena__persist_docs(relation, model, for_relation, for_columns) -%}
{% set persist_relation_docs = for_relation and config.persist_relation_docs() and model.description %}
{% set persist_column_docs = for_columns and config.persist_column_docs() and model.columns %}
{% if (persist_relation_docs or persist_column_docs) and relation.type != 'view' %}
{% set skip_archive_table_version = not is_incremental() %}
{% if persist_relation_docs or persist_column_docs %}
{% do adapter.persist_docs_to_glue(relation,
model,
persist_relation_docs,
persist_column_docs) %}}
persist_column_docs,
skip_archive_table_version=skip_archive_table_version) %}}
{% endif %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,11 @@
{{ return(run_query(sql)) }}
{% endif %}
{% endmacro %}

{% macro alter_relation_rename_column(relation, source_column, target_column, target_column_type) -%}
{#-
    Rename a column (and restate its type) via `ALTER ... CHANGE COLUMN`.
    The dbt data type is converted to its Athena DDL form with ddl_data_type().
    Used by the schema-change flow to move a `<col>__dbt_alter` temp column back
    to its original name (see athena__alter_column_type). Returns the
    run_query() result.
-#}
{% set sql -%}
alter {{ relation.type }} {{ relation.render_pure() }}
change column {{ source_column }} {{ target_column }} {{ ddl_data_type(target_column_type) }}
{%- endset -%}
{{ return(run_query(sql)) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,32 @@
{%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
{%- set new_target_types = schema_changes_dict['new_target_types'] -%}
{% if table_type == 'iceberg' %}
{#
If last run of alter_column_type was failed on rename tmp column to origin.
Do rename to protect origin column from deletion and losing data.
#}
{% for remove_col in remove_from_target_arr if remove_col.column.endswith('__dbt_alter') %}
{%- set origin_col_name = remove_col.column | replace('__dbt_alter', '') -%}
{% for add_col in add_to_target_arr if add_col.column == origin_col_name %}
{%- do alter_relation_rename_column(target_relation, remove_col.name, add_col.name, add_col.data_type) -%}
{%- do remove_from_target_arr.remove(remove_col) -%}
{%- do add_to_target_arr.remove(add_col) -%}
{% endfor %}
{% endfor %}

{% if add_to_target_arr | length > 0 %}
{%- do alter_relation_add_columns(target_relation, add_to_target_arr) -%}
{% endif %}
{% if remove_from_target_arr | length > 0 %}
{%- do alter_relation_drop_columns(target_relation, remove_from_target_arr) -%}
{% endif %}
{% if new_target_types != [] %}
{% for ntt in new_target_types %}
{% set column_name = ntt['column_name'] %}
{% set new_type = ntt['new_type'] %}
{% do alter_column_type(target_relation, column_name, new_type) %}
{% endfor %}
{% endif %}
{% else %}
{%- set replace_with_target_arr = remove_partitions_from_columns(schema_changes_dict['source_columns'], partitioned_by) -%}
{% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 or new_target_types | length > 0 %}
Expand All @@ -35,3 +55,34 @@
{% endset %}
{% do log(schema_change_message) %}
{% endmacro %}

{% macro athena__alter_column_type(relation, column_name, new_column_type) -%}
{#
    Change a column's type in four steps, since the type cannot be altered
    directly for arbitrary conversions:
      1. Add a new column (temp name `<col>__dbt_alter`, DDL-converted type)
      2. Copy the data into it with a CAST
      3. Drop the existing column
      4. Rename the new column to the existing column's name
    If a previous run failed between steps 3 and 4, the incremental
    schema-change macro detects the leftover `__dbt_alter` column and completes
    the rename, so the copied data is not lost.
#}
{%- set tmp_column = column_name + '__dbt_alter' -%}
{%- set new_ddl_data_type = ddl_data_type(new_column_type) -%}

{#- step 1: add the temp column; a direct query is used here rather than the
    alter_relation_add_columns helper, which expects column objects while we
    only have a name/type pair -#}
{%- set add_column_query -%}
alter {{ relation.type }} {{ relation.render_pure() }} add columns({{ tmp_column }} {{ new_ddl_data_type }});
{%- endset -%}
{%- do run_query(add_column_query) -%}

{#- step 2: cast uses the DML spelling of the type (new_column_type), not the
    DDL form used in the ADD COLUMNS statement above -#}
{%- set update_query -%}
update {{ relation.render_pure() }} set {{ tmp_column }} = cast({{ column_name }} as {{ new_column_type }});
{%- endset -%}
{%- do run_query(update_query) -%}

{#- step 3: drop the original column — its data now lives in the temp column -#}
{%- set drop_column_query -%}
alter {{ relation.type }} {{ relation.render_pure() }} drop column {{ column_name }};
{%- endset -%}
{%- do run_query(drop_column_query) -%}

{#- step 4: rename the temp column back to the original name -#}
{%- do alter_relation_rename_column(relation, tmp_column, column_name, new_column_type) -%}

{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %}

{% set target_relation = this.incorporate(type='view') %}
{% do persist_docs(target_relation, model) %}

{% do return(to_return) %}
{%- endmaterialization %}
9 changes: 8 additions & 1 deletion dbt/include/athena/macros/utils/ddl_dml_data_type.sql
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
{# Athena has different types between DML and DDL #}
{# ref: https://docs.aws.amazon.com/athena/latest/ug/data-types.html #}
{% macro ddl_data_type(col_type) -%}
-- transform varchar
{%- set table_type = config.get('table_type', 'hive') -%}

-- transform varchar
{% set re = modules.re %}
{% set data_type = re.sub('(?:varchar|character varying)(?:\(\d+\))?', 'string', col_type) %}

Expand All @@ -15,6 +17,11 @@
{% set data_type = data_type.replace('integer', 'int') -%}
{%- endif -%}

-- transform timestamp
{%- if table_type == 'iceberg' and 'timestamp' in data_type -%}
{% set data_type = 'timestamp' -%}
{%- endif -%}

{{ return(data_type) }}
{% endmacro %}

Expand Down

0 comments on commit 4431eef

Please sign in to comment.