
Modernize insert_by_period materialization #32

Open
clrcrl opened this issue Feb 14, 2020 · 15 comments
Labels
insert_by_period relating to the insert by period materialization

Comments

@clrcrl
Contributor

clrcrl commented Feb 14, 2020

The insert_by_period materialization was written ~2 years ago, and since then we've improved the way we write materializations, e.g.:

  • Better API for relations (or maybe I just didn't use them correctly)
  • No non_destructive flag
  • Helpful functions like make_temp_relation

Additionally, this materialization doesn't work on warehouses other than Redshift (see dbt-labs/dbt-utils#189)

It's time to give this some love: check out the refreshed code for the incremental materialization as a good starting point.
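
For orientation, here's a rough sketch of the overall shape the "modern" materializations take. Nothing here is a finished design: the config keys (timestamp_field, period) and the control flow are illustrative assumptions, and the actual period loop is omitted.

{% materialization insert_by_period, default -%}

  {#- illustrative sketch only -#}
  {%- set timestamp_field = config.require('timestamp_field') -%}
  {%- set period = config.get('period', default='week') -%}

  {%- set target_relation = this.incorporate(type='table') -%}
  {%- set existing_relation = load_relation(this) -%}
  {%- set tmp_relation = make_temp_relation(target_relation) -%}

  {{ run_hooks(pre_hooks) }}

  {% if existing_relation is none %}
    {#- first run: build the table outright -#}
    {% call statement('main') %}
      {{ create_table_as(False, target_relation, sql) }}
    {% endcall %}
  {% else %}
    {#- subsequent runs: build each period into the temp relation,
        then insert it into the target (period loop omitted here) -#}
    {% call statement('main') %}
      {{ create_table_as(True, tmp_relation, sql) }}
    {% endcall %}
  {% endif %}

  {{ run_hooks(post_hooks) }}

  {{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}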

@DVAlexHiggs

This would be really useful for our dbtvault package, which currently supports Snowflake. Starting to experiment! Has anything changed since this issue was created that would be useful to know?

@drewbanin

Hey @DVAlexHiggs - check out the latest docs on custom materializations over here: https://docs.getdbt.com/docs/writing-code-in-dbt/extending-dbts-programming-environment/creating-new-materializations

We've also added some helper functions that should simplify materialization code a whole bunch. Check out the default incremental materialization for some inspiration: https://github.com/fishtown-analytics/dbt/blob/dev/octavius-catto/core/dbt/include/global_project/macros/materializations/incremental/incremental.sql

@DVAlexHiggs

DVAlexHiggs commented Sep 25, 2020

Hi. We've recently released v0.7.0 of dbtvault, which includes a modified Snowflake implementation of this materialization.

I believe it is a more modern version, using some of the new adapter functions and such.

Have a look here:
https://github.com/Datavault-UK/dbtvault/blob/v0.7.0/macros/materialisations/vault_insert_by_period_materialization.sql

We've also added a number of additional features such as inferred date ranges, and a 'BASE LOAD' feature which gets around initial load problems.

Would this be useful for dbt-utils? I'm happy to adapt it accordingly and contribute it back.

@fernandobrito

@DVAlexHiggs thanks a lot for writing the comment above! I'm a Snowflake dbt user, and after trying the insert_by_period from dbt-utils (and failing due to obscure error messages), I arrived here after Googling for alternatives. The insert_by_period from dbtvault worked like a charm for me! I also enjoyed the detailed documentation over there. Thanks for sharing!

@etoulas

etoulas commented Jan 15, 2021

I'd like to see Postgres and other warehouses supported.

I got this running on Postgres 11 by making two small modifications (to get rid of the errors).

On the first try I got the error "subquery in FROM must have an alias".
Add an alias t to the sub-select in insert_by_period_materialization.sql#L49:

  select
    {{target_cols_csv}}
  from (
    {{filtered_sql}}
  ) t -- this is line 49

Once that was fixed I got another error saying "cannot create temporary relation in non-temporary schema".
Change the schema parameter to None in insert_by_period_materialization.sql#L130:

    {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
                                               schema=None, type='table') -%}  -- this is line 130

This did the trick and it worked for me on Postgres.

@seamus-mckinsey

@fernandobrito I've made some modifications to the macro here for Snowflake. It's a bit flaky - it always fails the first time with an 'object is not iterable' error, but then succeeds on the next run. It's not great but it works 🤷 https://gist.github.com/seamus-mckinsey/a93b2e14ea5dea94909eefbd7680acfa

@DVAlexHiggs

We have a fully working version with extra features included in our dbtvault package:

https://github.com/Datavault-UK/dbtvault/blob/master/macros/materialisations/vault_insert_by_period_materialization.sql

@seamus-mckinsey

Oh wow, thank you @DVAlexHiggs !!

@HorvathDanielMarton

To anyone for whom this materialization is relevant: we have a working version of insert_by_period on Snowflake (dbt-labs/dbt-utils#410). Looking forward to feedback and potential next steps from the project owners.

@fernandobrito

fernandobrito commented Oct 14, 2021

Thanks for your work on this, @HorvathDanielMarton. I'm using dbtvault (see link above) just for their implementation of this macro, and although it works great, extra dependencies mean extra time waiting until all of them catch up with new dbt releases, so having it on dbt-utils would be amazing. I haven't had the chance to test your fork yet, but if I do I will write my feedback here.

@moltar

moltar commented Aug 1, 2022

I can confirm that @etoulas's solution works for PostgreSQL. Can this be added to the package?

@joellabes
Contributor

@moltar we're in the process of extracting the insert_by_period materialization from this project and moving it to the experiments repo, to better reflect its level of maturity.

Once that move is complete, we'd welcome a PR to incorporate this fix!

@smitsrr

smitsrr commented Sep 19, 2022

@joellabes
Contributor

Sure is! Bring on the PRs (although the CI testing is currently busted 😬)

@joellabes joellabes transferred this issue from dbt-labs/dbt-utils Sep 20, 2022
@KyleWellendorff

KyleWellendorff commented Nov 7, 2022

A universal approach, if you have source history tables and would like to process them sequentially in the order they were imported, producing CDC by taking advantage of macros and the built-in snapshots:

I needed to do this for BigQuery and did it fairly manually because I'm new to creating custom macros. Basically, we have tons of source tables that receive multiple data deliveries each time interval; we just append the newest results, and we record each delivery date as an existing field.

Strategy summary: create a handful of macros and a snapshot model, and you don't have to change how you ETL records to maintain CDC:

1. Create a macro that creates a brand new table, Table_1_CDC, from the existing source history table Source_Table_1. After Table_1_CDC has been created, create a view of the distinct timestamp field:
CREATE VIEW Table_1_CDC_View AS SELECT DISTINCT delivery_date FROM Table_1_CDC

2. Create a macro that stores the date value for the variable {{ min_cdc_date }} (see the macro sketch after step 5 for one way to write steps 2 and 3):
(SELECT MIN(delivery_date) FROM Table_1_CDC_View)

3. Create a macro that stores the date value for the variable {{ max_cdc_date }}:
(SELECT MAX(delivery_date) FROM Table_1_CDC_View)

4. Create a macro that:

if {{ min_cdc_date }} < {{ max_cdc_date }} then

DELETE FROM Table_1_CDC WHERE delivery_date = {{ min_cdc_date }}

and that also checks: if {{ min_cdc_date }} = {{ max_cdc_date }} then

CREATE VIEW Table_1_Source_CDC_View AS
WITH stage AS (
  SELECT
    to_be_imported.delivery_date AS not_imported_date
    , imported.delivery_date AS imported_date
    , *
    , ROW_NUMBER() OVER (PARTITION BY imported.delivery_date) AS row_number
  FROM Table_1_CDC AS to_be_imported
  RIGHT JOIN Source_Table_1 AS imported
    ON to_be_imported.delivery_date = imported.delivery_date
)
, final AS (
  SELECT
    delivery_date
    , CASE WHEN not_imported_date IS NULL THEN 0
           WHEN imported_date IS NOT NULL THEN 1
      END AS import_status
    , LEAD(row_number, 1) OVER (ORDER BY delivery_date) AS next_partition
    , *
  FROM stage
)
SELECT * FROM final WHERE import_status = 0 AND delivery_date = {{ min_cdc_date }}

5. Create a macro that conditionally populates {{ snapshot_sql }} as follows.
Either:

SELECT * FROM Table_1_CDC WHERE delivery_date = {{ min_cdc_date }}

or, if (SELECT MAX(delivery_date) FROM Source_Table_1) = {{ max_cdc_date }}, then do nothing / return; otherwise:

SELECT * FROM Table_1_Source_CDC_View
JOIN Table_1_CDC ON id = id AND Table_1_CDC.min_date = delivery_date
WHERE Table_1_CDC.delivery_date IS NOT NULL AND row_number = next_partition
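
As a minimal sketch of how steps 2 and 3 could be written as a single dbt macro using run_query (the macro name and the view name are placeholders carried over from this example, not a tested implementation):

{% macro get_cdc_date_bound(bound='min') %}
    {#- returns MIN or MAX of delivery_date from the distinct-dates view -#}
    {% set query %}
        select {{ bound }}(delivery_date) as bound_date from Table_1_CDC_View
    {% endset %}

    {% if execute %}
        {% set results = run_query(query) %}
        {{ return(results.columns[0].values()[0]) }}
    {% endif %}
    {{ return(none) }}
{% endmacro %}

Calling {% set min_cdc_date = get_cdc_date_bound('min') %} and {% set max_cdc_date = get_cdc_date_bound('max') %} then gives the two bounds used in steps 4 and 5.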

Wrap the macros logically into one or two wrapper macros and call those in your snapshot model, and it should create a CDC table for you out of already-historical data. If logically nested, you can keep your pipelines exactly the same, with the addition of these macros being called in the snapshot models. Just execute the {{ snapshot_sql }} variable.
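
As a rough illustration of that wiring, the snapshot could look something like the sketch below. Everything here (the snapshot name, the wrapper macro name, the config values) is a placeholder, not a tested setup:

{% snapshot table_1_cdc_snapshot %}

{{
    config(
      target_schema='snapshots',
      unique_key='id',
      strategy='check',
      check_cols='all'
    )
}}

-- hypothetical wrapper macro that runs steps 1-5 and returns the SELECT for this run
{{ table_1_cdc_snapshot_sql() }}

{% endsnapshot %}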

Possible alternative: create a view for each date you want, select * from those, and then drop them. Or even a macro for custom views, but the SQL for it may get tricky depending on the CDC type, whether there were schema changes, etc.

BigQuery-specific alternative: import into the existing source history table. Using the time travel feature and the ability to log source model run times (adding a date_time column), just query the table using time travel (it keeps the past 7 days of records):

1. Create a copy of the existing source table.
2. Create something similar to the following, and use BigQuery to schedule it to run once a day: delete the MIN(row_number) data, after getting a distinct row number per partition, from the copy table (a sketch of such a delete is at the end of this comment).

3. Create a snapshot model for daily loads, with the select statement taking advantage of time travel:

SELECT * FROM source_copy FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
EXCEPT DISTINCT
SELECT * FROM source_copy

So schedule a query to delete the minimum partition daily, have the snapshot model run right after it, and you should be able to build a CDC table from already-populated history tables.
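
For step 2 above, one possible reading of the scheduled delete, assuming "minimum partition" means the oldest delivery_date in the copy table (table and column names are placeholders, not a tested query):

-- hypothetical scheduled query: drop the oldest delivery from the copy table each day
DELETE FROM source_copy
WHERE delivery_date = (
  SELECT MIN(delivery_date) FROM source_copy
);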
