Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include hard-deletes when making snapshot #2749

Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
### Features
- dbt will compare configurations using the un-rendered form of the config block in dbt_project.yml ([#2713](https://github.com/fishtown-analytics/dbt/issues/2713), [#2735](https://github.com/fishtown-analytics/dbt/pull/2735))
- Added state and defer arguments to the RPC client, matching the CLI ([#2678](https://github.com/fishtown-analytics/dbt/issues/2678), [#2736](https://github.com/fishtown-analytics/dbt/pull/2736))
- Added ability to snapshot hard-deleted records (opt-in with `invalidate_hard_deletes` config option). ([#249](https://github.com/fishtown-analytics/dbt/issues/249), [#2749](https://github.com/fishtown-analytics/dbt/pull/2749))

Contributors:
- [@joelluijmes](https://github.com/joelluijmes) ([#2749](https://github.com/fishtown-analytics/dbt/pull/2749))

## dbt 0.18.0 (September 03, 2020)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@
from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}
deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
from snapshot_query
),
{% endif %}

insertions as (

select
Expand Down Expand Up @@ -99,9 +109,33 @@
)
)

{%- if strategy.invalidate_hard_deletes -%}
,

deletes as (

select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_valid_to is null
and source_data.dbt_unique_key is null
)
{%- endif %}

select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
union all
select * from deletes
{%- endif %}

{%- endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when matched
and DBT_INTERNAL_DEST.dbt_valid_to is null
and DBT_INTERNAL_SOURCE.dbt_change_type = 'delete'
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when not matched
and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
then insert ({{ insert_cols_csv }})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = config['updated_at'] %}
{% set invalidate_hard_deletes = config['invalidate_hard_deletes'] %}
joelluijmes marked this conversation as resolved.
Show resolved Hide resolved

{#/*
The snapshot relation might not have an {{ updated_at }} value if the
Expand All @@ -86,7 +87,8 @@
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
}) %}
{% endmacro %}

Expand Down Expand Up @@ -131,6 +133,8 @@
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set invalidate_hard_deletes = config['invalidate_hard_deletes'] %}
joelluijmes marked this conversation as resolved.
Show resolved Hide resolved

{% set select_current_time -%}
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}
Expand Down Expand Up @@ -173,6 +177,7 @@
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
}) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'update'::text
and {{ target }}.dbt_valid_to is null;

update {{ target }}
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
from {{ source }} as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text
and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'delete'::text
and {{ target }}.dbt_valid_to is null;

insert into {{ target }} ({{ insert_cols_csv }})
select {% for column in insert_cols -%}
DBT_INTERNAL_SOURCE.{{ column }} {%- if not loop.last %}, {%- endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
updated_at='updated_at',
)
}}

{% if var('invalidate_hard_deletes', 'false') | as_bool %}
{{ config(invalidate_hard_deletes=True) }}
{% endif %}

select * from {{target.database}}.{{target.schema}}.seed

{% endsnapshot %}
42 changes: 42 additions & 0 deletions test/integration/004_simple_snapshot_test/test_simple_snapshot.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from test.integration.base import DBTIntegrationTest, use_profile
from datetime import datetime
import dbt.exceptions


Expand Down Expand Up @@ -754,3 +755,44 @@ def test__postgres__changing_strategy(self):

results = self.run_dbt(['test'])
self.assertEqual(len(results), 1)


class TestSnapshotHardDelete(BaseSimpleSnapshotTest):
    """Integration test for the opt-in ``invalidate_hard_deletes`` snapshot config.

    Rows removed from the source table between two snapshot runs must have
    their ``dbt_valid_to`` set when the second snapshot is taken.
    """

    @property
    def project_config(self):
        """Return the dbt project settings used by this test case."""
        return {
            'config-version': 2,
            "data-paths": ['data'],
            "snapshot-paths": ['test-snapshots-pg'],
            'macro-paths': ['macros'],
        }

    @use_profile('postgres')
    def test__postgres__snapshot_hard_delete(self):
        """Hard-delete seed rows, re-snapshot, and check they were invalidated."""
        # This test uses the same seed data as the other snapshot tests and
        # hard deletes every row with id >= 10. The deleted records must get
        # their dbt_valid_to set to the time the snapshot was run.
        # NOTE(review): the assertions below expect 20 snapshot rows in total;
        # confirm against the seed file.

        self.dbt_run_seed_snapshot()
        self.assert_expected()

        database = self.default_database
        schema = self.unique_schema()
        self.run_sql(
            'delete from {}.{}.seed where id >= 10;'.format(database, schema)
        )

        # Naive UTC timestamp taken before the snapshot runs; it is the lower
        # bound for the dbt_valid_to values written for the deleted rows.
        begin_snapshot_datetime = datetime.utcnow()

        run_results = self.run_dbt(
            ['snapshot', '--vars', '{invalidate_hard_deletes: true}']
        )
        self.assertEqual(len(run_results), self.NUM_SNAPSHOT_MODELS)

        # Order explicitly: without ORDER BY the database may return rows in
        # any order, which would make the positional slice below meaningless.
        snapshotted = self.run_sql(
            'select * from {}.{}.snapshot_actual order by id'.format(
                database, schema
            ),
            fetch='all'
        )

        self.assertEqual(len(snapshotted), 20)
        for row in snapshotted[10:]:
            # Each row is a tuple; dbt_valid_to is its last column.
            self.assertIsInstance(row[-1], datetime)
            self.assertGreaterEqual(row[-1], begin_snapshot_datetime)
6 changes: 3 additions & 3 deletions test/rpc/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
updated_at='updated_at',
)
}}
select 1 as id, '2019-10-31 23:59:40' as updated_at
select 1 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at

{% endsnapshot %}
'''
Expand All @@ -36,7 +36,7 @@
updated_at='updated_at',
)
}}
select 2 as id, '2019-10-31 23:59:40' as updated_at
select 2 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at

{% endsnapshot %}
'''
Expand Down Expand Up @@ -169,4 +169,4 @@ def test_rpc_snapshot_state(
results = querier.async_wait_for_result(
querier.snapshot(state='./state', select=['state:modified']),
)
assert len(results['results']) == 0
assert len(results['results']) == 0