Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pub/Sub end-to-end sample #1800

Merged
merged 3 commits into from
Oct 30, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pubsub/cloud-client/publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def list_topics(project_id):

def create_topic(project_id, topic_name):
"""Create a new Pub/Sub topic."""
# [START pubsub_quickstart_create_topic]
# [START pubsub_create_topic]
from google.cloud import pubsub_v1

Expand All @@ -53,6 +54,7 @@ def create_topic(project_id, topic_name):
topic = publisher.create_topic(topic_path)

print('Topic created: {}'.format(topic))
# [END pubsub_quickstart_create_topic]
# [END pubsub_create_topic]


Expand Down
91 changes: 80 additions & 11 deletions pubsub/cloud-client/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,94 @@
# limitations under the License.


def run_quickstart():
# [START pubsub_quickstart_create_topic]
# Imports the Google Cloud client library
import argparse


def end_to_end(project_id, topic_name, subscription_name, num_messages):
# [START pubsub_end_to_end]
import time

from google.cloud import pubsub_v1

# Instantiates a client
# TODO project_id = "Your Google Cloud Project ID"
# TODO topic_name = "Your Pub/Sub topic name"
# TODO num_messages = number of messages to test end-to-end

# Instantiates a publisher and subscriber client
publisher = pubsub_v1.PublisherClient()
subscriber = pubsub_v1.SubscriberClient()

# The `topic_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/topics/{topic_name}`
topic_path = subscriber.topic_path(project_id, topic_name)

# The resource path for the new topic contains the project ID
# and the topic name.
topic_path = publisher.topic_path(
'my-project', 'my-new-topic')
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_name}`
subscription_path = subscriber.subscription_path(
project_id, subscription_name)

# Create the topic.
topic = publisher.create_topic(topic_path)
print('\nTopic created: {}'.format(topic.name))

# Create a subscription.
subscription = subscriber.create_subscription(
subscription_path, topic_path)
print('\nSubscription created: {}\n'.format(subscription.name))

publish_begin = time.time()

# Publish messages.
for n in range(num_messages):
data = u'Message number {}'.format(n)
# Data must be a bytestring
data = data.encode('utf-8')
# When you publish a message, the client returns a future.
future = publisher.publish(topic_path, data=data)
print('Published {} of message ID {}.'.format(data, future.result()))

publish_time = time.time() - publish_begin

print('Topic created: {}'.format(topic))
# [END pubsub_quickstart_create_topic]
messages = set()

def callback(message):
print('Received message: {}'.format(message))
# Unacknowledged messages will be sent again.
message.ack()
messages.add(message)

subscribe_begin = time.time()

# Receive messages. The subscriber is nonblocking.
subscriber.subscribe(subscription_path, callback=callback)

print('\nListening for messages on {}...\n'.format(subscription_path))

while True:
if len(messages) == num_messages:
subscribe_time = time.time() - subscribe_begin
print("\nReceived all messages.")
print("Publish time lapsed: {:.2f}s.".format(publish_time))
print("Subscribe time lapsed: {:.2f}s.".format(subscribe_time))
break
else:
# Sleeps the thread at 50Hz to save on resources.
time.sleep(1./50)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit. Flake8 will complain if there isn't a space around the / operator.

# [END pubsub_end_to_end]


if __name__ == '__main__':
run_quickstart()

parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
)
parser.add_argument('project_id', help='Your Google Cloud project ID')
parser.add_argument('topic_name', help='Your topic name')
parser.add_argument('subscription_name', help='Your subscription name')
parser.add_argument('num_msgs', type=int, help='Number of test messages')

args = parser.parse_args()

end_to_end(args.project_id, args.topic_name, args.subscription_name,
args.num_msgs)
58 changes: 41 additions & 17 deletions pubsub/cloud-client/quickstart_test.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Copyright 2016 Google Inc. All Rights Reserved.
#!/usr/bin/env python

# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,33 +17,55 @@
import os

from google.cloud import pubsub_v1
import mock
import pytest

import quickstart

PROJECT = os.environ['GCLOUD_PROJECT']
# Must match the dataset listed in quickstart.py
TOPIC_NAME = 'my-new-topic'
TOPIC_PATH = 'projects/{}/topics/{}'.format(PROJECT, TOPIC_NAME)
TOPIC = 'end-to-end-test-topic'
SUBSCRIPTION = 'end-to-end-test-topic-sub'
N = 10


@pytest.fixture(scope='module')
def publisher_client():
yield pubsub_v1.PublisherClient()


@pytest.fixture
def temporary_topic():
"""Fixture that ensures the test topic does not exist before the test."""
publisher = pubsub_v1.PublisherClient()
@pytest.fixture(scope='module')
def topic(publisher_client):
topic_path = publisher_client.topic_path(PROJECT, TOPIC)

try:
publisher.delete_topic(TOPIC_PATH)
publisher_client.delete_topic(topic_path)
except Exception:
pass

yield
yield TOPIC


@mock.patch.object(
pubsub_v1.PublisherClient, 'topic_path', return_value=TOPIC_PATH)
def test_quickstart(unused_topic_path, temporary_topic, capsys):
quickstart.run_quickstart()
@pytest.fixture(scope='module')
def subscriber_client():
yield pubsub_v1.SubscriberClient()


@pytest.fixture(scope='module')
def subscription(subscriber_client, topic):
subscription_path = subscriber_client.subscription_path(
PROJECT, SUBSCRIPTION)

try:
subscriber_client.delete_subscription(subscription_path)
except Exception:
pass

yield SUBSCRIPTION


def test_end_to_end(topic, subscription, capsys):

quickstart.end_to_end(PROJECT, topic, subscription, N)
out, _ = capsys.readouterr()
assert TOPIC_NAME in out

assert "Received all messages" in out
assert "Publish time lapsed" in out
assert "Subscribe time lapsed" in out