Skip to content

Commit

Permalink
Add system test for subscriber batch callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 17, 2019
1 parent c38c2d9 commit 49fa1b2
Showing 1 changed file with 40 additions and 1 deletion.
41 changes: 40 additions & 1 deletion pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
from google.cloud.pubsub_v1 import exceptions
from google.cloud.pubsub_v1 import futures
from google.cloud.pubsub_v1 import types

from google.cloud.pubsub_v1.subscriber import message

from test_utils.system import unique_resource_id

Expand Down Expand Up @@ -210,6 +210,45 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

def test_batch_callbacks(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
# Make sure the topic and subscription get deleted.
cleanup.append((publisher.delete_topic, topic_path))
cleanup.append((subscriber.delete_subscription, subscription_path))

# create a topic and subscribe to it
publisher.create_topic(topic_path)
subscriber.create_subscription(subscription_path, topic_path)

# publish messages and wait until published
self._publish_messages(publisher, topic_path, batch_sizes=[3])

# start pulling messages and check that the callback is indeed invoked
# with batches of messages
callback_lock = threading.Lock()
callback_invoked = futures.Future()

def callback(messages):
with callback_lock:
if not callback_invoked.done():
callback_invoked.set_result(messages)

subscription_future = subscriber.subscribe(
subscription_path, callback, batch=True
)

try:
result = callback_invoked.result(timeout=30)
except exceptions.TimeoutError:
pytest.fail("Subscription callback not invoked in time.")
else:
assert isinstance(result, list)
assert len(result) == 3
assert all(isinstance(item, message.Message) for item in result)
finally:
subscription_future.cancel() # shutdown streaming pull

def test_streaming_pull_max_messages(
self, publisher, topic_path, subscriber, subscription_path, cleanup
):
Expand Down

0 comments on commit 49fa1b2

Please sign in to comment.