Skip to content

Commit

Permalink
Fix async_task timeout parameter handling when cluster timeout is set…
Browse files Browse the repository at this point in the history
… to None (the default)

The cluster timeout configuration default value None is documented to mean
tasks never timeout out. Also the documentation states that the timeout
can be overridden for individual tasks.

With the old implementation the timeout parameter given to async_task was
not honored if the cluster timeout was set to None.

Fixes: #335
  • Loading branch information
janneronkko committed Jan 27, 2019
1 parent 2650659 commit 0e2df88
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
10 changes: 6 additions & 4 deletions django_q/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def reincarnate(self, process):
else:
self.pool.remove(process)
self.spawn_worker()
if self.timeout and int(process.timer.value) == 0:
if process.timer.value == 0:
# only need to terminate on timeout, otherwise we risk destabilizing the queues
process.terminate()
logger.warn(_("reincarnated worker {} after timeout").format(process.name))
Expand Down Expand Up @@ -210,11 +210,11 @@ def guard(self):
# Check Workers
for p in self.pool:
# Are you alive?
if not p.is_alive() or (self.timeout and p.timer.value == 0):
if not p.is_alive() or p.timer.value == 0:
self.reincarnate(p)
continue
# Decrement timer if work is being done
if self.timeout and p.timer.value > 0:
if p.timer.value > 0:
p.timer.value -= cycle
# Check Monitor
if not self.monitor.is_alive():
Expand Down Expand Up @@ -347,6 +347,8 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
name = current_process().name
logger.info(_('{} ready for work at {}').format(name, current_process().pid))
task_count = 0
if timeout is None:
timeout = -1
# Start reading the task queue
for task in iter(task_queue.get, 'STOP'):
result = None
Expand All @@ -368,7 +370,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
# We're still going
if not result:
db.close_old_connections()
timer_value = task['kwargs'].pop('timeout', timeout or 0)
timer_value = task['kwargs'].pop('timeout', timeout)
# signal execution
pre_execute.send(sender="django_q", func=f, task=task)
# execute the payload
Expand Down
2 changes: 2 additions & 0 deletions django_q/tests/test_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ def test_enqueue(broker, admin_user):
@pytest.mark.parametrize('cluster_config_timeout, async_task_kwargs', (
(1, {}),
(10, {'timeout': 1}),
(None, {'timeout': 1}),
))
def test_timeout(broker, cluster_config_timeout, async_task_kwargs):
# set up the Sentinel
Expand All @@ -269,6 +270,7 @@ def test_timeout(broker, cluster_config_timeout, async_task_kwargs):
(5, {}),
(10, {'timeout': 5}),
(1, {'timeout': 5}),
(None, {'timeout': 5}),
))
def test_timeout_task_finishes(broker, cluster_config_timeout, async_task_kwargs):
# set up the Sentinel
Expand Down

0 comments on commit 0e2df88

Please sign in to comment.