
Add snapshot support #66

Closed · wants to merge 4 commits

Conversation

tongqqiu

  • cast as string (the base implementation uses varchar, which does not work on Spark; see the sketch after this list)
  • snapshots only work on the Delta format
  • slightly changed the base implementation to fit Delta merge needs
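
Per the first bullet, the varchar cast from the base implementation does not work on Spark, so the PR casts to string instead. A minimal sketch (the column and table names here are illustrative, not the PR's exact code):

-- base implementation's cast, which the PR reports failing on Spark SQL:
select cast(dbt_scd_id as varchar(50)) as dbt_scd_id from snapshotted_data

-- Spark-friendly equivalent:
select cast(dbt_scd_id as string) as dbt_scd_id from snapshotted_data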

Tony Qiu added 4 commits March 2, 2020 16:06
- making snapshot working with delta lake
…pr/support_snapshot

# Conflicts:
#	dbt/adapters/spark/impl.py
#	dbt/include/spark/macros/materializations/seed.sql
@tongqqiu
Author

@beckjake @jtcohen6 Trying to make snapshots work on the Delta format.

@beckjake beckjake force-pushed the pr/0.15.3_upgrade branch from a24e4e3 to 9b13d12 Compare March 23, 2020 14:51
@jtcohen6
Contributor

@tongqqiu This is really cool! I'm going to take your code for a spin over the next few days. It would be awesome to ship this as part of a 0.16.0 release.

@jtcohen6 jtcohen6 changed the base branch from pr/0.15.3_upgrade to master March 23, 2020 23:36
@jtcohen6
Contributor

jtcohen6 commented Mar 26, 2020

There are two categories of code addition going on here:

  1. Obvious adapter-specific reimplementation. dbt-core has created space for these by use of adapter macros.
    • spark__snapshot_hash_arguments: all set
    • spark__snapshot_string_as_time: needs to be added to the PR; here's my version (rendering illustrated after this list):
{% macro spark__snapshot_string_as_time(timestamp) -%}
    {%- set result = "to_timestamp('" ~ timestamp ~ "')" -%}
    {{ return(result) }}
{%- endmacro %}
  2. Areas where dbt's core implementation does not work on Spark by default, and has not enabled easy override of targeted functions by way of adapter macros.
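
For illustration, the spark__snapshot_string_as_time macro above just returns a string wrapping its input in Spark's to_timestamp (the timestamp value here is hypothetical):

{{ spark__snapshot_string_as_time('2020-03-26 14:30:00') }}
{# renders as: to_timestamp('2020-03-26 14:30:00') #}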

Specific problem

There are two issues with the insert that's happening in dbt's core implementation here:

  • On Spark, insert into does not take a list of columns for insertion
  • Spark doesn't have temp tables, only temp views, and temp views cannot be inserted into or described (both limitations are sketched below)
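
Both limitations in miniature (table names illustrative; behavior as described above for the Spark versions current for this PR):

-- fails to parse on Spark: insert into with an explicit column list
insert into snapshot_table (id, dbt_valid_from)
select id, dbt_valid_from from staging_data

-- works: positional insert, so the select must already match the target's columns
insert into snapshot_table
select * from staging_data

-- Spark offers temporary views rather than temp tables; per the above,
-- a temp view cannot be the target of an insert and cannot be described
create temporary view staging_data as select * from source_data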

The solution in this PR is to create two separate temp tables, one for updates and one for inserts, and then perform two merge statements (one for each).

While I can't pin down the exact implementation right now, since we're already overriding a bunch of code anyway, I wonder whether we shouldn't instead create a unioned table of all updates + inserts, and then perform a single (atomic) merge statement that looks a lot like the default implementation:

merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source.include(schema=false) }} as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id
    
    when matched
     and DBT_INTERNAL_DEST.dbt_valid_to is null
     and DBT_INTERNAL_SOURCE.dbt_change_type = 'update'
        then update
        set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

    when not matched
     and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
        then insert *
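
One way to build that unioned source, eliding the column logic that the PR's existing insert and update selects already handle (the view and subquery names here are illustrative, not a final implementation):

create or replace temporary view snapshot_staging as

    select *, 'insert' as dbt_change_type
    from snapshot_inserts

    union all

    select *, 'update' as dbt_change_type
    from snapshot_updates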

General problem

How can we override these macros: build_snapshot_table, snapshot_staging_table_updates, snapshot_staging_table_inserts? Simply creating a macro in dbt-spark with the same name as a macro in dbt-core results in:

Running with dbt=0.16.0
Encountered an error:
'ParsedMacro' object has no attribute 'namespace'

Right now, the only sane way to do this is by adding a prefix (e.g. spark__) to every one of these macros, such that they have a different name from a macro defined in core.

I'd be interested to talk through a better solution for overriding parts (but not all) of the snapshot macro-stack, working toward something that could be more easily generalized to other plugins.

Working version

I was able to get the code in this PR working on 0.16.0 by:

  • merging the changes from master
  • adding spark__snapshot_string_as_time as above
  • renaming all the macros that have namespace collisions with dbt-core

I think, no matter how we implement this, we should ensure that the user knows early and often this is Delta-only functionality. Part of that looks like updating the README, part of it looks like raising a compilation error if the snapshot's config.file_format != 'delta'.
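
For the compilation error, a minimal sketch in the snapshot materialization (message wording illustrative; parquet assumed as the default file_format):

{% set file_format = config.get('file_format', default='parquet') %}
{% if file_format != 'delta' %}
    {% do exceptions.raise_compiler_error(
        'Invalid file_format for snapshot: ' ~ file_format ~ '. Snapshots on Spark require file_format: delta'
    ) %}
{% endif %}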

@beckjake
Contributor

Suggestion: name spark-unique macros that aren't intentionally overriding a core adapter_macro with a spark_ prefix (one underscore) instead of spark__ (two underscores), to avoid their being picked up accidentally by core if it ever changes.
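
To make the convention concrete (the first body mirrors the shape of the hash macro this thread calls all set; the second is shown only for the naming, with its body elided):

{# two underscores: intentionally resolved by core's adapter_macro dispatch #}
{% macro spark__snapshot_hash_arguments(args) -%}
    md5({%- for arg in args -%}
        coalesce(cast({{ arg }} as string), '')
        {% if not loop.last %} || '|' || {% endif %}
    {%- endfor -%})
{%- endmacro %}

{# one underscore: spark-only helper that core dispatch should never resolve #}
{% macro spark_build_snapshot_staging_table(strategy, sql, target_relation) %}
    {# body unchanged from this PR's version; only the name gains the prefix #}
{% endmacro %}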

@jtcohen6
Contributor

jtcohen6 commented Apr 7, 2020

@tongqqiu Are you up for making the few small changes to this PR required to get it working? I believe it would just involve:

  • Merging changes from master
  • Implementing spark__snapshot_string_as_time
  • Prepending macro names with spark_ if they have dbt-core namespace collisions

@beckjake As far as how to add a test for Spark snapshots, I'm thinking that for now it's something we could add to the spark-support branch of dbt-integration-tests.

@jtcohen6 jtcohen6 mentioned this pull request Apr 8, 2020
@tongqqiu
Author

tongqqiu commented Apr 8, 2020

@jtcohen6 could you help with those changes?

@jtcohen6
Contributor

> @jtcohen6 could you help with those changes?

I'm happy to take it from here, if that's okay with you!

@tongqqiu
Author

@jtcohen6 yes. That would be great!

@jtcohen6 jtcohen6 mentioned this pull request Apr 21, 2020
@jtcohen6 jtcohen6 closed this in #76 May 22, 2020