Skip to content

Commit

Permalink
Merge pull request #38 from valohai/priority-queue
Browse files Browse the repository at this point in the history
Add priority queue support
  • Loading branch information
ruksi authored Feb 4, 2025
2 parents e4443b3 + c32aa9c commit 61610c6
Show file tree
Hide file tree
Showing 10 changed files with 529 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ jobs:
- run: pip install build twine
- run: python -m build .
- run: twine check dist/*
- uses: actions/upload-artifact@v3
- uses: actions/upload-artifact@v4
with:
name: build
path: dist
Expand Down
42 changes: 42 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,48 @@ job = get_job(redis, job_id)
$ minique -u redis://localhost:6379/4 -q work -q anotherqueue -q thirdqueue --allow-callable 'my_jobs.*'
```

### Priority Queues

Minique supports priority queueing as an optional feature using the `enqueue_priority` API.

Priority queues are compatible with standard workers. However, priority is implemented using a
helper data structure, so the client needs to call `job.cleanup()` after each job and/or
`PriorityQueue(...).periodic_clean()` to prune this structure of jobs that have already been
processed.

Priority queues require Lua scripting permissions on the Redis server backing the queue.

```python
from redis import StrictRedis
from minique.api import enqueue_priority, get_job

# Get a Redis connection, somehow.
redis = StrictRedis.from_url('redis://localhost:6379/4')

job = enqueue_priority(
redis=redis,
queue_name='urgent_work',
callable='my_jobs.calcumacalate', # Dotted path to your callable.
kwargs={'a': 5, 'b': 5}, # Only kwargs supported.
priority=1, # Integer
# You can also set a `job_id` yourself (but it must be unique)
)

job_id = job.id # Save the job ID somewhere, maybe?

while not job.has_finished:
pass # Twiddle thumbs...

print(job.result) # Okay!

# Job priorities are stored in a helper hash table which should be cleaned using this method
# after the job has left the queue.
job.cleanup()

# Get the same job later (though not later than 7 days (by default)):
job = get_job(redis, job_id)
```

## Sentry Support

Minique automatically integrates with the [Sentry](https://sentry.io/welcome/)
Expand Down
63 changes: 54 additions & 9 deletions minique/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from minique.excs import DuplicateJob
from minique.models.job import Job
from minique.models.queue import Queue
from minique.models.priority_queue import PriorityQueue
from minique.utils import get_random_pronounceable_string


Expand Down Expand Up @@ -35,6 +36,7 @@ def enqueue(
:raises minique.excs.DuplicateJob: If a job with the same ID already exists.
:raises minique.excs.NoSuchJob: If the job does not exist right after creation.
"""
queue = Queue(redis, queue_name)
job = _define_and_store_job(
redis=redis,
callable=callable,
Expand All @@ -43,7 +45,48 @@ def enqueue(
job_ttl=job_ttl,
result_ttl=result_ttl,
encoding_name=encoding_name,
queue_name=queue_name,
queue=queue,
)
return job


def enqueue_priority(
    redis: "Redis[bytes]",
    queue_name: str,
    callable: Union[Callable[..., Any], str],
    kwargs: Optional[Dict[str, Any]] = None,
    job_id: Optional[str] = None,
    job_ttl: int = 0,
    result_ttl: int = 86400 * 7,
    encoding_name: Optional[str] = None,
    priority: int = 0,
) -> Job:
    """
    Enqueue a callable as a job on a priority queue.

    The job is placed after the jobs already queued with the same priority.

    :param redis: Redis connection
    :param queue_name: Name of the priority queue to enqueue the job in.
    :param callable: A dotted path to the callable to execute on the worker.
    :param kwargs: Keyword arguments to pass to the callable.
    :param job_id: An identifier for the job; defaults to a random string.
    :param job_ttl: Time-to-live for the job in seconds; defaults to never expire.
    :param result_ttl: Time-to-live for the result in seconds; defaults to 7 days.
    :param encoding_name: Name of the encoding to use for the job payload; defaults to JSON.
    :param priority: Priority number of this job, defaults to zero.
    :return: The job that was defined, stored and enqueued.
    :raises minique.excs.DuplicateJob: If a job with the same ID already exists.
    :raises minique.excs.NoSuchJob: If the job does not exist right after creation.
    """
    # Build the queue wrapper up front; the shared helper both stores the job
    # hash and adds the job to whichever queue object it is handed.
    priority_queue = PriorityQueue(redis, queue_name)
    return _define_and_store_job(
        redis=redis,
        callable=callable,
        kwargs=kwargs,
        job_id=job_id,
        job_ttl=job_ttl,
        result_ttl=result_ttl,
        encoding_name=encoding_name,
        queue=priority_queue,
        priority=priority,
    )

Expand Down Expand Up @@ -116,8 +159,7 @@ def cancel_job(
p.hset(job.redis_key, "status", JobStatus.CANCELLED.value)
queue_name = job.get_queue_name()
if queue_name:
queue = Queue(redis, name=queue_name)
p.lrem(queue.redis_key, 0, job.id)
job.dequeue()
if expire_time:
p.expire(job.redis_key, expire_time)
p.execute()
Expand All @@ -134,7 +176,8 @@ def _define_and_store_job(
job_ttl: int = 0,
result_ttl: int = 86400 * 7,
encoding_name: Optional[str] = None,
queue_name: Optional[str] = None,
queue: Optional[Queue] = None,
priority: Optional[int] = None,
) -> Job:
if not encoding_name:
encoding_name = encoding.default_encoding_name
Expand All @@ -159,17 +202,19 @@ def _define_and_store_job(
"job_ttl": int(job_ttl),
"result_ttl": int(result_ttl),
}
if queue_name:
payload["queue"] = queue_name
if queue:
payload["queue"] = queue.name
if priority is not None:
payload["priority"] = priority

with redis.pipeline() as p:
p.hset(job.redis_key, mapping=payload) # type: ignore[arg-type]
if payload["job_ttl"] > 0:
p.expire(job.redis_key, payload["job_ttl"])
if queue_name:
queue = Queue(redis, name=queue_name)
p.rpush(queue.redis_key, job.id)
p.execute()

if queue:
queue.add_job(job)

job.ensure_exists()
return job
4 changes: 4 additions & 0 deletions minique/excs.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,7 @@ class InvalidJob(ValueError):

class MissingJobData(ValueError):
    """Raised when a job lacks a stored field that was requested (e.g. duration or priority)."""


class Retry(Exception):
    """
    Plain ``Exception`` subclass that adds no behavior of its own.

    NOTE(review): presumably raised to signal that an operation should be
    attempted again -- confirm against the raise sites.
    """
44 changes: 39 additions & 5 deletions minique/models/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import time
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple
from typing import TYPE_CHECKING, Any, Callable, Dict, Optional, Tuple, Union

from redis import Redis

Expand All @@ -14,9 +14,12 @@
NoSuchJob,
MissingJobData,
)
from minique.utils import cached_property


if TYPE_CHECKING:
from minique.models.queue import Queue
from minique.models.priority_queue import PriorityQueue


class Job:
Expand Down Expand Up @@ -47,14 +50,23 @@ def ensure_enqueued(self) -> Tuple[bool, int]:
status = self.status
if status in (JobStatus.SUCCESS, JobStatus.FAILED, JobStatus.CANCELLED):
raise InvalidStatus(f"Job {self.id} has status {status}, will not enqueue")

return self.get_queue().ensure_enqueued(self)

def dequeue(self) -> bool:
"""
Remove the job from the queue without changing its status.
"""
num_removed = self.redis.lrem(self.get_queue().redis_key, 0, self.id)
return num_removed > 0
return self.get_queue().dequeue_job(self.id)

def cleanup(self) -> bool:
    """
    Ask this job's queue to clean up after the job, if the job has a priority.

    :return: True when the queue exposes ``clean_job`` and it was called for
             this job; False when the job has no priority or the queue has
             no ``clean_job`` method.
    """
    if not self.has_priority:
        return False

    owning_queue = self.get_queue()
    # Duck-typed capability check rather than isinstance, so the queue class
    # does not have to be imported here.
    if not hasattr(owning_queue, "clean_job"):
        return False

    owning_queue.clean_job(self)
    return True

@property
def redis_key(self) -> str:
Expand All @@ -74,7 +86,10 @@ def acquisition_info(self) -> Optional[Dict[str, Any]]:

@property
def has_finished(self) -> int:
return self.redis.exists(self.result_redis_key)
has_result = self.redis.exists(self.result_redis_key)
if has_result:
self.cleanup()
return has_result

@property
def has_started(self) -> bool:
Expand All @@ -88,6 +103,7 @@ def encoded_result(self) -> Optional[bytes]:
def result(self) -> Optional[Any]:
    """
    Return the decoded job result, or None when no encoded result is available.

    When a result is present, ``cleanup()`` is invoked before decoding.
    """
    encoded = self.encoded_result
    if encoded is None:
        return None

    # A stored result means the job is done, so run cleanup before decoding.
    self.cleanup()
    return self.get_encoding().decode(encoded)

Expand Down Expand Up @@ -140,6 +156,21 @@ def duration(self) -> float:
raise MissingJobData(f"Job {self.id} has no duration")
return float(duration)

@cached_property
def has_priority(self) -> bool:
    """Whether this job's Redis hash contains a ``priority`` field (cached after first access)."""
    stored_priority = self.redis.hget(self.redis_key, "priority")
    return stored_priority is not None

@property
def priority(self) -> int:
    """
    Priority stored in this job's Redis hash, as an integer.

    :raises minique.excs.MissingJobData: If the hash has no ``priority`` field.
    """
    raw_priority = self.redis.hget(self.redis_key, "priority")
    if raw_priority is not None:
        return int(raw_priority)
    raise MissingJobData(f"Job {self.id} has no priority")

@priority.setter
def priority(self, new_priority: int) -> None:
    """Write ``new_priority`` to the ``priority`` field of this job's Redis hash."""
    connection = self.redis
    connection.hset(self.redis_key, "priority", new_priority)

@property
def queue_name(self) -> str:
    """
    Name of the queue for this job, looked up with ``missing_ok=False``.

    NOTE(review): presumably raises when the job has no queue recorded --
    confirm in ``get_queue_name``.
    """
    name = self.get_queue_name(missing_ok=False)
    return name  # type:ignore[return-value]
Expand Down Expand Up @@ -178,9 +209,12 @@ def encoding_name(self) -> str:
def get_encoding(self) -> encoding.BaseEncoding:
    """Instantiate the encoding class registered under this job's encoding name."""
    encoding_cls = encoding.registry[self.encoding_name]
    return encoding_cls()

def get_queue(self) -> "Queue":
def get_queue(self) -> "Union[Queue, PriorityQueue]":
from minique.models.queue import Queue
from minique.models.priority_queue import PriorityQueue

if self.has_priority:
return PriorityQueue(redis=self.redis, name=self.queue_name)
return Queue(redis=self.redis, name=self.queue_name)

def set_meta(self, meta: Any) -> None:
Expand Down
Loading

0 comments on commit 61610c6

Please sign in to comment.