Skip to content

Commit

Permalink
Feature/incremental predicates (#161)
Browse files Browse the repository at this point in the history
### Description

Updates the incremental materialization to include user supplied `incremental_predicates` and passes that to the `dbt_spark_get_incremental_sql` macro. 

The changes to `dbt_spark_get_incremental_sql` are updated in [dbt-spark PR #436](dbt-labs/dbt-spark#436)

Co-authored-by: Takuya UESHIN <ueshin@databricks.com>
  • Loading branch information
dave-connors-3 and ueshin authored Dec 15, 2022
1 parent 1d85eb5 commit 8124c35
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### Features
- Support Python 3.11 ([#233](https://github.com/databricks/dbt-databricks/pull/233))
- Support `incremental_predicates` ([#161](https://github.com/databricks/dbt-databricks/pull/161))

## dbt-databricks 1.3.3 (Release TBD)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

{#-- Set vars --#}

{%- set incremental_predicates = config.get('predicates', default=none) or config.get('incremental_predicates', default=none) -%}
{%- set unique_key = config.get('unique_key', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}
{%- set language = model['language'] -%}
Expand Down Expand Up @@ -54,7 +55,7 @@
'temp_relation': temp_relation,
'unique_key': unique_key,
'dest_columns': none,
'predicates': none}) -%}
'incremental_predicates': incremental_predicates}) -%}
{%- set build_sql = strategy_sql_macro_func(strategy_arg_dict) -%}
{%- if language == 'sql' -%}
{%- call statement('main') -%}
Expand Down
55 changes: 55 additions & 0 deletions tests/functional/adapter/test_incremental_predicates.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates


models__databricks_incremental_predicates_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id'
) }}
{% if not is_incremental() %}
select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
{% else %}
-- merge will not happen on the above record where id = 2, so new record will fall to insert
select cast(1 as bigint) as id, 'hey' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'yo' as msg, 'green' as color
union all
select cast(3 as bigint) as id, 'anyway' as msg, 'purple' as color
{% endif %}
"""


class TestIncrementalPredicatesMergeDatabricks(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def project_config_update(self):
return {"models": {"+incremental_predicates": ["dbt_internal_dest.id != 2"]}}

@pytest.fixture(scope="class")
def models(self):
return {
"delete_insert_incremental_predicates.sql": (
models__databricks_incremental_predicates_sql
)
}


class TestPredicatesMergeDatabricks(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def project_config_update(self):
return {"models": {"+predicates": ["dbt_internal_dest.id != 2"]}}

@pytest.fixture(scope="class")
def models(self):
return {
"delete_insert_incremental_predicates.sql": (
models__databricks_incremental_predicates_sql
)
}

0 comments on commit 8124c35

Please sign in to comment.