From ac25604eef2e1df667182e97b4d290d0e04153c6 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Wed, 22 Nov 2023 22:44:43 +0000 Subject: [PATCH 01/24] proto generation --- google/cloud/spanner_v1/types/spanner.py | 14 ++++++++++++++ scripts/fixup_spanner_v1_keywords.py | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index 3dbacbe26b..5db82421c2 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -24,6 +24,7 @@ from google.cloud.spanner_v1.types import result_set from google.cloud.spanner_v1.types import transaction as gs_transaction from google.cloud.spanner_v1.types import type as gs_type +from google.protobuf import duration_pb2 # type: ignore from google.protobuf import struct_pb2 # type: ignore from google.protobuf import timestamp_pb2 # type: ignore from google.rpc import status_pb2 # type: ignore @@ -1434,6 +1435,14 @@ class CommitRequest(proto.Message): be included in the [CommitResponse][google.spanner.v1.CommitResponse.commit_stats]. Default value is ``false``. + max_commit_delay (google.protobuf.duration_pb2.Duration): + The amount of latency this request is willing + to incur in order to improve throughput. If this + field is not set, Spanner assumes requests are + relatively latency sensitive and automatically + determines an appropriate delay time. You can + specify a batching delay value between 0 and 500 + ms. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. """ @@ -1462,6 +1471,11 @@ class CommitRequest(proto.Message): proto.BOOL, number=5, ) + max_commit_delay: duration_pb2.Duration = proto.Field( + proto.MESSAGE, + number=8, + message=duration_pb2.Duration, + ) request_options: "RequestOptions" = proto.Field( proto.MESSAGE, number=6, diff --git a/scripts/fixup_spanner_v1_keywords.py b/scripts/fixup_spanner_v1_keywords.py index f79f70b2dd..939da961f0 100644 --- a/scripts/fixup_spanner_v1_keywords.py +++ b/scripts/fixup_spanner_v1_keywords.py @@ -42,7 +42,7 @@ class spannerCallTransformer(cst.CSTTransformer): 'batch_create_sessions': ('database', 'session_count', 'session_template', ), 'batch_write': ('session', 'mutation_groups', 'request_options', ), 'begin_transaction': ('session', 'options', 'request_options', ), - 'commit': ('session', 'transaction_id', 'single_use_transaction', 'mutations', 'return_commit_stats', 'request_options', ), + 'commit': ('session', 'transaction_id', 'single_use_transaction', 'mutations', 'return_commit_stats', 'max_commit_delay', 'request_options', ), 'create_session': ('database', 'session', ), 'delete_session': ('name', ), 'execute_batch_dml': ('session', 'transaction', 'statements', 'seqno', 'request_options', ), From 891d80073e4eb14fb4ca14993525be2d3a542a81 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Wed, 22 Nov 2023 23:19:31 +0000 Subject: [PATCH 02/24] max commit delay --- google/cloud/spanner_v1/batch.py | 11 ++++++++++- google/cloud/spanner_v1/transaction.py | 12 +++++++++++- 2 files changed, 21 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 41e4460c30..b3799737db 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -30,6 +30,7 @@ from google.cloud.spanner_v1._helpers import _retry from google.cloud.spanner_v1._helpers import _check_rst_stream_error from google.api_core.exceptions import InternalServerError +from google.protobuf.duration_pb2 import Duration class _BatchBase(_SessionWrapper): @@ -145,7 +146,7 @@ def _check_state(self): if self.committed is not None: raise ValueError("Batch already committed") - def commit(self, return_commit_stats=False, request_options=None): + def commit(self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None): """Commit mutations to the database. :type return_commit_stats: bool @@ -158,6 +159,9 @@ def commit(self, return_commit_stats=False, request_options=None): (Optional) Common options for this request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :param max_commit_delay_ms: + (Optional) The amount of latency this request is willing to incur + in order to improve throughput. :rtype: datetime :returns: timestamp of the committed changes. @@ -182,11 +186,16 @@ def commit(self, return_commit_stats=False, request_options=None): # Request tags are not supported for commit requests. request_options.request_tag = None + Duration max_commit_delay = None + if max_commit_delay_ms is not None: + max_commit_delay.nanos = 1000000 * [max_commit_delay_ms] + request = CommitRequest( session=self._session.name, mutations=self._mutations, single_use_transaction=txn_options, return_commit_stats=return_commit_stats, + max_commit_delay=max_commit_delay, request_options=request_options, ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index d564d0d488..6761e867f3 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -36,6 +36,7 @@ from google.cloud.spanner_v1 import RequestOptions from google.api_core import gapic_v1 from google.api_core.exceptions import InternalServerError +from google.protobuf.duration_pb2 import Duration class Transaction(_SnapshotBase, _BatchBase): @@ -180,7 +181,7 @@ def rollback(self): self.rolled_back = True del self._session._transaction - def commit(self, return_commit_stats=False, request_options=None): + def commit(self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None): """Commit mutations to the database. :type return_commit_stats: bool @@ -193,6 +194,10 @@ def commit(self, return_commit_stats=False, request_options=None): (Optional) Common options for this request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :param max_commit_delay_ms: + (Optional) The amount of latency this request is willing to incur + in order to improve throughput. + :class:`~google.cloud.spanner_v1.types.MaxCommitDelay`. :rtype: datetime :returns: timestamp of the committed changes. @@ -223,11 +228,16 @@ def commit(self, return_commit_stats=False, request_options=None): # Request tags are not supported for commit requests. request_options.request_tag = None + Duration max_commit_delay = None + if max_commit_delay_ms is not None: + max_commit_delay.nanos = 1000000 * [max_commit_delay_ms] + request = CommitRequest( session=self._session.name, mutations=self._mutations, transaction_id=self._transaction_id, return_commit_stats=return_commit_stats, + max_commit_delay=max_commit_delay, request_options=request_options, ) with trace_call("CloudSpanner.Commit", self._session, trace_attributes): From 5a1a797fb3c6c577c29cb88ed5f7a08bccf83056 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 18:40:53 +0000 Subject: [PATCH 03/24] Fix some errors --- google/cloud/spanner_v1/batch.py | 6 +++--- google/cloud/spanner_v1/transaction.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index b3799737db..1d03c5403f 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -186,9 +186,9 @@ def commit(self, return_commit_stats=False, request_options=None, max_commit_del # Request tags are not supported for commit requests. request_options.request_tag = None - Duration max_commit_delay = None - if max_commit_delay_ms is not None: - max_commit_delay.nanos = 1000000 * [max_commit_delay_ms] + max_commit_delay = None + if max_commit_delay_ms: + max_commit_delay = Duration.FromMilliseconds(max_commit_delay_ms) request = CommitRequest( session=self._session.name, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 6761e867f3..35b0908384 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -228,9 +228,9 @@ def commit(self, return_commit_stats=False, request_options=None, max_commit_del # Request tags are not supported for commit requests. request_options.request_tag = None - Duration max_commit_delay = None - if max_commit_delay_ms is not None: - max_commit_delay.nanos = 1000000 * [max_commit_delay_ms] + max_commit_delay = None + if max_commit_delay_ms: + max_commit_delay = Duration.FromMilliseconds(max_commit_delay_ms) request = CommitRequest( session=self._session.name, From 9fb4162e256abc5e618a575565524e49daae2621 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 20:39:41 +0000 Subject: [PATCH 04/24] Unit tests --- tests/unit/test_transaction.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index ffcffa115e..cdbbe98929 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -346,7 +346,7 @@ def test_commit_w_other_error(self): ) def _commit_helper( - self, mutate=True, return_commit_stats=False, request_options=None + self, mutate=True, return_commit_stats=False, request_options=None, max_commit_delay_ms=None ): import datetime from google.cloud.spanner_v1 import CommitResponse @@ -370,13 +370,15 @@ def _commit_helper( transaction.delete(TABLE_NAME, keyset) transaction.commit( - return_commit_stats=return_commit_stats, request_options=request_options + return_commit_stats=return_commit_stats, + request_options=request_options, + max_commit_delay_ms=max_commit_delay_ms ) self.assertEqual(transaction.committed, now) self.assertIsNone(session._transaction) - session_id, mutations, txn_id, actual_request_options, metadata = api._committed + session_id, mutations, txn_id, actual_request_options, max_commit_delay, metadata = api._committed if request_options is None: expected_request_options = RequestOptions( @@ -391,6 +393,11 @@ def _commit_helper( expected_request_options.transaction_tag = self.TRANSACTION_TAG expected_request_options.request_tag = None + expected_max_commit_delay = None + if max_commit_delay_ms: + expected_max_commit_delay = Duration.FromMilliseconds(max_commit_delay_ms) + + self.assertEqual(expected_max_commit_delay, max_commit_delay) self.assertEqual(session_id, session.name) self.assertEqual(txn_id, self.TRANSACTION_ID) self.assertEqual(mutations, transaction._mutations) @@ -851,7 +858,7 @@ def test_context_mgr_success(self): self.assertEqual(transaction.committed, now) - session_id, mutations, txn_id, _, metadata = api._committed + session_id, mutations, txn_id, _, _, metadata = api._committed self.assertEqual(session_id, self.SESSION_NAME) self.assertEqual(txn_id, self.TRANSACTION_ID) self.assertEqual(mutations, transaction._mutations) @@ -936,11 +943,14 @@ def commit( metadata=None, ): assert not request.single_use_transaction + assert request.max_commit_delay is None self._committed = ( request.session, request.mutations, request.transaction_id, request.request_options, + request.max_commit_delay, +# None, metadata, ) return self._commit_response From 7c88b09094c415578808c32511b32a3f853c5aeb Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 20:44:13 +0000 Subject: [PATCH 05/24] regenerate proto changes --- google/cloud/spanner_v1/batch.py | 4 +++- google/cloud/spanner_v1/transaction.py | 4 +++- google/cloud/spanner_v1/types/spanner.py | 10 +++------- tests/unit/test_transaction.py | 19 +++++++++++++++---- 4 files changed, 24 insertions(+), 13 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 4be7cacdc0..7d1beb27ac 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -147,7 +147,9 @@ def _check_state(self): if self.committed is not None: raise ValueError("Batch already committed") - def commit(self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None): + def commit( + self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None + ): """Commit mutations to the database. :type return_commit_stats: bool diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 35b0908384..569e36a8e6 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -181,7 +181,9 @@ def rollback(self): self.rolled_back = True del self._session._transaction - def commit(self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None): + def commit( + self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None + ): """Commit mutations to the database. :type return_commit_stats: bool diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index 5db82421c2..92b248982c 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -1436,13 +1436,8 @@ class CommitRequest(proto.Message): [CommitResponse][google.spanner.v1.CommitResponse.commit_stats]. Default value is ``false``. max_commit_delay (google.protobuf.duration_pb2.Duration): - The amount of latency this request is willing - to incur in order to improve throughput. If this - field is not set, Spanner assumes requests are - relatively latency sensitive and automatically - determines an appropriate delay time. You can - specify a batching delay value between 0 and 500 - ms. + + This field is a member of `oneof`_ ``_max_commit_delay``. request_options (google.cloud.spanner_v1.types.RequestOptions): Common options for this request. """ @@ -1474,6 +1469,7 @@ class CommitRequest(proto.Message): max_commit_delay: duration_pb2.Duration = proto.Field( proto.MESSAGE, number=8, + optional=True, message=duration_pb2.Duration, ) request_options: "RequestOptions" = proto.Field( diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index cdbbe98929..5ae267ab7a 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -346,7 +346,11 @@ def test_commit_w_other_error(self): ) def _commit_helper( - self, mutate=True, return_commit_stats=False, request_options=None, max_commit_delay_ms=None + self, + mutate=True, + return_commit_stats=False, + request_options=None, + max_commit_delay_ms=None, ): import datetime from google.cloud.spanner_v1 import CommitResponse @@ -372,13 +376,20 @@ def _commit_helper( transaction.commit( return_commit_stats=return_commit_stats, request_options=request_options, - max_commit_delay_ms=max_commit_delay_ms + max_commit_delay_ms=max_commit_delay_ms, ) self.assertEqual(transaction.committed, now) self.assertIsNone(session._transaction) - session_id, mutations, txn_id, actual_request_options, max_commit_delay, metadata = api._committed + ( + session_id, + mutations, + txn_id, + actual_request_options, + max_commit_delay, + metadata, + ) = api._committed if request_options is None: expected_request_options = RequestOptions( @@ -950,7 +961,7 @@ def commit( request.transaction_id, request.request_options, request.max_commit_delay, -# None, + # None, metadata, ) return self._commit_response From cd0f27801f8811621d3dceb2a64e6021aec7fbc1 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 22:38:57 +0000 Subject: [PATCH 06/24] Fix unit tests --- tests/unit/test_transaction.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 5ae267ab7a..14055bdee3 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -16,6 +16,7 @@ import mock from google.cloud.spanner_v1 import RequestOptions +from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import Type from google.cloud.spanner_v1 import TypeCode from google.api_core.retry import Retry @@ -954,14 +955,17 @@ def commit( metadata=None, ): assert not request.single_use_transaction - assert request.max_commit_delay is None + + max_commit_delay = None + if CommitRequest.max_commit_delay in request: + max_commit_delay = request.max_commit_delay + self._committed = ( request.session, request.mutations, request.transaction_id, request.request_options, - request.max_commit_delay, - # None, + max_commit_delay, metadata, ) return self._commit_response From 680cb40d01d6b7fe8b6f29eb83469d60073f6eaf Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 22:55:37 +0000 Subject: [PATCH 07/24] Finish test_transaction.py --- google/cloud/spanner_v1/transaction.py | 3 ++- tests/unit/test_transaction.py | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index 569e36a8e6..a3abba6b60 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -232,7 +232,8 @@ def commit( max_commit_delay = None if max_commit_delay_ms: - max_commit_delay = Duration.FromMilliseconds(max_commit_delay_ms) + max_commit_delay = Duration() + max_commit_delay.FromMilliseconds(millis=max_commit_delay_ms) request = CommitRequest( session=self._session.name, diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 14055bdee3..8ef5e65cd2 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -357,6 +357,7 @@ def _commit_helper( from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1.keyset import KeySet from google.cloud._helpers import UTC + from google.protobuf.duration_pb2 import Duration now = datetime.datetime.utcnow().replace(tzinfo=UTC) keys = [[0], [1], [2]] @@ -407,7 +408,7 @@ def _commit_helper( expected_max_commit_delay = None if max_commit_delay_ms: - expected_max_commit_delay = Duration.FromMilliseconds(max_commit_delay_ms) + expected_max_commit_delay = datetime.timedelta(milliseconds=max_commit_delay_ms) self.assertEqual(expected_max_commit_delay, max_commit_delay) self.assertEqual(session_id, session.name) @@ -442,6 +443,9 @@ def test_commit_w_mutations(self): def test_commit_w_return_commit_stats(self): self._commit_helper(return_commit_stats=True) + def test_commit_w_max_commit_delay(self): + self._commit_helper(max_commit_delay_ms=100) + def test_commit_w_request_tag_success(self): request_options = RequestOptions( request_tag="tag-1", From 4f7044d453594105c89e1b5a7f0983b7ed5589d7 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 23:43:40 +0000 Subject: [PATCH 08/24] Finish test_batch.py --- google/cloud/spanner_v1/batch.py | 3 ++- tests/unit/test_batch.py | 38 ++++++++++++++++++++++++-------- 2 files changed, 31 insertions(+), 10 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 7d1beb27ac..e44ed28459 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -191,7 +191,8 @@ def commit( max_commit_delay = None if max_commit_delay_ms: - max_commit_delay = Duration.FromMilliseconds(max_commit_delay_ms) + max_commit_delay = Duration() + max_commit_delay.FromMilliseconds(max_commit_delay_ms) request = CommitRequest( session=self._session.name, diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 203c8a0cb5..acaecfe2f6 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -233,7 +233,7 @@ def test_commit_ok(self): self.assertEqual(committed, now) self.assertEqual(batch.committed, committed) - (session, mutations, single_use_txn, request_options, metadata) = api._committed + (session, mutations, single_use_txn, request_options, max_commit_delay, metadata) = api._committed self.assertEqual(session, self.SESSION_NAME) self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) @@ -246,12 +246,13 @@ def test_commit_ok(self): ], ) self.assertEqual(request_options, RequestOptions()) + self.assertEqual(max_commit_delay, None) self.assertSpanAttributes( "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) - def _test_commit_with_request_options(self, request_options=None): + def _test_commit_with_options(self, request_options=None, max_commit_delay_ms=None): import datetime from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import TransactionOptions @@ -267,7 +268,7 @@ def _test_commit_with_request_options(self, request_options=None): batch = self._make_one(session) batch.transaction_tag = self.TRANSACTION_TAG batch.insert(TABLE_NAME, COLUMNS, VALUES) - committed = batch.commit(request_options=request_options) + committed = batch.commit(request_options=request_options, max_commit_delay_ms=max_commit_delay_ms) self.assertEqual(committed, now) self.assertEqual(batch.committed, committed) @@ -284,6 +285,7 @@ def _test_commit_with_request_options(self, request_options=None): mutations, single_use_txn, actual_request_options, + max_commit_delay, metadata, ) = api._committed self.assertEqual(session, self.SESSION_NAME) @@ -303,33 +305,44 @@ def _test_commit_with_request_options(self, request_options=None): "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) + expected_max_commit_delay = None + if max_commit_delay_ms: + expected_max_commit_delay = datetime.timedelta(milliseconds=max_commit_delay_ms) + self.assertEqual(expected_max_commit_delay, max_commit_delay) + def test_commit_w_request_tag_success(self): request_options = RequestOptions( request_tag="tag-1", ) - self._test_commit_with_request_options(request_options=request_options) + self._test_commit_with_options(request_options=request_options) def test_commit_w_transaction_tag_success(self): request_options = RequestOptions( transaction_tag="tag-1-1", ) - self._test_commit_with_request_options(request_options=request_options) + self._test_commit_with_options(request_options=request_options) def test_commit_w_request_and_transaction_tag_success(self): request_options = RequestOptions( request_tag="tag-1", transaction_tag="tag-1-1", ) - self._test_commit_with_request_options(request_options=request_options) + self._test_commit_with_options(request_options=request_options) def test_commit_w_request_and_transaction_tag_dictionary_success(self): request_options = {"request_tag": "tag-1", "transaction_tag": "tag-1-1"} - self._test_commit_with_request_options(request_options=request_options) + self._test_commit_with_options(request_options=request_options) def test_commit_w_incorrect_tag_dictionary_error(self): request_options = {"incorrect_tag": "tag-1-1"} with self.assertRaises(ValueError): - self._test_commit_with_request_options(request_options=request_options) + self._test_commit_with_options(request_options=request_options) + + def test_commit_w_max_commit_delay(self): + request_options = RequestOptions( + request_tag="tag-1", + ) + self._test_commit_with_options(request_options=request_options, max_commit_delay_ms=100) def test_context_mgr_already_committed(self): import datetime @@ -368,7 +381,7 @@ def test_context_mgr_success(self): self.assertEqual(batch.committed, now) - (session, mutations, single_use_txn, request_options, metadata) = api._committed + (session, mutations, single_use_txn, request_options, _, metadata) = api._committed self.assertEqual(session, self.SESSION_NAME) self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) @@ -564,6 +577,12 @@ def commit( metadata=None, ): from google.api_core.exceptions import Unknown + from google.protobuf.duration_pb2 import Duration + from google.cloud.spanner_v1 import CommitRequest + + max_commit_delay = None + if CommitRequest.max_commit_delay in request: + max_commit_delay = request.max_commit_delay assert request.transaction_id == b"" self._committed = ( @@ -571,6 +590,7 @@ def commit( request.mutations, request.single_use_transaction, request.request_options, + max_commit_delay, metadata, ) if self._rpc_error: From f71f86e2f9aec6e1bb21ba669623f4fbca3ff7ea Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 23:45:06 +0000 Subject: [PATCH 09/24] Formatting --- tests/unit/test_batch.py | 30 +++++++++++++++++++++++++----- tests/unit/test_transaction.py | 6 ++++-- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index acaecfe2f6..c8493d75a4 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -233,7 +233,14 @@ def test_commit_ok(self): self.assertEqual(committed, now) self.assertEqual(batch.committed, committed) - (session, mutations, single_use_txn, request_options, max_commit_delay, metadata) = api._committed + ( + session, + mutations, + single_use_txn, + request_options, + max_commit_delay, + metadata, + ) = api._committed self.assertEqual(session, self.SESSION_NAME) self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) @@ -268,7 +275,9 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_ms=No batch = self._make_one(session) batch.transaction_tag = self.TRANSACTION_TAG batch.insert(TABLE_NAME, COLUMNS, VALUES) - committed = batch.commit(request_options=request_options, max_commit_delay_ms=max_commit_delay_ms) + committed = batch.commit( + request_options=request_options, max_commit_delay_ms=max_commit_delay_ms + ) self.assertEqual(committed, now) self.assertEqual(batch.committed, committed) @@ -307,7 +316,9 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_ms=No expected_max_commit_delay = None if max_commit_delay_ms: - expected_max_commit_delay = datetime.timedelta(milliseconds=max_commit_delay_ms) + expected_max_commit_delay = datetime.timedelta( + milliseconds=max_commit_delay_ms + ) self.assertEqual(expected_max_commit_delay, max_commit_delay) def test_commit_w_request_tag_success(self): @@ -342,7 +353,9 @@ def test_commit_w_max_commit_delay(self): request_options = RequestOptions( request_tag="tag-1", ) - self._test_commit_with_options(request_options=request_options, max_commit_delay_ms=100) + self._test_commit_with_options( + request_options=request_options, max_commit_delay_ms=100 + ) def test_context_mgr_already_committed(self): import datetime @@ -381,7 +394,14 @@ def test_context_mgr_success(self): self.assertEqual(batch.committed, now) - (session, mutations, single_use_txn, request_options, _, metadata) = api._committed + ( + session, + mutations, + single_use_txn, + request_options, + _, + metadata, + ) = api._committed self.assertEqual(session, self.SESSION_NAME) self.assertEqual(mutations, batch._mutations) self.assertIsInstance(single_use_txn, TransactionOptions) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 8ef5e65cd2..df706fe2f6 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -408,7 +408,9 @@ def _commit_helper( expected_max_commit_delay = None if max_commit_delay_ms: - expected_max_commit_delay = datetime.timedelta(milliseconds=max_commit_delay_ms) + expected_max_commit_delay = datetime.timedelta( + milliseconds=max_commit_delay_ms + ) self.assertEqual(expected_max_commit_delay, max_commit_delay) self.assertEqual(session_id, session.name) @@ -963,7 +965,7 @@ def commit( max_commit_delay = None if CommitRequest.max_commit_delay in request: max_commit_delay = request.max_commit_delay - + self._committed = ( request.session, request.mutations, From 3c7dd7b5c28c9a9ae4b34d7ffc59cfe7eeec3546 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 5 Dec 2023 23:54:01 +0000 Subject: [PATCH 10/24] Cleanup --- tests/unit/test_batch.py | 1 - tests/unit/test_transaction.py | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index c8493d75a4..26e67d501c 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -597,7 +597,6 @@ def commit( metadata=None, ): from google.api_core.exceptions import Unknown - from google.protobuf.duration_pb2 import Duration from google.cloud.spanner_v1 import CommitRequest max_commit_delay = None diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index df706fe2f6..70bc94a524 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -16,7 +16,6 @@ import mock from google.cloud.spanner_v1 import RequestOptions -from google.cloud.spanner_v1 import CommitRequest from google.cloud.spanner_v1 import Type from google.cloud.spanner_v1 import TypeCode from google.api_core.retry import Retry @@ -960,6 +959,8 @@ def commit( request=None, metadata=None, ): + from google.cloud.spanner_v1 import CommitRequest + assert not request.single_use_transaction max_commit_delay = None From e156c8660191bc14292e810dbee020a27d1014e5 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Mon, 22 Jan 2024 22:59:20 +0000 Subject: [PATCH 11/24] Fix merge conflict --- google/cloud/spanner_v1/types/spanner.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index 9f6a0792f1..2590c212d2 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -1474,10 +1474,6 @@ class CommitRequest(proto.Message): max_commit_delay: duration_pb2.Duration = proto.Field( proto.MESSAGE, number=8, -<<<<<<< HEAD - optional=True, -======= ->>>>>>> main message=duration_pb2.Duration, ) request_options: "RequestOptions" = proto.Field( From ca6e352cda93d429b3179bb0074b8c37370cc9cc Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Mon, 22 Jan 2024 23:59:24 +0000 Subject: [PATCH 12/24] Add optional=True --- google/cloud/spanner_v1/types/spanner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index 2590c212d2..99c883addc 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -1474,6 +1474,7 @@ class CommitRequest(proto.Message): max_commit_delay: duration_pb2.Duration = proto.Field( proto.MESSAGE, number=8, + optional=True, message=duration_pb2.Duration, ) request_options: "RequestOptions" = proto.Field( From 18d4c0c97bc6ed53c6b28d30187811a719386eea Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 23 Jan 2024 02:41:27 +0000 Subject: [PATCH 13/24] Remove optional=True, try calling HasField. --- google/cloud/spanner_v1/types/spanner.py | 1 - tests/unit/test_transaction.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/google/cloud/spanner_v1/types/spanner.py b/google/cloud/spanner_v1/types/spanner.py index 99c883addc..2590c212d2 100644 --- a/google/cloud/spanner_v1/types/spanner.py +++ b/google/cloud/spanner_v1/types/spanner.py @@ -1474,7 +1474,6 @@ class CommitRequest(proto.Message): max_commit_delay: duration_pb2.Duration = proto.Field( proto.MESSAGE, number=8, - optional=True, message=duration_pb2.Duration, ) request_options: "RequestOptions" = proto.Field( diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 82babc37b4..8dc04d35ce 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -966,7 +966,7 @@ def commit( assert not request.single_use_transaction max_commit_delay = None - if CommitRequest.max_commit_delay in request: + if request.HasField("max_commit_delay"): max_commit_delay = request.max_commit_delay self._committed = ( From 1a444759b47e0fef8abf85d8859e7a237f0bea23 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 23 Jan 2024 21:11:37 +0000 Subject: [PATCH 14/24] Update HasField to be called on the protobuf. --- tests/unit/test_transaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 8dc04d35ce..8b430fe1eb 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -966,7 +966,7 @@ def commit( assert not request.single_use_transaction max_commit_delay = None - if request.HasField("max_commit_delay"): + if type(request).pb(request).HasField("max_commit_delay"): max_commit_delay = request.max_commit_delay self._committed = ( From 5e2476ef6b627b0dd90a3afafad218563f331473 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 23 Jan 2024 21:43:04 +0000 Subject: [PATCH 15/24] Update to timedelta.duration instead of an int. --- google/cloud/spanner_v1/batch.py | 10 +++------- google/cloud/spanner_v1/transaction.py | 10 +++------- tests/unit/test_batch.py | 15 +++++---------- tests/unit/test_transaction.py | 16 +++++----------- 4 files changed, 16 insertions(+), 35 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index e44ed28459..21cb3214c0 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -148,7 +148,7 @@ def _check_state(self): raise ValueError("Batch already committed") def commit( - self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None + self, return_commit_stats=False, request_options=None, max_commit_delay=None ): """Commit mutations to the database. @@ -162,7 +162,8 @@ def commit( (Optional) Common options for this request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. - :param max_commit_delay_ms: + :type max_commit_delay: :class:`datetime.timedelta` + :param max_commit_delay: (Optional) The amount of latency this request is willing to incur in order to improve throughput. @@ -189,11 +190,6 @@ def commit( # Request tags are not supported for commit requests. request_options.request_tag = None - max_commit_delay = None - if max_commit_delay_ms: - max_commit_delay = Duration() - max_commit_delay.FromMilliseconds(max_commit_delay_ms) - request = CommitRequest( session=self._session.name, mutations=self._mutations, diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index a3abba6b60..a15bfbd611 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -182,7 +182,7 @@ def rollback(self): del self._session._transaction def commit( - self, return_commit_stats=False, request_options=None, max_commit_delay_ms=None + self, return_commit_stats=False, request_options=None, max_commit_delay=None ): """Commit mutations to the database. @@ -196,7 +196,8 @@ def commit( (Optional) Common options for this request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. - :param max_commit_delay_ms: + :type max_commit_delay: :class:`datetime.timedelta` + :param max_commit_delay: (Optional) The amount of latency this request is willing to incur in order to improve throughput. :class:`~google.cloud.spanner_v1.types.MaxCommitDelay`. @@ -230,11 +231,6 @@ def commit( # Request tags are not supported for commit requests. request_options.request_tag = None - max_commit_delay = None - if max_commit_delay_ms: - max_commit_delay = Duration() - max_commit_delay.FromMilliseconds(millis=max_commit_delay_ms) - request = CommitRequest( session=self._session.name, mutations=self._mutations, diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 26e67d501c..b33b9c7993 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -259,7 +259,7 @@ def test_commit_ok(self): "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) - def _test_commit_with_options(self, request_options=None, max_commit_delay_ms=None): + def _test_commit_with_options(self, request_options=None, max_commit_delay_in=None): import datetime from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1 import TransactionOptions @@ -276,7 +276,7 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_ms=No batch.transaction_tag = self.TRANSACTION_TAG batch.insert(TABLE_NAME, COLUMNS, VALUES) committed = batch.commit( - request_options=request_options, max_commit_delay_ms=max_commit_delay_ms + request_options=request_options, max_commit_delay=max_commit_delay_in ) self.assertEqual(committed, now) @@ -314,12 +314,7 @@ def _test_commit_with_options(self, request_options=None, max_commit_delay_ms=No "CloudSpanner.Commit", attributes=dict(BASE_ATTRIBUTES, num_mutations=1) ) - expected_max_commit_delay = None - if max_commit_delay_ms: - expected_max_commit_delay = datetime.timedelta( - milliseconds=max_commit_delay_ms - ) - self.assertEqual(expected_max_commit_delay, max_commit_delay) + self.assertEqual(max_commit_delay_in, max_commit_delay) def test_commit_w_request_tag_success(self): request_options = RequestOptions( @@ -354,7 +349,7 @@ def test_commit_w_max_commit_delay(self): request_tag="tag-1", ) self._test_commit_with_options( - request_options=request_options, max_commit_delay_ms=100 + request_options=request_options, max_commit_delay=datetime.timedelta(milliseconds=100) ) def test_context_mgr_already_committed(self): @@ -600,7 +595,7 @@ def commit( from google.cloud.spanner_v1 import CommitRequest max_commit_delay = None - if CommitRequest.max_commit_delay in request: + if type(request).pb(request).HasField("max_commit_delay"): max_commit_delay = request.max_commit_delay assert request.transaction_id == b"" diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 8b430fe1eb..01e1d29cde 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -13,6 +13,7 @@ # limitations under the License. +import datetime import mock from google.cloud.spanner_v1 import RequestOptions @@ -350,9 +351,8 @@ def _commit_helper( mutate=True, return_commit_stats=False, request_options=None, - max_commit_delay_ms=None, + max_commit_delay_in=None, ): - import datetime from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1.keyset import KeySet from google.cloud._helpers import UTC @@ -377,7 +377,7 @@ def _commit_helper( transaction.commit( return_commit_stats=return_commit_stats, request_options=request_options, - max_commit_delay_ms=max_commit_delay_ms, + max_commit_delay=max_commit_delay_in, ) self.assertEqual(transaction.committed, now) @@ -405,13 +405,7 @@ def _commit_helper( expected_request_options.transaction_tag = self.TRANSACTION_TAG expected_request_options.request_tag = None - expected_max_commit_delay = None - if max_commit_delay_ms: - expected_max_commit_delay = datetime.timedelta( - milliseconds=max_commit_delay_ms - ) - - self.assertEqual(expected_max_commit_delay, max_commit_delay) + self.assertEqual(max_commit_delay_in, max_commit_delay) self.assertEqual(session_id, session.name) self.assertEqual(txn_id, self.TRANSACTION_ID) self.assertEqual(mutations, transaction._mutations) @@ -445,7 +439,7 @@ def test_commit_w_return_commit_stats(self): self._commit_helper(return_commit_stats=True) def test_commit_w_max_commit_delay(self): - self._commit_helper(max_commit_delay_ms=100) + self._commit_helper(max_commit_delay_in=datetime.timedelta(milliseconds=100)) def test_commit_w_request_tag_success(self): request_options = RequestOptions( From 5a39a2d77cfcd49bd1a361ebdb018b27180e9489 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 23 Jan 2024 21:50:50 +0000 Subject: [PATCH 16/24] Cleanup --- tests/unit/test_batch.py | 4 +++- tests/unit/test_transaction.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index b33b9c7993..acfc785fa5 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -345,11 +345,13 @@ def test_commit_w_incorrect_tag_dictionary_error(self): self._test_commit_with_options(request_options=request_options) def test_commit_w_max_commit_delay(self): + import datetime + request_options = RequestOptions( request_tag="tag-1", ) self._test_commit_with_options( - request_options=request_options, max_commit_delay=datetime.timedelta(milliseconds=100) + request_options=request_options, max_commit_delay_in=datetime.timedelta(milliseconds=100) ) def test_context_mgr_already_committed(self): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 01e1d29cde..29ce3d430f 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -13,7 +13,6 @@ # limitations under the License. -import datetime import mock from google.cloud.spanner_v1 import RequestOptions @@ -353,6 +352,8 @@ def _commit_helper( request_options=None, max_commit_delay_in=None, ): + import datetime + from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1.keyset import KeySet from google.cloud._helpers import UTC @@ -439,6 +440,7 @@ def test_commit_w_return_commit_stats(self): self._commit_helper(return_commit_stats=True) def test_commit_w_max_commit_delay(self): + import datetime self._commit_helper(max_commit_delay_in=datetime.timedelta(milliseconds=100)) def test_commit_w_request_tag_success(self): From 04b73dd916a5ec8ff9243502894ca64c6f9587e3 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Wed, 24 Jan 2024 19:48:22 +0000 Subject: [PATCH 17/24] Changes from Sri to pipe value to top-level funcitons and to add integration tests. Thanks Sri --- google/cloud/spanner_v1/database.py | 16 ++++++++++--- google/cloud/spanner_v1/session.py | 4 ++++ tests/system/test_database_api.py | 36 +++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index c8c3b92edc..a0c35caa20 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -720,7 +720,7 @@ def snapshot(self, **kw): """ return SnapshotCheckout(self, **kw) - def batch(self, request_options=None): + def batch(self, request_options=None, max_commit_delay=None): """Return an object which wraps a batch. The wrapper *must* be used as a context manager, with the batch @@ -732,11 +732,15 @@ def batch(self, request_options=None): (Optional) Common options for the commit request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type max_commit_delay: :class:`datetime.timedelta` + :param max_commit_delay: + (Optional) The amount of latency this request is willing to incur + in order to improve throughput. :rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout` :returns: new wrapper """ - return BatchCheckout(self, request_options) + return BatchCheckout(self, request_options, max_commit_delay) def mutation_groups(self): """Return an object which wraps a mutation_group. @@ -1034,9 +1038,13 @@ class BatchCheckout(object): (Optional) Common options for the commit request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type max_commit_delay: :class:`datetime.timedelta` + :param max_commit_delay: + (Optional) The amount of latency this request is willing to incur + in order to improve throughput. """ - def __init__(self, database, request_options=None): + def __init__(self, database, request_options=None, max_commit_delay=None): self._database = database self._session = self._batch = None if request_options is None: @@ -1045,6 +1053,7 @@ def __init__(self, database, request_options=None): self._request_options = RequestOptions(request_options) else: self._request_options = request_options + self._max_commit_delay = max_commit_delay def __enter__(self): """Begin ``with`` block.""" @@ -1061,6 +1070,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): self._batch.commit( return_commit_stats=self._database.log_commit_stats, request_options=self._request_options, + max_commit_delay=self._max_commit_delay, ) finally: if self._database.log_commit_stats and self._batch.commit_stats: diff --git a/google/cloud/spanner_v1/session.py b/google/cloud/spanner_v1/session.py index b25af53805..d0a44f6856 100644 --- a/google/cloud/spanner_v1/session.py +++ b/google/cloud/spanner_v1/session.py @@ -363,6 +363,8 @@ def run_in_transaction(self, func, *args, **kw): to continue retrying the transaction. "commit_request_options" will be removed and used to set the request options for the commit request. + "max_commit_delay" will be removed and used to set the max commit delay for the request. + "transaction_tag" will be removed and used to set the transaction tag for the request. :rtype: Any :returns: The return value of ``func``. @@ -372,6 +374,7 @@ def run_in_transaction(self, func, *args, **kw): """ deadline = time.time() + kw.pop("timeout_secs", DEFAULT_RETRY_TIMEOUT_SECS) commit_request_options = kw.pop("commit_request_options", None) + max_commit_delay = kw.pop("max_commit_delay", None) transaction_tag = kw.pop("transaction_tag", None) attempts = 0 @@ -400,6 +403,7 @@ def run_in_transaction(self, func, *args, **kw): txn.commit( return_commit_stats=self._database.log_commit_stats, request_options=commit_request_options, + max_commit_delay=max_commit_delay, ) except Aborted as exc: del self._transaction diff --git a/tests/system/test_database_api.py b/tests/system/test_database_api.py index 052e628188..3a0eb3f3cc 100644 --- a/tests/system/test_database_api.py +++ b/tests/system/test_database_api.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import datetime import time import uuid @@ -819,3 +820,38 @@ def _transaction_read(transaction): with pytest.raises(exceptions.InvalidArgument): shared_database.run_in_transaction(_transaction_read) + + +def test_db_batch_insert_w_max_commit_delay(shared_database): + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + with shared_database.batch(max_commit_delay=datetime.timedelta(milliseconds=100)) as batch: + batch.delete(sd.TABLE, sd.ALL) + batch.insert(sd.TABLE, sd.COLUMNS, sd.ROW_DATA) + + with shared_database.snapshot(read_timestamp=batch.committed) as snapshot: + from_snap = list(snapshot.read(sd.TABLE, sd.COLUMNS, sd.ALL)) + + sd._check_rows_data(from_snap) + + +def test_db_run_in_transaction_w_max_commit_delay(shared_database): + _helpers.retry_has_all_dll(shared_database.reload)() + sd = _sample_data + + with shared_database.batch() as batch: + batch.delete(sd.TABLE, sd.ALL) + + def _unit_of_work(transaction, test): + rows = list(transaction.read(test.TABLE, test.COLUMNS, sd.ALL)) + assert rows == [] + + transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) + + shared_database.run_in_transaction(_unit_of_work, test=sd, max_commit_delay=datetime.timedelta(milliseconds=100)) + + with shared_database.snapshot() as after: + rows = list(after.execute_sql(sd.SQL)) + + sd._check_rows_data(rows) From 3ca55e1dc04f8ecde7e013cf1f8aef26e4ea8af7 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Wed, 24 Jan 2024 19:51:41 +0000 Subject: [PATCH 18/24] Run nox -s blacken --- tests/system/test_database_api.py | 8 ++++++-- tests/unit/test_batch.py | 3 ++- tests/unit/test_transaction.py | 1 + 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/tests/system/test_database_api.py b/tests/system/test_database_api.py index 3a0eb3f3cc..fbaee7476d 100644 --- a/tests/system/test_database_api.py +++ b/tests/system/test_database_api.py @@ -826,7 +826,9 @@ def test_db_batch_insert_w_max_commit_delay(shared_database): _helpers.retry_has_all_dll(shared_database.reload)() sd = _sample_data - with shared_database.batch(max_commit_delay=datetime.timedelta(milliseconds=100)) as batch: + with shared_database.batch( + max_commit_delay=datetime.timedelta(milliseconds=100) + ) as batch: batch.delete(sd.TABLE, sd.ALL) batch.insert(sd.TABLE, sd.COLUMNS, sd.ROW_DATA) @@ -849,7 +851,9 @@ def _unit_of_work(transaction, test): transaction.insert_or_update(test.TABLE, test.COLUMNS, test.ROW_DATA) - shared_database.run_in_transaction(_unit_of_work, test=sd, max_commit_delay=datetime.timedelta(milliseconds=100)) + shared_database.run_in_transaction( + _unit_of_work, test=sd, max_commit_delay=datetime.timedelta(milliseconds=100) + ) with shared_database.snapshot() as after: rows = list(after.execute_sql(sd.SQL)) diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index acfc785fa5..64bbd449df 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -351,7 +351,8 @@ def test_commit_w_max_commit_delay(self): request_tag="tag-1", ) self._test_commit_with_options( - request_options=request_options, max_commit_delay_in=datetime.timedelta(milliseconds=100) + request_options=request_options, + max_commit_delay_in=datetime.timedelta(milliseconds=100), ) def test_context_mgr_already_committed(self): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 29ce3d430f..5291d6da55 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -441,6 +441,7 @@ def test_commit_w_return_commit_stats(self): def test_commit_w_max_commit_delay(self): import datetime + self._commit_helper(max_commit_delay_in=datetime.timedelta(milliseconds=100)) def test_commit_w_request_tag_success(self): From 2a44ed933a6fd56f1bc58b7d4a55d53d28ad913a Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Sun, 28 Jan 2024 06:23:31 +0000 Subject: [PATCH 19/24] feat(spanner): remove unused imports and add line --- google/cloud/spanner_v1/batch.py | 1 - google/cloud/spanner_v1/database.py | 5 ++++- google/cloud/spanner_v1/transaction.py | 2 +- tests/unit/test_batch.py | 1 - tests/unit/test_transaction.py | 3 --- 5 files changed, 5 insertions(+), 7 deletions(-) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 21cb3214c0..2a9a5024c5 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -31,7 +31,6 @@ from google.cloud.spanner_v1._helpers import _retry from google.cloud.spanner_v1._helpers import _check_rst_stream_error from google.api_core.exceptions import InternalServerError -from google.protobuf.duration_pb2 import Duration class _BatchBase(_SessionWrapper): diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 9a1e852278..27f21640c4 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -800,9 +800,11 @@ def run_in_transaction(self, func, *args, **kw): :type kw: dict :param kw: (Optional) keyword arguments to be passed to ``func``. - If passed, "timeout_secs" will be removed and used to + If passed, + "timeout_secs" will be removed and used to override the default retry timeout which defines maximum timestamp to continue retrying the transaction. + "max_commit_delay" will be removed and used to set the max_commit_delay for the request. :rtype: Any :returns: The return value of ``func``. @@ -1039,6 +1041,7 @@ class BatchCheckout(object): (Optional) Common options for the commit request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type max_commit_delay: :class:`datetime.timedelta` :param max_commit_delay: (Optional) The amount of latency this request is willing to incur diff --git a/google/cloud/spanner_v1/transaction.py b/google/cloud/spanner_v1/transaction.py index a15bfbd611..3c950401ac 100644 --- a/google/cloud/spanner_v1/transaction.py +++ b/google/cloud/spanner_v1/transaction.py @@ -36,7 +36,6 @@ from google.cloud.spanner_v1 import RequestOptions from google.api_core import gapic_v1 from google.api_core.exceptions import InternalServerError -from google.protobuf.duration_pb2 import Duration class Transaction(_SnapshotBase, _BatchBase): @@ -196,6 +195,7 @@ def commit( (Optional) Common options for this request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type max_commit_delay: :class:`datetime.timedelta` :param max_commit_delay: (Optional) The amount of latency this request is willing to incur diff --git a/tests/unit/test_batch.py b/tests/unit/test_batch.py index 64bbd449df..1c02e93f1d 100644 --- a/tests/unit/test_batch.py +++ b/tests/unit/test_batch.py @@ -595,7 +595,6 @@ def commit( metadata=None, ): from google.api_core.exceptions import Unknown - from google.cloud.spanner_v1 import CommitRequest max_commit_delay = None if type(request).pb(request).HasField("max_commit_delay"): diff --git a/tests/unit/test_transaction.py b/tests/unit/test_transaction.py index 5291d6da55..d391fe4c13 100644 --- a/tests/unit/test_transaction.py +++ b/tests/unit/test_transaction.py @@ -357,7 +357,6 @@ def _commit_helper( from google.cloud.spanner_v1 import CommitResponse from google.cloud.spanner_v1.keyset import KeySet from google.cloud._helpers import UTC - from google.protobuf.duration_pb2 import Duration now = datetime.datetime.utcnow().replace(tzinfo=UTC) keys = [[0], [1], [2]] @@ -958,8 +957,6 @@ def commit( request=None, metadata=None, ): - from google.cloud.spanner_v1 import CommitRequest - assert not request.single_use_transaction max_commit_delay = None From 8d8c6aebb58e0bcc2e73574026ee9cb1c5cc416a Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Sun, 28 Jan 2024 06:27:03 +0000 Subject: [PATCH 20/24] feat(spanner): add empty line in python docs --- google/cloud/spanner_v1/batch.py | 1 + google/cloud/spanner_v1/database.py | 1 + 2 files changed, 2 insertions(+) diff --git a/google/cloud/spanner_v1/batch.py b/google/cloud/spanner_v1/batch.py index 2a9a5024c5..9cb2afbc2c 100644 --- a/google/cloud/spanner_v1/batch.py +++ b/google/cloud/spanner_v1/batch.py @@ -161,6 +161,7 @@ def commit( (Optional) Common options for this request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type max_commit_delay: :class:`datetime.timedelta` :param max_commit_delay: (Optional) The amount of latency this request is willing to incur diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 27f21640c4..7555def2dd 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -733,6 +733,7 @@ def batch(self, request_options=None, max_commit_delay=None): (Optional) Common options for the commit request. If a dict is provided, it must be of the same form as the protobuf message :class:`~google.cloud.spanner_v1.types.RequestOptions`. + :type max_commit_delay: :class:`datetime.timedelta` :param max_commit_delay: (Optional) The amount of latency this request is willing to incur From 814c69bc3fea4219c4d66110116efca17796640a Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 30 Jan 2024 05:50:38 +0000 Subject: [PATCH 21/24] Update comment with valid values. --- google/cloud/spanner_v1/database.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index 7555def2dd..a9c77cf160 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -737,7 +737,7 @@ def batch(self, request_options=None, max_commit_delay=None): :type max_commit_delay: :class:`datetime.timedelta` :param max_commit_delay: (Optional) The amount of latency this request is willing to incur - in order to improve throughput. + in order to improve throughput. Value must be between 0ms and 500ms. :rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout` :returns: new wrapper From 3bc03988481c01ed77252356921a6d83608a1409 Mon Sep 17 00:00:00 2001 From: Navarre Ginsberg Date: Tue, 30 Jan 2024 05:55:38 +0000 Subject: [PATCH 22/24] Update comment with valid values. --- google/cloud/spanner_v1/database.py | 7 +++++-- noxfile.py | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index a9c77cf160..b69b507287 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -737,7 +737,8 @@ def batch(self, request_options=None, max_commit_delay=None): :type max_commit_delay: :class:`datetime.timedelta` :param max_commit_delay: (Optional) The amount of latency this request is willing to incur - in order to improve throughput. Value must be between 0ms and 500ms. + in order to improve throughput. Value must be between 0ms and + 500ms. :rtype: :class:`~google.cloud.spanner_v1.database.BatchCheckout` :returns: new wrapper @@ -805,7 +806,9 @@ def run_in_transaction(self, func, *args, **kw): "timeout_secs" will be removed and used to override the default retry timeout which defines maximum timestamp to continue retrying the transaction. - "max_commit_delay" will be removed and used to set the max_commit_delay for the request. + "max_commit_delay" will be removed and used to set the + max_commit_delay for the request. Value must be between + 0ms and 500ms. :rtype: Any :returns: The return value of ``func``. diff --git a/noxfile.py b/noxfile.py index 9b71c55a7a..5e33ec1717 100644 --- a/noxfile.py +++ b/noxfile.py @@ -32,7 +32,7 @@ ISORT_VERSION = "isort==5.11.0" LINT_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"] -DEFAULT_PYTHON_VERSION = "3.8" +DEFAULT_PYTHON_VERSION = "3.11" UNIT_TEST_PYTHON_VERSIONS: List[str] = ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] UNIT_TEST_STANDARD_DEPENDENCIES = [ From 6093d08d40ffe518eb23c493a1d9f7f3a16f6ab9 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Tue, 30 Jan 2024 06:28:23 +0000 Subject: [PATCH 23/24] feat(spanner): fix lint --- google/cloud/spanner_v1/database.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner_v1/database.py b/google/cloud/spanner_v1/database.py index b69b507287..b23db95284 100644 --- a/google/cloud/spanner_v1/database.py +++ b/google/cloud/spanner_v1/database.py @@ -807,8 +807,8 @@ def run_in_transaction(self, func, *args, **kw): override the default retry timeout which defines maximum timestamp to continue retrying the transaction. "max_commit_delay" will be removed and used to set the - max_commit_delay for the request. Value must be between - 0ms and 500ms. + max_commit_delay for the request. Value must be between + 0ms and 500ms. :rtype: Any :returns: The return value of ``func``. From ac46411b1332669a7958bce25324763e15f1bf80 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 2 Feb 2024 05:39:08 +0000 Subject: [PATCH 24/24] feat(spanner): rever nox file changes --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index 5e33ec1717..9b71c55a7a 100644 --- a/noxfile.py +++ b/noxfile.py @@ -32,7 +32,7 @@ ISORT_VERSION = "isort==5.11.0" LINT_PATHS = ["docs", "google", "tests", "noxfile.py", "setup.py"] -DEFAULT_PYTHON_VERSION = "3.11" +DEFAULT_PYTHON_VERSION = "3.8" UNIT_TEST_PYTHON_VERSIONS: List[str] = ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] UNIT_TEST_STANDARD_DEPENDENCIES = [