From bc18f8022e23c2fcdba84253d39be8f071c526e2 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Thu, 14 Jan 2021 13:26:47 +0100 Subject: [PATCH] PR feedback. Reorganize macros --- CHANGELOG.md | 5 +- README.md | 4 +- .../macros/materializations/incremental.sql | 154 ------------------ .../incremental/incremental.sql | 45 +++++ .../incremental/strategies.sql | 58 +++++++ .../materializations/incremental/validate.sql | 59 +++++++ 6 files changed, 165 insertions(+), 160 deletions(-) delete mode 100644 dbt/include/spark/macros/materializations/incremental.sql create mode 100644 dbt/include/spark/macros/materializations/incremental/incremental.sql create mode 100644 dbt/include/spark/macros/materializations/incremental/strategies.sql create mode 100644 dbt/include/spark/macros/materializations/incremental/validate.sql diff --git a/CHANGELOG.md b/CHANGELOG.md index 31499aa9e..485b88e94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,10 +2,7 @@ ### Breaking changes - Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126)) -- Incremental models have `incremental_strategy: append` by default. This strategy adds new records -without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending -on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error -if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) +- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141)) ### Under the hood - Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, it will keep the existing table, and add a new version as supported by Delta. This will ensure that the table stays available when running the pipeline, and you can track the history. diff --git a/README.md b/README.md index 38a1e8ec4..09e716b5c 100644 --- a/README.md +++ b/README.md @@ -169,7 +169,7 @@ The following configurations can be supplied to models run with the dbt-spark pl dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations: - `append` (default): Insert new records without updating or overwriting any existing data. -- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta`. Not available on Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization.] 
+- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connecting via Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization instead.]
 - `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]
 
 Examples:
@@ -177,7 +177,7 @@ Examples:
 
 ```sql
 {{ config(
     materialized='incremental',
-    incremental_strategy='append'
+    incremental_strategy='append',
 ) }}
 
diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
deleted file mode 100644
index bb6d5dc0c..000000000
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ /dev/null
@@ -1,154 +0,0 @@
-{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
-
-    {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
-    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
-    insert overwrite table {{ target_relation }}
-    {{ partition_cols(label="partition") }}
-    select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}
-
-{% endmacro %}
-
-
-{% macro get_insert_into_sql(source_relation, target_relation) %}
-
-    {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
-    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
-    insert into table {{ target_relation }}
-    select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}
-
-{% endmacro %}
-
-
-{% macro dbt_spark_validate_get_file_format() %}
-  {#-- Find and validate the file format #}
-  {%- set file_format = config.get("file_format", default="parquet") -%}
-
-  {% set invalid_file_format_msg -%}
-    Invalid file format provided: {{ file_format }}
-    Expected one of: 'text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'
-  {%- endset %}
-
-  {% if file_format not in ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}
-    {% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
-  {% endif %}
-
-  {% do return(file_format) %}
-{% endmacro %}
-
-
-{% macro dbt_spark_validate_get_incremental_strategy(file_format) %}
-  {#-- Find and validate the incremental strategy #}
-  {%- set strategy = config.get("incremental_strategy", default="append") -%}
-
-  {% set invalid_strategy_msg -%}
-    Invalid incremental strategy provided: {{ strategy }}
-    Expected one of: 'append', 'merge', 'insert_overwrite'
-  {%- endset %}
-
-  {% set invalid_merge_msg -%}
-    Invalid incremental strategy provided: {{ strategy }}
-    You can only choose this strategy when file_format is set to 'delta'
-  {%- endset %}
-
-  {% set invalid_insert_overwrite_delta_msg -%}
-    Invalid incremental strategy provided: {{ strategy }}
-    You cannot use this strategy when file_format is set to 'delta'
-    Use the 'append' or 'merge' strategy instead
-  {%- endset %}
-
-  {% set invalid_insert_overwrite_endpoint_msg -%}
-    Invalid incremental strategy provided: {{ strategy }}
-    You cannot use this strategy when connecting via endpoint
-    Use the 'append' or 'merge' strategy instead
-  {%- endset %}
-
-  {% if 
strategy not in ['append', 'merge', 'insert_overwrite'] %} - {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} - {%-else %} - {% if strategy == 'merge' and file_format != 'delta' %} - {% do exceptions.raise_compiler_error(invalid_merge_msg) %} - {% endif %} - {% if strategy == 'insert_overwrite' and file_format == 'delta' %} - {% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %} - {% endif %} - {% if strategy == 'insert_overwrite' and target.endpoint %} - {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} - {% endif %} - {% endif %} - - {% do return(strategy) %} -{% endmacro %} - - -{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %} - {# ignore dest_columns - we will just use `*` #} - - {% set merge_condition %} - {% if unique_key %} - on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} - {% else %} - on false - {% endif %} - {% endset %} - - merge into {{ target }} as DBT_INTERNAL_DEST - using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE - {{ merge_condition }} - when matched then update set * - when not matched then insert * -{% endmacro %} - - -{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %} - {%- if strategy == 'append' -%} - {#-- insert new records into existing table, without updating or overwriting #} - {{ get_insert_into_sql(source, target) }} - {%- elif strategy == 'insert_overwrite' -%} - {#-- insert statements don't like CTEs, so support them via a temp view #} - {{ get_insert_overwrite_sql(source, target) }} - {%- elif strategy == 'merge' -%} - {#-- merge all columns with databricks delta - schema changes are handled for us #} - {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }} - {%- endif -%} - -{% endmacro %} - - -{% materialization incremental, adapter='spark' -%} - {#-- Validate early so we don't run SQL if the file_format is invalid --#} - {%- set file_format = dbt_spark_validate_get_file_format() -%} - {#-- Validate early so we don't run SQL if the strategy is invalid --#} - {%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%} - {%- set unique_key = config.get('unique_key', none) -%} - - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - - {% set target_relation = this %} - {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} - - {% if strategy == 'insert_overwrite' and config.get('partition_by') %} - set spark.sql.sources.partitionOverwriteMode = DYNAMIC - {% endif %} - - {{ run_hooks(pre_hooks) }} - - {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% elif existing_relation.is_view or full_refresh_mode %} - {% do adapter.drop_relation(existing_relation) %} - {% set build_sql = create_table_as(False, target_relation, sql) %} - {% else %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} - {% endif %} - - {%- call statement('main') -%} - {{ build_sql }} - {%- endcall -%} - - {{ run_hooks(post_hooks) }} - - {{ return({'relations': [target_relation]}) }} - -{%- endmaterialization %} diff --git a/dbt/include/spark/macros/materializations/incremental/incremental.sql b/dbt/include/spark/macros/materializations/incremental/incremental.sql new file mode 100644 index 000000000..b11990b36 --- /dev/null +++ 
b/dbt/include/spark/macros/materializations/incremental/incremental.sql @@ -0,0 +1,45 @@ +{% materialization incremental, adapter='spark' -%} + + {#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#} + {%- set raw_file_format = config.get('file_format', default='parquet') -%} + {%- set raw_strategy = config.get('incremental_strategy', default='append') -%} + + {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%} + {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} + + {%- set unique_key = config.get('unique_key', none) -%} + {%- set partition_by = config.get('partition_by', none) -%} + + {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} + + {% set target_relation = this %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(this) %} + + {% if strategy == 'insert_overwrite' and partition_by %} + {% call statement() %} + set spark.sql.sources.partitionOverwriteMode = DYNAMIC + {% endcall %} + {% endif %} + + {{ run_hooks(pre_hooks) }} + + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% elif existing_relation.is_view or full_refresh_mode %} + {% do adapter.drop_relation(existing_relation) %} + {% set build_sql = create_table_as(False, target_relation, sql) %} + {% else %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} + {% endif %} + + {%- call statement('main') -%} + {{ build_sql }} + {%- endcall -%} + + {{ run_hooks(post_hooks) }} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} diff --git a/dbt/include/spark/macros/materializations/incremental/strategies.sql b/dbt/include/spark/macros/materializations/incremental/strategies.sql new file mode 100644 index 000000000..d3ffafc10 --- /dev/null +++ b/dbt/include/spark/macros/materializations/incremental/strategies.sql @@ -0,0 +1,58 @@ +{% macro get_insert_overwrite_sql(source_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert overwrite table {{ target_relation }} + {{ partition_cols(label="partition") }} + select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} + +{% endmacro %} + + +{% macro get_insert_into_sql(source_relation, target_relation) %} + + {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%} + {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%} + insert into table {{ target_relation }} + select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }} + +{% endmacro %} + + +{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %} + {# ignore dest_columns - we will just use `*` #} + + {% set merge_condition %} + {% if unique_key %} + on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }} + {% else %} + on false + {% endif %} + {% endset %} + + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE + {{ merge_condition }} + when matched then update set * + when not matched then insert * +{% endmacro %} + + +{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %} + {%- if 
strategy == 'append' -%} + {#-- insert new records into existing table, without updating or overwriting #} + {{ get_insert_into_sql(source, target) }} + {%- elif strategy == 'insert_overwrite' -%} + {#-- insert statements don't like CTEs, so support them via a temp view #} + {{ get_insert_overwrite_sql(source, target) }} + {%- elif strategy == 'merge' -%} + {#-- merge all columns with databricks delta - schema changes are handled for us #} + {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }} + {%- else -%} + {% set no_sql_for_strategy_msg -%} + No known SQL for the incremental strategy provided: {{ strategy }} + {%- endset %} + {%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%} + {%- endif -%} + +{% endmacro %} diff --git a/dbt/include/spark/macros/materializations/incremental/validate.sql b/dbt/include/spark/macros/materializations/incremental/validate.sql new file mode 100644 index 000000000..400a2eee5 --- /dev/null +++ b/dbt/include/spark/macros/materializations/incremental/validate.sql @@ -0,0 +1,59 @@ +{% macro dbt_spark_validate_get_file_format(raw_file_format) %} + {#-- Validate the file format #} + + {% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %} + + {% set invalid_file_format_msg -%} + Invalid file format provided: {{ raw_file_format }} + Expected one of: {{ accepted_formats | join(', ') }} + {%- endset %} + + {% if raw_file_format not in accepted_formats %} + {% do exceptions.raise_compiler_error(invalid_file_format_msg) %} + {% endif %} + + {% do return(raw_file_format) %} +{% endmacro %} + + +{% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %} + {#-- Validate the incremental strategy #} + + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + Expected one of: 'append', 'merge', 'insert_overwrite' + {%- endset %} + + {% set invalid_merge_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You can only choose this strategy when file_format is set to 'delta' + {%- endset %} + + {% set invalid_insert_overwrite_delta_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You cannot use this strategy when file_format is set to 'delta' + Use the 'append' or 'merge' strategy instead + {%- endset %} + + {% set invalid_insert_overwrite_endpoint_msg -%} + Invalid incremental strategy provided: {{ raw_strategy }} + You cannot use this strategy when connecting via endpoint + Use the 'append' or 'merge' strategy instead + {%- endset %} + + {% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {%-else %} + {% if raw_strategy == 'merge' and file_format != 'delta' %} + {% do exceptions.raise_compiler_error(invalid_merge_msg) %} + {% endif %} + {% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %} + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %} + {% endif %} + {% if raw_strategy == 'insert_overwrite' and target.endpoint %} + {% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %} + {% endif %} + {% endif %} + + {% do return(raw_strategy) %} +{% endmacro %}
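
For reference, a minimal model-side sketch of a config that the reorganized validation accepts: `merge` together with `file_format: delta` and a `unique_key`, which routes through `spark__get_merge_sql` above. The source, column, and key names here are purely illustrative, not part of this patch.

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='delta',
    unique_key='event_id',
) }}

-- illustrative model body; the source and columns are hypothetical
select * from {{ source('app', 'events') }}

{% if is_incremental() %}
  -- on incremental runs, only pull rows newer than what is already in {{ this }}
  where event_date >= (select max(event_date) from {{ this }})
{% endif %}
```

Configuring `insert_overwrite` with `file_format: delta`, by contrast, would trip `invalid_insert_overwrite_delta_msg` at compile time.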