From 3f824d0e2cd82f41089a435b3ff624df52073f59 Mon Sep 17 00:00:00 2001 From: Josh Taylor Date: Wed, 20 Jul 2022 12:44:05 +0800 Subject: [PATCH 1/8] Add release_connection setting to not release the connection --- dbt/adapters/snowflake/connections.py | 28 +++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index ddc104ffd..78d12709a 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -64,6 +64,7 @@ class SnowflakeCredentials(Credentials): retry_on_database_errors: bool = False retry_all: bool = False insecure_mode: Optional[bool] = False + release_connection: Optional[bool] = True def __post_init__(self): if self.authenticator != "oauth" and ( @@ -216,6 +217,11 @@ def _get_private_key(self): class SnowflakeConnectionManager(SQLConnectionManager): TYPE = "snowflake" + # Used for determining if the connection should be released. + # As this feature is new, we don't want to break existing integrations, instead make it opt-in for users to test. + # Reusing connections can see a big performance improvement, as you don't need to reauthenticate. + __release_connection: Optional[bool] = True + @contextmanager def exception_handler(self, sql): try: @@ -260,6 +266,10 @@ def open(cls, connection): return connection creds = connection.credentials + + if not creds.release_connection: + cls.__release_connection = False + error = None for attempt in range(1 + creds.connect_retries): try: @@ -508,3 +518,21 @@ def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False): ) return connection, cursor + + def release(self) -> None: + if not self.__release_connection: + return + + with self.lock: + conn = self.get_if_exists() + if conn is None: + return + + try: + # always close the connection. close() calls _rollback() if there + # is an open transaction + self.close(conn) + except Exception: + # if rollback or close failed, remove our busted connection + self.clear_thread_connection() + raise \ No newline at end of file From 432723791511e042d36a50e772b46c88dca3cfc6 Mon Sep 17 00:00:00 2001 From: Josh Taylor Date: Sun, 23 Oct 2022 21:12:17 +0800 Subject: [PATCH 2/8] Merge branch 'main' into feature/connection-release --- dbt/adapters/snowflake/connections.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index d4bc36664..afc1ea010 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -228,7 +228,6 @@ def _get_private_key(self): class SnowflakeConnectionManager(SQLConnectionManager): TYPE = "snowflake" - # Used for determining if the connection should be released. # As this feature is new, we don't want to break existing integrations, instead make it opt-in for users to test. # Reusing connections can see a big performance improvement, as you don't need to reauthenticate. @@ -498,4 +497,4 @@ def release(self) -> None: except Exception: # if rollback or close failed, remove our busted connection self.clear_thread_connection() - raise \ No newline at end of file + raise From cccffeacdc032f74c09e40fc03afd41c860af91c Mon Sep 17 00:00:00 2001 From: Josh Taylor Date: Sun, 23 Oct 2022 21:12:31 +0800 Subject: [PATCH 3/8] remove space --- dbt/adapters/snowflake/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index afc1ea010..a7e284edf 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -497,4 +497,4 @@ def release(self) -> None: except Exception: # if rollback or close failed, remove our busted connection self.clear_thread_connection() - raise + raise \ No newline at end of file From 8ec71a5d0c0c9eb3fdb81f5ca4b1a05667d9f857 Mon Sep 17 00:00:00 2001 From: Mila Page Date: Tue, 31 Jan 2023 01:46:20 -0800 Subject: [PATCH 4/8] Simplify code and add profiles.yml integration. Default still to release. --- dbt/adapters/snowflake/connections.py | 24 ++++-------------------- 1 file changed, 4 insertions(+), 20 deletions(-) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index 42988a3df..1836382d2 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -153,6 +153,7 @@ def auth_args(self): # enable mfa token cache for linux result["client_request_mfa_token"] = True result["private_key"] = self._get_private_key() + result["release_connection"] = self.release_connection return result def _get_access_token(self) -> str: @@ -233,10 +234,6 @@ def _get_private_key(self): class SnowflakeConnectionManager(SQLConnectionManager): TYPE = "snowflake" - # Used for determining if the connection should be released. - # As this feature is new, we don't want to break existing integrations, instead make it opt-in for users to test. - # Reusing connections can see a big performance improvement, as you don't need to reauthenticate. - __release_connection: Optional[bool] = True @contextmanager def exception_handler(self, sql): @@ -493,19 +490,6 @@ def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False): return connection, cursor def release(self) -> None: - if not self.__release_connection: - return - - with self.lock: - conn = self.get_if_exists() - if conn is None: - return - - try: - # always close the connection. close() calls _rollback() if there - # is an open transaction - self.close(conn) - except Exception: - # if rollback or close failed, remove our busted connection - self.clear_thread_connection() - raise \ No newline at end of file + """If the release_connection param in profiles.yml is False, maintain the connection.""" + if self.profile.credentials.release_connection: # type: ignore + super().release() From 7edf78288302b81d93605b4986eb961afab20aa4 Mon Sep 17 00:00:00 2001 From: Mila Page Date: Tue, 31 Jan 2023 02:06:39 -0800 Subject: [PATCH 5/8] Add release_connection to unit tests --- tests/unit/test_snowflake_adapter.py | 40 +++++++++++++++++++--------- 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/tests/unit/test_snowflake_adapter.py b/tests/unit/test_snowflake_adapter.py index 7b7ab4101..4475be3db 100644 --- a/tests/unit/test_snowflake_adapter.py +++ b/tests/unit/test_snowflake_adapter.py @@ -262,7 +262,7 @@ def test_client_session_keep_alive_false_by_default(self): client_session_keep_alive=False, database='test_database', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={}), + session_parameters={}, release_connection=True), ]) def test_client_session_keep_alive_true(self): @@ -279,7 +279,7 @@ def test_client_session_keep_alive_true(self): client_session_keep_alive=True, database='test_database', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={}) + session_parameters={}, release_connection=True) ]) def test_user_pass_authentication(self): @@ -298,7 +298,7 @@ def test_user_pass_authentication(self): password='test_password', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={}) + session_parameters={}, release_connection=True) ]) def test_authenticator_user_pass_authentication(self): @@ -318,9 +318,9 @@ def test_authenticator_user_pass_authentication(self): password='test_password', role=None, schema='public', user='test_user', warehouse='test_warehouse', authenticator='test_sso_url', private_key=None, - application='dbt', client_request_mfa_token=True, + application='dbt', client_request_mfa_token=True, client_store_temporary_credential=True, insecure_mode=False, - session_parameters={}) + session_parameters={}, release_connection=True) ]) def test_authenticator_externalbrowser_authentication(self): @@ -338,9 +338,9 @@ def test_authenticator_externalbrowser_authentication(self): client_session_keep_alive=False, database='test_database', role=None, schema='public', user='test_user', warehouse='test_warehouse', authenticator='externalbrowser', - private_key=None, application='dbt', client_request_mfa_token=True, + private_key=None, application='dbt', client_request_mfa_token=True, client_store_temporary_credential=True, insecure_mode=False, - session_parameters={}) + session_parameters={}, release_connection=True) ]) def test_authenticator_oauth_authentication(self): @@ -359,9 +359,9 @@ def test_authenticator_oauth_authentication(self): client_session_keep_alive=False, database='test_database', role=None, schema='public', user='test_user', warehouse='test_warehouse', authenticator='oauth', token='my-oauth-token', - private_key=None, application='dbt', client_request_mfa_token=True, + private_key=None, application='dbt', client_request_mfa_token=True, client_store_temporary_credential=True, insecure_mode=False, - session_parameters={}) + session_parameters={}, release_connection=True) ]) @mock.patch('dbt.adapters.snowflake.SnowflakeCredentials._get_private_key', return_value='test_key') @@ -383,7 +383,7 @@ def test_authenticator_private_key_authentication(self, mock_get_private_key): role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key='test_key', application='dbt', insecure_mode=False, - session_parameters={}) + session_parameters={}, release_connection=True) ]) @mock.patch('dbt.adapters.snowflake.SnowflakeCredentials._get_private_key', return_value='test_key') @@ -405,7 +405,7 @@ def test_authenticator_private_key_authentication_no_passphrase(self, mock_get_p role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key='test_key', application='dbt', insecure_mode=False, - session_parameters={}) + session_parameters={}, release_connection=True) ]) def test_query_tag(self): @@ -422,7 +422,23 @@ def test_query_tag(self): password='test_password', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={"QUERY_TAG": "test_query_tag"}) + session_parameters={"QUERY_TAG": "test_query_tag"}, release_connection=True) + ]) + + def test_release_credentials(self): + self.config.credentials = self.config.credentials.replace(release_connection=False) + self.adapter = SnowflakeAdapter(self.config) + conn = self.adapter.connections.set_connection_name(name='new_connection_with_new_config') + + self.snowflake.assert_not_called() + conn.handle + self.snowflake.assert_has_calls([ + mock.call( + account='test_account', autocommit=True, + client_session_keep_alive=False, database='test_database', + role=None, schema='public', user='test_user', warehouse='test_warehouse', + private_key=None, application='dbt', insecure_mode=False, + session_parameters={}, release_connection=False) ]) From ff047a03bf9d53629bd63835a62153400d56e020 Mon Sep 17 00:00:00 2001 From: Mila Page Date: Tue, 31 Jan 2023 02:27:08 -0800 Subject: [PATCH 6/8] Add changelog --- .changes/unreleased/Features-20230131-022659.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20230131-022659.yaml diff --git a/.changes/unreleased/Features-20230131-022659.yaml b/.changes/unreleased/Features-20230131-022659.yaml new file mode 100644 index 000000000..c5d01c71f --- /dev/null +++ b/.changes/unreleased/Features-20230131-022659.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Add configurable setting for keeping connections open on Snowflake +time: 2023-01-31T02:26:59.701589-08:00 +custom: + Author: versusfacit joshuataylor + Issue: "854" From 3614d92516466ad3d2b7c518e1dcf9a19e5447c1 Mon Sep 17 00:00:00 2001 From: Mila Page Date: Tue, 31 Jan 2023 02:38:04 -0800 Subject: [PATCH 7/8] Add warning for combination that can result in a bug. --- dbt/adapters/snowflake/connections.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index 1836382d2..7d1c666da 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -152,8 +152,19 @@ def auth_args(self): result["client_store_temporary_credential"] = True # enable mfa token cache for linux result["client_request_mfa_token"] = True - result["private_key"] = self._get_private_key() + + # Warning: This profile configuration can result in specific threads, even just one, + # hanging open during execution. While uncommon, this can cancel out any speed up and + # make run tasks take even longer than they might normally. result["release_connection"] = self.release_connection + if self.client_session_keep_alive and not self.release_connection: + warn_or_error( + AdapterEventWarning( + base_msg="Invalid profile: release_connection is False. client_session_keep_alive must also be False!" + ) + ) + + result["private_key"] = self._get_private_key() return result def _get_access_token(self) -> str: From a2b6b70a3b70559057677d5a2fcf645c22abcd39 Mon Sep 17 00:00:00 2001 From: Mila Page Date: Mon, 6 Feb 2023 18:28:41 -0800 Subject: [PATCH 8/8] Rename config var with None default, change up tests. Remove warnings. Add comments. --- dbt/adapters/snowflake/connections.py | 23 ++++++++------------- tests/unit/test_snowflake_adapter.py | 29 +++++++++++++++------------ 2 files changed, 24 insertions(+), 28 deletions(-) diff --git a/dbt/adapters/snowflake/connections.py b/dbt/adapters/snowflake/connections.py index 7d1c666da..e020c47ed 100644 --- a/dbt/adapters/snowflake/connections.py +++ b/dbt/adapters/snowflake/connections.py @@ -78,7 +78,7 @@ class SnowflakeCredentials(Credentials): retry_on_database_errors: bool = False retry_all: bool = False insecure_mode: Optional[bool] = False - release_connection: Optional[bool] = True + reuse_connections: Optional[bool] = None def __post_init__(self): if self.authenticator != "oauth" and ( @@ -152,18 +152,7 @@ def auth_args(self): result["client_store_temporary_credential"] = True # enable mfa token cache for linux result["client_request_mfa_token"] = True - - # Warning: This profile configuration can result in specific threads, even just one, - # hanging open during execution. While uncommon, this can cancel out any speed up and - # make run tasks take even longer than they might normally. - result["release_connection"] = self.release_connection - if self.client_session_keep_alive and not self.release_connection: - warn_or_error( - AdapterEventWarning( - base_msg="Invalid profile: release_connection is False. client_session_keep_alive must also be False!" - ) - ) - + result["reuse_connections"] = self.reuse_connections result["private_key"] = self._get_private_key() return result @@ -501,6 +490,10 @@ def add_query(self, sql, auto_begin=True, bindings=None, abridge_sql_log=False): return connection, cursor def release(self) -> None: - """If the release_connection param in profiles.yml is False, maintain the connection.""" - if self.profile.credentials.release_connection: # type: ignore + """Reuse connections by deferring release until adapter context manager in core + resets adapters. This cleanup_all happens before Python teardown. Idle connections + incur no costs while waiting in the connection pool.""" + if self.profile.credentials.reuse_connections: # type: ignore + return + else: super().release() diff --git a/tests/unit/test_snowflake_adapter.py b/tests/unit/test_snowflake_adapter.py index 4475be3db..49186206e 100644 --- a/tests/unit/test_snowflake_adapter.py +++ b/tests/unit/test_snowflake_adapter.py @@ -262,7 +262,7 @@ def test_client_session_keep_alive_false_by_default(self): client_session_keep_alive=False, database='test_database', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={}, release_connection=True), + session_parameters={}, reuse_connections=None), ]) def test_client_session_keep_alive_true(self): @@ -279,7 +279,7 @@ def test_client_session_keep_alive_true(self): client_session_keep_alive=True, database='test_database', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={}, release_connection=True) + session_parameters={}, reuse_connections=None) ]) def test_user_pass_authentication(self): @@ -298,7 +298,7 @@ def test_user_pass_authentication(self): password='test_password', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={}, release_connection=True) + session_parameters={}, reuse_connections=None) ]) def test_authenticator_user_pass_authentication(self): @@ -320,7 +320,7 @@ def test_authenticator_user_pass_authentication(self): authenticator='test_sso_url', private_key=None, application='dbt', client_request_mfa_token=True, client_store_temporary_credential=True, insecure_mode=False, - session_parameters={}, release_connection=True) + session_parameters={}, reuse_connections=None) ]) def test_authenticator_externalbrowser_authentication(self): @@ -340,7 +340,7 @@ def test_authenticator_externalbrowser_authentication(self): warehouse='test_warehouse', authenticator='externalbrowser', private_key=None, application='dbt', client_request_mfa_token=True, client_store_temporary_credential=True, insecure_mode=False, - session_parameters={}, release_connection=True) + session_parameters={}, reuse_connections=None) ]) def test_authenticator_oauth_authentication(self): @@ -361,7 +361,7 @@ def test_authenticator_oauth_authentication(self): warehouse='test_warehouse', authenticator='oauth', token='my-oauth-token', private_key=None, application='dbt', client_request_mfa_token=True, client_store_temporary_credential=True, insecure_mode=False, - session_parameters={}, release_connection=True) + session_parameters={}, reuse_connections=None) ]) @mock.patch('dbt.adapters.snowflake.SnowflakeCredentials._get_private_key', return_value='test_key') @@ -383,7 +383,7 @@ def test_authenticator_private_key_authentication(self, mock_get_private_key): role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key='test_key', application='dbt', insecure_mode=False, - session_parameters={}, release_connection=True) + session_parameters={}, reuse_connections=None) ]) @mock.patch('dbt.adapters.snowflake.SnowflakeCredentials._get_private_key', return_value='test_key') @@ -405,7 +405,7 @@ def test_authenticator_private_key_authentication_no_passphrase(self, mock_get_p role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key='test_key', application='dbt', insecure_mode=False, - session_parameters={}, release_connection=True) + session_parameters={}, reuse_connections=None) ]) def test_query_tag(self): @@ -422,11 +422,14 @@ def test_query_tag(self): password='test_password', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={"QUERY_TAG": "test_query_tag"}, release_connection=True) + session_parameters={"QUERY_TAG": "test_query_tag"}, reuse_connections=None) ]) - def test_release_credentials(self): - self.config.credentials = self.config.credentials.replace(release_connection=False) + def test_reuse_connections_with_keep_alive(self): + self.config.credentials = self.config.credentials.replace( + reuse_connections=True, + client_session_keep_alive=True + ) self.adapter = SnowflakeAdapter(self.config) conn = self.adapter.connections.set_connection_name(name='new_connection_with_new_config') @@ -435,10 +438,10 @@ def test_release_credentials(self): self.snowflake.assert_has_calls([ mock.call( account='test_account', autocommit=True, - client_session_keep_alive=False, database='test_database', + client_session_keep_alive=True, database='test_database', role=None, schema='public', user='test_user', warehouse='test_warehouse', private_key=None, application='dbt', insecure_mode=False, - session_parameters={}, release_connection=False) + session_parameters={}, reuse_connections=True) ])