Introduce 'append' strategy as default
jtcohen6 committed Jan 13, 2021
1 parent b60d7c2 commit c8e3770
Showing 3 changed files with 60 additions and 24 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

### Breaking changes
- Users of the `http` and `thrift` connection methods need to install extra requirements: `pip install dbt-spark[PyHive]` ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/109), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/126))
- Incremental models use `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. To update or overwrite existing records, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141))

### Under the hood
- Enable `CREATE OR REPLACE` support when using Delta. Instead of dropping and recreating the table, dbt keeps the existing table and adds a new version, as supported by Delta. This ensures the table stays available while the pipeline runs, and the table's history can be tracked.
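
  As a rough illustration (the table and query names here are hypothetical, not taken from this change), replacing a Delta table in place amounts to a statement along these lines:

  ```sql
  create or replace table analytics.my_model
  using delta
  as select * from analytics.stg_events
  ```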
49 changes: 33 additions & 16 deletions README.md
@@ -161,50 +161,67 @@ The following configurations can be supplied to models run with the dbt-spark plugin:
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
| incremental_strategy | The strategy to use for incremental models (`append`, `insert_overwrite`, or `merge`). | Optional (default: `append`) | `merge` |
| persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` |
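
For illustration only, a model config block combining several of these options might look like the sketch below. The model name, column names, and option values are made up, and not every combination is valid for every file format or connection type:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    file_format='parquet',
    partition_by=['date_day'],
    clustered_by=['user_id'],
    buckets=8,
    persist_docs={'relation': true}
) }}

select * from {{ ref('events') }}
```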


**Incremental Models**

dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations:
- `append` (default): Insert new records without updating or overwriting any existing data.
- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta`. Not available on Databricks SQL Endpoints. For atomic replacement of Delta tables, use the `table` materialization instead.]
- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]

Examples:

```sql
{{ config(
materialized='incremental',
incremental_strategy='append'
) }}


-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
```

```sql
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by=['date_day'],
file_format='parquet'
) }}

-- Every partition returned by this query will overwrite existing partitions

select
date_day,
count(*) as users

from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
group by 1
```

The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). If a `unique_key` is specified, the merge statement will match existing records on that key and overwrite them with new values. If no `unique_key` is specified, dbt will simply append new data to the model, without overwriting any existing data. (For atomic replacement of an entire Delta table, use the `table` materialization instead.)

```sql
{{ config(
materialized='incremental',
incremental_strategy='merge',
partition_by=['date_day'],
unique_key='event_id',
file_format='delta'
) }}

-- Existing events, matched on `event_id`, will be updated
-- New events will be appended

select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
```
31 changes: 23 additions & 8 deletions dbt/include/spark/macros/materializations/incremental.sql
@@ -8,6 +8,17 @@

{% endmacro %}


{% macro get_insert_into_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into table {{ target_relation }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}
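
As an illustration (the relation and column names below are hypothetical), for a target table `analytics.my_model` with columns `event_id` and `event_ts`, and a temporary source view named `my_model__dbt_tmp`, this macro would render roughly:

```sql
insert into table analytics.my_model
select `event_id`, `event_ts` from my_model__dbt_tmp
```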


{% macro dbt_spark_validate_get_file_format() %}
{#-- Find and validate the file format #}
{%- set file_format = config.get("file_format", default="parquet") -%}
@@ -24,13 +35,14 @@
{% do return(file_format) %}
{% endmacro %}


{% macro dbt_spark_validate_get_incremental_strategy(file_format) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy", default="append") -%}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'append', 'merge', 'insert_overwrite'
{%- endset %}

{% set invalid_merge_msg -%}
@@ -41,16 +53,16 @@
{% set invalid_insert_overwrite_delta_msg -%}
Invalid incremental strategy provided: {{ strategy }}
You cannot use this strategy when file_format is set to 'delta'
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% set invalid_insert_overwrite_endpoint_msg -%}
Invalid incremental strategy provided: {{ strategy }}
You cannot use this strategy when connecting via endpoint
Use the 'append' or 'merge' strategy instead
{%- endset %}

{% if strategy not in ['append', 'merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{%-else %}
{% if strategy == 'merge' and file_format != 'delta' %}
@@ -88,11 +100,14 @@


{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- endif -%}
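
For the `merge` branch, dbt's shared `get_merge_sql` macro ultimately produces a Delta `MERGE` statement. A simplified sketch of the kind of SQL that results, assuming a hypothetical `unique_key` of `event_id` (the exact aliases and column handling come from dbt's merge macros, not shown here):

```sql
merge into analytics.my_model as tgt
using my_model__dbt_tmp as src
on src.event_id = tgt.event_id
when matched then update set *
when not matched then insert *
```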
