diff --git a/django_celery_results/backends/database.py b/django_celery_results/backends/database.py index 7f31cab6..0276e294 100644 --- a/django_celery_results/backends/database.py +++ b/django_celery_results/backends/database.py @@ -11,8 +11,7 @@ from django.db.utils import InterfaceError from kombu.exceptions import DecodeError -from ..models import ChordCounter -from ..models.helpers import groupresult_model, taskresult_model +from ..models.helpers import chordcounter_model, groupresult_model, taskresult_model from ..settings import get_task_props_extension EXCEPTIONS_TO_CATCH = (InterfaceError,) @@ -31,6 +30,7 @@ class DatabaseBackend(BaseDictBackend): TaskModel = taskresult_model() GroupModel = groupresult_model() + ChordCounterModel = chordcounter_model() subpolling_interval = 0.5 def exception_safe_to_retry(self, exc): @@ -235,7 +235,7 @@ def apply_chord(self, header_result_args, body, **kwargs): results = [r.as_tuple() for r in header_result] chord_size = body.get("chord_size", None) or len(results) data = json.dumps(results) - ChordCounter.objects.create( + self.ChordCounterModel.objects.create( group_id=header_result.id, sub_tasks=data, count=chord_size ) @@ -252,10 +252,10 @@ def on_chord_part_return(self, request, state, result, **kwargs): # SELECT FOR UPDATE is not supported on all databases try: chord_counter = ( - ChordCounter.objects.select_for_update() + self.ChordCounterModel.objects.select_for_update() .get(group_id=gid) ) - except ChordCounter.DoesNotExist: + except self.ChordCounterModel.DoesNotExist: logger.warning("Can't find ChordCounter for Group %s", gid) return chord_counter.count -= 1 diff --git a/django_celery_results/models/abstract.py b/django_celery_results/models/abstract.py index cc74d084..ac888910 100644 --- a/django_celery_results/models/abstract.py +++ b/django_celery_results/models/abstract.py @@ -1,11 +1,15 @@ """Abstract models.""" +import json + from celery import states +from celery.result import result_from_tuple from django.conf import settings from django.db import models from django.utils.translation import gettext_lazy as _ from .. import managers +from ..models.helpers import groupresult_model ALL_STATES = sorted(states.ALL_STATES) TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES)) @@ -127,6 +131,53 @@ def __str__(self): return ''.format(self) +class AbstractChordCounter(models.Model): + """Abstract Chord synchronisation.""" + + group_id = models.CharField( + max_length=getattr( + settings, + "DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH", + 255 + ), + unique=True, + verbose_name=_("Group ID"), + help_text=_("Celery ID for the Chord header group"), + ) + sub_tasks = models.TextField( + help_text=_( + "JSON serialized list of task result tuples. " + "use .group_result() to decode" + ) + ) + count = models.PositiveIntegerField( + help_text=_( + "Starts at len(chord header) and decrements after each task is " + "finished" + ) + ) + + class Meta: + """Table information.""" + + abstract = True + + def group_result(self, app=None): + """Return the GroupResult of self. + + Arguments: + --------- + app (Celery): app instance to create the GroupResult with. + + """ + return groupresult_model()( + self.group_id, + [result_from_tuple(r, app=app) + for r in json.loads(self.sub_tasks)], + app=app + ) + + class AbstractGroupResult(models.Model): """Abstract Task Group result/status.""" diff --git a/django_celery_results/models/generic.py b/django_celery_results/models/generic.py index 582983eb..e5b5da7e 100644 --- a/django_celery_results/models/generic.py +++ b/django_celery_results/models/generic.py @@ -2,13 +2,10 @@ import json -from celery.result import GroupResult as CeleryGroupResult -from celery.result import result_from_tuple -from django.conf import settings -from django.db import models from django.utils.translation import gettext_lazy as _ from django_celery_results.models.abstract import ( + AbstractChordCounter, AbstractGroupResult, AbstractTaskResult, ) @@ -24,49 +21,13 @@ class Meta(AbstractTaskResult.Meta): app_label = "django_celery_results" -class ChordCounter(models.Model): +class ChordCounter(AbstractChordCounter): """Chord synchronisation.""" - group_id = models.CharField( - max_length=getattr( - settings, - "DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH", - 255), - unique=True, - verbose_name=_("Group ID"), - help_text=_("Celery ID for the Chord header group"), - ) - sub_tasks = models.TextField( - help_text=_( - "JSON serialized list of task result tuples. " - "use .group_result() to decode" - ) - ) - count = models.PositiveIntegerField( - help_text=_( - "Starts at len(chord header) and decrements after each task is " - "finished" - ) - ) - - class Meta: + class Meta(AbstractChordCounter.Meta): + abstract = False app_label = "django_celery_results" - def group_result(self, app=None): - """Return the GroupResult of self. - - Arguments: - --------- - app (Celery): app instance to create the GroupResult with. - - """ - return CeleryGroupResult( - self.group_id, - [result_from_tuple(r, app=app) - for r in json.loads(self.sub_tasks)], - app=app - ) - class GroupResult(AbstractGroupResult): """Task Group result/status.""" diff --git a/django_celery_results/models/helpers.py b/django_celery_results/models/helpers.py index fb64fd84..0d9da60a 100644 --- a/django_celery_results/models/helpers.py +++ b/django_celery_results/models/helpers.py @@ -2,7 +2,7 @@ from django.conf import settings from django.core.exceptions import ImproperlyConfigured -from .generic import GroupResult, TaskResult +from .generic import ChordCounter, GroupResult, TaskResult def taskresult_model(): @@ -27,6 +27,29 @@ def taskresult_model(): ) +def chordcounter_model(): + """Return the ChordCounter model that is active in this project.""" + + if not hasattr(settings, 'CELERY_RESULTS_CHORDCOUNTER_MODEL'): + return ChordCounter + + try: + return apps.get_model( + settings.CELERY_RESULTS_CHORDCOUNTER_MODEL + ) + except ValueError: + raise ImproperlyConfigured( + "CELERY_RESULTS_CHORDCOUNTER_MODEL must be of the form " + "'app_label.model_name'" + ) + except LookupError: + raise ImproperlyConfigured( + "CELERY_RESULTS_CHORDCOUNTER_MODEL refers to model " + f"'{settings.CELERY_RESULTS_CHORDCOUNTER_MODEL}' that has not " + "been installed" + ) + + def groupresult_model(): """Return the GroupResult model that is active in this project.""" if not hasattr(settings, 'CELERY_RESULTS_GROUPRESULT_MODEL'):