-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathtasks.py
141 lines (118 loc) · 4.88 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# -*- coding: utf-8 -*-
"""Definition of the QueueOnce task and AlreadyQueued exception."""
from celery import Task, states
from celery.result import EagerResult
from .helpers import queue_once_key, import_backend
class AlreadyQueued(Exception):
def __init__(self, countdown):
self.message = "Expires in {} seconds".format(countdown)
self.countdown = countdown
try:
from inspect import signature
except:
from funcsigs import signature
class QueueOnce(Task):
abstract = True
once = {
'graceful': False,
'unlock_before_run': False
}
"""
'There can be only one'. - Highlander (1986)
An abstract tasks with the ability to detect if it has already been queued.
When running the task (through .delay/.apply_async) it checks if the tasks
is not already queued. By default it will raise an
an AlreadyQueued exception if it is, by you can silence this by including
`once={'graceful': True}` in apply_async or in the task's settings.
Example:
>>> from celery_queue.tasks import QueueOnce
>>> from celery import task
>>> @task(base=QueueOnce, once={'graceful': True})
>>> def example(time):
>>> from time import sleep
>>> sleep(time)
"""
@property
def config(self):
app = self._get_app()
return app.conf
@property
def once_config(self):
return self.config.ONCE
@property
def once_backend(self):
return import_backend(self.once_config)
@property
def default_timeout(self):
return self.once_config['settings'].get('default_timeout', 60 * 60)
def unlock_before_run(self):
return self.once.get('unlock_before_run', False)
def __init__(self, *args, **kwargs):
self._signature = signature(self.run)
return super(QueueOnce, self).__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
# Only clear the lock before the task's execution if the
# "unlock_before_run" option is True
if self.unlock_before_run():
key = self.get_key(args, kwargs)
self.once_backend.clear_lock(key)
return super(QueueOnce, self).__call__(*args, **kwargs)
def apply_async(self, args=None, kwargs=None, **options):
"""
Attempts to queues a task.
Will raises an AlreadyQueued exception if already queued.
:param \*args: positional arguments passed on to the task.
:param \*\*kwargs: keyword arguments passed on to the task.
:keyword \*\*once: (optional)
:param: graceful: (optional)
If True, wouldn't raise an exception if already queued.
Instead will return none.
:param: timeout: (optional)
An `int' number of seconds after which the lock will expire.
If not set, defaults to 1 hour.
:param: keys: (optional)
"""
once_options = options.get('once', {})
once_graceful = once_options.get(
'graceful', self.once.get('graceful', False))
once_timeout = once_options.get(
'timeout', self.once.get('timeout', self.default_timeout))
if not options.get('retries'):
key = self.get_key(args, kwargs)
try:
self.once_backend.raise_or_lock(key, timeout=once_timeout)
except AlreadyQueued as e:
if once_graceful:
return EagerResult(None, None, states.REJECTED)
raise e
return super(QueueOnce, self).apply_async(args, kwargs, **options)
def _get_call_args(self, args, kwargs):
call_args = self._signature.bind(*args, **kwargs).arguments
# Remove the task instance from the kwargs. This only happens when the
# task has the 'bind' attribute set to True. We remove it, as the task
# has a memory pointer in its repr, that will change between the task
# caller and the celery worker
if isinstance(call_args.get('self'), Task):
del call_args['self']
return call_args
def get_key(self, args=None, kwargs=None):
"""
Generate the key from the name of the task (e.g. 'tasks.example') and
args/kwargs.
"""
restrict_to = self.once.get('keys', None)
args = args or {}
kwargs = kwargs or {}
call_args = self._get_call_args(args, kwargs)
key = queue_once_key(self.name, call_args, restrict_to)
return key
def after_return(self, status, retval, task_id, args, kwargs, einfo):
"""
After a task has run (both successfully or with a failure) clear the
lock if "unlock_before_run" is False.
"""
# Only clear the lock after the task's execution if the
# "unlock_before_run" option is False
if not self.unlock_before_run():
key = self.get_key(args, kwargs)
self.once_backend.clear_lock(key)