Preserve database connection when sync=True #393

Merged
merged 1 commit on Jan 17, 2020
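For context (an editor's sketch, not part of the PR): the helper introduced below keys off Conf.SYNC, which django-q reads from the 'sync' option of the Q_CLUSTER setting. A minimal assumed configuration looks roughly like this; the cluster name is hypothetical:

    # settings.py (hypothetical example)
    Q_CLUSTER = {
        'name': 'myproject',  # hypothetical cluster name
        'sync': True,         # execute tasks synchronously in the calling process
    }

With 'sync': True, tasks run inline in the caller's process rather than in cluster workers, so closing database connections there would also close the caller's connection, which is what this change avoids.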
django_q/cluster.py: 24 changes (18 additions, 6 deletions)
@@ -16,7 +16,6 @@
 import traceback
 # Django
 from django import db
-from django.conf import settings
 from django.utils import timezone
 from django.utils.translation import ugettext_lazy as _
 from multiprocessing import Event, Process, Value, current_process
@@ -163,7 +162,7 @@ def reincarnate(self, process):
         :param process: the process to reincarnate
         :type process: Process or None
         """
-        db.connections.close_all()  # Close any old connections
+        close_old_django_connections()
         if process == self.monitor:
             self.monitor = self.spawn_monitor()
             logger.error(_("reincarnated monitor {} after sudden death").format(process.name))
@@ -187,7 +186,7 @@ def reincarnate(self, process):
     def spawn_cluster(self):
         self.pool = []
         Stat(self).save()
-        db.connection.close()
+        close_old_django_connections()
         # spawn worker pool
         for __ in range(self.pool_size):
             self.spawn_worker()
@@ -370,7 +369,7 @@ def worker(task_queue, result_queue, timer, timeout=Conf.TIMEOUT):
                     error_reporter.report()
         # We're still going
         if not result:
-            db.close_old_connections()
+            close_old_django_connections()
             timer_value = task.pop('timeout', timeout)
             # signal execution
             pre_execute.send(sender="django_q", func=f, task=task)
@@ -408,7 +407,7 @@ def save_task(task, broker):
     if task.get('chain', None):
         django_q.tasks.async_chain(task['chain'], group=task['group'], cached=task['cached'], sync=task['sync'], broker=broker)
     # SAVE LIMIT > 0: Prune database, SAVE_LIMIT 0: No pruning
-    db.close_old_connections()
+    close_old_django_connections()
     try:
         if task['success'] and 0 < Conf.SAVE_LIMIT <= Success.objects.count():
             Success.objects.last().delete()
@@ -489,7 +488,7 @@ def scheduler(broker=None):
     """
     if not broker:
         broker = get_broker()
-    db.close_old_connections()
+    close_old_django_connections()
     try:
         with db.transaction.atomic():
             for s in Schedule.objects.select_for_update().exclude(repeats=0).filter(next_run__lt=timezone.now()):
@@ -558,6 +557,19 @@ def scheduler(broker=None):
         logger.error(e)


+def close_old_django_connections():
+    '''
+    Close django connections unless running with sync=True.
+    '''
+    if Conf.SYNC:
+        logger.warning(
+            'Preserving django database connections because sync=True. Beware '
+            'that tasks are now injected in the calling context/transactions '
+            'which may result in unexpected behaviour.')
+    else:
+        db.close_old_connections()
+
+
 def set_cpu_affinity(n, process_ids, actual=not Conf.TESTING):
     """
     Sets the cpu affinity for the supplied processes.
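A hedged usage sketch (editor's illustration, not from this PR; the Order model and the orders.tasks.notify_warehouse task are made up, and Q_CLUSTER is assumed to include 'sync': True as shown above) of the behaviour the new warning describes: the inline task now shares the caller's open connection and transaction instead of having it closed underneath it.

    # Hypothetical calling code running with the 'sync': True configuration.
    from django.db import transaction
    from django_q.tasks import async_task

    from orders.models import Order  # hypothetical model


    def place_order(order_data):
        with transaction.atomic():
            order = Order.objects.create(**order_data)
            # With sync mode on, this executes immediately in the current
            # process. Because close_old_django_connections() now preserves
            # the open connection, the task joins this transaction: it can see
            # the uncommitted Order row, and a rollback of the atomic block
            # also rolls back whatever the task wrote.
            async_task('orders.tasks.notify_warehouse', order.pk)

This is the coupling the warning message points out: the task is injected into the calling context and transaction, which is usually what you want in tests and development, but can surprise you if you expected the task's writes to be committed independently.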