Add microbatch strategy (#924)
* Add microbatch strategy

This work is essentially a wholesale duplicate of the work done by
MichelleArk in dbt-labs/dbt-snowflake#1179.
I don't really expect this to work on the first try, but it might. I expect
to need to make some edits, but who knows, maybe I'll get lucky.

* Add changie doc

* Add comment to microbatch macro to explain why we are re-implementing delete+insert

* Add `begin` to microbatch config in test_incremental_microbatch.py

* Cleanup predicates in microbatch materialization

* Fix predicate/incremental predicate logic in microbatch macro

* Remove unnecessary `if` in microbatch macro

The `if` was guarding against the case where no predicates exist, but
predicates are guaranteed to exist, so the `if` is unnecessary.
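
For context, the removed guard was roughly of this shape (a sketch, not the exact original code):

{#-- Redundant guard: predicates always ends up non-empty, because the
     batch time-window filters are appended to it unconditionally --#}
{% if predicates %}
  where ( {{ predicates | join(' and ') }} )
{% endif %}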

* Get batch start and end time in the same way

* Remove unnecessary `target` specifications for columns of predicates in microbatch materialization

The `target.` portion of `target.<column_name>` is unnecessary for the predicates in the
microbatch materialization macro because the delete statement already targets the
`target` relation via the clause `delete from {{ target }}`. Said another way, there is
no `using` clause in the delete statement, so it is unambiguous what is being
deleted from.
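
Concretely, both of the following statements resolve to the same column, so the shorter unqualified form suffices (relation name and timestamp are hypothetical):

delete from "analytics"."sessions" where "analytics"."sessions".event_time >= TIMESTAMP '2020-01-01 00:00:00';
delete from "analytics"."sessions" where event_time >= TIMESTAMP '2020-01-01 00:00:00';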

---------

Co-authored-by: Michelle Ark <MichelleArk@users.noreply.github.com>
Co-authored-by: Mike Alfare <13974384+mikealfare@users.noreply.github.com>
3 people authored Nov 7, 2024
1 parent 1943ac5 commit fccbe2d
Showing 4 changed files with 78 additions and 1 deletion.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20241002-171112.yaml
@@ -0,0 +1,6 @@
kind: Features
body: Add microbatch strategy
time: 2024-10-02T17:11:12.88725-05:00
custom:
Author: QMalcolm
Issue: "923"
2 changes: 1 addition & 1 deletion dbt/adapters/redshift/impl.py
@@ -130,7 +130,7 @@ def valid_incremental_strategies(self):
"""The set of standard builtin strategies which this adapter supports out-of-the-box.
Not used to validate custom strategies defined by end users.
"""
return ["append", "delete+insert", "merge"]
return ["append", "delete+insert", "merge", "microbatch"]

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
return f"{add_to} + interval '{number} {interval}'"
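
For reference, a model opting into the new strategy can be configured as below; a minimal sketch mirroring the functional test added later in this commit (the model and ref names are hypothetical):

{{ config(
    materialized='incremental',
    incremental_strategy='microbatch',
    event_time='event_time',
    batch_size='day',
    begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)
) }}

select * from {{ ref('events') }}
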
47 changes: 47 additions & 0 deletions dbt/include/redshift/macros/materializations/incremental_merge.sql
@@ -65,3 +65,50 @@
)

{% endmacro %}

{% macro redshift__get_incremental_microbatch_sql(arg_dict) %}
{#-
Technically this macro could just call out to the default implementation of delete+insert.
However, the default implementation requires a unique_key, which we do not want or
need here. Thus we re-implement delete+insert without the unique_key requirement.
-#}

{%- set target = arg_dict["target_relation"] -%}
{%- set source = arg_dict["temp_relation"] -%}
{%- set dest_columns = arg_dict["dest_columns"] -%}
{%- set predicates = [] -%}

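{#-- Rewrite user-supplied incremental_predicates that reference the
     DBT_INTERNAL_DEST alias (in either case) to point at the target relation --#}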
{%- set incremental_predicates = [] if arg_dict.get('incremental_predicates') is none else arg_dict.get('incremental_predicates') -%}
{%- for pred in incremental_predicates -%}
{% if "DBT_INTERNAL_DEST." in pred %}
{%- set pred = pred | replace("DBT_INTERNAL_DEST.", target ~ "." ) -%}
{% endif %}
{% if "dbt_internal_dest." in pred %}
{%- set pred = pred | replace("dbt_internal_dest.", target ~ "." ) -%}
{% endif %}
{% do predicates.append(pred) %}
{% endfor %}

{% if not model.config.get("__dbt_internal_microbatch_event_time_start") or not model.config.get("__dbt_internal_microbatch_event_time_end") -%}
{% do exceptions.raise_compiler_error('dbt could not compute the start and end timestamps for the running batch') %}
{% endif %}

{#-- Add additional incremental_predicates to filter for batch --#}
{% do predicates.append(model.config.event_time ~ " >= TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_start ~ "'") %}
{% do predicates.append(model.config.event_time ~ " < TIMESTAMP '" ~ model.config.__dbt_internal_microbatch_event_time_end ~ "'") %}
{% do arg_dict.update({'incremental_predicates': predicates}) %}

delete from {{ target }}
where (
{% for predicate in predicates %}
{%- if not loop.first %}and {% endif -%} {{ predicate }}
{% endfor %}
);

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
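
For illustration, a sketch of what this macro renders for a single daily batch, assuming a hypothetical target relation "analytics"."sessions", temp relation "sessions__dbt_tmp", event_time='event_time', and columns id and event_time:

delete from "analytics"."sessions"
where (
    event_time >= TIMESTAMP '2020-01-01 00:00:00'
    and event_time < TIMESTAMP '2020-01-02 00:00:00'
);

insert into "analytics"."sessions" ("id", "event_time")
(
    select "id", "event_time"
    from "sessions__dbt_tmp"
)
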
24 changes: 24 additions & 0 deletions test_incremental_microbatch.py
@@ -0,0 +1,24 @@
import pytest
from dbt.tests.adapter.incremental.test_incremental_microbatch import (
BaseMicrobatch,
)


# No requirement for a unique_key for redshift microbatch!
_microbatch_model_no_unique_key_sql = """
{{ config(materialized='incremental', incremental_strategy='microbatch', event_time='event_time', batch_size='day', begin=modules.datetime.datetime(2020, 1, 1, 0, 0, 0)) }}
select * from {{ ref('input_model') }}
"""


class TestRedshiftMicrobatch(BaseMicrobatch):
@pytest.fixture(scope="class")
def microbatch_model_sql(self) -> str:
        return _microbatch_model_no_unique_key_sql

@pytest.fixture(scope="class")
def insert_two_rows_sql(self, project) -> str:
test_schema_relation = project.adapter.Relation.create(
database=project.database, schema=project.test_schema
)
return f"insert into {test_schema_relation}.input_model (id, event_time) values (4, '2020-01-04 00:00:00-0'), (5, '2020-01-05 00:00:00-0')"
