From 9589e7aaa19b67459a30459d8c7f3be06415746a Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Wed, 11 Dec 2019 09:14:14 -0700 Subject: [PATCH 1/2] Fix snapshot check all strategy with added column --- .../materializations/snapshot/strategies.sql | 53 +++++++++++--- .../004_simple_snapshot_test/data/seed.csv | 4 + .../data/seed_newcol.csv | 4 + .../test-snapshots-checkall/snapshot.sql | 4 + .../test_simple_snapshot.py | 73 +++++++++++++++++++ test/integration/base.py | 2 +- 6 files changed, 128 insertions(+), 12 deletions(-) create mode 100644 test/integration/004_simple_snapshot_test/data/seed.csv create mode 100644 test/integration/004_simple_snapshot_test/data/seed_newcol.csv create mode 100644 test/integration/004_simple_snapshot_test/test-snapshots-checkall/snapshot.sql diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql index 7847117e005..108d3f01709 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql @@ -93,6 +93,32 @@ ) %} {% endmacro %} + +{% macro snapshot_check_all_get_existing_columns(node, target_exists) -%} + {%- set query_columns = get_columns_in_query(node['injected_sql']) -%} + {%- if not target_exists -%} + {# no table yet -> return whatever the query does #} + {{ return([false, query_columns]) }} + {%- endif -%} + {# handle any schema changes #} + {%- set target_table = node.get('alias', node.get('name')) -%} + {%- set target_relation = adapter.get_relation(database=node.database, schema=node.schema, identifier=target_table) -%} + {%- set existing_cols = get_columns_in_query('select * from ' ~ target_relation) -%} + {%- set ns = namespace() -%} {# handle for-loop scoping with a namespace #} + {%- set ns.column_added = false -%} + + {%- set intersection = [] -%} + {%- for col in query_columns -%} + {%- if col in existing_cols -%} + {%- do intersection.append(col) -%} + {%- else -%} + {% set ns.column_added = true %} + {%- endif -%} + {%- endfor -%} + {{ return([ns.column_added, intersection]) }} +{%- endmacro %} + + {% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set check_cols_config = config['check_cols'] %} {% set primary_key = config['unique_key'] %} @@ -107,24 +133,29 @@ {%- endif %} {% set updated_at = snapshot_string_as_time(now) %} + {% set column_added = false %} + {% if check_cols_config == 'all' %} - {% set check_cols = get_columns_in_query(node['injected_sql']) %} + {% set column_added, check_cols = snapshot_check_all_get_existing_columns(node, target_exists) %} {% elif check_cols_config is iterable and (check_cols_config | length) > 0 %} {% set check_cols = check_cols_config %} {% else %} {% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %} {% endif %} - {% set row_changed_expr -%} - ( - {% for col in check_cols %} - {{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }} - or - ({{ snapshotted_rel }}.{{ col }} is null) != ({{ current_rel }}.{{ col }} is null) - {%- if not loop.last %} or {% endif %} - - {% endfor %} - ) + {%- set row_changed_expr -%} + ( + {%- if column_added -%} + TRUE + {%- else -%} + {%- for col in check_cols -%} + {{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }} + or + ({{ snapshotted_rel }}.{{ col }} is null) != ({{ current_rel }}.{{ col }} is null) + {%- if not loop.last %} or {% endif -%} + {%- endfor -%} + {%- endif -%} + ) {%- endset %} {% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %} diff --git a/test/integration/004_simple_snapshot_test/data/seed.csv b/test/integration/004_simple_snapshot_test/data/seed.csv new file mode 100644 index 00000000000..9da8d46ff03 --- /dev/null +++ b/test/integration/004_simple_snapshot_test/data/seed.csv @@ -0,0 +1,4 @@ +id,first_name +1,Judith +2,Arthur +3,Rachel diff --git a/test/integration/004_simple_snapshot_test/data/seed_newcol.csv b/test/integration/004_simple_snapshot_test/data/seed_newcol.csv new file mode 100644 index 00000000000..005517bdab9 --- /dev/null +++ b/test/integration/004_simple_snapshot_test/data/seed_newcol.csv @@ -0,0 +1,4 @@ +id,first_name,last_name +1,Judith,Kennedy +2,Arthur,Kelly +3,Rachel,Moreno diff --git a/test/integration/004_simple_snapshot_test/test-snapshots-checkall/snapshot.sql b/test/integration/004_simple_snapshot_test/test-snapshots-checkall/snapshot.sql new file mode 100644 index 00000000000..b9cd002ca1b --- /dev/null +++ b/test/integration/004_simple_snapshot_test/test-snapshots-checkall/snapshot.sql @@ -0,0 +1,4 @@ +{% snapshot my_snapshot %} + {{ config(check_cols='all', unique_key='id', strategy='check', target_database=database, target_schema=schema) }} + select * from {{ ref(var('seed_name', 'seed')) }} +{% endsnapshot %} diff --git a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py index b6a42bdc753..ce21887009f 100644 --- a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py +++ b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py @@ -95,6 +95,79 @@ def test__redshift__simple_snapshot(self): self.assert_expected() +class TestSimpleColumnSnapshotFiles(DBTIntegrationTest): + + @property + def schema(self): + return "simple_snapshot_004" + + @property + def models(self): + return "models-checkall" + + @property + def project_config(self): + return { + 'data-paths': ['data'], + 'macro-paths': ['custom-snapshot-macros', 'macros'], + 'snapshot-paths': ['test-snapshots-checkall'], + 'seeds': { + 'quote_columns': False, + } + } + + def _run_snapshot_test(self): + self.run_dbt(['seed']) + self.run_dbt(['snapshot']) + database = self.default_database + if self.adapter_type == 'bigquery': + database = self.adapter.quote(database) + results = self.run_sql( + 'select * from {}.{}.my_snapshot'.format(database, self.unique_schema()), + fetch='all' + ) + self.assertEqual(len(results), 3) + for result in results: + self.assertEqual(len(result), 6) + + self.run_dbt(['snapshot', '--vars', '{seed_name: seed_newcol}']) + results = self.run_sql( + 'select * from {}.{}.my_snapshot where last_name is not NULL'.format(database, self.unique_schema()), + fetch='all' + ) + self.assertEqual(len(results), 3) + + for result in results: + # new column + self.assertEqual(len(result), 7) + self.assertIsNotNone(result[-1]) + + results = self.run_sql( + 'select * from {}.{}.my_snapshot where last_name is NULL'.format(database, self.unique_schema()), + fetch='all' + ) + self.assertEqual(len(results), 3) + for result in results: + # new column + self.assertEqual(len(result), 7) + + @use_profile('postgres') + def test_postgres_renamed_source(self): + self._run_snapshot_test() + + @use_profile('snowflake') + def test_snowflake_renamed_source(self): + self._run_snapshot_test() + + @use_profile('redshift') + def test_redshift_renamed_source(self): + self._run_snapshot_test() + + @use_profile('bigquery') + def test_bigquery_renamed_source(self): + self._run_snapshot_test() + + class TestCustomSnapshotFiles(BaseSimpleSnapshotTest): @property def project_config(self): diff --git a/test/integration/base.py b/test/integration/base.py index f8546cd7061..c5cbc902c72 100644 --- a/test/integration/base.py +++ b/test/integration/base.py @@ -619,7 +619,7 @@ def run_sql_common(self, sql, fetch, conn): else: return except BaseException as e: - if conn.handle and not conn.handle.is_closed(): + if conn.handle and not conn.handle.closed: conn.handle.rollback() print(sql) print(e) From ab4925f59f0bde0555d17f20940a6fd9576585e8 Mon Sep 17 00:00:00 2001 From: Jacob Beck Date: Wed, 11 Dec 2019 13:30:04 -0700 Subject: [PATCH 2/2] this is ok now --- .../004_simple_snapshot_test/test_simple_snapshot.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py index ce21887009f..1b86de9e21a 100644 --- a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py +++ b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py @@ -592,10 +592,8 @@ def test__bigquery__snapshot_with_new_field(self): # This adds new fields to the source table, and updates the expected snapshot output accordingly self.run_sql_file("add_column_to_source_bq.sql") - # this should fail because `check="all"` will try to compare the nested field - self.run_dbt(['snapshot'], expect_pass=False) - - self.run_dbt(["snapshot", '--select', 'snapshot_actual']) + # check_cols='all' will replace the changed field + self.run_dbt(['snapshot']) # A more thorough test would assert that snapshotted == expected, but BigQuery does not support the # "EXCEPT DISTINCT" operator on nested fields! Instead, just check that schemas are congruent. @@ -610,9 +608,15 @@ def test__bigquery__snapshot_with_new_field(self): schema=self.unique_schema(), table='snapshot_actual' ) + snapshotted_all_cols = self.get_table_columns( + database=self.default_database, + schema=self.unique_schema(), + table='snapshot_checkall' + ) self.assertTrue(len(expected_cols) > 0, "source table does not exist -- bad test") self.assertEqual(len(expected_cols), len(snapshotted_cols), "actual and expected column lengths are different") + self.assertEqual(len(expected_cols), len(snapshotted_all_cols)) for (expected_col, actual_col) in zip(expected_cols, snapshotted_cols): expected_name, expected_type, _ = expected_col