Skip to content

Commit

Permalink
Multi-query statements: fix rows_affected + query comments (#153)
Browse files Browse the repository at this point in the history
* Add test for rows_affected

* Alt path for begin + commit. Query comments post-split

* Fix unit test

* Add changelog entry
  • Loading branch information
jtcohen6 authored Jun 30, 2022
1 parent b9875e4 commit 31e3890
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 4 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## dbt-snowflake 1.2.0rc1 (Release TBD)

### Fixes
- In multi-query statements, prepend all queries with query comments. Use the last non-`COMMIT` query to store metadata about the model result. **Note:** this restores previous (pre-v0.21) behavior for incremental models and snapshots, which will again correctly reflect the number of rows modified in `adapter_response.rows_affected` ([#140](https://github.com/dbt-labs/dbt-snowflake/issues/140), [#147](https://github.com/dbt-labs/dbt-snowflake/issues147140), [#153](https://github.com/dbt-labs/dbt-snowflake/pull/153))

## dbt-snowflake 1.2.0b1 (June 24, 2022)

### Features
Expand Down
36 changes: 35 additions & 1 deletion dbt/adapters/snowflake/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
from dataclasses import dataclass
from io import StringIO
from time import sleep
from typing import Optional
from typing import Optional, Tuple

import agate
import dbt.clients.agate_helper

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
Expand Down Expand Up @@ -430,6 +433,19 @@ def process_results(cls, column_names, rows):

return super().process_results(column_names, fixed)

def execute(
self, sql: str, auto_begin: bool = False, fetch: bool = False
) -> Tuple[AdapterResponse, agate.Table]:
# don't apply the query comment here
# it will be applied after ';' queries are split
_, cursor = self.add_query(sql, auto_begin)
response = self.get_response(cursor)
if fetch:
table = self.get_result_from_cursor(cursor)
else:
table = dbt.clients.agate_helper.empty_table()
return response, table

def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False):

connection = None
Expand All @@ -455,6 +471,24 @@ def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False):
if without_comments == "":
continue

# Even though we turn off transactions by default for Snowflake,
# the user/macro has passed them *explicitly*, probably to wrap a DML statement
# Let their wish be granted!
# This also has the effect of ignoring "commit" in the RunResult for this model
# https://github.com/dbt-labs/dbt-snowflake/issues/147
if individual_query.lower() == "begin;":
super().add_begin_query()
continue

elif individual_query.lower() == "commit;":
super().add_commit_query()
continue

# add a query comment to *every* statement
# needed for models with multi-step materializations
# https://github.com/dbt-labs/dbt-snowflake/issues/140
individual_query = self._add_query_comment(individual_query)

connection, cursor = super().add_query(
individual_query, auto_begin, bindings=bindings, abridge_sql_log=abridge_sql_log
)
Expand Down
22 changes: 22 additions & 0 deletions tests/functional/adapter/test_incremental_run_result.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import pytest

from dbt.tests.util import run_dbt
from dbt.tests.adapter.basic.test_incremental import BaseIncremental

class TestIncrementalRunResultSnowflake(BaseIncremental):
"""Bonus test to verify that incremental models return the number of rows affected"""
def test_incremental(self, project):
# seed command
results = run_dbt(["seed"])
assert len(results) == 2

# run with initial seed
results = run_dbt(["run", "--vars", "seed_name: base"])
assert len(results) == 1

# run with additions
results = run_dbt(["run", "--vars", "seed_name: added"])
assert len(results) == 1
# verify that run_result is correct
rows_affected = results[0].adapter_response["rows_affected"]
assert rows_affected == 10, f"Expected 10 rows changed, found {rows_affected}"
6 changes: 3 additions & 3 deletions tests/unit/test_snowflake_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ def test_quoting_on_truncate(self):

# no query comment because wrapped in begin; + commit; for explicit DML
self.mock_execute.assert_has_calls([
mock.call('/* dbt */\nbegin;', None),
mock.call('truncate table test_database."test_schema".test_table\n ;', None),
mock.call('commit;', None)
mock.call('/* dbt */\nBEGIN', None),
mock.call('/* dbt */\ntruncate table test_database."test_schema".test_table\n ;', None),
mock.call('/* dbt */\nCOMMIT', None)
])

def test_quoting_on_rename(self):
Expand Down

0 comments on commit 31e3890

Please sign in to comment.