Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 5342046

Browse files
committed May 25, 2018
Implement pre/post hooks and incremental tables on BigQuery. dbt-labs#779 dbt-labs#712
1 parent c19a426 commit 5342046

File tree

7 files changed

+128
-17
lines changed

7 files changed

+128
-17
lines changed
 

‎.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,3 +68,6 @@ target/
6868

6969
# Sublime Text
7070
*.sublime-*
71+
72+
#PyCharm/IntelliJ
73+
.idea/

‎dbt/adapters/bigquery/impl.py

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,11 @@ def get_table_from_response(cls, resp):
372372

373373
@classmethod
374374
def add_begin_query(cls, profile, name):
375-
raise dbt.exceptions.NotImplementedException(
376-
'`add_begin_query` is not implemented for this adapter!')
375+
pass
376+
377+
@classmethod
378+
def add_commit_query(cls, profile, name):
379+
pass
377380

378381
@classmethod
379382
def create_schema(cls, profile, project_cfg, schema, model_name=None):
@@ -465,21 +468,6 @@ def get_dataset(cls, profile, project_cfg, dataset_name, model_name=None):
465468
dataset_ref = client.dataset(dataset_name)
466469
return google.cloud.bigquery.Dataset(dataset_ref)
467470

468-
@classmethod
469-
def warning_on_hooks(cls, hook_type):
470-
msg = "{} is not supported in bigquery and will be ignored"
471-
dbt.ui.printer.print_timestamped_line(msg.format(hook_type),
472-
dbt.ui.printer.COLOR_FG_YELLOW)
473-
474-
@classmethod
475-
def add_query(cls, profile, sql, model_name=None, auto_begin=True,
476-
bindings=None, abridge_sql_log=False):
477-
if model_name in ['on-run-start', 'on-run-end']:
478-
cls.warning_on_hooks(model_name)
479-
else:
480-
raise dbt.exceptions.NotImplementedException(
481-
'`add_query` is not implemented for this adapter!')
482-
483471
@classmethod
484472
def is_cancelable(cls):
485473
return False

‎dbt/include/global_project/macros/adapters/bigquery.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
create or replace table {{ relation }}
1818
{{ partition_by(raw_partition_by) }}
19+
{% if temporary %}
20+
OPTIONS(
21+
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
22+
)
23+
{% endif %}
1924
as (
2025
{{ sql }}
2126
);

‎dbt/include/global_project/macros/materializations/helpers.sql

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,13 @@
1111
{% endfor %}
1212
{% endmacro %}
1313

14+
{% macro run_bigquery_hooks(hooks) %}
  {#-
    Run each pre/post hook for a BigQuery model as its own statement.
    Each `hook` is a dict-like object carrying the rendered SQL under 'sql'.

    NOTE(review): `inside_transaction` is not defined inside this macro —
    presumably it is provided by the calling materialization's context;
    confirm it is in scope wherever this macro is invoked.
  -#}
  {%- for hook in hooks -%}
    {% call statement(auto_begin=inside_transaction) %}
      {{ hook.get('sql') }}
    {% endcall %}
  {%- endfor -%}
{% endmacro %}
1421

1522
{% macro column_list(columns) %}
1623
{%- for col in columns %}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
{% macro dbt_bigquery__incremental_delete(target_relation, tmp_relation) -%}
  {#-
    Delete rows from `target_relation` whose unique key already appears in
    `tmp_relation`, so the follow-up insert of the temp table's rows does
    not produce duplicates.

    Requires the model to set `unique_key` in its config (config.require
    raises a compilation error if it is missing).

    Fix: the original also set `identifier = model['name']` here, but the
    variable was never referenced — removed as dead code.
  -#}
  {%- set unique_key = config.require('unique_key') -%}

  delete
  from {{ target_relation }}
  where ({{ unique_key }}) in (
    select ({{ unique_key }})
    from {{ tmp_relation.include(schema=True) }}
  );

{%- endmacro %}
14+
15+
{% materialization incremental, default -%}
  {#-
    BigQuery incremental materialization.

    Flow: optionally truncate/drop the existing relation depending on the
    --full-refresh / --non-destructive flags, run pre-hooks, then either
    create the table from scratch or (incremental path) build a filtered
    temp table, delete matching keys from the target, and insert the temp
    rows. Post-hooks run last.
  -#}
  {%- set sql_where = config.require('sql_where') -%}
  {%- set unique_key = config.get('unique_key') -%}

  {%- set identifier = model['name'] -%}
  {%- set tmp_identifier = model['name'] + '__dbt_incremental_tmp' -%}

  {%- set existing_relations = adapter.list_relations(schema=schema) -%}
  {%- set old_relation = adapter.get_relation(relations_list=existing_relations,
                                              schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier, schema=schema, type='table') -%}
  {%- set tmp_relation = api.Relation.create(identifier=tmp_identifier,
                                             schema=schema, type='table') -%}

  {%- set non_destructive_mode = (flags.NON_DESTRUCTIVE == True) -%}
  {%- set full_refresh_mode = (flags.FULL_REFRESH == True) -%}

  {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
  {%- set exists_not_as_table = (old_relation is not none and not old_relation.is_table) -%}

  {#- truncate only on non-destructive full refresh of an existing table;
      otherwise drop when full-refreshing or when the relation is not a table -#}
  {%- set should_truncate = (non_destructive_mode and full_refresh_mode and exists_as_table) -%}
  {%- set should_drop = (not should_truncate and (full_refresh_mode or exists_not_as_table)) -%}
  {%- set force_create = (flags.FULL_REFRESH and not flags.NON_DESTRUCTIVE) -%}

  -- setup
  {% if old_relation is none -%}
    -- noop
  {%- elif should_truncate -%}
    {{ adapter.truncate_relation(old_relation) }}
  {%- elif should_drop -%}
    {{ adapter.drop_relation(old_relation) }}
    {%- set old_relation = none -%}
  {%- endif %}

  {{ run_bigquery_hooks(pre_hooks) }}

  -- build model
  {% if force_create or old_relation is none -%}
    -- full build: create the target table directly from the model SQL
    {%- call statement('main') -%}
      {{ create_table_as(False, target_relation, sql) }}
    {%- endcall -%}
  {%- else -%}
    -- incremental build: stage new/changed rows into a temp table
    {%- call statement() -%}

      {% set tmp_table_sql -%}
        with dbt_incr_sbq as (
          {{ sql }}
        )
        select * from dbt_incr_sbq
        where ({{ sql_where }})
          or ({{ sql_where }}) is null
      {%- endset %}

      {#- NOTE(review): called as `dbt.create_table_as` here but as bare
          `create_table_as` in the full-build branch above — confirm both
          resolve to the same macro in this dbt version -#}
      {{ dbt.create_table_as(True, tmp_relation, tmp_table_sql) }}

    {%- endcall -%}

    {{ adapter.expand_target_column_types(temp_table=tmp_identifier,
                                          to_schema=schema,
                                          to_table=identifier) }}

    {% if unique_key is not none -%}
      -- remove rows that will be replaced by the incoming temp rows
      {%- call statement() -%}
        {{ dbt_bigquery__incremental_delete(target_relation, tmp_relation) }}
      {%- endcall -%}
    {%- endif %}

    {%- call statement('main') -%}
      {% set dest_columns = adapter.get_columns_in_table(schema, identifier) %}
      {% set dest_cols_csv = dest_columns | map(attribute='quoted') | join(', ') %}

      insert into {{ target_relation }} ({{ dest_cols_csv }})
      (
        select {{ dest_cols_csv }}
        from {{ tmp_relation.include(schema=True) }}
      );
    {% endcall %}

    -- clean up temp table in bigquery
    {{ drop_relation_if_exists(tmp_relation) }}

  {%- endif %}

  {{ run_bigquery_hooks(post_hooks) }}

{%- endmaterialization %}

‎dbt/include/global_project/macros/materializations/table/bigquery_table.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@
5252
{% endif %}
5353
{% endif %}
5454

55+
{{ run_bigquery_hooks(pre_hooks) }}
56+
5557
{#
5658
Since dbt uses WRITE_TRUNCATE mode for tables, we only need to drop this thing
5759
if it is not a table. If it _is_ already a table, then we can overwrite it without downtime
@@ -71,5 +73,6 @@
7173
{% endcall -%}
7274
{% endif %}
7375

76+
{{ run_bigquery_hooks(pre_hooks) }}
7477

7578
{% endmaterialization %}

‎dbt/include/global_project/macros/materializations/view/bigquery_view.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
identifier=identifier, schema=schema,
1616
type='view') -%}
1717

18+
{{ run_bigquery_hooks(pre_hooks) }}
19+
1820
-- drop if exists
1921
{%- if old_relation is not none -%}
2022
{%- if old_relation.is_table and not flags.FULL_REFRESH -%}
@@ -36,4 +38,6 @@
3638
{%- endcall %}
3739
{%- endif %}
3840

41+
{{ run_bigquery_hooks(post_hooks) }}
42+
3943
{%- endmaterialization %}

0 commit comments

Comments (0)