-
Notifications
You must be signed in to change notification settings - Fork 0
/
test.py
308 lines (249 loc) · 10.7 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
import unittest
from tornado.gen import sleep
from tornado.testing import (AsyncHTTPTestCase, gen_test, main as testing_main)
from tornado.options import options
import lingerclient
from linger import linger
# Clear Tornado's ``logging`` option at import time, before any test runs.
# NOTE(review): believed to stop Tornado from installing its own logging
# configuration during the test run -- confirm against the tornado.log docs.
options.logging = None
class TestMethods(AsyncHTTPTestCase):
    """Tests that drive the app from ``linger.make_app()`` over HTTP.

    Every test talks to the application through an
    ``lingerclient.AsyncLingerClient`` bound to the test server URL and the
    test's IOLoop.  ``self.kwargs`` holds the default arguments used when
    posting a message; individual tests tweak single entries of it.
    """

    # Message fields expected to come back equal to the values that were posted.
    _ECHOED_FIELDS = (
        'channel', 'body', 'topic', 'timeout', 'priority', 'linger', 'deliver')

    # Subset of the default kwargs that is passed to ``client.subscribe``.
    _SUBSCRIBE_FIELDS = (
        'channel', 'topic', 'priority', 'timeout', 'deliver', 'linger')

    def get_app(self):
        """Create the application under test and keep its settings around."""
        application, self.settings = linger.make_app()
        return application

    def setUp(self):
        """Start the test server, then build the client and default kwargs."""
        super().setUp()
        self._content_type = 'text/plain'
        self.client = lingerclient.AsyncLingerClient(
            self.get_url(''), content_type=self._content_type,
            io_loop=self.io_loop)
        self.kwargs = {  # default add_message kwargs
            'channel': 'test',
            'body': 'test msg',
            'priority': 0,
            'timeout': 30,
            'deliver': 0,
            'linger': 0,
            'topic': ''
        }

    def tearDown(self):
        """Run the app's shutdown callback (if any), then the base tearDown."""
        shutdown = self.settings.get('shutdown_callback')
        try:
            if shutdown is not None:
                shutdown()
        finally:
            # Always release the test server/IOLoop, even if shutdown fails.
            super().tearDown()

    def check_msg(self, msg, orig):
        """Check that a freshly delivered ``msg`` matches the original kwargs.

        ``msg`` must not be None, must echo every field in ``_ECHOED_FIELDS``
        from ``orig``, carry the client's content type as ``mimetype`` and
        have a ``delivered`` count of exactly 1.
        """
        self.assertIsNotNone(msg)
        for field in self._ECHOED_FIELDS:
            self.assertEqual(
                msg[field], orig[field], 'field %r differs' % field)
        self.assertEqual(msg['mimetype'], self._content_type)
        self.assertEqual(msg['delivered'], 1)

    def _subscription_kwargs(self):
        """Return a new dict with the subscribe-relevant default kwargs."""
        return {key: self.kwargs[key] for key in self._SUBSCRIBE_FIELDS}

    @gen_test
    def test_add_get(self):
        """Add msg, get it again"""
        # Wait for the post so that a failed request fails the test here.
        yield self.client.post(**self.kwargs)
        msg = yield self.client.get(self.kwargs['channel'])
        self.check_msg(msg, self.kwargs)

    @gen_test
    def test_add_get_nowait(self):
        """Add msg, get it again without waiting"""
        # The post must have completed before a non-waiting get is issued,
        # otherwise the two requests race each other.
        yield self.client.post(**self.kwargs)
        msg = yield self.client.get(self.kwargs['channel'], nowait=True)
        self.check_msg(msg, self.kwargs)

    @gen_test
    def test_get_wait_add(self):
        """Ask for msg, wait, add msg, get it again"""
        # Start the get first (not yielded yet), so it is pending when the
        # message is added.
        pending_get = self.client.get(self.kwargs['channel'])
        yield self.client.post(**self.kwargs)
        msg = yield pending_get
        self.check_msg(msg, self.kwargs)

    @gen_test
    def test_drain_channel(self):
        """Add msgs to channel, drain, check channel is empty"""
        count = 3
        for _ in range(count):
            yield self.client.post(**self.kwargs)
        chan_name = self.kwargs['channel']
        channels = yield self.client.channels()
        # test: exists only this one channel
        self.assertEqual(channels, [chan_name])
        # test: all and only the messages added are ready, none are hidden
        stats = yield self.client.channel_stats(chan_name)
        self.assertEqual(stats, {'ready': count, 'hidden': 0})
        # Resolve the drain call before asserting on it: asserting on the
        # un-awaited return value would always pass and would let the stats
        # request below race the drain.
        # NOTE(review): assumes drain() resolves to a truthy value on
        # success, like touch() does -- confirm against lingerclient.
        drained = yield self.client.drain(chan_name)
        self.assertTrue(drained)
        # test: no messages left in channel
        stats = yield self.client.channel_stats(chan_name)
        self.assertEqual(sum(stats.values()), 0)

    @gen_test(timeout=10)
    def test_subscribe_unsubscribe(self):
        """Subscribe and unsubscribe topic"""
        self.kwargs['topic'] = 'some-topic'
        kwargs = self._subscription_kwargs()
        yield self.client.subscribe(**kwargs)
        chan_name = kwargs['channel']
        topic = kwargs['topic']
        channels = yield self.client.channels()
        # test: exists only this one channel
        self.assertEqual(channels, [chan_name])
        # test: exists only this one topic (globally)
        topics = yield self.client.topics()
        self.assertEqual(topics, [topic])
        # test: exists only this one topic for this channel
        topics = yield self.client.subscriptions(chan_name)
        self.assertEqual(topics, [topic])

        # subscribe first channel to another topic
        kwargs['topic'] = 'another-topic'  # 2nd topic
        all_topics = sorted([topic, kwargs['topic']])
        yield self.client.subscribe(**kwargs)
        # test: exists only these two topics (globally)
        topics = yield self.client.topics()
        self.assertEqual(topics, all_topics)
        # test: exists only these two topics for this channel
        topics = yield self.client.subscriptions(chan_name)
        self.assertEqual(topics, all_topics)

        # subscribe another channel to the second topic
        kwargs['channel'] = 'another-test'  # 2nd channel
        all_channels = sorted([chan_name, kwargs['channel']])
        yield self.client.subscribe(**kwargs)
        # test: exists only these two topics (globally)
        topics = yield self.client.topics()
        self.assertEqual(topics, all_topics)
        # test: exists only these two topics for the first channel
        topics = yield self.client.subscriptions(chan_name)
        self.assertEqual(topics, all_topics)
        # test: exists only the second topic for the second channel
        topics = yield self.client.subscriptions(kwargs['channel'])
        self.assertEqual(topics, [kwargs['topic']])
        # test: the first topic has only the first channel as subscriber
        channels = yield self.client.subscribers(topic)
        self.assertEqual(channels, [chan_name])
        # test: the second topic has both channels as subscribers
        channels = yield self.client.subscribers(kwargs['topic'])
        self.assertEqual(channels, all_channels)

        # unsubscribe the second channel from the second topic
        yield self.client.unsubscribe(kwargs['channel'], kwargs['topic'])
        # test: the second topic has only the first channel as subscriber
        channels = yield self.client.subscribers(kwargs['topic'])
        self.assertEqual(channels, [chan_name])

        # unsubscribe the first channel from the first topic
        yield self.client.unsubscribe(chan_name, topic)
        # test: exists only the first channel
        channels = yield self.client.channels()
        self.assertEqual(channels, [chan_name])
        # test: exists only the second topic (globally)
        topics = yield self.client.topics()
        self.assertEqual(topics, [kwargs['topic']])

        # unsubscribe the first channel from the second topic
        yield self.client.unsubscribe(chan_name, kwargs['topic'])
        # test: exists no topics (globally)
        topics = yield self.client.topics()
        self.assertEqual(len(topics), 0)
        # test: are no more channels
        channels = yield self.client.channels()
        self.assertEqual(len(channels), 0)

    @gen_test
    def test_publish_subscribe(self):
        """Subscribe channel to topic, publish to topic, get msg from channel"""
        self.kwargs['topic'] = 'some-topic'
        yield self.client.subscribe(**self._subscription_kwargs())
        pub_kwargs = {key: self.kwargs[key] for key in ('topic', 'body')}
        yield self.client.publish(**pub_kwargs)
        msg = yield self.client.get(self.kwargs['channel'], nowait=True)
        self.check_msg(msg, self.kwargs)

    @gen_test
    def test_timeout(self):
        """Timeout test"""
        self.kwargs['timeout'] = 1  # hide for only 1 sec
        channel = self.kwargs['channel']
        # The post must be complete before the non-waiting get below.
        yield self.client.post(**self.kwargs)
        msg = yield self.client.get(channel, nowait=True)
        # Fail clearly here instead of with a TypeError further down.
        self.assertIsNotNone(msg)
        # test: no message ready
        msg2 = yield self.client.get(channel, nowait=True)
        self.assertIsNone(msg2)
        # test: message is ready again
        yield sleep(1.2)
        msg3 = yield self.client.get(channel, nowait=True)
        self.assertIsNotNone(msg3)
        self.assertEqual(msg['delivered'], 1)
        self.assertEqual(msg3['delivered'], 2)
        # Apart from the per-delivery fields both deliveries must be equal.
        for delivery in (msg, msg3):
            del delivery['delivered']
            del delivery['received']
        self.assertEqual(msg, msg3)

    @gen_test
    def test_linger(self):
        """Linger test"""
        self.kwargs['linger'] = 1  # purge after 1 sec
        yield self.client.post(**self.kwargs)
        # test: message has been purged
        yield sleep(1.5)
        msg = yield self.client.get(self.kwargs['channel'], nowait=True)
        self.assertIsNone(msg)

    @gen_test
    def test_deliver(self):
        """Deliver test"""
        self.kwargs['deliver'] = 1  # deliver only once
        self.kwargs['timeout'] = 1  # hide for only 1 sec
        yield self.client.post(**self.kwargs)
        msg = yield self.client.get(self.kwargs['channel'], nowait=True)
        self.check_msg(msg, self.kwargs)
        # test: message has been purged
        yield sleep(1.5)
        msg2 = yield self.client.get(self.kwargs['channel'], nowait=True)
        self.assertIsNone(msg2)

    @gen_test
    def test_priority(self):
        """Priority test"""
        # Post priorities 0, 1 and -1 (in that order); each body is chosen so
        # that int(body) is the position at which the message is expected back.
        for priority, body in ((0, '1'), (1, '2'), (-1, '0')):
            self.kwargs['priority'] = priority
            self.kwargs['body'] = body
            yield self.client.post(**self.kwargs)
        # test: messages come back in ascending priority order (-1, 0, 1)
        for position in range(3):
            msg = yield self.client.get(
                self.kwargs['channel'], nowait=True)
            self.assertEqual(msg['priority'], position - 1)
            self.assertEqual(int(msg['body']), position)

    @gen_test(timeout=10)
    def test_touch(self):
        """Timeout-Touch test"""
        self.kwargs['timeout'] = 1  # hide for only 1 sec
        channel = self.kwargs['channel']
        # The post must be complete before the non-waiting get below.
        yield self.client.post(**self.kwargs)
        msg = yield self.client.get(channel, nowait=True)
        self.check_msg(msg, self.kwargs)
        # touch msg 3 times, before it is shown again
        for _ in range(3):
            yield sleep(0.9)
            touched = yield self.client.touch(msg['id'])
            self.assertTrue(touched)
        # test that the message is not delivered (because it is hidden)
        msg2 = yield self.client.get(channel, nowait=True)
        self.assertIsNone(msg2)
        yield sleep(1.2)
        # test that msg is timed out and re-delivered
        msg3 = yield self.client.get(channel, nowait=True)
        self.assertIsNotNone(msg3)
        self.assertEqual(msg['id'], msg3['id'])
def all():
    """Return a test suite containing every test of ``TestMethods``."""
    # NOTE: the name shadows the builtin on purpose -- per the Tornado
    # testing docs, ``tornado.testing.main`` looks for a module-level
    # ``all()`` that returns the suite to run.
    loader = unittest.defaultTestLoader
    suite = loader.loadTestsFromTestCase(TestMethods)
    return suite
# Script entry point: hand control to Tornado's test runner.
if __name__ == "__main__":
    testing_main()