Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub: Fix streaming pull incorrectly handling FlowControl max_messages setting #7948

Merged
merged 5 commits into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 30 additions & 22 deletions pubsub/google/cloud/pubsub_v1/subscriber/_protocol/leaser.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,15 @@
class Leaser(object):
def __init__(self, manager):
self._thread = None
self._operational_lock = threading.Lock()
self._manager = manager

# A lock used for start/stop operations, protecting the _thread attribute.
self._operational_lock = threading.Lock()

# A lock ensuring that add/remove operations are atomic and cannot be
# intertwined. Protects the _leased_messages and _bytes attributes.
self._add_remove_lock = threading.Lock()

self._leased_messages = {}
"""dict[str, float]: A mapping of ack IDs to the local time when the
ack ID was initially leased in seconds since the epoch."""
Expand All @@ -64,30 +70,32 @@ def bytes(self):

def add(self, items):
"""Add messages to be managed by the leaser."""
for item in items:
# Add the ack ID to the set of managed ack IDs, and increment
# the size counter.
if item.ack_id not in self._leased_messages:
self._leased_messages[item.ack_id] = _LeasedMessage(
added_time=time.time(), size=item.byte_size
)
self._bytes += item.byte_size
else:
_LOGGER.debug("Message %s is already lease managed", item.ack_id)
with self._add_remove_lock:
for item in items:
# Add the ack ID to the set of managed ack IDs, and increment
# the size counter.
if item.ack_id not in self._leased_messages:
self._leased_messages[item.ack_id] = _LeasedMessage(
added_time=time.time(), size=item.byte_size
)
self._bytes += item.byte_size
else:
_LOGGER.debug("Message %s is already lease managed", item.ack_id)

def remove(self, items):
"""Remove messages from lease management."""
# Remove the ack ID from lease management, and decrement the
# byte counter.
for item in items:
if self._leased_messages.pop(item.ack_id, None) is not None:
self._bytes -= item.byte_size
else:
_LOGGER.debug("Item %s was not managed.", item.ack_id)

if self._bytes < 0:
_LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
self._bytes = 0
with self._add_remove_lock:
# Remove the ack ID from lease management, and decrement the
# byte counter.
for item in items:
if self._leased_messages.pop(item.ack_id, None) is not None:
self._bytes -= item.byte_size
else:
_LOGGER.debug("Item %s was not managed.", item.ack_id)

if self._bytes < 0:
_LOGGER.debug("Bytes was unexpectedly negative: %d", self._bytes)
self._bytes = 0

def maintain_leases(self):
"""Maintain all of the leases being managed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import grpc
import six
from six.moves import queue

from google.api_core import bidi
from google.api_core import exceptions
Expand Down Expand Up @@ -116,6 +117,16 @@ def __init__(
else:
self._scheduler = scheduler

# A FIFO queue for the messages that have been received from the server,
# but not yet added to the lease management (and not sent to user callback),
# because the FlowControl limits have been hit.
self._messages_on_hold = queue.Queue()
plamut marked this conversation as resolved.
Show resolved Hide resolved

# A lock ensuring that pausing / resuming the consumer are both atomic
# operations that cannot be executed concurrently. Needed for properly
# syncing these operations with the current leaser load.
self._pause_resume_lock = threading.Lock()

# The threads created in ``.open()``.
self._dispatcher = None
self._leaser = None
Expand Down Expand Up @@ -211,26 +222,72 @@ def add_close_callback(self, callback):

def maybe_pause_consumer(self):
"""Check the current load and pause the consumer if needed."""
if self.load >= 1.0:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug("Message backlog over load at %.2f, pausing.", self.load)
self._consumer.pause()
with self._pause_resume_lock:
if self.load >= 1.0:
if self._consumer is not None and not self._consumer.is_paused:
_LOGGER.debug(
"Message backlog over load at %.2f, pausing.", self.load
)
self._consumer.pause()

def maybe_resume_consumer(self):
"""Check the current load and resume the consumer if needed."""
# If we have been paused by flow control, check and see if we are
# back within our limits.
#
# In order to not thrash too much, require us to have passed below
# the resume threshold (80% by default) of each flow control setting
# before restarting.
if self._consumer is None or not self._consumer.is_paused:
return

if self.load < self.flow_control.resume_threshold:
self._consumer.resume()
else:
_LOGGER.debug("Did not resume, current load is %s", self.load)
"""Check the load and held messages and resume the consumer if needed.

If there are messages held internally, release those messages before
resuming the consumer. That will avoid leaser overload.
"""
with self._pause_resume_lock:
# If we have been paused by flow control, check and see if we are
# back within our limits.
#
# In order to not thrash too much, require us to have passed below
# the resume threshold (80% by default) of each flow control setting
# before restarting.
if self._consumer is None or not self._consumer.is_paused:
return

_LOGGER.debug("Current load: %.2f", self.load)

# Before maybe resuming the background consumer, release any messages
# currently on hold, if the current load allows for it.
self._maybe_release_messages()

if self.load < self.flow_control.resume_threshold:
_LOGGER.debug("Current load is %.2f, resuming consumer.", self.load)
self._consumer.resume()
else:
_LOGGER.debug("Did not resume, current load is %.2f.", self.load)

def _maybe_release_messages(self):
    """Hand held-back messages over to the leaser while capacity remains.

    Messages are taken one at a time from ``_messages_on_hold``. Each one
    is added to the leaser and then has the user callback scheduled for
    it. The loop stops as soon as the load is at or above 1.0, or the hold
    queue is empty; in either of those situations on entry, nothing is
    released.

    The caller is expected to already hold ``_pause_resume_lock``.
    """
    on_hold = self._messages_on_hold

    while True:
        # Re-check the load on every iteration rather than once up front.
        if self.load >= 1.0:
            return  # already overloaded

        try:
            held = on_hold.get_nowait()
        except queue.Empty:
            return

        lease_request = requests.LeaseRequest(
            ack_id=held.ack_id, byte_size=held.size
        )
        self.leaser.add([lease_request])
        _LOGGER.debug(
            "Released held message to leaser, scheduling callback for it, "
            "still on hold %s.",
            on_hold.qsize(),
        )
        self._scheduler.schedule(self._callback, held)

def _send_unary_request(self, request):
"""Send a request using a separate unary request instead of over the
Expand Down Expand Up @@ -431,9 +488,10 @@ def _on_response(self, response):
After the messages have all had their ack deadline updated, execute
the callback for each message using the executor.
"""

_LOGGER.debug(
"Scheduling callbacks for %s messages.", len(response.received_messages)
"Processing %s received message(s), currenty on hold %s.",
len(response.received_messages),
self._messages_on_hold.qsize(),
)

# Immediately modack the messages we received, as this tells the server
Expand All @@ -443,12 +501,33 @@ def _on_response(self, response):
for message in response.received_messages
]
self._dispatcher.modify_ack_deadline(items)

invoke_callbacks_for = []

for received_message in response.received_messages:
message = google.cloud.pubsub_v1.subscriber.message.Message(
received_message.message, received_message.ack_id, self._scheduler.queue
received_message.message,
received_message.ack_id,
self._scheduler.queue,
autolease=False,
)
# TODO: Immediately lease instead of using the callback queue.
self._scheduler.schedule(self._callback, message)
if self.load < 1.0:
req = requests.LeaseRequest(
ack_id=message.ack_id, byte_size=message.size
)
self.leaser.add([req])
invoke_callbacks_for.append(message)
self.maybe_pause_consumer()
else:
self._messages_on_hold.put(message)

_LOGGER.debug(
"Scheduling callbacks for %s new messages, new total on hold %s.",
len(invoke_callbacks_for),
self._messages_on_hold.qsize(),
)
for msg in invoke_callbacks_for:
self._scheduler.schedule(self._callback, msg)

def _should_recover(self, exception):
"""Determine if an error on the RPC stream should be recovered.
Expand Down
14 changes: 10 additions & 4 deletions pubsub/google/cloud/pubsub_v1/subscriber/message.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class Message(object):
published.
"""

def __init__(self, message, ack_id, request_queue):
def __init__(self, message, ack_id, request_queue, autolease=True):
"""Construct the Message.

.. note::
Expand All @@ -85,6 +85,9 @@ def __init__(self, message, ack_id, request_queue):
request_queue (queue.Queue): A queue provided by the policy that
can accept requests; the policy is responsible for handling
those requests.
autolease (bool): An optional flag determining whether a new Message
instance should automatically lease itself upon creation.
Defaults to :data:`True`.
"""
self._message = message
self._ack_id = ack_id
Expand All @@ -98,7 +101,8 @@ def __init__(self, message, ack_id, request_queue):

# The policy should lease this message, telling PubSub that it has
# it until it is acked or otherwise dropped.
self.lease()
if autolease:
self.lease()

def __repr__(self):
# Get an abbreviated version of the data.
Expand Down Expand Up @@ -208,8 +212,10 @@ def lease(self):
"""Inform the policy to lease this message continually.

.. note::
This method is called by the constructor, and you should never
need to call it manually.
By default this method is called by the constructor, and you should
never need to call it manually, unless the
:class:`~.pubsub_v1.subscriber.message.Message` instance was
created with ``autolease=False``.
"""
self._request_queue.put(
requests.LeaseRequest(ack_id=self._ack_id, byte_size=self.size)
Expand Down
Loading