From c8e3770e077e8c54026156b14e61133ef59fa7ff Mon Sep 17 00:00:00 2001
From: Jeremy Cohen
Date: Wed, 13 Jan 2021 17:30:14 +0100
Subject: [PATCH] Introduce 'append' strategy as default

---
 CHANGELOG.md                                |  4 ++
 README.md                                   | 49 +++++++++++++------
 .../macros/materializations/incremental.sql | 31 +++++++++---
 3 files changed, 60 insertions(+), 24 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e6e1560c0..31499aa9e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,10 @@
 ### Breaking changes
 - Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126))
+- Incremental models have `incremental_strategy: append` by default. This strategy adds new records
+without updating or overwriting existing records. To update or overwrite existing records, use `merge` or `insert_overwrite` instead, depending
+on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error
+if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141))

 ### Under the hood
 - Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, it will keep the existing table, and add a new version as supported by Delta. This will ensure that the table stays available when running the pipeline, and you can track the history.
diff --git a/README.md b/README.md
index e9cee6ade..38a1e8ec4 100644
--- a/README.md
+++ b/README.md
@@ -161,50 +161,67 @@ The following configurations can be supplied to models run with the dbt-spark pl
 | partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` |
 | clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` |
 | buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
-| incremental_strategy | The strategy to use for incremental models (`insert_overwrite` or `merge`). Note `merge` requires `file_format` = `delta` and `unique_key` to be specified. | Optional (default: `insert_overwrite`) | `merge` |
+| incremental_strategy | The strategy to use for incremental models (`append`, `insert_overwrite`, or `merge`). | Optional (default: `append`) | `merge` |
 | persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` |

 **Incremental Models**

-To use incremental models, specify a `partition_by` clause in your model config. The default incremental strategy used is `insert_overwrite`, which will overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant
-data for a partition when using the `insert_overwrite` strategy. If a `partition_by` config is not specified, dbt will simply
-append new data to the model, without overwriting any existing data.
+dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations:
+- `append` (default): Insert new records without updating or overwriting any existing data.
+- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta`. Not available on Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization.]
+- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]
+Examples:
+
+```sql
+{{ config(
+    materialized='incremental',
+    incremental_strategy='append'
+) }}
+
+
+-- All rows returned by this query will be appended to the existing table
+
+select * from {{ ref('events') }}
+{% if is_incremental() %}
+  where event_ts > (select max(event_ts) from {{ this }})
+{% endif %}
 ```
+
+```sql
 {{ config(
     materialized='incremental',
+    incremental_strategy='insert_overwrite',
     partition_by=['date_day'],
     file_format='parquet'
 ) }}
-/*
-  Every partition returned by this query will be overwritten
-  when this model runs
-*/
+-- Every partition returned by this query will overwrite existing partitions
 select
     date_day,
     count(*) as users
 from {{ ref('events') }}
-where date_day::date >= '2019-01-01'
+{% if is_incremental() %}
+  where date_day > (select max(date_day) from {{ this }})
+{% endif %}
 group by 1
 ```
-The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique key` is specified, the statement will match existing records and overwrite them with new values. If a `unique key` config is not specified, dbt will simply
-append new data to the model, without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `'table'` materialization instead.)
-
-```
+```sql
 {{ config(
     materialized='incremental',
     incremental_strategy='merge',
-    partition_by=['date_day'],
+    unique_key='event_id',
     file_format='delta'
 ) }}
-select *
-from {{ ref('events') }}
+-- Existing events, matched on `event_id`, will be updated
+-- New events will be appended
+
+select * from {{ ref('events') }}
 {% if is_incremental() %}
   where date_day > (select max(date_day) from {{ this }})
 {% endif %}
diff --git a/dbt/include/spark/macros/materializations/incremental.sql b/dbt/include/spark/macros/materializations/incremental.sql
index d912a45f9..bb6d5dc0c 100644
--- a/dbt/include/spark/macros/materializations/incremental.sql
+++ b/dbt/include/spark/macros/materializations/incremental.sql
@@ -8,6 +8,17 @@
 {% endmacro %}

+
+{% macro get_insert_into_sql(source_relation, target_relation) %}
+
+    {%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
+    {%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
+    insert into table {{ target_relation }}
+    select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}
+
+{% endmacro %}
+
+
 {% macro dbt_spark_validate_get_file_format() %}
   {#-- Find and validate the file format #}
   {%- set file_format = config.get("file_format", default="parquet") -%}
@@ -24,13 +35,14 @@
   {% do return(file_format) %}
 {% endmacro %}

+
 {% macro dbt_spark_validate_get_incremental_strategy(file_format) %}
   {#-- Find and validate the incremental strategy #}
-  {%- set strategy = config.get("incremental_strategy", default="insert_overwrite") -%}
+  {%- set strategy = config.get("incremental_strategy", default="append") -%}

   {% set invalid_strategy_msg -%}
     Invalid incremental strategy provided: {{ strategy }}
-    Expected one of: 'merge', 'insert_overwrite'
+    Expected one of: 'append', 'merge', 'insert_overwrite'
   {%- endset %}

   {% set invalid_merge_msg -%}
@@ -41,16 +53,16 @@
   {% set invalid_insert_overwrite_delta_msg -%}
     Invalid incremental strategy provided: {{ strategy }}
     You cannot use this strategy when file_format is set to 'delta'
-    Use the `merge` strategy instead
+    Use the 'append' or 'merge' strategy instead
   {%- endset %}

   {% set invalid_insert_overwrite_endpoint_msg -%}
     Invalid incremental strategy provided: {{ strategy }}
     You cannot use this strategy when connecting via endpoint
-    Use `incremental_strategy: merge` with `file_format: delta` instead
+    Use the 'append' or 'merge' strategy instead
   {%- endset %}

-  {% if strategy not in ['merge', 'insert_overwrite'] %}
+  {% if strategy not in ['append', 'merge', 'insert_overwrite'] %}
     {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
   {%-else %}
     {% if strategy == 'merge' and file_format != 'delta' %}
@@ -88,11 +100,14 @@
 {% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}

-  {%- if strategy == 'insert_overwrite' -%}
+  {%- if strategy == 'append' -%}
+    {#-- insert new records into existing table, without updating or overwriting #}
+    {{ get_insert_into_sql(source, target) }}
+  {%- elif strategy == 'insert_overwrite' -%}
     {#-- insert statements don't like CTEs, so support them via a temp view #}
     {{ get_insert_overwrite_sql(source, target) }}
-  {%- else -%}
-    {#-- merge all columns with databricks delta - schema changes are handled for us #}
+  {%- elif strategy == 'merge' -%}
+    {#-- merge all columns with databricks delta - schema changes are handled for us #}
     {{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
   {%- endif -%}
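As a rough sketch of what the new `append` path renders through `get_insert_into_sql`: assuming a target table `analytics.events` with columns `event_id` and `event_ts`, and assuming dbt has staged the model's compiled query as a temporary view (here called `events__dbt_tmp`; all of these names are illustrative, not taken from the patch), the compiled statement would look roughly like:

```sql
-- Hypothetical rendering of get_insert_into_sql(source, target) for an append-strategy model.
-- The column list comes from adapter.get_columns_in_relation(target_relation),
-- and the source view is referenced without database/schema qualifiers.
insert into table analytics.events
select `event_id`, `event_ts` from events__dbt_tmp
```

And a minimal model config that the updated `dbt_spark_validate_get_incremental_strategy` macro should reject, since `merge` still requires `file_format: delta` (the model and column names below are hypothetical):

```sql
-- models/my_incremental_model.sql (hypothetical)
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='id',
    file_format='parquet'  -- expect a compilation error here: merge is only supported with Delta
) }}

select * from {{ ref('events') }}
```

Models that keep the defaults now compile to a plain `insert into` statement; anything that relied on the old `insert_overwrite` default must set `incremental_strategy` explicitly, which is why the changelog lists this as a breaking change.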