From f190664e868811bd6c962cefdeb550f648019215 Mon Sep 17 00:00:00 2001
From: brunofaustino
Date: Fri, 4 Aug 2023 04:40:44 -0300
Subject: [PATCH 1/3] feat: add catalog_name param to AthenaConnection handle (#358)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Jérémy Guiselin <9251353+Jrmyy@users.noreply.github.com>
---
 dbt/adapters/athena/connections.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/dbt/adapters/athena/connections.py b/dbt/adapters/athena/connections.py
index 40d4c855..67e2e71e 100644
--- a/dbt/adapters/athena/connections.py
+++ b/dbt/adapters/athena/connections.py
@@ -203,6 +203,7 @@ def open(cls, connection: Connection) -> Connection:
             handle = AthenaConnection(
                 s3_staging_dir=creds.s3_staging_dir,
                 endpoint_url=creds.endpoint_url,
+                catalog_name=creds.database,
                 schema_name=creds.schema,
                 work_group=creds.work_group,
                 cursor_class=AthenaCursor,

From 8d2a80c3ce7fb5c21111673ed194385f35b37fce Mon Sep 17 00:00:00 2001
From: Pavel Roslovets
Date: Fri, 4 Aug 2023 09:52:11 +0200
Subject: [PATCH 2/3] fix: enable persist_docs for views (#337)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Jérémy Guiselin <9251353+Jrmyy@users.noreply.github.com>
Co-authored-by: Serhii Dimchenko <39801237+svdimchenko@users.noreply.github.com>
---
 README.md                                     |  1 -
 dbt/adapters/athena/impl.py                   | 78 ++++++++++++++-----
 .../athena/macros/adapters/persist_docs.sql   |  6 +-
 .../materializations/models/view/view.sql     |  1 +
 4 files changed, 65 insertions(+), 21 deletions(-)

diff --git a/README.md b/README.md
index 035ed10a..ccc82e72 100644
--- a/README.md
+++ b/README.md
@@ -25,7 +25,6 @@
   * Does **not** support the use of `unique_key`
 * Supports [snapshots][snapshots]
 * Does not support [Python models][python-models]
-* Does not support [persist docs][persist-docs] for views
 
 [seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds
 [incremental]: https://docs.getdbt.com/docs/build/incremental-models
diff --git a/dbt/adapters/athena/impl.py b/dbt/adapters/athena/impl.py
index 9ef979ec..07a82769 100755
--- a/dbt/adapters/athena/impl.py
+++ b/dbt/adapters/athena/impl.py
@@ -15,6 +15,7 @@
 from mypy_boto3_glue.type_defs import (
     ColumnTypeDef,
     GetTableResponseTypeDef,
+    TableInputTypeDef,
     TableTypeDef,
     TableVersionTypeDef,
 )
@@ -636,35 +637,66 @@ def persist_docs_to_glue(
         model: Dict[str, Any],
         persist_relation_docs: bool = False,
         persist_column_docs: bool = False,
+        skip_archive_table_version: bool = False,
     ) -> None:
+        """Save model/columns description to Glue Table metadata.
+
+        :param skip_archive_table_version: if True, current table version will not be archived before creating new one.
+            The purpose is to avoid creating redundant table version if it already was created during the same dbt run
+            after CREATE OR REPLACE VIEW or ALTER TABLE statements.
+            Every dbt run should create not more than one table version.
+        """
         conn = self.connections.get_thread_connection()
         client = conn.handle
 
         with boto3_client_lock:
             glue_client = client.session.client("glue", region_name=client.region_name, config=get_boto3_config())
 
-        table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name).get("Table")
-        updated_table = {
-            "Name": table["Name"],
-            "StorageDescriptor": table["StorageDescriptor"],
-            "PartitionKeys": table.get("PartitionKeys", []),
-            "TableType": table["TableType"],
-            "Parameters": table.get("Parameters", {}),
-            "Description": table.get("Description", ""),
-        }
+        # By default, there is no need to update Glue Table
+        need_update_table = False
+        # Get Table from Glue
+        table = glue_client.get_table(DatabaseName=relation.schema, Name=relation.name)["Table"]
+        # Prepare new version of Glue Table picking up significant fields
+        updated_table = self._get_table_input(table)
+        # Update table description
         if persist_relation_docs:
-            table_comment = clean_sql_comment(model["description"])
-            updated_table["Description"] = table_comment
-            updated_table["Parameters"]["comment"] = table_comment
-
+            # Prepare dbt description
+            clean_table_description = clean_sql_comment(model["description"])
+            # Get current description from Glue
+            glue_table_description = table.get("Description", "")
+            # Get current description parameter from Glue ("Parameters" is optional in the Glue response)
+            glue_table_comment = table.get("Parameters", {}).get("comment", "")
+            # Update description if it's different
+            if clean_table_description != glue_table_description or clean_table_description != glue_table_comment:
+                updated_table["Description"] = clean_table_description
+                updated_table_parameters: Dict[str, str] = dict(updated_table.get("Parameters", {}))
+                updated_table_parameters["comment"] = clean_table_description
+                updated_table["Parameters"] = updated_table_parameters
+                need_update_table = True
+
+        # Update column comments
         if persist_column_docs:
+            # Process every column
             for col_obj in updated_table["StorageDescriptor"]["Columns"]:
+                # Get column description from dbt
                 col_name = col_obj["Name"]
-                col_comment = model["columns"].get(col_name, {}).get("description")
-                if col_comment:
-                    col_obj["Comment"] = clean_sql_comment(col_comment)
-
-        glue_client.update_table(DatabaseName=relation.schema, TableInput=updated_table)
+                if col_name in model["columns"]:
+                    col_comment = model["columns"][col_name].get("description", "")
+                    # Prepare column description from dbt
+                    clean_col_comment = clean_sql_comment(col_comment)
+                    # Get current column comment from Glue
+                    glue_col_comment = col_obj.get("Comment", "")
+                    # Update column description if it's different
+                    if glue_col_comment != clean_col_comment:
+                        col_obj["Comment"] = clean_col_comment
+                        need_update_table = True
+
+        # Update Glue Table only if table/column description is modified.
+        # It prevents redundant schema version creating after incremental runs.
+        if need_update_table:
+            glue_client.update_table(
+                DatabaseName=relation.schema, TableInput=updated_table, SkipArchive=skip_archive_table_version
+            )
 
     @available
     def list_schemas(self, database: str) -> List[str]:
@@ -833,3 +865,13 @@ def is_list(self, value: Any) -> bool:
         a list since this is complicated with purely Jinja syntax.
         """
         return isinstance(value, list)
+
+    @staticmethod
+    def _get_table_input(table: TableTypeDef) -> TableInputTypeDef:
+        """
+        Prepare Glue Table dictionary to be a table_input argument of update_table() method.
+
+        This is needed because update_table() does not accept some read-only fields of table dictionary
+        returned by get_table() method.
+        """
+        return {k: v for k, v in table.items() if k in TableInputTypeDef.__annotations__}
diff --git a/dbt/include/athena/macros/adapters/persist_docs.sql b/dbt/include/athena/macros/adapters/persist_docs.sql
index 2a7b8748..78503ba9 100644
--- a/dbt/include/athena/macros/adapters/persist_docs.sql
+++ b/dbt/include/athena/macros/adapters/persist_docs.sql
@@ -1,10 +1,12 @@
 {% macro athena__persist_docs(relation, model, for_relation, for_columns) -%}
   {% set persist_relation_docs = for_relation and config.persist_relation_docs() and model.description %}
   {% set persist_column_docs = for_columns and config.persist_column_docs() and model.columns %}
-  {% if (persist_relation_docs or persist_column_docs) and relation.type != 'view' %}
+  {% set skip_archive_table_version = not is_incremental() %}
+  {% if persist_relation_docs or persist_column_docs %}
     {% do adapter.persist_docs_to_glue(relation,
                                        model,
                                        persist_relation_docs,
-                                       persist_column_docs) %}}
+                                       persist_column_docs,
+                                       skip_archive_table_version=skip_archive_table_version) %}
   {% endif %}
 {% endmacro %}
diff --git a/dbt/include/athena/macros/materializations/models/view/view.sql b/dbt/include/athena/macros/materializations/models/view/view.sql
index 1cf3b337..3b1a4a89 100644
--- a/dbt/include/athena/macros/materializations/models/view/view.sql
+++ b/dbt/include/athena/macros/materializations/models/view/view.sql
@@ -2,6 +2,7 @@
   {% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %}
 
   {% set target_relation = this.incorporate(type='view') %}
+  {% do persist_docs(target_relation, model) %}
   {% do return(to_return) %}
 
 {%- endmaterialization %}

From 3c1c6c1c62fbd27943633845ff5e30c81f5ddbef Mon Sep 17 00:00:00 2001
From: Oleksandr Kot
Date: Fri, 4 Aug 2023 13:54:18 +0300
Subject: [PATCH 3/3] feat: add iceberg sync column types support (#304)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Oleksandr Kot
Co-authored-by: Serhii Dimchenko <39801237+svdimchenko@users.noreply.github.com>
Co-authored-by: Jérémy Guiselin <9251353+Jrmyy@users.noreply.github.com>
---
 .../models/incremental/column_helpers.sql     |  8 +++
 .../models/incremental/on_schema_change.sql   | 51 +++++++++++++++++++
 .../athena/macros/utils/ddl_dml_data_type.sql |  9 +++-
 3 files changed, 67 insertions(+), 1 deletion(-)

diff --git a/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql b/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql
index 53b9bfc0..e8263295 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/column_helpers.sql
@@ -48,3 +48,11 @@
     {{ return(run_query(sql)) }}
   {% endif %}
 {% endmacro %}
+
+{% macro alter_relation_rename_column(relation, source_column, target_column, target_column_type) -%}
+  {% set sql -%}
+    alter {{ relation.type }} {{ relation.render_pure() }}
+        change column {{ source_column }} {{ target_column }} {{ ddl_data_type(target_column_type) }}
+  {%- endset -%}
+  {{ return(run_query(sql)) }}
+{% endmacro %}
diff --git a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql
index 3d9f8137..ef641b70 100644
--- a/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql
+++ b/dbt/include/athena/macros/materializations/models/incremental/on_schema_change.sql
@@ -13,12 +13,32 @@
   {%- set remove_from_target_arr = schema_changes_dict['target_not_in_source'] -%}
   {%- set new_target_types = schema_changes_dict['new_target_types'] -%}
   {% if table_type == 'iceberg' %}
+    {#
+      If the last run of alter_column_type failed while renaming the tmp column back to the original name,
+      finish that rename now (looping over list copies) so the original column is not dropped and its data lost.
+    #}
+    {% for remove_col in remove_from_target_arr | list if remove_col.column.endswith('__dbt_alter') %}
+      {%- set origin_col_name = remove_col.column | replace('__dbt_alter', '') -%}
+      {% for add_col in add_to_target_arr | list if add_col.column == origin_col_name %}
+        {%- do alter_relation_rename_column(target_relation, remove_col.name, add_col.name, add_col.data_type) -%}
+        {%- do remove_from_target_arr.remove(remove_col) -%}
+        {%- do add_to_target_arr.remove(add_col) -%}
+      {% endfor %}
+    {% endfor %}
+
     {% if add_to_target_arr | length > 0 %}
       {%- do alter_relation_add_columns(target_relation, add_to_target_arr) -%}
     {% endif %}
     {% if remove_from_target_arr | length > 0 %}
       {%- do alter_relation_drop_columns(target_relation, remove_from_target_arr) -%}
     {% endif %}
+    {% if new_target_types != [] %}
+      {% for ntt in new_target_types %}
+        {% set column_name = ntt['column_name'] %}
+        {% set new_type = ntt['new_type'] %}
+        {% do alter_column_type(target_relation, column_name, new_type) %}
+      {% endfor %}
+    {% endif %}
   {% else %}
     {%- set replace_with_target_arr = remove_partitions_from_columns(schema_changes_dict['source_columns'], partitioned_by) -%}
     {% if add_to_target_arr | length > 0 or remove_from_target_arr | length > 0 or new_target_types | length > 0 %}
@@ -35,3 +55,34 @@
   {% endset %}
   {% do log(schema_change_message) %}
 {% endmacro %}
+
+{% macro athena__alter_column_type(relation, column_name, new_column_type) -%}
+  {#
+    1. Create a new column (w/ temp name and correct type)
+    2. Copy data over to it
+    3. Drop the existing column
+    4. Rename the new column to existing column
+  #}
+  {%- set tmp_column = column_name + '__dbt_alter' -%}
+  {%- set new_ddl_data_type = ddl_data_type(new_column_type) -%}
+
+  {#- do alter_relation_add_columns(relation, [ tmp_column ]) -#}
+  {%- set add_column_query -%}
+    alter {{ relation.type }} {{ relation.render_pure() }} add columns({{ tmp_column }} {{ new_ddl_data_type }});
+  {%- endset -%}
+  {%- do run_query(add_column_query) -%}
+
+  {%- set update_query -%}
+    update {{ relation.render_pure() }} set {{ tmp_column }} = cast({{ column_name }} as {{ new_column_type }});
+  {%- endset -%}
+  {%- do run_query(update_query) -%}
+
+  {#- do alter_relation_drop_columns(relation, [ column_name ]) -#}
+  {%- set drop_column_query -%}
+    alter {{ relation.type }} {{ relation.render_pure() }} drop column {{ column_name }};
+  {%- endset -%}
+  {%- do run_query(drop_column_query) -%}
+
+  {%- do alter_relation_rename_column(relation, tmp_column, column_name, new_column_type) -%}
+
+{% endmacro %}
diff --git a/dbt/include/athena/macros/utils/ddl_dml_data_type.sql b/dbt/include/athena/macros/utils/ddl_dml_data_type.sql
index a97c489c..ec726627 100644
--- a/dbt/include/athena/macros/utils/ddl_dml_data_type.sql
+++ b/dbt/include/athena/macros/utils/ddl_dml_data_type.sql
@@ -1,7 +1,9 @@
 {# Athena has different types between DML and DDL #}
 {# ref: https://docs.aws.amazon.com/athena/latest/ug/data-types.html #}
 {% macro ddl_data_type(col_type) -%}
-    -- transform varchar
+  {%- set table_type = config.get('table_type', 'hive') -%}
+
+  -- transform varchar
   {% set re = modules.re %}
   {% set data_type = re.sub('(?:varchar|character varying)(?:\(\d+\))?', 'string', col_type) %}
 
@@ -15,6 +17,11 @@
     {% set data_type = data_type.replace('integer', 'int') -%}
   {%- endif -%}
 
+  -- transform timestamp
+  {%- if table_type == 'iceberg' and 'timestamp' in data_type -%}
+    {% set data_type = 'timestamp' -%}
+  {%- endif -%}
+
   {{ return(data_type) }}
 {% endmacro %}
 