Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/support incremental predicates #5702

Merged
merged 15 commits into from
Dec 14, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220823-085727.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: incremental predicates
time: 2022-08-23T08:57:27.640804-05:00
custom:
Author: dave-connors-3
Issue: "5680"
PR: "5702"
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('incremental_predicates', none) %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
dbeatty10 marked this conversation as resolved.
Show resolved Hide resolved
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

{% endif %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
{{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, predicates) }}
{% macro get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{{ adapter.dispatch('get_merge_sql', 'dbt')(target, source, unique_key, dest_columns, incremental_predicates) }}
{%- endmacro %}

{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{% macro default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{%- set predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set merge_update_columns = config.get('merge_update_columns') -%}
{%- set merge_exclude_columns = config.get('merge_exclude_columns') -%}
Expand Down Expand Up @@ -32,7 +32,7 @@

merge into {{ target }} as DBT_INTERNAL_DEST
using {{ source }} as DBT_INTERNAL_SOURCE
on {{ predicates | join(' and ') }}
on {{"(" ~ predicates | join(") and (") ~ ")"}}

{% if unique_key %}
when matched then update set
Expand All @@ -50,11 +50,11 @@
{% endmacro %}


{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns) }}
{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}
{{ adapter.dispatch('get_delete_insert_merge_sql', 'dbt')(target, source, unique_key, dest_columns, incremental_predicates) }}
{%- endmacro %}

{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) -%}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

Expand All @@ -65,16 +65,26 @@
where (
{% for key in unique_key %}
{{ source }}.{{ key }} = {{ target }}.{{ key }}
{{ "and " if not loop.last }}
{{ "and " if not loop.last}}
{% endfor %}
{% if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{% endif %}
);
{% else %}
delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
);
)
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};

{% endif %}
{% endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

{% macro default__get_incremental_delete_insert_sql(arg_dict) %}

{% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %}
{% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thought is that we could maybe support both, borrowing from @NiallRees :

{% macro default__get_incremental_merge_sql(arg_dict) %}
  {% if "predicates" in arg_dict %}
    {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["predicates"])) %}
  {% else %}
    {% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}
{% endmacro %}


{% endmacro %}

Expand All @@ -35,7 +35,7 @@

{% macro default__get_incremental_merge_sql(arg_dict) %}

{% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %}
{% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}

{% endmacro %}

Expand All @@ -48,7 +48,7 @@

{% macro default__get_incremental_insert_overwrite_sql(arg_dict) %}

{% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["predicates"])) %}
{% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["incremental_predicates"])) %}

{% endmacro %}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import pytest
from dbt.tests.util import run_dbt, check_relations_equal
from collections import namedtuple


models__delete_insert_incremental_predicates_sql = """
{{ config(
materialized = 'incremental',
unique_key = 'id'
) }}

{% if not is_incremental() %}

select 1 as id, 'hello' as msg, 'blue' as color
union all
select 2 as id, 'goodbye' as msg, 'red' as color

{% else %}

-- delete will not happen on the above record where id = 2, so new record will be inserted instead
select 1 as id, 'hey' as msg, 'blue' as color
union all
select 2 as id, 'yo' as msg, 'green' as color
union all
select 3 as id, 'anyway' as msg, 'purple' as color

{% endif %}
"""

seeds__expected_delete_insert_incremental_predicates_csv = """id,msg,color
1,hey,blue
2,goodbye,red
2,yo,green
3,anyway,purple
"""

ResultHolder = namedtuple(
"ResultHolder",
[
"seed_count",
"model_count",
"seed_rows",
"inc_test_model_count",
"opt_model_count",
"relation",
],
)


class BaseIncrementalPredicates:
@pytest.fixture(scope="class")
def models(self):
return {
"delete_insert_incremental_predicates.sql": models__delete_insert_incremental_predicates_sql
}

@pytest.fixture(scope="class")
def seeds(self):
return {
"expected_delete_insert_incremental_predicates.csv": seeds__expected_delete_insert_incremental_predicates_csv
}

@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+incremental_predicates": [
"id != 2"
],
"+incremental_strategy": "delete+insert"
}
}

def update_incremental_model(self, incremental_model):
"""update incremental model after the seed table has been updated"""
model_result_set = run_dbt(["run", "--select", incremental_model])
return len(model_result_set)

def get_test_fields(
self, project, seed, incremental_model, update_sql_file, opt_model_count=None
):

seed_count = len(run_dbt(["seed", "--select", seed, "--full-refresh"]))

model_count = len(run_dbt(["run", "--select", incremental_model, "--full-refresh"]))
# pass on kwarg
relation = incremental_model
# update seed in anticipation of incremental model update
row_count_query = "select * from {}.{}".format(project.test_schema, seed)
# project.run_sql_file(Path("seeds") / Path(update_sql_file + ".sql"))
seed_rows = len(project.run_sql(row_count_query, fetch="all"))

# propagate seed state to incremental model according to unique keys
inc_test_model_count = self.update_incremental_model(incremental_model=incremental_model)

return ResultHolder(
seed_count, model_count, seed_rows, inc_test_model_count, opt_model_count, relation
)

def check_scenario_correctness(self, expected_fields, test_case_fields, project):
"""Invoke assertions to verify correct build functionality"""
# 1. test seed(s) should build afresh
assert expected_fields.seed_count == test_case_fields.seed_count
# 2. test model(s) should build afresh
assert expected_fields.model_count == test_case_fields.model_count
# 3. seeds should have intended row counts post update
assert expected_fields.seed_rows == test_case_fields.seed_rows
# 4. incremental test model(s) should be updated
assert expected_fields.inc_test_model_count == test_case_fields.inc_test_model_count
# 5. extra incremental model(s) should be built; optional since
# comparison may be between an incremental model and seed
if expected_fields.opt_model_count and test_case_fields.opt_model_count:
assert expected_fields.opt_model_count == test_case_fields.opt_model_count
# 6. result table should match intended result set (itself a relation)
check_relations_equal(
project.adapter, [expected_fields.relation, test_case_fields.relation]
)

def get_expected_fields(self, relation, seed_rows, opt_model_count=None):
return ResultHolder(
seed_count=1,
model_count=1,
inc_test_model_count=1,
seed_rows=seed_rows,
opt_model_count=opt_model_count,
relation=relation
)

# no unique_key test
def test__incremental_predicates(self, project):
"""seed should match model after two incremental runs"""

expected_fields = self.get_expected_fields(relation="expected_delete_insert_incremental_predicates", seed_rows=4)
test_case_fields = self.get_test_fields(
project, seed="expected_delete_insert_incremental_predicates", incremental_model="delete_insert_incremental_predicates", update_sql_file=None
)
self.check_scenario_correctness(expected_fields, test_case_fields, project)


class TestIncrementalPredicatesDeleteInsert(BaseIncrementalPredicates):
pass


class TestPredicatesDeleteInsert(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+predicates": [
"id != 2"
],
"+incremental_strategy": "delete+insert"
}
}