-
Notifications
You must be signed in to change notification settings - Fork 0
/
tasks.py
65 lines (48 loc) · 1.53 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
import time
import os
import dramatiq
# setup redis broker
from dramatiq.brokers.redis import RedisBroker
from dramatiq.brokers.stub import StubBroker
from dramatiq.results.backends import RedisBackend, StubBackend
from dramatiq.results import Results
if os.getenv("UNIT_TESTS") == "1":
broker = StubBroker()
stub_backend = StubBackend()
broker.add_middleware(Results(backend=stub_backend))
dramatiq.set_broker(broker)
else:
broker = RedisBroker(host="redis")
results_backend = RedisBackend(host="redis")
broker.add_middleware(Results(backend=results_backend))
dramatiq.set_broker(broker)
@dramatiq.actor
def hello_queue():
print("Hello world!")
class ClassBasedActor(dramatiq.GenericActor):
def perform(self):
print("Hello from class based actor!")
@dramatiq.actor(max_retries=2)
def task_to_retry():
print("Running task")
raise RuntimeError("Error occurred")
@dramatiq.actor(max_age=1000, time_limit=5000)
def time_limited_task():
print("I must hurry")
try:
time.sleep(6.0)
except dramatiq.middleware.time_limit.TimeLimitExceeded as e:
print("Ugh, I didn't get it done in time")
@dramatiq.actor(priority=0)
def high_priority_task(index):
print(f"I'm a very important {index}")
time.sleep(1.0)
@dramatiq.actor(priority=100)
def low_priority_task(index):
print(f"I'm not so important {index}")
time.sleep(2.0)
@dramatiq.actor(store_results=True)
def sleep_and_add(x, y):
print("Got adding task, calculating")
time.sleep(2.0)
return x + y