Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snowflake: no transactions, except for DML #3510

Merged
merged 6 commits into from
Jul 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
## dbt 0.21.0 (Release TBD)

### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:souce_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))
- **dbt-snowflake:** Turn off transactions and turn on `autocommit` by default. Explicitly specify `begin` and `commit` for DML statements in incremental and snapshot materializations. Note that this may affect user-space code that depends on tranactions.

### Features
- Add `dbt build` command to run models, tests, seeds, and snapshots in DAG order. ([#2743] (https://github.com/dbt-labs/dbt/issues/2743), [#3490] (https://github.com/dbt-labs/dbt/issues/3490), [#3608](https://github.com/dbt-labs/dbt/issues/3608))
- Introduce `on_schema_change` config to detect and handle schema changes on incremental models ([#1132](https://github.com/fishtown-analytics/dbt/issues/1132), [#3387](https://github.com/fishtown-analytics/dbt/issues/3387))

### Breaking changes
- Add full node selection to source freshness command and align selection syntax with other tasks (`dbt source freshness --select source_name` --> `dbt source freshness --select source:souce_name`) and rename `dbt source snapshot-freshness` -> `dbt source freshness`. ([#2987](https://github.com/dbt-labs/dbt/issues/2987), [#3554](https://github.com/dbt-labs/dbt/pull/3554))

### Fixes
- Fix docs generation for cross-db sources in REDSHIFT RA3 node ([#3236](https://github.com/fishtown-analytics/dbt/issues/3236), [#3408](https://github.com/fishtown-analytics/dbt/pull/3408))
- Fix type coercion issues when fetching query result sets ([#2984](https://github.com/fishtown-analytics/dbt/issues/2984), [#3499](https://github.com/fishtown-analytics/dbt/pull/3499))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@
(
select {{ dest_cols_csv }}
from {{ source }}
);
)

{%- endmacro %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
then insert ({{ insert_cols_csv }})
values ({{ insert_cols_csv }})
;
{% endmacro %}

{% endmacro %}

Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
{% endmacro %}

{% macro default__handle_existing_table(full_refresh, old_relation) %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ adapter.drop_relation(old_relation) }}
{% endmacro %}

Expand All @@ -19,7 +20,7 @@
*/
#}

{% macro create_or_replace_view(run_outside_transaction_hooks=True) %}
{% macro create_or_replace_view() %}
{%- set identifier = model['alias'] -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
Expand All @@ -30,13 +31,7 @@
identifier=identifier, schema=schema, database=database,
type='view') -%}

{% if run_outside_transaction_hooks %}
-- no transactions on BigQuery
{{ run_hooks(pre_hooks, inside_transaction=False) }}
{% endif %}

-- `BEGIN` happens here on Snowflake
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ run_hooks(pre_hooks) }}

-- If there's a table with the same name and we weren't told to full refresh,
-- that's an error. If we were told to full refresh, drop it. This behavior differs
Expand All @@ -50,14 +45,7 @@
{{ create_view_as(target_relation, sql) }}
{%- endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if run_outside_transaction_hooks %}
-- No transactions on BigQuery
{{ run_hooks(post_hooks, inside_transaction=False) }}
{% endif %}
{{ run_hooks(post_hooks) }}

{{ return({'relations': [target_relation]}) }}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@


{% materialization view, adapter='bigquery' -%}
{% set to_return = create_or_replace_view(run_outside_transaction_hooks=False) %}
{% set to_return = create_or_replace_view() %}

{% set target_relation = this.incorporate(type='view') %}
{% do persist_docs(target_relation, model) %}
Expand Down
31 changes: 18 additions & 13 deletions plugins/snowflake/dbt/adapters/snowflake/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def open(cls, connection):
schema=creds.schema,
warehouse=creds.warehouse,
role=creds.role,
autocommit=False,
autocommit=True,
client_session_keep_alive=creds.client_session_keep_alive,
application='dbt',
**creds.auth_args()
Expand Down Expand Up @@ -275,6 +275,23 @@ def get_response(cls, cursor) -> AdapterResponse:
code=code
)

# disable transactional logic by default on Snowflake
# except for DML statements where explicitly defined
def add_begin_query(self, *args, **kwargs):
pass

def add_commit_query(self, *args, **kwargs):
pass

def begin(self):
pass

def commit(self):
pass

def clear_transaction(self):
pass

@classmethod
def _split_queries(cls, sql):
"Splits sql statements at semicolons into discrete queries"
Expand Down Expand Up @@ -352,15 +369,3 @@ def add_query(self, sql, auto_begin=True,
)

return connection, cursor

@classmethod
def _rollback_handle(cls, connection):
"""On snowflake, rolling back the handle of an aborted session raises
an exception.
"""
try:
connection.handle.rollback()
except snowflake.connector.errors.ProgrammingError as e:
msg = str(e)
if 'Session no longer exists' not in msg:
raise
29 changes: 28 additions & 1 deletion plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,31 @@

{% endif %}

{% endmacro %}
{% endmacro %}


{% macro snowflake_dml_explicit_transaction(dml) %}
{#
Use this macro to wrap all INSERT, MERGE, UPDATE, DELETE, and TRUNCATE
statements before passing them into run_query(), or calling in the 'main' statement
of a materialization
#}
{% set dml_transaction -%}
begin;
{{ dml }};
commit;
{%- endset %}

{% do return(dml_transaction) %}

{% endmacro %}


{% macro snowflake__truncate_relation(relation) -%}
{% set truncate_dml %}
truncate table {{ relation }}
{% endset %}
{% call statement('truncate_relation') -%}
{{ snowflake_dml_explicit_transaction(truncate_dml) }}
{%- endcall %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,7 @@
{% set strategy = dbt_snowflake_validate_get_incremental_strategy(config) -%}
{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ run_hooks(pre_hooks) }}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
Expand Down Expand Up @@ -72,12 +68,7 @@
{{ build_sql }}
{%- endcall -%}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks) }}

{% set target_relation = target_relation.incorporate(type='table') %}
{% do persist_docs(target_relation, model) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute='name')) -%}
{%- set sql_header = config.get('sql_header', none) -%}

{%- set dml -%}
{%- if unique_key is none -%}

{{ sql_header if sql_header is not none }}
Expand All @@ -17,12 +18,27 @@
(
select {{ dest_cols_csv }}
from {{ source_sql }}
);
)

{%- else -%}

{{ default__get_merge_sql(target, source_sql, unique_key, dest_columns, predicates) }}

{%- endif -%}
{%- endset -%}

{% do return(snowflake_dml_explicit_transaction(dml)) %}

{% endmacro %}


{% macro snowflake__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{% set dml = default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any problem with specifying code with an explicit begin/commit in addition to enabling autocommit? Surprised to see that this works! I would have expected that we would need to disable autocommit for these queries

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is one of the things that's tricky about autocommit. You don't actually need to disable it, it just doesn't take effect if you supply explicit begin + commit. From docs:

Statements inside an explicit transaction are not affected by AUTOCOMMIT. For example, statements inside an explicit BEGIN TRANSACTION ... ROLLBACK are rolled back even if AUTOCOMMIT is TRUE.

{% endmacro %}


{% macro snowflake__snapshot_merge_sql(target, source, insert_cols) %}
{% set dml = default__snapshot_merge_sql(target, source, insert_cols) %}
{% do return(snowflake_dml_explicit_transaction(dml)) %}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
{% macro snowflake__load_csv_rows(model, agate_table) %}
{% set cols_sql = get_seed_column_quoted_csv(model, agate_table.column_names) %}
{% set bindings = [] %}

{% set statements = [] %}

{% for chunk in agate_table.rows | batch(batch_size) %}
{% set bindings = [] %}

{% for row in chunk %}
{% do bindings.extend(row) %}
{% endfor %}

{% set sql %}
insert into {{ this.render() }} ({{ cols_sql }}) values
{% for row in chunk -%}
({%- for column in agate_table.column_names -%}
%s
{%- if not loop.last%},{%- endif %}
{%- endfor -%})
{%- if not loop.last%},{%- endif %}
{%- endfor %}
{% endset %}

{% do adapter.add_query('BEGIN', auto_begin=False) %}
{% do adapter.add_query(sql, bindings=bindings, abridge_sql_log=True) %}
{% do adapter.add_query('COMMIT', auto_begin=False) %}

{% if loop.index0 == 0 %}
{% do statements.append(sql) %}
{% endif %}
{% endfor %}

{# Return SQL so we can render it out into the compiled files #}
{{ return(statements[0]) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@
schema=schema,
database=database, type='table') -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{{ run_hooks(pre_hooks) }}

{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
Expand All @@ -26,12 +23,7 @@
{{ create_table_as(false, target_relation, sql) }}
{%- endcall %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ run_hooks(post_hooks) }}

{% do persist_docs(target_relation, model) %}

Expand Down
21 changes: 12 additions & 9 deletions test/unit/test_snowflake_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,11 @@ def test_quoting_on_truncate(self):
)
self.adapter.truncate_relation(relation)

# no query comment because wrapped in begin; + commit; for explicit DML
self.mock_execute.assert_has_calls([
mock.call('/* dbt */\ntruncate table test_database."test_schema".test_table', None)
mock.call('/* dbt */\nbegin;', None),
mock.call('truncate table test_database."test_schema".test_table\n ;', None),
mock.call('commit;', None)
])

def test_quoting_on_rename(self):
Expand Down Expand Up @@ -258,7 +261,7 @@ def test_client_session_keep_alive_false_by_default(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key=None, application='dbt')
Expand All @@ -274,7 +277,7 @@ def test_client_session_keep_alive_true(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=True, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key=None, application='dbt')
Expand All @@ -291,7 +294,7 @@ def test_user_pass_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
password='test_password', role=None, schema='public',
user='test_user', warehouse='test_warehouse', private_key=None,
Expand All @@ -310,7 +313,7 @@ def test_authenticator_user_pass_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
password='test_password', role=None, schema='public',
user='test_user', warehouse='test_warehouse',
Expand All @@ -329,7 +332,7 @@ def test_authenticator_externalbrowser_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', authenticator='externalbrowser',
Expand All @@ -348,7 +351,7 @@ def test_authenticator_oauth_authentication(self):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', authenticator='oauth', token='my-oauth-token',
Expand All @@ -369,7 +372,7 @@ def test_authenticator_private_key_authentication(self, mock_get_private_key):
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key='test_key',
Expand All @@ -390,7 +393,7 @@ def test_authenticator_private_key_authentication_no_passphrase(self, mock_get_p
conn.handle
self.snowflake.assert_has_calls([
mock.call(
account='test_account', autocommit=False,
account='test_account', autocommit=True,
client_session_keep_alive=False, database='test_database',
role=None, schema='public', user='test_user',
warehouse='test_warehouse', private_key='test_key',
Expand Down