From 31e389064bf600ac42f17940ff0f94c1e39abf40 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Thu, 30 Jun 2022 19:39:25 +0200 Subject: [PATCH] Multi-query statements: fix rows_affected + query comments (#153) * Add test for rows_affected * Alt path for begin + commit. Query comments post-split * Fix unit test * Add changelog entry --- CHANGELOG.md | 5 +++ dbt/adapters/snowflake/connections.py | 36 ++++++++++++++++++- .../adapter/test_incremental_run_result.py | 22 ++++++++++++ tests/unit/test_snowflake_adapter.py | 6 ++-- 4 files changed, 65 insertions(+), 4 deletions(-) create mode 100644 tests/functional/adapter/test_incremental_run_result.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3860f1642..1fffd7bc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## dbt-snowflake 1.2.0rc1 (Release TBD) + +### Fixes +- In multi-query statements, prepend all queries with query comments. Use the last non-`COMMIT` query to store metadata about the model result. **Note:** this restores previous (pre-v0.21) behavior for incremental models and snapshots, which will again correctly reflect the number of rows modified in `adapter_response.rows_affected` ([#140](https://github.com/dbt-labs/dbt-snowflake/issues/140), [#147](https://github.com/dbt-labs/dbt-snowflake/issues/147), [#153](https://github.com/dbt-labs/dbt-snowflake/pull/153)) + ## dbt-snowflake 1.2.0b1 (June 24, 2022) ### Features diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index ed8140924..ddc104ffd 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -6,7 +6,10 @@ from dataclasses import dataclass from io import StringIO from time import sleep -from typing import Optional +from typing import Optional, Tuple + +import agate +import dbt.clients.agate_helper from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization @@ -430,6 +433,19 @@ def 
process_results(cls, column_names, rows): return super().process_results(column_names, fixed) + def execute( + self, sql: str, auto_begin: bool = False, fetch: bool = False + ) -> Tuple[AdapterResponse, agate.Table]: + # don't apply the query comment here + # it will be applied after ';' queries are split + _, cursor = self.add_query(sql, auto_begin) + response = self.get_response(cursor) + if fetch: + table = self.get_result_from_cursor(cursor) + else: + table = dbt.clients.agate_helper.empty_table() + return response, table + def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False): connection = None @@ -455,6 +471,24 @@ def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False): if without_comments == "": continue + # Even though we turn off transactions by default for Snowflake, + # the user/macro has passed them *explicitly*, probably to wrap a DML statement + # Let their wish be granted! + # This also has the effect of ignoring "commit" in the RunResult for this model + # https://github.com/dbt-labs/dbt-snowflake/issues/147 + if individual_query.lower() == "begin;": + super().add_begin_query() + continue + + elif individual_query.lower() == "commit;": + super().add_commit_query() + continue + + # add a query comment to *every* statement + # needed for models with multi-step materializations + # https://github.com/dbt-labs/dbt-snowflake/issues/140 + individual_query = self._add_query_comment(individual_query) + connection, cursor = super().add_query( individual_query, auto_begin, bindings=bindings, abridge_sql_log=abridge_sql_log ) diff --git a/tests/functional/adapter/test_incremental_run_result.py b/tests/functional/adapter/test_incremental_run_result.py new file mode 100644 index 000000000..b86fd8a2d --- /dev/null +++ b/tests/functional/adapter/test_incremental_run_result.py @@ -0,0 +1,22 @@ +import pytest + +from dbt.tests.util import run_dbt +from dbt.tests.adapter.basic.test_incremental import BaseIncremental + 
+class TestIncrementalRunResultSnowflake(BaseIncremental): + """Bonus test to verify that incremental models return the number of rows affected""" + def test_incremental(self, project): + # seed command + results = run_dbt(["seed"]) + assert len(results) == 2 + + # run with initial seed + results = run_dbt(["run", "--vars", "seed_name: base"]) + assert len(results) == 1 + + # run with additions + results = run_dbt(["run", "--vars", "seed_name: added"]) + assert len(results) == 1 + # verify that run_result is correct + rows_affected = results[0].adapter_response["rows_affected"] + assert rows_affected == 10, f"Expected 10 rows changed, found {rows_affected}" diff --git a/tests/unit/test_snowflake_adapter.py b/tests/unit/test_snowflake_adapter.py index 5752ec197..6cd50ca67 100644 --- a/tests/unit/test_snowflake_adapter.py +++ b/tests/unit/test_snowflake_adapter.py @@ -133,9 +133,9 @@ def test_quoting_on_truncate(self): # no query comment because wrapped in begin; + commit; for explicit DML self.mock_execute.assert_has_calls([ - mock.call('/* dbt */\nbegin;', None), - mock.call('truncate table test_database."test_schema".test_table\n ;', None), - mock.call('commit;', None) + mock.call('/* dbt */\nBEGIN', None), + mock.call('/* dbt */\ntruncate table test_database."test_schema".test_table\n ;', None), + mock.call('/* dbt */\nCOMMIT', None) ]) def test_quoting_on_rename(self):