Skip to content

Commit

Permalink
Used new django-celery-extension
Browse files Browse the repository at this point in the history
  • Loading branch information
matllubos committed Aug 5, 2021
1 parent db70e66 commit f56ae67
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 5 deletions.
11 changes: 11 additions & 0 deletions example/apps/test_security/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import timedelta

from settings.celery import app as celery_app

from security.task import LoggedTask
Expand Down Expand Up @@ -38,3 +40,12 @@ def retry_task(self):
unique=True)
def unique_task(self):
return 'unique'


@celery_app.task(
base=LoggedTask,
bind=True,
name='ignored_after_success_task',
ignore_task_after_success_timedelta=timedelta(hours=1, minutes=5))
def ignored_after_success_task(self):
return 'ignored_task_after_success'
14 changes: 13 additions & 1 deletion example/apps/test_security/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from security.transport import security_requests as requests
from security.utils import log_context_manager

from apps.test_security.tasks import error_task, retry_task, sum_task, unique_task
from apps.test_security.tasks import error_task, retry_task, sum_task, unique_task, ignored_after_success_task


TRUNCATION_CHAR = '…' if StrictVersion(get_version()) > StrictVersion('2.2') else '...'
Expand Down Expand Up @@ -611,6 +611,18 @@ def test_stale_succeded_celery_task_should_not_be_set_as_succeeded_with_command(
test_call_command('set_celery_task_log_state')
assert_equal(celery_task_log.refresh_from_db().state, CeleryTaskInvocationLogState.SUCCEEDED)

def test_ignored_after_success_task_should_set_ignored_state_for_second_call(self):
ignored_after_success_task.delay()
assert_equal(CeleryTaskInvocationLog.objects.count(), 1)
assert_equal(CeleryTaskRunLog.objects.count(), 1)
assert_equal(CeleryTaskInvocationLog.objects.first().state, CeleryTaskInvocationLogState.SUCCEEDED)
assert_equal(CeleryTaskRunLog.objects.first().state, CeleryTaskRunLogState.SUCCEEDED)

ignored_after_success_task.delay()
assert_equal(CeleryTaskInvocationLog.objects.count(), 2)
assert_equal(CeleryTaskRunLog.objects.count(), 1)
assert_equal(CeleryTaskInvocationLog.objects.first().state, CeleryTaskInvocationLogState.IGNORED)

@override_settings(DJANGO_CELERY_EXTENSIONS_DEFAULT_TASK_STALE_TIME_LIMIT=None)
def test_unique_task_should_have_set_stale_limit(self):
with assert_raises(CeleryError):
Expand Down
2 changes: 1 addition & 1 deletion example/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Django~=3.1
django-germanium==2.2.0
django-germanium==2.2.10
coverage==5.0.4
django-reversion>=2.0.13
responses==0.9.0
Expand Down
1 change: 1 addition & 0 deletions security/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class CeleryTaskInvocationLogState(ChoiceEnum):
FAILED = Choice(4, _('Failed'))
EXPIRED = Choice(6, _('Expired'))
TIMEOUT = Choice(8, _('Timeout'))
IGNORED = Choice(9, _('Ignored'))


class CeleryTaskRunLogState(ChoiceEnum):
Expand Down
24 changes: 22 additions & 2 deletions security/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ def _create_invocation_log(self, invocation_id, task_args, task_kwargs, apply_ti
return celery_task_log

def _update_invocation_log(self, invocation_id, trigger_time, is_duplicate, eta, expires, stale_time_limit,
task_id):
state, task_id):
celery_task_log = CeleryTaskInvocationLog.objects.get(invocation_id=invocation_id)
return celery_task_log.change_and_save(
estimated_time_of_first_arrival=eta,
expires_at=expires,
triggered_at=trigger_time,
stale_at=trigger_time + timedelta(seconds=stale_time_limit) if stale_time_limit is not None else None,
is_duplicate=is_duplicate,
state=CeleryTaskInvocationLogState.TRIGGERED,
state=state,
celery_task_id=task_id
)

Expand Down Expand Up @@ -148,6 +148,7 @@ def on_invocation_trigger(self, invocation_id, args, kwargs, task_id, options):
eta=options.get('eta'),
expires=options.get('expires'),
stale_time_limit=options.get('stale_time_limit'),
state=CeleryTaskInvocationLogState.TRIGGERED,
task_id=task_id
)
self.on_invocation_trigger_log(invocation_log)
Expand All @@ -165,10 +166,29 @@ def on_invocation_unique(self, invocation_id, args, kwargs, task_id, options):
eta=options.get('eta'),
expires=options.get('expires'),
stale_time_limit=options.get('stale_time_limit'),
state=CeleryTaskInvocationLogState.TRIGGERED,
task_id=task_id
)
self.on_invocation_unique_log(invocation_log)

def on_invocation_ignored_log(self, invocation_log):
pass

def on_invocation_ignored(self, invocation_id, args, kwargs, task_id, options):
super().on_invocation_ignored(invocation_id, args, kwargs, task_id, options)

invocation_log = self._update_invocation_log(
invocation_id,
trigger_time=options.get('trigger_time'),
is_duplicate=False,
eta=options.get('eta'),
expires=options.get('expires'),
stale_time_limit=options.get('stale_time_limit'),
state=CeleryTaskInvocationLogState.IGNORED,
task_id=task_id
)
self.on_invocation_ignored_log(invocation_log)

def on_invocation_timeout_log(self, invocation_log):
pass

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def read(fname):
'attrdict>=2.0.1',
'django-choice-enumfields>=1.0.5',
'django-generic-m2m-field>=0.0.4',
'django-celery-extensions>=0.0.10',
'django-celery-extensions>=0.0.13',
],
zip_safe=False
)

0 comments on commit f56ae67

Please sign in to comment.