Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp incremental by period and make it work for BQ #42

Merged
merged 6 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions insert_by_period/integration_tests/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ dbt_modules/
logs/
.env/
profiles.yml
package-lock.yml
2 changes: 1 addition & 1 deletion insert_by_period/integration_tests/dbt_project.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@

name: 'dbt_utils_integration_tests'
name: 'insert_by_period_integration_tests'
version: '1.0'

profile: 'integration_tests'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{{
config(
materialized = 'view',
enabled=(target.type == 'snowflake')
enabled=(project_name == 'insert_by_period_integration_tests'),
)
}}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
config(
materialized = 'insert_by_period',
period = 'month',
timestamp_field = 'created_at',
timestamp_field = 'cast(created_at as timestamp)',
start_date = '2018-01-01',
stop_date = '2018-06-01',
enabled=(target.type == 'snowflake')
enabled=(project_name == 'insert_by_period_integration_tests'),
)
}}

Expand Down
26 changes: 24 additions & 2 deletions insert_by_period/macros/get_period_boundaries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
{% call statement('period_boundaries', fetch_result=True) -%}
with data as (
select
coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp,
coalesce(max({{timestamp_field}}), '{{start_date}}')::timestamp as start_timestamp,
coalesce(
{{ dateadd('millisecond',
-1,
"nullif('" ~ stop_date ~ "','')::timestamp") }},
{{ dbt.current_timestamp() }}
) as stop_timestamp
from "{{target_schema}}"."{{target_table}}"
from {{adapter.quote(target_schema)}}.{{adapter.quote(target_table)}}
)

select
Expand All @@ -27,3 +27,25 @@
{%- endcall %}

{%- endmacro %}


{% macro bigquery__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%}

{% call statement('period_boundaries', fetch_result=True) -%}
with data as (
select
coalesce(max({{timestamp_field}}), cast('{{start_date}}' as timestamp)) as start_timestamp,
coalesce(datetime_add(cast(nullif('{{stop_date}}','') as timestamp), interval -1 millisecond), {{dbt.current_timestamp()}}) as stop_timestamp
from {{adapter.quote(target_schema)}}.{{adapter.quote(target_table)}}
)

select
start_timestamp,
stop_timestamp,
{{ datediff('start_timestamp',
'stop_timestamp',
period) }} + 1 as num_periods
from data
{%- endcall %}

{%- endmacro %}
dbeatty10 marked this conversation as resolved.
Show resolved Hide resolved
25 changes: 22 additions & 3 deletions insert_by_period/macros/get_period_sql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
{% macro default__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
"{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
"{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp)
({{timestamp_field}} > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and
{{timestamp_field}} <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and
{{timestamp_field}} < '{{stop_timestamp}}'::timestamp)
dbeatty10 marked this conversation as resolved.
Show resolved Hide resolved
{%- endset -%}

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}
Expand All @@ -19,3 +19,22 @@
) target_cols

{%- endmacro %}


{% macro bigquery__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%}

{%- set period_filter -%}
({{timestamp_field}} > cast(cast(timestamp('{{start_timestamp}}') as datetime) + interval {{offset}} {{period}} as timestamp) and
{{timestamp_field}} <= cast(cast(timestamp('{{start_timestamp}}') as datetime) + interval {{offset}} {{period}} + interval 1 {{period}} as timestamp) and
{{timestamp_field}} < cast('{{stop_timestamp}}' as timestamp))
{%- endset -%}
dbeatty10 marked this conversation as resolved.
Show resolved Hide resolved

{%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%}

select
{{target_cols_csv}}
from (
{{filtered_sql}}
) target_cols

{%- endmacro %}
34 changes: 17 additions & 17 deletions insert_by_period/macros/insert_by_period_materialization.sql
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,10 @@
{%- set old_relation = none -%}
{%- endif %}

{{run_hooks(pre_hooks, inside_transaction=False)}}
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `begin` happens here, so `commit` after it to finish the transaction
{{run_hooks(pre_hooks, inside_transaction=True)}}
{% call statement() -%}
begin; -- make extra sure we've closed out the transaction
commit;
{%- endcall %}
-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% if force_create or old_relation is none -%}
Expand Down Expand Up @@ -97,7 +93,7 @@
(
select
{{target_cols_csv}}
from {{tmp_relation.include(schema=False)}}
from {{tmp_relation.include(schema=True)}}
);
{%- endcall %}
{% set result = load_result('main-' ~ i) %}
Expand All @@ -115,17 +111,21 @@

{%- endfor %}

{% call statement() -%}
begin;
{%- endcall %}
-- from the table mat
{% do create_indexes(target_relation) %}

{{run_hooks(post_hooks, inside_transaction=True)}}
{{ run_hooks(post_hooks, inside_transaction=True) }}

{% call statement() -%}
commit;
{%- endcall %}
{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{run_hooks(post_hooks, inside_transaction=False)}}
{{ run_hooks(post_hooks, inside_transaction=False) }}
-- end from the table mat

{%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%}

Expand All @@ -136,4 +136,4 @@
-- Return the relations created in this materialization
{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
{%- endmaterialization %}