forked from Yelp/python-gearman
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_tests.py
450 lines (326 loc) · 21.8 KB
/
client_tests.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
import collections
import random
import unittest
from gearman.client import GearmanClient
from gearman.client_handler import GearmanClientCommandHandler
from gearman.constants import PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW, JOB_UNKNOWN, JOB_PENDING, JOB_CREATED, JOB_FAILED, JOB_COMPLETE
from gearman.errors import ExceededConnectionAttempts, ServerUnavailable, InvalidClientState
from gearman.protocol import submit_cmd_for_background_priority, GEARMAN_COMMAND_STATUS_RES, GEARMAN_COMMAND_GET_STATUS, GEARMAN_COMMAND_JOB_CREATED, \
GEARMAN_COMMAND_WORK_STATUS, GEARMAN_COMMAND_WORK_FAIL, GEARMAN_COMMAND_WORK_COMPLETE, GEARMAN_COMMAND_WORK_DATA, GEARMAN_COMMAND_WORK_WARNING
from tests._core_testing import _GearmanAbstractTest, MockGearmanConnectionManager, MockGearmanConnection
class MockGearmanClient(GearmanClient, MockGearmanConnectionManager):
pass
class ClientTest(_GearmanAbstractTest):
"""Test the public client interface"""
connection_manager_class = MockGearmanClient
command_handler_class = GearmanClientCommandHandler
def setUp(self):
super(ClientTest, self).setUp()
self.original_handle_connection_activity = self.connection_manager.handle_connection_activity
def tearDown(self):
super(ClientTest, self).tearDown()
self.connection_manager.handle_connection_activity = self.original_handle_connection_activity
def generate_job_request(self, submitted=True, accepted=True):
current_request = super(ClientTest, self).generate_job_request()
if submitted or accepted:
self.connection_manager.establish_request_connection(current_request)
self.command_handler.send_job_request(current_request)
if submitted and accepted:
self.command_handler.recv_command(GEARMAN_COMMAND_JOB_CREATED, job_handle=current_request.job.handle)
self.assert_(current_request.job.handle in self.command_handler.handle_to_request_map)
return current_request
def test_establish_request_connection_complex(self):
# Spin up a bunch of imaginary gearman connections
failed_connection = MockGearmanConnection()
failed_connection._fail_on_bind = True
failed_then_retried_connection = MockGearmanConnection()
failed_then_retried_connection._fail_on_bind = True
good_connection = MockGearmanConnection()
good_connection.connect()
# Register all our connections
self.connection_manager.connection_list = [failed_connection, failed_then_retried_connection, good_connection]
# When we first create our request, our client shouldn't know anything about it
current_request = self.generate_job_request(submitted=False, accepted=False)
self.failIf(current_request in self.connection_manager.request_to_rotating_connection_queue)
# Make sure that when we start up, we get our good connection
chosen_connection = self.connection_manager.establish_request_connection(current_request)
self.assertEqual(chosen_connection, good_connection)
self.assertFalse(failed_connection.connected)
self.assertFalse(failed_then_retried_connection.connected)
self.assertTrue(good_connection.connected)
# No state changed so we should still go to the correct connection
chosen_connection = self.connection_manager.establish_request_connection(current_request)
self.assertEqual(chosen_connection, good_connection)
# Pretend like our good connection died so we'll need to choose somethign else
good_connection._reset_connection()
good_connection._fail_on_bind = True
failed_then_retried_connection._fail_on_bind = False
failed_then_retried_connection.connect()
# Make sure we rotate good_connection and failed_connection out
chosen_connection = self.connection_manager.establish_request_connection(current_request)
self.assertEqual(chosen_connection, failed_then_retried_connection)
self.assertFalse(failed_connection.connected)
self.assertTrue(failed_then_retried_connection.connected)
self.assertFalse(good_connection.connected)
def test_establish_request_connection_dead(self):
self.connection_manager.connection_list = []
self.connection_manager.command_handlers = {}
current_request = self.generate_job_request(submitted=False, accepted=False)
# No connections == death
self.assertRaises(ServerUnavailable, self.connection_manager.establish_request_connection, current_request)
# Spin up a bunch of imaginary gearman connections
failed_connection = MockGearmanConnection()
failed_connection._fail_on_bind = True
self.connection_manager.connection_list.append(failed_connection)
# All failed connections == death
self.assertRaises(ServerUnavailable, self.connection_manager.establish_request_connection, current_request)
def test_auto_retry_behavior(self):
current_request = self.generate_job_request(submitted=False, accepted=False)
def fail_then_create_jobs(rx_conns, wr_conns, ex_conns):
if self.connection_manager.current_failures < self.connection_manager.expected_failures:
self.connection_manager.current_failures += 1
# We're going to down this connection and reset state
self.assertTrue(self.connection.connected)
self.connection_manager.handle_error(self.connection)
self.assertFalse(self.connection.connected)
# We're then going to IMMEDIATELY pull this connection back up
# So we don't bail out of the "self.connection_manager.poll_connections_until_stopped" loop
self.connection_manager.establish_connection(self.connection)
else:
self.assertEquals(current_request.state, JOB_PENDING)
self.command_handler.recv_command(GEARMAN_COMMAND_JOB_CREATED, job_handle=current_request.job.handle)
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = fail_then_create_jobs
self.connection_manager.expected_failures = 5
# Now that we've setup our rety behavior, we need to reset the entire state of our experiment
# First pass should succeed as we JUST touch our max attempts
self.connection_manager.current_failures = current_request.connection_attempts = 0
current_request.max_connection_attempts = self.connection_manager.expected_failures + 1
current_request.state = JOB_UNKNOWN
accepted_jobs = self.connection_manager.wait_until_jobs_accepted([current_request])
self.assertEquals(current_request.state, JOB_CREATED)
self.assertEquals(current_request.connection_attempts, current_request.max_connection_attempts)
# Second pass should fail as we JUST exceed our max attempts
self.connection_manager.current_failures = current_request.connection_attempts = 0
current_request.max_connection_attempts = self.connection_manager.expected_failures
current_request.state = JOB_UNKNOWN
self.assertRaises(ExceededConnectionAttempts, self.connection_manager.wait_until_jobs_accepted, [current_request])
self.assertEquals(current_request.state, JOB_UNKNOWN)
self.assertEquals(current_request.connection_attempts, current_request.max_connection_attempts)
def test_multiple_fg_job_submission(self):
submitted_job_count = 5
expected_job_list = [self.generate_job() for _ in xrange(submitted_job_count)]
def mark_jobs_created(rx_conns, wr_conns, ex_conns):
for current_job in expected_job_list:
self.command_handler.recv_command(GEARMAN_COMMAND_JOB_CREATED, job_handle=current_job.handle)
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = mark_jobs_created
job_dictionaries = [current_job.to_dict() for current_job in expected_job_list]
# Test multiple job submission
job_requests = self.connection_manager.submit_multiple_jobs(job_dictionaries, wait_until_complete=False)
for current_request, expected_job in zip(job_requests, expected_job_list):
current_job = current_request.job
self.assert_jobs_equal(current_job, expected_job)
self.assertEqual(current_request.priority, PRIORITY_NONE)
self.assertEqual(current_request.background, False)
self.assertEqual(current_request.state, JOB_CREATED)
self.assertFalse(current_request.complete)
def test_single_bg_job_submission(self):
expected_job = self.generate_job()
def mark_job_created(rx_conns, wr_conns, ex_conns):
self.command_handler.recv_command(GEARMAN_COMMAND_JOB_CREATED, job_handle=expected_job.handle)
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = mark_job_created
job_request = self.connection_manager.submit_job(expected_job.task, expected_job.data, unique=expected_job.unique, background=True, priority=PRIORITY_LOW, wait_until_complete=False)
current_job = job_request.job
self.assert_jobs_equal(current_job, expected_job)
self.assertEqual(job_request.priority, PRIORITY_LOW)
self.assertEqual(job_request.background, True)
self.assertEqual(job_request.state, JOB_CREATED)
self.assertTrue(job_request.complete)
def test_single_fg_job_submission_timeout(self):
expected_job = self.generate_job()
def job_failed_submission(rx_conns, wr_conns, ex_conns):
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = job_failed_submission
job_request = self.connection_manager.submit_job(expected_job.task, expected_job.data, unique=expected_job.unique, priority=PRIORITY_HIGH, poll_timeout=0.01)
self.assertEqual(job_request.priority, PRIORITY_HIGH)
self.assertEqual(job_request.background, False)
self.assertEqual(job_request.state, JOB_PENDING)
self.assertFalse(job_request.complete)
self.assertTrue(job_request.timed_out)
def test_wait_for_multiple_jobs_to_complete_or_timeout(self):
completed_request = self.generate_job_request()
failed_request = self.generate_job_request()
timeout_request = self.generate_job_request()
self.update_requests = True
def multiple_job_updates(rx_conns, wr_conns, ex_conns):
# Only give a single status update and have the 3rd job handle timeout
if self.update_requests:
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_COMPLETE, job_handle=completed_request.job.handle, data='12345')
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_FAIL, job_handle=failed_request.job.handle)
self.update_requests = False
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = multiple_job_updates
finished_requests = self.connection_manager.wait_until_jobs_completed([completed_request, failed_request, timeout_request], poll_timeout=0.01)
del self.update_requests
finished_completed_request, finished_failed_request, finished_timeout_request = finished_requests
self.assert_jobs_equal(finished_completed_request.job, completed_request.job)
self.assertEqual(finished_completed_request.state, JOB_COMPLETE)
self.assertEqual(finished_completed_request.result, '12345')
self.assertFalse(finished_completed_request.timed_out)
self.assert_(finished_completed_request.job.handle not in self.command_handler.handle_to_request_map)
self.assert_jobs_equal(finished_failed_request.job, failed_request.job)
self.assertEqual(finished_failed_request.state, JOB_FAILED)
self.assertEqual(finished_failed_request.result, None)
self.assertFalse(finished_failed_request.timed_out)
self.assert_(finished_failed_request.job.handle not in self.command_handler.handle_to_request_map)
self.assertEqual(finished_timeout_request.state, JOB_CREATED)
self.assertEqual(finished_timeout_request.result, None)
self.assertTrue(finished_timeout_request.timed_out)
self.assert_(finished_timeout_request.job.handle in self.command_handler.handle_to_request_map)
def test_get_job_status(self):
single_request = self.generate_job_request()
def retrieve_status(rx_conns, wr_conns, ex_conns):
self.command_handler.recv_command(GEARMAN_COMMAND_STATUS_RES, job_handle=single_request.job.handle, known='1', running='0', numerator='0', denominator='1')
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = retrieve_status
job_request = self.connection_manager.get_job_status(single_request)
request_status = job_request.status
self.failUnless(request_status)
self.assertTrue(request_status['known'])
self.assertFalse(request_status['running'])
self.assertEqual(request_status['numerator'], 0)
self.assertEqual(request_status['denominator'], 1)
self.assertFalse(job_request.timed_out)
def test_get_job_status_unknown(self):
single_request = self.generate_job_request()
current_handle = single_request.job.handle
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_FAIL, job_handle=current_handle)
def retrieve_status(rx_conns, wr_conns, ex_conns):
self.command_handler.recv_command(GEARMAN_COMMAND_STATUS_RES, job_handle=current_handle, known='0', running='0', numerator='0', denominator='1')
return rx_conns, wr_conns, ex_conns
self.connection_manager.handle_connection_activity = retrieve_status
job_request = self.connection_manager.get_job_status(single_request)
request_status = job_request.status
self.failUnless(request_status)
self.assertFalse(request_status['known'])
self.assertFalse(request_status['running'])
self.assertEqual(request_status['numerator'], 0)
self.assertEqual(request_status['denominator'], 1)
self.assertFalse(job_request.timed_out)
self.assert_(current_handle not in self.command_handler.handle_to_request_map)
def test_get_job_status_timeout(self):
single_request = self.generate_job_request()
def retrieve_status_timeout(rx_conns, wr_conns, ex_conns):
pass
self.connection_manager.handle_connection_activity = retrieve_status_timeout
job_request = self.connection_manager.get_job_status(single_request, poll_timeout=0.01)
self.assertTrue(job_request.timed_out)
class ClientCommandHandlerInterfaceTest(_GearmanAbstractTest):
"""Test the public interface a GearmanClient may need to call in order to update state on a GearmanClientCommandHandler"""
connection_manager_class = MockGearmanClient
command_handler_class = GearmanClientCommandHandler
def test_send_job_request(self):
current_request = self.generate_job_request()
gearman_job = current_request.job
for priority in (PRIORITY_NONE, PRIORITY_HIGH, PRIORITY_LOW):
for background in (False, True):
current_request.reset()
current_request.priority = priority
current_request.background = background
self.command_handler.send_job_request(current_request)
queued_request = self.command_handler.requests_awaiting_handles.popleft()
self.assertEqual(queued_request, current_request)
expected_cmd_type = submit_cmd_for_background_priority(background, priority)
self.assert_sent_command(expected_cmd_type, task=gearman_job.task, data=gearman_job.data, unique=gearman_job.unique)
def test_get_status_of_job(self):
current_request = self.generate_job_request()
self.command_handler.send_get_status_of_job(current_request)
self.assert_sent_command(GEARMAN_COMMAND_GET_STATUS, job_handle=current_request.job.handle)
class ClientCommandHandlerStateMachineTest(_GearmanAbstractTest):
"""Test single state transitions within a GearmanWorkerCommandHandler"""
connection_manager_class = MockGearmanClient
command_handler_class = GearmanClientCommandHandler
def generate_job_request(self, submitted=True, accepted=True):
current_request = super(ClientCommandHandlerStateMachineTest, self).generate_job_request()
if submitted or accepted:
self.command_handler.requests_awaiting_handles.append(current_request)
current_request.state = JOB_PENDING
if submitted and accepted:
self.command_handler.recv_command(GEARMAN_COMMAND_JOB_CREATED, job_handle=current_request.job.handle)
return current_request
def test_received_job_created(self):
current_request = self.generate_job_request(accepted=False)
new_handle = str(random.random())
self.command_handler.recv_command(GEARMAN_COMMAND_JOB_CREATED, job_handle=new_handle)
self.assertEqual(current_request.job.handle, new_handle)
self.assertEqual(current_request.state, JOB_CREATED)
self.assertEqual(self.command_handler.handle_to_request_map[new_handle], current_request)
def test_received_job_created_out_of_order(self):
self.assertEqual(self.command_handler.requests_awaiting_handles, collections.deque())
# Make sure we bail cuz we have an empty queue
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_JOB_CREATED, job_handle=None)
def test_required_state_pending(self):
current_request = self.generate_job_request(submitted=False, accepted=False)
new_handle = str(random.random())
invalid_states = [JOB_UNKNOWN, JOB_CREATED, JOB_COMPLETE, JOB_FAILED]
for bad_state in invalid_states:
current_request.state = bad_state
# We only want to check the state of request... not die if we don't have any pending requests
self.command_handler.requests_awaiting_handles.append(current_request)
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_JOB_CREATED, job_handle=new_handle)
def test_required_state_queued(self):
current_request = self.generate_job_request()
job_handle = current_request.job.handle
new_data = str(random.random())
invalid_states = [JOB_UNKNOWN, JOB_PENDING, JOB_COMPLETE, JOB_FAILED]
for bad_state in invalid_states:
current_request.state = bad_state
# All these commands expect to be in JOB_CREATED
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_WORK_DATA, job_handle=job_handle, data=new_data)
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_WORK_WARNING, job_handle=job_handle, data=new_data)
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_WORK_STATUS, job_handle=job_handle, numerator=0, denominator=1)
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_WORK_COMPLETE, job_handle=job_handle, data=new_data)
self.assertRaises(InvalidClientState, self.command_handler.recv_command, GEARMAN_COMMAND_WORK_FAIL, job_handle=job_handle)
def test_in_flight_work_updates(self):
current_request = self.generate_job_request()
job_handle = current_request.job.handle
new_data = str(random.random())
# Test WORK_DATA
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_DATA, job_handle=job_handle, data=new_data)
self.assertEqual(current_request.data_updates.popleft(), new_data)
self.assertEqual(current_request.state, JOB_CREATED)
# Test WORK_WARNING
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_WARNING, job_handle=job_handle, data=new_data)
self.assertEqual(current_request.warning_updates.popleft(), new_data)
self.assertEqual(current_request.state, JOB_CREATED)
# Test WORK_STATUS
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_STATUS, job_handle=job_handle, numerator=0, denominator=1)
self.assertEqual(current_request.status_updates.popleft(), (0, 1))
self.assertEqual(current_request.state, JOB_CREATED)
def test_work_complete(self):
current_request = self.generate_job_request()
job_handle = current_request.job.handle
new_data = str(random.random())
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_COMPLETE, job_handle=job_handle, data=new_data)
self.assertEqual(current_request.result, new_data)
self.assertEqual(current_request.state, JOB_COMPLETE)
def test_work_fail(self):
current_request = self.generate_job_request()
job_handle = current_request.job.handle
new_data = str(random.random())
self.command_handler.recv_command(GEARMAN_COMMAND_WORK_FAIL, job_handle=job_handle)
self.assertEqual(current_request.state, JOB_FAILED)
def test_status_request(self):
current_request = self.generate_job_request()
job_handle = current_request.job.handle
self.assertEqual(current_request.status, {})
self.command_handler.recv_command(GEARMAN_COMMAND_STATUS_RES, job_handle=job_handle, known='1', running='1', numerator='0', denominator='1')
self.assertEqual(current_request.status['handle'], job_handle)
self.assertTrue(current_request.status['known'])
self.assertTrue(current_request.status['running'])
self.assertEqual(current_request.status['numerator'], 0)
self.assertEqual(current_request.status['denominator'], 1)
if __name__ == '__main__':
unittest.main()