From b4b809d616cf93881815d6baadf2dd322ab566d1 Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo <63511057+acocuzzo@users.noreply.github.com> Date: Thu, 22 Sep 2022 12:43:11 -0400 Subject: [PATCH] fix: remove expired ack_ids (#787) --- .../_protocol/streaming_pull_manager.py | 47 ++++++++++++------- .../subscriber/test_streaming_pull_manager.py | 46 ++++++++++++++---- 2 files changed, 66 insertions(+), 27 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 932699261..21c1bab7b 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -988,7 +988,9 @@ def _get_initial_request( # Return the initial request. return request - def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float): + def _send_lease_modacks( + self, ack_ids: Iterable[str], ack_deadline: float + ) -> List[str]: exactly_once_enabled = False with self._exactly_once_enabled_lock: exactly_once_enabled = self._exactly_once_enabled @@ -1002,15 +1004,19 @@ def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float): assert self._dispatcher is not None self._dispatcher.modify_ack_deadline(items) + expired_ack_ids = [] for req in items: try: assert req.future is not None req.future.result() - except AcknowledgeError: + except AcknowledgeError as ack_error: _LOGGER.warning( "AcknowledgeError when lease-modacking a message.", exc_info=True, ) + if ack_error.error_code == AcknowledgeStatus.INVALID_ACK_ID: + expired_ack_ids.append(req.ack_id) + return expired_ack_ids else: items = [ requests.ModAckRequest(ack_id, self.ack_deadline, None) @@ -1018,6 +1024,7 @@ def _send_lease_modacks(self, ack_ids: Iterable[str], ack_deadline: float): ] assert self._dispatcher is not None self._dispatcher.modify_ack_deadline(items) + return [] def _exactly_once_delivery_enabled(self) 
-> bool: """Whether exactly-once delivery is enabled for the subscription.""" @@ -1071,28 +1078,32 @@ def _on_response(self, response: gapic_types.StreamingPullResponse) -> None: # modack the messages we received, as this tells the server that we've # received them. ack_id_gen = (message.ack_id for message in received_messages) - self._send_lease_modacks(ack_id_gen, self.ack_deadline) + expired_ack_ids = set(self._send_lease_modacks(ack_id_gen, self.ack_deadline)) with self._pause_resume_lock: assert self._scheduler is not None assert self._leaser is not None for received_message in received_messages: - message = google.cloud.pubsub_v1.subscriber.message.Message( - received_message.message, - received_message.ack_id, - received_message.delivery_attempt, - self._scheduler.queue, - self._exactly_once_delivery_enabled, - ) - self._messages_on_hold.put(message) - self._on_hold_bytes += message.size - req = requests.LeaseRequest( - ack_id=message.ack_id, - byte_size=message.size, - ordering_key=message.ordering_key, - ) - self._leaser.add([req]) + if ( + not self._exactly_once_delivery_enabled() + or received_message.ack_id not in expired_ack_ids + ): + message = google.cloud.pubsub_v1.subscriber.message.Message( + received_message.message, + received_message.ack_id, + received_message.delivery_attempt, + self._scheduler.queue, + self._exactly_once_delivery_enabled, + ) + self._messages_on_hold.put(message) + self._on_hold_bytes += message.size + req = requests.LeaseRequest( + ack_id=message.ack_id, + byte_size=message.size, + ordering_key=message.ordering_key, + ) + self._leaser.add([req]) self._maybe_release_messages() diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 459ab6d67..b4f76f20b 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -1125,6 +1125,7 @@ def 
test_heartbeat_stream_ack_deadline_seconds(caplog): "google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True ) def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc): + manager = make_manager() with mock.patch.object( @@ -1852,11 +1853,18 @@ def test__on_response_exactly_once_immediate_modacks_fail(): def complete_futures_with_error(*args, **kwargs): modack_requests = args[0] for req in modack_requests: - req.future.set_exception( - subscriber_exceptions.AcknowledgeError( - subscriber_exceptions.AcknowledgeStatus.SUCCESS, None + if req.ack_id == "fack": + req.future.set_exception( + subscriber_exceptions.AcknowledgeError( + subscriber_exceptions.AcknowledgeStatus.INVALID_ACK_ID, None + ) + ) + else: + req.future.set_exception( + subscriber_exceptions.AcknowledgeError( + subscriber_exceptions.AcknowledgeStatus.SUCCESS, None + ) ) - ) dispatcher.modify_ack_deadline.side_effect = complete_futures_with_error @@ -1866,19 +1874,39 @@ def complete_futures_with_error(*args, **kwargs): gapic_types.ReceivedMessage( ack_id="fack", message=gapic_types.PubsubMessage(data=b"foo", message_id="1"), - ) + ), + gapic_types.ReceivedMessage( + ack_id="good", + message=gapic_types.PubsubMessage(data=b"foo", message_id="2"), + ), ], subscription_properties=gapic_types.StreamingPullResponse.SubscriptionProperties( exactly_once_delivery_enabled=True ), ) - # adjust message bookkeeping in leaser - fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=42) + # Actually run the method and prove that modack and schedule are called in + # the expected way. + + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10) - # exactly_once should be enabled manager._on_response(response) - # exceptions are logged, but otherwise no effect + + # The second message should be scheduled, and not the first.
+ + schedule_calls = scheduler.schedule.mock_calls + assert len(schedule_calls) == 1 + call_args = schedule_calls[0][1] + assert call_args[0] == mock.sentinel.callback + assert isinstance(call_args[1], message.Message) + assert call_args[1].message_id == "2" + + assert manager._messages_on_hold.size == 0 + # No messages available + assert manager._messages_on_hold.get() is None + + # do not add message + assert manager.load == 0.001 def test__should_recover_true():