diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
index bcb73352b537..8a683e4e772d 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
@@ -36,9 +36,15 @@ class Leaser(object):
     def __init__(self, manager):
         self._thread = None
-        self._operational_lock = threading.Lock()
         self._manager = manager
 
+        # a lock used for start/stop operations, protecting the _thread attribute
+        self._operational_lock = threading.Lock()
+
+        # A lock ensuring that add/remove operations are atomic and cannot be
+        # intertwined. Protects the _leased_messages and _bytes attributes.
+        self._add_remove_lock = threading.Lock()
+
         self._leased_messages = {}
         """dict[str, float]: A mapping of ack IDs to the local time when the
         ack ID was initially leased in seconds since the epoch."""
@@ -64,30 +70,32 @@ def bytes(self):
 
     def add(self, items):
         """Add messages to be managed by the leaser."""
-        for item in items:
-            # Add the ack ID to the set of managed ack IDs, and increment
-            # the size counter.
-            if item.ack_id not in self._leased_messages:
-                self._leased_messages[item.ack_id] = _LeasedMessage(
-                    added_time=time.time(), size=item.byte_size
-                )
-                self._bytes += item.byte_size
-            else:
-                _LOGGER.debug("Message %s is already lease managed", item.ack_id)
+        with self._add_remove_lock:
+            for item in items:
+                # Add the ack ID to the set of managed ack IDs, and increment
+                # the size counter.
+                if item.ack_id not in self._leased_messages:
+                    self._leased_messages[item.ack_id] = _LeasedMessage(
+                        added_time=time.time(), size=item.byte_size
+                    )
+                    self._bytes += item.byte_size
+                else:
+                    _LOGGER.debug("Message %s is already lease managed", item.ack_id)
 
     def remove(self, items):
         """Remove messages from lease management."""
-        # Remove the ack ID from lease management, and decrement the
-        # byte counter.
-        for item in items:
-            if self._leased_messages.pop(item.ack_id, None) is not None:
-                self._bytes -= item.byte_size
-            else:
-                _LOGGER.debug("Item %s was not managed.", item.ack_id)
-
-        if self._bytes < 0:
-            _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
-            self._bytes = 0
+        with self._add_remove_lock:
+            # Remove the ack ID from lease management, and decrement the
+            # byte counter.
+            for item in items:
+                if self._leased_messages.pop(item.ack_id, None) is not None:
+                    self._bytes -= item.byte_size
+                else:
+                    _LOGGER.debug("Item %s was not managed.", item.ack_id)
+
+            if self._bytes < 0:
+                _LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
+                self._bytes = 0
 
     def maintain_leases(self):
         """Maintain all of the leases being managed.
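
Note on the change above: ``add()`` and ``remove()`` mutate shared bookkeeping (``_leased_messages`` and ``_bytes``) from different threads, so both now run under ``_add_remove_lock``. Below is a minimal, self-contained sketch of that pattern; ``TrackedLeases`` is a hypothetical stand-in, not the library's ``Leaser`` class.

```python
# Sketch of lock-protected lease bookkeeping (stand-in names, not the real API).
import threading


class TrackedLeases(object):
    def __init__(self):
        self._add_remove_lock = threading.Lock()
        self._leased = {}  # ack_id -> size in bytes
        self._bytes = 0

    def add(self, items):
        # items: iterable of (ack_id, byte_size) pairs
        with self._add_remove_lock:
            for ack_id, byte_size in items:
                if ack_id not in self._leased:
                    self._leased[ack_id] = byte_size
                    self._bytes += byte_size

    def remove(self, items):
        with self._add_remove_lock:
            for ack_id, _ in items:
                size = self._leased.pop(ack_id, None)
                if size is not None:
                    self._bytes -= size
            if self._bytes < 0:  # defensive clamp, mirroring the real code
                self._bytes = 0


if __name__ == "__main__":
    tracker = TrackedLeases()
    tracker.add([("ack-1", 10), ("ack-2", 20)])
    tracker.remove([("ack-1", 10)])
    print(tracker._bytes)  # 20
```

Without the lock, two threads doing ``self._bytes += byte_size`` concurrently can interleave the read-modify-write and lose an update; the ``with`` block makes each add/remove batch atomic.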
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
index 650e2f661915..74008bc94fcb 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py
@@ -21,6 +21,7 @@
 
 import grpc
 import six
+from six.moves import queue
 
 from google.api_core import bidi
 from google.api_core import exceptions
@@ -116,6 +117,16 @@ def __init__(
         else:
             self._scheduler = scheduler
 
+        # A FIFO queue for the messages that have been received from the server,
+        # but not yet added to the lease management (and not sent to user callback),
+        # because the FlowControl limits have been hit.
+        self._messages_on_hold = queue.Queue()
+
+        # A lock ensuring that pausing / resuming the consumer are both atomic
+        # operations that cannot be executed concurrently. Needed for properly
+        # syncing these operations with the current leaser load.
+        self._pause_resume_lock = threading.Lock()
+
         # The threads created in ``.open()``.
         self._dispatcher = None
         self._leaser = None
@@ -211,26 +222,72 @@ def add_close_callback(self, callback):
 
     def maybe_pause_consumer(self):
         """Check the current load and pause the consumer if needed."""
-        if self.load >= 1.0:
-            if self._consumer is not None and not self._consumer.is_paused:
-                _LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load)
-                self._consumer.pause()
+        with self._pause_resume_lock:
+            if self.load >= 1.0:
+                if self._consumer is not None and not self._consumer.is_paused:
+                    _LOGGER.debug(
+                        "Message backlog over load at %.2f, pausing.", self.load
+                    )
+                    self._consumer.pause()
 
     def maybe_resume_consumer(self):
-        """Check the current load and resume the consumer if needed."""
-        # If we have been paused by flow control, check and see if we are
-        # back within our limits.
-        #
-        # In order to not thrash too much, require us to have passed below
-        # the resume threshold (80% by default) of each flow control setting
-        # before restarting.
-        if self._consumer is None or not self._consumer.is_paused:
-            return
-
-        if self.load < self.flow_control.resume_threshold:
-            self._consumer.resume()
-        else:
-            _LOGGER.debug("Did not resume, current load is %s", self.load)
+        """Check the load and held messages and resume the consumer if needed.
+
+        If there are messages held internally, release those messages before
+        resuming the consumer. That will avoid leaser overload.
+        """
+        with self._pause_resume_lock:
+            # If we have been paused by flow control, check and see if we are
+            # back within our limits.
+            #
+            # In order to not thrash too much, require us to have passed below
+            # the resume threshold (80% by default) of each flow control setting
+            # before restarting.
+            if self._consumer is None or not self._consumer.is_paused:
+                return
+
+            _LOGGER.debug("Current load: %.2f", self.load)
+
+            # Before maybe resuming the background consumer, release any messages
+            # currently on hold, if the current load allows for it.
+            self._maybe_release_messages()
+
+            if self.load < self.flow_control.resume_threshold:
+                _LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
+                self._consumer.resume()
+            else:
+                _LOGGER.debug("Did not resume, current load is %.2f.", self.load)
+
+    def _maybe_release_messages(self):
+        """Release (some of) the held messages if the current load allows for it.
+
+        The method tries to release as many messages as the current leaser load
+        would allow. Each released message is added to the lease management,
+        and the user callback is scheduled for it.
+
+        If there are currently no messages on hold, or if the leaser is
+        already overloaded, this method is effectively a no-op.
+
+        The method assumes the caller has acquired the ``_pause_resume_lock``.
+        """
+        while True:
+            if self.load >= 1.0:
+                break  # already overloaded
+
+            try:
+                msg = self._messages_on_hold.get_nowait()
+            except queue.Empty:
+                break
+
+            self.leaser.add(
+                [requests.LeaseRequest(ack_id=msg.ack_id, byte_size=msg.size)]
+            )
+            _LOGGER.debug(
+                "Released held message to leaser, scheduling callback for it, "
+                "still on hold %s.",
+                self._messages_on_hold.qsize(),
+            )
+            self._scheduler.schedule(self._callback, msg)
 
     def _send_unary_request(self, request):
         """Send a request using a separate unary request instead of over the
@@ -431,9 +488,10 @@ def _on_response(self, response):
         After the messages have all had their ack deadline updated, execute
         the callback for each message using the executor.
         """
         _LOGGER.debug(
-            "Scheduling callbacks for %s messages.", len(response.received_messages)
+            "Processing %s received message(s), currently on hold %s.",
+            len(response.received_messages),
+            self._messages_on_hold.qsize(),
         )
 
         # Immediately modack the messages we received, as this tells the server
@@ -443,12 +501,33 @@ def _on_response(self, response):
             for message in response.received_messages
         ]
         self._dispatcher.modify_ack_deadline(items)
+
+        invoke_callbacks_for = []
+
         for received_message in response.received_messages:
             message = google.cloud.pubsub_v1.subscriber.message.Message(
-                received_message.message, received_message.ack_id, self._scheduler.queue
+                received_message.message,
+                received_message.ack_id,
+                self._scheduler.queue,
+                autolease=False,
             )
-            # TODO: Immediately lease instead of using the callback queue.
-            self._scheduler.schedule(self._callback, message)
+            if self.load < 1.0:
+                req = requests.LeaseRequest(
+                    ack_id=message.ack_id, byte_size=message.size
+                )
+                self.leaser.add([req])
+                invoke_callbacks_for.append(message)
+                self.maybe_pause_consumer()
+            else:
+                self._messages_on_hold.put(message)
+
+        _LOGGER.debug(
+            "Scheduling callbacks for %s new messages, new total on hold %s.",
+            len(invoke_callbacks_for),
+            self._messages_on_hold.qsize(),
+        )
+        for msg in invoke_callbacks_for:
+            self._scheduler.schedule(self._callback, msg)
 
     def _should_recover(self, exception):
         """Determine if an error on the RPC stream should be recovered.
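
The core idea of the manager change is: lease and dispatch a message only while the load stays below 1.0, otherwise park it in ``_messages_on_hold`` and drain that queue later from ``maybe_resume_consumer()``. Below is a standalone sketch of that hold-and-release flow; the class and method names are hypothetical, only the logic mirrors ``_on_response()`` and ``_maybe_release_messages()``.

```python
# Hold-and-release sketch under a simple "load = leased / max_messages" model.
from six.moves import queue


class HoldQueueManager(object):
    def __init__(self, max_messages):
        self._max_messages = max_messages
        self._leased_count = 0
        self._messages_on_hold = queue.Queue()

    @property
    def load(self):
        return self._leased_count / float(self._max_messages)

    def receive(self, msg):
        # Mirror of _on_response(): lease immediately if capacity allows,
        # otherwise park the message in the FIFO hold queue.
        if self.load < 1.0:
            self._leased_count += 1
        else:
            self._messages_on_hold.put(msg)

    def release_held(self):
        # Mirror of _maybe_release_messages(): drain while under the limit.
        while self.load < 1.0:
            try:
                self._messages_on_hold.get_nowait()
            except queue.Empty:
                break
            self._leased_count += 1

    def done(self, count=1):
        # Called when messages are acked/dropped; frees leaser capacity.
        self._leased_count -= count


if __name__ == "__main__":
    mgr = HoldQueueManager(max_messages=2)
    for i in range(5):
        mgr.receive("msg-%d" % i)
    print(mgr.load, mgr._messages_on_hold.qsize())  # 1.0 3
    mgr.done()           # one message acked, capacity freed
    mgr.release_held()   # pulls exactly one held message back in
    print(mgr.load, mgr._messages_on_hold.qsize())  # 1.0 2
```

Releasing a held message itself raises the load, which is why the drain loop rechecks the load on every iteration and stops as soon as the next message would overload the leaser.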
diff --git a/pubsub/google/cloud/pubsub_v1/subscriber/message.py b/pubsub/google/cloud/pubsub_v1/subscriber/message.py
index 56dde9a7f6b8..b62a28ff6cb6 100644
--- a/pubsub/google/cloud/pubsub_v1/subscriber/message.py
+++ b/pubsub/google/cloud/pubsub_v1/subscriber/message.py
@@ -70,7 +70,7 @@ class Message(object):
             published.
     """
 
-    def __init__(self, message, ack_id, request_queue):
+    def __init__(self, message, ack_id, request_queue, autolease=True):
         """Construct the Message.
 
         .. note::
@@ -85,6 +85,9 @@ def __init__(self, message, ack_id, request_queue):
             request_queue (queue.Queue): A queue provided by the policy that
                 can accept requests; the policy is responsible for handling
                 those requests.
+            autolease (bool): An optional flag determining whether a new Message
+                instance should automatically lease itself upon creation.
+                Defaults to :data:`True`.
         """
         self._message = message
         self._ack_id = ack_id
@@ -98,7 +101,8 @@ def __init__(self, message, ack_id, request_queue):
 
         # The policy should lease this message, telling PubSub that it has
         # it until it is acked or otherwise dropped.
-        self.lease()
+        if autolease:
+            self.lease()
 
     def __repr__(self):
         # Get an abbreviated version of the data.
@@ -208,8 +212,10 @@ def lease(self):
        """Inform the policy to lease this message continually.
 
         .. note::
-            This method is called by the constructor, and you should never
-            need to call it manually.
+            By default this method is called by the constructor, and you should
+            never need to call it manually, unless the
+            :class:`~.pubsub_v1.subscriber.message.Message` instance was
+            created with ``autolease=False``.
         """
         self._request_queue.put(
             requests.LeaseRequest(ack_id=self._ack_id, byte_size=self.size)
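
Illustration of the new ``autolease`` flag. User code never constructs ``Message`` directly (the streaming pull manager does), and the import path below simply follows the module layout shown in this diff; the point is only the call order: construct without auto-leasing, then call ``lease()`` explicitly once the manager decides to dispatch the message.

```python
# Sketch of the autolease=False call sequence (not something user code does).
from six.moves import queue

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.subscriber import message as message_mod

request_queue = queue.Queue()
raw = types.PubsubMessage(data=b"payload", message_id="1")

msg = message_mod.Message(raw, "ack-id-1", request_queue, autolease=False)
# Nothing was enqueued yet, because autolease=False skipped the implicit lease().
assert request_queue.qsize() == 0

msg.lease()  # the LeaseRequest is only enqueued on this explicit call
assert request_queue.qsize() == 1
```

This is exactly the property ``_on_response()`` relies on: a message can be created and parked on hold without the leaser (or dispatcher) ever hearing about it.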
diff --git a/pubsub/tests/system.py b/pubsub/tests/system.py
index e8921e039164..13e81d281f42 100644
--- a/pubsub/tests/system.py
+++ b/pubsub/tests/system.py
@@ -15,6 +15,7 @@
 from __future__ import absolute_import
 
 import datetime
+import itertools
 import threading
 import time
 
@@ -24,6 +25,9 @@
 import google.auth
 
 from google.cloud import pubsub_v1
+from google.cloud.pubsub_v1 import exceptions
+from google.cloud.pubsub_v1 import futures
+from google.cloud.pubsub_v1 import types
 from test_utils.system import unique_resource_id
 
 
@@ -206,6 +210,85 @@ class CallbackError(Exception):
         with pytest.raises(CallbackError):
             future.result(timeout=30)
 
+    def test_streaming_pull_max_messages(
+        self, publisher, topic_path, subscriber, subscription_path, cleanup
+    ):
+        # Make sure the topic and subscription get deleted.
+        cleanup.append((publisher.delete_topic, topic_path))
+        cleanup.append((subscriber.delete_subscription, subscription_path))
+
+        # create a topic and subscribe to it
+        publisher.create_topic(topic_path)
+        subscriber.create_subscription(subscription_path, topic_path)
+
+        batch_sizes = (7, 4, 8, 2, 10, 1, 3, 8, 6, 1)  # total: 50
+        self._publish_messages(publisher, topic_path, batch_sizes=batch_sizes)
+
+        # now subscribe and do the main part, check for max pending messages
+        total_messages = sum(batch_sizes)
+        flow_control = types.FlowControl(max_messages=5)
+        callback = StreamingPullCallback(
+            processing_time=1, resolve_at_msg_count=total_messages
+        )
+
+        subscription_future = subscriber.subscribe(
+            subscription_path, callback, flow_control=flow_control
+        )
+
+        # Expected time to process all messages in ideal case:
+        #     (total_messages / FlowControl.max_messages) * processing_time
+        #
+        # With total=50, max messages=5, and processing_time=1 this amounts to
+        # 10 seconds (+ overhead), thus a full minute should be more than enough
+        # for the processing to complete. If not, fail the test with a timeout.
+        try:
+            callback.done_future.result(timeout=60)
+        except exceptions.TimeoutError:
+            pytest.fail(
+                "Timeout: receiving/processing streamed messages took too long."
+            )
+
+        # The callback future gets resolved once total_messages have been processed,
+        # but we want to wait for just a little bit longer to possibly catch cases
+        # when the callback gets invoked *more* than total_messages times.
+        time.sleep(3)
+
+        try:
+            # All messages should have been processed exactly once, and no more
+            # than max_messages simultaneously at any time.
+            assert callback.completed_calls == total_messages
+            assert sorted(callback.seen_message_ids) == list(
+                range(1, total_messages + 1)
+            )
+            assert callback.max_pending_ack <= flow_control.max_messages
+        finally:
+            subscription_future.cancel()  # trigger clean shutdown
+
+    def _publish_messages(self, publisher, topic_path, batch_sizes):
+        """Publish messages in batches and wait until all of them are published."""
+        publish_futures = []
+        msg_counter = itertools.count(start=1)
+
+        for batch_size in batch_sizes:
+            msg_batch = self._make_messages(count=batch_size)
+            for msg in msg_batch:
+                future = publisher.publish(
+                    topic_path, msg, seq_num=str(next(msg_counter))
+                )
+                publish_futures.append(future)
+            time.sleep(0.1)
+
+        # wait until all messages have been successfully published
+        for future in publish_futures:
+            future.result(timeout=30)
+
+    def _make_messages(self, count):
+        messages = [
+            u"message {}/{}".format(i, count).encode("utf-8")
+            for i in range(1, count + 1)
+        ]
+        return messages
+
 
 class AckCallback(object):
     def __init__(self):
@@ -236,3 +319,33 @@ def __call__(self, message):
         # ``calls`` is incremented to do it.
         self.call_times.append(now)
         self.calls += 1
+
+
+class StreamingPullCallback(object):
+    def __init__(self, processing_time, resolve_at_msg_count):
+        self._lock = threading.Lock()
+        self._processing_time = processing_time
+        self._pending_ack = 0
+        self.max_pending_ack = 0
+        self.completed_calls = 0
+        self.seen_message_ids = []
+
+        self._resolve_at_msg_count = resolve_at_msg_count
+        self.done_future = futures.Future()
+
+    def __call__(self, message):
+        with self._lock:
+            self._pending_ack += 1
+            self.max_pending_ack = max(self.max_pending_ack, self._pending_ack)
+            self.seen_message_ids.append(int(message.attributes["seq_num"]))
+
+        time.sleep(self._processing_time)
+
+        with self._lock:
+            self._pending_ack -= 1
+            message.ack()
+            self.completed_calls += 1
+
+            if self.completed_calls >= self._resolve_at_msg_count:
+                if not self.done_future.done():
+                    self.done_future.set_result(None)
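
From the public API, the behaviour exercised by ``test_streaming_pull_max_messages`` boils down to passing a ``FlowControl`` with ``max_messages`` to ``subscribe()``. A minimal usage sketch follows; the project and subscription names and the callback body are placeholders.

```python
# Public-API sketch of flow-controlled streaming pull (placeholder names).
from google.cloud import pubsub_v1

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path("my-project", "my-subscription")


def callback(message):
    print("Received: {}".format(message.data))
    message.ack()


# At most 5 messages are leased and dispatched at a time; further messages stay
# on the server (consumer paused) or in the new internal hold queue.
flow_control = pubsub_v1.types.FlowControl(max_messages=5)
future = subscriber.subscribe(subscription_path, callback, flow_control=flow_control)

try:
    future.result(timeout=60)
except Exception:
    future.cancel()  # trigger clean shutdown, as the system test does
```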
diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
index 98a946ae75c6..8c22992f7a2b 100644
--- a/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
+++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_message.py
@@ -33,7 +33,7 @@
 PUBLISHED_SECONDS = datetime_helpers.to_milliseconds(PUBLISHED) // 1000
 
 
-def create_message(data, ack_id="ACKID", **attrs):
+def create_message(data, ack_id="ACKID", autolease=True, **attrs):
     with mock.patch.object(message.Message, "lease") as lease:
         with mock.patch.object(time, "time") as time_:
             time_.return_value = RECEIVED_SECONDS
@@ -48,8 +48,12 @@ def create_message(data, ack_id="ACKID", **attrs):
                 ),
                 ack_id,
                 queue.Queue(),
+                autolease=autolease,
             )
-            lease.assert_called_once_with()
+            if autolease:
+                lease.assert_called_once_with()
+            else:
+                lease.assert_not_called()
             return msg
 
 
@@ -79,6 +83,11 @@ def test_publish_time():
     assert msg.publish_time == PUBLISHED
 
 
+def test_disable_autolease_on_creation():
+    # the create_message() helper does the actual assertion
+    create_message(b"foo", autolease=False)
+
+
 def check_call_types(mock, *args, **kwargs):
     """Checks a mock's call types.
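
The unit tests in the next file bind a fake ``add()`` onto an autospec'd leaser via ``types.MethodType`` (aliased there as ``stdlib_types`` to avoid clashing with ``pubsub_v1.types``). The standalone sketch below shows just that binding pattern; the ``Leaser`` class here is a hypothetical stand-in, not the library class.

```python
# Binding a plain function as a method on a mock via types.MethodType.
import types

import mock


class Leaser(object):  # hypothetical stand-in for the real class
    message_count = 0
    bytes = 0

    def add(self, items):
        raise NotImplementedError


def fake_add(self, items):
    # hardcoded bookkeeping: +1 message and +10 bytes per item
    self.message_count += len(items)
    self.bytes += len(items) * 10


leaser = mock.create_autospec(Leaser, instance=True)
leaser.message_count = 0
leaser.bytes = 0
# The bound function receives the mock as ``self``, so it can keep simple
# counters on that object instead of the usual Mock call recording.
leaser.add = types.MethodType(fake_add, leaser)

leaser.add(["item-a", "item-b"])
print(leaser.message_count, leaser.bytes)  # 2 20
```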
diff --git a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
index cbd02e28ac6c..22585675a324 100644
--- a/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
+++ b/pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py
@@ -13,9 +13,11 @@
 # limitations under the License.
 
 import logging
+import types as stdlib_types
 
 import mock
 import pytest
+from six.moves import queue
 
 from google.api_core import bidi
 from google.api_core import exceptions
@@ -113,6 +115,23 @@ def make_manager(**kwargs):
     )
 
 
+def fake_leaser_add(leaser, init_msg_count=0, init_bytes=0):
+    """Add a simplified fake add() method to a leaser instance.
+
+    The fake add() method actually increases the leaser's internal message count
+    by one for each message, and the total bytes by 10 for each message (hardcoded,
+    regardless of the actual message size).
+    """
+
+    def fake_add(self, items):
+        self.message_count += len(items)
+        self.bytes += len(items) * 10
+
+    leaser.message_count = init_msg_count
+    leaser.bytes = init_bytes
+    leaser.add = stdlib_types.MethodType(fake_add, leaser)
+
+
 def test_ack_deadline():
     manager = make_manager()
     assert manager.ack_deadline == 10
@@ -208,6 +227,66 @@ def test_maybe_resume_consumer_wo_consumer_set():
     manager.maybe_resume_consumer()  # no raise
 
 
+def test__maybe_release_messages_on_overload():
+    manager = make_manager(
+        flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
+    )
+    # Ensure load is exactly 1.0 (to verify that >= condition is used)
+    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
+    _leaser.message_count = 10
+    _leaser.bytes = 1000
+
+    msg = mock.create_autospec(message.Message, instance=True, ack_id="ack", size=11)
+    manager._messages_on_hold.put(msg)
+
+    manager._maybe_release_messages()
+
+    assert manager._messages_on_hold.qsize() == 1
+    manager._leaser.add.assert_not_called()
+    manager._scheduler.schedule.assert_not_called()
+
+
+def test__maybe_release_messages_below_overload():
+    manager = make_manager(
+        flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
+    )
+    manager._callback = mock.sentinel.callback
+
+    # init leaser message count to 8 to leave room for 2 more messages
+    _leaser = manager._leaser = mock.create_autospec(leaser.Leaser)
+    fake_leaser_add(_leaser, init_msg_count=8, init_bytes=200)
+    _leaser.add = mock.Mock(wraps=_leaser.add)  # to spy on calls
+
+    messages = [
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_foo", size=11),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_bar", size=22),
+        mock.create_autospec(message.Message, instance=True, ack_id="ack_baz", size=33),
+    ]
+    for msg in messages:
+        manager._messages_on_hold.put(msg)
+
+    # the actual call of MUT
+    manager._maybe_release_messages()
+
+    assert manager._messages_on_hold.qsize() == 1
+    msg = manager._messages_on_hold.get_nowait()
+    assert msg.ack_id == "ack_baz"
+
+    assert len(_leaser.add.mock_calls) == 2
+    expected_calls = [
+        mock.call([requests.LeaseRequest(ack_id="ack_foo", byte_size=11)]),
+        mock.call([requests.LeaseRequest(ack_id="ack_bar", byte_size=22)]),
+    ]
+    _leaser.add.assert_has_calls(expected_calls)
+
+    schedule_calls = manager._scheduler.schedule.mock_calls
+    assert len(schedule_calls) == 2
+    for _, call_args, _ in schedule_calls:
+        assert call_args[0] == mock.sentinel.callback
+        assert isinstance(call_args[1], message.Message)
+        assert call_args[1].ack_id in ("ack_foo", "ack_bar")
+
+
 def test_send_unary():
     manager = make_manager()
     manager._UNARY_REQUESTS = True
@@ -470,8 +549,8 @@ def test__get_initial_request_wo_leaser():
     assert initial_request.modify_deadline_seconds == []
 
 
-def test_on_response():
-    manager, _, dispatcher, _, _, scheduler = make_running_manager()
+def test__on_response_no_leaser_overload():
+    manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
     manager._callback = mock.sentinel.callback
 
     # Set up the messages.
@@ -486,6 +565,9 @@ def test_on_response():
         ]
     )
 
+    # adjust message bookkeeping in leaser
+    fake_leaser_add(leaser, init_msg_count=0, init_bytes=0)
+
     # Actually run the method and prove that modack and schedule
     # are called in the expected way.
     manager._on_response(response)
@@ -500,6 +582,64 @@ def test_on_response():
         assert call[1][0] == mock.sentinel.callback
         assert isinstance(call[1][1], message.Message)
 
+    # the leaser load limit not hit, no messages had to be put on hold
+    assert manager._messages_on_hold.qsize() == 0
+
+
+def test__on_response_with_leaser_overload():
+    manager, _, dispatcher, leaser, _, scheduler = make_running_manager()
+    manager._callback = mock.sentinel.callback
+
+    # Set up the messages.
+    response = types.StreamingPullResponse(
+        received_messages=[
+            types.ReceivedMessage(
+                ack_id="fack", message=types.PubsubMessage(data=b"foo", message_id="1")
+            ),
+            types.ReceivedMessage(
+                ack_id="back", message=types.PubsubMessage(data=b"bar", message_id="2")
+            ),
+            types.ReceivedMessage(
+                ack_id="zack", message=types.PubsubMessage(data=b"baz", message_id="3")
+            ),
+        ]
+    )
+
+    # Adjust message bookkeeping in leaser. Pick 99 messages, which is just below
+    # the default FlowControl.max_messages limit.
+    fake_leaser_add(leaser, init_msg_count=99, init_bytes=990)
+
+    # Actually run the method and prove that modack and schedule
+    # are called in the expected way.
+    manager._on_response(response)
+
+    dispatcher.modify_ack_deadline.assert_called_once_with(
+        [
+            requests.ModAckRequest("fack", 10),
+            requests.ModAckRequest("back", 10),
+            requests.ModAckRequest("zack", 10),
+        ]
+    )
+
+    # one message should be scheduled, the leaser capacity allows for it
+    schedule_calls = scheduler.schedule.mock_calls
+    assert len(schedule_calls) == 1
+    call_args = schedule_calls[0][1]
+    assert call_args[0] == mock.sentinel.callback
+    assert isinstance(call_args[1], message.Message)
+    assert call_args[1].message_id == "1"
+
+    # the rest of the messages should have been put on hold
+    assert manager._messages_on_hold.qsize() == 2
+    while True:
+        try:
+            msg = manager._messages_on_hold.get_nowait()
+        except queue.Empty:
+            break
+        else:
+            assert isinstance(msg, message.Message)
+            assert msg.message_id in ("2", "3")
+
 
 def test_retryable_stream_errors():
     # Make sure the config matches our hard-coded tuple of exceptions.
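
For reference, the on-hold counts asserted above follow from how the manager's ``load`` relates the leaser bookkeeping to the ``FlowControl`` limits (the larger of the message-count ratio and the byte ratio). A small worked example with the values used in ``test__on_response_with_leaser_overload``; the formula and the 100 MiB byte default are assumptions spelled out here, not taken from this diff.

```python
# Assumed load formula; see StreamingPullManager.load in the library source.
def load(message_count, total_bytes, max_messages, max_bytes):
    return max(message_count / float(max_messages), total_bytes / float(max_bytes))


# The test relies on the default max_messages being 100; max_bytes is assumed
# to default to 100 MiB here.
print(load(99, 990, 100, 100 * 1024 * 1024))    # 0.99 -> below 1.0, first message leased
print(load(100, 1000, 100, 100 * 1024 * 1024))  # 1.0  -> at the limit, remaining messages held
```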