Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reader: send RDY before FIN/REQ #98

Merged
merged 1 commit into from
Sep 12, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions nsq/async.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,11 @@ def _on_data(self, data, **kwargs):
self.trigger('error', conn=self, error=nsq.Error(data))

def _on_message_requeue(self, message, backoff=True, time_ms=-1, **kwargs):
if backoff:
self.trigger('backoff', conn=self)
else:
self.trigger('continue', conn=self)

self.in_flight -= 1
try:
time_ms = self.requeue_delay * message.attempts * 1000 if time_ms < 0 else time_ms
Expand All @@ -400,12 +405,9 @@ def _on_message_requeue(self, message, backoff=True, time_ms=-1, **kwargs):
self.trigger('error', conn=self, error=nsq.SendError(
'failed to send REQ %s @ %d' % (message.id, time_ms), e))

if backoff:
self.trigger('backoff', conn=self)
else:
self.trigger('continue', conn=self)

def _on_message_finish(self, message, **kwargs):
self.trigger('resume', conn=self)

self.in_flight -= 1
try:
self.send(nsq.finish(message.id))
Expand All @@ -414,8 +416,6 @@ def _on_message_finish(self, message, **kwargs):
self.trigger('error', conn=self,
error=nsq.SendError('failed to send FIN %s' % message.id, e))

self.trigger('resume', conn=self)

def _on_message_touch(self, message, **kwargs):
try:
self.send(nsq.touch(message.id))
Expand Down
77 changes: 46 additions & 31 deletions tests/test_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,14 @@ def test_backoff_easy():

expected_args = [
'SUB test test\n',
'RDY 1\n', 'RDY 5\n',
'FIN 1234\n', 'REQ 1234 0\n',
'RDY 0\n', 'RDY 1\n',
'FIN 1234\n', 'RDY 5\n'
'RDY 1\n',
'RDY 5\n',
'FIN 1234\n',
'RDY 0\n',
'REQ 1234 0\n',
'RDY 1\n',
'RDY 5\n',
'FIN 1234\n'
]
assert conn.stream.write.call_args_list == [((arg,),) for arg in expected_args]

Expand Down Expand Up @@ -124,15 +128,21 @@ def test_backoff_out_of_order():

expected_args = [
'SUB test test\n',
'RDY 1\n', 'RDY 2\n',
'FIN 1234\n', 'REQ 1234 0\n',
'RDY 0\n', 'FIN 1234\n', 'RDY 2\n'
'RDY 1\n',
'RDY 2\n',
'FIN 1234\n',
'RDY 0\n',
'REQ 1234 0\n',
'FIN 1234\n',
'RDY 2\n',
]
assert conn1.stream.write.call_args_list == [((arg,),) for arg in expected_args]

expected_args = [
'SUB test test\n',
'RDY 1\n', 'RDY 0\n', 'RDY 2\n'
'RDY 1\n',
'RDY 0\n',
'RDY 2\n'
]
assert conn2.stream.write.call_args_list == [((arg,),) for arg in expected_args]

Expand Down Expand Up @@ -189,14 +199,14 @@ def test_backoff_requeue_recovery():
'RDY 1\n',
'RDY 2\n',
'FIN 1234\n',
'REQ 1234 0\n',
'RDY 0\n',
'RDY 1\n',
'REQ 1234 0\n',
'RDY 1\n',
'RDY 0\n',
'REQ 1234 0\n',
'RDY 1\n',
'FIN 1234\n',
'RDY 2\n',
'FIN 1234\n'
]
assert conn.stream.write.call_args_list == [((arg,),) for arg in expected_args]

Expand All @@ -218,14 +228,14 @@ def test_backoff_hard():
msg.trigger('requeue', message=msg)
num_fails += 1

expected_args.append('REQ 1234 0\n')
expected_args.append('RDY 0\n')
expected_args.append('REQ 1234 0\n')
else:
msg.trigger('finish', message=msg)
num_fails -= 1

expected_args.append('FIN 1234\n')
expected_args.append('RDY 0\n')
expected_args.append('FIN 1234\n')

assert r.backoff_block is True
assert r.backoff_timer.get_interval() > 0
Expand All @@ -246,19 +256,19 @@ def test_backoff_hard():
msg = _send_message(conn)

msg.trigger('finish', message=msg)
expected_args.append('RDY 0\n')
expected_args.append('FIN 1234\n')
timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args
if timeout_args[0] != last_timeout_time:
timeout_args[1]()
last_timeout_time = timeout_args[0]
expected_args.append('RDY 0\n')
expected_args.append('RDY 1\n')

msg = _send_message(conn)

msg.trigger('finish', message=msg)
expected_args.append('FIN 1234\n')
expected_args.append('RDY 5\n')
expected_args.append('FIN 1234\n')

assert r.backoff_block is False
assert r.backoff_timer.get_interval() == 0
Expand Down Expand Up @@ -295,17 +305,17 @@ def test_backoff_many_conns():
total_fails += 1
conn.fails += 1

conn.expected_args.append('REQ 1234 0\n')
for c in conns:
c.expected_args.append('RDY 0\n')
conn.expected_args.append('REQ 1234 0\n')
else:
msg.trigger('finish', message=msg)
total_fails -= 1
conn.fails -= 1

conn.expected_args.append('FIN 1234\n')
for c in conns:
c.expected_args.append('RDY 0\n')
conn.expected_args.append('FIN 1234\n')

assert r.backoff_block is True
assert r.backoff_timer.get_interval() > 0
Expand All @@ -323,7 +333,7 @@ def test_backoff_many_conns():
fail = False

while total_fails:
print "%r: %d fails (%d total_fails)" % (conn, c.fails, total_fails)
print "%r: %d fails (%d total_fails)" % (conn, conn.fails, total_fails)

if not conn.fails:
# force an idle connection
Expand All @@ -341,6 +351,13 @@ def test_backoff_many_conns():
total_fails -= 1
conn.fails -= 1

if total_fails > 0:
for c in conns:
c.expected_args.append('RDY 0\n')
else:
for c in conns:
c.expected_args.append('RDY 1\n')

conn.expected_args.append('FIN 1234\n')

timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args
Expand All @@ -349,12 +366,7 @@ def test_backoff_many_conns():
last_timeout_time = timeout_args[0]

if total_fails > 0:
for c in conns:
c.expected_args.append('RDY 0\n')
conn.expected_args.append('RDY 1\n')
else:
for c in conns:
c.expected_args.append('RDY 1\n')

assert r.backoff_block is False
assert r.backoff_timer.get_interval() == 0
Expand Down Expand Up @@ -410,17 +422,17 @@ def test_backoff_conns_disconnect():
total_fails += 1
conn.fails += 1

conn.expected_args.append('REQ 1234 0\n')
for c in conns:
c.expected_args.append('RDY 0\n')
conn.expected_args.append('REQ 1234 0\n')
else:
msg.trigger('finish', message=msg)
total_fails -= 1
conn.fails -= 1

conn.expected_args.append('FIN 1234\n')
for c in conns:
c.expected_args.append('RDY 0\n')
conn.expected_args.append('FIN 1234\n')

assert r.backoff_block is True
assert r.backoff_timer.get_interval() > 0
Expand All @@ -438,12 +450,20 @@ def test_backoff_conns_disconnect():
fail = False

while total_fails:
print "%r: %d fails (%d total_fails)" % (conn, c.fails, total_fails)
print "%r: %d fails (%d total_fails)" % (conn, conn.fails, total_fails)

msg = _send_message(conn)

msg.trigger('finish', message=msg)
total_fails -= 1
conn.fails -= 1

if total_fails > 0:
for c in conns:
c.expected_args.append('RDY 0\n')
else:
for c in conns:
c.expected_args.append('RDY 1\n')

conn.expected_args.append('FIN 1234\n')

Expand All @@ -453,12 +473,7 @@ def test_backoff_conns_disconnect():
last_timeout_time = timeout_args[0]

if total_fails > 0:
for c in conns:
c.expected_args.append('RDY 0\n')
conn.expected_args.append('RDY 1\n')
else:
for c in conns:
c.expected_args.append('RDY 1\n')

assert r.backoff_block is False
assert r.backoff_timer.get_interval() == 0
Expand Down