Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/delta snapshots #76

Merged
merged 12 commits into from
May 22, 2020
172 changes: 172 additions & 0 deletions dbt/include/spark/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
{% macro spark__snapshot_hash_arguments(args) -%}
    {#-
        Render a single md5(...) expression over every entry of `args`.
        Each entry is cast to string with nulls replaced by '', and adjacent
        entries are joined with a literal '|' using the `||` operator.
    -#}
    md5(
    {%- for hashed_expr in args -%}
        coalesce(cast({{ hashed_expr }} as string ), '')
        {%- if not loop.last %} || '|' || {% endif -%}
    {%- endfor -%}
    )
{%- endmacro %}


{% macro spark__snapshot_string_as_time(timestamp) -%}
    {#-
        Return a SQL fragment that wraps `timestamp` in a to_timestamp('...')
        call. The value is spliced between single quotes exactly as given.
    -#}
    {{ return("to_timestamp('" ~ timestamp ~ "')") }}
{%- endmacro %}


{% macro spark__snapshot_merge_sql(target, source, insert_cols) -%}
    {#-
        Render the merge statement that applies staged snapshot rows (`source`)
        to the snapshot table (`target`). Rows are paired on dbt_scd_id.
          - matched, target row still open (dbt_valid_to is null), staged row
            marked 'update': copy the staged dbt_valid_to onto the target row.
          - not matched, staged row marked 'insert': insert it with `insert *`.
        `insert_cols` is accepted but not referenced by this macro.
    -#}
    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
        on DBT_INTERNAL_DEST.dbt_scd_id = DBT_INTERNAL_SOURCE.dbt_scd_id
    when matched
        and DBT_INTERNAL_SOURCE.dbt_change_type = 'update'
        and DBT_INTERNAL_DEST.dbt_valid_to is null
        then update
            set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to
    when not matched
        and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
        then insert *
    ;
{% endmacro %}


{% macro spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
    {#
        Create the staging view for a snapshot run and return its relation.
        The view is named `<target identifier>__dbt_tmp`, lives in the same
        database and schema as `target_relation`, and selects the output of
        snapshot_staging_table(strategy, sql, target_relation).
    #}
    {% set staging_name = target_relation.identifier ~ '__dbt_tmp' %}
    {% set staging_select = snapshot_staging_table(strategy, sql, target_relation) %}
    {% set staging_view = api.Relation.create(
        database=target_relation.database,
        schema=target_relation.schema,
        identifier=staging_name,
        type='view') %}

    {# needs to be a non-temp view so that its columns can be ascertained via `describe` #}
    {% call statement('build_snapshot_staging_relation') %}
        {{ create_view_as(staging_view, staging_select) }}
    {% endcall %}

    {{ return(staging_view) }}
{% endmacro %}


{% macro spark__post_snapshot(staging_relation) %}
    {# Post-snapshot cleanup: ask the adapter to drop the staging relation. #}
    {%- do adapter.drop_relation(staging_relation) -%}
{% endmacro %}


{% macro spark__create_columns(relation, columns) %}
    {#
        Add every entry of `columns` to `relation` with a single
        `alter table ... add columns (...)` statement. Each entry contributes
        its backtick-quoted `name` followed by its `data_type`.
        Nothing is executed when `columns` is empty.
    #}
    {% if columns | length > 0 %}
        {% call statement() %}
            alter table {{ relation }} add columns (
                {%- for new_col in columns %}
                `{{ new_col.name }}` {{ new_col.data_type }}{% if not loop.last %},{% endif %}
                {%- endfor %}
            );
        {% endcall %}
    {% endif %}
{% endmacro %}


{% materialization snapshot, adapter='spark' %}
{#
  Snapshot materialization registered for the 'spark' adapter.

  Flow, as implemented below:
    1. Read the model config and raise a compiler error unless file_format is 'delta'.
    2. Create the schema if it does not exist, then get-or-create the target
       relation and require it to be a table.
    3. Run the pre-hooks and resolve the configured snapshot strategy.
    4. Target absent: build the initial snapshot with create_table_as.
       Target present: build a staging view, add any missing columns to the
       target, and render a merge statement.
    5. Execute that SQL as the 'main' statement, persist docs, run post-hooks,
       commit, drop the staging view if one was built, and return the target relation.
#}
{%- set config = model['config'] -%}

{# The target table is named after the model alias, falling back to the model name. #}
{%- set target_table = model.get('alias', model.get('name')) -%}

{# NOTE(review): unique_key is read here but not referenced again in this block. #}
{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
{# file_format falls back to 'parquet' when unset, which is rejected by the check below. #}
{%- set file_format = config.get('file_format', 'parquet') -%}

{% set invalid_format_msg -%}
Invalid file format: {{ file_format }}
Snapshot functionality requires file_format be set to 'delta'
{%- endset %}

{%- if file_format != 'delta' -%}
{% do exceptions.raise_compiler_error(invalid_format_msg) %}
{% endif %}

{# Make sure the schema exists before looking up or creating the target relation. #}
{% if not adapter.check_schema_exists(model.database, model.schema) %}
{% do create_schema(model.database, model.schema) %}
{% endif %}

{# Unpacks into a flag (used below to pick the build path) and the target relation object. #}
{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{# The target must be a table; anything else is reported as a wrong-type relation. #}
{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}

{# Pre-hooks run twice: first those flagged outside a transaction, then those flagged inside. #}
{{ run_hooks(pre_hooks, inside_transaction=False) }}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{# Look up the strategy macro by name and invoke it to get the strategy object used below. #}
{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

{% if not target_relation_exists %}

{# First run: the final SQL creates the target table from the snapshot build query. #}
{% set build_sql = build_snapshot_table(strategy, model['injected_sql']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{# Later runs: validate the existing target, stage the changes in a view, then merge. #}
{{ adapter.valid_snapshot_target(target_relation) }}

{% set staging_table = spark_build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{# Columns present in staging but not in the target, minus dbt_change_type and
   dbt_unique_key (either case), which are never added to the target. #}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% do create_columns(target_relation, missing_columns) %}

{# All staging columns with the same four names filtered out. #}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{# Quote each remaining column name; the list is passed to the merge macro as insert_cols. #}
{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}

{% endif %}

{# Execute whichever SQL was prepared above (create-table or merge) as the 'main' statement. #}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{# staging_table is only set on the merge path, so cleanup is skipped on the first run. #}
{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}