From 6f7e1f2f4241e1890c5f236f8338469392a6d7f1 Mon Sep 17 00:00:00 2001
From: Jeremy Cohen
Date: Tue, 12 Jan 2021 14:08:05 +0100
Subject: [PATCH] 4. Append-only merge if no unique_key

---
 README.md | 3 +-
 .../macros/materializations/incremental.sql | 32 ++++++++-----------
 .../integration/spark-databricks-http.dbtspec | 11 -------
 .../spark-databricks-odbc-cluster.dbtspec | 10 ------
 ...spark-databricks-odbc-sql-endpoint.dbtspec | 10 ------
 5 files changed, 15 insertions(+), 51 deletions(-)

diff --git a/README.md b/README.md
index 9297c9833..e9cee6ade 100644
--- a/README.md
+++ b/README.md
@@ -192,7 +192,8 @@ where date_day::date >= '2019-01-01'
 group by 1
 ```
 
-The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). It also requires you to specify a `unique key` to match existing records.
+The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique_key` is specified, the merge statement will match existing records on that key and overwrite them with new values. If a `unique_key` config is not specified, dbt will simply
+append new data to the model, without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `'table'` materialization instead.)
 
 ```
 {{ config(
diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index dd159cf12..dde4bdd0f 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -70,23 +70,21 @@
   {% do return(strategy) %}
 {% endmacro %}
 
-{% macro dbt_spark_validate_merge(file_format) %}
-  {% set invalid_file_format_msg -%}
-    You can only choose the 'merge' incremental_strategy when file_format is set to 'delta'
-  {%- endset %}
-
-  {% if file_format != 'delta' %}
-    {% do exceptions.raise_compiler_error(invalid_file_format_msg) %}
-  {% endif %}
-
-{% endmacro %}
-
 {% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
   {# ignore dest_columns - we will just use `*` #}
+
+  {% set merge_condition %}
+    {% if unique_key %}
+      on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
+    {% else %}
+      on false
+    {% endif %}
+  {% endset %}
+
   merge into {{ target }} as DBT_INTERNAL_DEST
   using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
-  on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
+  {{ merge_condition }}
   when matched then update set *
   when not matched then insert *
 {% endmacro %}
 
@@ -106,9 +104,10 @@
 
 {% materialization incremental, adapter='spark' -%}
   {#-- Validate early so we don't run SQL if the file_format is invalid --#}
-  {% set file_format = dbt_spark_validate_get_file_format() -%}
+  {%- set file_format = dbt_spark_validate_get_file_format() -%}
   {#-- Validate early so we don't run SQL if the strategy is invalid --#}
-  {% set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
+  {%- set strategy = dbt_spark_validate_get_incremental_strategy(file_format) -%}
+  {%- set unique_key = config.get('unique_key', none) -%}
 
   {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}
 
@@ -116,11 +115,6 @@
   {% set existing_relation = load_relation(this) %}
   {% set tmp_relation = make_temp_relation(this) %}
 
-  {% if strategy == 'merge' %}
-    {%- set unique_key = config.require('unique_key') -%}
-    {% do dbt_spark_validate_merge(file_format) %}
-  {% endif %}
-
   {% if strategy == 'insert_overwrite' and config.get('partition_by') %}
     set spark.sql.sources.partitionOverwriteMode = DYNAMIC
   {% endif %}
diff --git a/test/integration/spark-databricks-http.dbtspec b/test/integration/spark-databricks-http.dbtspec
index c20e4242b..67342da39 100644
--- a/test/integration/spark-databricks-http.dbtspec
+++ b/test/integration/spark-databricks-http.dbtspec
@@ -9,16 +9,6 @@ target:
   connect_retries: 5
   connect_timeout: 60
 projects:
-  - overrides: incremental
-    paths:
-      "models/incremental.sql":
-        materialized: incremental
-        body: "select * from {{ source('raw', 'seed') }}"
-    facts:
-      base:
-        rowcount: 10
-      added:
-        rowcount: 20
   - overrides: snapshot_strategy_check_cols
     dbt_project_yml: &file_format_delta
       # we're going to UPDATE the seed tables as part of testing, so we must make them delta format
@@ -40,4 +30,3 @@ sequences:
   test_dbt_data_test: data_test
   test_dbt_ephemeral_data_tests: data_test_ephemeral_models
   test_dbt_schema_test: schema_test
-
diff --git a/test/integration/spark-databricks-odbc-cluster.dbtspec b/test/integration/spark-databricks-odbc-cluster.dbtspec
index 8dc4975ea..b320dc3a4 100644
--- a/test/integration/spark-databricks-odbc-cluster.dbtspec
+++ b/test/integration/spark-databricks-odbc-cluster.dbtspec
@@ -10,16 +10,6 @@ target:
   connect_retries: 5
   connect_timeout: 60
 projects:
-  - overrides: incremental
-    paths:
-      "models/incremental.sql":
-        materialized: incremental
-        body: "select * from {{ source('raw', 'seed') }}"
-    facts:
-      base:
-        rowcount: 10
-      added:
-        rowcount: 20
   - overrides: snapshot_strategy_check_cols
     dbt_project_yml: &file_format_delta
       # we're going to UPDATE the seed tables as part of testing, so we must make them delta format
diff --git a/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec b/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec
index 0251cb312..0aa7be765 100644
--- a/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec
+++ b/test/integration/spark-databricks-odbc-sql-endpoint.dbtspec
@@ -10,16 +10,6 @@ target:
   connect_retries: 5
   connect_timeout: 60
 projects:
-  - overrides: incremental
-    paths:
-      "models/incremental.sql":
-        materialized: incremental
-        body: "select * from {{ source('raw', 'seed') }}"
-    facts:
-      base:
-        rowcount: 10
-      added:
-        rowcount: 20
   - overrides: snapshot_strategy_check_cols
     dbt_project_yml: &file_format_delta
      # we're going to UPDATE the seed tables as part of testing, so we must make them delta format
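
A note on the behavior this patch introduces: when `unique_key` is configured, the generated merge matches existing rows on that key and overwrites them; when it is not, the `on false` condition guarantees that no source row ever matches, so every row falls through to `when not matched then insert *` and the merge becomes a pure append. A rough sketch of the SQL the updated `spark__get_merge_sql` macro would render, using hypothetical relation names (`analytics.my_model`, `my_model__dbt_tmp`) and key column (`id`):

```sql
-- unique_key = 'id': rows matching on id are overwritten, new rows are inserted
merge into analytics.my_model as DBT_INTERNAL_DEST
using my_model__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
when matched then update set *
when not matched then insert *;

-- no unique_key: `on false` never matches, so every source row is appended
merge into analytics.my_model as DBT_INTERNAL_DEST
using my_model__dbt_tmp as DBT_INTERNAL_SOURCE
on false
when matched then update set *
when not matched then insert *;
```

Spark SQL requires an `on` clause in `merge`, so `on false` is the simplest way to force every source row down the insert path.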