Merge pull request #141 from fishtown-analytics/feature/rationalize-incremental

Rationalize incremental materialization
jtcohen6 authored Feb 19, 2021
2 parents afea257 + b8c7d77 commit 1ac9fda
Showing 31 changed files with 695 additions and 195 deletions.
24 changes: 19 additions & 5 deletions .circleci/config.yml
@@ -60,7 +60,7 @@ jobs:
- store_artifacts:
path: ./logs

integration-spark-databricks-odbc:
integration-spark-databricks-odbc-cluster: &databricks-odbc
environment:
DBT_INVOCATION_ENV: circle
ODBC_DRIVER: Simba # TODO: move env var to Docker image
@@ -74,7 +74,18 @@
- checkout
- run:
name: Run integration tests
command: tox -e integration-spark-databricks-odbc-cluster,integration-spark-databricks-odbc-sql-endpoint
command: tox -e integration-spark-databricks-odbc-cluster
no_output_timeout: 1h
- store_artifacts:
path: ./logs

integration-spark-databricks-odbc-endpoint:
<<: *databricks-odbc
steps:
- checkout
- run:
name: Run integration tests
command: tox -e integration-spark-databricks-odbc-sql-endpoint
no_output_timeout: 1h
- store_artifacts:
path: ./logs
@@ -89,7 +100,10 @@ workflows:
- unit
- integration-spark-databricks-http:
requires:
- unit
- integration-spark-databricks-odbc:
- integration-spark-thrift
- integration-spark-databricks-odbc-cluster:
requires:
- unit
- integration-spark-thrift
- integration-spark-databricks-odbc-endpoint:
requires:
- integration-spark-thrift
1 change: 1 addition & 0 deletions .gitignore
@@ -12,3 +12,4 @@ dbt-integration-tests
test/integration/.user.yml
.DS_Store
.vscode
*.log
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## dbt-spark 0.19.0 (Release TBD)

### Breaking changes
- Incremental models have `incremental_strategy: append` by default. This strategy adds new records without updating or overwriting existing records. For that, use `merge` or `insert_overwrite` instead, depending on the file format, connection method, and attributes of your underlying data. dbt will try to raise a helpful error if you configure a strategy that is not supported for a given file format or connection. ([#140](https://github.com/fishtown-analytics/dbt-spark/pull/140), [#141](https://github.com/fishtown-analytics/dbt-spark/pull/141))

### Fixes
- Capture hard-deleted records in snapshot merge, when `invalidate_hard_deletes` config is set ([#109](https://github.com/fishtown-analytics/dbt-spark/pull/143), [#126](https://github.com/fishtown-analytics/dbt-spark/pull/144))

47 changes: 33 additions & 14 deletions README.md
@@ -161,48 +161,67 @@ The following configurations can be supplied to models run with the dbt-spark plugin:
| partition_by | Partition the created table by the specified columns. A directory is created for each partition. | Optional | `partition_1` |
| clustered_by | Each partition in the created table will be split into a fixed number of buckets by the specified columns. | Optional | `cluster_1` |
| buckets | The number of buckets to create while clustering | Required if `clustered_by` is specified | `8` |
| incremental_strategy | The strategy to use for incremental models (`insert_overwrite` or `merge`). Note `merge` requires `file_format` = `delta` and `unique_key` to be specified. | Optional (default: `insert_overwrite`) | `merge` |
| incremental_strategy | The strategy to use for incremental models (`append`, `insert_overwrite`, or `merge`). | Optional (default: `append`) | `merge` |
| persist_docs | Whether dbt should include the model description as a table `comment` | Optional | `{'relation': true}` |


**Incremental Models**

To use incremental models, specify a `partition_by` clause in your model config. The default incremental strategy used is `insert_overwrite`, which will overwrite the partitions included in your query. Be sure to re-select _all_ of the relevant
data for a partition when using the `insert_overwrite` strategy. If a `partition_by` config is not specified, dbt will overwrite the entire table as an atomic operation, replacing it with new data of the same schema. This is analogous to `truncate` + `insert`.
dbt has a number of ways to build models incrementally, called "incremental strategies." Some strategies depend on certain file formats, connection types, and other model configurations:
- `append` (default): Insert new records without updating or overwriting any existing data.
- `insert_overwrite`: If `partition_by` is specified, overwrite partitions in the table with new data. (Be sure to re-select _all_ of the relevant data for a partition.) If no `partition_by` is specified, overwrite the entire table with new data. [Cannot be used with `file_format: delta` or when connecting via Databricks SQL Endpoints. For dynamic partition replacement with `method: odbc` + Databricks `cluster`, you **must** include `set spark.sql.sources.partitionOverwriteMode DYNAMIC` in the [cluster SparkConfig](https://docs.databricks.com/clusters/configure.html#spark-config). For atomic replacement of Delta tables, use the `table` materialization instead.]
- `merge`: Match records based on a `unique_key`; update old records, insert new ones. (If no `unique_key` is specified, all new data is inserted, similar to `append`.) [Requires `file_format: delta`. Available only on Databricks Runtime.]

Examples:

```sql
{{ config(
materialized='incremental',
incremental_strategy='append',
) }}


-- All rows returned by this query will be appended to the existing table

select * from {{ ref('events') }}
{% if is_incremental() %}
where event_ts > (select max(event_ts) from {{ this }})
{% endif %}
```

```sql
{{ config(
materialized='incremental',
incremental_strategy='insert_overwrite',
partition_by=['date_day'],
file_format='parquet'
) }}

-- Every partition returned by this query will overwrite existing partitions

select
date_day,
count(*) as users

from {{ ref('events') }}
where date_day::date >= '2019-01-01'
{% if is_incremental() %}
  and date_day > (select max(date_day) from {{ this }})
{% endif %}
group by 1
```

The `merge` strategy is only supported when using file_format `delta` (supported in Databricks). It also requires you to specify a `unique_key` to match existing records.

```sql
{{ config(
materialized='incremental',
incremental_strategy='merge',
partition_by=['date_day'],
unique_key='event_id',
file_format='delta'
) }}

-- Existing events, matched on `event_id`, will be updated
-- New events will be appended

select * from {{ ref('events') }}
{% if is_incremental() %}
where date_day > (select max(date_day) from {{ this }})
{% endif %}
```
133 changes: 0 additions & 133 deletions dbt/include/spark/macros/materializations/incremental.sql

This file was deleted.

@@ -0,0 +1,45 @@
{% materialization incremental, adapter='spark' -%}

{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy', default='append') -%}

{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{%- set unique_key = config.get('unique_key', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}

{%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'insert_overwrite' and partition_by %}
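{#-- switch to dynamic partition overwrite so only the partitions written by this run are replaced --#}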
{% call statement() %}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}

{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
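{#-- no existing relation yet: build the table from scratch --#}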
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{% do adapter.drop_relation(existing_relation) %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
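{#-- incremental run: stage new rows in a temporary view, then apply the configured strategy --#}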
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
@@ -0,0 +1,58 @@
{% macro get_insert_overwrite_sql(source_relation, target_relation) %}
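{#-- overwrite the target's partitions (or the whole table, if unpartitioned) with the staged rows --#}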

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert overwrite table {{ target_relation }}
{{ partition_cols(label="partition") }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}


{% macro get_insert_into_sql(source_relation, target_relation) %}

{%- set dest_columns = adapter.get_columns_in_relation(target_relation) -%}
{%- set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') -%}
insert into table {{ target_relation }}
select {{dest_cols_csv}} from {{ source_relation.include(database=false, schema=false) }}

{% endmacro %}


{% macro spark__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}
{# ignore dest_columns - we will just use `*` #}

{% set merge_condition %}
{% if unique_key %}
on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
{% else %}
on false
{% endif %}
{% endset %}
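{#-- without a unique_key the condition is "on false": no rows ever match, so every source row is inserted --#}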

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
{{ merge_condition }}
when matched then update set *
when not matched then insert *
{% endmacro %}


{% macro dbt_spark_get_incremental_sql(strategy, source, target, unique_key) %}
{%- if strategy == 'append' -%}
{#-- insert new records into existing table, without updating or overwriting #}
{{ get_insert_into_sql(source, target) }}
{%- elif strategy == 'insert_overwrite' -%}
{#-- insert statements don't like CTEs, so support them via a temp view #}
{{ get_insert_overwrite_sql(source, target) }}
{%- elif strategy == 'merge' -%}
{#-- merge all columns with databricks delta - schema changes are handled for us #}
{{ get_merge_sql(target, source, unique_key, dest_columns=none, predicates=none) }}
{%- else -%}
{% set no_sql_for_strategy_msg -%}
No known SQL for the incremental strategy provided: {{ strategy }}
{%- endset %}
{%- do exceptions.raise_compiler_error(no_sql_for_strategy_msg) -%}
{%- endif -%}

{% endmacro %}
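For illustration, a model configured with `incremental_strategy: merge` and `unique_key: event_id` should cause dbt to render a statement along these lines on incremental runs (a sketch only; the target table and staged temp view names here are hypothetical):

```sql
merge into analytics.events as DBT_INTERNAL_DEST
using events__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id
when matched then update set *
when not matched then insert *
```

Updating and inserting with `*` leaves column resolution to Delta, which is why `spark__get_merge_sql` ignores `dest_columns`.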
