Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Normalize overflow handling for max count and bytes #5343

Merged
merged 2 commits into the base branch from the source branch
May 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,6 @@ def will_accept(self, message):
if self.status != BatchStatus.ACCEPTING_MESSAGES:
return False

# If this message will make the batch exceed the ``max_bytes``
# setting, return False.
if self.size + message.ByteSize() > self.settings.max_bytes:
return False

# If this message will make the batch exceed the ``max_messages``
# setting, return False.
if len(self.messages) >= self.settings.max_messages:
Expand Down
36 changes: 22 additions & 14 deletions pubsub/google/cloud/pubsub_v1/publisher/batch/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,25 +280,33 @@ def publish(self, message):
if not isinstance(message, types.PubsubMessage):
message = types.PubsubMessage(**message)

future = None

with self._state_lock:
if not self.will_accept(message):
return None

# Add the size to the running total of the size, so we know
# if future messages need to be rejected.
self._size += message.ByteSize()
# Store the actual message in the batch's message queue.
self._messages.append(message)
# Track the future on this batch (so that the result of the
# future can be set).
future = futures.Future(completed=threading.Event())
self._futures.append(future)
# Determine the number of messages before releasing the lock.
num_messages = len(self._messages)
return future

new_size = self._size + message.ByteSize()
new_count = len(self._messages) + 1
overflow = (
new_size > self.settings.max_bytes or
new_count >= self._settings.max_messages
)

if not self._messages or not overflow:

# Store the actual message in the batch's message queue.
self._messages.append(message)
self._size = new_size

# Track the future on this batch (so that the result of the
# future can be set).
future = futures.Future(completed=threading.Event())
self._futures.append(future)

# Try to commit, but it must be **without** the lock held, since
# ``commit()`` will try to obtain the lock.
if num_messages >= self._settings.max_messages:
if overflow:
self.commit()

return future
6 changes: 0 additions & 6 deletions pubsub/google/cloud/pubsub_v1/publisher/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,12 +199,6 @@ def publish(self, topic, data, **attrs):

# Create the Pub/Sub message object.
message = types.PubsubMessage(data=data, attributes=attrs)
if message.ByteSize() > self.batch_settings.max_bytes:
raise ValueError(
'Message being published is too large for the '
'batch settings with max bytes {}.'.
format(self.batch_settings.max_bytes)
)

# Delegate the publishing to the batch.
batch = self.batch(topic)
Expand Down
14 changes: 7 additions & 7 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,18 @@ def test_will_accept():
assert batch.will_accept(message) is True


def test_will_not_accept_status():
batch = create_batch(status='talk to the hand')
message = types.PubsubMessage()
assert batch.will_accept(message) is False


def test_will_not_accept_size():
def test_will_accept_oversize():
batch = create_batch(
settings=types.BatchSettings(max_bytes=10),
status=BatchStatus.ACCEPTING_MESSAGES,
)
message = types.PubsubMessage(data=b'abcdefghijklmnopqrstuvwxyz')
assert batch.will_accept(message) is True


def test_will_not_accept_status():
batch = create_batch(status='talk to the hand')
message = types.PubsubMessage()
assert batch.will_accept(message) is False


Expand Down
4 changes: 2 additions & 2 deletions pubsub/tests/unit/pubsub_v1/publisher/batch/test_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,12 @@ def test_publish_exceed_max_messages():
assert commit.call_count == 0

# When a fourth message is published, commit should be called.
# No future will be returned in this case.
future = batch.publish(types.PubsubMessage(data=b'last one'))
commit.assert_called_once_with()

futures.append(future)
assert future is None
assert batch._futures == futures
assert len(futures) == max_messages


def test_publish_dict():
Expand Down
13 changes: 0 additions & 13 deletions pubsub/tests/unit/pubsub_v1/publisher/test_publisher_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,19 +131,6 @@ def test_publish_data_not_bytestring_error():
client.publish(topic, 42)


def test_publish_data_too_large():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
topic = 'topic/path'
client.batch_settings = types.BatchSettings(
0,
client.batch_settings.max_latency,
client.batch_settings.max_messages
)
with pytest.raises(ValueError):
client.publish(topic, b'This is a text string.')


def test_publish_attrs_bytestring():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
Expand Down