4. Append-only merge if no unique_key
jtcohen6 committed Jan 12, 2021
1 parent fb6a4bb commit 6f7e1f2
Showing 5 changed files with 15 additions and 51 deletions.
3 changes: 2 additions & 1 deletion README.md
@@ -192,7 +192,8 @@ where date_day::date >= '2019-01-01'
group by 1
```

The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). It also requires you to specify a `unique key` to match existing records.
The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique_key` is specified, the statement will match existing records and overwrite them with new values. If a `unique_key` config is not specified, dbt will simply
append new data to the model, without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `'table'` materialization instead.)

```
{{ config(
```
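
The config block in the diff above is collapsed in this view. A minimal sketch of what such a config might look like (the `user_id` key and the `raw.events` source are hypothetical, shown only for illustration):

```
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    file_format='delta',
    unique_key='user_id'
) }}

select * from {{ source('raw', 'events') }}
```

Omitting `unique_key` from a config like this turns the merge into an append-only load, as described above.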
32 changes: 13 additions & 19 deletions dbt/include/spark/macros/materializations/incremental.sql
@@ -70,23 +70,21 @@
{% do return(strategy) %}
{% endmacro %}

{% macro dbt_spark_validate_merge(file_format) %}
{% set invalid_file_format_msg -%}
You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
{%- endset %}

{% if file_format != 'delta' %}
{% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
{% endif %}

{% endmacro %}


{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
{# ignore dest_columns - we will just use `*` #}

{% set merge_condition %}
{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% else %}
on false
{% endif %}
{% endset %}

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{{ merge_condition }}
when matched then update set *
when not matched then insert *
{% endmacro %}
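
For illustration, roughly what this macro would render with and without a `unique_key` (the relation names and the `user_id` column below are hypothetical):

```
-- with unique_key='user_id': matching rows are updated, new rows are inserted
merge into analytics.my_model as DBT_INTERNAL_DEST
using my_model__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.user_id = DBT_INTERNAL_DEST.user_id
when matched then update set *
when not matched then insert *

-- without unique_key: `on false` never matches, so every source row
-- falls through to the insert clause and the merge is append-only
merge into analytics.my_model as DBT_INTERNAL_DEST
using my_model__dbt_tmp as DBT_INTERNAL_SOURCE
on false
when matched then update set *
when not matched then insert *
```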
@@ -106,21 +104,17 @@

{% materialization incremental, adapter='spark' -%}
{#-- Validate early so we don't run SQL if the file_format is invalid --#}
{% set file_format = dbt_spark_validate_get_file_format() -%}
{%- set file_format = dbt_spark_validate_get_file_format() -%}
{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
{%- set unique_key = config.get('unique_key', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'merge' %}
{%- set unique_key = config.require('unique_key') -%}
{% do dbt_spark_validate_merge(file_format) %}
{% endif %}

{% if strategy == 'insert_overwrite' and config.get('partition_by') %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endif %}
11 changes: 0 additions & 11 deletions test/integration/spark-databricks-http.dbtspec
@@ -9,16 +9,6 @@ target:
connect_retries: 5
connect_timeout: 60
projects:
- overrides: incremental
paths:
"models/incremental.sql":
materialized: incremental
body: "select * from {{ source('raw', 'seed') }}"
facts:
base:
rowcount: 10
added:
rowcount: 20
- overrides: snapshot_strategy_check_cols
dbt_project_yml: &file_format_delta
# we're going to UPDATE the seed tables as part of testing, so we must make them delta format
@@ -40,4 +30,3 @@
test_dbt_data_test: data_test
test_dbt_ephemeral_data_tests: data_test_ephemeral_models
test_dbt_schema_test: schema_test

10 changes: 0 additions & 10 deletions test/integration/spark-databricks-odbc-cluster.dbtspec
@@ -10,16 +10,6 @@ target:
connect_retries: 5
connect_timeout: 60
projects:
- overrides: incremental
paths:
"models/incremental.sql":
materialized: incremental
body: "select * from {{ source('raw', 'seed') }}"
facts:
base:
rowcount: 10
added:
rowcount: 20
- overrides: snapshot_strategy_check_cols
dbt_project_yml: &file_format_delta
# we're going to UPDATE the seed tables as part of testing, so we must make them delta format
10 changes: 0 additions & 10 deletions test/integration/spark-databricks-odbc-sql-endpoint.dbtspec
@@ -10,16 +10,6 @@ target:
connect_retries: 5
connect_timeout: 60
projects:
- overrides: incremental
paths:
"models/incremental.sql":
materialized: incremental
body: "select * from {{ source('raw', 'seed') }}"
facts:
base:
rowcount: 10
added:
rowcount: 20
- overrides: snapshot_strategy_check_cols
dbt_project_yml: &file_format_delta
# we're going to UPDATE the seed tables as part of testing, so we must make them delta format
