Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include hard-deletes when making snapshot #2749

Merged
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
- dbt will compare configurations using the un-rendered form of the config block in dbt_project.yml ([#2713](https://github.com/fishtown-analytics/dbt/issues/2713), [#2735](https://github.com/fishtown-analytics/dbt/pull/2735))
- Added state and defer arguments to the RPC client, matching the CLI ([#2678](https://github.com/fishtown-analytics/dbt/issues/2678), [#2736](https://github.com/fishtown-analytics/dbt/pull/2736))
- Added schema and dbt versions to JSON artifacts ([#2670](https://github.com/fishtown-analytics/dbt/issues/2670), [#2767](https://github.com/fishtown-analytics/dbt/pull/2767))
- Added ability to snapshot hard-deleted records (opt-in with `invalidate_hard_deletes` config option). ([#249](https://github.com/fishtown-analytics/dbt/issues/249), [#2749](https://github.com/fishtown-analytics/dbt/pull/2749))

Contributors:
- [@joelluijmes](https://github.com/joelluijmes) ([#2749](https://github.com/fishtown-analytics/dbt/pull/2749))

## dbt 0.18.1 (Release TBD)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,17 @@
from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
from snapshot_query
),
{% endif %}

insertions as (

select
Expand Down Expand Up @@ -99,9 +110,33 @@
)
)

{%- if strategy.invalidate_hard_deletes -%}
,

deletes as (

select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_valid_to is null
and source_data.dbt_unique_key is null
)
{%- endif %}

select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
union all
select * from deletes
{%- endif %}

{%- endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when matched
and DBT_INTERNAL_DEST.dbt_valid_to is null
and DBT_INTERNAL_SOURCE.dbt_change_type = 'delete'
then update
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

when not matched
and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
then insert ({{ insert_cols_csv }})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = config['updated_at'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}

{#/*
The snapshot relation might not have an {{ updated_at }} value if the
Expand All @@ -86,7 +87,8 @@
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
}) %}
{% endmacro %}

Expand Down Expand Up @@ -131,6 +133,8 @@
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}

{% set select_current_time -%}
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}
Expand Down Expand Up @@ -173,6 +177,7 @@
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
}) %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@
and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'update'::text
and {{ target }}.dbt_valid_to is null;

update {{ target }}
set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
from {{ source }} as DBT_INTERNAL_SOURCE
where DBT_INTERNAL_SOURCE.dbt_scd_id::text = {{ target }}.dbt_scd_id::text
and DBT_INTERNAL_SOURCE.dbt_change_type::text = 'delete'::text
and {{ target }}.dbt_valid_to is null;

insert into {{ target }} ({{ insert_cols_csv }})
select {% for column in insert_cols -%}
DBT_INTERNAL_SOURCE.{{ column }} {%- if not loop.last %}, {%- endif %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
updated_at='updated_at',
)
}}

{% if var('invalidate_hard_deletes', 'false') | as_bool %}
{{ config(invalidate_hard_deletes=True) }}
{% endif %}

select * from `{{target.database}}`.`{{schema}}`.seed

{% endsnapshot %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@
updated_at='updated_at',
)
}}

{% if var('invalidate_hard_deletes', 'false') | as_bool %}
{{ config(invalidate_hard_deletes=True) }}
{% endif %}

select * from {{target.database}}.{{target.schema}}.seed

{% endsnapshot %}
106 changes: 106 additions & 0 deletions test/integration/004_simple_snapshot_test/test_simple_snapshot.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from test.integration.base import DBTIntegrationTest, use_profile
from datetime import datetime
import pytz
import dbt.exceptions


Expand Down Expand Up @@ -754,3 +756,107 @@ def test__postgres__changing_strategy(self):

results = self.run_dbt(['test'])
self.assertEqual(len(results), 1)


class TestSnapshotHardDelete(DBTIntegrationTest):
# These tests uses the same seed data, containing 20 records of which we hard delete the last 10.
# These deleted records set the dbt_valid_to to time the snapshot was ran.
NUM_SNAPSHOT_MODELS = 1

@property
def schema(self):
return "simple_snapshot_004"

@property
def models(self):
return "models"

@property
def project_config(self):
if self.adapter_type == 'bigquery':
paths = ['test-snapshots-bq']
else:
paths = ['test-snapshots-pg']

return {
'config-version': 2,
"data-paths": ['data'],
"snapshot-paths": paths,
'macro-paths': ['macros'],
}

def _seed_and_snapshot(self):
if self.adapter_type == 'bigquery':
self.run_sql_file('seed_bq.sql')
elif self.adapter_type == 'postgres':
self.run_sql_file('seed_pg.sql')
else:
self.run_sql_file('seed.sql')

results = self.run_dbt(['snapshot'])
self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS)

if self.adapter_type == 'snowflake':
self.assertTablesEqual("SNAPSHOT_EXPECTED", "SNAPSHOT_ACTUAL")
else:
self.assertTablesEqual("snapshot_expected", "snapshot_actual")

def _delete_records(self):
database = self.default_database
if self.adapter_type == 'bigquery':
database = self.adapter.quote(database)

self.run_sql(
'delete from {}.{}.seed where id >= 10;'.format(database, self.unique_schema())
)

def _invalidate_and_assert_records(self):
begin_snapshot_datetime = datetime.now(pytz.UTC)

results = self.run_dbt(['snapshot', '--vars', '{invalidate_hard_deletes: true}'])
self.assertEqual(len(results), self.NUM_SNAPSHOT_MODELS)

database = self.default_database
if self.adapter_type == 'bigquery':
database = self.adapter.quote(database)

results = self.run_sql(
'''
select
id,
dbt_valid_to
from {}.{}.snapshot_actual
order by id
'''.format(database, self.unique_schema()),
fetch='all'
)

self.assertEqual(len(results), 20)
for result in results[10:]:
# result is a tuple, the dbt_valid_to column is the latest
self.assertIsInstance(result[-1], datetime)
self.assertGreaterEqual(result[-1].replace(tzinfo=pytz.UTC), begin_snapshot_datetime)

@use_profile('postgres')
def test__postgres__snapshot_hard_delete(self):
self._seed_and_snapshot()
self._delete_records()
self._invalidate_and_assert_records()

@use_profile('bigquery')
def test__bigquery__snapshot_hard_delete(self):
self._seed_and_snapshot()
self._delete_records()
self._invalidate_and_assert_records()

@use_profile('snowflake')
def test__snowflake__snapshot_hard_delete(self):
self._seed_and_snapshot()
self._delete_records()
self._invalidate_and_assert_records()

@use_profile('redshift')
def test__redshift__snapshot_hard_delete(self):
self._seed_and_snapshot()
self._delete_records()
self._invalidate_and_assert_records()
6 changes: 3 additions & 3 deletions test/rpc/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
updated_at='updated_at',
)
}}
select 1 as id, '2019-10-31 23:59:40' as updated_at
select 1 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at

{% endsnapshot %}
'''
Expand All @@ -36,7 +36,7 @@
updated_at='updated_at',
)
}}
select 2 as id, '2019-10-31 23:59:40' as updated_at
select 2 as id, cast('2019-10-31 23:59:40' as timestamp) as updated_at

{% endsnapshot %}
'''
Expand Down Expand Up @@ -169,4 +169,4 @@ def test_rpc_snapshot_state(
results = querier.async_wait_for_result(
querier.snapshot(state='./state', select=['state:modified']),
)
assert len(results['results']) == 0
assert len(results['results']) == 0