From 49d1a9593ae48a3868f60553ad699ba0c13b9a57 Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 29 Oct 2018 15:16:47 -0700 Subject: [PATCH 1/2] Created new end-to-end sample, moved old sample --- pubsub/cloud-client/publisher.py | 2 + pubsub/cloud-client/quickstart.py | 91 ++++++++++++++++++++++---- pubsub/cloud-client/quickstart_test.py | 58 +++++++++++----- 3 files changed, 123 insertions(+), 28 deletions(-) diff --git a/pubsub/cloud-client/publisher.py b/pubsub/cloud-client/publisher.py index b7e574e04978..fcb0d9b0f2e3 100644 --- a/pubsub/cloud-client/publisher.py +++ b/pubsub/cloud-client/publisher.py @@ -41,6 +41,7 @@ def list_topics(project_id): def create_topic(project_id, topic_name): """Create a new Pub/Sub topic.""" + # [START pubsub_quickstart_create_topic] # [START pubsub_create_topic] from google.cloud import pubsub_v1 @@ -53,6 +54,7 @@ def create_topic(project_id, topic_name): topic = publisher.create_topic(topic_path) print('Topic created: {}'.format(topic)) + # [END pubsub_quickstart_create_topic] # [END pubsub_create_topic] diff --git a/pubsub/cloud-client/quickstart.py b/pubsub/cloud-client/quickstart.py index 1ff2efed3a32..0d897981ae5c 100644 --- a/pubsub/cloud-client/quickstart.py +++ b/pubsub/cloud-client/quickstart.py @@ -15,25 +15,94 @@ # limitations under the License. -def run_quickstart(): - # [START pubsub_quickstart_create_topic] - # Imports the Google Cloud client library +import argparse + + +def end_to_end(project_id, topic_name, subscription_name, num_messages): + # [START pubsub_end_to_end] + import time + from google.cloud import pubsub_v1 - # Instantiates a client + # TODO project_id = "Your Google Cloud Project ID" + # TODO topic_name = "Your Pub/Sub topic name" + # TODO num_messages = number of messages to test end-to-end + + # Instantiates a publisher and subscriber client publisher = pubsub_v1.PublisherClient() + subscriber = pubsub_v1.SubscriberClient() + + # The `topic_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/topics/{topic_name}` + topic_path = subscriber.topic_path(project_id, topic_name) - # The resource path for the new topic contains the project ID - # and the topic name. - topic_path = publisher.topic_path( - 'my-project', 'my-new-topic') + # The `subscription_path` method creates a fully qualified identifier + # in the form `projects/{project_id}/subscriptions/{subscription_name}` + subscription_path = subscriber.subscription_path( + project_id, subscription_name) # Create the topic. topic = publisher.create_topic(topic_path) + print('\nTopic created: {}'.format(topic.name)) + + # Create a subscription. + subscription = subscriber.create_subscription( + subscription_path, topic_path) + print('\nSubscription created: {}\n'.format(subscription.name)) + + publish_begin = time.time() + + # Publish messages. + for n in range(num_messages): + data = u'Message number {}'.format(n) + # Data must be a bytestring + data = data.encode('utf-8') + # When you publish a message, the client returns a future. + future = publisher.publish(topic_path, data=data) + print('Published {} of message ID {}.'.format(data, future.result())) + + publish_time = time.time() - publish_begin - print('Topic created: {}'.format(topic)) - # [END pubsub_quickstart_create_topic] + messages = set() + + def callback(message): + print('Received message: {}'.format(message)) + # Unacknowledged messages will be sent again. + message.ack() + messages.add(message) + + subscribe_begin = time.time() + + # Receive messages. The subscriber is nonblocking. + subscriber.subscribe(subscription_path, callback=callback) + + print('\nListening for messages on {}...\n'.format(subscription_path)) + + while True: + if len(messages) == num_messages: + subscribe_time = time.time() - subscribe_begin + print("\nReceived all messages.") + print("Publish time lapsed: {:.2f}s.".format(publish_time)) + print("Subscribe time lapsed: {:.2f}s.".format(subscribe_time)) + break + else: + # Sleeps the thread at 50Hz to save on resources. + time.sleep(1./50) + # [END pubsub_end_to_end] if __name__ == '__main__': - run_quickstart() + + parser = argparse.ArgumentParser( + description=__doc__, + formatter_class=argparse.RawDescriptionHelpFormatter + ) + parser.add_argument('project_id', help='Your Google Cloud project ID') + parser.add_argument('topic_name', help='Your topic name') + parser.add_argument('subscription_name', help='Your subscription name') + parser.add_argument('num_msgs', type=int, help='Number of test messages') + + args = parser.parse_args() + + end_to_end(args.project_id, args.topic_name, args.subscription_name, + args.num_msgs) diff --git a/pubsub/cloud-client/quickstart_test.py b/pubsub/cloud-client/quickstart_test.py index 520213bcf32c..ee6f7d4b21a2 100644 --- a/pubsub/cloud-client/quickstart_test.py +++ b/pubsub/cloud-client/quickstart_test.py @@ -1,4 +1,6 @@ -# Copyright 2016 Google Inc. All Rights Reserved. +#!/usr/bin/env python + +# Copyright 2018 Google Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,33 +17,55 @@ import os from google.cloud import pubsub_v1 -import mock import pytest - import quickstart PROJECT = os.environ['GCLOUD_PROJECT'] -# Must match the dataset listed in quickstart.py -TOPIC_NAME = 'my-new-topic' -TOPIC_PATH = 'projects/{}/topics/{}'.format(PROJECT, TOPIC_NAME) +TOPIC = 'end-to-end-test-topic' +SUBSCRIPTION = 'end-to-end-test-topic-sub' +N = 10 + + +@pytest.fixture(scope='module') +def publisher_client(): + yield pubsub_v1.PublisherClient() -@pytest.fixture -def temporary_topic(): - """Fixture that ensures the test topic does not exist before the test.""" - publisher = pubsub_v1.PublisherClient() +@pytest.fixture(scope='module') +def topic(publisher_client): + topic_path = publisher_client.topic_path(PROJECT, TOPIC) try: - publisher.delete_topic(TOPIC_PATH) + publisher_client.delete_topic(topic_path) except Exception: pass - yield + yield TOPIC -@mock.patch.object( - pubsub_v1.PublisherClient, 'topic_path', return_value=TOPIC_PATH) -def test_quickstart(unused_topic_path, temporary_topic, capsys): - quickstart.run_quickstart() +@pytest.fixture(scope='module') +def subscriber_client(): + yield pubsub_v1.SubscriberClient() + + +@pytest.fixture(scope='module') +def subscription(subscriber_client, topic): + subscription_path = subscriber_client.subscription_path( + PROJECT, SUBSCRIPTION) + + try: + subscriber_client.delete_subscription(subscription_path) + except Exception: + pass + + yield SUBSCRIPTION + + +def test_end_to_end(topic, subscription, capsys): + + quickstart.end_to_end(PROJECT, topic, subscription, N) out, _ = capsys.readouterr() - assert TOPIC_NAME in out + + assert "Received all messages" in out + assert "Publish time lapsed" in out + assert "Subscribe time lapsed" in out From 5d40d91c7bd412b937780847c1646d13e70148be Mon Sep 17 00:00:00 2001 From: Tianzi Cai Date: Mon, 29 Oct 2018 16:49:17 -0700 Subject: [PATCH 2/2] Add space around operator --- pubsub/cloud-client/quickstart.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/cloud-client/quickstart.py b/pubsub/cloud-client/quickstart.py index 0d897981ae5c..f48d085e06b5 100644 --- a/pubsub/cloud-client/quickstart.py +++ b/pubsub/cloud-client/quickstart.py @@ -87,7 +87,7 @@ def callback(message): break else: # Sleeps the thread at 50Hz to save on resources. - time.sleep(1./50) + time.sleep(1. / 50) # [END pubsub_end_to_end]