From 3493d3f30c2f4f92ec8457c0f981128e367db806 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 17 Aug 2021 10:16:08 +0200 Subject: [PATCH 01/30] Update legacy macro to 0.21.latest incremental --- .../insert_by_period_materialization.sql | 270 ++++++------------ 1 file changed, 85 insertions(+), 185 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 9b43bdbb..0de610fb 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -1,189 +1,89 @@ -{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} - {{ return(adapter.dispatch('get_period_boundaries', 'dbt_utils')(target_schema, target_table, timestamp_field, start_date, stop_date, period)) }} -{% endmacro %} - -{% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} - - {% call statement('period_boundaries', fetch_result=True) -%} - with data as ( - select - coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp, - coalesce( - {{dbt_utils.dateadd('millisecond', - -1, - "nullif('" ~ stop_date ~ "','')::timestamp")}}, - {{dbt_utils.current_timestamp()}} - ) as stop_timestamp - from "{{target_schema}}"."{{target_table}}" - ) - - select - start_timestamp, - stop_timestamp, - {{dbt_utils.datediff('start_timestamp', - 'stop_timestamp', - period)}} + 1 as num_periods - from data - {%- endcall %} - -{%- endmacro %} - -{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} - {{ return(adapter.dispatch('get_period_sql', 'dbt_utils')(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset)) }} -{% endmacro %} - -{% macro default__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} - - {%- set period_filter -%} - ("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and - "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and - "{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp) - {%- endset -%} - - {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} - - select - {{target_cols_csv}} - from ( - {{filtered_sql}} - ) - -{%- endmacro %} {% materialization insert_by_period, default -%} - {%- set timestamp_field = config.require('timestamp_field') -%} - {%- set start_date = config.require('start_date') -%} - {%- set stop_date = config.get('stop_date') or '' -%}} - {%- set period = config.get('period') or 'week' -%} - - {%- if sql.find('__PERIOD_FILTER__') == -1 -%} - {%- set error_message -%} - Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql - {%- endset -%} - {{ exceptions.raise_compiler_error(error_message) }} - {%- endif -%} - - {%- set identifier = model['name'] -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%} - - {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%} - {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%} - - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%} - - {%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%} - {%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%} - {%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%} - - -- setup - {% if old_relation is none -%} - -- noop - {%- elif should_truncate -%} - {{adapter.truncate_relation(old_relation)}} - {%- elif should_drop -%} - {{adapter.drop_relation(old_relation)}} - {%- set old_relation = none -%} - {%- endif %} - - {{run_hooks(pre_hooks, inside_transaction=False)}} - - -- `begin` happens here, so `commit` after it to finish the transaction - {{run_hooks(pre_hooks, inside_transaction=True)}} - {% call statement() -%} - begin; -- make extra sure we've closed out the transaction - commit; - {%- endcall %} - - -- build model - {% if force_create or old_relation is none -%} - {# Create an empty target table -#} - {% call statement('main') -%} - {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} - {{create_table_as(False, target_relation, empty_sql)}}; - {%- endcall %} - {%- endif %} - - {% set _ = dbt_utils.get_period_boundaries(schema, - identifier, - timestamp_field, - start_date, - stop_date, - period) %} - {%- set start_timestamp = load_result('period_boundaries')['data'][0][0] | string -%} - {%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%} - {%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%} - - {% set target_columns = adapter.get_columns_in_relation(target_relation) %} - {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} - {%- set loop_vars = {'sum_rows_inserted': 0} -%} - - -- commit each period as a separate transaction - {% for i in range(num_periods) -%} - {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) -%} - {{ dbt_utils.log_info(msg) }} - - {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period' ~ i ~ '_tmp' -%} - {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, - schema=schema, type='table') -%} - {% call statement() -%} - {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, - sql, - timestamp_field, - period, - start_timestamp, - stop_timestamp, - i) %} - {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} - {%- endcall %} - - {{adapter.expand_target_column_types(from_relation=tmp_relation, - to_relation=target_relation)}} - {%- set name = 'main-' ~ i -%} - {% call statement(name, fetch_result=True) -%} - insert into {{target_relation}} ({{target_cols_csv}}) - ( - select - {{target_cols_csv}} - from {{tmp_relation.include(schema=False)}} - ); - {%- endcall %} - {% set result = load_result('main-' ~ i) %} - {% if 'response' in result.keys() %} {# added in v0.19.0 #} - {% set rows_inserted = result['response']['rows_affected'] %} - {% else %} {# older versions #} - {% set rows_inserted = result['status'].split(" ")[2] | int %} - {% endif %} - - {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%} - {%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%} - - {%- set msg = "Ran for " ~ period ~ " " ~ (i + 1) ~ " of " ~ (num_periods) ~ "; " ~ rows_inserted ~ " records inserted" -%} - {{ dbt_utils.log_info(msg) }} - - {%- endfor %} - - {% call statement() -%} - begin; - {%- endcall %} - - {{run_hooks(post_hooks, inside_transaction=True)}} - - {% call statement() -%} - commit; - {%- endcall %} - - {{run_hooks(post_hooks, inside_transaction=False)}} - - {%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%} - - {% call noop_statement('main', status_string) -%} - -- no-op - {%- endcall %} - - -- Return the relations created in this materialization - {{ return({'relations': [target_relation]}) }} + + {% set unique_key = config.get('unique_key') %} + + {% set target_relation = this.incorporate(type='table') %} + {% set existing_relation = load_relation(this) %} + {% set tmp_relation = make_temp_relation(target_relation) %} + {%- set full_refresh_mode = (should_full_refresh()) -%} + + {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} + + {% set tmp_identifier = model['name'] + '__dbt_tmp' %} + {% set backup_identifier = model['name'] + "__dbt_backup" %} + + -- the intermediate_ and backup_ relations should not already exist in the database; get_relation + -- will return None in that case. Otherwise, we get a relation that we can drop + -- later, before we try to use this name for the current operation. This has to happen before + -- BEGIN, in a separate transaction + {% set preexisting_intermediate_relation = adapter.get_relation(identifier=tmp_identifier, + schema=schema, + database=database) %} + {% set preexisting_backup_relation = adapter.get_relation(identifier=backup_identifier, + schema=schema, + database=database) %} + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set to_drop = [] %} + + {# -- first check whether we want to full refresh for source view or config reasons #} + {% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %} + + {% if existing_relation is none %} + {% set build_sql = create_table_as(False, target_relation, sql) %} +{% elif trigger_full_refresh %} + {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} + {% set tmp_identifier = model['name'] + '__dbt_tmp' %} + {% set backup_identifier = model['name'] + '__dbt_backup' %} + {% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %} + {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} + + {% set build_sql = create_table_as(False, intermediate_relation, sql) %} + {% set need_swap = true %} + {% do to_drop.append(backup_relation) %} + {% else %} + {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} + {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} + {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} + + {% endif %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} + + {% if need_swap %} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% do adapter.rename_relation(intermediate_relation, target_relation) %} + {% endif %} + + {% do persist_docs(target_relation, model) %} + + {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {% do adapter.commit() %} + + {% for rel in to_drop %} + {% do adapter.drop_relation(rel) %} + {% endfor %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {{ return({'relations': [target_relation]}) }} {%- endmaterialization %} From 051a2e4b79bb505fa77770947755f501520eb86b Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 17 Aug 2021 14:36:47 +0200 Subject: [PATCH 02/30] Raise error if no filter specified --- .../insert_by_period_materialization.sql | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 0de610fb..20fbea6c 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -1,5 +1,13 @@ {% materialization insert_by_period, default -%} + {%- if sql.find('__PERIOD_FILTER__') == -1 -%} + {%- set error_message -%} + Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql + {%- endset -%} + {{ exceptions.raise_compiler_error(error_message) }} + {%- endif -%} + + {%- set start_date = config.require('start_date') -%} {% set unique_key = config.get('unique_key') %} @@ -38,7 +46,7 @@ {% if existing_relation is none %} {% set build_sql = create_table_as(False, target_relation, sql) %} -{% elif trigger_full_refresh %} + {% elif trigger_full_refresh %} {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} {% set tmp_identifier = model['name'] + '__dbt_tmp' %} {% set backup_identifier = model['name'] + '__dbt_backup' %} From 8536054579af23004ebd9791c4bcdebe6a094ea8 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 17 Aug 2021 14:37:14 +0200 Subject: [PATCH 03/30] Create empty table first --- macros/materializations/insert_by_period_materialization.sql | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 20fbea6c..35575691 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -45,7 +45,8 @@ {% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %} {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} + {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} -- We create an empty table + {% set build_sql = create_table_as(False, target_relation, empty_sql) %} {% elif trigger_full_refresh %} {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} {% set tmp_identifier = model['name'] + '__dbt_tmp' %} From 96ff9ede1797f880117f86187a4d1445593195d6 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Mon, 23 Aug 2021 10:40:18 +0200 Subject: [PATCH 04/30] Add configurations and period filter --- .../materializations/insert_by_period_materialization.sql | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 35575691..340b150e 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -1,5 +1,6 @@ {% materialization insert_by_period, default -%} + -- If there is no __PERIOD_FILTER__ specified, raise error. (Maybe create a macro for this.) {%- if sql.find('__PERIOD_FILTER__') == -1 -%} {%- set error_message -%} Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql @@ -7,9 +8,14 @@ {{ exceptions.raise_compiler_error(error_message) }} {%- endif -%} + -- Configuration (required) + {%- set timestamp_field = config.require('timestamp_field') -%} {%- set start_date = config.require('start_date') -%} - {% set unique_key = config.get('unique_key') %} + -- Configuration (others) + {%- set unique_key = config.get('unique_key') -%} + {%- set stop_date = config.get('stop_date') or '' -%}} + {%- set period = config.get('period') or 'day' -%} {% set target_relation = this.incorporate(type='table') %} {% set existing_relation = load_relation(this) %} From 7ac2058f9b0b6756e05a232f47923e283fa664ad Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Mon, 23 Aug 2021 14:52:03 +0200 Subject: [PATCH 05/30] Make macro work on manually defined simple example --- .../insert_by_period_materialization.sql | 129 +++++++++++++++++- 1 file changed, 124 insertions(+), 5 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 340b150e..2d05f28d 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -1,3 +1,9 @@ +{% macro simple_call(sql_code) -%} + select * + from ( + {{sql_code}} + ) +{% endmacro %} {% materialization insert_by_period, default -%} -- If there is no __PERIOD_FILTER__ specified, raise error. (Maybe create a macro for this.) @@ -15,7 +21,7 @@ -- Configuration (others) {%- set unique_key = config.get('unique_key') -%} {%- set stop_date = config.get('stop_date') or '' -%}} - {%- set period = config.get('period') or 'day' -%} + {%- set period = config.get('period') or 'hour' -%} {% set target_relation = this.incorporate(type='table') %} {% set existing_relation = load_relation(this) %} @@ -53,6 +59,8 @@ {% if existing_relation is none %} {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} -- We create an empty table {% set build_sql = create_table_as(False, target_relation, empty_sql) %} + + {{ dbt_utils.log_info("We are in the existing_relation is none and creating an empty table.") }} {% elif trigger_full_refresh %} {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} {% set tmp_identifier = model['name'] + '__dbt_tmp' %} @@ -60,23 +68,134 @@ {% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %} {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - {% set build_sql = create_table_as(False, intermediate_relation, sql) %} + {% set unfiltered_sql = sql | replace("__PERIOD_FILTER__", 'true') %} -- Period filter should have no effect + {% set build_sql = create_table_as(False, intermediate_relation, unfiltered_sql) %} {% set need_swap = true %} {% do to_drop.append(backup_relation) %} - {% else %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} + + {{ dbt_utils.log_info("We are in the elif trigger_full_refresh. So __PERIOD_FILTER__ is just set to true, thus it should have no effect.") }} + {% endif %} + + -- Let's worry about this later. (If the table exists and it's not a full refresh) + {% if existing_relation and not trigger_full_refresh %} + {% set unfiltered_sql = sql | replace("__PERIOD_FILTER__", 'true') %} -- Period filter should have no effect. This is a BUG here. + {% do run_query(create_table_as(True, tmp_relation, unfiltered_sql)) %} {% do adapter.expand_target_column_types( from_relation=tmp_relation, to_relation=target_relation) %} {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} - + + {{ dbt_utils.log_info("We are in our custom case.") }} {% endif %} + {{ dbt_utils.log_info("Calling the empty table creation statement.") }} {% call statement("main") %} {{ build_sql }} {% endcall %} + -- Now that we have an empty table, let's put something in it. + + -- 1st iteration + {%- set msg = "Running for " ~ period ~ " 1 of 2" -%} + {{ dbt_utils.log_info(msg) }} + + {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period_1_tmp' -%} + {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, + schema=schema, type='table') -%} + + -- this is a macro (start) + {%- set period_filter -%} + ({{timestamp_field}} > '2021-08-23 00:00:00'::timestamp + interval '0 {{period}}' and + {{timestamp_field}} <= '2021-08-23 00:00:00'::timestamp + interval '0 {{period}}' + interval '1 {{period}}' and + {{timestamp_field}} < '{{stop_date}}'::timestamp) + {%- endset -%} + + {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} -- Filtering for 1st period + -- (end) + + {{ dbt_utils.log_info("We are now calling the 1st iteration statement.") }} + + {% call statement() -%} + {% set tmp_table_sql = dbt_utils.simple_call(filtered_sql) %} + {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} + {%- endcall %} + + {{adapter.expand_target_column_types(from_relation=tmp_relation, + to_relation=target_relation)}} + + {{ dbt_utils.log_info("We are now inserting the result of the 1st iteration.") }} + + {%- set name = 'main-1' -%} + {% call statement(name, fetch_result=True) -%} + insert into {{target_relation}} + ( + select + * + from {{tmp_relation.include(schema=False)}} + ); + {%- endcall %} + + {% set result = load_result('main-1') %} + {% if 'response' in result.keys() %} {# added in v0.19.0 #} + {% set rows_inserted = result['response']['rows_affected'] %} + {% else %} {# older versions #} + {% set rows_inserted = result['status'].split(" ")[2] | int %} + {% endif %} + + {%- set msg = "Ran for " ~ period ~ " 1 of 2; " ~ rows_inserted ~ " records inserted" -%} + {{ dbt_utils.log_info(msg) }} + + -- 2nd iteration + {%- set msg = "Running for " ~ period ~ " 2 of 2" -%} + {{ dbt_utils.log_info(msg) }} + + {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period_2_tmp' -%} + {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, + schema=schema, type='table') -%} + + -- this is a macro (start) + {%- set period_filter -%} + ({{timestamp_field}} > '2021-08-23 00:00:00'::timestamp + interval '1 {{period}}' and + {{timestamp_field}} <= '2021-08-23 00:00:00'::timestamp + interval '1 {{period}}' + interval '1 {{period}}' and + {{timestamp_field}} < '{{stop_date}}'::timestamp) + {%- endset -%} + + {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} -- Filtering for 1st period + -- (end) + + {{ dbt_utils.log_info("We are now calling the 2nd iteration statement.") }} + + {% call statement() -%} + {% set tmp_table_sql = dbt_utils.simple_call(filtered_sql) %} + {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} + {%- endcall %} + + {{adapter.expand_target_column_types(from_relation=tmp_relation, + to_relation=target_relation)}} + + {{ dbt_utils.log_info("We are now inserting the result of the 2nd iteration.") }} + + {%- set name = 'main-2' -%} + {% call statement(name, fetch_result=True) -%} + insert into {{target_relation}} + ( + select + * + from {{tmp_relation.include(schema=False)}} + ); + {%- endcall %} + + {% set result = load_result('main-2') %} + {% if 'response' in result.keys() %} {# added in v0.19.0 #} + {% set rows_inserted = result['response']['rows_affected'] %} + {% else %} {# older versions #} + {% set rows_inserted = result['status'].split(" ")[2] | int %} + {% endif %} + + {%- set msg = "Ran for " ~ period ~ " 2 of 2; " ~ rows_inserted ~ " records inserted" -%} + {{ dbt_utils.log_info(msg) }} + {% if need_swap %} {% do adapter.rename_relation(target_relation, backup_relation) %} {% do adapter.rename_relation(intermediate_relation, target_relation) %} From 364772bbe00319e923fc0615b19a709fa9fec5e1 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Mon, 23 Aug 2021 15:12:45 +0200 Subject: [PATCH 06/30] Implement for loop with predefined time boundaries --- .../insert_by_period_materialization.sql | 115 ++++++------------ 1 file changed, 34 insertions(+), 81 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 2d05f28d..8f5f3450 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -96,105 +96,58 @@ -- Now that we have an empty table, let's put something in it. - -- 1st iteration - {%- set msg = "Running for " ~ period ~ " 1 of 2" -%} - {{ dbt_utils.log_info(msg) }} + {%- set number_of_periods = 12 -%} -- define some number manually for testing purposes - {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period_1_tmp' -%} - {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, - schema=schema, type='table') -%} - - -- this is a macro (start) - {%- set period_filter -%} - ({{timestamp_field}} > '2021-08-23 00:00:00'::timestamp + interval '0 {{period}}' and - {{timestamp_field}} <= '2021-08-23 00:00:00'::timestamp + interval '0 {{period}}' + interval '1 {{period}}' and - {{timestamp_field}} < '{{stop_date}}'::timestamp) - {%- endset -%} - - {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} -- Filtering for 1st period - -- (end) - - {{ dbt_utils.log_info("We are now calling the 1st iteration statement.") }} - - {% call statement() -%} - {% set tmp_table_sql = dbt_utils.simple_call(filtered_sql) %} - {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} - {%- endcall %} - - {{adapter.expand_target_column_types(from_relation=tmp_relation, - to_relation=target_relation)}} - - {{ dbt_utils.log_info("We are now inserting the result of the 1st iteration.") }} - - {%- set name = 'main-1' -%} - {% call statement(name, fetch_result=True) -%} - insert into {{target_relation}} - ( - select - * - from {{tmp_relation.include(schema=False)}} - ); - {%- endcall %} + {% for i in range(number_of_periods) -%} + {%- set msg = "Running for " ~ period ~ " " ~ i ~ " of " ~ number_of_periods -%} + {{ dbt_utils.log_info(msg) }} - {% set result = load_result('main-1') %} - {% if 'response' in result.keys() %} {# added in v0.19.0 #} - {% set rows_inserted = result['response']['rows_affected'] %} - {% else %} {# older versions #} - {% set rows_inserted = result['status'].split(" ")[2] | int %} - {% endif %} - - {%- set msg = "Ran for " ~ period ~ " 1 of 2; " ~ rows_inserted ~ " records inserted" -%} - {{ dbt_utils.log_info(msg) }} - - -- 2nd iteration - {%- set msg = "Running for " ~ period ~ " 2 of 2" -%} - {{ dbt_utils.log_info(msg) }} - - {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period_2_tmp' -%} - {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, + {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period_' ~ i ~ '_tmp' -%} + {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, schema=schema, type='table') -%} - -- this is a macro (start) - {%- set period_filter -%} - ({{timestamp_field}} > '2021-08-23 00:00:00'::timestamp + interval '1 {{period}}' and - {{timestamp_field}} <= '2021-08-23 00:00:00'::timestamp + interval '1 {{period}}' + interval '1 {{period}}' and - {{timestamp_field}} < '{{stop_date}}'::timestamp) - {%- endset -%} + -- this is a macro (start) + {%- set period_filter -%} + ({{timestamp_field}} > '2021-08-23 00:00:00'::timestamp + interval '{{i}} {{period}}' and + {{timestamp_field}} <= '2021-08-23 00:00:00'::timestamp + interval '{{i}} {{period}}' + interval '1 {{period}}' and + {{timestamp_field}} < '{{stop_date}}'::timestamp) + {%- endset -%} - {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} -- Filtering for 1st period - -- (end) + {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} -- Filtering for 1st period + -- (end) - {{ dbt_utils.log_info("We are now calling the 2nd iteration statement.") }} + {{ dbt_utils.log_info("We are now calling the " ~ (i + 1) ~ ". iteration statement.") }} - {% call statement() -%} - {% set tmp_table_sql = dbt_utils.simple_call(filtered_sql) %} - {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} - {%- endcall %} + {% call statement() -%} + {% set tmp_table_sql = dbt_utils.simple_call(filtered_sql) %} + {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} + {%- endcall %} - {{adapter.expand_target_column_types(from_relation=tmp_relation, + {{adapter.expand_target_column_types(from_relation=tmp_relation, to_relation=target_relation)}} - {{ dbt_utils.log_info("We are now inserting the result of the 2nd iteration.") }} + {{ dbt_utils.log_info("We are now inserting the result of the " ~ (i + 1) ~ ". iteration.") }} - {%- set name = 'main-2' -%} - {% call statement(name, fetch_result=True) -%} - insert into {{target_relation}} - ( - select - * - from {{tmp_relation.include(schema=False)}} - ); - {%- endcall %} + {%- set name = 'main-' ~ i -%} + {% call statement(name, fetch_result=True) -%} + insert into {{target_relation}} + ( + select + * + from {{tmp_relation.include(schema=False)}} + ); + {%- endcall %} - {% set result = load_result('main-2') %} + {% set result = load_result('main-' ~ i) %} {% if 'response' in result.keys() %} {# added in v0.19.0 #} {% set rows_inserted = result['response']['rows_affected'] %} {% else %} {# older versions #} {% set rows_inserted = result['status'].split(" ")[2] | int %} {% endif %} - {%- set msg = "Ran for " ~ period ~ " 2 of 2; " ~ rows_inserted ~ " records inserted" -%} - {{ dbt_utils.log_info(msg) }} + {%- set msg = "Ran for " ~ period ~ " " ~ i ~ " of " ~ number_of_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} + {{ dbt_utils.log_info(msg) }} + {%- endfor %} {% if need_swap %} {% do adapter.rename_relation(target_relation, backup_relation) %} From 147494a3a4bc14476c14b508d9ec81c9be7f94a4 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Mon, 23 Aug 2021 16:23:32 +0200 Subject: [PATCH 07/30] Move period boundary macros to helpers.sql --- macros/materializations/helpers.sql | 51 +++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 macros/materializations/helpers.sql diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql new file mode 100644 index 00000000..4e242a73 --- /dev/null +++ b/macros/materializations/helpers.sql @@ -0,0 +1,51 @@ +{% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + {{ return(adapter.dispatch('get_period_boundaries', 'dbt_utils')(target_schema, target_table, timestamp_field, start_date, stop_date, period)) }} +{% endmacro %} + +{% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + + {% call statement('period_boundaries', fetch_result=True) -%} + with data as ( + select + coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp, + coalesce( + {{dbt_utils.dateadd('millisecond', + -1, + "nullif('" ~ stop_date ~ "','')::timestamp")}}, + {{dbt_utils.current_timestamp()}} + ) as stop_timestamp + from "{{target_schema}}"."{{target_table}}" + ) + + select + start_timestamp, + stop_timestamp, + {{dbt_utils.datediff('start_timestamp', + 'stop_timestamp', + period)}} + 1 as num_periods + from data + {%- endcall %} + +{%- endmacro %} + +{% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + {{ return(adapter.dispatch('get_period_sql', 'dbt_utils')(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset)) }} +{% endmacro %} + +{% macro default__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + + {%- set period_filter -%} + ("{{timestamp_field}}" > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and + "{{timestamp_field}}" <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and + "{{timestamp_field}}" < '{{stop_timestamp}}'::timestamp) + {%- endset -%} + + {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} + + select + {{target_cols_csv}} + from ( + {{filtered_sql}} + ) + +{%- endmacro %} From 3bb17d5e503e13696f7e6f5f573168d8671f972e Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 24 Aug 2021 08:43:25 +0200 Subject: [PATCH 08/30] Modify helper macros to support Snowflake --- macros/materializations/helpers.sql | 47 +++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql index 4e242a73..706a7198 100644 --- a/macros/materializations/helpers.sql +++ b/macros/materializations/helpers.sql @@ -28,6 +28,35 @@ {%- endmacro %} +{% macro snowflake__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} + + {% call statement('period_boundaries', fetch_result=True) -%} + {%- set model_name = model['name'] -%} + + with data as ( + select + coalesce(max({{timestamp_field}}), '{{start_date}}')::timestamp as start_timestamp, + coalesce( + {{dbt_utils.dateadd('millisecond', + -1, + "nullif('" ~ stop_date ~ "','')::timestamp")}}, + {{dbt_utils.current_timestamp()}} + ) as stop_timestamp + from {{target.schema}}.{{model_name}} + ) + + select + start_timestamp, + stop_timestamp, + {{dbt_utils.datediff('start_timestamp', + 'stop_timestamp', + period)}} + 1 as num_periods + from data + {%- endcall %} + +{%- endmacro %} + + {% macro get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} {{ return(adapter.dispatch('get_period_sql', 'dbt_utils')(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset)) }} {% endmacro %} @@ -49,3 +78,21 @@ ) {%- endmacro %} + +{% macro snowflake__get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, offset) -%} + + {%- set period_filter -%} + ({{timestamp_field}} > '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' and + {{timestamp_field}} <= '{{start_timestamp}}'::timestamp + interval '{{offset}} {{period}}' + interval '1 {{period}}' and + {{timestamp_field}} < '{{stop_timestamp}}'::timestamp) + {%- endset -%} + + {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} + + select + {{target_cols_csv}} + from ( + {{filtered_sql}} + ) + +{%- endmacro %} From a8d32598dadba524c17d98f8a4c6d4d8d3551473 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 24 Aug 2021 09:06:26 +0200 Subject: [PATCH 09/30] Use macros to automate the for loop --- .../insert_by_period_materialization.sql | 45 ++++++++++--------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 8f5f3450..4b10fb6b 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -1,9 +1,3 @@ -{% macro simple_call(sql_code) -%} - select * - from ( - {{sql_code}} - ) -{% endmacro %} {% materialization insert_by_period, default -%} -- If there is no __PERIOD_FILTER__ specified, raise error. (Maybe create a macro for this.) @@ -96,30 +90,37 @@ -- Now that we have an empty table, let's put something in it. - {%- set number_of_periods = 12 -%} -- define some number manually for testing purposes - - {% for i in range(number_of_periods) -%} - {%- set msg = "Running for " ~ period ~ " " ~ i ~ " of " ~ number_of_periods -%} + {% set _ = dbt_utils.get_period_boundaries(schema, + identifier, + timestamp_field, + start_date, + stop_date, + period) %} + {%- set start_timestamp = load_result('period_boundaries')['data'][0][0] | string -%} + {%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%} + {%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%} + + {% set target_columns = adapter.get_columns_in_relation(target_relation) %} + {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} + + {% for i in range(num_periods) -%} + {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ num_periods -%} {{ dbt_utils.log_info(msg) }} {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period_' ~ i ~ '_tmp' -%} {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, schema=schema, type='table') -%} - -- this is a macro (start) - {%- set period_filter -%} - ({{timestamp_field}} > '2021-08-23 00:00:00'::timestamp + interval '{{i}} {{period}}' and - {{timestamp_field}} <= '2021-08-23 00:00:00'::timestamp + interval '{{i}} {{period}}' + interval '1 {{period}}' and - {{timestamp_field}} < '{{stop_date}}'::timestamp) - {%- endset -%} - - {%- set filtered_sql = sql | replace("__PERIOD_FILTER__", period_filter) -%} -- Filtering for 1st period - -- (end) - {{ dbt_utils.log_info("We are now calling the " ~ (i + 1) ~ ". iteration statement.") }} {% call statement() -%} - {% set tmp_table_sql = dbt_utils.simple_call(filtered_sql) %} + {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, + sql, + timestamp_field, + period, + start_timestamp, + stop_timestamp, + i) %} {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} {%- endcall %} @@ -145,7 +146,7 @@ {% set rows_inserted = result['status'].split(" ")[2] | int %} {% endif %} - {%- set msg = "Ran for " ~ period ~ " " ~ i ~ " of " ~ number_of_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} + {%- set msg = "Ran for " ~ period ~ " " ~ ( i + 1) ~ " of " ~ num_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} {{ dbt_utils.log_info(msg) }} {%- endfor %} From 3e100c2cb1328ffa788a4ce65acde02a92400998 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Thu, 26 Aug 2021 10:18:39 +0200 Subject: [PATCH 10/30] Refactor and add logging, comments --- .../insert_by_period_materialization.sql | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 4b10fb6b..83051ad5 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -55,6 +55,11 @@ {% set build_sql = create_table_as(False, target_relation, empty_sql) %} {{ dbt_utils.log_info("We are in the existing_relation is none and creating an empty table.") }} + {{ dbt_utils.log_info("Calling the empty table creation statement.") }} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} {% elif trigger_full_refresh %} {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} {% set tmp_identifier = model['name'] + '__dbt_tmp' %} @@ -72,7 +77,7 @@ -- Let's worry about this later. (If the table exists and it's not a full refresh) {% if existing_relation and not trigger_full_refresh %} - {% set unfiltered_sql = sql | replace("__PERIOD_FILTER__", 'true') %} -- Period filter should have no effect. This is a BUG here. + {% set unfiltered_sql = sql | replace("__PERIOD_FILTER__", 'false') %} -- Period filter should zero out everything. {% do run_query(create_table_as(True, tmp_relation, unfiltered_sql)) %} {% do adapter.expand_target_column_types( from_relation=tmp_relation, @@ -83,11 +88,6 @@ {{ dbt_utils.log_info("We are in our custom case.") }} {% endif %} - {{ dbt_utils.log_info("Calling the empty table creation statement.") }} - {% call statement("main") %} - {{ build_sql }} - {% endcall %} - -- Now that we have an empty table, let's put something in it. {% set _ = dbt_utils.get_period_boundaries(schema, @@ -141,12 +141,17 @@ {% set result = load_result('main-' ~ i) %} {% if 'response' in result.keys() %} {# added in v0.19.0 #} + -- for some reason, if the result has 0 rows, then this doesn't work. + -- but only if it's the last... after the custom case. + -- no error if the table is created in the same run {% set rows_inserted = result['response']['rows_affected'] %} + {{ dbt_utils.log_info(result['response']) }} + {{ dbt_utils.log_info(result) }} {% else %} {# older versions #} {% set rows_inserted = result['status'].split(" ")[2] | int %} {% endif %} - {%- set msg = "Ran for " ~ period ~ " " ~ ( i + 1) ~ " of " ~ num_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} + {%- set msg = "Ran for " ~ period ~ " " ~ ( i + 1 ) ~ " of " ~ num_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} {{ dbt_utils.log_info(msg) }} {%- endfor %} From 760a33dfa4568cc49b32a9ee6d06f6a9b3821aae Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Thu, 26 Aug 2021 10:51:33 +0200 Subject: [PATCH 11/30] Use incremental_upsert() in the for loop --- .../insert_by_period_materialization.sql | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 83051ad5..ec287688 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -56,7 +56,7 @@ {{ dbt_utils.log_info("We are in the existing_relation is none and creating an empty table.") }} {{ dbt_utils.log_info("Calling the empty table creation statement.") }} - + {% call statement("main") %} {{ build_sql }} {% endcall %} @@ -113,30 +113,26 @@ {{ dbt_utils.log_info("We are now calling the " ~ (i + 1) ~ ". iteration statement.") }} - {% call statement() -%} - {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, + {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, sql, timestamp_field, period, start_timestamp, stop_timestamp, i) %} - {{dbt.create_table_as(True, tmp_relation, tmp_table_sql)}} - {%- endcall %} - {{adapter.expand_target_column_types(from_relation=tmp_relation, - to_relation=target_relation)}} + {% do run_query(create_table_as(True, tmp_relation, tmp_table_sql)) %} + + {% do adapter.expand_target_column_types( + from_relation=tmp_relation, + to_relation=target_relation) %} {{ dbt_utils.log_info("We are now inserting the result of the " ~ (i + 1) ~ ". iteration.") }} {%- set name = 'main-' ~ i -%} + {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} {% call statement(name, fetch_result=True) -%} - insert into {{target_relation}} - ( - select - * - from {{tmp_relation.include(schema=False)}} - ); + {{ build_sql }} {%- endcall %} {% set result = load_result('main-' ~ i) %} From ec73c80c951976fbe3631bf7882eb2bde960980a Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Thu, 26 Aug 2021 12:08:25 +0200 Subject: [PATCH 12/30] Introduce check_for_period_filter macro --- macros/materializations/helpers.sql | 9 +++++++++ .../insert_by_period_materialization.sql | 7 +------ 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql index 706a7198..f7bd3675 100644 --- a/macros/materializations/helpers.sql +++ b/macros/materializations/helpers.sql @@ -1,3 +1,12 @@ +{% macro check_for_period_filter(model_unique_id, sql) %} + {%- if sql.find('__PERIOD_FILTER__') == -1 -%} + {%- set error_message -%} + Model '{{ model_unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql + {%- endset -%} + {{ exceptions.raise_compiler_error(error_message) }} + {%- endif -%} +{% endmacro %} + {% macro get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} {{ return(adapter.dispatch('get_period_boundaries', 'dbt_utils')(target_schema, target_table, timestamp_field, start_date, stop_date, period)) }} {% endmacro %} diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index ec287688..c3f15028 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -1,12 +1,7 @@ {% materialization insert_by_period, default -%} -- If there is no __PERIOD_FILTER__ specified, raise error. (Maybe create a macro for this.) - {%- if sql.find('__PERIOD_FILTER__') == -1 -%} - {%- set error_message -%} - Model '{{ model.unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql - {%- endset -%} - {{ exceptions.raise_compiler_error(error_message) }} - {%- endif -%} + {{ dbt_utils.check_for_period_filter(model.unique_id, sql) }} -- Configuration (required) {%- set timestamp_field = config.require('timestamp_field') -%} From f876d1d6555c2cfb54e47a4058bb0fba82c4bcb1 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Thu, 26 Aug 2021 13:43:27 +0200 Subject: [PATCH 13/30] Dispatch check_for_period_filter() macro --- macros/materializations/helpers.sql | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql index f7bd3675..5142bfed 100644 --- a/macros/materializations/helpers.sql +++ b/macros/materializations/helpers.sql @@ -1,4 +1,8 @@ {% macro check_for_period_filter(model_unique_id, sql) %} + {{ return(adapter.dispatch('check_for_period_filter', 'dbt_utils')(model_unique_id, sql)) }} +{% endmacro %} + +{% macro default__check_for_period_filter(model_unique_id, sql) %} {%- if sql.find('__PERIOD_FILTER__') == -1 -%} {%- set error_message -%} Model '{{ model_unique_id }}' does not include the required string '__PERIOD_FILTER__' in its sql From a961622e164ccea77e13ea3aa91e7408c9e31e35 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Thu, 26 Aug 2021 16:47:00 +0200 Subject: [PATCH 14/30] Remove unnecessary code, improve logging --- .../insert_by_period_materialization.sql | 26 +++++++++---------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index c3f15028..231d0007 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -1,6 +1,6 @@ {% materialization insert_by_period, default -%} - -- If there is no __PERIOD_FILTER__ specified, raise error. (Maybe create a macro for this.) + -- If there is no __PERIOD_FILTER__ specified, raise error. {{ dbt_utils.check_for_period_filter(model.unique_id, sql) }} -- Configuration (required) @@ -70,19 +70,6 @@ {{ dbt_utils.log_info("We are in the elif trigger_full_refresh. So __PERIOD_FILTER__ is just set to true, thus it should have no effect.") }} {% endif %} - -- Let's worry about this later. (If the table exists and it's not a full refresh) - {% if existing_relation and not trigger_full_refresh %} - {% set unfiltered_sql = sql | replace("__PERIOD_FILTER__", 'false') %} -- Period filter should zero out everything. - {% do run_query(create_table_as(True, tmp_relation, unfiltered_sql)) %} - {% do adapter.expand_target_column_types( - from_relation=tmp_relation, - to_relation=target_relation) %} - {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} - {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} - - {{ dbt_utils.log_info("We are in our custom case.") }} - {% endif %} - -- Now that we have an empty table, let's put something in it. {% set _ = dbt_utils.get_period_boundaries(schema, @@ -98,6 +85,8 @@ {% set target_columns = adapter.get_columns_in_relation(target_relation) %} {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} + {%- set loop_vars = {'sum_rows_inserted': 0} -%} + {% for i in range(num_periods) -%} {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ num_periods -%} {{ dbt_utils.log_info(msg) }} @@ -142,6 +131,9 @@ {% set rows_inserted = result['status'].split(" ")[2] | int %} {% endif %} + {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%} + {%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%} + {%- set msg = "Ran for " ~ period ~ " " ~ ( i + 1 ) ~ " of " ~ num_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} {{ dbt_utils.log_info(msg) }} {%- endfor %} @@ -168,6 +160,12 @@ {{ run_hooks(post_hooks, inside_transaction=False) }} + {%- set status_string = "INSERT " ~ loop_vars['sum_rows_inserted'] -%} + + {% call noop_statement('main', status_string) -%} + -- no-op + {%- endcall %} + {{ return({'relations': [target_relation]}) }} {%- endmaterialization %} From 1e52037d78d380c39c4f1d6f74c90ee8f9c8b313 Mon Sep 17 00:00:00 2001 From: Daniel Diamond <33811744+danieldiamond@users.noreply.github.com> Date: Tue, 31 Aug 2021 00:06:54 +1000 Subject: [PATCH 15/30] Update schema_tests links (#385) --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8a805326..bb6a1f30 100644 --- a/README.md +++ b/README.md @@ -220,7 +220,7 @@ models: to: ref('other_model_name') ``` -#### unique_where ([source](macros/schema_tests/unique_where.sql)) +#### unique_where ([source](macros/schema_tests/test_unique_where.sql)) This test validates that there are no duplicate values present in a field for a subset of rows by specifying a `where` clause. **Usage:** @@ -236,7 +236,7 @@ models: where: "_deleted = false" ``` -#### not_null_where ([source](macros/schema_tests/not_null_where.sql)) +#### not_null_where ([source](macros/schema_tests/test_not_null_where.sql)) This test validates that there are no null values present in a column for a subset of rows by specifying a `where` clause. **Usage:** From 5abb16096ea862813be900f721bb887194200147 Mon Sep 17 00:00:00 2001 From: Reid Williams <87150621+reidwil-jb@users.noreply.github.com> Date: Mon, 30 Aug 2021 10:08:23 -0400 Subject: [PATCH 16/30] PR - lowercase `except` values in star() (#403) * PR - lowercase `except` values in star() resolves https://github.com/dbt-labs/dbt-utils/issues/402 * Update test_star.sql * Update CHANGELOG.md * Update test_star.sql Co-authored-by: jasnonaz --- CHANGELOG.md | 4 ++++ integration_tests/models/sql/test_star.sql | 5 +---- macros/sql/star.sql | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48df29bb..4a7ef4f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ - Declare compatibility with dbt v0.21.0, which has no breaking changes for this package ([#398](https://github.com/fishtown-analytics/dbt-utils/pull/398)) +## Features + +- Allow user to provide any case type when defining the `exclude` argument in `dbt_utils.star()` ([#403](https://github.com/dbt-labs/dbt-utils/pull/403)) + # dbt-utils v0.7.0 ## Breaking changes diff --git a/integration_tests/models/sql/test_star.sql b/integration_tests/models/sql/test_star.sql index 3c1af078..2092e16a 100644 --- a/integration_tests/models/sql/test_star.sql +++ b/integration_tests/models/sql/test_star.sql @@ -1,7 +1,4 @@ - --- TODO : Should the star macro use a case-insensitive comparison for the `except` field on Snowflake? - -{% set exclude_field = 'FIELD_3' if target.type == 'snowflake' else 'field_3' %} +{% set exclude_field = 'field_3' %} with data as ( diff --git a/macros/sql/star.sql b/macros/sql/star.sql index 8f82f3ae..b55054de 100644 --- a/macros/sql/star.sql +++ b/macros/sql/star.sql @@ -16,7 +16,7 @@ {%- for col in cols -%} - {%- if col.column not in except -%} + {%- if col.column|lower not in except|lower -%} {% do include_cols.append(col.column) %} {%- endif %} From ed6d55c67e56ee08a82de007d20d2014a6cc5dcf Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 14:00:57 +0200 Subject: [PATCH 17/30] Handle full refresh --- .../insert_by_period_materialization.sql | 33 +++++-------------- 1 file changed, 9 insertions(+), 24 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 231d0007..8220ad46 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -45,29 +45,22 @@ {# -- first check whether we want to full refresh for source view or config reasons #} {% set trigger_full_refresh = (full_refresh_mode or existing_relation.is_view) %} - {% if existing_relation is none %} - {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} -- We create an empty table + {# -- we create an empty table if there is no exisiting relation or a full refresh is triggered #} + {% if existing_relation is none or trigger_full_refresh %} + {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} {% set build_sql = create_table_as(False, target_relation, empty_sql) %} - {{ dbt_utils.log_info("We are in the existing_relation is none and creating an empty table.") }} + {% if existing_relation is none %} + {{ dbt_utils.log_info("We are in the existing_relation is none and creating an empty table.") }} + {% elif trigger_full_refresh %} + {{ dbt_utils.log_info("We are in the trigger_full_refresh mode and creating an empty table.") }} + {% endif %} + {{ dbt_utils.log_info("Calling the empty table creation statement.") }} {% call statement("main") %} {{ build_sql }} {% endcall %} - {% elif trigger_full_refresh %} - {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} - {% set tmp_identifier = model['name'] + '__dbt_tmp' %} - {% set backup_identifier = model['name'] + '__dbt_backup' %} - {% set intermediate_relation = existing_relation.incorporate(path={"identifier": tmp_identifier}) %} - {% set backup_relation = existing_relation.incorporate(path={"identifier": backup_identifier}) %} - - {% set unfiltered_sql = sql | replace("__PERIOD_FILTER__", 'true') %} -- Period filter should have no effect - {% set build_sql = create_table_as(False, intermediate_relation, unfiltered_sql) %} - {% set need_swap = true %} - {% do to_drop.append(backup_relation) %} - - {{ dbt_utils.log_info("We are in the elif trigger_full_refresh. So __PERIOD_FILTER__ is just set to true, thus it should have no effect.") }} {% endif %} -- Now that we have an empty table, let's put something in it. @@ -121,9 +114,6 @@ {% set result = load_result('main-' ~ i) %} {% if 'response' in result.keys() %} {# added in v0.19.0 #} - -- for some reason, if the result has 0 rows, then this doesn't work. - -- but only if it's the last... after the custom case. - -- no error if the table is created in the same run {% set rows_inserted = result['response']['rows_affected'] %} {{ dbt_utils.log_info(result['response']) }} {{ dbt_utils.log_info(result) }} @@ -138,11 +128,6 @@ {{ dbt_utils.log_info(msg) }} {%- endfor %} - {% if need_swap %} - {% do adapter.rename_relation(target_relation, backup_relation) %} - {% do adapter.rename_relation(intermediate_relation, target_relation) %} - {% endif %} - {% do persist_docs(target_relation, model) %} {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} From e53a80124955420aa18418ae4654ec497c2f9892 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 14:06:50 +0200 Subject: [PATCH 18/30] Remove verbose CLI logging --- .../insert_by_period_materialization.sql | 16 +--------------- 1 file changed, 1 insertion(+), 15 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 8220ad46..ba7e88a7 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -50,14 +50,6 @@ {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} {% set build_sql = create_table_as(False, target_relation, empty_sql) %} - {% if existing_relation is none %} - {{ dbt_utils.log_info("We are in the existing_relation is none and creating an empty table.") }} - {% elif trigger_full_refresh %} - {{ dbt_utils.log_info("We are in the trigger_full_refresh mode and creating an empty table.") }} - {% endif %} - - {{ dbt_utils.log_info("Calling the empty table creation statement.") }} - {% call statement("main") %} {{ build_sql }} {% endcall %} @@ -88,8 +80,6 @@ {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, schema=schema, type='table') -%} - {{ dbt_utils.log_info("We are now calling the " ~ (i + 1) ~ ". iteration statement.") }} - {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, sql, timestamp_field, @@ -104,8 +94,6 @@ from_relation=tmp_relation, to_relation=target_relation) %} - {{ dbt_utils.log_info("We are now inserting the result of the " ~ (i + 1) ~ ". iteration.") }} - {%- set name = 'main-' ~ i -%} {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} {% call statement(name, fetch_result=True) -%} @@ -115,8 +103,6 @@ {% set result = load_result('main-' ~ i) %} {% if 'response' in result.keys() %} {# added in v0.19.0 #} {% set rows_inserted = result['response']['rows_affected'] %} - {{ dbt_utils.log_info(result['response']) }} - {{ dbt_utils.log_info(result) }} {% else %} {# older versions #} {% set rows_inserted = result['status'].split(" ")[2] | int %} {% endif %} @@ -124,7 +110,7 @@ {%- set sum_rows_inserted = loop_vars['sum_rows_inserted'] + rows_inserted -%} {%- if loop_vars.update({'sum_rows_inserted': sum_rows_inserted}) %} {% endif -%} - {%- set msg = "Ran for " ~ period ~ " " ~ ( i + 1 ) ~ " of " ~ num_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} + {%- set msg = "Ran completed for " ~ period ~ " " ~ ( i + 1 ) ~ " of " ~ num_periods ~ "; " ~ rows_inserted ~ " records inserted" -%} {{ dbt_utils.log_info(msg) }} {%- endfor %} From b360a91b9a4bbde61798d8b0c426f284c7641c60 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 14:24:56 +0200 Subject: [PATCH 19/30] Update CHANGELOG.md --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 48df29bb..77bad3a6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,8 @@ - Declare compatibility with dbt v0.21.0, which has no breaking changes for this package ([#398](https://github.com/fishtown-analytics/dbt-utils/pull/398)) +## Features +* Modernize the `insert_by_period` materialization and make it Snowflake-compatible. # dbt-utils v0.7.0 ## Breaking changes From 53017a4696e7c3288ba7b21191043eb404854220 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 14:32:43 +0200 Subject: [PATCH 20/30] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 8a805326..0db5fd5b 100644 --- a/README.md +++ b/README.md @@ -1079,8 +1079,8 @@ with events as ( * `stop_date`: literal date or timestamp (default=current_timestamp) **Caveats:** -* This materialization is compatible with dbt 0.10.1. -* This materialization has been written for Redshift. +* This materialization is compatible with dbt 0.21.0. +* This materialization has been written primarily for Snowflake. * This materialization can only be used for a model where records are not expected to change after they are created. * Any model post-hooks that use `{{ this }}` will fail using this materialization. For example: ```yaml From 888d781ddc8c20d5f3944f395e642c561109e061 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 14:43:59 +0200 Subject: [PATCH 21/30] Alias table expression in a helper function --- macros/materializations/helpers.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql index 5142bfed..c5a9f334 100644 --- a/macros/materializations/helpers.sql +++ b/macros/materializations/helpers.sql @@ -88,7 +88,7 @@ {{target_cols_csv}} from ( {{filtered_sql}} - ) + ) as t -- has to have an alias {%- endmacro %} From 6fe4adbc469d42f4a11c617c83fd46a81439a574 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 15:42:25 +0200 Subject: [PATCH 22/30] Update integration tests --- .../models/materializations/expected_insert_by_period.sql | 2 +- .../models/materializations/test_insert_by_period.sql | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration_tests/models/materializations/expected_insert_by_period.sql b/integration_tests/models/materializations/expected_insert_by_period.sql index 0a198b6b..98a8b158 100644 --- a/integration_tests/models/materializations/expected_insert_by_period.sql +++ b/integration_tests/models/materializations/expected_insert_by_period.sql @@ -1,7 +1,7 @@ {{ config( materialized = 'view', - enabled=(target.type == 'redshift') + enabled=(target.type == 'redshift' or target.type == 'snowflake') ) }} diff --git a/integration_tests/models/materializations/test_insert_by_period.sql b/integration_tests/models/materializations/test_insert_by_period.sql index 8cc396d3..33afe52d 100644 --- a/integration_tests/models/materializations/test_insert_by_period.sql +++ b/integration_tests/models/materializations/test_insert_by_period.sql @@ -5,7 +5,7 @@ timestamp_field = 'created_at', start_date = '2018-01-01', stop_date = '2018-06-01', - enabled=(target.type == 'redshift') + enabled=(target.type == 'redshift' or target.type == 'snowflake') ) }} From af2dd572cd4de375f3f2f181467e430d6776aabd Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 16:04:37 +0200 Subject: [PATCH 23/30] Set default period to week --- macros/materializations/insert_by_period_materialization.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index ba7e88a7..d5f53f8a 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -10,7 +10,7 @@ -- Configuration (others) {%- set unique_key = config.get('unique_key') -%} {%- set stop_date = config.get('stop_date') or '' -%}} - {%- set period = config.get('period') or 'hour' -%} + {%- set period = config.get('period') or 'week' -%} {% set target_relation = this.incorporate(type='table') %} {% set existing_relation = load_relation(this) %} From cdba4b54398c6dfa5a3138e8e10e1be51a5a4fde Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Tue, 31 Aug 2021 17:06:19 +0200 Subject: [PATCH 24/30] Use make_temp_relation() in for loop --- macros/materializations/insert_by_period_materialization.sql | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index d5f53f8a..7c1fc092 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -76,9 +76,8 @@ {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ num_periods -%} {{ dbt_utils.log_info(msg) }} - {%- set tmp_identifier = model['name'] ~ '__dbt_incremental_period_' ~ i ~ '_tmp' -%} - {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier, - schema=schema, type='table') -%} + {%- set tmp_identifier_suffix = '__dbt_incremental_period_' ~ i ~ '_tmp' -%} + {% set tmp_relation = make_temp_relation(target_relation, tmp_identifier_suffix) %} {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, sql, From 7c81020158455a613ef10975157c1f073116dd3a Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Wed, 1 Sep 2021 15:10:18 +0200 Subject: [PATCH 25/30] Update README.md --- README.md | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/README.md b/README.md index 0db5fd5b..3a213dac 100644 --- a/README.md +++ b/README.md @@ -1082,16 +1082,6 @@ with events as ( * This materialization is compatible with dbt 0.21.0. * This materialization has been written primarily for Snowflake. * This materialization can only be used for a model where records are not expected to change after they are created. -* Any model post-hooks that use `{{ this }}` will fail using this materialization. For example: -```yaml -models: - project-name: - post-hook: "grant select on {{ this }} to db_reader" -``` -A useful workaround is to change the above post-hook to: -```yaml - post-hook: "grant select on {{ this.schema }}.{{ this.name }} to db_reader" -``` ---- From f82f2ee96fa79acd718129a72dd8143ea9096669 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Wed, 1 Sep 2021 16:51:52 +0200 Subject: [PATCH 26/30] Run test_insert_by_period for all targets --- .../models/materializations/expected_insert_by_period.sql | 3 +-- .../models/materializations/test_insert_by_period.sql | 3 +-- run_test.sh | 4 +--- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/integration_tests/models/materializations/expected_insert_by_period.sql b/integration_tests/models/materializations/expected_insert_by_period.sql index 98a8b158..4cd99e21 100644 --- a/integration_tests/models/materializations/expected_insert_by_period.sql +++ b/integration_tests/models/materializations/expected_insert_by_period.sql @@ -1,7 +1,6 @@ {{ config( - materialized = 'view', - enabled=(target.type == 'redshift' or target.type == 'snowflake') + materialized = 'view' ) }} diff --git a/integration_tests/models/materializations/test_insert_by_period.sql b/integration_tests/models/materializations/test_insert_by_period.sql index 33afe52d..589bd776 100644 --- a/integration_tests/models/materializations/test_insert_by_period.sql +++ b/integration_tests/models/materializations/test_insert_by_period.sql @@ -4,8 +4,7 @@ period = 'month', timestamp_field = 'created_at', start_date = '2018-01-01', - stop_date = '2018-06-01', - enabled=(target.type == 'redshift' or target.type == 'snowflake') + stop_date = '2018-06-01' ) }} diff --git a/run_test.sh b/run_test.sh index 6f630e94..6897c5f7 100755 --- a/run_test.sh +++ b/run_test.sh @@ -24,8 +24,6 @@ if [[ ! -z $3 ]]; then _seeds="--select $3 --full-refresh"; fi dbt deps --target $1 dbt seed --target $1 $_seeds -if [ $1 == 'redshift' ]; then - dbt run -x -m test_insert_by_period --full-refresh --target redshift -fi +dbt run -x -m test_insert_by_period --full-refresh --target $1 dbt run -x --target $1 $_models dbt test -x --target $1 $_models From e1cdcee5974b811ea0e7f4b70a1e2fc005f0a7af Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Fri, 3 Sep 2021 18:04:34 +0200 Subject: [PATCH 27/30] Add aggressive CLI logging for debug purposes --- .../insert_by_period_materialization.sql | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 7c1fc092..38282373 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -12,7 +12,13 @@ {%- set stop_date = config.get('stop_date') or '' -%}} {%- set period = config.get('period') or 'week' -%} + {{ dbt_utils.log_info("[DEBUG] Parameters are set!") }} + {% set target_relation = this.incorporate(type='table') %} + + {%- set target_msg = "[DEBUG] Target is set to be " ~ target_relation -%} + {{ dbt_utils.log_info(target_msg) }} + {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(target_relation) %} {%- set full_refresh_mode = (should_full_refresh()) -%} @@ -37,9 +43,13 @@ {{ run_hooks(pre_hooks, inside_transaction=False) }} + {{ dbt_utils.log_info("[DEBUG] Prehooks (outside transaction) are done!") }} + -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} + {{ dbt_utils.log_info("[DEBUG] Prehooks (inside transaction) are done!") }} + {% set to_drop = [] %} {# -- first check whether we want to full refresh for source view or config reasons #} @@ -50,11 +60,20 @@ {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} {% set build_sql = create_table_as(False, target_relation, empty_sql) %} + {{ dbt_utils.log_info("[DEBUG] Within 'if existing_relation is none or trigger_full_refresh'!") }} + {%- set existing_relation_msg = "[DEBUG] Existing relation is: " ~ existing_relation -%} + {%- set trigger_full_refresh_msg = "[DEBUG] Trigger full refresh flag is: " ~ trigger_full_refresh -%} + {{ dbt_utils.log_info(existing_relation_msg) }} + {{ dbt_utils.log_info(trigger_full_refresh_msg) }} + {% call statement("main") %} + {{ dbt_utils.log_info("[DEBUG] Executing the following:") }} + {{ dbt_utils.log_info(build_sql) }} {{ build_sql }} {% endcall %} {% endif %} + {{ dbt_utils.log_info("[DEBUG] We have an empty table now.") }} -- Now that we have an empty table, let's put something in it. {% set _ = dbt_utils.get_period_boundaries(schema, @@ -67,18 +86,32 @@ {%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%} {%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%} + + {%- set start_timestamp_msg = "[DEBUG] Start timestamp is: " ~ start_timestamp -%} + {%- set stop_timestamp_msg = "[DEBUG] End timestamp is: " ~ stop_timestamp -%} + {%- set num_periods_msg = "[DEBUG] Number of periods: " ~ num_periods ~ " " ~ period ~ "(s)" -%} + {{ dbt_utils.log_info(start_timestamp_msg) }} + {{ dbt_utils.log_info(stop_timestamp_msg) }} + {{ dbt_utils.log_info(num_periods_msg) }} + {% set target_columns = adapter.get_columns_in_relation(target_relation) %} {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} {%- set loop_vars = {'sum_rows_inserted': 0} -%} {% for i in range(num_periods) -%} + {%- set loop_enter_msg = "[DEBUG] Entering the " ~ (i + 1) ~ ". iteration of the loop." -%} + {{ dbt_utils.log_info(loop_enter_msg) }} + {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ num_periods -%} {{ dbt_utils.log_info(msg) }} {%- set tmp_identifier_suffix = '__dbt_incremental_period_' ~ i ~ '_tmp' -%} {% set tmp_relation = make_temp_relation(target_relation, tmp_identifier_suffix) %} + {%- set tmp_relation_msg = "[DEBUG] (Within loop) Termporary relation is: " ~ tmp_relation -%} + {{ dbt_utils.log_info(tmp_relation_msg) }} + {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, sql, timestamp_field, @@ -96,6 +129,8 @@ {%- set name = 'main-' ~ i -%} {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} {% call statement(name, fetch_result=True) -%} + {{ dbt_utils.log_info("[DEBUG] (Within loop) Executing the following:") }} + {{ dbt_utils.log_info(build_sql) }} {{ build_sql }} {%- endcall %} @@ -113,6 +148,8 @@ {{ dbt_utils.log_info(msg) }} {%- endfor %} + {{ dbt_utils.log_info("[DEBUG] Out of the loop.") }} + {% do persist_docs(target_relation, model) %} {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} From 713f8a65f527c39348522023e1a96df9e56b2c35 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Fri, 3 Sep 2021 18:30:57 +0200 Subject: [PATCH 28/30] Add logging to get_period_boundaries() --- macros/materializations/helpers.sql | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql index c5a9f334..1f1dc9c0 100644 --- a/macros/materializations/helpers.sql +++ b/macros/materializations/helpers.sql @@ -18,6 +18,11 @@ {% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} {% call statement('period_boundaries', fetch_result=True) -%} + {%- set target_schema_msg = " [DEBUG] (get_period_boundaries) target schema is: " ~ target_schema -%} + {%- set target_table_msg = " [DEBUG] (get_period_boundaries) target table is: " ~ target_table -%} + {{ dbt_utils.log_info(target_schema_msg) }} + {{ dbt_utils.log_info(target_table_msg) }} + with data as ( select coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp, From 1ed1e58da08d960c955496a8820a632a0df9c2a0 Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Fri, 3 Sep 2021 18:36:25 +0200 Subject: [PATCH 29/30] Extend helper function logging --- macros/materializations/helpers.sql | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql index 1f1dc9c0..76ebb249 100644 --- a/macros/materializations/helpers.sql +++ b/macros/materializations/helpers.sql @@ -18,10 +18,15 @@ {% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} {% call statement('period_boundaries', fetch_result=True) -%} + {{ dbt_utils.log_info(model) }} + {%- set model_name = model['name'] -%} + {%- set target_schema_msg = " [DEBUG] (get_period_boundaries) target schema is: " ~ target_schema -%} {%- set target_table_msg = " [DEBUG] (get_period_boundaries) target table is: " ~ target_table -%} + {%- set model_name_msg = " [DEBUG] (get_period_boundaries) model[name] is: " ~ model_name -%} {{ dbt_utils.log_info(target_schema_msg) }} {{ dbt_utils.log_info(target_table_msg) }} + {{ dbt_utils.log_info(model_name_msg) }} with data as ( select @@ -32,7 +37,7 @@ "nullif('" ~ stop_date ~ "','')::timestamp")}}, {{dbt_utils.current_timestamp()}} ) as stop_timestamp - from "{{target_schema}}"."{{target_table}}" + from "{{target_schema}}"."{{model_name}}" ) select From 9c680f91ad4978416481030fa1e7f1de7df1b5da Mon Sep 17 00:00:00 2001 From: Daniel Horvath Date: Fri, 3 Sep 2021 18:58:04 +0200 Subject: [PATCH 30/30] Remove debug logging --- macros/materializations/helpers.sql | 8 ----- .../insert_by_period_materialization.sql | 36 ------------------- 2 files changed, 44 deletions(-) diff --git a/macros/materializations/helpers.sql b/macros/materializations/helpers.sql index 76ebb249..3aa338c2 100644 --- a/macros/materializations/helpers.sql +++ b/macros/materializations/helpers.sql @@ -18,16 +18,8 @@ {% macro default__get_period_boundaries(target_schema, target_table, timestamp_field, start_date, stop_date, period) -%} {% call statement('period_boundaries', fetch_result=True) -%} - {{ dbt_utils.log_info(model) }} {%- set model_name = model['name'] -%} - {%- set target_schema_msg = " [DEBUG] (get_period_boundaries) target schema is: " ~ target_schema -%} - {%- set target_table_msg = " [DEBUG] (get_period_boundaries) target table is: " ~ target_table -%} - {%- set model_name_msg = " [DEBUG] (get_period_boundaries) model[name] is: " ~ model_name -%} - {{ dbt_utils.log_info(target_schema_msg) }} - {{ dbt_utils.log_info(target_table_msg) }} - {{ dbt_utils.log_info(model_name_msg) }} - with data as ( select coalesce(max("{{timestamp_field}}"), '{{start_date}}')::timestamp as start_timestamp, diff --git a/macros/materializations/insert_by_period_materialization.sql b/macros/materializations/insert_by_period_materialization.sql index 38282373..181666d5 100644 --- a/macros/materializations/insert_by_period_materialization.sql +++ b/macros/materializations/insert_by_period_materialization.sql @@ -12,13 +12,8 @@ {%- set stop_date = config.get('stop_date') or '' -%}} {%- set period = config.get('period') or 'week' -%} - {{ dbt_utils.log_info("[DEBUG] Parameters are set!") }} - {% set target_relation = this.incorporate(type='table') %} - {%- set target_msg = "[DEBUG] Target is set to be " ~ target_relation -%} - {{ dbt_utils.log_info(target_msg) }} - {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(target_relation) %} {%- set full_refresh_mode = (should_full_refresh()) -%} @@ -43,13 +38,9 @@ {{ run_hooks(pre_hooks, inside_transaction=False) }} - {{ dbt_utils.log_info("[DEBUG] Prehooks (outside transaction) are done!") }} - -- `BEGIN` happens here: {{ run_hooks(pre_hooks, inside_transaction=True) }} - {{ dbt_utils.log_info("[DEBUG] Prehooks (inside transaction) are done!") }} - {% set to_drop = [] %} {# -- first check whether we want to full refresh for source view or config reasons #} @@ -60,20 +51,11 @@ {%- set empty_sql = sql | replace("__PERIOD_FILTER__", 'false') -%} {% set build_sql = create_table_as(False, target_relation, empty_sql) %} - {{ dbt_utils.log_info("[DEBUG] Within 'if existing_relation is none or trigger_full_refresh'!") }} - {%- set existing_relation_msg = "[DEBUG] Existing relation is: " ~ existing_relation -%} - {%- set trigger_full_refresh_msg = "[DEBUG] Trigger full refresh flag is: " ~ trigger_full_refresh -%} - {{ dbt_utils.log_info(existing_relation_msg) }} - {{ dbt_utils.log_info(trigger_full_refresh_msg) }} - {% call statement("main") %} - {{ dbt_utils.log_info("[DEBUG] Executing the following:") }} - {{ dbt_utils.log_info(build_sql) }} {{ build_sql }} {% endcall %} {% endif %} - {{ dbt_utils.log_info("[DEBUG] We have an empty table now.") }} -- Now that we have an empty table, let's put something in it. {% set _ = dbt_utils.get_period_boundaries(schema, @@ -86,32 +68,18 @@ {%- set stop_timestamp = load_result('period_boundaries')['data'][0][1] | string -%} {%- set num_periods = load_result('period_boundaries')['data'][0][2] | int -%} - - {%- set start_timestamp_msg = "[DEBUG] Start timestamp is: " ~ start_timestamp -%} - {%- set stop_timestamp_msg = "[DEBUG] End timestamp is: " ~ stop_timestamp -%} - {%- set num_periods_msg = "[DEBUG] Number of periods: " ~ num_periods ~ " " ~ period ~ "(s)" -%} - {{ dbt_utils.log_info(start_timestamp_msg) }} - {{ dbt_utils.log_info(stop_timestamp_msg) }} - {{ dbt_utils.log_info(num_periods_msg) }} - {% set target_columns = adapter.get_columns_in_relation(target_relation) %} {%- set target_cols_csv = target_columns | map(attribute='quoted') | join(', ') -%} {%- set loop_vars = {'sum_rows_inserted': 0} -%} {% for i in range(num_periods) -%} - {%- set loop_enter_msg = "[DEBUG] Entering the " ~ (i + 1) ~ ". iteration of the loop." -%} - {{ dbt_utils.log_info(loop_enter_msg) }} - {%- set msg = "Running for " ~ period ~ " " ~ (i + 1) ~ " of " ~ num_periods -%} {{ dbt_utils.log_info(msg) }} {%- set tmp_identifier_suffix = '__dbt_incremental_period_' ~ i ~ '_tmp' -%} {% set tmp_relation = make_temp_relation(target_relation, tmp_identifier_suffix) %} - {%- set tmp_relation_msg = "[DEBUG] (Within loop) Termporary relation is: " ~ tmp_relation -%} - {{ dbt_utils.log_info(tmp_relation_msg) }} - {% set tmp_table_sql = dbt_utils.get_period_sql(target_cols_csv, sql, timestamp_field, @@ -129,8 +97,6 @@ {%- set name = 'main-' ~ i -%} {% set build_sql = incremental_upsert(tmp_relation, target_relation, unique_key=unique_key) %} {% call statement(name, fetch_result=True) -%} - {{ dbt_utils.log_info("[DEBUG] (Within loop) Executing the following:") }} - {{ dbt_utils.log_info(build_sql) }} {{ build_sql }} {%- endcall %} @@ -148,8 +114,6 @@ {{ dbt_utils.log_info(msg) }} {%- endfor %} - {{ dbt_utils.log_info("[DEBUG] Out of the loop.") }} - {% do persist_docs(target_relation, model) %} {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}