[CT-2051] [Bug] New insert_overwrite BigQuery partitioning with integer keys can create huge temporary array variables, exceeding BQ limits #16
Comments
Thanks for laying this out @haukeduden! This issue is at the intersection of two dbt+BigQuery features that are net-new in 0.16.0: support for integer-range partitioning, and dynamic partition overwrite in incremental models. It's not surprising to hear that there's some roughness around the edges between them. What do you think makes most sense for dbt's behavior here? It sounds from your description like, if the partition is an integer range, instead of the materialization script running

```sql
select as struct
    array_agg(distinct PARTITION_FIELD)
from TEMP_TABLE
```

you'd prefer that dbt run something like

```sql
with range as (
    select generate_array(PARTITION_START, PARTITION_END, PARTITION_INTERVAL) as range_array
),

values_to_replace as (
    select distinct PARTITION_FIELD as int_value from TEMP_TABLE
),

joined as (
    select
        *,
        range_bucket(values_to_replace.int_value, range.range_array) as range_index
    from range
    cross join values_to_replace
)

select as struct array_agg(distinct range_array[offset(range_index)]) as partitions_for_replacement
from joined
where range_index between 0 and (array_length(range_array) - 1)
```

One conceptual hurdle is that the […]
Hello @jtcohen6. Could you not simply add a filter for the field values to calculate a representative value for the partition (e.g. the first value of the partition)? This would match what is done for timestamp values with time partitioned tables, which are sent through the 'date' filter.
The get_partition macro would simply calculate this: […]

And then in the merge clause you could write: […]

Wouldn't that work? The only necessary modification would be to send the integer values through the get_partition filter macro; otherwise the code could stay exactly the same.
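The two snippets above did not survive, but a rough sketch of the idea might look like this (the macro name is hypothetical, and it assumes the range's start and interval are reachable on the parsed partition_by config):

```sql
{# Hypothetical sketch: map an integer value to the first value of its range partition. #}
{# Assumes partition_by.range.start and partition_by.range.interval come from the model's partition_by config. #}
{% macro int_partition_start(column, partition_by) -%}
    {{ partition_by.range.start }}
    + div({{ column }} - {{ partition_by.range.start }}, {{ partition_by.range.interval }})
    * {{ partition_by.range.interval }}
{%- endmacro %}
```

The merge clause would then compare each destination row's computed partition start against a much smaller array of partition starts.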
@jtcohen6 I just realized that I have only answered half your question ;). Yes, that is how I had understood the partitioning to work, and yes, I knew that I would have to select full partitions of source data. By the way: I believe that for many integer-partitioned tables a simpler merge strategy would also be sufficient. I think a common use case is a table with an auto-increment integer id field whose rows are immutable / never updated. At least that is the case for the table I have been using to try out insert_overwrite. For these cases, a simple "append" strategy would be sufficient, i.e. a strategy where one asserts that the new data and the existing data have no overlap. The merge statement would become much simpler, as the entire "when not matched by source" block could be left out. It would also be easier to select the source data, since one could simply select everything above the current max value, without worrying about partition sizes and modulos.
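For illustration only (hypothetical table and column names; not SQL that dbt generates), the append-only case described above essentially reduces to:

```sql
-- Append-only sketch: new rows are asserted to never overlap existing ones,
-- so no partition pruning or "when not matched by source" handling is needed.
insert into `my_project.my_dataset.target_table`
select *
from `my_project.my_dataset.target_table__dbt_tmp`
where id > (select coalesce(max(id), -1) from `my_project.my_dataset.target_table`);
```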
@haukeduden I'm not sure the approach suggested here would work, as functions of the partition key don't seem to be supported: https://cloud.google.com/bigquery/docs/querying-partitioned-tables#query_an_integer-range_partitioned_table I'm trying to solve a related problem and struggling! Any thoughts on this problem would be greatly appreciated. I'm thinking liberal use of dynamic SQL may be required. This would allow you to create the […]
Hi, I think I'm also affected by this problem. When I set incremental_strategy to insert_overwrite on my integer-partitioned model, I get this error: […]

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.

I think this is still an issue?

Yes, this is not solved yet. If a proper solution is too difficult, then I believe dbt should simply yield an error and declare insert_overwrite incompatible with integer-partitioned tables. At least then one does not risk running into huge BigQuery costs or wasting time on cryptic error messages.
Just ran into this myself. My use case is not append only; it's a series of updates to entities that have a unique integer ID, efficiently updating just the current state for them. It seems like the (very similar) approach by @jtcohen6 in dbt-labs/dbt-core#1971 that was ultimately abandoned in favour of dbt-labs/dbt-core#2140 would still be viable here - a merge that uses statically calculated min and max values on the partition field in the […]. For some of my data it's almost append only and so the benefits would be large (they'd be small ranges almost all the time), for some less so. Perhaps dbt-labs/dbt-core#1971 is worth resurrecting as something that would at least offer a viable path? Also trying to poke around at the old code and figure out how to define my own materialisation to make this happen for myself at least 😓
I haven't opened a PR over here because I really have very little idea what I'm doing, but it looks like the […]
Here's a version that you can drop in and enable on a model by defining a `match_strategy` in its config (see the `config.get('match_strategy', 'exact')` check below):

```sql
{# Note that this is implemented in the partition_by object in recent dbt-bigquery #}
{% macro _render_wrapped(partition_by, alias=none) -%}
  {%- if partition_by.data_type == 'int64' -%}
    {{ partition_by.render(alias) }}
  {%- else -%}
    {{ partition_by.data_type }}({{ partition_by.render(alias) }})
  {%- endif -%}
{%- endmacro %}

{% macro bigquery__get_merge_sql(target, source, unique_key, dest_columns, predicates=none) %}

  {% if config.get('match_strategy', 'exact') == 'exact' %}
    {# Standard merge strategy #}
    {{ default__get_merge_sql(target, source, unique_key, dest_columns, predicates) }}

  {% else %}
    {# Use predicates merge strategy #}
    {%- set tmp_relation = make_temp_relation(this) %}
    {%- set raw_partition_by = config.get('partition_by', none) -%}
    {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
    {%- set tmp_relation_exists = tmp_relation.render() in source -%}
    {%- set predicates = [] if predicates is none else predicates -%}

    declare _dbt_min_partition_value {{ partition_by.data_type }};
    declare _dbt_max_partition_value {{ partition_by.data_type }};

    {# have we already created the temp table to check for schema changes? #}
    {% if not tmp_relation_exists %}
      {{ declare_dbt_max_partition(this, partition_by, sql) }}

      -- 1. create a temp table with model data
      {# TODO: support ingestion-time partitioning, but IDK how to call the BQ adapter's bq_create_table_as #}
      {# bq_create_table_as(partition_by.time_ingestion_partitioning, True, tmp_relation, sql, 'sql') #}
      {{ bigquery__create_table_as(True, tmp_relation, sql) }}

      {# Disable the sql header, as if there was one it was already output to make the temp table; DOES NOT WORK. #}
      {% call set_sql_header(config) %}
        -- header already rendered
      {% endcall %}

      {%- set source -%}
        (
          select
            {% if partition_by.time_ingestion_partitioning -%}
            _PARTITIONTIME,
            {%- endif -%}
            * from {{ tmp_relation }}
        )
      {% endset %}
    {% else %}
      -- 1. temp table already exists, we used it to check for schema changes
    {% endif %}

    -- 2. define partition ranges to update
    set (_dbt_min_partition_value, _dbt_max_partition_value) = (
      select as struct
        min({{ _render_wrapped(partition_by) }}),
        max({{ _render_wrapped(partition_by) }})
      from {{ tmp_relation }}
    );

    -- 3. run the merge statement
    {% set predicate -%}
      {{ _render_wrapped(partition_by, 'DBT_INTERNAL_DEST') }}
        between _dbt_min_partition_value and _dbt_max_partition_value
    {%- endset %}
    {% do predicates.append(predicate) %}
    {{ default__get_merge_sql(target, source, unique_key, dest_columns, predicates) }};

    -- 4. clean up the temp table
    drop table if exists {{ tmp_relation }};

  {% endif %}

{% endmacro %}
```

🚨 One issue is that sql headers are output twice if building the temp table and […]
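For context only: as I read the macro above, a model opts into the predicates branch by setting any `match_strategy` other than `'exact'` in its config. A hypothetical example (model name, columns, strategy value, and range are all made up; `'partition_range'` is arbitrary since the macro only checks that the value differs from `'exact'`):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='merge',
    unique_key='user_id',
    match_strategy='partition_range',
    partition_by={
        "field": "user_id",
        "data_type": "int64",
        "range": {"start": 0, "end": 100000000, "interval": 100000}
    }
) }}

-- hypothetical model body; any match_strategy != 'exact' routes through the predicates branch
select user_id, event_name, created_at
from {{ ref('events') }}
```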
Anecdata only, but this approach reduces one incremental build of mine on a sample run from 3.4TB to 431.4GB, huge win.
Re-opening that one following @elyobo's PR (#371). Trying to sum up the entire situation before diving in:

[…]

Those are the options currently available to one exploring that space. Here we are talking about an optimization of the specific case of […]. Currently, when generating the list of partitions to process, dbt generates the distinct list of values. Optimally, what we would need instead is the distinct list of partitions (see above for why). This is what @jtcohen6 mentioned up above almost 3 years ago (omg). I'm not sure why we abandoned that plan? In the meantime, @elyobo, you offer instead to pick the min and max values from that list and process every partition in between - even if they don't need to be. Not as optimal as targeting specific partitions, but better than the current implementation in some scenarios. Now we would need a knob to turn that on/off, since the behavior could make things worse in some situations. Did you explore the original idea of generating the distinct list of partitions? I'd rather we make the thing better all the time, and not have to surface another setting for a situation already quite complex in my opinion. @github-christophe-oudar I know you're knowledgeable about that area. Any wisdom to share? ;)
Thanks @Fleid, that sounds like a pretty accurate summary.
The approach that was chosen instead seems to be better where it's viable - the abandoned approach in dbt-labs/dbt-core#1971 just used min and max like I brought back. I don't think that the corner case here (the IDs being so many that storing them exceeds the available memory limit in BQ scripting) was considered - I don't recall seeing it anywhere.
The food-for-thought-probably-throw-away implementation in that PR includes such a toggle.
No, I didn't - honestly I didn't even think of it, just brought back the min/max as a simple solution that has (through the workaround in #16 (comment)) been a decent money saver for us. I guess there's somewhere in the BQ information schema that exposes this information, and we could use that to calculate the affected partitions from the temp table? If we had e.g. value […]
Ok so we are solving for 2 things here:

[…]

I buy that! What I would like then is an additional flag in the `partition_by` config:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={
        "field": "user_id",
        "data_type": "int64",
        "range": {
            "start": 0,
            "end": 100,
            "interval": 10
        },
        "replace_partitions": "min_max"
    }
) }}

select
    user_id,
    event_name,
    created_at
from {{ ref('events') }}
```

I'm making it an enumeration rather than a boolean, as I'm expecting more strategies to appear (current would be […]). If that sounds not too far off, the thing still unknown for me is whether it should be restricted to […]. Also, what we would need is to check that […]
Note that this is absolutely not a todo list for you, but a design exercise with you on what it should look like in terms of ergonomics ;)
That sounds pretty good; enum instead of boolean is sensible. The min/max approach would also work with date partitioning but is unlikely to be a good choice - sequential integer IDs can easily exceed the BQ scripting memory limits, but the number of unique dates in a set of data is likely to be far smaller and fit within the limits. I wouldn't prevent such a choice but would definitely not default to it, and would mention this in the docs. My head is totally out of this space at the moment, so it will take me some time to dig back in, but I would be happy to give it a go once I get some time to wrap my head around it again.
Just a side note: BigQuery has a maximum limit of 10,000 partitions. So I do not think that a partition list could cause a crash in the BigQuery case (I don't know about other databases, though). Also, in my experience you normally just replace a few chosen partitions with insert_overwrite, so the list is likely very short. Having said that: I like the min/max idea better. It is more generic and should work under all circumstances, even edge cases, and with all backends. But if that does not work out for some reason, then I think listing the partitions would also be fine.
I think the limit is 4k and yeah, it should be fine. I think there's some work involved to convert the set of integer values into a set of parameters to effectively prune the partitions - each partition has many integers in range, so we need to do something like […]

The set of mins and maxes would therefore be two integers per partition, 8k max values, which still seems fine - just harder to figure out how to do this (and I can't see where to get the relevant partitioning parameters).
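A sketch of that conversion step (illustrative only; the column name, temp table name, and partition parameters are placeholders, not values dbt exposes this way):

```sql
-- Illustrative only: partition parameters and table/column names are placeholders.
declare partition_start int64 default 0;
declare partition_interval int64 default 1000;

-- Collapse the distinct partition-field values into one (min, max) pair per
-- integer-range partition: at most two integers per affected partition
-- instead of every distinct value.
select
  div(partkey - partition_start, partition_interval) as partition_index,
  min(partkey) as partition_min,
  max(partkey) as partition_max
from `my_project.my_dataset.my_model__dbt_tmp`
group by partition_index;
```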
@elyobo should I close the PR you have open for now, since we're switching strategy?

@Fleid happy to close it and have done so.
I recently refactored many incremental models I maintain to make use of the […]. I started by looking at the compiled SQL and found:

[…]

As already pointed out in this issue, the array in […]

and

[…]

Note that I had two partitions to replace in my example. This code drops the entire partition and writes it again, which is the behavior I assumed from reading the documentation, i.e. the line '[...] because it operates on partitions of data rather than individual rows. [...]'. Using a […] After proposing my view on the topic: @elyobo, @Fleid, is someone working on this issue at the moment? If a change is in the pipeline I would lean back and wait for an update. If not, I could try to follow the process to contribute to dbt myself. PS: dbt is a great piece of software :-)
@dbeatty10 is this something on your radar for incremental materialization?
I hadn't noticed that query cost increase @tmatalla-wagner, what sort of size arrays are you dealing with to hit that? Or perhaps I did, it was just still drastically cheaper than it had been anyway so it seemed like a win 😅 This isn't something I've had time to look at and it would take me a fair bit of work just to understand enough to get started tbh, I don't know the internals of dbt well at all and even figuring out ☝️ that hacky stuff above took a loooong time :D
@elyobo: the array length of […]

@Fleid, @dbeatty10: I will wait for your response on whether the dbt core team is working on this issue. If not, I would try to open a PR addressing it while (hopefully) following your desired process for contributing :-)
@tmatalla-wagner we don't have any relevant changes in the pipeline, and we'd welcome your contributions towards this effort! Our contributing guide has some helpful information about the process of opening a PR and getting it reviewed. Please reach out if you need any help!

Hi all, just wanted to see if there had been any progress on this issue? Running into it myself now, and it would be really useful to know what the current best strategy to overcome this is. Thanks!
Hi @carlos-raylo, unfortunately I haven't worked on a solution within dbt. A simple workaround is to set up your incremental model in a way that the partition_by column itself contains the transformation, as suggested by me earlier (link). This would look somewhat like this:
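The snippet itself did not survive above, so here is a hypothetical model along those lines (field names, the interval of 10, and the ref are assumptions, not the original code):

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={
        "field": "user_id_bucket",
        "data_type": "int64",
        "range": {"start": 0, "end": 1000000, "interval": 10}
    }
) }}

select
    user_id,
    -- the partitioning column itself already holds the partition start value,
    -- so dbt's distinct list contains one entry per partition, not per user_id
    user_id - mod(user_id, 10) as user_id_bucket,
    event_name,
    created_at
from {{ ref('events') }}
```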
@tmatalla-wagner Thanks very much for this suggestion, will see if it helps!

@carlos-raylo we're still running this one #16 (comment); it works well except that models using the strategy can't put things into the sql header because they end up being output twice IIRC.
Describe the bug
When doing an incremental update of an integer-partitioned BigQuery table with the new insert_overwrite merge strategy, dbt calculates which partitions it should replace. In the process it can generate a huge BigQuery ARRAY value.
The cause is that DBT does not take the "interval" parameter of the partitioning specification into account. The generated SQL code selects "array_agg(distinct PARTITION_FIELD)" when calculating the partitions to replace. This selects ALL distinct partition field values of the incremental update, even if these values are actually in the same partition. This causes a potentially huge array to be created. If there is enough data in the table then this will even cause an error because BQ's maximum array size (100 MB) is exceeded.
Note that this bug is not triggered by time partitioned tables because for those all partition fields are dates and the partition size is always one day (i.e. there is only one valid value per partition).
Steps To Reproduce
1. Run the model once with empty source data.
2. Add 1000 records to the source data with values for partkey from 0 to 999.
3. Generate the incremental update code for the model and look at the values in the dbt_partitions_for_replacement internal variable.
4. You will see that it contains all 1000 values for partkey from 0 to 999, even though those are all inside the same single partition.
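The model definition is not included above; a minimal model along these lines (names, range bounds, and the interval of 1000 are assumptions) reproduces the described behavior:

```sql
{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={
        "field": "partkey",
        "data_type": "int64",
        "range": {"start": 0, "end": 100000, "interval": 1000}
    }
) }}

select partkey, payload
from {{ ref('source_data') }}
```

With an interval of 1000, all partkey values from 0 to 999 fall into the same partition, yet the generated array still contains 1000 elements.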
Expected behavior
DBT should ensure that no huge temporary array variable is generated.
The dbt_partitions_for_replacement array should have at most as many elements as the number of partitions being updated. In my opinion the way to go would be to store only the starting values of each partition in the array and then modify the merge clause to use a range for each partition.
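As an illustrative sketch of that expectation (not dbt-generated SQL; the temp table name and the interval of 1000 are assumptions):

```sql
declare dbt_partitions_for_replacement array<int64>;

-- Store only one representative value (the partition start) per affected
-- partition, rather than every distinct partkey value.
set (dbt_partitions_for_replacement) = (
    select as struct
        array_agg(distinct partkey - mod(partkey, 1000))
    from `my_project.my_dataset.my_model__dbt_tmp`
);

-- The merge's delete condition could then cover each partition as a range, e.g.
--   DBT_INTERNAL_DEST.partkey >= partition_start
--   and DBT_INTERNAL_DEST.partkey < partition_start + 1000
-- for each partition_start in dbt_partitions_for_replacement.
```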
Screenshots and log output
N/A
System information
Which database are you using dbt with? BigQuery

The output of `dbt --version`: […]

The operating system you're using: macOS

The output of `python --version`: Python 3.7.4
Additional context