Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added sample for publishing/receiving messages with custom attributes #1409

Merged
merged 1 commit into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@ def publish_messages(project, topic_name):
print('Published messages.')


def publish_messages_with_custom_attributes(project, topic_name):
"""Publishes multiple messages with custom attributes
to a Pub/Sub topic."""
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic_name)

for n in range(1, 10):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# Add two attributes, origin and username, to the message
publisher.publish(
topic_path, data, origin='python-sample', username='gcp')

print('Published messages with custom attributes.')


def publish_messages_with_futures(project, topic_name):
"""Publishes multiple messages to a Pub/Sub topic and prints their
message IDs."""
Expand Down Expand Up @@ -132,6 +149,11 @@ def publish_messages_with_batch_settings(project, topic_name):
'publish', help=publish_messages.__doc__)
publish_parser.add_argument('topic_name')

publish_with_custom_attributes_parser = subparsers.add_parser(
'publish-with-custom-attributes',
help=publish_messages_with_custom_attributes.__doc__)
publish_with_custom_attributes_parser.add_argument('topic_name')

publish_with_futures_parser = subparsers.add_parser(
'publish-with-futures',
help=publish_messages_with_futures.__doc__)
Expand All @@ -152,6 +174,8 @@ def publish_messages_with_batch_settings(project, topic_name):
delete_topic(args.project, args.topic_name)
elif args.command == 'publish':
publish_messages(args.project, args.topic_name)
elif args.command == 'publish-with-custom-attributes':
publish_messages_with_custom_attributes(args.project, args.topic_name)
elif args.command == 'publish-with-futures':
publish_messages_with_futures(args.project, args.topic_name)
elif args.command == 'publish-with-batch-settings':
Expand Down
7 changes: 7 additions & 0 deletions pubsub/cloud-client/publisher_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def test_publish(topic, capsys):
assert 'Published' in out


def test_publish_with_custom_attributes(topic, capsys):
publisher.publish_messages_with_custom_attributes(PROJECT, TOPIC)

out, _ = capsys.readouterr()
assert 'Published' in out


def test_publish_with_batch_settings(topic, capsys):
publisher.publish_messages_with_batch_settings(PROJECT, TOPIC)

Expand Down
32 changes: 32 additions & 0 deletions pubsub/cloud-client/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,30 @@ def callback(message):
time.sleep(60)


def receive_messages_with_custom_attributes(project, subscription_name):
"""Receives messages from a pull subscription."""
subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(
project, subscription_name)

def callback(message):
print('Received message: {}'.format(message.data))
if message.attributes:
print('Attributes:')
for key in message.attributes:
value = message.attributes.get(key)
print('{}: {}'.format(key, value))
message.ack()

subscriber.subscribe(subscription_path, callback=callback)

# The subscriber is non-blocking, so we must keep the main thread from
# exiting to allow it to process messages in the background.
print('Listening for messages on {}'.format(subscription_path))
while True:
time.sleep(60)


def receive_messages_with_flow_control(project, subscription_name):
"""Receives messages from a pull subscription with flow control."""
subscriber = pubsub_v1.SubscriberClient()
Expand Down Expand Up @@ -227,6 +251,11 @@ def callback(message):
'receive', help=receive_messages.__doc__)
receive_parser.add_argument('subscription_name')

receive_with_custom_attributes_parser = subparsers.add_parser(
'receive-custom-attributes',
help=receive_messages_with_custom_attributes.__doc__)
receive_with_custom_attributes_parser.add_argument('subscription_name')

receive_with_flow_control_parser = subparsers.add_parser(
'receive-flow-control',
help=receive_messages_with_flow_control.__doc__)
Expand Down Expand Up @@ -259,6 +288,9 @@ def callback(message):
args.project, args.subscription_name, args.endpoint)
elif args.command == 'receive':
receive_messages(args.project, args.subscription_name)
elif args.command == 'receive-custom-attributes':
receive_messages_with_custom_attributes(
args.project, args.subscription_name)
elif args.command == 'receive-flow-control':
receive_messages_with_flow_control(
args.project, args.subscription_name)
Expand Down
21 changes: 21 additions & 0 deletions pubsub/cloud-client/subscriber_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ def _publish_messages(publisher_client, topic):
topic, data=data)


def _publish_messages_with_custom_attributes(publisher_client, topic):
data = u'Test message'.encode('utf-8')
publisher_client.publish(topic, data=data, origin='python-sample')


def _make_sleep_patch():
real_sleep = time.sleep

Expand All @@ -155,6 +160,22 @@ def test_receive(publisher_client, topic, subscription, capsys):
assert 'Message 1' in out


def test_receive_with_custom_attributes(
publisher_client, topic, subscription, capsys):
_publish_messages_with_custom_attributes(publisher_client, topic)

with _make_sleep_patch():
with pytest.raises(RuntimeError, match='sigil'):
subscriber.receive_messages_with_custom_attributes(
PROJECT, SUBSCRIPTION)

out, _ = capsys.readouterr()
assert 'Test message' in out
assert 'Attributes' in out
assert 'origin' in out
assert 'python-sample' in out


def test_receive_with_flow_control(
publisher_client, topic, subscription, capsys):
_publish_messages(publisher_client, topic)
Expand Down