Skip to content

Commit

Permalink
Snowflake: no transactions, except for DML (#3510)
Browse files Browse the repository at this point in the history
* Rm Snowflake txnal logic. Explicit for DML

* Be less clever. Update create_or_replace_view()

* Seed DML as well

* Changelog entry

* Fix unit test

* One semicolon can change the world
  • Loading branch information
jtcohen6 authored Jul 27, 2021
1 parent 16264f5 commit 792f39a
Show file tree
Hide file tree
Showing 12 changed files with 126 additions and 68 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
## dbt 0.21.0 (Release TBD)

### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:source_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))
- **dbt-snowflake:** Turn off transactions and turn on `autocommit` by default. Explicitly specify `begin` and `commit` for DML statements in incremental and snapshot materializations. Note that this may affect user-space code that depends on transactions.

### Features
- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743](https://github.com/dbt-labs/dbt/issues/2743), [#3490](https://github.com/dbt-labs/dbt/issues/3490), [#3608](https://github.com/dbt-labs/dbt/issues/3608))
- Introduce `on_schema_change` config to detect and handle schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3387](https://github.com/fishtown-analytics/dbt/issues/3387))

### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:source_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))

### Fixes
- Fix exception on yml files with all comments ([#3568](https://github.com/dbt-labs/dbt/issues/3568))
- Fix docs generation for cross-db sources in REDSHIFT RA3 node ([#3236](https://github.com/fishtown-analytics/dbt/issues/3236), [#3408](https://github.com/fishtown-analytics/dbt/pull/3408))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
(
select {{ dest_cols_csv }}
from {{ source }}
);
)

{%- endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
then insert ({{ insert_cols_csv }})
values ({{ insert_cols_csv }})
;
{% endmacro %}

{% endmacro %}

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{% endmacro %}

{% macro default__handle_existing_table(full_refresh, old_relation) %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ adapter.drop_relation(old_relation) }}
{% endmacro %}

Expand All @@ -19,7 +20,7 @@
*/
#}
{% macro create_or_replace_view(run_outside_transaction_hooks=True) %}
{% macro create_or_replace_view() %}
{%- set identifier = model['alias'] -%}
{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
Expand All @@ -30,13 +31,7 @@
identifier=identifier, schema=schema, database=database,
type='view') -%}
{% if run_outside_transaction_hooks %}
-- no transactions on BigQuery
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{% endif %}
-- `BEGIN` happens here on Snowflake
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ run_hooks(pre_hooks) }}
-- If there's a table with the same name and we weren't told to full refresh,
-- that's an error. If we were told to full refresh, drop it. This behavior differs
Expand All @@ -50,14 +45,7 @@
{{ create_view_as(target_relation, sql) }}
{%- endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if run_outside_transaction_hooks %}
-- No transactions on BigQuery
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endif %}
{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


{% materialization view, adapter='bigquery' -%}
{% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %}
{% set to_return = create_or_replace_view() %}

{% set target_relation = this.incorporate(type='view') %}
{% do persist_docs(target_relation, model) %}
Expand Down
31 changes: 18 additions & 13 deletions plugins/snowflake/dbt/adapters/snowflake/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def open(cls, connection):
schema=creds.schema,
warehouse=creds.warehouse,
role=creds.role,
autocommit=False,
autocommit=True,
client_session_keep_alive=creds.client_session_keep_alive,
application='dbt',
**creds.auth_args()
Expand Down Expand Up @@ -275,6 +275,23 @@ def get_response(cls, cursor) -> AdapterResponse:
code=code
)

# disable transactional logic by default on Snowflake
# except for DML statements where explicitly defined
#
# The connection is opened with autocommit=True (see open()), so every
# transaction hook below is deliberately an empty no-op. DML that needs a
# transaction is wrapped in an explicit `begin; ...; commit;` elsewhere.
# NOTE(review): these presumably override same-named hooks on the parent
# connection manager -- confirm against the base class.

# No-op: accepts and ignores any arguments; no BEGIN is issued.
def add_begin_query(self, *args, **kwargs):
pass

# No-op: accepts and ignores any arguments; no COMMIT is issued.
def add_commit_query(self, *args, **kwargs):
pass

# No-op: does not open a transaction.
def begin(self):
pass

# No-op: there is no implicit transaction to commit.
def commit(self):
pass

# No-op: no transaction state is tracked, so there is nothing to clear.
def clear_transaction(self):
pass

@classmethod
def _split_queries(cls, sql):
"Splits sql statements at semicolons into discrete queries"
Expand Down Expand Up @@ -352,15 +369,3 @@ def add_query(self, sql, auto_begin=True,
)

return connection, cursor

@classmethod
def _rollback_handle(cls, connection):
"""On snowflake, rolling back the handle of an aborted session raises
an exception.
"""
try:
connection.handle.rollback()
except snowflake.connector.errors.ProgrammingError as e:
msg = str(e)
if 'Session no longer exists' not in msg:
raise
29 changes: 28 additions & 1 deletion plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,31 @@

{% endif %}

{% endmacro %}
{% endmacro %}


{% macro snowflake_dml_explicit_transaction(dml) %}
{#
Use this macro to wrap all INSERT, MERGE, UPDATE, DELETE, and TRUNCATE
statements before passing them into run_query(), or calling in the 'main' statement
of a materialization

Args:
dml: the text of a single DML statement. A semicolon is appended after it
here, so pass the statement without its own trailing terminator.

Returns:
A string of three semicolon-terminated statements: `begin;`, the given
DML, and `commit;`. The whitespace-control markers on the set block trim
the whitespace before `begin;` and after `commit;`.
#}
{% set dml_transaction -%}
begin;
{{ dml }};
commit;
{%- endset %}

{% do return(dml_transaction) %}

{% endmacro %}


{% macro snowflake__truncate_relation(relation) -%}
{#
Truncate `relation` on Snowflake. The `truncate table` statement is built
as text, wrapped by snowflake_dml_explicit_transaction(), and executed as
the 'truncate_relation' statement.
#}
{% set truncate_dml %}
truncate table {{ relation }}
{% endset %}
{% call statement('truncate_relation') -%}
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
{%- endcall %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
Expand Down Expand Up @@ -72,12 +68,7 @@
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks) }}

{% set target_relation = target_relation.incorporate(type='table') %}
{% do persist_docs(target_relation, model) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute='name')) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{%- set dml -%}
{%- if unique_key is none -%}

{{ sql_header if sql_header is not none }}
Expand All @@ -17,12 +18,27 @@
(
select {{ dest_cols_csv }}
from {{ source_sql }}
);
)

{%- else -%}

{{ default__get_merge_sql(target, source_sql, unique_key, dest_columns, predicates) }}

{%- endif -%}
{%- endset -%}

{% do return(snowflake_dml_explicit_transaction(dml)) %}

{% endmacro %}


{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{#
Snowflake variant of the delete+insert merge SQL: takes the SQL produced by
default__get_delete_insert_merge_sql() unchanged and returns it wrapped by
snowflake_dml_explicit_transaction().
#}
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}


{% macro snowflake__snapshot_merge_sql(target, source, insert_cols) %}
{#
Snowflake variant of the snapshot merge SQL: takes the SQL produced by
default__snapshot_merge_sql() unchanged and returns it wrapped by
snowflake_dml_explicit_transaction().
#}
{% set dml = default__snapshot_merge_sql(target, source, insert_cols) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{% macro snowflake__load_csv_rows(model, agate_table) %}
    {#
      Load the rows of a seed into `this` on Snowflake.

      Rows are inserted in batches. Each batch is sent as one parameterized
      INSERT (one `%s` placeholder per cell, values passed as bindings),
      preceded by an explicit BEGIN and followed by an explicit COMMIT.

      Args:
        model: the seed node, passed to get_seed_column_quoted_csv().
        agate_table: the parsed seed; its `column_names` and `rows` are read.

      Returns:
        The rendered INSERT statement of the first batch, so it can be
        written to the compiled files.
    #}

    {#
      Rows per INSERT statement. This macro receives no batch size from its
      caller, so it must be defined here: with an undefined value the `batch`
      filter never splits and the whole seed goes out as a single INSERT.
      Snowflake caps a VALUES clause at 16,384 rows, so stay below that.
    #}
    {% set batch_size = 10000 %}
    {% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %}

    {% set statements = [] %}

    {% for chunk in agate_table.rows | batch(batch_size) %}
        {# Flattened cell values for this batch, in placeholder order #}
        {% set bindings = [] %}

        {% for row in chunk %}
            {% do bindings.extend(row) %}
        {% endfor %}

        {% set sql %}
            insert into {{ this.render() }} ({{ cols_sql }}) values
            {% for row in chunk -%}
                ({%- for column in agate_table.column_names -%}
                    %s
                    {%- if not loop.last%},{%- endif %}
                {%- endfor -%})
                {%- if not loop.last%},{%- endif %}
            {%- endfor %}
        {% endset %}

        {# Explicit transaction around the DML #}
        {% do adapter.add_query('BEGIN', auto_begin=False) %}
        {% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}
        {% do adapter.add_query('COMMIT', auto_begin=False) %}

        {# Only the first batch's SQL is kept for the compiled output #}
        {% if loop.index0 == 0 %}
            {% do statements.append(sql) %}
        {% endif %}
    {% endfor %}

    {# Return SQL so we can render it out into the compiled files #}
    {{ return(statements[0]) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
schema=schema,
database=database, type='table') -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ run_hooks(pre_hooks) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
Expand All @@ -26,12 +23,7 @@
{{ create_table_as(false, target_relation, sql) }}
{%- endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks) }}

{% do persist_docs(target_relation, model) %}

Expand Down
21 changes: 12 additions & 9 deletions test/unit/test_snowflake_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ def test_quoting_on_truncate(self):
)
self.adapter.truncate_relation(relation)

# no query comment because wrapped in begin; + commit; for explicit DML
self.mock_execute.assert_has_calls([
mock.call('/* dbt */\ntruncate table test_database."test_schema".test_table', None)
mock.call('/* dbt */\nbegin;', None),
mock.call('truncate table test_database."test_schema".test_table\n ;', None),
mock.call('commit;', None)
])

def test_quoting_on_rename(self):
Expand Down Expand Up @@ -258,7 +261,7 @@ def test_client_session_keep_alive_false_by_default(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key=None, application='dbt')
Expand All @@ -274,7 +277,7 @@ def test_client_session_keep_alive_true(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=True, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key=None, application='dbt')
Expand All @@ -291,7 +294,7 @@ def test_user_pass_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
password='test_password', role=None, schema='public',
user='test_user', warehouse='test_warehouse', private_key=None,
Expand All @@ -310,7 +313,7 @@ def test_authenticator_user_pass_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
password='test_password', role=None, schema='public',
user='test_user', warehouse='test_warehouse',
Expand All @@ -329,7 +332,7 @@ def test_authenticator_externalbrowser_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', authenticator='externalbrowser',
Expand All @@ -348,7 +351,7 @@ def test_authenticator_oauth_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', authenticator='oauth', token='my-oauth-token',
Expand All @@ -369,7 +372,7 @@ def test_authenticator_private_key_authentication(self, mock_get_private_key):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key='test_key',
Expand All @@ -390,7 +393,7 @@ def test_authenticator_private_key_authentication_no_passphrase(self, mock_get_p
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key='test_key',
Expand Down

0 comments on commit 792f39a

Please sign in to comment.