Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Used new django-celery-extension #95

Merged
merged 1 commit into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions example/apps/test_security/tasks.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from datetime import timedelta

from settings.celery import app as celery_app

from security.task import LoggedTask
Expand Down Expand Up @@ -38,3 +40,12 @@ def retry_task(self):
unique=True)
def unique_task(self):
return 'unique'


@celery_app.task(
base=LoggedTask,
bind=True,
name='ignored_after_success_task',
ignore_task_after_success_timedelta=timedelta(hours=1, minutes=5))
def ignored_after_success_task(self):
return 'ignored_task_after_success'
14 changes: 13 additions & 1 deletion example/apps/test_security/tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from security.transport import security_requests as requests
from security.utils import log_context_manager

from apps.test_security.tasks import error_task, retry_task, sum_task, unique_task
from apps.test_security.tasks import error_task, retry_task, sum_task, unique_task, ignored_after_success_task


TRUNCATION_CHAR = '…' if StrictVersion(get_version()) > StrictVersion('2.2') else '...'
Expand Down Expand Up @@ -611,6 +611,18 @@ def test_stale_succeded_celery_task_should_not_be_set_as_succeeded_with_command(
test_call_command('set_celery_task_log_state')
assert_equal(celery_task_log.refresh_from_db().state, CeleryTaskInvocationLogState.SUCCEEDED)

def test_ignored_after_success_task_should_set_ignored_state_for_second_call(self):
ignored_after_success_task.delay()
assert_equal(CeleryTaskInvocationLog.objects.count(), 1)
assert_equal(CeleryTaskRunLog.objects.count(), 1)
assert_equal(CeleryTaskInvocationLog.objects.first().state, CeleryTaskInvocationLogState.SUCCEEDED)
assert_equal(CeleryTaskRunLog.objects.first().state, CeleryTaskRunLogState.SUCCEEDED)

ignored_after_success_task.delay()
assert_equal(CeleryTaskInvocationLog.objects.count(), 2)
assert_equal(CeleryTaskRunLog.objects.count(), 1)
assert_equal(CeleryTaskInvocationLog.objects.first().state, CeleryTaskInvocationLogState.IGNORED)

@override_settings(DJANGO_CELERY_EXTENSIONS_DEFAULT_TASK_STALE_TIME_LIMIT=None)
def test_unique_task_should_have_set_stale_limit(self):
with assert_raises(CeleryError):
Expand Down
2 changes: 1 addition & 1 deletion example/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
Django~=3.1
django-germanium==2.2.0
django-germanium==2.2.10
coverage==5.0.4
django-reversion>=2.0.13
responses==0.9.0
Expand Down
1 change: 1 addition & 0 deletions security/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ class CeleryTaskInvocationLogState(ChoiceEnum):
FAILED = Choice(4, _('Failed'))
EXPIRED = Choice(6, _('Expired'))
TIMEOUT = Choice(8, _('Timeout'))
IGNORED = Choice(9, _('Ignored'))


class CeleryTaskRunLogState(ChoiceEnum):
Expand Down
24 changes: 22 additions & 2 deletions security/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ def _create_invocation_log(self, invocation_id, task_args, task_kwargs, apply_ti
return celery_task_log

def _update_invocation_log(self, invocation_id, trigger_time, is_duplicate, eta, expires, stale_time_limit,
task_id):
state, task_id):
celery_task_log = CeleryTaskInvocationLog.objects.get(invocation_id=invocation_id)
return celery_task_log.change_and_save(
estimated_time_of_first_arrival=eta,
expires_at=expires,
triggered_at=trigger_time,
stale_at=trigger_time + timedelta(seconds=stale_time_limit) if stale_time_limit is not None else None,
is_duplicate=is_duplicate,
state=CeleryTaskInvocationLogState.TRIGGERED,
state=state,
celery_task_id=task_id
)

Expand Down Expand Up @@ -148,6 +148,7 @@ def on_invocation_trigger(self, invocation_id, args, kwargs, task_id, options):
eta=options.get('eta'),
expires=options.get('expires'),
stale_time_limit=options.get('stale_time_limit'),
state=CeleryTaskInvocationLogState.TRIGGERED,
task_id=task_id
)
self.on_invocation_trigger_log(invocation_log)
Expand All @@ -165,10 +166,29 @@ def on_invocation_unique(self, invocation_id, args, kwargs, task_id, options):
eta=options.get('eta'),
expires=options.get('expires'),
stale_time_limit=options.get('stale_time_limit'),
state=CeleryTaskInvocationLogState.TRIGGERED,
task_id=task_id
)
self.on_invocation_unique_log(invocation_log)

def on_invocation_ignored_log(self, invocation_log):
pass

def on_invocation_ignored(self, invocation_id, args, kwargs, task_id, options):
super().on_invocation_ignored(invocation_id, args, kwargs, task_id, options)

invocation_log = self._update_invocation_log(
invocation_id,
trigger_time=options.get('trigger_time'),
is_duplicate=False,
eta=options.get('eta'),
expires=options.get('expires'),
stale_time_limit=options.get('stale_time_limit'),
state=CeleryTaskInvocationLogState.IGNORED,
task_id=task_id
)
self.on_invocation_ignored_log(invocation_log)

def on_invocation_timeout_log(self, invocation_log):
pass

Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def read(fname):
'attrdict>=2.0.1',
'django-choice-enumfields>=1.0.5',
'django-generic-m2m-field>=0.0.4',
'django-celery-extensions>=0.0.10',
'django-celery-extensions>=0.0.13',
],
zip_safe=False
)