Incremental Load Predicates To Bound unique_id scans #3293

Closed
dm03514 opened this issue Apr 25, 2021 · 8 comments
Labels: enhancement (New feature or request), good_first_issue (Straightforward + self-contained changes, good for new contributors!), incremental (Incremental modeling with dbt)

Comments


dm03514 commented Apr 25, 2021

Describe the feature

Hello! I work with a number of very large tables (8-60 TB and growing daily). Data is loaded incrementally.


I often use the delete+insert incremental load strategy to ensure that the target table is duplicate-free. Scan times on these large tables are often multiple hours.

Below is an incremental query using delete+insert executed against one of these large tables in Snowflake:

[Screenshot: full table scan query]
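The generated DML has roughly this shape (a sketch of dbt's delete+insert strategy output, with invented table and column names); the delete's lookup against the target forces a scan of the entire table:

delete from analytics.events
where (unique_id) in (
    select (unique_id)
    from analytics.events__dbt_tmp
);

insert into analytics.events (unique_id, collector_hour, payload)
(
    select unique_id, collector_hour, payload
    from analytics.events__dbt_tmp
);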

Below is the detailed query profile:

[Screenshot: query profile showing a full partition scan]

It takes a lot of resources to perform full table scans on multi-terabyte tables.

Is it possible to add support for predicates in the incremental load SQL?

I created a POC Pull Request to illustrate this in action. The incremental_predicates are defined as part of the config:

{{
    config(
      materialized='incremental',
      incremental_strategy='delete+insert',
      incremental_predicates=[
        "collector_hour >= dateadd('day', -7, CONVERT_TIMEZONE('UTC', current_timestamp()))"
      ],
      unique_key='unique_id'
    )
}}

The image below shows the predicates applied to the incremental load:

[Screenshot: delete+insert query with the predicate applied]
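With the predicate from the config above, the delete becomes bounded, roughly (same illustrative sketch as before, with invented names):

delete from analytics.events
where (unique_id) in (
    select (unique_id)
    from analytics.events__dbt_tmp
)
and collector_hour >= dateadd('day', -7, CONVERT_TIMEZONE('UTC', current_timestamp()));

Snowflake can then prune partitions outside the 7-day window instead of scanning the whole target table.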

The effect of bounding the incremental unique window is profound:

[Screenshot: query profile showing a bounded partition scan]

Of course, not every workload supports a bounded unique window, but we found it applicable for our use case.

Describe alternatives you've considered

I can think of a couple of alternatives to this (none of them dbt-based):

  • Scale out database compute to handle full table scans in reasonable time periods
  • Upstream dedupers to guarantee uniqueness before data is touched by dbt

Additional context

I believe all databases could benefit from optional support of incremental predicates.

Who will this benefit?

This should benefit any dbt users who have:

  • Multi-TB database deployments
  • The delete+insert incremental strategy with a unique key
  • Queue-based data sources with limited retention (e.g. 7 days). Since the data sources for these tables are queue-based, we're guaranteed not to see duplicates outside of a fixed window.

Are you interested in contributing this feature?

Yes! I would be happy to!

dm03514 added the enhancement and triage labels on Apr 25, 2021
@jtcohen6 (Contributor)

@dm03514 I'm a huge fan of this, and of the in-progress work over in #3294!

It's always been possible to make the interventions you suggest by overriding macros + materializations in user space. We're always trying to find the right balance between:

  • Incremental models should be simple enough to use right from the get-go
  • Incremental models should be powerful enough to extend to massive datasets, and adapt to common needs

Once we see the same extensions/adaptations by community members over and over again, it's a clearer indication that building a new config into the default materialization is worth the trade-off in added complexity—and we've seen this one come up often. I think we can do a lot just by giving users the option to pass an arbitrary list of predicates, and stick them in the right spot within the DML statement.

My only requirement for this work would be consistency:

  1. The incremental materialization (default + all adapters) should always check the config for an incremental_predicates array.
  2. All strategy-related macros (get_x_sql) should accept an incremental_predicates argument, and template it into the appropriate spot in the DML statement. (The get_merge_sql macro already does, though under the name predicates; a simplified sketch of the resulting DML follows this list.)
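
To make (2) concrete: when predicates are templated into the merge strategy, the rendered DML ends up looking roughly like this (a simplified sketch with invented table and column names, not the verbatim output of get_merge_sql; the extra predicate is simply ANDed onto the unique-key match in the merge condition):

merge into analytics.events as DBT_INTERNAL_DEST
using analytics.events__dbt_tmp as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.unique_id = DBT_INTERNAL_DEST.unique_id
   and DBT_INTERNAL_DEST.collector_hour >= dateadd('day', -7, convert_timezone('UTC', current_timestamp()))
when matched then update set
    DBT_INTERNAL_DEST.payload = DBT_INTERNAL_SOURCE.payload
when not matched then insert (unique_id, collector_hour, payload)
    values (DBT_INTERNAL_SOURCE.unique_id, DBT_INTERNAL_SOURCE.collector_hour, DBT_INTERNAL_SOURCE.payload);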

There's an even-more-generalized version of this that I previously thought about as pluggable incremental strategies (#2366). But I think a new config, given the existing strategies, makes a lot of sense.

Let me know if you agree with the above! If so, I think #3294 could be merge-able with just a little bit of work to make it consistent across adapters + strategies.

jtcohen6 removed the triage label on Apr 26, 2021

dm03514 commented Apr 26, 2021

Thank you @jtcohen6 for responding. Ahh, I didn't realize that this was already achievable by overriding macros 😅.

The consistency requirements are clear. I will get a draft PR up which consistently adds incremental_predicates to the default materialization and all adapters, as well as all strategies.

I need to check out #2366!

I plan on getting a draft PR up for all adapters & strategies by end of next weekend (and hopefully sometime sooner).

Thank you for your feedback @jtcohen6


dm03514 commented May 3, 2021

I moved forward with a user-defined strategy while this submission is still in progress:

{% macro custom_validate_get_incremental_deleteinsert_strategy(config) %}
  {#-- Find and validate the incremental strategy #}
  {%- set strategy = config.get("incremental_strategy", default="delete+insert") -%}

  {% set invalid_strategy_msg -%}
    Invalid incremental strategy provided: {{ strategy }}
  {%- endset %}
  {% if strategy not in ['delete+insert'] %}
    {% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
  {% endif %}

  {% do return(strategy) %}
{% endmacro %}


{% macro get_bounded_delete_insert_sql(target, source, unique_key, dest_columns, incremental_predicates=None) -%}

    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
    {%- set incremental_predicates = [] if incremental_predicates is none else [] + incremental_predicates -%}

    {% if unique_key is not none %}
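    {#-- Delete only rows whose unique_key appears in the new batch and that also satisfy
         the supplied predicates, so the delete scans a bounded slice of the target --#}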
    delete from {{ target }}
    where ({{ unique_key }}) in (
        select ({{ unique_key }})
        from {{ source }}
    )
    {% if incremental_predicates %} and {{ incremental_predicates | join(' and ') }} {% endif %};
    {% endif %}

    insert into {{ target }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ source }}
    );

{%- endmacro %}


{% materialization bounded_delete_insert, adapter='snowflake' %}
  {% set original_query_tag = set_query_tag() %}

  {%- set unique_key = config.get('unique_key') -%}
  {%- set full_refresh_mode = (should_full_refresh()) -%}
  {%- set incremental_predicates = config.get('incremental_predicates', default=None) -%}

  {% set target_relation = this %}
  {% set existing_relation = load_relation(this) %}
  {% set tmp_relation = make_temp_relation(this) %}

  {#-- Validate early so we don't run SQL if the strategy is invalid --#}
  {% set strategy = custom_validate_get_incremental_deleteinsert_strategy(config) -%}

  -- setup
  {{ run_hooks(pre_hooks, inside_transaction=False) }}

  -- `BEGIN` happens here:
  {{ run_hooks(pre_hooks, inside_transaction=True) }}

  {% if existing_relation is none %}
    {% set build_sql = create_table_as(False, target_relation, sql) %}
  {% elif existing_relation.is_view %}
    {#-- Can't overwrite a view with a table - we must drop --#}
    {{ log("Dropping relation " ~ target_relation ~ " because it is a view and this model is a table.") }}
    {% do adapter.drop_relation(existing_relation) %}
    {% set build_sql = create_table_as(False, target_relation, sql) %}
  {% elif full_refresh_mode %}
    {% set build_sql = create_table_as(False, target_relation, sql) %}
  {% else %}
    {% do run_query(create_table_as(True, tmp_relation, sql)) %}
    {% do adapter.expand_target_column_types(
           from_relation=tmp_relation,
           to_relation=target_relation) %}
    {% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
    {% set build_sql = get_bounded_delete_insert_sql(
      target_relation,
      tmp_relation,
      unique_key,
      dest_columns,
      incremental_predicates) %}
  {% endif %}

  {%- call statement('main') -%}
    {{ build_sql }}
  {%- endcall -%}

  {{ run_hooks(post_hooks, inside_transaction=True) }}

  -- `COMMIT` happens here
  {{ adapter.commit() }}

  {{ run_hooks(post_hooks, inside_transaction=False) }}

  {% set target_relation = target_relation.incorporate(type='table') %}
  {% do persist_docs(target_relation, model) %}

  {% do unset_query_tag(original_query_tag) %}

  {{ return({'relations': [target_relation]}) }}
{% endmaterialization %}

{% macro is_custom_incremental() %}
    {#-- do not run introspective queries in parsing #}
    {% if not execute %}
        {{ return(False) }}
    {% else %}
        {% set relation = adapter.get_relation(this.database, this.schema, this.table) %}
        {{ return(relation is not none
                  and relation.type == 'table'
                  and model.config.materialized in (
                    'incremental',
                    'bounded_delete_insert',
                  )
                  and not should_full_refresh()) }}
    {% endif %}
{% endmacro %} 

Usage looks like:

{{
    config(
      materialized='bounded_delete_insert',
      incremental_strategy='delete+insert',
      unique_key='unique_id',
      incremental_predicates=[
        "collector_hour >= dateadd('day', -10, CONVERT_TIMEZONE('UTC', current_timestamp()))"
      ],
    )
}}


dm03514 commented Jun 2, 2021

We've completely settled on the merge approach with predicates. We created a light macro to expose the predicates through the config for our models. The merge with predicates gracefully handles backfills (something that the bounded delete does not do :/). I will create a PR exposing the merge predicates through config for your feedback. Thank you! 🚀
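
For reference, one minimal way such a config-driven override might look — a sketch only; the macro name, signature, and the delegation to default__get_merge_sql are assumptions about dbt's built-in merge macros, not necessarily the exact macro described above:

{#-- Placed in the project's macros/ directory, this shadows the built-in get_merge_sql,
     reads incremental_predicates from the model config, and forwards them via the
     existing `predicates` argument so they land in the merge condition. --#}
{% macro get_merge_sql(target, source, unique_key, dest_columns, predicates=none) -%}
    {%- set config_predicates = config.get('incremental_predicates', default=[]) -%}
    {%- set all_predicates = (predicates or []) + config_predicates -%}
    {{ default__get_merge_sql(target, source, unique_key, dest_columns, all_predicates) }}
{%- endmacro %}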

jtcohen6 added this to the Oh-Twenty-One milestone on Jun 15, 2021
jtcohen6 added the good_first_issue label on Jul 12, 2021
@jtcohen6 (Contributor)

@dm03514 Is this still something you'd be interested in contributing? It felt like we were getting close with #3294!

jtcohen6 removed this from the Oh-Twenty-One milestone on Jul 30, 2021
jtcohen6 added the incremental label on Oct 28, 2021

rumbin commented Dec 22, 2021

It would be great if we could have this feature.

See also the discussion on Discourse:
https://discourse.getdbt.com/t/incremental-model-use-db-cluster-column/1946


oolongtea commented May 4, 2022

Not urgent, but I'm definitely still interested in this. One pattern I'd like to use with a custom incremental_strategy is applying a lookback to the target table. Example Snowflake pseudo-SQL:

merge into target
using (
   select id, a, b, c
   from events
) batch
on (target.id = batch.id
 and target.date > [35 days ago]
)
when matched ... update ...
when not matched ... insert ...

The use case is that the target table continues to grow while each batch only has data from roughly the last 30 days, so it's safe to only look at the target for the same date range and avoid scanning the whole table.

We already do this with Airflow, but it's a pain to have to decide between Airflow-only and dbt when the SQL is simple enough for dbt.

It looks like this is close with dave-connors-3's PR #4546, so I'll keep an eye on that. Thanks!
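
Using the config shape from the POC earlier in this thread, that lookback could presumably be expressed as a predicate on the target side of the merge, something like the sketch below (column names echo the pseudo-SQL above, and the DBT_INTERNAL_DEST alias is an assumption about the destination alias in the generated merge):

{{
    config(
      materialized='incremental',
      incremental_strategy='merge',
      unique_key='id',
      incremental_predicates=[
        "DBT_INTERNAL_DEST.date >= dateadd('day', -35, current_date())"
      ]
    )
}}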


jtcohen6 commented Jan 2, 2023

PRs for this have finally been merged, and this feature will be included in v1.4 :)
