Skip to content

Commit

Permalink
Added sample for publishing/receiving messages with custom attributes…
Browse files Browse the repository at this point in the history
  • Loading branch information
chenyumic authored and plamut committed Jul 10, 2020
1 parent b24f725 commit 0f9df7a
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 0 deletions.
24 changes: 24 additions & 0 deletions samples/snippets/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ def publish_messages(project, topic_name):
print('Published messages.')


def publish_messages_with_custom_attributes(project, topic_name):
"""Publishes multiple messages with custom attributes
to a Pub/Sub topic."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
publisher.publish(
topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')


def publish_messages_with_futures(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic and prints their
message IDs."""
Expand Down Expand Up @@ -132,6 +149,11 @@ def publish_messages_with_batch_settings(project, topic_name):
'publish', help=publish_messages.__doc__)
publish_parser.add_argument('topic_name')

publish_with_custom_attributes_parser = subparsers.add_parser(
'publish-with-custom-attributes',
help=publish_messages_with_custom_attributes.__doc__)
publish_with_custom_attributes_parser.add_argument('topic_name')

publish_with_futures_parser = subparsers.add_parser(
'publish-with-futures',
help=publish_messages_with_futures.__doc__)
Expand All @@ -152,6 +174,8 @@ def publish_messages_with_batch_settings(project, topic_name):
delete_topic(args.project, args.topic_name)
elif args.command == 'publish':
publish_messages(args.project, args.topic_name)
elif args.command == 'publish-with-custom-attributes':
publish_messages_with_custom_attributes(args.project, args.topic_name)
elif args.command == 'publish-with-futures':
publish_messages_with_futures(args.project, args.topic_name)
elif args.command == 'publish-with-batch-settings':
Expand Down
7 changes: 7 additions & 0 deletions samples/snippets/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def test_publish(topic, capsys):
assert 'Published' in out


def test_publish_with_custom_attributes(topic, capsys):
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC)

out, _ = capsys.readouterr()
assert 'Published' in out


def test_publish_with_batch_settings(topic, capsys):
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC)

Expand Down
32 changes: 32 additions & 0 deletions samples/snippets/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,30 @@ def callback(message):
time.sleep(60)


def receive_messages_with_custom_attributes(project, subscription_name):
"""Receives messages from a pull subscription."""
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

def callback(message):
print('Received message: {}'.format(message.data))
if message.attributes:
print('Attributes:')
for key in message.attributes:
value = message.attributes.get(key)
print('{}: {}'.format(key, value))
message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)


def receive_messages_with_flow_control(project, subscription_name):
"""Receives messages from a pull subscription with flow control."""
subscriber = pubsub_v1.SubscriberClient()
Expand Down Expand Up @@ -227,6 +251,11 @@ def callback(message):
'receive', help=receive_messages.__doc__)
receive_parser.add_argument('subscription_name')

receive_with_custom_attributes_parser = subparsers.add_parser(
'receive-custom-attributes',
help=receive_messages_with_custom_attributes.__doc__)
receive_with_custom_attributes_parser.add_argument('subscription_name')

receive_with_flow_control_parser = subparsers.add_parser(
'receive-flow-control',
help=receive_messages_with_flow_control.__doc__)
Expand Down Expand Up @@ -259,6 +288,9 @@ def callback(message):
args.project, args.subscription_name, args.endpoint)
elif args.command == 'receive':
receive_messages(args.project, args.subscription_name)
elif args.command == 'receive-custom-attributes':
receive_messages_with_custom_attributes(
args.project, args.subscription_name)
elif args.command == 'receive-flow-control':
receive_messages_with_flow_control(
args.project, args.subscription_name)
Expand Down
21 changes: 21 additions & 0 deletions samples/snippets/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ def _publish_messages(publisher_client, topic):
topic, data=data)


def _publish_messages_with_custom_attributes(publisher_client, topic):
data = u'Test message'.encode('utf-8')
publisher_client.publish(topic, data=data, origin='python-sample')


def _make_sleep_patch():
real_sleep = time.sleep

Expand All @@ -155,6 +160,22 @@ def test_receive(publisher_client, topic, subscription, capsys):
assert 'Message 1' in out


def test_receive_with_custom_attributes(
publisher_client, topic, subscription, capsys):
_publish_messages_with_custom_attributes(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_custom_attributes(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Test message' in out
assert 'Attributes' in out
assert 'origin' in out
assert 'python-sample' in out


def test_receive_with_flow_control(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)
Expand Down

0 comments on commit 0f9df7a

Please sign in to comment.