diff --git a/CHANGELOG.md b/CHANGELOG.md index 59a2ef92d4a..6c3a864f51d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,10 @@ - dbt will compare configurations using the un-rendered form of the config block in dbt_project.yml ([#2713](https://github.com/fishtown-analytics/dbt/issues/2713), [#2735](https://github.com/fishtown-analytics/dbt/pull/2735)) - Added state and defer arguments to the RPC client, matching the CLI ([#2678](https://github.com/fishtown-analytics/dbt/issues/2678), [#2736](https://github.com/fishtown-analytics/dbt/pull/2736)) - Added schema and dbt versions to JSON artifacts ([#2670](https://github.com/fishtown-analytics/dbt/issues/2670), [#2767](https://github.com/fishtown-analytics/dbt/pull/2767)) +- Added ability to snapshot hard-deleted records (opt-in with `invalidate_hard_deletes` config option). ([#249](https://github.com/fishtown-analytics/dbt/issues/249), [#2749](https://github.com/fishtown-analytics/dbt/pull/2749)) + +Contributors: +- [@joelluijmes](https://github.com/joelluijmes) ([#2749](https://github.com/fishtown-analytics/dbt/pull/2749)) ## dbt 0.18.1 (Release TBD) diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql index 04d2112fe51..0043f30ee3b 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot.sql @@ -65,6 +65,17 @@ from snapshot_query ), + {%- if strategy.invalidate_hard_deletes %} + + deletes_source_data as ( + + select + *, + {{ strategy.unique_key }} as dbt_unique_key + from snapshot_query + ), + {% endif %} + insertions as ( select @@ -99,9 +110,33 @@ ) ) + {%- if strategy.invalidate_hard_deletes -%} + , + + deletes as ( + + select + 'delete' as dbt_change_type, + source_data.*, + {{ snapshot_get_time() }} as dbt_valid_from, + {{ snapshot_get_time() }} as dbt_updated_at, + {{ snapshot_get_time() }} as dbt_valid_to, + snapshotted_data.dbt_scd_id + + from snapshotted_data + left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key + where snapshotted_data.dbt_valid_to is null + and source_data.dbt_unique_key is null + ) + {%- endif %} + select * from insertions union all select * from updates + {%- if strategy.invalidate_hard_deletes %} + union all + select * from deletes + {%- endif %} {%- endmacro %} diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot_merge.sql b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot_merge.sql index e4370196db3..74bfa2b32d3 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/snapshot_merge.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/snapshot_merge.sql @@ -17,6 +17,12 @@ then update set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + when matched + and DBT_INTERNAL_DEST.dbt_valid_to is null + and DBT_INTERNAL_SOURCE.dbt_change_type = 'delete' + then update + set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + when not matched and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert' then insert ({{ insert_cols_csv }}) diff --git a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql index 94915868061..6b5f2a470b5 100644 --- a/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/snapshot/strategies.sql @@ -66,6 +66,7 @@ {% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set primary_key = config['unique_key'] %} {% set updated_at = config['updated_at'] %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} {#/* The snapshot relation might not have an {{ updated_at }} value if the @@ -86,7 +87,8 @@ "unique_key": primary_key, "updated_at": updated_at, "row_changed": row_changed_expr, - "scd_id": scd_id_expr + "scd_id": scd_id_expr, + "invalidate_hard_deletes": invalidate_hard_deletes }) %} {% endmacro %} @@ -131,6 +133,8 @@ {% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} {% set check_cols_config = config['check_cols'] %} {% set primary_key = config['unique_key'] %} + {% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %} + {% set select_current_time -%} select {{ snapshot_get_time() }} as snapshot_start {%- endset %} @@ -173,6 +177,7 @@ "unique_key": primary_key, "updated_at": updated_at, "row_changed": row_changed_expr, - "scd_id": scd_id_expr + "scd_id": scd_id_expr, + "invalidate_hard_deletes": invalidate_hard_deletes }) %} {% endmacro %} diff --git a/plugins/postgres/dbt/include/postgres/macros/materializations/snapshot_merge.sql b/plugins/postgres/dbt/include/postgres/macros/materializations/snapshot_merge.sql index 6f8e7cbc6a5..ba016c83a6a 100644 --- a/plugins/postgres/dbt/include/postgres/macros/materializations/snapshot_merge.sql +++ b/plugins/postgres/dbt/include/postgres/macros/materializations/snapshot_merge.sql @@ -9,6 +9,13 @@ and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'update'::text and {{ target }}.dbt_valid_to is null; + update {{ target }} + set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to + from {{ source }} as DBT_INTERNAL_SOURCE + where DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text + and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'delete'::text + and {{ target }}.dbt_valid_to is null; + insert into {{ target }} ({{ insert_cols_csv }}) select {% for column in insert_cols -%} DBT_INTERNAL_SOURCE.{{ column }} {%- if not loop.last %}, {%- endif %} diff --git a/test/integration/004_simple_snapshot_test/test-snapshots-bq/snapshot.sql b/test/integration/004_simple_snapshot_test/test-snapshots-bq/snapshot.sql index 639e65904b7..af77f5fc472 100644 --- a/test/integration/004_simple_snapshot_test/test-snapshots-bq/snapshot.sql +++ b/test/integration/004_simple_snapshot_test/test-snapshots-bq/snapshot.sql @@ -9,6 +9,11 @@ updated_at='updated_at', ) }} + + {% if var('invalidate_hard_deletes', 'false') | as_bool %} + {{ config(invalidate_hard_deletes=True) }} + {% endif %} + select * from `{{target.database}}`.`{{schema}}`.seed {% endsnapshot %} diff --git a/test/integration/004_simple_snapshot_test/test-snapshots-pg/snapshot.sql b/test/integration/004_simple_snapshot_test/test-snapshots-pg/snapshot.sql index 70be6d4ed89..ae5aac087ac 100644 --- a/test/integration/004_simple_snapshot_test/test-snapshots-pg/snapshot.sql +++ b/test/integration/004_simple_snapshot_test/test-snapshots-pg/snapshot.sql @@ -9,6 +9,11 @@ updated_at='updated_at', ) }} + + {% if var('invalidate_hard_deletes', 'false') | as_bool %} + {{ config(invalidate_hard_deletes=True) }} + {% endif %} + select * from {{target.database}}.{{target.schema}}.seed {% endsnapshot %} diff --git a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py index fc3e1be3840..8ca08ccc047 100644 --- a/test/integration/004_simple_snapshot_test/test_simple_snapshot.py +++ b/test/integration/004_simple_snapshot_test/test_simple_snapshot.py @@ -1,4 +1,6 @@ from test.integration.base import DBTIntegrationTest, use_profile +from datetime import datetime +import pytz import dbt.exceptions @@ -754,3 +756,107 @@ def test__postgres__changing_strategy(self): results = self.run_dbt(['test']) self.assertEqual(len(results), 1) + + +class TestSnapshotHardDelete(DBTIntegrationTest): + # These tests uses the same seed data, containing 20 records of which we hard delete the last 10. + # These deleted records set the dbt_valid_to to time the snapshot was ran. + NUM_SNAPSHOT_MODELS = 1 + + @property + def schema(self): + return "simple_snapshot_004" + + @property + def models(self): + return "models" + + @property + def project_config(self): + if self.adapter_type == 'bigquery': + paths = ['test-snapshots-bq'] + else: + paths = ['test-snapshots-pg'] + + return { + 'config-version': 2, + "data-paths": ['data'], + "snapshot-paths": paths, + 'macro-paths': ['macros'], + } + + def _seed_and_snapshot(self): + if self.adapter_type == 'bigquery': + self.run_sql_file('seed_bq.sql') + elif self.adapter_type == 'postgres': + self.run_sql_file('seed_pg.sql') + else: + self.run_sql_file('seed.sql') + + results = self.run_dbt(['snapshot']) + self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS) + + if self.adapter_type == 'snowflake': + self.assertTablesEqual("SNAPSHOT_EXPECTED", "SNAPSHOT_ACTUAL") + else: + self.assertTablesEqual("snapshot_expected", "snapshot_actual") + + def _delete_records(self): + database = self.default_database + if self.adapter_type == 'bigquery': + database = self.adapter.quote(database) + + self.run_sql( + 'delete from {}.{}.seed where id >= 10;'.format(database, self.unique_schema()) + ) + + def _invalidate_and_assert_records(self): + begin_snapshot_datetime = datetime.now(pytz.UTC) + + results = self.run_dbt(['snapshot', '--vars', '{invalidate_hard_deletes: true}']) + self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS) + + database = self.default_database + if self.adapter_type == 'bigquery': + database = self.adapter.quote(database) + + results = self.run_sql( + ''' + select + id, + dbt_valid_to + from {}.{}.snapshot_actual + order by id + '''.format(database, self.unique_schema()), + fetch='all' + ) + + self.assertEqual(len(results), 20) + for result in results[10:]: + # result is a tuple, the dbt_valid_to column is the latest + self.assertIsInstance(result[-1], datetime) + self.assertGreaterEqual(result[-1].replace(tzinfo=pytz.UTC), begin_snapshot_datetime) + + @use_profile('postgres') + def test__postgres__snapshot_hard_delete(self): + self._seed_and_snapshot() + self._delete_records() + self._invalidate_and_assert_records() + + @use_profile('bigquery') + def test__bigquery__snapshot_hard_delete(self): + self._seed_and_snapshot() + self._delete_records() + self._invalidate_and_assert_records() + + @use_profile('snowflake') + def test__snowflake__snapshot_hard_delete(self): + self._seed_and_snapshot() + self._delete_records() + self._invalidate_and_assert_records() + + @use_profile('redshift') + def test__redshift__snapshot_hard_delete(self): + self._seed_and_snapshot() + self._delete_records() + self._invalidate_and_assert_records() \ No newline at end of file diff --git a/test/rpc/test_snapshots.py b/test/rpc/test_snapshots.py index 0a313ad0452..44fbfa3031e 100644 --- a/test/rpc/test_snapshots.py +++ b/test/rpc/test_snapshots.py @@ -19,7 +19,7 @@ updated_at='updated_at', ) }} - select 1 as id, '2019-10-31 23:59:40' as updated_at + select 1 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at {% endsnapshot %} ''' @@ -36,7 +36,7 @@ updated_at='updated_at', ) }} - select 2 as id, '2019-10-31 23:59:40' as updated_at + select 2 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at {% endsnapshot %} ''' @@ -169,4 +169,4 @@ def test_rpc_snapshot_state( results = querier.async_wait_for_result( querier.snapshot(state='./state', select=['state:modified']), ) - assert len(results['results']) == 0 + assert len(results['results']) == 0 \ No newline at end of file