From 1bbe4d5fb1909994d99dcaf776662be5218677b4 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Tue, 8 Mar 2022 15:58:45 -0500 Subject: [PATCH 1/2] Return singleton success future for exactly-once methods in Message --- google/cloud/pubsub_v1/subscriber/message.py | 21 +++++++++++-------- .../unit/pubsub_v1/subscriber/test_message.py | 3 +++ 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index 5744aa71c..b364a06be 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -40,6 +40,9 @@ attributes: {} }}""" +_SUCCESS_FUTURE = futures.Future() +_SUCCESS_FUTURE.set_result(AcknowledgeStatus.SUCCESS) + def _indent(lines: str, prefix: str = " ") -> str: """Indent some text. @@ -291,12 +294,12 @@ def ack_with_response(self) -> "futures.Future": pubsub_v1.subscriber.exceptions.AcknowledgeError exception will be thrown. """ - future = futures.Future() - req_future = None if self._exactly_once_delivery_enabled_func(): + future = futures.Future() req_future = future else: - future.set_result(AcknowledgeStatus.SUCCESS) + future = _SUCCESS_FUTURE + req_future = None time_to_ack = math.ceil(time.time() - self._received_timestamp) self._request_queue.put( requests.AckRequest( @@ -390,12 +393,12 @@ def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future": will be thrown. """ - future = futures.Future() - req_future = None if self._exactly_once_delivery_enabled_func(): + future = futures.Future() req_future = future else: - future.set_result(AcknowledgeStatus.SUCCESS) + future = _SUCCESS_FUTURE + req_future = None self._request_queue.put( requests.ModAckRequest( @@ -451,12 +454,12 @@ def nack_with_response(self) -> "futures.Future": will be thrown. """ - future = futures.Future() - req_future = None if self._exactly_once_delivery_enabled_func(): + future = futures.Future() req_future = future else: - future.set_result(AcknowledgeStatus.SUCCESS) + future = _SUCCESS_FUTURE + req_future = None self._request_queue.put( requests.NackRequest( diff --git a/tests/unit/pubsub_v1/subscriber/test_message.py b/tests/unit/pubsub_v1/subscriber/test_message.py index f5c7bf3c7..0debabaf3 100644 --- a/tests/unit/pubsub_v1/subscriber/test_message.py +++ b/tests/unit/pubsub_v1/subscriber/test_message.py @@ -156,6 +156,7 @@ def test_ack_with_response_exactly_once_delivery_disabled(): ) ) assert future.result() == AcknowledgeStatus.SUCCESS + assert future == message._SUCCESS_FUTURE check_call_types(put, requests.AckRequest) @@ -205,6 +206,7 @@ def test_modify_ack_deadline_with_response_exactly_once_delivery_disabled(): requests.ModAckRequest(ack_id="bogus_ack_id", seconds=60, future=None) ) assert future.result() == AcknowledgeStatus.SUCCESS + assert future == message._SUCCESS_FUTURE check_call_types(put, requests.ModAckRequest) @@ -242,6 +244,7 @@ def test_nack_with_response_exactly_once_delivery_disabled(): ) ) assert future.result() == AcknowledgeStatus.SUCCESS + assert future == message._SUCCESS_FUTURE check_call_types(put, requests.NackRequest) From 3bad24b62fac2aa87aaa75ceda1942023f8cd6d3 Mon Sep 17 00:00:00 2001 From: Prad Nelluru Date: Tue, 8 Mar 2022 17:34:17 -0500 Subject: [PATCH 2/2] Add type annotations to satisfy the type checker. --- google/cloud/pubsub_v1/subscriber/message.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/google/cloud/pubsub_v1/subscriber/message.py b/google/cloud/pubsub_v1/subscriber/message.py index b364a06be..ab17bab78 100644 --- a/google/cloud/pubsub_v1/subscriber/message.py +++ b/google/cloud/pubsub_v1/subscriber/message.py @@ -294,6 +294,7 @@ def ack_with_response(self) -> "futures.Future": pubsub_v1.subscriber.exceptions.AcknowledgeError exception will be thrown. """ + req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): future = futures.Future() req_future = future @@ -393,6 +394,7 @@ def modify_ack_deadline_with_response(self, seconds: int) -> "futures.Future": will be thrown. """ + req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): future = futures.Future() req_future = future @@ -454,6 +456,7 @@ def nack_with_response(self) -> "futures.Future": will be thrown. """ + req_future: Optional[futures.Future] if self._exactly_once_delivery_enabled_func(): future = futures.Future() req_future = future