From fb958f9f353f0c527fbcfefa88ba8f6934c27a55 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 9 Oct 2020 15:58:40 +0300 Subject: [PATCH 01/12] feat: implement transactions management --- google/cloud/spanner_dbapi/connection.py | 39 +++++++++++++++++++----- google/cloud/spanner_dbapi/cursor.py | 11 +++++++ 2 files changed, 42 insertions(+), 8 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 70ec5a0365..c70e5d44ed 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -10,15 +10,12 @@ import warnings from google.cloud import spanner_v1 +from google.cloud.spanner_v1.pool import BurstyPool from .cursor import Cursor from .exceptions import InterfaceError -AUTOCOMMIT_MODE_WARNING = ( - "This method is non-operational, as Cloud Spanner" - "DB API always works in `autocommit` mode." - "See https://github.com/googleapis/python-spanner-django#transaction-management-isnt-supported" -) +AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode" ColumnDetails = namedtuple("column_details", ["null_ok", "spanner_type"]) @@ -37,11 +34,25 @@ class Connection: """ def __init__(self, instance, database): + self._sessions_pool = BurstyPool() + self._sessions_pool.bind(database) + self.instance = instance self.database = database - self.is_closed = False self._ddl_statements = [] + self.transactions = [] + + self.is_closed = False + self.autocommit = True + + def session(self): + """Get a Cloud Spanner session. + + :rtype: :class:`google.cloud.spanner_v1.session.Session` + :returns: Cloud Spanner session object ready to use. + """ + return self._sessions_pool.get() def cursor(self): self._raise_if_closed() @@ -149,11 +160,23 @@ def close(self): def commit(self): """Commit all the pending transactions.""" - warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) + if self.autocommit: + warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) + else: + for transaction in self.transactions: + transaction.commit() + + self.transactions = [] def rollback(self): """Rollback all the pending transactions.""" - warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) + if self.autocommit: + warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) + else: + for transaction in self.transactions: + transaction.rollback() + + self.transactions = [] def __enter__(self): return self diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 73764b4c26..3d241c0f73 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -68,6 +68,7 @@ def __init__(self, connection): self._connection = connection self._is_closed = False + self.transaction = None # the number of rows to fetch at a time with fetchmany() self.arraysize = 1 @@ -88,6 +89,16 @@ def execute(self, sql, args=None): self._res = None + if not self._connection.autocommit: + if not self.transaction: + self.transaction = self._connection.session().transaction() + self.transaction.begin() + self._connection.transactions.append(self.transaction) + + self._res = self.transaction.execute_sql(sql) + self._itr = PeekIterator(self._res) + return + # Classify whether this is a read-only SQL statement. try: classification = classify_stmt(sql) From 4b7273fc46ff5b82ee050d45cee12a9aa41d8cd0 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 9 Oct 2020 17:16:58 +0300 Subject: [PATCH 02/12] feat: transactions management implementation --- google/cloud/spanner_dbapi/cursor.py | 6 +- tests/system/test_system.py | 163 +++++++++++++++++++++++++-- 2 files changed, 158 insertions(+), 11 deletions(-) diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 3d241c0f73..b6a07bdb09 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -90,7 +90,11 @@ def execute(self, sql, args=None): self._res = None if not self._connection.autocommit: - if not self.transaction: + if ( + not self.transaction + or self.transaction.committed + or self.transaction.rolled_back + ): self.transaction = self._connection.session().transaction() self.transaction.begin() self._connection.transactions.append(self.transaction) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 5710ba6ce6..950b03d62b 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -6,17 +6,160 @@ import unittest +# Copyright 2016 Google LLC All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +from google.api_core import exceptions + +from google.cloud.spanner import Client +from google.cloud.spanner import BurstyPool +from google.cloud.spanner_dbapi.connection import Connection + +from test_utils.retry import RetryErrors +from test_utils.system import unique_resource_id + + +CREATE_INSTANCE = ( + os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None +) + +if CREATE_INSTANCE: + INSTANCE_ID = "google-cloud" + unique_resource_id("-") +else: + INSTANCE_ID = os.environ.get( + "GOOGLE_CLOUD_TESTS_SPANNER_INSTANCE", "google-cloud-python-systest" + ) +EXISTING_INSTANCES = [] + +DDL_STATEMENTS = ( + """CREATE TABLE contacts ( + contact_id INT64, + first_name STRING(1024), + last_name STRING(1024), + email STRING(1024) + ) + PRIMARY KEY (contact_id)""", +) + + +class Config(object): + """Run-time configuration to be modified at set-up. + + This is a mutable stand-in to allow test set-up to modify + global state. + """ + + CLIENT = None + INSTANCE_CONFIG = None + INSTANCE = None + + +def _list_instances(): + return list(Config.CLIENT.list_instances()) -class TestSpannerDjangoDBAPI(unittest.TestCase): - def setUp(self): - # TODO: Implement this method - pass + +def setUpModule(): + Config.CLIENT = Client() + retry = RetryErrors(exceptions.ServiceUnavailable) + + configs = list(retry(Config.CLIENT.list_instance_configs)()) + + instances = retry(_list_instances)() + EXISTING_INSTANCES[:] = instances + + if CREATE_INSTANCE: + configs = [config for config in configs if "-us-" in config.name] + + if not configs: + raise ValueError("List instance configs failed in module set up.") + + Config.INSTANCE_CONFIG = configs[0] + config_name = configs[0].name + + Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID, config_name) + created_op = Config.INSTANCE.create() + created_op.result(30) # block until completion + else: + Config.INSTANCE = Config.CLIENT.instance(INSTANCE_ID) + Config.INSTANCE.reload() + + +def tearDownModule(): + if CREATE_INSTANCE: + Config.INSTANCE.delete() + + +class TestTransactionsManagement(unittest.TestCase): + DATABASE_NAME = "db-api-transactions-management" + + @classmethod + def setUpClass(cls): + cls._db = Config.INSTANCE.database( + cls.DATABASE_NAME, + ddl_statements=DDL_STATEMENTS, + pool=BurstyPool(labels={"testcase": "database_api"}), + ) + cls._db.create().result(30) # raises on failure / timeout. + + @classmethod + def tearDownClass(cls): + cls._db.drop() def tearDown(self): - # TODO: Implement this method - pass + with self._db.snapshot() as snapshot: + snapshot.execute_sql("DELETE FROM contacts WHERE true") + + def test_commit(self): + want_row = ( + 1, + "updated-first-name", + "last-name", + "test.email_updated@domen.ru", + ) + # connecting to the test database + conn = Connection(Config.INSTANCE, self._db) + conn.autocommit = False + cursor = conn.cursor() + + # executing several DML statements with one transaction + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + cursor.execute( + """ +UPDATE contacts +SET first_name = 'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + cursor.execute( + """ +UPDATE contacts +SET email = 'test.email_updated@domen.ru' +WHERE email = 'test.email@domen.ru' +""" + ) + conn.commit() + + # reading the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() - def test_api(self): - # An dummy stub to avoid `exit code 5` errors - # TODO: Replace this with an actual system test method - self.assertTrue(True) + self.assertEqual(got_rows, [want_row]) From 44fd7dda9f63d20b0560c99f659fe75d3874130c Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 12 Oct 2020 12:08:00 +0300 Subject: [PATCH 03/12] add autocommit property setter and getter, add more system tests --- google/cloud/spanner_dbapi/connection.py | 35 +++++- google/cloud/spanner_dbapi/cursor.py | 2 +- tests/system/test_system.py | 133 ++++++++++++++++++++++- 3 files changed, 157 insertions(+), 13 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index c70e5d44ed..a8f6d3f814 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -34,8 +34,8 @@ class Connection: """ def __init__(self, instance, database): - self._sessions_pool = BurstyPool() - self._sessions_pool.bind(database) + self._pool = BurstyPool() + self._pool.bind(database) self.instance = instance self.database = database @@ -44,15 +44,34 @@ def __init__(self, instance, database): self.transactions = [] self.is_closed = False - self.autocommit = True + self._autocommit = False - def session(self): + @property + def autocommit(self): + """Autocommit mode flag for this connection. + + :rtype: bool + :returns: Autocommit mode flag value. + """ + return self._autocommit + + @autocommit.setter + def autocommit(self, value): + """Change this connection autocommit mode. + + :type value: bool + :param value: New autocommit mode state. + """ + if self._autocommit != value: + self.commit() + + def session_checkout(self): """Get a Cloud Spanner session. :rtype: :class:`google.cloud.spanner_v1.session.Session` :returns: Cloud Spanner session object ready to use. """ - return self._sessions_pool.get() + return self._pool.get() def cursor(self): self._raise_if_closed() @@ -153,8 +172,12 @@ def get_table_column_schema(self, table_name): def close(self): """Close this connection. - The connection will be unusable from this point forward. + The connection will be unusable from this point forward. Rollback + will be performed on all the pending transactions. """ + for transaction in self.transactions: + transaction.rollback() + self.__dbhandle = None self.is_closed = True diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index b6a07bdb09..73c7359d66 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -95,7 +95,7 @@ def execute(self, sql, args=None): or self.transaction.committed or self.transaction.rolled_back ): - self.transaction = self._connection.session().transaction() + self.transaction = self._connection.session_checkout().transaction() self.transaction.begin() self._connection.transactions.append(self.transaction) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 950b03d62b..7a42ca2fbd 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -98,15 +98,19 @@ def setUpModule(): def tearDownModule(): + """Delete the test instance, if it was created.""" if CREATE_INSTANCE: Config.INSTANCE.delete() class TestTransactionsManagement(unittest.TestCase): + """Transactions management support tests.""" + DATABASE_NAME = "db-api-transactions-management" @classmethod def setUpClass(cls): + """Create a test database.""" cls._db = Config.INSTANCE.database( cls.DATABASE_NAME, ddl_statements=DDL_STATEMENTS, @@ -116,25 +120,26 @@ def setUpClass(cls): @classmethod def tearDownClass(cls): + """Delete the test database.""" cls._db.drop() def tearDown(self): - with self._db.snapshot() as snapshot: - snapshot.execute_sql("DELETE FROM contacts WHERE true") + """Clear the test table after every test.""" + self._db.run_in_transaction(clear_table) def test_commit(self): + """Test committing a transaction with several statements.""" want_row = ( 1, "updated-first-name", "last-name", "test.email_updated@domen.ru", ) - # connecting to the test database + # connect to the test database conn = Connection(Config.INSTANCE, self._db) - conn.autocommit = False cursor = conn.cursor() - # executing several DML statements with one transaction + # execute several DML statements within one transaction cursor.execute( """ INSERT INTO contacts (contact_id, first_name, last_name, email) @@ -157,9 +162,125 @@ def test_commit(self): ) conn.commit() - # reading the resulting data from the database + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() + + self.assertEqual(got_rows, [want_row]) + + def test_rollback(self): + """Test rollbacking a transaction with several statements.""" + want_row = (2, "first-name", "last-name", "test.email@domen.ru") + # connect to the test database + conn = Connection(Config.INSTANCE, self._db) + cursor = conn.cursor() + + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + conn.commit() + + # execute several DMLs with one transaction + cursor.execute( + """ +UPDATE contacts +SET first_name = 'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + cursor.execute( + """ +UPDATE contacts +SET email = 'test.email_updated@domen.ru' +WHERE email = 'test.email@domen.ru' +""" + ) + conn.rollback() + + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() + + self.assertEqual(got_rows, [want_row]) + + def test_autocommit_mode_change(self): + """Test auto committing a transaction on `autocommit` mode change.""" + want_row = ( + 2, + "updated-first-name", + "last-name", + "test.email@domen.ru", + ) + # connect to the test database + conn = Connection(Config.INSTANCE, self._db) + cursor = conn.cursor() + + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (2, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + cursor.execute( + """ +UPDATE contacts +SET first_name = 'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + conn.autocommit = True + + # read the resulting data from the database + cursor.execute("SELECT * FROM contacts") + got_rows = cursor.fetchall() + conn.commit() + + self.assertEqual(got_rows, [want_row]) + + def test_rollback_on_connection_closing(self): + """ + When closing a connection all the pending transactions + must be rollbacked. Testing if it's working this way. + """ + want_row = (1, "first-name", "last-name", "test.email@domen.ru") + # connect to the test database + conn = Connection(Config.INSTANCE, self._db) + cursor = conn.cursor() + + cursor.execute( + """ +INSERT INTO contacts (contact_id, first_name, last_name, email) +VALUES (1, 'first-name', 'last-name', 'test.email@domen.ru') + """ + ) + conn.commit() + + cursor.execute( + """ +UPDATE contacts +SET first_name = 'updated-first-name' +WHERE first_name = 'first-name' +""" + ) + conn.close() + + # connect again, as the previous connection is no-op after closing + conn = Connection(Config.INSTANCE, self._db) + cursor = conn.cursor() + + # read the resulting data from the database cursor.execute("SELECT * FROM contacts") got_rows = cursor.fetchall() conn.commit() self.assertEqual(got_rows, [want_row]) + + +def clear_table(transaction): + """Clear the test table.""" + transaction.execute_update("DELETE FROM contacts WHERE true") From dc7e5a4bf6b1f4132652f999343ee5cc4e12473c Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Mon, 12 Oct 2020 12:33:11 +0300 Subject: [PATCH 04/12] fix unit tests --- google/cloud/spanner_dbapi/connection.py | 4 +++- tests/spanner_dbapi/test_connection.py | 3 ++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index a8f6d3f814..30eff9b031 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -62,9 +62,11 @@ def autocommit(self, value): :type value: bool :param value: New autocommit mode state. """ - if self._autocommit != value: + if value and not self._autocommit: self.commit() + self._autocommit = value + def session_checkout(self): """Get a Cloud Spanner session. diff --git a/tests/spanner_dbapi/test_connection.py b/tests/spanner_dbapi/test_connection.py index 1b285a933d..2e2b844f21 100644 --- a/tests/spanner_dbapi/test_connection.py +++ b/tests/spanner_dbapi/test_connection.py @@ -49,8 +49,9 @@ def test_close(self): connection.cursor() @mock.patch("warnings.warn") - def test_transaction_management_warnings(self, warn_mock): + def test_transaction_autocommit_warnings(self, warn_mock): connection = self._make_connection() + connection.autocommit = True connection.commit() warn_mock.assert_called_with( From 75e068524d5cfeafa3fe6d787c6f5f328ca1e62f Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 13 Oct 2020 13:08:41 +0300 Subject: [PATCH 05/12] use global sessions pool, use single transaction for single connection, return used sessions back to the pool --- google/cloud/spanner_dbapi/__init__.py | 6 +- google/cloud/spanner_dbapi/connection.py | 74 +++++++++++++++++------- google/cloud/spanner_dbapi/cursor.py | 14 +---- tests/system/test_system.py | 13 ++++- 4 files changed, 74 insertions(+), 33 deletions(-) diff --git a/google/cloud/spanner_dbapi/__init__.py b/google/cloud/spanner_dbapi/__init__.py index 014d82d3cc..90b38be049 100644 --- a/google/cloud/spanner_dbapi/__init__.py +++ b/google/cloud/spanner_dbapi/__init__.py @@ -47,6 +47,10 @@ # threadsafety level. threadsafety = 1 +# Cloud Spanner sessions pool, used by +# default for the whole DB API package +default_pool = spanner_v1.BurstyPool() + def connect( instance_id, database_id, project=None, credentials=None, user_agent=None @@ -93,7 +97,7 @@ def connect( if not database.exists(): raise ValueError("database '%s' does not exist." % database_id) - return Connection(instance, database) + return Connection(instance, database, default_pool) __all__ = [ diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index 30eff9b031..d7772d66d2 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -31,17 +31,21 @@ class Connection: :type database: :class:`~google.cloud.spanner_v1.database.Database` :param database: Cloud Spanner database to connect to. + + :type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool` + :param pool: (Optional) Cloud Spanner sessions pool. """ - def __init__(self, instance, database): - self._pool = BurstyPool() + def __init__(self, instance, database, pool=None): + self._pool = pool or BurstyPool() self._pool.bind(database) self.instance = instance self.database = database self._ddl_statements = [] - self.transactions = [] + self._transaction = None + self._session = None self.is_closed = False self._autocommit = False @@ -67,13 +71,47 @@ def autocommit(self, value): self._autocommit = value - def session_checkout(self): - """Get a Cloud Spanner session. + def _session_checkout(self): + """Get a Cloud Spanner session from a pool. + + If there is already a session associated with + this connection, it'll be used otherwise. :rtype: :class:`google.cloud.spanner_v1.session.Session` :returns: Cloud Spanner session object ready to use. """ - return self._pool.get() + if not self._session: + self._session = self._pool.get() + + return self._session + + def _release_session(self): + """Release the currently used Spanner session. + + The session will be returned into the sessions pool. + """ + self._pool.put(self._session) + self._session = None + + def transaction_checkout(self): + """Get a Cloud Spanner transaction. + + Begin a new transaction, if there is no transaction in + this connection yet. Return the begun one otherwise. + + :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` + :returns: Cloud Spanner transaction object ready to use. + """ + if not self.autocommit: + if ( + not self._transaction + or self._transaction.committed + or self._transaction.rolled_back + ): + self._transaction = self._session_checkout().transaction() + self._transaction.begin() + + return self._transaction def cursor(self): self._raise_if_closed() @@ -174,11 +212,11 @@ def get_table_column_schema(self, table_name): def close(self): """Close this connection. - The connection will be unusable from this point forward. Rollback - will be performed on all the pending transactions. + The connection will be unusable from this point forward. If the + connection has an active transaction, it will be rolled back. """ - for transaction in self.transactions: - transaction.rollback() + if self._transaction and not self._transaction.committed: + self._transaction.rollback() self.__dbhandle = None self.is_closed = True @@ -187,21 +225,17 @@ def commit(self): """Commit all the pending transactions.""" if self.autocommit: warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) - else: - for transaction in self.transactions: - transaction.commit() - - self.transactions = [] + elif self._transaction: + self._transaction.commit() + self._release_session() def rollback(self): """Rollback all the pending transactions.""" if self.autocommit: warnings.warn(AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2) - else: - for transaction in self.transactions: - transaction.rollback() - - self.transactions = [] + elif self._transaction: + self._transaction.rollback() + self._release_session() def __enter__(self): return self diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 73c7359d66..19ac0be5df 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -68,7 +68,6 @@ def __init__(self, connection): self._connection = connection self._is_closed = False - self.transaction = None # the number of rows to fetch at a time with fetchmany() self.arraysize = 1 @@ -90,16 +89,9 @@ def execute(self, sql, args=None): self._res = None if not self._connection.autocommit: - if ( - not self.transaction - or self.transaction.committed - or self.transaction.rolled_back - ): - self.transaction = self._connection.session_checkout().transaction() - self.transaction.begin() - self._connection.transactions.append(self.transaction) - - self._res = self.transaction.execute_sql(sql) + transaction = self._connection.transaction_checkout() + + self._res = transaction.execute_sql(sql) self._itr = PeekIterator(self._res) return diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 7a42ca2fbd..d4039c5e38 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -169,6 +169,9 @@ def test_commit(self): self.assertEqual(got_rows, [want_row]) + cursor.close() + conn.close() + def test_rollback(self): """Test rollbacking a transaction with several statements.""" want_row = (2, "first-name", "last-name", "test.email@domen.ru") @@ -208,6 +211,9 @@ def test_rollback(self): self.assertEqual(got_rows, [want_row]) + cursor.close() + conn.close() + def test_autocommit_mode_change(self): """Test auto committing a transaction on `autocommit` mode change.""" want_row = ( @@ -238,10 +244,12 @@ def test_autocommit_mode_change(self): # read the resulting data from the database cursor.execute("SELECT * FROM contacts") got_rows = cursor.fetchall() - conn.commit() self.assertEqual(got_rows, [want_row]) + cursor.close() + conn.close() + def test_rollback_on_connection_closing(self): """ When closing a connection all the pending transactions @@ -280,6 +288,9 @@ def test_rollback_on_connection_closing(self): self.assertEqual(got_rows, [want_row]) + cursor.close() + conn.close() + def clear_table(transaction): """Clear the test table.""" From 0f690358a50671ce2e84033113bc3eb4191a22f4 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 14 Oct 2020 12:10:08 +0300 Subject: [PATCH 06/12] del excess license lines --- tests/system/test_system.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index d4039c5e38..8c1c3bb1ed 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -5,21 +5,6 @@ # https://developers.google.com/open-source/licenses/bsd import unittest - -# Copyright 2016 Google LLC All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - import os from google.api_core import exceptions From 79666d7900496579a8e77763c69e51b8aed1302f Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 16 Oct 2020 10:40:27 +0300 Subject: [PATCH 07/12] don't run DDL in transactions, fix system tests on emulator --- google/cloud/spanner_dbapi/cursor.py | 17 ++++++++++------- tests/system/test_system.py | 12 +++++++++++- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 19ac0be5df..1e7cdce9b3 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -88,16 +88,10 @@ def execute(self, sql, args=None): self._res = None - if not self._connection.autocommit: - transaction = self._connection.transaction_checkout() - - self._res = transaction.execute_sql(sql) - self._itr = PeekIterator(self._res) - return - # Classify whether this is a read-only SQL statement. try: classification = classify_stmt(sql) + if classification == STMT_DDL: self._connection.append_ddl_statement(sql) return @@ -106,6 +100,15 @@ def execute(self, sql, args=None): # any prior DDL statements were run. self._run_prior_DDL_statements() + if not self._connection.autocommit: + transaction = self._connection.transaction_checkout() + + sql, params = sql_pyformat_args_to_spanner(sql, args) + + self._res = transaction.execute_sql(sql, params) + self._itr = PeekIterator(self._res) + return + if classification == STMT_NON_UPDATING: self.__handle_DQL(sql, args or None) elif classification == STMT_INSERT: diff --git a/tests/system/test_system.py b/tests/system/test_system.py index 8c1c3bb1ed..c2ccc09f09 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -20,6 +20,7 @@ CREATE_INSTANCE = ( os.getenv("GOOGLE_CLOUD_TESTS_CREATE_SPANNER_INSTANCE") is not None ) +USE_EMULATOR = os.getenv("SPANNER_EMULATOR_HOST") is not None if CREATE_INSTANCE: INSTANCE_ID = "google-cloud" + unique_resource_id("-") @@ -57,7 +58,16 @@ def _list_instances(): def setUpModule(): - Config.CLIENT = Client() + if USE_EMULATOR: + from google.auth.credentials import AnonymousCredentials + + emulator_project = os.getenv("GCLOUD_PROJECT", "emulator-test-project") + Config.CLIENT = Client( + project=emulator_project, credentials=AnonymousCredentials() + ) + else: + Config.CLIENT = Client() + retry = RetryErrors(exceptions.ServiceUnavailable) configs = list(retry(Config.CLIENT.list_instance_configs)()) From 84d67fe48462010bcc155b01f266f3c397aadfeb Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 16 Oct 2020 10:44:43 +0300 Subject: [PATCH 08/12] fix emulator config --- tests/system/test_system.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/system/test_system.py b/tests/system/test_system.py index c2ccc09f09..f3ee345e15 100644 --- a/tests/system/test_system.py +++ b/tests/system/test_system.py @@ -76,7 +76,10 @@ def setUpModule(): EXISTING_INSTANCES[:] = instances if CREATE_INSTANCE: - configs = [config for config in configs if "-us-" in config.name] + if not USE_EMULATOR: + # Defend against back-end returning configs for regions we aren't + # actually allowed to use. + configs = [config for config in configs if "-us-" in config.name] if not configs: raise ValueError("List instance configs failed in module set up.") From 61e2af86ef13ddd6414fe412d221a3e2981ac278 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Fri, 16 Oct 2020 11:09:52 +0300 Subject: [PATCH 09/12] don't roll back transactions which are already rolled back on connection close --- google/cloud/spanner_dbapi/connection.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index d7772d66d2..c0ebf867aa 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -215,7 +215,11 @@ def close(self): The connection will be unusable from this point forward. If the connection has an active transaction, it will be rolled back. """ - if self._transaction and not self._transaction.committed: + if ( + self._transaction + and not self._transaction.committed + and not self._transaction.rolled_back + ): self._transaction.rollback() self.__dbhandle = None From efbdab3b4c6435dbd09a7bfadccaa4999f9af416 Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Tue, 20 Oct 2020 13:11:03 +0300 Subject: [PATCH 10/12] some fixes --- google/cloud/spanner_dbapi/__init__.py | 19 +++++++--- google/cloud/spanner_dbapi/config.py | 12 ++++++ google/cloud/spanner_dbapi/connection.py | 47 +++++++++++++++++++++--- google/cloud/spanner_dbapi/cursor.py | 4 +- tests/spanner_dbapi/test_connect.py | 19 +++++++++- tests/spanner_dbapi/test_connection.py | 22 ++++++++++- 6 files changed, 110 insertions(+), 13 deletions(-) create mode 100644 google/cloud/spanner_dbapi/config.py diff --git a/google/cloud/spanner_dbapi/__init__.py b/google/cloud/spanner_dbapi/__init__.py index 90b38be049..930795d286 100644 --- a/google/cloud/spanner_dbapi/__init__.py +++ b/google/cloud/spanner_dbapi/__init__.py @@ -6,8 +6,11 @@ """Connection-based DB API for Cloud Spanner.""" +import atexit + from google.cloud import spanner_v1 +from google.cloud.spanner_dbapi import config from .connection import Connection from .exceptions import ( DatabaseError, @@ -47,10 +50,6 @@ # threadsafety level. threadsafety = 1 -# Cloud Spanner sessions pool, used by -# default for the whole DB API package -default_pool = spanner_v1.BurstyPool() - def connect( instance_id, database_id, project=None, credentials=None, user_agent=None @@ -97,7 +96,17 @@ def connect( if not database.exists(): raise ValueError("database '%s' does not exist." % database_id) - return Connection(instance, database, default_pool) + if config.default_pool is None: + config.default_pool = spanner_v1.BurstyPool() + + return Connection(instance, database, config.default_pool) + + +@atexit.register +def _cleanup(): + """Clear the sessions pool on a program termination.""" + if config.default_pool is not None: + config.default_pool.clear() __all__ = [ diff --git a/google/cloud/spanner_dbapi/config.py b/google/cloud/spanner_dbapi/config.py new file mode 100644 index 0000000000..bc3e994881 --- /dev/null +++ b/google/cloud/spanner_dbapi/config.py @@ -0,0 +1,12 @@ +# Copyright 2020 Google LLC +# +# Use of this source code is governed by a BSD-style +# license that can be found in the LICENSE file or at +# https://developers.google.com/open-source/licenses/bsd + +"""Configurations module for DB API.""" + +# Cloud Spanner sessions pool, used by +# default for the whole DB API package. +# Is lazily initiated on a connection creation. +default_pool = None diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index c0ebf867aa..b72941ad0b 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -13,7 +13,7 @@ from google.cloud.spanner_v1.pool import BurstyPool from .cursor import Cursor -from .exceptions import InterfaceError +from .exceptions import InterfaceError, ProgrammingError AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode" @@ -40,8 +40,8 @@ def __init__(self, instance, database, pool=None): self._pool = pool or BurstyPool() self._pool.bind(database) - self.instance = instance - self.database = database + self._instance = instance + self._database = database self._ddl_statements = [] self._transaction = None @@ -71,11 +71,49 @@ def autocommit(self, value): self._autocommit = value + @property + def database(self): + """Database to which this connection relates. + + :rtype: :class:`~google.cloud.spanner_v1.database.Database` + :returns: The related database object. + """ + return self._database + + @database.setter + def database(self, _): + """Related database setter. + + Restricts replacing the related database. + """ + raise ProgrammingError( + "Replacing the related database of the existing connection is restricted." + ) + + @property + def instance(self): + """Instance to which this connection relates. + + :rtype: :class:`~google.cloud.spanner_v1.instance.Instance` + :returns: The related instance object. + """ + return self._instance + + @instance.setter + def instance(self, _): + """Related instance setter. + + Restricts replacing the related instance. + """ + raise ProgrammingError( + "Replacing the related instance of the existing connection is restricted." + ) + def _session_checkout(self): """Get a Cloud Spanner session from a pool. If there is already a session associated with - this connection, it'll be used otherwise. + this connection, it'll be used instead. :rtype: :class:`google.cloud.spanner_v1.session.Session` :returns: Cloud Spanner session object ready to use. @@ -222,7 +260,6 @@ def close(self): ): self._transaction.rollback() - self.__dbhandle = None self.is_closed = True def commit(self): diff --git a/google/cloud/spanner_dbapi/cursor.py b/google/cloud/spanner_dbapi/cursor.py index 1e7cdce9b3..95eae50e1a 100644 --- a/google/cloud/spanner_dbapi/cursor.py +++ b/google/cloud/spanner_dbapi/cursor.py @@ -105,7 +105,9 @@ def execute(self, sql, args=None): sql, params = sql_pyformat_args_to_spanner(sql, args) - self._res = transaction.execute_sql(sql, params) + self._res = transaction.execute_sql( + sql, params, param_types=get_param_types(params) + ) self._itr = PeekIterator(self._res) return diff --git a/tests/spanner_dbapi/test_connect.py b/tests/spanner_dbapi/test_connect.py index 260d3a0993..41d4808e59 100644 --- a/tests/spanner_dbapi/test_connect.py +++ b/tests/spanner_dbapi/test_connect.py @@ -11,7 +11,7 @@ import google.auth.credentials from google.api_core.gapic_v1.client_info import ClientInfo -from google.cloud.spanner_dbapi import connect, Connection +from google.cloud.spanner_dbapi import config, connect, Connection def _make_credentials(): @@ -108,3 +108,20 @@ def test_connect_database_id(self): database_mock.assert_called_once_with(DATABASE, pool=mock.ANY) self.assertIsInstance(connection, Connection) + + def test_pool_reuse(self): + DATABASE = "test-database" + DATABASE2 = "test-database2" + + with mock.patch("google.cloud.spanner_v1.instance.Instance.database"): + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", + return_value=True, + ): + connection = connect("test-instance", DATABASE) + + self.assertIsNotNone(config.default_pool) + self.assertEqual(connection._pool, config.default_pool) + + connection2 = connect("test-instance", DATABASE2) + self.assertEqual(connection._pool, connection2._pool) diff --git a/tests/spanner_dbapi/test_connection.py b/tests/spanner_dbapi/test_connection.py index 2e2b844f21..283e198eb7 100644 --- a/tests/spanner_dbapi/test_connection.py +++ b/tests/spanner_dbapi/test_connection.py @@ -11,7 +11,11 @@ # import google.cloud.spanner_dbapi.exceptions as dbapi_exceptions -from google.cloud.spanner_dbapi import Connection, InterfaceError +from google.cloud.spanner_dbapi import ( + Connection, + InterfaceError, + ProgrammingError, +) from google.cloud.spanner_dbapi.connection import AUTOCOMMIT_MODE_WARNING from google.cloud.spanner_v1.database import Database from google.cloud.spanner_v1.instance import Instance @@ -61,3 +65,19 @@ def test_transaction_autocommit_warnings(self, warn_mock): warn_mock.assert_called_with( AUTOCOMMIT_MODE_WARNING, UserWarning, stacklevel=2 ) + + def test_database_property(self): + connection = self._make_connection() + self.assertIsInstance(connection.database, Database) + self.assertEqual(connection.database, connection._database) + + with self.assertRaises(ProgrammingError): + connection.database = None + + def test_instance_property(self): + connection = self._make_connection() + self.assertIsInstance(connection.instance, Instance) + self.assertEqual(connection.instance, connection._instance) + + with self.assertRaises(ProgrammingError): + connection.instance = None From 5e23e591580b789252b18956efdb81f2a5d2281a Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Wed, 21 Oct 2020 11:29:02 +0300 Subject: [PATCH 11/12] don't use global pool --- google/cloud/spanner_dbapi/__init__.py | 33 +++++++++----------- google/cloud/spanner_dbapi/config.py | 12 -------- google/cloud/spanner_dbapi/connection.py | 39 ++++-------------------- tests/spanner_dbapi/test_connect.py | 30 +++++++++++------- tests/spanner_dbapi/test_connection.py | 10 ++---- 5 files changed, 43 insertions(+), 81 deletions(-) delete mode 100644 google/cloud/spanner_dbapi/config.py diff --git a/google/cloud/spanner_dbapi/__init__.py b/google/cloud/spanner_dbapi/__init__.py index 930795d286..0bb37492db 100644 --- a/google/cloud/spanner_dbapi/__init__.py +++ b/google/cloud/spanner_dbapi/__init__.py @@ -6,11 +6,8 @@ """Connection-based DB API for Cloud Spanner.""" -import atexit - from google.cloud import spanner_v1 -from google.cloud.spanner_dbapi import config from .connection import Connection from .exceptions import ( DatabaseError, @@ -52,7 +49,12 @@ def connect( - instance_id, database_id, project=None, credentials=None, user_agent=None + instance_id, + database_id, + project=None, + credentials=None, + pool=None, + user_agent=None, ): """ Create a connection to Cloud Spanner database. @@ -74,6 +76,13 @@ def connect( If none are specified, the client will attempt to ascertain the credentials from the environment. + :type pool: Concrete subclass of + :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool`. + :param pool: (Optional). Session pool to be used by database. + + :type user_agent: :class:`str` + :param user_agent: (Optional) User agent to be used with this connection requests. + :rtype: :class:`google.cloud.spanner_dbapi.connection.Connection` :returns: Connection object associated with the given Cloud Spanner resource. @@ -90,23 +99,11 @@ def connect( if not instance.exists(): raise ValueError("instance '%s' does not exist." % instance_id) - database = instance.database( - database_id, pool=spanner_v1.pool.BurstyPool() - ) + database = instance.database(database_id, pool=pool) if not database.exists(): raise ValueError("database '%s' does not exist." % database_id) - if config.default_pool is None: - config.default_pool = spanner_v1.BurstyPool() - - return Connection(instance, database, config.default_pool) - - -@atexit.register -def _cleanup(): - """Clear the sessions pool on a program termination.""" - if config.default_pool is not None: - config.default_pool.clear() + return Connection(instance, database) __all__ = [ diff --git a/google/cloud/spanner_dbapi/config.py b/google/cloud/spanner_dbapi/config.py deleted file mode 100644 index bc3e994881..0000000000 --- a/google/cloud/spanner_dbapi/config.py +++ /dev/null @@ -1,12 +0,0 @@ -# Copyright 2020 Google LLC -# -# Use of this source code is governed by a BSD-style -# license that can be found in the LICENSE file or at -# https://developers.google.com/open-source/licenses/bsd - -"""Configurations module for DB API.""" - -# Cloud Spanner sessions pool, used by -# default for the whole DB API package. -# Is lazily initiated on a connection creation. -default_pool = None diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index b72941ad0b..c902e1d9bd 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -10,10 +10,9 @@ import warnings from google.cloud import spanner_v1 -from google.cloud.spanner_v1.pool import BurstyPool from .cursor import Cursor -from .exceptions import InterfaceError, ProgrammingError +from .exceptions import InterfaceError AUTOCOMMIT_MODE_WARNING = "This method is non-operational in autocommit mode" @@ -31,15 +30,9 @@ class Connection: :type database: :class:`~google.cloud.spanner_v1.database.Database` :param database: Cloud Spanner database to connect to. - - :type pool: :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool` - :param pool: (Optional) Cloud Spanner sessions pool. """ - def __init__(self, instance, database, pool=None): - self._pool = pool or BurstyPool() - self._pool.bind(database) - + def __init__(self, instance, database): self._instance = instance self._database = database @@ -80,16 +73,6 @@ def database(self): """ return self._database - @database.setter - def database(self, _): - """Related database setter. - - Restricts replacing the related database. - """ - raise ProgrammingError( - "Replacing the related database of the existing connection is restricted." - ) - @property def instance(self): """Instance to which this connection relates. @@ -99,18 +82,8 @@ def instance(self): """ return self._instance - @instance.setter - def instance(self, _): - """Related instance setter. - - Restricts replacing the related instance. - """ - raise ProgrammingError( - "Replacing the related instance of the existing connection is restricted." - ) - def _session_checkout(self): - """Get a Cloud Spanner session from a pool. + """Get a Cloud Spanner session from the pool. If there is already a session associated with this connection, it'll be used instead. @@ -119,7 +92,7 @@ def _session_checkout(self): :returns: Cloud Spanner session object ready to use. """ if not self._session: - self._session = self._pool.get() + self._session = self.database._pool.get() return self._session @@ -128,7 +101,7 @@ def _release_session(self): The session will be returned into the sessions pool. """ - self._pool.put(self._session) + self.database._pool.put(self._session) self._session = None def transaction_checkout(self): @@ -138,7 +111,7 @@ def transaction_checkout(self): this connection yet. Return the begun one otherwise. :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` - :returns: Cloud Spanner transaction object ready to use. + :returns: A Cloud Spanner transaction object, ready to use. """ if not self.autocommit: if ( diff --git a/tests/spanner_dbapi/test_connect.py b/tests/spanner_dbapi/test_connect.py index 41d4808e59..fb4d89c373 100644 --- a/tests/spanner_dbapi/test_connect.py +++ b/tests/spanner_dbapi/test_connect.py @@ -11,7 +11,8 @@ import google.auth.credentials from google.api_core.gapic_v1.client_info import ClientInfo -from google.cloud.spanner_dbapi import config, connect, Connection +from google.cloud.spanner_dbapi import connect, Connection +from google.cloud.spanner_v1.pool import FixedSizePool def _make_credentials(): @@ -43,7 +44,7 @@ def test_connect(self): "test-database", PROJECT, CREDENTIALS, - USER_AGENT, + user_agent=USER_AGENT, ) self.assertIsInstance(connection, Connection) @@ -109,19 +110,26 @@ def test_connect_database_id(self): self.assertIsInstance(connection, Connection) - def test_pool_reuse(self): - DATABASE = "test-database" - DATABASE2 = "test-database2" - + def test_default_sessions_pool(self): with mock.patch("google.cloud.spanner_v1.instance.Instance.database"): with mock.patch( "google.cloud.spanner_v1.instance.Instance.exists", return_value=True, ): - connection = connect("test-instance", DATABASE) + connection = connect("test-instance", "test-database") - self.assertIsNotNone(config.default_pool) - self.assertEqual(connection._pool, config.default_pool) + self.assertIsNotNone(connection.database._pool) - connection2 = connect("test-instance", DATABASE2) - self.assertEqual(connection._pool, connection2._pool) + def test_sessions_pool(self): + database_id = "test-database" + pool = FixedSizePool() + + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.database" + ) as database_mock: + with mock.patch( + "google.cloud.spanner_v1.instance.Instance.exists", + return_value=True, + ): + connect("test-instance", database_id, pool=pool) + database_mock.assert_called_once_with(database_id, pool=pool) diff --git a/tests/spanner_dbapi/test_connection.py b/tests/spanner_dbapi/test_connection.py index 283e198eb7..24260de12e 100644 --- a/tests/spanner_dbapi/test_connection.py +++ b/tests/spanner_dbapi/test_connection.py @@ -11,11 +11,7 @@ # import google.cloud.spanner_dbapi.exceptions as dbapi_exceptions -from google.cloud.spanner_dbapi import ( - Connection, - InterfaceError, - ProgrammingError, -) +from google.cloud.spanner_dbapi import Connection, InterfaceError from google.cloud.spanner_dbapi.connection import AUTOCOMMIT_MODE_WARNING from google.cloud.spanner_v1.database import Database from google.cloud.spanner_v1.instance import Instance @@ -71,7 +67,7 @@ def test_database_property(self): self.assertIsInstance(connection.database, Database) self.assertEqual(connection.database, connection._database) - with self.assertRaises(ProgrammingError): + with self.assertRaises(AttributeError): connection.database = None def test_instance_property(self): @@ -79,5 +75,5 @@ def test_instance_property(self): self.assertIsInstance(connection.instance, Instance) self.assertEqual(connection.instance, connection._instance) - with self.assertRaises(ProgrammingError): + with self.assertRaises(AttributeError): connection.instance = None From ba1df3654f75669b6926a7343a4eb679174abd5a Mon Sep 17 00:00:00 2001 From: IlyaFaer Date: Thu, 22 Oct 2020 10:57:48 +0300 Subject: [PATCH 12/12] add autocommit mode note --- google/cloud/spanner_dbapi/connection.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google/cloud/spanner_dbapi/connection.py b/google/cloud/spanner_dbapi/connection.py index c902e1d9bd..8907e65c03 100644 --- a/google/cloud/spanner_dbapi/connection.py +++ b/google/cloud/spanner_dbapi/connection.py @@ -110,6 +110,8 @@ def transaction_checkout(self): Begin a new transaction, if there is no transaction in this connection yet. Return the begun one otherwise. + The method is non operational in autocommit mode. + :rtype: :class:`google.cloud.spanner_v1.transaction.Transaction` :returns: A Cloud Spanner transaction object, ready to use. """