Pub/Sub: send large messages off in their own batches
chemelnucfin committed Mar 3, 2018
1 parent 71d4adb commit bbdf265
Showing 6 changed files with 192 additions and 29 deletions.
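
In short: before this change, Client.publish raised a ValueError whenever a single message exceeded the configured max_bytes; after it, the client sends such a message through its own one-message batch and commits it separately. A minimal usage sketch of the resulting behavior using the public pubsub_v1 API; the project and topic names and the settings values below are placeholders, not part of this commit:

from google.cloud import pubsub_v1

# Deliberately small byte limit so an ordinary payload counts as "large".
batch_settings = pubsub_v1.types.BatchSettings(
    max_bytes=1024,     # flush once the batch holds about 1 KiB
    max_latency=0.05,   # ... or after 50 ms
    max_messages=100,   # ... or after 100 messages
)
publisher = pubsub_v1.PublisherClient(batch_settings=batch_settings)
topic_path = publisher.topic_path('my-project', 'my-topic')  # placeholder names

# Previously this raised ValueError ('Message being published is too
# large for the batch settings ...'); with this commit the message is
# published in a dedicated batch and the call returns a future as usual.
future = publisher.publish(topic_path, b'x' * 4096)
print(future.result())  # server-assigned message ID
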
12 changes: 8 additions & 4 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py
@@ -97,7 +97,7 @@ def make_lock():
Returns:
_thread.Lock: A newly created lock.
"""
return threading.Lock()
return threading.RLock()

@property
def client(self):
@@ -204,6 +204,8 @@ def _commit(self):
self._messages,
)
end = time.time()
self._client._publish_count += len(self._messages)
self._client._commit_count += 1
_LOGGER.debug('gRPC Publish took %s seconds.', end - start)

if len(response.message_ids) == len(self._futures):
@@ -234,9 +236,10 @@ def monitor(self):

# Sleep for however long we should be waiting.
time.sleep(self._settings.max_latency)

_LOGGER.debug('Monitor is waking up')
return self._commit()

if self._size:
return self._commit()

def publish(self, message):
"""Publish a single message.
@@ -278,7 +281,8 @@ def publish(self, message):

# Try to commit, but it must be **without** the lock held, since
# ``commit()`` will try to obtain the lock.
if num_messages >= self._settings.max_messages:
if (num_messages >= self._settings.max_messages
or self._size >= self._settings.max_bytes):
self.commit()

return future
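
The Lock-to-RLock switch above gives the client a reentrant batch lock, presumably so that the new large-message path in Client.publish (later in this commit), which creates and publishes to a one-message batch while self._batch_lock is held, does not deadlock if that code path re-acquires the same lock. A tiny standalone illustration of the reentrancy being relied on; this is plain threading behavior, not Pub/Sub code:

import threading

def acquire_twice(lock):
    # Acquire the lock, then acquire it again from the same thread.
    with lock:
        with lock:
            return 'acquired twice'

print(acquire_twice(threading.RLock()))  # works: an RLock counts re-entries
                                         # by its owning thread
# acquire_twice(threading.Lock())        # would deadlock: a plain Lock blocks
                                         # even when the holder is the caller
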
62 changes: 49 additions & 13 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
@@ -92,6 +92,8 @@ def __init__(self, batch_settings=(), batch_class=thread.Batch, **kwargs):
self._batch_class = batch_class
self._batch_lock = batch_class.make_lock()
self._batches = {}
self._commit_count = 0
self._publish_count = 0

@property
def target(self):
@@ -102,7 +104,8 @@ def target(self):
"""
return publisher_client.PublisherClient.SERVICE_ADDRESS

def batch(self, topic, create=False, autocommit=True):
def _batch(
self, topic, create=False, autocommit=True, batch_settings=None):
"""Return the current batch for the provided topic.
This will create a new batch if ``create=True`` or if no batch
@@ -117,6 +120,8 @@ def batch(self, topic, create=False, autocommit=True):
primarily useful for debugging and testing, since it allows
the caller to avoid some side effects that batch creation
might have (e.g. spawning a worker to publish a batch).
batch_settings (~.pubsub_v1.types.BatchSettings):
The batch settings to use for this batch.
Returns:
~.pubsub_v1.batch.Batch: The batch object.
@@ -130,16 +135,40 @@ def batch(self, topic, create=False, autocommit=True):
create = True

if create:
if batch_settings is None:
batch_settings = self.batch_settings

batch = self._batch_class(
autocommit=autocommit,
client=self,
settings=self.batch_settings,
settings=batch_settings,
topic=topic,
)
self._batches[topic] = batch

return batch

def batch(self, topic, create=False, autocommit=True):
"""Return the current batch for the provided topic.
This will create a new batch if ``create=True`` or if no batch
currently exists.
Args:
topic (str): A string representing the topic.
create (bool): Whether to create a new batch. Defaults to
:data:`False`. If :data:`True`, this will create a new batch
even if one already exists.
autocommit (bool): Whether to autocommit this batch. This is
primarily useful for debugging and testing, since it allows
the caller to avoid some side effects that batch creation
might have (e.g. spawning a worker to publish a batch).
Returns:
~.pubsub_v1.batch.Batch: The batch object.
"""
return self._batch(topic, create, autocommit)

def publish(self, topic, data, **attrs):
"""Publish a single message.
@@ -199,19 +228,26 @@ def publish(self, topic, data, **attrs):

# Create the Pub/Sub message object.
message = types.PubsubMessage(data=data, attributes=attrs)
if message.ByteSize() > self.batch_settings.max_bytes:
raise ValueError(
'Message being published is too large for the '
'batch settings with max bytes {}.'.
format(self.batch_settings.max_bytes)
)

# Delegate the publishing to the batch.
batch = self.batch(topic)
future = None
while future is None:
future = batch.publish(message)
if future is None:
batch = self.batch(topic, create=True)
message_size = message.ByteSize()

if message_size > batch.settings.max_bytes:
with self._batch_lock:
backup = self._batches[topic]
settings = types.BatchSettings(message_size, 0.05, 1)
batch = self._batch(
topic, create=True, batch_settings=settings)
future = None
while future is None:
future = batch.publish(message)
self._batches[topic] = backup
else:
future = None
while future is None:
future = batch.publish(message)
if future is None:
batch = self.batch(topic, create=True)

return future
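
Condensed, the new control flow in Client.publish reads roughly as follows; this is a simplified restatement of the diff above for readability, not a drop-in replacement (the retry loop around the one-off batch is omitted):

message = types.PubsubMessage(data=data, attributes=attrs)
if message.ByteSize() > batch.settings.max_bytes:
    # The message cannot fit in the shared batch: park the shared batch,
    # publish through a one-off batch sized exactly to this message
    # (max_messages=1, so it fills and commits immediately), then
    # restore the shared batch for subsequent publishes.
    with self._batch_lock:
        backup = self._batches[topic]
        settings = types.BatchSettings(message.ByteSize(), 0.05, 1)
        batch = self._batch(topic, create=True, batch_settings=settings)
        future = batch.publish(message)
        self._batches[topic] = backup
else:
    # Normal path: keep offering the message to the current batch,
    # creating a fresh batch whenever the current one will not accept it.
    future = None
    while future is None:
        future = batch.publish(message)
        if future is None:
            batch = self.batch(topic, create=True)
return future

Sizing the one-off batch with BatchSettings(message_size, 0.05, 1) means the single message fills it on arrival, so it is flushed right away instead of waiting out max_latency.
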
4 changes: 3 additions & 1 deletion pubsub/nox.py
@@ -88,7 +88,9 @@ def system(session, py):
session.install('.')

# Run py.test against the system tests.
session.run('py.test', '--quiet', 'tests/system.py')
session.run('py.test',
'--quiet',
'tests/system.py')


@nox.session
100 changes: 98 additions & 2 deletions pubsub/tests/system.py
@@ -17,14 +17,13 @@
import datetime
import threading
import time
from multiprocessing.pool import ThreadPool

import pytest
import six

import google.auth
from google.cloud import pubsub_v1


from test_utils.system import unique_resource_id


@@ -182,6 +181,84 @@ def test_subscribe_to_messages_async_callbacks(
assert callback.calls >= 2


def worker(function, topic_path, index, max_messages):
if int(index) < max_messages // 2:
max_size = max_messages // 5
else:
max_size = max_messages
bytestring = index.zfill(int(max_size))
return function(topic_path, bytestring, num=str(index))


def worker_helper(args):
function, topic_path, index, max_messages = args
return worker(function, topic_path, index, max_messages)


def test_publish_many_messages_over_batch_size(
publisher, topic_path, cleanup, subscriber, subscription_path):

futures = []
cleanup.append((publisher.delete_topic, topic_path))
publisher.create_topic(topic_path)
publisher._publish_count = 0
publisher._commit_count = 0

subscriber.create_subscription(subscription_path, topic_path)
subscription = subscriber.subscribe(subscription_path)

max_bytes = 40
max_latency = 5
max_messages = 40
publisher.batch_settings = pubsub_v1.types.BatchSettings(
max_bytes,
max_latency,
max_messages)
callback = MessageAckCallback()

# Actually open the subscription and hold it open for a few seconds.
subscription.open(callback)

pool = ThreadPool()
if six.PY3:
indices = [
str(index).encode()
for index in six.moves.range(max_messages)]
else:
indices = [bytes(index) for index in six.moves.range(max_messages)]
futures = pool.map(
worker_helper,
zip([publisher.publish] * max_messages,
[topic_path] * max_messages,
indices,
[max_messages] * max_messages)
)
pool.close()
pool.join()

for future in futures:
future.result()

assert len(futures) == max_messages
assert publisher._publish_count == 40
assert publisher._commit_count <= publisher._publish_count

# Wait (up to 50 seconds) for the callback to have been invoked for
# every published message, checking once per second.
for second in six.moves.range(50):
time.sleep(1)

# The callback should have fired at least max_messages times, but it
# may take some time.
if callback.calls >= max_messages:
break

assert sorted(set(callback.data)) == sorted(indices)

# Okay, we took too long; fail out.
assert callback.calls >= max_messages


class AckCallback(object):

def __init__(self):
@@ -195,6 +272,25 @@ def __call__(self, message):
self.calls += 1


class MessageAckCallback(object):

def __init__(self):
self.calls = 0
self.data = []
self.lock = threading.Lock()

def __call__(self, message):
message.ack()
# Only increment the number of calls **after** finishing.
with self.lock:
self.calls += 1
data = message.data.lstrip(b'0')
if data == b'':
self.data.append(b'0')
else:
self.data.append(data)
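
The zero padding added in worker (index.zfill(max_size)) and the stripping done here (message.data.lstrip(b'0')) are two halves of one trick: the padding inflates each payload to a chosen byte size so that the larger ones exceed the batch's max_bytes, and the callback strips it off again to recover the original index for the final set comparison. A quick round trip of that encoding, with an illustrative width:

index = b'7'
payload = index.zfill(20)                  # 20 bytes: nineteen b'0' then b'7'
recovered = payload.lstrip(b'0') or b'0'   # b'7'; the `or b'0'` mirrors the
                                           # special case for index 0, whose
                                           # payload strips down to empty
assert recovered == index
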


class TimesCallback(object):

def __init__(self, sleep_time):
3 changes: 2 additions & 1 deletion pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
@@ -72,7 +72,7 @@ def test_init_infinite_latency():
assert batch._thread is None


@mock.patch.object(threading, 'Lock')
@mock.patch.object(threading, 'RLock')
def test_make_lock(Lock):
lock = Batch.make_lock()
assert lock is Lock.return_value
@@ -203,6 +203,7 @@ def test_blocking__commit_wrong_messageid_length():

def test_monitor():
batch = create_batch(max_latency=5.0)
batch._size = 1
with mock.patch.object(time, 'sleep') as sleep:
with mock.patch.object(type(batch), '_commit') as _commit:
batch.monitor()
40 changes: 32 additions & 8 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
@@ -18,10 +18,13 @@

import mock
import pytest
import sys


from google.cloud.pubsub_v1.gapic import publisher_client
from google.cloud.pubsub_v1 import publisher
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.gapic import publisher_client
from google.cloud.pubsub_v1.publisher.batch import thread


def test_init():
@@ -93,6 +96,7 @@ def test_publish():
# Use a mock in lieu of the actual batch class.
batch = mock.Mock(spec=client._batch_class)
# Set the mock up to claim indiscriminately that it accepts all messages.
batch.settings.max_bytes = sys.maxsize
batch.will_accept.return_value = True
batch.publish.side_effect = (
mock.sentinel.future1,
@@ -135,13 +139,30 @@ def test_publish_data_too_large():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
topic = 'topic/path'
client.batch_settings = types.BatchSettings(
0,
client.batch_settings.max_latency,
client.batch_settings.max_messages
)
with pytest.raises(ValueError):
client.publish(topic, b'This is a text string.')
to_send = b'This is a text string.'

original_batch = mock.create_autospec(thread.Batch)
original_batch.settings.max_bytes = 0
new_batch = mock.create_autospec(thread.Batch)
new_batch.settings.max_bytes = len(to_send)

client._batches[topic] = original_batch
original_batch._commit.assert_not_called()
batch_class = 'google.cloud.pubsub_v1.publisher.client.Client._batch'
with mock.patch(batch_class) as batch:
batch.return_value = new_batch
client.publish(topic, to_send)
new_batch.publish.assert_called_once()
original_batch._commit.assert_not_called()


def test__batch():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
topic = 'topic/path'
batch_settings = types.BatchSettings(1, 2, 3)
batch = client._batch(topic, batch_settings=batch_settings)
assert batch._settings == batch_settings


def test_publish_attrs_bytestring():
@@ -151,6 +172,7 @@ def test_publish_attrs_bytestring():
# Use a mock in lieu of the actual batch class.
batch = mock.Mock(spec=client._batch_class)
# Set the mock up to claim indiscriminately that it accepts all messages.
batch.settings.max_bytes = sys.maxsize
batch.will_accept.return_value = True

topic = 'topic/path'
@@ -180,7 +202,9 @@ def test_publish_new_batch_needed():
# Set the first mock up to claim indiscriminately that it rejects all
# messages and the second accepts all.
batch1.publish.return_value = None
batch1.settings.max_bytes = sys.maxsize
batch2.publish.return_value = mock.sentinel.future
batch2.settings.max_bytes = sys.maxsize

topic = 'topic/path'
client._batches[topic] = batch1
