From 087a158bab5413929030ec7bd3360cf67f30f6c3 Mon Sep 17 00:00:00 2001
From: Charlotte van der Scheun
Date: Fri, 30 Oct 2020 10:59:15 +0100
Subject: [PATCH 1/4] replace partitionOverwriteMode inside merge strategy

---
 dbt/include/spark/macros/materializations/incremental.sql | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index f5d7335fb..387c9ae49 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -97,13 +97,12 @@
 
   {% if strategy == 'merge' %}
     {%- set unique_key = config.require('unique_key') -%}
+    {% call statement() %}
+      set spark.sql.sources.partitionOverwriteMode = DYNAMIC
+    {% endcall %}
     {% do dbt_spark_validate_merge(file_format) %}
   {% endif %}
 
-  {% call statement() %}
-    set spark.sql.sources.partitionOverwriteMode = DYNAMIC
-  {% endcall %}
-
   {% call statement() %}
     set spark.sql.hive.convertMetastoreParquet = false
   {% endcall %}

From 30b22229623fccdb847c46c913f170d53aa0e5d2 Mon Sep 17 00:00:00 2001
From: Charlotte van der Scheun
Date: Mon, 2 Nov 2020 07:50:40 +0100
Subject: [PATCH 2/4] dynamic overwrite when partition || unique key

---
 dbt/include/spark/macros/materializations/incremental.sql | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index 387c9ae49..037941beb 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -62,6 +62,7 @@
 
 
 {% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
+  {# ignore dest_columns - we will just use `*` #}
 
   merge into {{ target }} as DBT_INTERNAL_DEST
   using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
@@ -97,10 +98,13 @@
 
   {% if strategy == 'merge' %}
     {%- set unique_key = config.require('unique_key') -%}
+    {% do dbt_spark_validate_merge(file_format) %}
+  {% endif %}
+
+  {% if unique_key or config.get('partition_by') %}
     {% call statement() %}
       set spark.sql.sources.partitionOverwriteMode = DYNAMIC
     {% endcall %}
-    {% do dbt_spark_validate_merge(file_format) %}
   {% endif %}
 
   {% call statement() %}

From f240857d4924eb545d41d6aefd41ae8295eed44d Mon Sep 17 00:00:00 2001
From: charlottevdscheun <65390869+charlottevdscheun@users.noreply.github.com>
Date: Mon, 2 Nov 2020 10:05:33 +0100
Subject: [PATCH 3/4] Update dbt/include/spark/macros/materializations/incremental.sql

Fokko's suggestion to remove unique key from the if statement

Co-authored-by: Fokko Driesprong
---
 dbt/include/spark/macros/materializations/incremental.sql | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index 037941beb..76c6e490a 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -101,7 +101,7 @@
     {% do dbt_spark_validate_merge(file_format) %}
   {% endif %}
 
-  {% if unique_key or config.get('partition_by') %}
+  {% if config.get('partition_by') %}
     {% call statement() %}
       set spark.sql.sources.partitionOverwriteMode = DYNAMIC
     {% endcall %}
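Net effect of patches 1-3: the `set spark.sql.sources.partitionOverwriteMode = DYNAMIC` statement is now emitted only when the model declares a `partition_by` config, rather than unconditionally on every incremental run. For context, here is a minimal Spark SQL sketch of the behavior this session setting controls; all table and column names are hypothetical and do not appear in this PR:

```sql
-- All names here (events, events_staging, ds) are hypothetical.
CREATE TABLE events (id BIGINT, ds DATE) USING parquet PARTITIONED BY (ds);
CREATE TABLE events_staging (id BIGINT, ds DATE) USING parquet;

-- In the default STATIC mode, INSERT OVERWRITE without a PARTITION
-- clause rewrites every partition of the target table. In DYNAMIC
-- mode, only the partitions that receive rows from the query are
-- replaced; all other partitions are left untouched.
SET spark.sql.sources.partitionOverwriteMode = DYNAMIC;

INSERT OVERWRITE TABLE events
SELECT id, ds FROM events_staging;  -- replaces only the ds values staged here
```

On an unpartitioned table the setting has no effect, which is why guarding it on `config.get('partition_by')` is sufficient.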
From f6d54d7fb41988e18412676e24b297868aa10620 Mon Sep 17 00:00:00 2001
From: Charlotte van der Scheun
Date: Mon, 2 Nov 2020 15:19:56 +0100
Subject: [PATCH 4/4] add documentation about insert_overwrite when no partition

---
 README.md                                                 | 2 +-
 dbt/include/spark/macros/materializations/incremental.sql | 1 -
 2 files changed, 1 insertion(+), 2 deletions(-)

diff --git a/README.md b/README.md
index 4b5dc4e95..890c5cc48 100644
--- a/README.md
+++ b/README.md
@@ -127,7 +127,7 @@ The following configurations can be supplied to models run with the dbt-spark pl
 **Incremental Models**
 
 To use incremental models, specify a `partition_by` clause in your model config. The default incremental strategy used
 is `insert_overwrite`, which will overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant
-data for a partition when using the `insert_overwrite` strategy.
+data for a partition when using the `insert_overwrite` strategy. If a `partition_by` config is not specified, dbt will overwrite the entire table as an atomic operation, replacing it with new data of the same schema. This is analogous to `truncate` + `insert`.
 ```
 {{ config(

diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index 037941beb..b16acc9c1 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -62,7 +62,6 @@
 
 
 {% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
-  {# ignore dest_columns - we will just use `*` #}
 
   merge into {{ target }} as DBT_INTERNAL_DEST
   using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
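The README paragraph added in patch 4 pairs naturally with a concrete model config. A minimal sketch of an incremental model that exercises the partitioned path; the model body, the `date_day` column, and the `raw_events` ref are made-up examples, not part of this PR:

```
{{ config(
    materialized='incremental',
    partition_by=['date_day'],
    file_format='parquet'
) }}

select
    date_day,
    count(*) as event_count
from {{ ref('raw_events') }}  -- hypothetical upstream model
group by date_day
```

Dropping `partition_by` from this config switches the run to the full-table, truncate-plus-insert overwrite behavior the new README paragraph describes.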