Skip to content

Commit

Permalink
feat(models): add AbstractChordCounter and update ChordCounter
Browse files Browse the repository at this point in the history
- Added new abstract model `AbstractChordCounter` in `abstract.py` for Chord synchronization, including fields for `group_id`, `sub_tasks`, and `count`.
- Updated `ChordCounter` model in `generic.py` to inherit from `AbstractChordCounter`.
- Moved `group_result` method from `ChordCounter` to `AbstractChordCounter`.
- Added helper function `chordcounter_model` in `helpers.py` to return the active `ChordCounter` model.
- Updated import statements in `database.py` to use `chordcounter_model` helper function.
- Added `ChordCounterModel` attribute to `DatabaseBackend` class, and updated `create` and `get` methods to use `ChordCounterModel` instead of `ChordCounter`.

Relates to: #305
  • Loading branch information
diegocastrum committed Oct 21, 2024
1 parent a0ff0ea commit 5ca6730
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 49 deletions.
10 changes: 5 additions & 5 deletions django_celery_results/backends/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
from django.db.utils import InterfaceError
from kombu.exceptions import DecodeError

from ..models import ChordCounter
from ..models.helpers import groupresult_model, taskresult_model
from ..models.helpers import chordcounter_model, groupresult_model, taskresult_model
from ..settings import get_task_props_extension

EXCEPTIONS_TO_CATCH = (InterfaceError,)
Expand All @@ -31,6 +30,7 @@ class DatabaseBackend(BaseDictBackend):

TaskModel = taskresult_model()
GroupModel = groupresult_model()
ChordCounterModel = chordcounter_model()
subpolling_interval = 0.5

def exception_safe_to_retry(self, exc):
Expand Down Expand Up @@ -235,7 +235,7 @@ def apply_chord(self, header_result_args, body, **kwargs):
results = [r.as_tuple() for r in header_result]
chord_size = body.get("chord_size", None) or len(results)
data = json.dumps(results)
ChordCounter.objects.create(
self.ChordCounterModel.objects.create(
group_id=header_result.id, sub_tasks=data, count=chord_size
)

Expand All @@ -252,10 +252,10 @@ def on_chord_part_return(self, request, state, result, **kwargs):
# SELECT FOR UPDATE is not supported on all databases
try:
chord_counter = (
ChordCounter.objects.select_for_update()
self.ChordCounterModel.objects.select_for_update()
.get(group_id=gid)
)
except ChordCounter.DoesNotExist:
except self.ChordCounterModel.DoesNotExist:
logger.warning("Can't find ChordCounter for Group %s", gid)
return
chord_counter.count -= 1
Expand Down
51 changes: 51 additions & 0 deletions django_celery_results/models/abstract.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
"""Abstract models."""

import json

from celery import states
from celery.result import result_from_tuple
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _

from .. import managers
from ..models.helpers import groupresult_model

ALL_STATES = sorted(states.ALL_STATES)
TASK_STATE_CHOICES = sorted(zip(ALL_STATES, ALL_STATES))
Expand Down Expand Up @@ -127,6 +131,53 @@ def __str__(self):
return '<Task: {0.task_id} ({0.status})>'.format(self)


class AbstractChordCounter(models.Model):
"""Abstract Chord synchronisation."""

group_id = models.CharField(
max_length=getattr(
settings,
"DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH",
255
),
unique=True,
verbose_name=_("Group ID"),
help_text=_("Celery ID for the Chord header group"),
)
sub_tasks = models.TextField(
help_text=_(
"JSON serialized list of task result tuples. "
"use .group_result() to decode"
)
)
count = models.PositiveIntegerField(
help_text=_(
"Starts at len(chord header) and decrements after each task is "
"finished"
)
)

class Meta:
"""Table information."""

abstract = True

def group_result(self, app=None):
"""Return the GroupResult of self.
Arguments:
---------
app (Celery): app instance to create the GroupResult with.
"""
return groupresult_model()(
self.group_id,
[result_from_tuple(r, app=app)
for r in json.loads(self.sub_tasks)],
app=app
)


class AbstractGroupResult(models.Model):
"""Abstract Task Group result/status."""

Expand Down
47 changes: 4 additions & 43 deletions django_celery_results/models/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

import json

from celery.result import GroupResult as CeleryGroupResult
from celery.result import result_from_tuple
from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _

from django_celery_results.models.abstract import (
AbstractChordCounter,
AbstractGroupResult,
AbstractTaskResult,
)
Expand All @@ -24,49 +21,13 @@ class Meta(AbstractTaskResult.Meta):
app_label = "django_celery_results"


class ChordCounter(models.Model):
class ChordCounter(AbstractChordCounter):
"""Chord synchronisation."""

group_id = models.CharField(
max_length=getattr(
settings,
"DJANGO_CELERY_RESULTS_TASK_ID_MAX_LENGTH",
255),
unique=True,
verbose_name=_("Group ID"),
help_text=_("Celery ID for the Chord header group"),
)
sub_tasks = models.TextField(
help_text=_(
"JSON serialized list of task result tuples. "
"use .group_result() to decode"
)
)
count = models.PositiveIntegerField(
help_text=_(
"Starts at len(chord header) and decrements after each task is "
"finished"
)
)

class Meta:
class Meta(AbstractChordCounter.Meta):
abstract = False
app_label = "django_celery_results"

def group_result(self, app=None):
"""Return the GroupResult of self.
Arguments:
---------
app (Celery): app instance to create the GroupResult with.
"""
return CeleryGroupResult(
self.group_id,
[result_from_tuple(r, app=app)
for r in json.loads(self.sub_tasks)],
app=app
)


class GroupResult(AbstractGroupResult):
"""Task Group result/status."""
Expand Down
25 changes: 24 additions & 1 deletion django_celery_results/models/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured

from .generic import GroupResult, TaskResult
from .generic import ChordCounter, GroupResult, TaskResult


def taskresult_model():
Expand All @@ -27,6 +27,29 @@ def taskresult_model():
)


def chordcounter_model():
"""Return the ChordCounter model that is active in this project."""

if not hasattr(settings, 'CELERY_RESULTS_CHORDCOUNTER_MODEL'):
return ChordCounter

try:
return apps.get_model(
settings.CELERY_RESULTS_CHORDCOUNTER_MODEL
)
except ValueError:
raise ImproperlyConfigured(
"CELERY_RESULTS_CHORDCOUNTER_MODEL must be of the form "
"'app_label.model_name'"
)
except LookupError:
raise ImproperlyConfigured(
"CELERY_RESULTS_CHORDCOUNTER_MODEL refers to model "
f"'{settings.CELERY_RESULTS_CHORDCOUNTER_MODEL}' that has not "
"been installed"
)


def groupresult_model():
"""Return the GroupResult model that is active in this project."""
if not hasattr(settings, 'CELERY_RESULTS_GROUPRESULT_MODEL'):
Expand Down

0 comments on commit 5ca6730

Please sign in to comment.