redis-tq is a Redis-based multi-producer, multi-consumer queue. It allows sharing data between multiple processes or hosts.
Tasks support a "lease time". If a task is not completed within that time, other workers may consider the client holding it to have crashed or stalled and pick up the task instead. The number of retries can be configured as well.
Based on this example but with many improvements added.
redis-tq is available on PyPI, so you can install it via:
$ pip install redis-tq
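On the producing side, populate the queue with tasks and their respective lease timeouts:
from redistq import TaskQueue

lease_timeout = 60  # seconds; example value, pick one that fits your tasks

tq = TaskQueue('localhost', 'myqueue')
for i in range(10):
    # placeholder payload; replace with your own task data
    tq.add(f'task {i}', lease_timeout, ttl=3)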
On the consuming side:
import time

from redistq import TaskQueue

tq = TaskQueue('localhost', 'myqueue')
while True:
    task, task_id = tq.get()
    if task is not None:
        # do something with task and mark it as complete afterwards
        tq.complete(task_id)
    if tq.is_empty():
        break
    # tq.get is non-blocking, so you may want to sleep a
    # bit before the next iteration
    time.sleep(1)
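The consumer loop can be wrapped in a small helper so that the processing logic is passed in as a function. The following is a minimal sketch, not part of redis-tq: the names drain, handler and poll_interval are hypothetical, and the helper only relies on the get(), complete() and is_empty() calls shown above.

```python
import time


def drain(tq, handler, poll_interval=1.0):
    """Process tasks until the queue reports empty; return the number handled.

    `tq` is any object exposing get(), complete(task_id) and is_empty()
    as used in the consumer loop. `handler` and `poll_interval` are
    hypothetical names introduced for this sketch.
    """
    handled = 0
    while True:
        task, task_id = tq.get()
        if task is not None:
            handler(task)
            # Reached only if the handler returned without raising,
            # so a failed task is never marked as complete.
            tq.complete(task_id)
            handled += 1
        if tq.is_empty():
            return handled
        # get() is non-blocking, so back off briefly before polling again
        time.sleep(poll_interval)
```

If handler raises, the exception propagates and the task is left uncompleted, so it should become available again once its lease expires.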
If the consumer crashes (i.e. the task is not marked as completed after lease_timeout seconds), the task will be put back into the task queue. This rescheduling will happen at most ttl times and then the task will be dropped. A callback can be provided if you want to monitor such cases.
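To make the lease and ttl behaviour concrete, here is a toy in-memory model of the semantics described above. It is an illustrative sketch only and not how redis-tq is implemented: the class LeaseQueueModel and the on_dropped and clock parameters are hypothetical, and the exact way redis-tq counts retries or invokes its callback may differ.

```python
import time
from collections import deque


class LeaseQueueModel:
    """Toy in-memory model of lease and ttl semantics (not redis-tq's code)."""

    def __init__(self, on_dropped=None, clock=time.monotonic):
        self._pending = deque()  # (task, lease_timeout, ttl) waiting to be handed out
        self._leased = {}        # task_id -> (task, lease_timeout, ttl, deadline)
        self._next_id = 0
        self._on_dropped = on_dropped  # hypothetical callback for dropped tasks
        self._clock = clock

    def add(self, task, lease_timeout, ttl=3):
        self._pending.append((task, lease_timeout, ttl))

    def get(self):
        """Non-blocking: return (task, task_id) or (None, None)."""
        self._requeue_expired()
        if not self._pending:
            return None, None
        task, lease_timeout, ttl = self._pending.popleft()
        self._next_id += 1
        deadline = self._clock() + lease_timeout
        self._leased[self._next_id] = (task, lease_timeout, ttl, deadline)
        return task, self._next_id

    def complete(self, task_id):
        # A completed task is no longer subject to lease expiry.
        self._leased.pop(task_id, None)

    def _requeue_expired(self):
        now = self._clock()
        for task_id, (task, lease_timeout, ttl, deadline) in list(self._leased.items()):
            if deadline > now:
                continue  # lease still valid
            del self._leased[task_id]
            if ttl > 0:
                # Reschedule, using up one of the remaining retries.
                self._pending.append((task, lease_timeout, ttl - 1))
            elif self._on_dropped is not None:
                # Retries exhausted: drop the task and notify.
                self._on_dropped(task)
```

In this model a task added with ttl=3 is handed out once and rescheduled at most three more times before it is dropped. Passing a fake clock makes the expiry logic testable without sleeping.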
The tests check for the presence of a Redis instance on localhost. You can use
docker run --rm -d -p 6379:6379 redis
to get one. Then run make test, which takes care of creating an appropriate virtualenv and uses it to run the tests.