|
14 | 14 | # See the License for the specific language governing permissions and |
15 | 15 | # limitations under the License. |
16 | 16 |
|
17 | | -import mock |
18 | 17 | import os |
19 | 18 | import pytest |
20 | | -import time |
21 | 19 |
|
22 | 20 | from google.api_core.exceptions import AlreadyExists |
23 | 21 | from google.cloud import pubsub_v1 |
|
29 | 27 | TOPIC = 'quickstart-sub-test-topic' |
30 | 28 | SUBSCRIPTION = 'quickstart-sub-test-topic-sub' |
31 | 29 |
|
32 | | - |
33 | | -@pytest.fixture(scope='module') |
34 | | -def publisher_client(): |
35 | | - yield pubsub_v1.PublisherClient() |
| 30 | +publisher_client = pubsub_v1.PublisherClient() |
| 31 | +subscriber_client = pubsub_v1.SubscriberClient() |
36 | 32 |
|
37 | 33 |
|
38 | 34 | @pytest.fixture(scope='module') |
39 | | -def topic_path(publisher_client): |
| 35 | +def topic_path(): |
40 | 36 | topic_path = publisher_client.topic_path(PROJECT, TOPIC) |
41 | 37 |
|
42 | 38 | try: |
43 | | - publisher_client.create_topic(topic_path) |
| 39 | + topic = publisher_client.create_topic(topic_path) |
| 40 | + return topic.name |
44 | 41 | except AlreadyExists: |
45 | | - pass |
46 | | - |
47 | | - yield topic_path |
48 | | - |
49 | | - |
50 | | -@pytest.fixture(scope='module') |
51 | | -def subscriber_client(): |
52 | | - yield pubsub_v1.SubscriberClient() |
| 42 | + return topic_path |
53 | 43 |
|
54 | 44 |
|
55 | 45 | @pytest.fixture(scope='module') |
56 | | -def subscription(subscriber_client, topic_path): |
| 46 | +def subscription_path(topic_path): |
57 | 47 | subscription_path = subscriber_client.subscription_path( |
58 | 48 | PROJECT, SUBSCRIPTION) |
59 | 49 |
|
60 | 50 | try: |
61 | | - subscriber_client.create_subscription(subscription_path, topic_path) |
| 51 | + subscription = subscriber_client.create_subscription( |
| 52 | + subscription_path, topic_path) |
| 53 | + return subscription.name |
62 | 54 | except AlreadyExists: |
63 | | - pass |
64 | | - |
65 | | - yield SUBSCRIPTION |
| 55 | + return subscription_path |
66 | 56 |
|
67 | 57 |
|
68 | | -@pytest.fixture |
69 | | -def to_delete(publisher_client, subscriber_client): |
70 | | - doomed = [] |
71 | | - yield doomed |
72 | | - for client, item in doomed: |
| 58 | +def _to_delete(resource_paths): |
| 59 | + for item in resource_paths: |
73 | 60 | if 'topics' in item: |
74 | 61 | publisher_client.delete_topic(item) |
75 | 62 | if 'subscriptions' in item: |
76 | 63 | subscriber_client.delete_subscription(item) |
77 | 64 |
|
78 | 65 |
|
79 | | -def _make_sleep_patch(): |
80 | | - real_sleep = time.sleep |
| 66 | +def _publish_messages(topic_path): |
| 67 | + publish_future = publisher_client.publish(topic_path, data=b'Hello World!') |
| 68 | + publish_future.result() |
| 69 | + |
81 | 70 |
|
82 | | - def new_sleep(period): |
83 | | - if period == 60: |
84 | | - real_sleep(10) |
85 | | - raise RuntimeError('sigil') |
86 | | - else: |
87 | | - real_sleep(period) |
| 71 | +def _sub_timeout(project_id, subscription_name): |
| 72 | + # This is an exactly copy of `sub.py` except |
| 73 | + # StreamingPullFuture.result() will time out after 10s. |
| 74 | + client = pubsub_v1.SubscriberClient() |
| 75 | + subscription_path = client.subscription_path( |
| 76 | + project_id, subscription_name) |
88 | 77 |
|
89 | | - return mock.patch('time.sleep', new=new_sleep) |
| 78 | + def callback(message): |
| 79 | + print('Received message {} of message ID {}\n'.format( |
| 80 | + message, message.message_id)) |
| 81 | + message.ack() |
| 82 | + print('Acknowledged message {}\n'.format(message.message_id)) |
90 | 83 |
|
| 84 | + streaming_pull_future = client.subscribe( |
| 85 | + subscription_path, callback=callback) |
| 86 | + print('Listening for messages on {}..\n'.format(subscription_path)) |
| 87 | + |
| 88 | + try: |
| 89 | + streaming_pull_future.result(timeout=10) |
| 90 | + except: # noqa |
| 91 | + streaming_pull_future.cancel() |
91 | 92 |
|
92 | | -def test_sub(publisher_client, |
93 | | - topic_path, |
94 | | - subscriber_client, |
95 | | - subscription, |
96 | | - to_delete, |
97 | | - capsys): |
98 | 93 |
|
99 | | - publisher_client.publish(topic_path, data=b'Hello, World!') |
| 94 | +def test_sub(monkeypatch, topic_path, subscription_path, capsys): |
| 95 | + monkeypatch.setattr(sub, 'sub', _sub_timeout) |
100 | 96 |
|
101 | | - to_delete.append((publisher_client, topic_path)) |
| 97 | + _publish_messages(topic_path) |
102 | 98 |
|
103 | | - with _make_sleep_patch(): |
104 | | - with pytest.raises(RuntimeError, match='sigil'): |
105 | | - sub.sub(PROJECT, subscription) |
| 99 | + sub.sub(PROJECT, SUBSCRIPTION) |
106 | 100 |
|
107 | | - to_delete.append((subscriber_client, |
108 | | - 'projects/{}/subscriptions/{}'.format(PROJECT, |
109 | | - SUBSCRIPTION))) |
| 101 | + # Clean up resources. |
| 102 | + _to_delete([topic_path, subscription_path]) |
110 | 103 |
|
111 | 104 | out, _ = capsys.readouterr() |
112 | 105 | assert "Received message" in out |
|
0 commit comments