
Optimize insert_overwrite incremental strategy with WRITE_TRUNCATE / Partition copy #77

Closed
github-christophe-oudar opened this issue Dec 1, 2021 · 13 comments · Fixed by #167
Labels
enhancement New feature or request

Comments

@github-christophe-oudar
Contributor

Describe the feature

This is a follow-up from a discussion over the topic with @jtcohen6.

Let's assume we have a model with incremental materialization using the insert_overwrite strategy and we're going for an incremental run. The generated MERGE statement looks like this:

merge into mydataset.newtable as DBT_INTERNAL_DEST
    using (
    SELECT * from mydataset.newtable__dbt_tmp
    ) as DBT_INTERNAL_SOURCE
    on FALSE

when not matched by source
        and DBT_INTERNAL_DEST._PARTITIONTIME in unnest([TIMESTAMP("2021-11-30")])
    then delete

when not matched then insert
    (`_PARTITIONTIME`, `transaction_id`)
values
    (`_PARTITIONTIME`, `transaction_id`)

However, the merge first deletes the existing data and then inserts the new data.
On big partitions this can take a while, resulting in a much longer operation than other approaches such as:

  • selecting the whole partition and inserting it into the destination table with a partition decorator (i.e. mydataset.newtable$20211130) while using the WRITE_TRUNCATE write disposition.
  • copying the whole partition, i.e. bq cp mydataset.newtable__dbt_tmp$20211130 mydataset.newtable$20211130.

As far as I know, the fastest approach is to use bq cp (it might require benchmarking though), unless you don't need a temporary table at all (i.e. no column change and a single partition), in which case you could run the input query and write to the destination table with the partition decorator directly.
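For illustration, here is a rough sketch of what the two alternatives could look like outside dbt (table and partition names reuse the example above; the exact flags are worth double-checking against the bq documentation):

# option 1: re-run the select and overwrite the target partition (WRITE_TRUNCATE)
bq query --use_legacy_sql=false --replace \
    --destination_table 'mydataset.newtable$20211130' \
    'SELECT * FROM mydataset.newtable__dbt_tmp WHERE DATE(_PARTITIONTIME) = "2021-11-30"'

# option 2: copy the partition directly
bq cp -f 'mydataset.newtable__dbt_tmp$20211130' 'mydataset.newtable$20211130'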

The main drawback of that approach is that, even though the insert_overwrite strategy isn't fully atomic (the temporary table is created and remains even if the MERGE fails), the MERGE query itself is atomic across all partitions.
So to keep the same behavior, it requires either having a single partition or accepting the tradeoff of breaking the atomicity of the partition replacement step.

Therefore it could be relevant to have a copy_partitions config to activate that approach.

Describe alternatives you've considered

It could also be a whole new incremental strategy if relevant.

Additional context

I had a production case, described in the thread, where for a single partition the MERGE version (insert_overwrite) was taking 43 minutes while the equivalent query with WRITE_TRUNCATE took 26 minutes for the same result.
At Teads, our internal BigQuery query wrapper tool uses a select with WRITE_TRUNCATE; we don't need a temporary table as an intermediate step because we only process a single partition per query (which is a specific case compared to the dbt approach). That performance overhead is quite a deal breaker for using dbt as is for those queries.
Of course, it would be much better if Google could implement a server-side optimisation for MERGE queries when it detects that pattern.

Who will this benefit?

It will benefit anyone using the insert_overwrite incremental strategy with large partitions, where the delete/insert steps take a long time to process.

Are you interested in contributing this feature?

Yes

@github-christophe-oudar github-christophe-oudar added enhancement New feature or request triage labels Dec 1, 2021
@switzer

switzer commented Dec 7, 2021

When we rebuild a day of data for one of our larger tables, the data collection processes 7 TB and the merge processes 33 TB. We regularly do the data collection into a temp table and manually copy the partition into the table. I'd love to see this as a separate incremental strategy.

Here is the copy step that we do:
bq cp -a 'project-name:dataset.dim_table__dbt_tmp$20211201' project-name:dataset.dim_table

@github-christophe-oudar
Contributor Author

Exactly, it would be much better for you to have that copy step done directly by dbt.
Is the -a flag there to avoid the partition decorator on the destination table?

@switzer

switzer commented Dec 9, 2021

I think that this is just how our process was built: build a partition in a temp table, rm the partition on the destination table, then cp the partition to the destination table. I think you are right, we could just overwrite the partition by using the partition decorator on the destination table and drop the rm step.

@github-christophe-oudar
Contributor Author

Right, I tend to just use cp -f to overwrite existing partitions and save some time by skipping the partition deletion.
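For reference, the overwrite variant of the earlier command would look something like this (the partition decorator on the destination targets the partition to replace; adjust names to your setup):

bq cp -f 'project-name:dataset.dim_table__dbt_tmp$20211201' 'project-name:dataset.dim_table$20211201'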

@McKnight-42 McKnight-42 removed the triage label Jan 5, 2022
@McKnight-42
Contributor

Opening this up for any work you may wish to do on it. After the conversation with @jtcohen6, and what has continued since, it seems there is a good grasp of what to do. Happy to help in any way I can.

@jtcohen6
Contributor

jtcohen6 commented Jan 6, 2022

For visibility: @McKnight-42 and I have been discussing this over the past few days, since it touches on a whole matrix of BigQuery functionality, and a lot of the history of the dbt-bigquery plugin.

With the benefit of that discussion, a couple quick thoughts:

  • We should seek to prefer SQL wherever possible. It offers the most visibility and "replay-ability" for end users. In specific circumstances where it's justified, I'm not strictly opposed to supporting/exposing BigQuery python client methods. It sounds like this could be one of those cases! For some users, the better performance is worth the loss of visibility and atomicity. We would want to support it as a non-default, opt-in config, and clearly document the trade offs involved.
  • There are a few challenges with ingestion-time-partitioned tables (Support for ingestion time partition table on BigQuery as incremental materialization #75 but also essential here), specifically around first run / full refresh for incremental models. Would we need a separate statement for every partition's "backfill"? How to sort the right data into each partition? The incremental mechanism (above) is easier to grasp, though even there we need the ability to insert multiple partitions, e.g. to support lookback windows or recovery after a few days.

So, just want to say — thanks for the thoughtful writeup @github-christophe-oudar (here and in #75)! Lots to chew on. Let's keep digging in :)

@github-christophe-oudar
Contributor Author

Thank you both for getting back to me on those issues.

We should seek to prefer SQL wherever possible

I wish it were possible to do otherwise, but the overhead is so large that the tradeoff is sometimes tough to accept.
Of course, it does require an appropriate interface to hide the implementation details.

I'm strictly opposed to supporting/exposing BigQuery python client methods

Do you mean that you would like to prevent users from accessing the BQ driver through materialization code, or just through Jinja macros?

We would want to support it as a non-default, opt-in config, and clearly document the trade offs involved.

I agree it doesn't need to be a default and it has to be documented.

Would we need a separate statement for every partition's "backfill"?

I'd need to experiment more to see what's possible. I'd say we'll likely need one statement per partition to be backfilled.
It does break atomicity, as explained in the issue; it's a tradeoff. Since most of my production use cases process partitions one by one, it doesn't matter much for me.

How to sort the right data into each partition?

I'm not sure what you mean: let's say we want to issue queries such as bq cp mydataset.newtable__dbt_tmp$20211130 mydataset.newtable$20211130. I would go for listing the existing partitions (select distinct _PARTITIONTIME as d FROM mydataset.newtable__dbt_tmp) and then generate the copy queries.
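Roughly, the dynamic flow could look like this (the FORMAT_TIMESTAMP call is just one way to derive the decorator value; this is a sketch, not a final implementation):

-- list the partitions present in the temporary table
select distinct format_timestamp('%Y%m%d', _PARTITIONTIME) as partition_id
from mydataset.newtable__dbt_tmp;

-- then, for each returned partition_id:
-- bq cp -f mydataset.newtable__dbt_tmp$<partition_id> mydataset.newtable$<partition_id>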

So what could be the plan to develop that feature?
I would suggest:

  • Add a boolean config copy_partitions to the query config definition
  • If the flag is true and the incremental strategy is insert_overwrite, run the query into a temporary table (as usual), then read the partitions from the temporary table
  • For all existing partitions, copy them into the matching destination table partitions

@jtcohen6
Contributor

jtcohen6 commented Jan 7, 2022

I'm strictly opposed to supporting/exposing BigQuery python client methods

Do you mean that you would like to prevent user to be able to access the BQ driver through materialization code or just through Jinja macros?

Sorry, I was missing an essential word here: I'm not strictly opposed when there's a justifiable trade-off! We'd want to expose specific BQ python functionality, as context methods, via Jinja macros, with as much logging as we can muster for visibility.

How to sort the right data into each partition?

I'm not sure what you mean: let's say we want to issue queries such as bq cp mydataset.newtable__dbt_tmp$20211130 mydataset.newtable$20211130. I would go for listing the existing partitions (select distinct _PARTITIONTIME as d FROM mydataset.newtable__dbt_tmp) and then generate the copy queries.

This approach makes sense for incremental additions to the table. What I'm confused about is the case where we need to insert multiple partitions at once, or recreate all partitions of an ingestion-time-partitioned table from scratch (either for the very first time, or with --full-refresh): I think we'll need a way to take someone's model SQL, which returns data for many days/partitions, and reliably split it up into the different partitions.

In the older (now deprecated) method, we required users to include [DBT__PARTITION_DATE] in their model SQL, and dbt would interpolate that with each value from partitions. Is that an approach we should seek to emulate again? Or should we require that users specify a column from the underlying data, and map the association from that column's value to _PARTITIONTIME/$partition_value?

@github-christophe-oudar
Contributor Author

Thanks for the clarification, that makes sense!

For table creation (aka full refresh), you don't really need to mind it that much: it's going to be something like

CREATE TABLE
  mydataset.newtable (date TIMESTAMP, transaction_id INT64)
PARTITION BY DATE(date)
AS (
  SELECT TIMESTAMP("2021-11-01") as date, 1 as transaction_id
)

As described in #75, it won't work for ingestion-time partitioned tables, but that's the point of that issue (it would be fixed by splitting the creation into 2 steps).
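For the ingestion-time case, a hedged sketch of what that two-step creation could look like (the exact DDL is an assumption about how #75 might be implemented, not settled design):

-- step 1: create the empty ingestion-time partitioned table
CREATE TABLE mydataset.newtable (transaction_id INT64)
PARTITION BY _PARTITIONDATE;

-- step 2: run the model query once per partition, writing to the corresponding
-- partition decorator (e.g. mydataset.newtable$20211101) with WRITE_TRUNCATE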

I would expect people working with ingestion-time partitioned tables to select a proper partition column (_PARTITIONTIME); it would make it much easier to work with. I think we should emit a warning if the query output doesn't contain the appropriate columns. Indeed, when BigQuery inserts a row without an explicit partition value (i.e. an implicit NULL on the partition column), the row lands in the NULL partition.

I'd like to make it as "straightforward" as possible; hopefully I can fit all usages with that approach. However, you might have a point that letting users provide the partition values explicitly could be worth it, since listing the partitions of a table can take quite some time (and could therefore be skipped when the user already knows the affected partitions; in my case, I would use a variable in the query that I'm already using to know which partition is being processed).

@jtcohen6
Contributor

After reading through #75 for the fourth or fifth time, it finally clicked for me how we're going to get the "best of both worlds" here: partition values derived from actual columns, and also the performance characteristics of older-school ingestion-time partitioned tables (+ legacySQL / bq partition replacement).

@github-christophe-oudar Thank you for your patience on this one! I know the delay here has been on me, and that you've got other things on your plate these days. If someone else is interested in working on this, we'd be happy to help support them. I think the right next steps are:

  • Start by working on Support for ingestion time partition table on BigQuery as incremental materialization #75. The proposal here depends on a viable point of entry to defining ingestion-based partitioned tables.
  • Do some experimentation to determine whether (a) bq cp or (b) WRITE_TRUNCATE + legacySQL table decorator is more performant
  • Create a highly specialized adapter context method to hook into that functionality
  • Add an optional config to the insert_overwrite incremental strategy that leverages the context method instead of MERGE (sketched hypothetically below)
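For illustration only, a purely hypothetical Jinja sketch of the last two steps (adapter.copy_partitions is a made-up method name, not an existing dbt-bigquery API, and the variable names are assumptions):

{% if config.get('copy_partitions', false) %}
  {# hypothetical: replace partitions via the python client instead of MERGE #}
  {% do adapter.copy_partitions(tmp_relation, target_relation, partitions_to_replace) %}
{% else %}
  {# existing MERGE-based insert_overwrite path #}
{% endif %}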

@github-christophe-oudar
Contributor Author

Thanks for having a thorough look at the issues.
Indeed I'm quite busy these days, but hopefully it will clear up in the next few weeks.
If somebody wants to take it, I can assist somehow; otherwise I guess I'll look into it.

My gut feeling is that (a) is faster, but a quick test would definitely confirm it.
The plan looks great, I'm looking forward to seeing it happen 🙌

@github-christophe-oudar
Contributor Author

I started to look at:

Do some experimentation to determine whether (a) bq cp or (b) WRITE_TRUNCATE + legacySQL table decorator is more performant

So far:
If the plan is to go with a temporary table (to keep all the features regarding column sync), then for a partition of 192 GB:

  • Solution (a) is:
bq cp -f dataset.source_table$20220415 dataset.destination_table$20220415

the run time is 4s (and I'm not sure it uses any slots)

  • Solution (b) is:
SELECT * FROM `dataset.source_table` WHERE _PARTITIONTIME = "2022-04-18"

with the destination table being dataset.destination_table$20220418
then the job took 2 min 22 s and the slot time consumed was 10 hr 24 min.

So there is really no contest between the two options (unless I misunderstood what you meant by solution (b)).

Here is how I see it implemented:

  • Add a copy_partitions flag to the partition_by object (defaulting to false), and throw an error if it is set while incremental_strategy != 'insert_overwrite' (see the config sketch after this list).
  • If insert_overwrite & copy_partitions, we replace the MERGE step with:
    • In static mode (partitions defined), copy the explicitly listed partitions from the temp table output
    • In dynamic mode, retrieve the partitions from the temp table output and copy all of them
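A hypothetical sketch of what a model using that flag could look like (the column names, model name and exact flag placement are assumptions following the plan above):

{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {
        "field": "transaction_date",
        "data_type": "date",
        "copy_partitions": true
    }
) }}

select transaction_date, transaction_id
from {{ ref('transactions') }}
{% if is_incremental() %}
where transaction_date >= date_sub(current_date(), interval 1 day)
{% endif %}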

I'll look into implementing it on top of the existing PR for #136, in a separate PR, if you don't see any issue with that approach.

@github-actions
Contributor

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.
