diff --git a/nsq/async.py b/nsq/async.py index 613bd7c..1585625 100644 --- a/nsq/async.py +++ b/nsq/async.py @@ -391,6 +391,11 @@ def _on_data(self, data, **kwargs): self.trigger('error', conn=self, error=nsq.Error(data)) def _on_message_requeue(self, message, backoff=True, time_ms=-1, **kwargs): + if backoff: + self.trigger('backoff', conn=self) + else: + self.trigger('continue', conn=self) + self.in_flight -= 1 try: time_ms = self.requeue_delay * message.attempts * 1000 if time_ms < 0 else time_ms @@ -400,12 +405,9 @@ def _on_message_requeue(self, message, backoff=True, time_ms=-1, **kwargs): self.trigger('error', conn=self, error=nsq.SendError( 'failed to send REQ %s @ %d' % (message.id, time_ms), e)) - if backoff: - self.trigger('backoff', conn=self) - else: - self.trigger('continue', conn=self) - def _on_message_finish(self, message, **kwargs): + self.trigger('resume', conn=self) + self.in_flight -= 1 try: self.send(nsq.finish(message.id)) @@ -414,8 +416,6 @@ def _on_message_finish(self, message, **kwargs): self.trigger('error', conn=self, error=nsq.SendError('failed to send FIN %s' % message.id, e)) - self.trigger('resume', conn=self) - def _on_message_touch(self, message, **kwargs): try: self.send(nsq.touch(message.id)) diff --git a/tests/test_backoff.py b/tests/test_backoff.py index f2f8f24..00235c4 100644 --- a/tests/test_backoff.py +++ b/tests/test_backoff.py @@ -84,10 +84,14 @@ def test_backoff_easy(): expected_args = [ 'SUB test test\n', - 'RDY 1\n', 'RDY 5\n', - 'FIN 1234\n', 'REQ 1234 0\n', - 'RDY 0\n', 'RDY 1\n', - 'FIN 1234\n', 'RDY 5\n' + 'RDY 1\n', + 'RDY 5\n', + 'FIN 1234\n', + 'RDY 0\n', + 'REQ 1234 0\n', + 'RDY 1\n', + 'RDY 5\n', + 'FIN 1234\n' ] assert conn.stream.write.call_args_list == [((arg,),) for arg in expected_args] @@ -124,15 +128,21 @@ def test_backoff_out_of_order(): expected_args = [ 'SUB test test\n', - 'RDY 1\n', 'RDY 2\n', - 'FIN 1234\n', 'REQ 1234 0\n', - 'RDY 0\n', 'FIN 1234\n', 'RDY 2\n' + 'RDY 1\n', + 'RDY 2\n', + 'FIN 1234\n', + 'RDY 0\n', + 'REQ 1234 0\n', + 'FIN 1234\n', + 'RDY 2\n', ] assert conn1.stream.write.call_args_list == [((arg,),) for arg in expected_args] expected_args = [ 'SUB test test\n', - 'RDY 1\n', 'RDY 0\n', 'RDY 2\n' + 'RDY 1\n', + 'RDY 0\n', + 'RDY 2\n' ] assert conn2.stream.write.call_args_list == [((arg,),) for arg in expected_args] @@ -189,14 +199,14 @@ def test_backoff_requeue_recovery(): 'RDY 1\n', 'RDY 2\n', 'FIN 1234\n', - 'REQ 1234 0\n', 'RDY 0\n', - 'RDY 1\n', 'REQ 1234 0\n', + 'RDY 1\n', 'RDY 0\n', + 'REQ 1234 0\n', 'RDY 1\n', - 'FIN 1234\n', 'RDY 2\n', + 'FIN 1234\n' ] assert conn.stream.write.call_args_list == [((arg,),) for arg in expected_args] @@ -218,14 +228,14 @@ def test_backoff_hard(): msg.trigger('requeue', message=msg) num_fails += 1 - expected_args.append('REQ 1234 0\n') expected_args.append('RDY 0\n') + expected_args.append('REQ 1234 0\n') else: msg.trigger('finish', message=msg) num_fails -= 1 - expected_args.append('FIN 1234\n') expected_args.append('RDY 0\n') + expected_args.append('FIN 1234\n') assert r.backoff_block is True assert r.backoff_timer.get_interval() > 0 @@ -246,19 +256,19 @@ def test_backoff_hard(): msg = _send_message(conn) msg.trigger('finish', message=msg) + expected_args.append('RDY 0\n') expected_args.append('FIN 1234\n') timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args if timeout_args[0] != last_timeout_time: timeout_args[1]() last_timeout_time = timeout_args[0] - expected_args.append('RDY 0\n') expected_args.append('RDY 1\n') msg = _send_message(conn) msg.trigger('finish', message=msg) - expected_args.append('FIN 1234\n') expected_args.append('RDY 5\n') + expected_args.append('FIN 1234\n') assert r.backoff_block is False assert r.backoff_timer.get_interval() == 0 @@ -295,17 +305,17 @@ def test_backoff_many_conns(): total_fails += 1 conn.fails += 1 - conn.expected_args.append('REQ 1234 0\n') for c in conns: c.expected_args.append('RDY 0\n') + conn.expected_args.append('REQ 1234 0\n') else: msg.trigger('finish', message=msg) total_fails -= 1 conn.fails -= 1 - conn.expected_args.append('FIN 1234\n') for c in conns: c.expected_args.append('RDY 0\n') + conn.expected_args.append('FIN 1234\n') assert r.backoff_block is True assert r.backoff_timer.get_interval() > 0 @@ -323,7 +333,7 @@ def test_backoff_many_conns(): fail = False while total_fails: - print "%r: %d fails (%d total_fails)" % (conn, c.fails, total_fails) + print "%r: %d fails (%d total_fails)" % (conn, conn.fails, total_fails) if not conn.fails: # force an idle connection @@ -341,6 +351,13 @@ def test_backoff_many_conns(): total_fails -= 1 conn.fails -= 1 + if total_fails > 0: + for c in conns: + c.expected_args.append('RDY 0\n') + else: + for c in conns: + c.expected_args.append('RDY 1\n') + conn.expected_args.append('FIN 1234\n') timeout_args, timeout_kwargs = mock_ioloop.add_timeout.call_args @@ -349,12 +366,7 @@ def test_backoff_many_conns(): last_timeout_time = timeout_args[0] if total_fails > 0: - for c in conns: - c.expected_args.append('RDY 0\n') conn.expected_args.append('RDY 1\n') - else: - for c in conns: - c.expected_args.append('RDY 1\n') assert r.backoff_block is False assert r.backoff_timer.get_interval() == 0 @@ -410,17 +422,17 @@ def test_backoff_conns_disconnect(): total_fails += 1 conn.fails += 1 - conn.expected_args.append('REQ 1234 0\n') for c in conns: c.expected_args.append('RDY 0\n') + conn.expected_args.append('REQ 1234 0\n') else: msg.trigger('finish', message=msg) total_fails -= 1 conn.fails -= 1 - conn.expected_args.append('FIN 1234\n') for c in conns: c.expected_args.append('RDY 0\n') + conn.expected_args.append('FIN 1234\n') assert r.backoff_block is True assert r.backoff_timer.get_interval() > 0 @@ -438,12 +450,20 @@ def test_backoff_conns_disconnect(): fail = False while total_fails: - print "%r: %d fails (%d total_fails)" % (conn, c.fails, total_fails) + print "%r: %d fails (%d total_fails)" % (conn, conn.fails, total_fails) msg = _send_message(conn) msg.trigger('finish', message=msg) total_fails -= 1 + conn.fails -= 1 + + if total_fails > 0: + for c in conns: + c.expected_args.append('RDY 0\n') + else: + for c in conns: + c.expected_args.append('RDY 1\n') conn.expected_args.append('FIN 1234\n') @@ -453,12 +473,7 @@ def test_backoff_conns_disconnect(): last_timeout_time = timeout_args[0] if total_fails > 0: - for c in conns: - c.expected_args.append('RDY 0\n') conn.expected_args.append('RDY 1\n') - else: - for c in conns: - c.expected_args.append('RDY 1\n') assert r.backoff_block is False assert r.backoff_timer.get_interval() == 0