Skip to content

Commit

Permalink
Merge pull request #2749 from joelluijmes/snapshot-hard-deletes-joell
Browse files Browse the repository at this point in the history
Include hard-deletes when making snapshot
  • Loading branch information
beckjake authored Sep 23, 2020
2 parents 120eb5b + a4b80cc commit 8ee490b
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 5 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@
- dbt will compare configurations using the un-rendered form of the config block in dbt_project.yml ([#2713](https://github.com/fishtown-analytics/dbt/issues/2713), [#2735](https://github.com/fishtown-analytics/dbt/pull/2735))
- Added state and defer arguments to the RPC client, matching the CLI ([#2678](https://github.com/fishtown-analytics/dbt/issues/2678), [#2736](https://github.com/fishtown-analytics/dbt/pull/2736))
- Added schema and dbt versions to JSON artifacts ([#2670](https://github.com/fishtown-analytics/dbt/issues/2670), [#2767](https://github.com/fishtown-analytics/dbt/pull/2767))
- Added ability to snapshot hard-deleted records (opt-in with `invalidate_hard_deletes` config option). ([#249](https://github.com/fishtown-analytics/dbt/issues/249), [#2749](https://github.com/fishtown-analytics/dbt/pull/2749))

Contributors:
- [@joelluijmes](https://github.com/joelluijmes) ([#2749](https://github.com/fishtown-analytics/dbt/pull/2749))

## dbt 0.18.1 (Release TBD)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@
from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
from snapshot_query
),
{% endif %}

insertions as (

select
Expand Down Expand Up @@ -99,9 +110,33 @@
)
)

{%- if strategy.invalidate_hard_deletes -%}
,

deletes as (

select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_valid_to is null
and source_data.dbt_unique_key is null
)
{%- endif %}

select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
union all
select * from deletes
{%- endif %}

{%- endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when matched
and DBT_INTERNAL_DEST.dbt_valid_to is null
and DBT_INTERNAL_SOURCE.dbt_change_type = 'delete'
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when not matched
and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
then insert ({{ insert_cols_csv }})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = config['updated_at'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}

{#/*
The snapshot relation might not have an {{ updated_at }} value if the
Expand All @@ -86,7 +87,8 @@
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
}) %}
{% endmacro %}

Expand Down Expand Up @@ -131,6 +133,8 @@
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}

{% set select_current_time -%}
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}
Expand Down Expand Up @@ -173,6 +177,7 @@
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
}) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'update'::text
and {{ target }}.dbt_valid_to is null;

update {{ target }}
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
from {{ source }} as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text
and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'delete'::text
and {{ target }}.dbt_valid_to is null;

insert into {{ target }} ({{ insert_cols_csv }})
select {% for column in insert_cols -%}
DBT_INTERNAL_SOURCE.{{ column }} {%- if not loop.last %}, {%- endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
updated_at='updated_at',
)
}}

{% if var('invalidate_hard_deletes', 'false') | as_bool %}
{{ config(invalidate_hard_deletes=True) }}
{% endif %}

select * from `{{target.database}}`.`{{schema}}`.seed

{% endsnapshot %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
updated_at='updated_at',
)
}}

{% if var('invalidate_hard_deletes', 'false') | as_bool %}
{{ config(invalidate_hard_deletes=True) }}
{% endif %}

select * from {{target.database}}.{{target.schema}}.seed

{% endsnapshot %}
106 changes: 106 additions & 0 deletions test/integration/004_simple_snapshot_test/test_simple_snapshot.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from test.integration.base import DBTIntegrationTest, use_profile
from datetime import datetime
import pytz
import dbt.exceptions


Expand Down Expand Up @@ -754,3 +756,107 @@ def test__postgres__changing_strategy(self):

results = self.run_dbt(['test'])
self.assertEqual(len(results), 1)


class TestSnapshotHardDelete(DBTIntegrationTest):
    # The shared seed data holds 20 records; the scenario hard-deletes the
    # final 10 and expects the follow-up snapshot (with
    # invalidate_hard_deletes enabled) to close those records out by setting
    # dbt_valid_to to the time the snapshot ran.
    NUM_SNAPSHOT_MODELS = 1

    @property
    def schema(self):
        return "simple_snapshot_004"

    @property
    def models(self):
        return "models"

    @property
    def project_config(self):
        # BigQuery has its own snapshot definitions; every other adapter
        # shares the postgres-flavored ones.
        snapshot_paths = (
            ['test-snapshots-bq']
            if self.adapter_type == 'bigquery'
            else ['test-snapshots-pg']
        )
        return {
            'config-version': 2,
            "data-paths": ['data'],
            "snapshot-paths": snapshot_paths,
            'macro-paths': ['macros'],
        }

    def _quoted_database(self):
        # BigQuery needs the database (project) identifier quoted before it
        # can be interpolated into raw SQL; other adapters take it as-is.
        database = self.default_database
        if self.adapter_type == 'bigquery':
            database = self.adapter.quote(database)
        return database

    def _seed_and_snapshot(self):
        # Load the adapter-appropriate seed, run the snapshot once, and check
        # the snapshot table against the expected fixture.
        seed_by_adapter = {
            'bigquery': 'seed_bq.sql',
            'postgres': 'seed_pg.sql',
        }
        self.run_sql_file(seed_by_adapter.get(self.adapter_type, 'seed.sql'))

        results = self.run_dbt(['snapshot'])
        self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS)

        if self.adapter_type == 'snowflake':
            self.assertTablesEqual("SNAPSHOT_EXPECTED", "SNAPSHOT_ACTUAL")
        else:
            self.assertTablesEqual("snapshot_expected", "snapshot_actual")

    def _delete_records(self):
        # Hard-delete the second half of the seed rows (ids 10 and above).
        database = self._quoted_database()
        self.run_sql(
            'delete from {}.{}.seed where id >= 10;'.format(database, self.unique_schema())
        )

    def _invalidate_and_assert_records(self):
        # Capture a lower bound on dbt_valid_to just before the invalidating
        # snapshot run.
        begin_snapshot_datetime = datetime.now(pytz.UTC)

        results = self.run_dbt(['snapshot', '--vars', '{invalidate_hard_deletes: true}'])
        self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS)

        database = self._quoted_database()
        rows = self.run_sql(
            '''
            select
                id,
                dbt_valid_to
            from {}.{}.snapshot_actual
            order by id
            '''.format(database, self.unique_schema()),
            fetch='all'
        )

        self.assertEqual(len(rows), 20)
        # Only the deleted half (the last 10 rows by id) should have been
        # invalidated; each row tuple ends with its dbt_valid_to value.
        for row in rows[10:]:
            self.assertIsInstance(row[-1], datetime)
            self.assertGreaterEqual(row[-1].replace(tzinfo=pytz.UTC), begin_snapshot_datetime)

    def _run_hard_delete_scenario(self):
        # Full scenario: snapshot, hard-delete half the source rows, then
        # snapshot again with invalidate_hard_deletes enabled and verify.
        self._seed_and_snapshot()
        self._delete_records()
        self._invalidate_and_assert_records()

    @use_profile('postgres')
    def test__postgres__snapshot_hard_delete(self):
        self._run_hard_delete_scenario()

    @use_profile('bigquery')
    def test__bigquery__snapshot_hard_delete(self):
        self._run_hard_delete_scenario()

    @use_profile('snowflake')
    def test__snowflake__snapshot_hard_delete(self):
        self._run_hard_delete_scenario()

    @use_profile('redshift')
    def test__redshift__snapshot_hard_delete(self):
        self._run_hard_delete_scenario()
6 changes: 3 additions & 3 deletions test/rpc/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
updated_at='updated_at',
)
}}
select 1 as id, '2019-10-31 23:59:40' as updated_at
select 1 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at
{% endsnapshot %}
'''
Expand All @@ -36,7 +36,7 @@
updated_at='updated_at',
)
}}
select 2 as id, '2019-10-31 23:59:40' as updated_at
select 2 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at
{% endsnapshot %}
'''
Expand Down Expand Up @@ -169,4 +169,4 @@ def test_rpc_snapshot_state(
results = querier.async_wait_for_result(
querier.snapshot(state='./state', select=['state:modified']),
)
assert len(results['results']) == 0
assert len(results['results']) == 0

0 comments on commit 8ee490b

Please sign in to comment.