Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subvert insert_overwrite merge strategy to bring back merge #371

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,9 +104,7 @@
{%- if copy_partitions is true %}
{{ bq_dynamic_copy_partitions_insert_overwrite_sql(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns, tmp_relation_exists, copy_partitions) }}
{% else -%}
{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}
{%- set is_exact_match_strategy = config.get('match_strategy', 'exact') == 'exact' -%}

{%- set source_sql -%}
(
Expand All @@ -119,7 +117,12 @@
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
-- Script variables used later to build the merge predicate. Which ones are
-- declared depends on the match strategy flag computed above.
{% if is_exact_match_strategy %}
-- exact strategy: holds the distinct partition values found in the temp table
declare dbt_partitions_for_replacement array<{{ partition_by.data_type }}>;
{% else %}
-- any other strategy: holds only the lowest and highest partition value
declare _dbt_min_partition_value {{ partition_by.data_type }};
declare _dbt_max_partition_value {{ partition_by.data_type }};
{% endif %}

{# have we already created the temp table to check for schema changes? #}
{% if not tmp_relation_exists %}
Expand All @@ -131,15 +134,35 @@
-- 1. temp table already exists, we used it to check for schema changes
{% endif %}

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct {{ partition_by.render_wrapped() }})
from {{ tmp_relation }}
);

-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
-- Steps 2 and 3 depend on the match strategy flag:
--   * exact: collect the distinct partition values present in the temp table
--     and restrict the insert_overwrite merge to exactly those partitions.
--   * otherwise: collect only the min and max partition value and restrict a
--     regular merge to that closed range.
{% if is_exact_match_strategy %}
-- 2. define partitions to update
-- (array of the distinct partition values found in the temp table)
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct {{ partition_by.render_wrapped() }})
from {{ tmp_relation }}
);

-- 3. run the merge statement
-- The predicate limits the destination side to partitions in the array above.
{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{% else %}
-- 2. define partition ranges to update
-- NOTE(review): if the temp table has no rows, min/max presumably yield NULL
-- and the range predicate below then matches nothing -- TODO confirm.
set (_dbt_min_partition_value, _dbt_max_partition_value) = (
select as struct
min({{ partition_by.render_wrapped() }}),
max({{ partition_by.render_wrapped() }})
from {{ tmp_relation }}
);

-- 3. run the merge statement
-- BETWEEN is inclusive on both ends, so the predicate covers every destination
-- partition inside the range, including partitions that have no rows in the
-- temp table.
-- NOTE(review): this branch calls get_merge_sql with unique_key instead of
-- get_insert_overwrite_merge_sql; presumably rows are matched on unique_key
-- within the range rather than whole partitions being replaced -- confirm
-- against that macro, and confirm the behaviour when unique_key is not set.
{% set predicate -%}
{{ partition_by.render_wrapped(alias='DBT_INTERNAL_DEST') }}
BETWEEN _dbt_min_partition_value AND _dbt_max_partition_value
{%- endset %}
{{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns, [predicate]) }};
{% endif %}

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}
Expand Down