Commit
PR feedback. Reorganize macros
jtcohen6 committed Jan 14, 2021
1 parent c8e3770 commit bc18f80
Showing 6 changed files with 165 additions and 160 deletions.
5 changes: 1 addition & 4 deletions CHANGELOG.md
@@ -2,10 +2,7 @@

### Breaking changes
- Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126))
- Incremental models have `incremental_strategy: append` by default. This strategy adds new records
without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending
on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error
if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141))
- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141))
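
As an illustrative aside (not part of this changelog entry), a model that needs to update existing rows rather than append can opt into `merge` explicitly; the model and column names here are hypothetical:

```sql
-- hypothetical model; merge requires file_format='delta' and a unique_key
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='delta',
    unique_key='id'
) }}

select * from {{ ref('stg_events') }}
```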

### Under the hood
- Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, it will keep the existing table, and add a new version as supported by Delta. This will ensure that the table stays available when running the pipeline, and you can track the history.
4 changes: 2 additions & 2 deletions README.md
@@ -169,15 +169,15 @@ The following configurations can be supplied to models run with the dbt-spark pl

dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations:
- `append` (default): Insert new records without updating or overwriting any existing data.
- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta`. Not available on Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization.]
- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connecting via Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization instead.]
- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]

Examples:

```sql
{{ config(
materialized='incremental',
incremental_strategy='append'
incremental_strategy='append',
) }}


```
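
A further sketch (not part of this diff) of the `insert_overwrite` strategy with a partition column; the model and column names are hypothetical, and every partition returned by the query is rewritten in full:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['date_day'],
    file_format='parquet'
) }}

-- re-select ALL rows for each date_day partition touched by this run
select
    date_day,
    count(*) as event_count
from {{ ref('events') }}
group by 1
```
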
154 changes: 0 additions & 154 deletions dbt/include/spark/macros/materializations/incremental.sql

This file was deleted.

45 changes: 45 additions & 0 deletions dbt/include/spark/macros/materializations/incremental/incremental.sql
@@ -0,0 +1,45 @@
{% materialization incremental, adapter='spark' -%}

{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy', default='append') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{%- set unique_key = config.get('unique_key', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'insert_overwrite' and partition_by %}
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
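
As a usage sketch (not part of this file), here is a model that would exercise this materialization; relation and column names are hypothetical. On the first run `existing_relation` is none and the table is created directly; on later runs the query result is staged in a temp relation and applied with the strategy's SQL:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='append'
) }}

select id, event_time, payload
from {{ ref('stg_events') }}

{% if is_incremental() %}
  -- only pick up rows that arrived since the previous run of this model
  where event_time > (select max(event_time) from {{ this }})
{% endif %}
```
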
58 changes: 58 additions & 0 deletions dbt/include/spark/macros/materializations/incremental/strategies.sql
@@ -0,0 +1,58 @@
{% macro get_insert_overwrite_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}


{% macro get_insert_into_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into table {{ target_relation }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}


{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
{# ignore dest_columns - we will just use `*` #}

{% set merge_condition %}
{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% else %}
on false
{% endif %}
{% endset %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
{{ merge_condition }}
when matched then update set *
when not matched then insert *
{% endmacro %}


{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
No known SQL for the incremental strategy provided: {{ strategy }}
{%- endset %}
{%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%}
{%- endif -%}

{% endmacro %}
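
For a concrete sense of the output (not part of this file), the `merge` branch renders roughly the following for a hypothetical target `analytics.user_stats` with `unique_key='user_id'`; the staging relation name assumes dbt's usual `__dbt_tmp` suffix. With no `unique_key`, the join condition becomes `on false`, so every source row is inserted:

```sql
merge into analytics.user_stats as DBT_INTERNAL_DEST
using user_stats__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *
```
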
59 changes: 59 additions & 0 deletions dbt/include/spark/macros/materializations/incremental/validate.sql
@@ -0,0 +1,59 @@
{% macro dbt_spark_validate_get_file_format(raw_file_format) %}
{#-- Validate the file format #}

{% set accepted_formats = ['text', 'csv', 'json', 'jdbc', 'parquet', 'orc', 'hive', 'delta', 'libsvm'] %}

{% set invalid_file_format_msg -%}
Invalid file format provided: {{ raw_file_format }}
Expected one of: {{ accepted_formats | join(', ') }}
{%- endset %}

{% if raw_file_format not in accepted_formats %}
{% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
{% endif %}

{% do return(raw_file_format) %}
{% endmacro %}


{% macro dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) %}
{#-- Validate the incremental strategy #}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite'
{%- endset %}

{% set invalid_merge_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You can only choose this strategy when file_format is set to 'delta'
{%- endset %}

{% set invalid_insert_overwrite_delta_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when file_format is set to 'delta'
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% set invalid_insert_overwrite_endpoint_msg -%}
Invalid incremental strategy provided: {{ raw_strategy }}
You cannot use this strategy when connecting via endpoint
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if raw_strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if raw_strategy == 'merge' and file_format != 'delta' %}
{% do exceptions.raise_compiler_error(invalid_merge_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and file_format == 'delta' %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_delta_msg) %}
{% endif %}
{% if raw_strategy == 'insert_overwrite' and target.endpoint %}
{% do exceptions.raise_compiler_error(invalid_insert_overwrite_endpoint_msg) %}
{% endif %}
{% endif %}

{% do return(raw_strategy) %}
{% endmacro %}
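
As a usage note (not part of this file), a config such as the following would be rejected at compile time by these checks rather than failing later inside Spark; the model name is hypothetical:

```sql
-- hypothetical model: 'merge' with parquet trips invalid_merge_msg, e.g.
--   Invalid incremental strategy provided: merge
--   You can only choose this strategy when file_format is set to 'delta'
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='parquet',
    unique_key='id'
) }}

select * from {{ ref('stg_events') }}
```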
