From 5e6e7469512b47af0e61eb317646acb51c465d42 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 16 Jul 2019 23:24:19 -0400 Subject: [PATCH 1/5] possible fix for re-used check cols on BQ --- .../materializations/snapshot/snapshot.sql | 38 +++++++++---------- .../materializations/snapshot/strategies.sql | 14 +++++-- .../dbt/adapters/bigquery/connections.py | 19 ++++++++-- 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql index 82ea029b323..a52e302d814 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql @@ -31,6 +31,15 @@ ), + snapshotted_data as ( + + select *, + {{ strategy.unique_key }} as dbt_unique_key + + from {{ target_relation }} + + ), + source_data as ( select *, @@ -43,15 +52,6 @@ from snapshot_query ), - snapshotted_data as ( - - select *, - {{ strategy.unique_key }} as dbt_unique_key - - from {{ target_relation }} - - ), - insertions as ( select @@ -84,6 +84,15 @@ ), + snapshotted_data as ( + + select *, + {{ strategy.unique_key }} as dbt_unique_key + + from {{ target_relation }} + + ), + source_data as ( select @@ -96,15 +105,6 @@ from snapshot_query ), - snapshotted_data as ( - - select *, - {{ strategy.unique_key }} as dbt_unique_key - - from {{ target_relation }} - - ), - updates as ( select @@ -202,7 +202,7 @@ {%- endif -%} {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config) %} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} {% if not target_relation_exists %} diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql index 452cfbbd66c..f23fc91f9e3 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql @@ -62,7 +62,7 @@ {# Core strategy definitions #} -{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config) %} +{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set primary_key = config['unique_key'] %} {% set updated_at = config['updated_at'] %} @@ -81,7 +81,7 @@ {% endmacro %} -{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config) %} +{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set check_cols_config = config['check_cols'] %} {% set primary_key = config['unique_key'] %} {% set updated_at = snapshot_get_time() %} @@ -106,7 +106,15 @@ ) {%- endset %} - {% set scd_id_cols = [primary_key] + (check_cols | list) %} + {% if target_exists %} + {% set tbl_version -%} + cast((select count(*) from {{ snapshotted_rel }} where {{ snapshotted_rel }}.dbt_unique_key = {{ primary_key }}) as string) + {%- endset %} + {% set scd_id_cols = [primary_key, tbl_version] + (check_cols | list) %} + {% else %} + {% set scd_id_cols = [primary_key] + (check_cols | list) %} + {% endif %} + {% set scd_id_expr = snapshot_hash_arguments(scd_id_cols) %} {% do return({ diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 24731c1a747..678b272dc9c 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -202,15 +202,28 @@ def raw_execute(self, sql, fetch=False): def execute(self, sql, auto_begin=False, fetch=None): # auto_begin is ignored on bigquery, and only included for consistency - _, iterator = self.raw_execute(sql, fetch=fetch) + query_job, iterator = self.raw_execute(sql, fetch=fetch) if fetch: res = self.get_table_from_response(iterator) else: res = dbt.clients.agate_helper.empty_table() - # If we get here, the query succeeded - status = 'OK' + if query_job.statement_type == 'CREATE_VIEW': + status = 'CREATE VIEW' + + elif query_job.statement_type == 'CREATE_TABLE_AS_SELECT': + conn = self.get_thread_connection() + client = conn.handle + table = client.get_table(query_job.destination) + status = 'CREATE TABLE ({})'.format(table.num_rows) + + elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']: + status = '{} ({})'.format(query_job.statement_type, query_job.num_dml_affected_rows) + + else: + status = 'OK' + return status, res def create_bigquery_table(self, database, schema, table_name, callback, From 4df0bbd8147f055d623cf48ceef5d82bae10c6d7 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Tue, 16 Jul 2019 23:43:03 -0400 Subject: [PATCH 2/5] touchup var name and sql formatting --- .../materializations/snapshot/strategies.sql | 20 +++++++++++-------- .../macros/materializations/snapshot.sql | 9 ++++++--- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql index f23fc91f9e3..0fa1abbe68e 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql @@ -35,16 +35,17 @@ {# Create SCD Hash SQL fields cross-db #} -{% macro snapshot_hash_arguments(args) %} +{% macro snapshot_hash_arguments(args) -%} {{ adapter_macro('snapshot_hash_arguments', args) }} -{% endmacro %} +{%- endmacro %} -{% macro default__snapshot_hash_arguments(args) %} +{% macro default__snapshot_hash_arguments(args) -%} md5({% for arg in args %} - coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %} + coalesce(cast({{ arg }} as varchar ), '') + {% if not loop.last %} || '|' || {% endif %} {% endfor %}) -{% endmacro %} +{%- endmacro %} {# @@ -107,10 +108,13 @@ {%- endset %} {% if target_exists %} - {% set tbl_version -%} - cast((select count(*) from {{ snapshotted_rel }} where {{ snapshotted_rel }}.dbt_unique_key = {{ primary_key }}) as string) + {% set row_version -%} + ( + select count(*) from {{ snapshotted_rel }} + where {{ snapshotted_rel }}.dbt_unique_key = {{ primary_key }} + ) {%- endset %} - {% set scd_id_cols = [primary_key, tbl_version] + (check_cols | list) %} + {% set scd_id_cols = [primary_key, row_version] + (check_cols | list) %} {% else %} {% set scd_id_cols = [primary_key] + (check_cols | list) %} {% endif %} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql index 4cd5a04467c..2cfbdb2956e 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql @@ -1,6 +1,9 @@ -{% macro bigquery__snapshot_hash_arguments(args) %} - to_hex(md5(concat({% for arg in args %}coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif %}{% endfor %}))) -{% endmacro %} +{% macro bigquery__snapshot_hash_arguments(args) -%} + to_hex(md5(concat({% for arg in args %} + coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif -%} + {% endfor %} + ))) +{%- endmacro %} {% macro bigquery__create_columns(relation, columns) %} {{ adapter.alter_table_add_columns(relation, columns) }} From a2e801c2de127e514d67e4518343732742cee140 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 21 Jul 2019 13:40:43 -0400 Subject: [PATCH 3/5] pep8 --- plugins/bigquery/dbt/adapters/bigquery/connections.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 678b272dc9c..5743b0aa52b 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -219,7 +219,10 @@ def execute(self, sql, auto_begin=False, fetch=None): status = 'CREATE TABLE ({})'.format(table.num_rows) elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']: - status = '{} ({})'.format(query_job.statement_type, query_job.num_dml_affected_rows) + status = '{} ({})'.format( + query_job.statement_type, + query_job.num_dml_affected_rows + ) else: status = 'OK' From 35d1a7a1b587a83baf5e13bb1c85014b82a366fc Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Sun, 21 Jul 2019 15:26:40 -0400 Subject: [PATCH 4/5] add tests --- .../check_snapshots_test_current.sql | 51 +++++++++++++++++ .../check-snapshots/check_cols_cycle.sql | 33 +++++++++++ .../test_snapshot_check_cols.py | 55 +++++++++++++++++++ 3 files changed, 139 insertions(+) create mode 100644 test/integration/004_simple_snapshot_test/check-snapshots-expected/check_snapshots_test_current.sql create mode 100644 test/integration/004_simple_snapshot_test/check-snapshots/check_cols_cycle.sql create mode 100644 test/integration/004_simple_snapshot_test/test_snapshot_check_cols.py diff --git a/test/integration/004_simple_snapshot_test/check-snapshots-expected/check_snapshots_test_current.sql b/test/integration/004_simple_snapshot_test/check-snapshots-expected/check_snapshots_test_current.sql new file mode 100644 index 00000000000..414afb4727c --- /dev/null +++ b/test/integration/004_simple_snapshot_test/check-snapshots-expected/check_snapshots_test_current.sql @@ -0,0 +1,51 @@ + + +with query as ( + + -- check that the current value for id=1 is red + select case when ( + select count(*) + from {{ ref('check_cols_cycle') }} + where id = 1 and color = 'red' and dbt_valid_to is null + ) = 1 then 0 else 1 end as failures + + union all + + -- check that the previous 'red' value for id=1 is invalidated + select case when ( + select count(*) + from {{ ref('check_cols_cycle') }} + where id = 1 and color = 'red' and dbt_valid_to is not null + ) = 1 then 0 else 1 end as failures + + union all + + -- check that there's only one current record for id=2 + select case when ( + select count(*) + from {{ ref('check_cols_cycle') }} + where id = 2 and color = 'pink' and dbt_valid_to is null + ) = 1 then 0 else 1 end as failures + + union all + + -- check that the previous value for id=2 is represented + select case when ( + select count(*) + from {{ ref('check_cols_cycle') }} + where id = 2 and color = 'green' and dbt_valid_to is not null + ) = 1 then 0 else 1 end as failures + + union all + + -- check that there are 5 records total in the table + select case when ( + select count(*) + from {{ ref('check_cols_cycle') }} + ) = 5 then 0 else 1 end as failures + +) + +select * +from query +where failures = 1 diff --git a/test/integration/004_simple_snapshot_test/check-snapshots/check_cols_cycle.sql b/test/integration/004_simple_snapshot_test/check-snapshots/check_cols_cycle.sql new file mode 100644 index 00000000000..8b36f35a1bc --- /dev/null +++ b/test/integration/004_simple_snapshot_test/check-snapshots/check_cols_cycle.sql @@ -0,0 +1,33 @@ + +{% snapshot check_cols_cycle %} + + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['color'] + ) + }} + + {% if var('version') == 1 %} + + select 1 as id, 'red' as color union all + select 2 as id, 'green' as color + + {% elif var('version') == 2 %} + + select 1 as id, 'blue' as color union all + select 2 as id, 'green' as color + + {% elif var('version') == 3 %} + + select 1 as id, 'red' as color union all + select 2 as id, 'pink' as color + + {% else %} + {% do exceptions.raise_compiler_error("Got bad version: " ~ var('version')) %} + {% endif %} + +{% endsnapshot %} diff --git a/test/integration/004_simple_snapshot_test/test_snapshot_check_cols.py b/test/integration/004_simple_snapshot_test/test_snapshot_check_cols.py new file mode 100644 index 00000000000..0416ed97eff --- /dev/null +++ b/test/integration/004_simple_snapshot_test/test_snapshot_check_cols.py @@ -0,0 +1,55 @@ +from test.integration.base import DBTIntegrationTest, use_profile +import dbt.exceptions + + +class TestSimpleSnapshotFiles(DBTIntegrationTest): + NUM_SNAPSHOT_MODELS = 1 + + @property + def schema(self): + return "simple_snapshot_004" + + @property + def models(self): + return "models" + + @property + def project_config(self): + return { + "snapshot-paths": ['check-snapshots'], + "test-paths": ['check-snapshots-expected'], + "source-paths": [], + } + + def test_snapshot_check_cols_cycle(self): + results = self.run_dbt(["snapshot", '--vars', 'version: 1']) + self.assertEqual(len(results), 1) + + results = self.run_dbt(["snapshot", '--vars', 'version: 2']) + self.assertEqual(len(results), 1) + + results = self.run_dbt(["snapshot", '--vars', 'version: 3']) + self.assertEqual(len(results), 1) + + def assert_expected(self): + self.run_dbt(['test', '--data', '--vars', 'version: 3']) + + @use_profile('snowflake') + def test__snowflake__simple_snapshot(self): + self.test_snapshot_check_cols_cycle() + self.assert_expected() + + @use_profile('postgres') + def test__postgres__simple_snapshot(self): + self.test_snapshot_check_cols_cycle() + self.assert_expected() + + @use_profile('bigquery') + def test__bigquery__simple_snapshot(self): + self.test_snapshot_check_cols_cycle() + self.assert_expected() + + @use_profile('redshift') + def test__redshift__simple_snapshot(self): + self.test_snapshot_check_cols_cycle() + self.assert_expected() From b6e7351431c2666e63d2acb7f79ac387ab7df658 Mon Sep 17 00:00:00 2001 From: Drew Banin Date: Mon, 22 Jul 2019 11:14:03 -0400 Subject: [PATCH 5/5] snapshot surrogate key whitespace control --- .../macros/materializations/snapshot/strategies.sql | 4 ++-- .../dbt/include/bigquery/macros/materializations/snapshot.sql | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql index 0fa1abbe68e..6de9151ddff 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql @@ -41,10 +41,10 @@ {% macro default__snapshot_hash_arguments(args) -%} - md5({% for arg in args %} + md5({%- for arg in args -%} coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %} - {% endfor %}) + {%- endfor -%}) {%- endmacro %} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql index 2cfbdb2956e..836a44c8d72 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/snapshot.sql @@ -1,7 +1,7 @@ {% macro bigquery__snapshot_hash_arguments(args) -%} - to_hex(md5(concat({% for arg in args %} + to_hex(md5(concat({%- for arg in args -%} coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif -%} - {% endfor %} + {%- endfor -%} ))) {%- endmacro %}