Skip to content

Commit

Permalink
Add timeout to retry_over_time (#880)
Browse files Browse the repository at this point in the history
add ut
  • Loading branch information
tothegump authored and Omer Katz committed Aug 9, 2018
1 parent 0c740a7 commit 1529235
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 5 deletions.
7 changes: 5 additions & 2 deletions kombu/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ def release(self):

def ensure_connection(self, errback=None, max_retries=None,
interval_start=2, interval_step=2, interval_max=30,
callback=None, reraise_as_library_errors=True):
callback=None, reraise_as_library_errors=True,
timeout=None):
"""Ensure we have a connection to the server.
If not retry establishing the connection with the settings
Expand All @@ -384,6 +385,8 @@ def ensure_connection(self, errback=None, max_retries=None,
each retry.
callback (Callable): Optional callback that is called for every
internal iteration (1 s).
timeout (int): Maximum amount of time in seconds to spend
waiting for connection
"""
def on_error(exc, intervals, retries, interval=0):
round = self.completes_cycle(retries)
Expand All @@ -402,7 +405,7 @@ def on_error(exc, intervals, retries, interval=0):
retry_over_time(self.connect, self.recoverable_connection_errors,
(), {}, on_error, max_retries,
interval_start, interval_step, interval_max,
callback)
callback, timeout=timeout)
return self

@contextmanager
Expand Down
11 changes: 8 additions & 3 deletions kombu/utils/functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from collections import Iterable, Mapping, OrderedDict
from itertools import count, repeat
from time import sleep
from time import sleep, time

from vine.utils import wraps

Expand Down Expand Up @@ -295,7 +295,7 @@ def fxrangemax(start=1.0, stop=None, step=1.0, max=100.0):

def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
max_retries=None, interval_start=2, interval_step=2,
interval_max=30, callback=None):
interval_max=30, callback=None, timeout=None):
"""Retry the function over and over until max retries is exceeded.
For each retry we sleep a for a while before we try again, this interval
Expand All @@ -316,23 +316,28 @@ def retry_over_time(fun, catch, args=[], kwargs={}, errback=None,
which return the time in seconds to sleep next, and ``retries``
is the number of previous retries.
max_retries (int): Maximum number of retries before we give up.
If this is not set, we will retry forever.
If neither of this and timeout is set, we will retry forever.
If one of this and timeout is reached, stop.
interval_start (float): How long (in seconds) we start sleeping
between retries.
interval_step (float): By how much the interval is increased for
each retry.
interval_max (float): Maximum number of seconds to sleep
between retries.
timeout (int): Maximum seconds waiting before we give up.
"""
interval_range = fxrange(interval_start,
interval_max + interval_start,
interval_step, repeatlast=True)
end = time() + timeout if timeout else None
for retries in count():
try:
return fun(*args, **kwargs)
except catch as exc:
if max_retries and retries >= max_retries:
raise
if end and time() > end:
raise
if callback:
callback()
tts = float(errback(exc, interval_range, retries) if errback
Expand Down
17 changes: 17 additions & 0 deletions t/unit/utils/test_functional.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import absolute_import, unicode_literals

import pickle

import pytest

from itertools import count
Expand Down Expand Up @@ -209,6 +210,22 @@ def test_simple(self):
finally:
utils.count = prev_count

def test_retry_timeout(self):

with pytest.raises(self.Predicate):
retry_over_time(
self.myfun, self.Predicate,
errback=self.errback, interval_max=14, timeout=1
)
assert self.index == 1

# no errback
with pytest.raises(self.Predicate):
retry_over_time(
self.myfun, self.Predicate,
errback=None, timeout=1,
)

@mock.sleepdeprived(module=utils)
def test_retry_once(self):
with pytest.raises(self.Predicate):
Expand Down

0 comments on commit 1529235

Please sign in to comment.